You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/01/21 07:48:23 UTC

[15/51] [partial] incubator-nifi git commit: NIFI-270 Made all changes identified by adam, mark, joey to prep for a cleaner build

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
deleted file mode 100644
index 511bb7d..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ /dev/null
@@ -1,3620 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.manager.impl;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.net.URI;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Pattern;
-
-import javax.net.ssl.SSLContext;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.StreamingOutput;
-import javax.xml.XMLConstants;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.transform.dom.DOMSource;
-import javax.xml.validation.Schema;
-import javax.xml.validation.SchemaFactory;
-import javax.xml.validation.Validator;
-
-import org.apache.nifi.admin.service.AuditService;
-import org.apache.nifi.cluster.BulletinsPayload;
-import org.apache.nifi.cluster.ClusterNodeInformation;
-import org.apache.nifi.cluster.HeartbeatPayload;
-import org.apache.nifi.cluster.NodeInformation;
-import org.apache.nifi.cluster.context.ClusterContext;
-import org.apache.nifi.cluster.context.ClusterContextImpl;
-import org.apache.nifi.cluster.event.Event;
-import org.apache.nifi.cluster.event.EventManager;
-import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
-import org.apache.nifi.cluster.flow.ClusterDataFlow;
-import org.apache.nifi.cluster.flow.DaoException;
-import org.apache.nifi.cluster.flow.DataFlowManagementService;
-import org.apache.nifi.cluster.flow.PersistedFlowState;
-import org.apache.nifi.cluster.manager.HttpClusterManager;
-import org.apache.nifi.cluster.manager.HttpRequestReplicator;
-import org.apache.nifi.cluster.manager.HttpResponseMapper;
-import org.apache.nifi.cluster.manager.NodeResponse;
-import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException;
-import org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException;
-import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
-import org.apache.nifi.cluster.manager.exception.IllegalNodeDeletionException;
-import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
-import org.apache.nifi.cluster.manager.exception.IllegalNodeReconnectionException;
-import org.apache.nifi.cluster.manager.exception.IneligiblePrimaryNodeException;
-import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
-import org.apache.nifi.cluster.manager.exception.NoResponseFromNodesException;
-import org.apache.nifi.cluster.manager.exception.NodeDisconnectionException;
-import org.apache.nifi.cluster.manager.exception.NodeReconnectionException;
-import org.apache.nifi.cluster.manager.exception.PrimaryRoleAssignmentException;
-import org.apache.nifi.cluster.manager.exception.SafeModeMutableRequestException;
-import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
-import org.apache.nifi.cluster.manager.exception.UriConstructionException;
-import org.apache.nifi.cluster.node.Node;
-import org.apache.nifi.cluster.node.Node.Status;
-import org.apache.nifi.cluster.protocol.ConnectionRequest;
-import org.apache.nifi.cluster.protocol.ConnectionResponse;
-import org.apache.nifi.cluster.protocol.Heartbeat;
-import org.apache.nifi.cluster.protocol.NodeBulletins;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.ProtocolHandler;
-import org.apache.nifi.cluster.protocol.StandardDataFlow;
-import org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderListener;
-import org.apache.nifi.cluster.protocol.impl.ClusterServicesBroadcaster;
-import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
-import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
-import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
-import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
-import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
-import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
-import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
-import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.controller.ControllerService;
-import org.apache.nifi.controller.Heartbeater;
-import org.apache.nifi.controller.ReportingTaskNode;
-import org.apache.nifi.controller.ValidationContextFactory;
-import org.apache.nifi.controller.reporting.ClusteredReportingTaskNode;
-import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
-import org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
-import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
-import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent;
-import org.apache.nifi.controller.service.ControllerServiceNode;
-import org.apache.nifi.controller.service.ControllerServiceProvider;
-import org.apache.nifi.controller.service.StandardControllerServiceProvider;
-import org.apache.nifi.controller.status.ProcessGroupStatus;
-import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
-import org.apache.nifi.controller.status.history.ComponentStatusRepository;
-import org.apache.nifi.controller.status.history.MetricDescriptor;
-import org.apache.nifi.controller.status.history.StatusHistory;
-import org.apache.nifi.controller.status.history.StatusHistoryUtil;
-import org.apache.nifi.controller.status.history.StatusSnapshot;
-import org.apache.nifi.diagnostics.GarbageCollection;
-import org.apache.nifi.diagnostics.StorageUsage;
-import org.apache.nifi.diagnostics.SystemDiagnostics;
-import org.apache.nifi.encrypt.StringEncryptor;
-import org.apache.nifi.engine.FlowEngine;
-import org.apache.nifi.events.BulletinFactory;
-import org.apache.nifi.events.VolatileBulletinRepository;
-import org.apache.nifi.framework.security.util.SslContextFactory;
-import org.apache.nifi.io.socket.multicast.DiscoverableService;
-import org.apache.nifi.logging.NiFiLog;
-import org.apache.nifi.nar.ExtensionManager;
-import org.apache.nifi.nar.NarCloseable;
-import org.apache.nifi.nar.NarThreadContextClassLoader;
-import org.apache.nifi.processor.StandardValidationContextFactory;
-import org.apache.nifi.remote.RemoteResourceManager;
-import org.apache.nifi.remote.RemoteSiteListener;
-import org.apache.nifi.remote.SocketRemoteSiteListener;
-import org.apache.nifi.remote.protocol.socket.ClusterManagerServerProtocol;
-import org.apache.nifi.reporting.Bulletin;
-import org.apache.nifi.reporting.BulletinRepository;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.reporting.ReportingInitializationContext;
-import org.apache.nifi.reporting.ReportingTask;
-import org.apache.nifi.reporting.Severity;
-import org.apache.nifi.scheduling.SchedulingStrategy;
-import org.apache.nifi.util.DomUtils;
-import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.web.Revision;
-import org.apache.nifi.web.api.dto.FlowSnippetDTO;
-import org.apache.nifi.web.api.dto.NodeDTO;
-import org.apache.nifi.web.api.dto.ProcessGroupDTO;
-import org.apache.nifi.web.api.dto.ProcessorDTO;
-import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
-import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
-import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
-import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
-import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
-import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO;
-import org.apache.nifi.web.api.dto.provenance.ProvenanceResultsDTO;
-import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO;
-import org.apache.nifi.web.api.dto.status.NodeStatusHistoryDTO;
-import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
-import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
-import org.apache.nifi.web.api.entity.FlowSnippetEntity;
-import org.apache.nifi.web.api.entity.ProcessGroupEntity;
-import org.apache.nifi.web.api.entity.ProcessorEntity;
-import org.apache.nifi.web.api.entity.ProcessorsEntity;
-import org.apache.nifi.web.api.entity.ProvenanceEntity;
-import org.apache.nifi.web.api.entity.ProvenanceEventEntity;
-import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
-import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity;
-import org.apache.nifi.web.util.WebUtils;
-
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.w3c.dom.DOMException;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.w3c.dom.NodeList;
-import org.xml.sax.SAXException;
-import org.xml.sax.SAXParseException;
-
-import com.sun.jersey.api.client.ClientResponse;
-
-import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
-
-/**
- * Provides a cluster manager implementation. The manager federates incoming
- * HTTP client requests to the nodes' external API using the HTTP protocol. The
- * manager also communicates with nodes using the nodes' internal socket
- * protocol.
- *
- * The manager's socket address may broadcasted using multicast if a
- * MulticastServiceBroadcaster instance is set on this instance. The manager
- * instance must be started after setting the broadcaster.
- *
- * The manager may be configured with an EventManager for recording noteworthy
- * lifecycle events (e.g., first heartbeat received, node status change).
- *
- * The start() and stop() methods must be called to initialize and stop the
- * instance.
- *
- * @author unattributed
- */
-public class WebClusterManager implements HttpClusterManager, ProtocolHandler, ControllerServiceProvider {
-
-    public static final String ROOT_GROUP_ID_ALIAS = "root";
-    public static final String BULLETIN_CATEGORY = "Clustering";
-
-    private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(WebClusterManager.class));
-    private static final Logger heartbeatLogger = new NiFiLog(LoggerFactory.getLogger("org.apache.nifi.cluster.heartbeat"));
-
-    /**
-     * The HTTP header to store a cluster context. An example of what may be
-     * stored in the context is a node's auditable actions in response to a
-     * cluster request. The cluster context is serialized using Java's
-     * serialization mechanism and hex encoded.
-     */
-    public static final String CLUSTER_CONTEXT_HTTP_HEADER = "X-ClusterContext";
-
-    /**
-     * HTTP Header that stores a unique ID for each request that is replicated
-     * to the nodes. This is used for logging purposes so that request
-     * information, such as timing, can be correlated between the NCM and the
-     * nodes
-     */
-    public static final String REQUEST_ID_HEADER = "X-RequestID";
-
-    /**
-     * The HTTP header that the NCM specifies to ask a node if they are able to
-     * process a given request. The value is always 150-NodeContinue. The node
-     * will respond with 150 CONTINUE if it is able to process the request, 417
-     * EXPECTATION_FAILED otherwise.
-     */
-    public static final String NCM_EXPECTS_HTTP_HEADER = "X-NcmExpects";
-    public static final int NODE_CONTINUE_STATUS_CODE = 150;
-
-    /**
-     * The HTTP header that the NCM specifies to indicate that a node should
-     * invalidate the specified user group. This is done to ensure that user
-     * cache is not stale when an administrator modifies a group through the UI.
-     */
-    public static final String CLUSTER_INVALIDATE_USER_GROUP_HEADER = "X-ClusterInvalidateUserGroup";
-
-    /**
-     * The HTTP header that the NCM specifies to indicate that a node should
-     * invalidate the specified user. This is done to ensure that user cache is
-     * not stale when an administrator modifies a user through the UI.
-     */
-    public static final String CLUSTER_INVALIDATE_USER_HEADER = "X-ClusterInvalidateUser";
-
-    /**
-     * The default number of seconds to respond to a connecting node if the
-     * manager cannot provide it with a current data flow.
-     */
-    private static final int DEFAULT_CONNECTION_REQUEST_TRY_AGAIN_SECONDS = 5;
-
-    public static final String DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION = "org.apache.nifi.controller.status.history.VolatileComponentStatusRepository";
-
-    public static final Pattern PROCESSORS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors");
-    public static final Pattern PROCESSOR_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}");
-
-    public static final Pattern REMOTE_PROCESS_GROUPS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups");
-    public static final Pattern REMOTE_PROCESS_GROUP_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups/[a-f0-9\\-]{36}");
-
-    public static final Pattern PROCESS_GROUP_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))");
-    public static final Pattern TEMPLATE_INSTANCE_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/template-instance");
-    public static final Pattern FLOW_SNIPPET_INSTANCE_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/snippet-instance");
-
-    public static final String PROVENANCE_URI = "/nifi-api/controller/provenance";
-    public static final Pattern PROVENANCE_QUERY_URI = Pattern.compile("/nifi-api/controller/provenance/[a-f0-9\\-]{36}");
-    public static final Pattern PROVENANCE_EVENT_URI = Pattern.compile("/nifi-api/controller/provenance/events/[0-9]+");
-
-    private final NiFiProperties properties;
-    private final HttpRequestReplicator httpRequestReplicator;
-    private final HttpResponseMapper httpResponseMapper;
-    private final DataFlowManagementService dataFlowManagementService;
-    private final ClusterManagerProtocolSenderListener senderListener;
-    private final StringEncryptor encryptor;
-    private final Queue<Heartbeat> pendingHeartbeats = new ConcurrentLinkedQueue<>();
-    private final ReentrantReadWriteLock resourceRWLock = new ReentrantReadWriteLock();
-    private final ClusterManagerLock readLock = new ClusterManagerLock(resourceRWLock.readLock(), "Read");
-    private final ClusterManagerLock writeLock = new ClusterManagerLock(resourceRWLock.writeLock(), "Write");
-
-    private final Set<Node> nodes = new HashSet<>();
-    private final Set<ReportingTaskNode> reportingTasks = new HashSet<>();
-
-    // null means the dataflow should be read from disk
-    private StandardDataFlow cachedDataFlow = null;
-    private NodeIdentifier primaryNodeId = null;
-    private Revision revision = new Revision(0L, "");
-    private Timer heartbeatMonitor;
-    private Timer heartbeatProcessor;
-    private volatile ClusterServicesBroadcaster servicesBroadcaster = null;
-    private volatile EventManager eventManager = null;
-    private volatile ClusterNodeFirewall clusterFirewall = null;
-    private volatile AuditService auditService = null;
-    private volatile ControllerServiceProvider controllerServiceProvider = null;
-
-    private final RemoteSiteListener remoteSiteListener;
-    private final Integer remoteInputPort;
-    private final Boolean remoteCommsSecure;
-    private final BulletinRepository bulletinRepository;
-    private final String instanceId;
-    private final FlowEngine reportingTaskEngine;
-    private final Map<NodeIdentifier, ComponentStatusRepository> componentMetricsRepositoryMap = new HashMap<>();
-    private final StandardProcessScheduler processScheduler;
-    private final long componentStatusSnapshotMillis;
-
-    public WebClusterManager(final HttpRequestReplicator httpRequestReplicator, final HttpResponseMapper httpResponseMapper,
-            final DataFlowManagementService dataFlowManagementService, final ClusterManagerProtocolSenderListener senderListener,
-            final NiFiProperties properties, final StringEncryptor encryptor) {
-
-        if (httpRequestReplicator == null) {
-            throw new IllegalArgumentException("HttpRequestReplicator may not be null.");
-        } else if (httpResponseMapper == null) {
-            throw new IllegalArgumentException("HttpResponseMapper may not be null.");
-        } else if (dataFlowManagementService == null) {
-            throw new IllegalArgumentException("DataFlowManagementService may not be null.");
-        } else if (senderListener == null) {
-            throw new IllegalArgumentException("ClusterManagerProtocolSenderListener may not be null.");
-        } else if (properties == null) {
-            throw new IllegalArgumentException("NiFiProperties may not be null.");
-        }
-
-        // Ensure that our encryptor/decryptor is properly initialized
-        this.httpRequestReplicator = httpRequestReplicator;
-        this.httpResponseMapper = httpResponseMapper;
-        this.dataFlowManagementService = dataFlowManagementService;
-        this.properties = properties;
-        this.controllerServiceProvider = new StandardControllerServiceProvider();
-        this.bulletinRepository = new VolatileBulletinRepository();
-        this.instanceId = UUID.randomUUID().toString();
-        this.senderListener = senderListener;
-        this.encryptor = encryptor;
-        senderListener.addHandler(this);
-        senderListener.setBulletinRepository(bulletinRepository);
-
-        final String snapshotFrequency = properties.getProperty(NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY);
-        long snapshotMillis;
-        try {
-            snapshotMillis = FormatUtils.getTimeDuration(snapshotFrequency, TimeUnit.MILLISECONDS);
-        } catch (final Exception e) {
-            snapshotMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY, TimeUnit.MILLISECONDS);
-        }
-        componentStatusSnapshotMillis = snapshotMillis;
-
-        remoteInputPort = properties.getRemoteInputPort();
-        if (remoteInputPort == null) {
-            remoteSiteListener = null;
-            remoteCommsSecure = null;
-        } else {
-            // Register the ClusterManagerServerProtocol as the appropriate resource for site-to-site Server Protocol
-            RemoteResourceManager.setServerProtocolImplementation(ClusterManagerServerProtocol.RESOURCE_NAME, ClusterManagerServerProtocol.class);
-            remoteCommsSecure = properties.isSiteToSiteSecure();
-            if (remoteCommsSecure) {
-                final SSLContext sslContext = SslContextFactory.createSslContext(properties, false);
-
-                if (sslContext == null) {
-                    throw new IllegalStateException("NiFi Configured to allow Secure Site-to-Site communications but the Keystore/Truststore properties are not configured");
-                }
-
-                remoteSiteListener = new SocketRemoteSiteListener(remoteInputPort.intValue(), sslContext, this);
-            } else {
-                remoteSiteListener = new SocketRemoteSiteListener(remoteInputPort.intValue(), null, this);
-            }
-        }
-
-        reportingTaskEngine = new FlowEngine(8, "Reporting Task Thread");
-
-        processScheduler = new StandardProcessScheduler(new Heartbeater() {
-            @Override
-            public void heartbeat() {
-            }
-        }, this, encryptor);
-        processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, new TimerDrivenSchedulingAgent(null, reportingTaskEngine, null, encryptor));
-        processScheduler.setMaxThreadCount(SchedulingStrategy.TIMER_DRIVEN, 10);
-        processScheduler.setMaxThreadCount(SchedulingStrategy.CRON_DRIVEN, 10);
-    }
-
-    public void start() throws IOException {
-        writeLock.lock();
-        try {
-
-            if (isRunning()) {
-                throw new IllegalStateException("Instance is already started.");
-            }
-
-            try {
-                // setup heartbeat monitoring
-                heartbeatMonitor = new Timer("Heartbeat Monitor", /* is daemon */ true);
-                heartbeatMonitor.scheduleAtFixedRate(new HeartbeatMonitoringTimerTask(), 0, getHeartbeatMonitoringIntervalSeconds() * 1000);
-
-                heartbeatProcessor = new Timer("Process Pending Heartbeats", true);
-                final int processPendingHeartbeatDelay = 1000 * Math.max(1, getClusterProtocolHeartbeatSeconds() / 2);
-                heartbeatProcessor.schedule(new ProcessPendingHeartbeatsTask(), processPendingHeartbeatDelay, processPendingHeartbeatDelay);
-
-                // start request replication service
-                httpRequestReplicator.start();
-
-                // start protocol service
-                senderListener.start();
-
-                // start flow management service
-                dataFlowManagementService.start();
-
-                if (remoteSiteListener != null) {
-                    remoteSiteListener.start();
-                }
-
-                // load flow
-                if (dataFlowManagementService.isFlowCurrent()) {
-                    final ClusterDataFlow clusterDataFlow = dataFlowManagementService.loadDataFlow();
-                    cachedDataFlow = clusterDataFlow.getDataFlow();
-                    primaryNodeId = clusterDataFlow.getPrimaryNodeId();
-                } else {
-                    throw new IOException("Flow is not current.");
-                }
-
-                // start multicast broadcasting service, if configured
-                if (servicesBroadcaster != null) {
-                    servicesBroadcaster.start();
-                }
-
-                // start in safe mode
-                executeSafeModeTask();
-
-                // Load and start running Reporting Tasks
-                final File taskFile = new File(properties.getProperty(NiFiProperties.TASK_CONFIGURATION_FILE));
-                reportingTasks.addAll(loadReportingTasks(taskFile));
-            } catch (final IOException ioe) {
-                logger.warn("Failed to initialize cluster services due to: " + ioe, ioe);
-                stop();
-                throw ioe;
-            }
-
-        } finally {
-            writeLock.unlock("START");
-        }
-    }
-
-    public void stop() throws IOException {
-        writeLock.lock();
-        try {
-
-            // returns true if any service is running
-            if (isRunning() == false) {
-                throw new IllegalArgumentException("Instance is already stopped.");
-            }
-
-            boolean encounteredException = false;
-
-            // stop the heartbeat monitoring
-            if (isHeartbeatMonitorRunning()) {
-                heartbeatMonitor.cancel();
-                heartbeatMonitor = null;
-            }
-
-            if (heartbeatProcessor != null) {
-                heartbeatProcessor.cancel();
-                heartbeatProcessor = null;
-            }
-
-            // stop the HTTP request replicator service
-            if (httpRequestReplicator.isRunning()) {
-                httpRequestReplicator.stop();
-            }
-
-            // stop the flow management service
-            if (dataFlowManagementService.isRunning()) {
-                dataFlowManagementService.stop();
-            }
-
-            if (remoteSiteListener != null) {
-                remoteSiteListener.stop();
-            }
-
-            // stop the protocol listener service
-            if (senderListener.isRunning()) {
-                try {
-                    senderListener.stop();
-                } catch (final IOException ioe) {
-                    encounteredException = true;
-                    logger.warn("Failed to shutdown protocol service due to: " + ioe, ioe);
-                }
-            }
-
-            // stop the service broadcaster
-            if (isBroadcasting()) {
-                servicesBroadcaster.stop();
-            }
-
-            if ( processScheduler != null ) {
-                processScheduler.shutdown();
-            }
-            
-            if (encounteredException) {
-                throw new IOException("Failed to shutdown Cluster Manager because one or more cluster services failed to shutdown.  Check the logs for details.");
-            }
-
-        } finally {
-            writeLock.unlock("STOP");
-        }
-    }
-
-    public boolean isRunning() {
-        readLock.lock();
-        try {
-            return isHeartbeatMonitorRunning()
-                    || httpRequestReplicator.isRunning()
-                    || senderListener.isRunning()
-                    || dataFlowManagementService.isRunning()
-                    || isBroadcasting();
-        } finally {
-            readLock.unlock("isRunning");
-        }
-    }
-
-    @Override
-    public boolean canHandle(ProtocolMessage msg) {
-        return MessageType.CONNECTION_REQUEST == msg.getType()
-                || MessageType.HEARTBEAT == msg.getType()
-                || MessageType.CONTROLLER_STARTUP_FAILURE == msg.getType()
-                || MessageType.BULLETINS == msg.getType()
-                || MessageType.RECONNECTION_FAILURE == msg.getType();
-    }
-
-    @Override
-    public ProtocolMessage handle(final ProtocolMessage protocolMessage) throws ProtocolException {
-        switch (protocolMessage.getType()) {
-            case CONNECTION_REQUEST:
-                return handleConnectionRequest((ConnectionRequestMessage) protocolMessage);
-            case HEARTBEAT:
-                final HeartbeatMessage heartbeatMessage = (HeartbeatMessage) protocolMessage;
-
-                final Heartbeat original = heartbeatMessage.getHeartbeat();
-                final NodeIdentifier originalNodeId = original.getNodeIdentifier();
-                final Heartbeat heartbeatWithDn = new Heartbeat(addRequestorDn(originalNodeId, heartbeatMessage.getRequestorDN()), original.isPrimary(), original.isConnected(), original.getPayload());
-
-                handleHeartbeat(heartbeatWithDn);
-                return null;
-            case CONTROLLER_STARTUP_FAILURE:
-                new Thread(new Runnable() {
-                    @Override
-                    public void run() {
-                        handleControllerStartupFailure((ControllerStartupFailureMessage) protocolMessage);
-                    }
-                }, "Handle Controller Startup Failure Message from " + ((ControllerStartupFailureMessage) protocolMessage).getNodeId()).start();
-                return null;
-            case RECONNECTION_FAILURE:
-                new Thread(new Runnable() {
-                    @Override
-                    public void run() {
-                        handleReconnectionFailure((ReconnectionFailureMessage) protocolMessage);
-                    }
-                }, "Handle Reconnection Failure Message from " + ((ReconnectionFailureMessage) protocolMessage).getNodeId()).start();
-                return null;
-            case BULLETINS:
-                final NodeBulletinsMessage bulletinsMessage = (NodeBulletinsMessage) protocolMessage;
-                handleBulletins(bulletinsMessage.getBulletins());
-                return null;
-            default:
-                throw new ProtocolException("No handler defined for message type: " + protocolMessage.getType());
-        }
-    }
-
-    /**
-     * Services connection requests. If the data flow management service is
-     * unable to provide a current copy of the data flow, then the returned
-     * connection response will indicate the node should try later. Otherwise,
-     * the connection response will contain the the flow and the node
-     * identifier.
-     *
-     * If this instance is configured with a firewall and the request is
-     * blocked, then the response will not contain a node identifier.
-     *
-     * @param request a connection request
-     *
-     * @return a connection response
-     */
-    @Override
-    public ConnectionResponse requestConnection(final ConnectionRequest request) {
-        final boolean lockObtained = writeLock.tryLock(3, TimeUnit.SECONDS);
-        if (!lockObtained) {
-            // Create try-later response because we are too busy to service the request right now. We do not want
-            // to wait long because we want Node/NCM comms to be very responsive
-            final int tryAgainSeconds;
-            if (dataFlowManagementService.getRetrievalDelaySeconds() <= 0) {
-                tryAgainSeconds = DEFAULT_CONNECTION_REQUEST_TRY_AGAIN_SECONDS;
-            } else {
-                tryAgainSeconds = dataFlowManagementService.getRetrievalDelaySeconds();
-            }
-
-            // record event
-            final String msg = "Connection requested from node, but manager was too busy to service request.  Instructing node to try again in " + tryAgainSeconds + " seconds.";
-            addEvent(request.getProposedNodeIdentifier(), msg);
-            addBulletin(request.getProposedNodeIdentifier(), Severity.INFO, msg);
-
-            // return try later response
-            return new ConnectionResponse(tryAgainSeconds);
-        }
-
-        try {
-            // resolve the proposed node identifier to a valid node identifier
-            final NodeIdentifier resolvedNodeIdentifier = resolveProposedNodeIdentifier(request.getProposedNodeIdentifier());
-
-            if (isBlockedByFirewall(resolvedNodeIdentifier.getSocketAddress())) {
-                // if the socket address is not listed in the firewall, then return a null response
-                logger.info("Firewall blocked connection request from node " + resolvedNodeIdentifier);
-                return ConnectionResponse.createBlockedByFirewallResponse();
-            }
-
-            // get a raw reference to the node (if it doesn't exist, node will be null)
-            Node node = getRawNode(resolvedNodeIdentifier.getId());
-
-            // create a new node if necessary and set status to connecting
-            if (node == null) {
-                node = new Node(resolvedNodeIdentifier, Status.CONNECTING);
-                addEvent(node.getNodeId(), "Connection requested from new node.  Setting status to connecting.");
-                nodes.add(node);
-            } else {
-                node.setStatus(Status.CONNECTING);
-                addEvent(resolvedNodeIdentifier, "Connection requested from existing node.  Setting status to connecting");
-            }
-
-            // record the time of the connection request
-            node.setConnectionRequestedTimestamp(new Date().getTime());
-
-            // clear out old heartbeat info
-            node.setHeartbeat(null);
-
-            // try to obtain a current flow
-            if (dataFlowManagementService.isFlowCurrent()) {
-                // if a cached copy does not exist, load it from disk
-                if (cachedDataFlow == null) {
-                    final ClusterDataFlow clusterDataFlow = dataFlowManagementService.loadDataFlow();
-                    cachedDataFlow = clusterDataFlow.getDataFlow();
-                    primaryNodeId = clusterDataFlow.getPrimaryNodeId();
-                }
-
-                // determine if this node should be assigned the primary role
-                final boolean primaryRole;
-                if (primaryNodeId == null || primaryNodeId.logicallyEquals(node.getNodeId())) {
-                    setPrimaryNodeId(node.getNodeId());
-                    addEvent(node.getNodeId(), "Setting primary role in connection response.");
-                    primaryRole = true;
-                } else {
-                    primaryRole = false;
-                }
-
-                return new ConnectionResponse(node.getNodeId(), cachedDataFlow, primaryRole, remoteInputPort, remoteCommsSecure, instanceId);
-            }
-
-            /*
-             * The manager does not have a current copy of the data flow, 
-             * so it will instruct the node to try connecting at a later 
-             * time.  Meanwhile, the flow will be locked down from user 
-             * changes because the node is marked as connecting.
-             */
-
-            /*
-             * Create try-later response based on flow retrieval delay to give 
-             * the flow management service a chance to retrieve a curren flow
-             */
-            final int tryAgainSeconds;
-            if (dataFlowManagementService.getRetrievalDelaySeconds() <= 0) {
-                tryAgainSeconds = DEFAULT_CONNECTION_REQUEST_TRY_AGAIN_SECONDS;
-            } else {
-                tryAgainSeconds = dataFlowManagementService.getRetrievalDelaySeconds();
-            }
-
-            // record event
-            addEvent(node.getNodeId(), "Connection requested from node, but manager was unable to obtain current flow.  Instructing node to try again in " + tryAgainSeconds + " seconds.");
-
-            // return try later response
-            return new ConnectionResponse(tryAgainSeconds);
-
-        } finally {
-            writeLock.unlock("requestConnection");
-        }
-    }
-
-    /**
-     * Services reconnection requests for a given node. If the node indicates
-     * reconnection failure, then the node will be set to disconnected and if
-     * the node has primary role, then the role will be revoked. Otherwise, a
-     * reconnection request will be sent to the node, initiating the connection
-     * handshake.
-     *
-     * @param nodeId a node identifier
-     *
-     * @throws UnknownNodeException if the node does not exist
-     * @throws IllegalNodeReconnectionException if the node cannot be
-     * reconnected because the node is not disconnected
-     * @throws NodeReconnectionException if the reconnection message failed to
-     * be sent or the cluster could not provide a current data flow for the
-     * reconnection request
-     */
-    @Override
-    public void requestReconnection(final String nodeId, final String userDn) throws UnknownNodeException, IllegalNodeReconnectionException {
-        Node node = null;
-
-        final boolean primaryRole;
-        final int tryAgainSeconds;
-
-        writeLock.lock();
-        try {
-            // check if we know about this node and that it is disconnected
-            node = getRawNode(nodeId);
-            logger.info("Request was made by {} to reconnect node {} to cluster", userDn, node == null ? nodeId : node);
-
-            if (node == null) {
-                throw new UnknownNodeException("Node does not exist.");
-            } else if (Status.DISCONNECTED != node.getStatus()) {
-                throw new IllegalNodeReconnectionException("Node must be disconnected before it can reconnect.");
-            }
-
-            // clear out old heartbeat info
-            node.setHeartbeat(null);
-
-            // get the dataflow to send with the reconnection request
-            if (!dataFlowManagementService.isFlowCurrent()) {
-                /* node remains disconnected */
-                final String msg = "Reconnection requested for node, but manager was unable to obtain current flow.  Setting node to disconnected.";
-                addEvent(node.getNodeId(), msg);
-                addBulletin(node, Severity.WARNING, msg);
-                throw new NodeReconnectionException("Manager was unable to obtain current flow to provide in reconnection request to node.  Try again in a few seconds.");
-            }
-
-            // if a cached copy does not exist, load it from disk
-            if (cachedDataFlow == null) {
-                final ClusterDataFlow clusterDataFlow = dataFlowManagementService.loadDataFlow();
-                cachedDataFlow = clusterDataFlow.getDataFlow();
-                primaryNodeId = clusterDataFlow.getPrimaryNodeId();
-            }
-
-            node.setStatus(Status.CONNECTING);
-            addEvent(node.getNodeId(), "Reconnection requested for node.  Setting status to connecting.");
-
-            // determine if this node should be assigned the primary role
-            if (primaryNodeId == null || primaryNodeId.logicallyEquals(node.getNodeId())) {
-                setPrimaryNodeId(node.getNodeId());
-                addEvent(node.getNodeId(), "Setting primary role in reconnection request.");
-                primaryRole = true;
-            } else {
-                primaryRole = false;
-            }
-
-            if (dataFlowManagementService.getRetrievalDelaySeconds() <= 0) {
-                tryAgainSeconds = DEFAULT_CONNECTION_REQUEST_TRY_AGAIN_SECONDS;
-            } else {
-                tryAgainSeconds = dataFlowManagementService.getRetrievalDelaySeconds();
-            }
-        } catch (final UnknownNodeException | IllegalNodeReconnectionException | NodeReconnectionException une) {
-            throw une;
-        } catch (final Exception ex) {
-            logger.warn("Problem encountered issuing reconnection request to node " + node.getNodeId() + " due to: " + ex, ex);
-
-            node.setStatus(Status.DISCONNECTED);
-            final String eventMsg = "Problem encountered issuing reconnection request. Node will remain disconnected: " + ex;
-            addEvent(node.getNodeId(), eventMsg);
-            addBulletin(node, Severity.WARNING, eventMsg);
-
-            // Exception thrown will include node ID but event/bulletin do not because the node/id is passed along with the message
-            throw new NodeReconnectionException("Problem encountered issuing reconnection request to " + node.getNodeId() + ". Node will remain disconnected: " + ex, ex);
-        } finally {
-            writeLock.unlock("requestReconnection");
-        }
-
-        // Asynchronously start attempting reconnection. This is not completely thread-safe, as
-        // we do this by releasing the write lock and then obtaining a read lock for each attempt,
-        // so we suffer from the ABA problem. However, we are willing to accept the consequences of
-        // this situation in order to avoid holding a lock for the entire duration. "The consequences"
-        // are that a second thread could potentially be doing the same thing, issuing a reconnection request.
-        // However, this is very unlikely to happen, based on the conditions under which we issue a reconnection
-        // request. And if we do, the node will simply reconnect multiple times, which is not a big deal.
-        requestReconnectionAsynchronously(node, primaryRole, 10, tryAgainSeconds);
-    }
-
-    private void requestReconnectionAsynchronously(final Node node, final boolean primaryRole, final int reconnectionAttempts, final int retrySeconds) {
-        final Thread reconnectionThread = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                for (int i = 0; i < reconnectionAttempts; i++) {
-                    final ReconnectionRequestMessage request = new ReconnectionRequestMessage();
-
-                    try {
-                        readLock.lock();
-                        try {
-                            if (Status.CONNECTING != node.getStatus()) {
-                                // the node status has changed. It's no longer appropriate to attempt reconnection.
-                                return;
-                            }
-
-                            // create the request
-                            request.setNodeId(node.getNodeId());
-                            request.setDataFlow(cachedDataFlow);
-                            request.setPrimary(primaryRole);
-                            request.setManagerRemoteSiteCommsSecure(remoteCommsSecure);
-                            request.setManagerRemoteSiteListeningPort(remoteInputPort);
-                            request.setInstanceId(instanceId);
-                        } finally {
-                            readLock.unlock("Reconnect " + node.getNodeId());
-                        }
-
-                        // Issue a reconnection request to the node.
-                        senderListener.requestReconnection(request);
-
-                        node.setConnectionRequestedTimestamp(System.currentTimeMillis());
-
-                        // successfully told node to reconnect -- we're done!
-                        return;
-                    } catch (final Exception e) {
-                        logger.warn("Problem encountered issuing reconnection request to node " + node.getNodeId() + " due to: " + e);
-                        if (logger.isDebugEnabled()) {
-                            logger.warn("", e);
-                        }
-
-                        addBulletin(node, Severity.WARNING, "Problem encountered issuing reconnection request to node " + node.getNodeId() + " due to: " + e);
-                    }
-
-                    try {
-                        Thread.sleep(1000L * retrySeconds);
-                    } catch (final InterruptedException ie) {
-                        break;
-                    }
-                }
-
-                // We failed to reconnect 10 times. We must now mark node as disconnected.
-                writeLock.lock();
-                try {
-                    if (Status.CONNECTING == node.getStatus()) {
-                        requestDisconnectionQuietly(node.getNodeId(), "Failed to issue Reconnection Request " + reconnectionAttempts + " times");
-                    }
-                } finally {
-                    writeLock.unlock("Mark node as Disconnected as a result of reconnection failure");
-                }
-            }
-        }, "Reconnect " + node.getNodeId());
-
-        reconnectionThread.start();
-    }
-
-    private List<ReportingTaskNode> loadReportingTasks(final File taskConfigXml) {
-        final List<ReportingTaskNode> tasks = new ArrayList<>();
-        if (taskConfigXml == null) {
-            logger.info("No controller tasks to start");
-            return tasks;
-        }
-
-        try {
-            final URL schemaUrl = getClass().getResource("/ReportingTaskConfiguration.xsd");
-            final Document document = parse(taskConfigXml, schemaUrl);
-
-            final NodeList tasksNodes = document.getElementsByTagName("tasks");
-            final Element tasksElement = (Element) tasksNodes.item(0);
-
-            //optional properties for all ReportingTasks
-            for (final Element taskElement : DomUtils.getChildElementsByTagName(tasksElement, "task")) {
-                //add global properties common to all tasks
-                Map<String, String> properties = new HashMap<>();
-
-                //get properties for the specific reporting task - id, name, class,
-                //and schedulingPeriod must be set
-                final String taskId = DomUtils.getChild(taskElement, "id").getTextContent().trim();
-                final String taskName = DomUtils.getChild(taskElement, "name").getTextContent().trim();
-
-                final List<Element> schedulingStrategyNodeList = DomUtils.getChildElementsByTagName(taskElement, "schedulingStrategy");
-                String schedulingStrategyValue = SchedulingStrategy.TIMER_DRIVEN.name();
-                if (schedulingStrategyNodeList.size() == 1) {
-                    final String specifiedValue = schedulingStrategyNodeList.get(0).getTextContent();
-
-                    try {
-                        schedulingStrategyValue = SchedulingStrategy.valueOf(specifiedValue).name();
-                    } catch (final Exception e) {
-                        throw new RuntimeException("Cannot start Reporting Task with id " + taskId + " because its Scheduling Strategy does not have a valid value", e);
-                    }
-                }
-
-                final SchedulingStrategy schedulingStrategy = SchedulingStrategy.valueOf(schedulingStrategyValue);
-                final String taskSchedulingPeriod = DomUtils.getChild(taskElement, "schedulingPeriod").getTextContent().trim();
-                final String taskClass = DomUtils.getChild(taskElement, "class").getTextContent().trim();
-
-                //optional task-specific properties
-                for (final Element optionalProperty : DomUtils.getChildElementsByTagName(taskElement, "property")) {
-                    final String name = optionalProperty.getAttribute("name");
-                    final String value = optionalProperty.getTextContent().trim();
-                    properties.put(name, value);
-                }
-
-                //set the class to be used for the configured reporting task
-                final ReportingTaskNode reportingTaskNode;
-                try {
-                    reportingTaskNode = createReportingTask(taskClass, taskId);
-                } catch (final ReportingTaskInstantiationException e) {
-                    logger.error("Unable to load reporting task {} due to {}", new Object[]{taskId, e});
-                    if (logger.isDebugEnabled()) {
-                        logger.error("", e);
-                    }
-                    continue;
-                }
-
-                final ReportingTask reportingTask = reportingTaskNode.getReportingTask();
-
-                final ReportingInitializationContext config = new StandardReportingInitializationContext(taskId, taskName, schedulingStrategy, taskSchedulingPeriod, this);
-                reportingTask.initialize(config);
-
-                final Map<PropertyDescriptor, String> resolvedProps;
-                try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
-                    resolvedProps = new HashMap<>();
-                    for (final Map.Entry<String, String> entry : properties.entrySet()) {
-                        final PropertyDescriptor descriptor = reportingTask.getPropertyDescriptor(entry.getKey());
-                        resolvedProps.put(descriptor, entry.getValue());
-                    }
-                }
-
-                for (final Map.Entry<PropertyDescriptor, String> entry : resolvedProps.entrySet()) {
-                    reportingTaskNode.setProperty(entry.getKey().getName(), entry.getValue());
-                }
-
-                processScheduler.schedule(reportingTaskNode);
-                tasks.add(reportingTaskNode);
-            }
-        } catch (final SAXException | ParserConfigurationException | IOException | DOMException | NumberFormatException | InitializationException t) {
-            logger.error("Unable to load reporting tasks from {} due to {}", new Object[]{taskConfigXml, t});
-            if (logger.isDebugEnabled()) {
-                logger.error("", t);
-            }
-        }
-
-        return tasks;
-    }
-
-    private ReportingTaskNode createReportingTask(final String type, final String id) throws ReportingTaskInstantiationException {
-        if (type == null) {
-            throw new NullPointerException();
-        }
-        ReportingTask task = null;
-        final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
-        try {
-            final ClassLoader detectedClassLoader = ExtensionManager.getClassLoader(type);
-            final Class<?> rawClass;
-            if (detectedClassLoader == null) {
-                rawClass = Class.forName(type);
-            } else {
-                rawClass = Class.forName(type, false, detectedClassLoader);
-            }
-
-            Thread.currentThread().setContextClassLoader(detectedClassLoader);
-            final Class<? extends ReportingTask> reportingTaskClass = rawClass.asSubclass(ReportingTask.class);
-            final Object reportingTaskObj = reportingTaskClass.newInstance();
-            task = reportingTaskClass.cast(reportingTaskObj);
-        } catch (final ClassNotFoundException | SecurityException | InstantiationException | IllegalAccessException | IllegalArgumentException t) {
-            throw new ReportingTaskInstantiationException(type, t);
-        } finally {
-            if (ctxClassLoader != null) {
-                Thread.currentThread().setContextClassLoader(ctxClassLoader);
-            }
-        }
-
-        final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(this);
-        final ReportingTaskNode taskNode = new ClusteredReportingTaskNode(task, id, processScheduler,
-                new ClusteredEventAccess(this), bulletinRepository, controllerServiceProvider, validationContextFactory);
-        return taskNode;
-    }
-
-    private Document parse(final File xmlFile, final URL schemaUrl) throws SAXException, ParserConfigurationException, IOException {
-        final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
-        final Schema schema = schemaFactory.newSchema(schemaUrl);
-        final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
-        docFactory.setSchema(schema);
-        final DocumentBuilder builder = docFactory.newDocumentBuilder();
-
-        builder.setErrorHandler(new org.xml.sax.ErrorHandler() {
-            @Override
-            public void fatalError(final SAXParseException err) throws SAXException {
-                logger.error("Config file line " + err.getLineNumber() + ", col " + err.getColumnNumber() + ", uri " + err.getSystemId() + " :message: " + err.getMessage());
-                if (logger.isDebugEnabled()) {
-                    logger.error("Error Stack Dump", err);
-                }
-                throw err;
-            }
-
-            @Override
-            public void error(final SAXParseException err) throws SAXParseException {
-                logger.error("Config file line " + err.getLineNumber() + ", col " + err.getColumnNumber() + ", uri " + err.getSystemId() + " :message: " + err.getMessage());
-                if (logger.isDebugEnabled()) {
-                    logger.error("Error Stack Dump", err);
-                }
-                throw err;
-            }
-
-            @Override
-            public void warning(final SAXParseException err) throws SAXParseException {
-                logger.warn(" Config file line " + err.getLineNumber() + ", uri " + err.getSystemId() + " : message : " + err.getMessage());
-                if (logger.isDebugEnabled()) {
-                    logger.warn("Warning stack dump", err);
-                }
-                throw err;
-            }
-        });
-
-        // build the docuemnt
-        final Document document = builder.parse(xmlFile);
-
-        // ensure schema compliance
-        final Validator validator = schema.newValidator();
-        validator.validate(new DOMSource(document));
-
-        return document;
-    }
-
-    private void addBulletin(final Node node, final Severity severity, final String msg) {
-        addBulletin(node.getNodeId(), severity, msg);
-    }
-
-    private void addBulletin(final NodeIdentifier nodeId, final Severity severity, final String msg) {
-        bulletinRepository.addBulletin(BulletinFactory.createBulletin(BULLETIN_CATEGORY, severity.toString(),
-                nodeId.getApiAddress() + ":" + nodeId.getApiPort() + " -- " + msg));
-    }
-
-    /**
-     * Services a disconnection request.
-     *
-     * @param nodeId a node identifier
-     * @param userDn the DN of the user requesting the disconnection
-     *
-     * @throws UnknownNodeException if the node does not exist
-     * @throws IllegalNodeDisconnectionException if the node cannot be
-     * disconnected due to the cluster's state (e.g., node is last connected
-     * node or node is primary)
-     * @throws NodeDisconnectionException if the disconnection message fails to
-     * be sent.
-     */
-    @Override
-    public void requestDisconnection(final String nodeId, final String userDn) throws UnknownNodeException, IllegalNodeDisconnectionException, NodeDisconnectionException {
-        writeLock.lock();
-        try {
-            // check that the node is known
-            final Node node = getNode(nodeId);
-            if (node == null) {
-                throw new UnknownNodeException("Node does not exist.");
-            }
-            requestDisconnection(node.getNodeId(), /* ignore last node */ false, "User " + userDn + " Disconnected Node");
-        } finally {
-            writeLock.unlock("requestDisconnection(String)");
-        }
-    }
-
-    /**
-     * Requests a disconnection to the node with the given node ID, but any
-     * exception thrown is suppressed.
-     *
-     * @param nodeId the node ID
-     */
-    private void requestDisconnectionQuietly(final NodeIdentifier nodeId, final String explanation) {
-        try {
-            requestDisconnection(nodeId, /* ignore node check */ true, explanation);
-        } catch (final IllegalNodeDisconnectionException | NodeDisconnectionException ex) { /* suppress exception */ }
-    }
-
-    /**
-     * Issues a disconnection message to the node identified by the given node
-     * ID. If the node is not known, then a UnknownNodeException is thrown. If
-     * the node cannot be disconnected due to the cluster's state and
-     * ignoreLastNodeCheck is false, then a IllegalNodeDisconnectionException is
-     * thrown. Otherwise, a disconnection message is issued to the node.
-     *
-     * Whether the disconnection message is successfully sent to the node, the
-     * node is marked as disconnected and if the node is the primary node, then
-     * the primary role is revoked.
-     *
-     * @param nodeId the ID of the node
-     * @param ignoreNodeChecks if false, checks will be made to ensure the
-     * cluster supports the node's disconnection (e.g., the node is not the last
-     * connected node in the cluster; the node is not the primary); otherwise,
-     * the request is made regardless of the cluster state
-     * @param explanation
-     *
-     * @throws IllegalNodeDisconnectionException if the node cannot be
-     * disconnected due to the cluster's state (e.g., node is last connected
-     * node or node is primary). Not thrown if ignoreNodeChecks is true.
-     * @throws NodeDisconnectionException if the disconnection message fails to
-     * be sent.
-     */
-    private void requestDisconnection(final NodeIdentifier nodeId, final boolean ignoreNodeChecks, final String explanation)
-            throws IllegalNodeDisconnectionException, NodeDisconnectionException {
-
-        writeLock.lock();
-        try {
-
-            // check that the node is known
-            final Node node = getRawNode(nodeId.getId());
-            if (node == null) {
-                if (ignoreNodeChecks) {
-                    // issue the disconnection
-                    final DisconnectMessage request = new DisconnectMessage();
-                    request.setNodeId(nodeId);
-                    request.setExplanation(explanation);
-
-                    addEvent(nodeId, "Disconnection requested due to " + explanation);
-                    senderListener.disconnect(request);
-                    addEvent(nodeId, "Node disconnected due to " + explanation);
-                    addBulletin(nodeId, Severity.INFO, "Node disconnected due to " + explanation);
-                    return;
-                } else {
-                    throw new UnknownNodeException("Node does not exist");
-                }
-            }
-
-            // if necessary, check that the node may be disconnected
-            if (!ignoreNodeChecks) {
-                final Set<NodeIdentifier> connectedNodes = getNodeIds(Status.CONNECTED);
-                // cannot disconnect the last connected node in the cluster
-                if (connectedNodes.size() == 1 && connectedNodes.iterator().next().equals(nodeId)) {
-                    throw new IllegalNodeDisconnectionException("Node may not be disconnected because it is the only connected node in the cluster.");
-                } else if (isPrimaryNode(nodeId)) {
-                    // cannot disconnect the primary node in the cluster
-                    throw new IllegalNodeDisconnectionException("Node may not be disconnected because it is the primary node in the cluster.");
-                }
-            }
-
-            // update status
-            node.setStatus(Status.DISCONNECTED);
-            notifyDataFlowManagementServiceOfNodeStatusChange();
-
-            // issue the disconnection
-            final DisconnectMessage request = new DisconnectMessage();
-            request.setNodeId(nodeId);
-            request.setExplanation(explanation);
-
-            addEvent(nodeId, "Disconnection requested due to " + explanation);
-            senderListener.disconnect(request);
-            addEvent(nodeId, "Node disconnected due to " + explanation);
-            addBulletin(node, Severity.INFO, "Node disconnected due to " + explanation);
-        } finally {
-            writeLock.unlock("requestDisconnection(NodeIdentifier, boolean)");
-        }
-    }
-
-    /**
-     * Messages the node to have the primary role. If the messaging fails, then
-     * the node is marked as disconnected.
-     *
-     * @param nodeId the node ID to assign primary role
-     *
-     * @return true if primary role assigned; false otherwise
-     */
-    private boolean assignPrimaryRole(final NodeIdentifier nodeId) {
-        writeLock.lock();
-        try {
-            // create primary role message
-            final PrimaryRoleAssignmentMessage msg = new PrimaryRoleAssignmentMessage();
-            msg.setNodeId(nodeId);
-            msg.setPrimary(true);
-            logger.info("Attempting to assign primary role to node: " + nodeId);
-
-            // message 
-            senderListener.assignPrimaryRole(msg);
-
-            logger.info("Assigned primary role to node: " + nodeId);
-            addBulletin(nodeId, Severity.INFO, "Node assigned primary role");
-
-            // true indicates primary role assigned
-            return true;
-
-        } catch (final ProtocolException ex) {
-
-            logger.warn("Failed attempt to assign primary role to node " + nodeId + " due to " + ex);
-            addBulletin(nodeId, Severity.ERROR, "Failed to assign primary role to node due to: " + ex);
-
-            // mark node as disconnected and log/record the event
-            final Node node = getRawNode(nodeId.getId());
-            node.setStatus(Status.DISCONNECTED);
-            addEvent(node.getNodeId(), "Disconnected because of failed attempt to assign primary role.");
-
-            addBulletin(nodeId, Severity.WARNING, "Node disconnected because of failed attempt to assign primary role");
-
-            // false indicates primary role failed to be assigned
-            return false;
-        } finally {
-            writeLock.unlock("assignPrimaryRole");
-        }
-    }
-
-    /**
-     * Messages the node with the given node ID to no longer have the primary
-     * role. If the messaging fails, then the node is marked as disconnected.
-     *
-     * @return true if the primary role was revoked from the node; false
-     * otherwise
-     */
-    private boolean revokePrimaryRole(final NodeIdentifier nodeId) {
-        writeLock.lock();
-        try {
-            // create primary role message
-            final PrimaryRoleAssignmentMessage msg = new PrimaryRoleAssignmentMessage();
-            msg.setNodeId(nodeId);
-            msg.setPrimary(false);
-            logger.info("Attempting to revoke primary role from node: " + nodeId);
-
-            // send message
-            senderListener.assignPrimaryRole(msg);
-
-            logger.info("Revoked primary role from node: " + nodeId);
-            addBulletin(nodeId, Severity.INFO, "Primary Role revoked from node");
-
-            // true indicates primary role was revoked
-            return true;
-        } catch (final ProtocolException ex) {
-
-            logger.warn("Failed attempt to revoke primary role from node " + nodeId + " due to " + ex);
-
-            // mark node as disconnected and log/record the event
-            final Node node = getRawNode(nodeId.getId());
-            node.setStatus(Status.DISCONNECTED);
-            addEvent(node.getNodeId(), "Disconnected because of failed attempt to revoke primary role.");
-            addBulletin(node, Severity.ERROR, "Node disconnected because of failed attempt to revoke primary role");
-
-            // false indicates primary role failed to be revoked
-            return false;
-        } finally {
-            writeLock.unlock("revokePrimaryRole");
-        }
-    }
-
-    private NodeIdentifier addRequestorDn(final NodeIdentifier nodeId, final String dn) {
-        return new NodeIdentifier(nodeId.getId(), nodeId.getApiAddress(),
-                nodeId.getApiPort(), nodeId.getSocketAddress(), nodeId.getSocketPort(), dn);
-    }
-
-    private ConnectionResponseMessage handleConnectionRequest(final ConnectionRequestMessage requestMessage) {
-        final NodeIdentifier proposedIdentifier = requestMessage.getConnectionRequest().getProposedNodeIdentifier();
-        final ConnectionRequest requestWithDn = new ConnectionRequest(addRequestorDn(proposedIdentifier, requestMessage.getRequestorDN()));
-
-        final ConnectionResponse response = requestConnection(requestWithDn);
-        final ConnectionResponseMessage responseMessage = new ConnectionResponseMessage();
-        responseMessage.setConnectionResponse(response);
-        return responseMessage;
-    }
-
-    private void handleControllerStartupFailure(final ControllerStartupFailureMessage msg) {
-        writeLock.lock();
-        try {
-            final Node node = getRawNode(msg.getNodeId().getId());
-            if (node != null) {
-                node.setStatus(Status.DISCONNECTED);
-                addEvent(msg.getNodeId(), "Node could not join cluster because it failed to start up properly. Setting node to Disconnected. Node reported the following error: " + msg.getExceptionMessage());
-                addBulletin(node, Severity.ERROR, "Node could not join cluster because it failed to start up properly. Setting node to Disconnected. Node reported the following error: " + msg.getExceptionMessage());
-            }
-        } finally {
-            writeLock.unlock("handleControllerStartupFailure");
-        }
-    }
-
-    private void handleReconnectionFailure(final ReconnectionFailureMessage msg) {
-        writeLock.lock();
-        try {
-            final Node node = getRawNode(msg.getNodeId().getId());
-            if (node != null) {
-                node.setStatus(Status.DISCONNECTED);
-                final String errorMsg = "Node could not rejoin cluster. Setting node to Disconnected. Node reported the following error: " + msg.getExceptionMessage();
-                addEvent(msg.getNodeId(), errorMsg);
-                addBulletin(node, Severity.ERROR, errorMsg);
-            }
-        } finally {
-            writeLock.unlock("handleControllerStartupFailure");
-        }
-    }
-
-    /**
-     * Adds an instance of a specified controller service.
-     *
-     * @param type
-     * @param id
-     * @param properties
-     * @return
-     */
-    @Override
-    public ControllerServiceNode createControllerService(String type, String id, Map<String, String> properties) {
-        return controllerServiceProvider.createControllerService(type, id, properties);
-    }
-
-    @Override
-    public ControllerService getControllerService(String serviceIdentifier) {
-        return controllerServiceProvider.getControllerService(serviceIdentifier);
-    }
-
-    @Override
-    public ControllerServiceNode getControllerServiceNode(final String id) {
-        return controllerServiceProvider.getControllerServiceNode(id);
-    }
-
-    @Override
-    public boolean isControllerServiceEnabled(final ControllerService service) {
-        return controllerServiceProvider.isControllerServiceEnabled(service);
-    }
-
-    @Override
-    public boolean isControllerServiceEnabled(final String serviceIdentifier) {
-        return controllerServiceProvider.isControllerServiceEnabled(serviceIdentifier);
-    }
-
-    /**
-     * Handle a bulletins message.
-     *
-     * @param bulletins
-     */
-    public void handleBulletins(final NodeBulletins bulletins) {
-        final NodeIdentifier nodeIdentifier = bulletins.getNodeIdentifier();
-        final String nodeAddress = nodeIdentifier.getApiAddress() + ":" + nodeIdentifier.getApiPort();
-
-        // unmarshal the message
-        BulletinsPayload payload = BulletinsPayload.unmarshal(bulletins.getPayload());
-        for (final Bulletin bulletin : payload.getBulletins()) {
-            bulletin.setNodeAddress(nodeAddress);
-            bulletinRepository.addBulletin(bulletin);
-        }
-    }
-
-    /**
-     * Handles a node's heartbeat. If this heartbeat is a node's first heartbeat
-     * since its connection request, then the manager will mark the node as
-     * connected. If the node was previously disconnected due to a lack of
-     * heartbeat, then a reconnection request is issued. If the node was
-     * disconnected for other reasons, then a disconnection request is issued.
-     * If this instance is configured with a firewall and the heartbeat is
-     * blocked, then a disconnection request is issued.
-     *
-     * @param heartbeat
-     */
-    @Override
-    public void handleHeartbeat(final Heartbeat heartbeat) {
-        // sanity check heartbeat
-        if (heartbeat == null) {
-            throw new IllegalArgumentException("Heartbeat may not be null.");
-        } else if (heartbeat.getNodeIdentifier() == null) {
-            throw new IllegalArgumentException("Heartbeat does not contain a node ID.");
-        }
-
-        /*
-         * Processing a heartbeat requires a write lock, which may take a while
-         * to obtain.  Only the last heartbeat is necessary to process per node.
-         * Futhermore, since many could pile up, heartbeats are processed in 
-         * bulk.
-         * 
-         * The below queue stores the pending heartbeats.
-         */
-        pendingHeartbeats.add(heartbeat);
-    }
-
-    private void processPendingHeartbeats() {
-        Node node;
-
-        writeLock.lock();
-        try {
-            /*
-             * Get the most recent heartbeats for the nodes in the cluster.  This
-             * is achieved by "draining" the pending heartbeats queue, populating
-             * a map that associates a node identifier with its latest heartbeat, and
-             * finally, getting the values of the map.
-             */
-            final Map<NodeIdentifier, Heartbeat> mostRecentHeartbeatsMap = new HashMap<>();
-            Heartbeat aHeartbeat;
-            while ((aHeartbeat = pendingHeartbeats.poll()) != null) {
-                mostRecentHeartbeatsMap.put(aHeartbeat.getNodeIdentifier(), aHeartbeat);
-            }
-            final Collection<Heartbeat> mostRecentHeartbeats = new ArrayList<>(mostRecentHeartbeatsMap.values());
-
-            // return fast if no work to do
-            if (mostRecentHeartbeats.isEmpty()) {
-                return;
-            }
-
-            logNodes("Before Heartbeat Processing", heartbeatLogger);
-
-            final int numPendingHeartbeats = mostRecentHeartbeats.size();
-            if (heartbeatLogger.isDebugEnabled()) {
-                heartbeatLogger.debug(String.format("Handling %s heartbeat%s", numPendingHeartbeats, (numPendingHeartbeats > 1) ? "s" : ""));
-            }
-
-            for (final Heartbeat mostRecentHeartbeat : mostRecentHeartbeats) {
-                try {
-                    // resolve the proposed node identifier to valid node identifier
-                    final NodeIdentifier resolvedNodeIdentifier = resolveProposedNodeIdentifier(mostRecentHeartbeat.getNodeIdentifier());
-
-                    // get a raw reference to the node (if it doesn't exist, node will be null)
-                    node = getRawNode(resolvedNodeIdentifier.getId());
-
-                    // if the node thinks it has the primary role, but the manager has assigned the role to a different node, then revoke the role
-                    if (mostRecentHeartbeat.isPrimary() && !isPrimaryNode(resolvedNodeIdentifier)) {
-                        addEvent(resolvedNodeIdentifier, "Heartbeat indicates node is running as primary node.  Revoking primary role because primary role is assigned to a different node.");
-                        revokePrimaryRole(resolvedNodeIdentifier);
-                    }
-
-                    final boolean heartbeatIndicatesNotYetConnected = !mostRecentHeartbeat.isConnected();
-
-                    if (isBlockedByFirewall(resolvedNodeIdentifier.getSocketAddress())) {
-                        if (node == null) {
-                            logger.info("Firewall blocked heartbeat received from unknown node " + resolvedNodeIdentifier + ".  Issuing disconnection request.");
-                        } else {
-                            // record event
-                            addEvent(resolvedNodeIdentifier, "Firewall blocked received heartbeat.  Issuing disconnection request.");
-                        }
-
-                        // request node to disconnect
-                        requestDisconnectionQuietly(resolvedNodeIdentifier, "Blocked By Firewall");
-
-                    } else if (node == null) {  // unknown node, so issue reconnect request
-                        // create new node and add to node set
-                        final Node newNode = new Node(resolvedNodeIdentifier, Status.DISCONNECTED);
-                        nodes.add(newNode);
-
-                        // record event
-                        addEvent(newNode.getNodeId(), "Received heartbeat from unknown node.  Issuing reconnection request.");
-
-                        // record heartbeat
-                        newNode.setHeartbeat(mostRecentHeartbeat);
-                        requestReconnection(resolvedNodeIdentifier.getId(), "NCM Heartbeat Processing");
-                    } else if (heartbeatIndicatesNotYetConnected) {
-                        if (Status.CONNECTED == node.getStatus()) {
-                            // record event
-                            addEvent(node.getNodeId(), "Received heartbeat from node that thinks it is not yet part of the cluster, though the Manager thought it was. Marking as Disconnected and issuing reconnection request.");
-
-                            // record heartbeat
-                            node.setHeartbeat(null);
-                            node.setStatus(Status.DISCONNECTED);
-
-                            requestReconnection(resolvedNodeIdentifier.getId(), "NCM Heartbeat Processing");
-                        }
-                    } else if (Status.DISCONNECTED == node.getStatus()) {
-                        // ignore heartbeats from nodes disconnected by means other than lack of heartbeat, unless it is
-                        // the only node. We allow it if it is the only node because if we have a one-node cluster, then
-                        // we cannot manually reconnect it.
-                        if (node.isHeartbeatDisconnection() || nodes.size() == 1) {
-                            // record event
-                            if (node.isHeartbeatDisconnection()) {
-                                addEvent(resolvedNodeIdentifier, "Received heartbeat from node previously disconnected due to lack of heartbeat.  Issuing reconnection request.");
-                            } else {
-                                addEvent(resolvedNodeIdentifier, "Received heartbeat from node previously disconnected, but it is the only known node, so issuing reconnection request.");
-                            }
-
-                            // record heartbeat
-                            node.setHeartbeat(mostRecentHeartbeat);
-
-                            // request reconnection
-                            requestReconnection(resolvedNodeIdentifier.getId(), "NCM Heartbeat Processing");
-                        } else {
-                            // disconnected nodes should not heartbeat, so we need to issue a disconnection request
-                            heartbeatLogger.info("Ignoring received heartbeat from disconnected node " + resolvedNodeIdentifier + ".  Issuing disconnection request.");
-
-                            // request node to disconnect
-                            requestDisconnectionQuietly(resolvedNodeIdentifier, "Received Heartbeat from Node, but Manager has already marked Node as Disconnected");
-                        }
-
-                    } else if (Status.DISCONNECTING == node.getStatus()) {
-                        /* ignore spurious heartbeat */
-                    } else {  // node is either either connected or connecting
-                        // first heartbeat causes status change from connecting to connected
-                        if (Status.CONNECTING == node.getStatus()) {
-                            if (mostRecentHeartbeat.getCreatedTimestamp() < node.getConnectionRequestedTimestamp()) {
-                                heartbeatLogger.info("Received heartbeat for node " + resolvedNodeIdentifier + " but ignoring because it was generated before the node was last asked to reconnect.");
-                                continue;
-                            }
-
-                            // set status to connected
-                            node.setStatus(Status.CONNECTED);
-
-                            // record event
-                            addEvent(resolvedNodeIdentifier, "Received first heartbeat from connecting node.  Setting node to connected.");
-
-                            // notify service of updated node set
-                            notifyDataFlowManagementServiceOfNodeStatusChange();
-
-                            addBulletin(node, Severity.INFO, "Node Connected");
-                        } else {
-                            heartbeatLogger.info("Received heartbeat for node " + resolvedNodeIdentifier + ".");
-                        }
-
-                        // record heartbeat
-                        node.setHeartbeat(mostRecentHeartbeat);
-
-                        ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId());
-                        if (statusRepository == null) {
-                            statusRepository = createComponentStatusRepository();
-                            componentMetricsRepositoryMap.put(node.getNodeId(), statusRepository);
-                        }
-
-                        // If it's been a while since we've captured, capture this metric.
-                        final Date lastCaptureDate = statusRepository.getLastCaptureDate();
-                        final long millisSinceLastCapture = (lastCaptureDate == null) ? Long.MAX_VALUE : (System.currentTimeMillis() - lastCaptureDate.getTime());
-
-                        if (millisSinceLastCapture > componentStatusSnapshotMillis) {
-                            statusRepository.capture(node.getHeartbeatPayload().getProcessGroupStatus());
-                        }
-                    }
-                } catch (final Exception e) {
-                    logger.error("Failed to process heartbeat from {}:{} due to {}", mostRecentHeartbeat.getNodeIdentifier().getApiAddress(), mostRecentHeartbeat.getNodeIdentifier().getApiPort(), e.toString());
-                    if (logger.isDebugEnabled()) {
-                        logger.error("", e);
-                    }
-                }
-            }
-
-            logNodes("After Heartbeat Processing", heartbeatLogger);
-        } finally {
-            writeLock.unlock("processPendingHeartbeats");
-        }
-    }
-
-    private ComponentStatusRepository createComponentStatusRepository() {
-        final String implementationClassName = properties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION, DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION);
-        if (implementationClassName == null) {
-            throw new RuntimeException("Cannot create Component Status Repository because the NiFi Properties is missing the following property: "
-                    + NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION);
-        }
-
-        try {
-            return NarThreadContextClassLoader.createInstance(implementationClassName, ComponentStatusRepository.class);
-        } catch (final Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public Set<Node> getNodes(final Status... statuses) {
-        final Set<Status> desiredStatusSet = new HashSet<>();
-        for (final Status status : statuses) {
-            desiredStatusSet.add(status);
-        }
-
-        readLock.lock();
-        try {
-            final Set<Node> clonedNodes = new HashSet<>();
-            for (final Node node : nodes) {
-                if (desiredStatusSet.isEmpty() || desiredStatusSet.contains(node.getStatus())) {
-                    clonedNodes.add(node.clone());
-                }
-            }
-            return Collections.unmodifiableSet(clonedNodes);
-        } finally {
-            readLock.unlock("getNodes(Status...)");
-        }
-    }
-
-    @Override
-    public Node getNode(final String nodeId) {
-        readLock.lock();
-        try {
-            for (final Node node : nodes) {
-                if (node.getNodeId().getId().equals(nodeId)) {
-                    return node.clone();
-                }
-            }
-            return null;
-        } finally {
-            readLock.unlock("getNode(String)");
-        }
-    }
-
-    @Override
-    public Node getPrimaryNode() {
-        readLock.lock();
-        try {
-            if (primaryNodeId == null) {
-                return null;
-            } else {
-                return getNode(primaryNodeId.getId());
-            }
-        } finally {
-            readLock.unlock("getPrimaryNode");
-        }
-    }
-
-    @Override
-    public void deleteNode(final String nodeId, final String userDn) throws UnknownNodeException, IllegalNodeDeletionException {
-        writeLock.lock();
-        try {
-            final Node node = getNode(nodeId);
-            if (node == null) {
-                throw new UnknownNodeException("Node does not exist.");
-            } else if (Status.DISCONNECTED == node.getStatus()) {
-                nodes.remove(node);
-
-                if (eventManager != null) {
-                    eventManager.clearEventHistory(node.getNodeId().getId());
-                }
-
-                logger.info("Removing node {} from cluster because this action was requested by {}", node, userDn);
-            } else {
-                throw new IllegalNodeDeletionException("Node may not be deleted because it is not disconnected.");
-            }
-        } finally {
-            writeLock.unlock("deleteNode");
-        }
-    }
-
-    @Override
-    public Set<NodeIdentifier> getNodeIds(final Status... statuses) {
-        readLock.lock();
-        try {
-            final Set<NodeIdentifier> nodeIds = new HashSet<>();
-            for (final Node node : nodes) {
-                if (statuses == null || statuses.length == 0) {
-                    nodeIds.add(node.getNodeId());
-                } else {
-                    for (final Node.Status status : statuses) {
-                        if (node.getStatus() == status) {
-                            nodeIds.add(node.getNodeId());
-                            break;
-                        }
-                    }
-                }
-            }
-            return nodeIds;
-        } finally {
-            readLock.unlock("getNodeIds(Status...)");
-        }
-    }
-
-    @Override
-    public void setPrimaryNode(final String nodeId, final String userDn) throws UnknownNodeException, IneligiblePrimaryNodeException, PrimaryRoleAssignmentException {
-        writeLock.lock();
-        try {
-
-            final Node node = getNode(nodeId);
-            if (node == null) {
-                throw new UnknownNodeException("Node does not exist.");
-            } else if (Status.CONNECTED != node.getStatus()) {
-                throw new IneligiblePrimaryNodeException("Node must be connected before it can be assigned as the primary node.");
-            }
-
-            // revoke primary role
-            final Node primaryNode;
-            if ((primaryNode = getPrimaryNode()) != null) {
-                if (primaryNode.getStatus() == Status.DISCONNECTED) {
-                    throw new PrimaryRoleAssignmentException("A disconnected, primary node exists.  Delete the node before assigning the primary role to a different node.");
-                } else if (revokePrimaryRole(primaryNode.getNodeId())) {
-                    addEvent(primaryNode.getNodeId(), "Role revoked from this node as part of primary role reassignment.");
-                } else {
-                    throw new PrimaryRoleAssignmentException(
-                            "Failed to revoke primary role from node. Primary node is now disconnected. Delete the node before assigning the primary role to a different node.");
-                }
-            }
-
-            // change the primary node ID to the given node
-            setPrimaryNodeId(node.getNodeId());
-
-            // assign primary role
-            if (assignPrimaryRole(node.getNodeId())) {
-                addEvent(node.getNodeId(), "Role assigned to this node as part of primary role reassignment. Action performed by " + userDn);
-                addBulletin(node, Severity.INFO, "Primary Role assigned to node by " + userDn);
-            } else {
-                throw new PrimaryRoleAssignmentException(
-                        "Cluster manager assigned primary role to node, but the node failed to accept the assignment.  Cluster manager disconnected node.");
-            }
-        } finally {
-            writeLock.unlock("setPrimaryNode");
-        }
-    }
-
-    private int getClusterProtocolHeartbeatSeconds() {
-        return (int) FormatUtils.getTimeDuration(properties.getClusterProtocolHeartbeatInterval(), TimeUnit.SECONDS);
-    }
-
-    @Override
-    public int getHeartbeatMonitoringIntervalSeconds() {
-        return 4 * getClusterProtocolHeartbeatSeconds();
-    }
-
-    @Override
-    public int getMaxHeartbeatGapSeconds() {
-        return 8 * getClusterProtocolHeartbeatSeconds();
-    }
-
-    @Override
-    public List<Event> getNodeEvents(final String nodeId) {
-        readLock.lock();
-        try {
-            List<Event> events = null;
-            final EventManager eventMgr = eventManager;
-            if (eventMgr != null) {
-                events = eventMgr.getEvents(nodeId);
-            }
-
-            if (events == null) {
-                return Collections.emptyList();
-            } else {
-                return Collections.unmodifiableList(events);
-            }
-        } finally {
-            readLock.unlock("getNodeEvents");
-        }
-    }
-
-    @Override
-    public NodeResponse applyRequest(final String method, final URI uri, final Map<String, List<String>> parameters, final Map<String, String> headers)
-            throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException, ConnectingNodeMutableRequestException, DisconnectedNodeMutableRequestException, SafeModeMutableRequestException {
-        return applyRequest(method, uri, parameters, headers, getNodeIds(Status.CONNECTED));
-    }
-
-    @Override
-    public NodeResponse applyRequest(final String method, final URI uri, final Map<String, List<String>> parameters, final Map<String, String> headers, final Set<NodeIdentifier> nodeIdentifiers)
-            throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException, ConnectingNodeMutableRequestException, DisconnectedNodeMutableRequestException, SafeModeMutableRequestException {
-
-        final boolean mutableRequest = canChangeNodeState(method, uri);
-        final ClusterManagerLock lock = mutableRequest ? writeLock : readLock;
-
-        lock.lock();
-        try {
-            // check that the request can be applied
-            if (mutableRequest) {
-                if (isInSafeMode()) {
-                    throw new SafeModeMutableRequestException("Received a mutable request [" + method + " -- " + uri + "] while in safe mode");
-                } else if (!getNodeIds(Status.DISCONNECTED, Status.DISCONNECTING).isEmpty()) {
-                    throw new DisconnectedNodeMutableRequestException("Received a mutable request [" + method + " -- " + uri + "] while a node is disconnected from the cluster");
-                } else if (!getNodeIds(Status.CONNECTING).isEmpty()) {
-                    // if any node is connecting and a request can change the flow, then we throw an exception
-                    throw new ConnectingNodeMutableRequestException("Received a mutable request [" + method + " -- " + uri + "] while a node is trying to connect to the cluster");
-                }
-            }
-
-            final NodeResponse clientResponse = federateRequest(method, uri, parameters, null, headers, nodeIdentifiers);
-            if (clientResponse == null) {
-                if (mutableRequest) {
-                    throw new NoConnectedNodesException(String.format("All nodes were disconnected as a result of applying request %s %s", method, uri));
-                } else {
-                    throw new NoResponseFromNodesException("No nodes were able to process this request.");
-                }
-            } else {
-                return clientResponse;
-            }
-        } finally {
-            lock.unlock("applyRequest(String, URI, Map<String, List<String>>, Map<String, String>, Set<NodeIdentifier>");
-        }
-    }
-
-    @Override
-    public NodeResponse applyRequest(final String method, final URI uri, final Object entity, final Map<String, String> headers)
-            throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException, ConnectingNodeMutableRequestException, DisconnectedNodeMutableRequestException, SafeModeMutableRequestException {
-        return applyRequest(method, uri, entity, headers, getNodeIds(Status.CONNECTED));
-  

<TRUNCATED>