You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/01/16 03:29:22 UTC

[05/51] [partial] incubator-nifi git commit: Reworked overall directory structure to make releasing nifi vs maven plugins easier

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java
deleted file mode 100644
index 346e801..0000000
--- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ /dev/null
@@ -1,3579 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-import static java.util.Objects.requireNonNull;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import javax.net.ssl.SSLContext;
-
-import org.apache.nifi.admin.service.UserService;
-import org.apache.nifi.cluster.BulletinsPayload;
-import org.apache.nifi.cluster.HeartbeatPayload;
-import org.apache.nifi.cluster.protocol.DataFlow;
-import org.apache.nifi.cluster.protocol.Heartbeat;
-import org.apache.nifi.cluster.protocol.NodeBulletins;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.cluster.protocol.NodeProtocolSender;
-import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
-import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
-import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
-import org.apache.nifi.connectable.Connectable;
-import org.apache.nifi.connectable.ConnectableType;
-import org.apache.nifi.connectable.Connection;
-import org.apache.nifi.connectable.Funnel;
-import org.apache.nifi.connectable.LocalPort;
-import org.apache.nifi.connectable.Port;
-import org.apache.nifi.connectable.Position;
-import org.apache.nifi.connectable.Size;
-import org.apache.nifi.connectable.StandardConnection;
-import org.apache.nifi.controller.exception.CommunicationsException;
-import org.apache.nifi.controller.exception.ProcessorInstantiationException;
-import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
-import org.apache.nifi.controller.label.Label;
-import org.apache.nifi.controller.label.StandardLabel;
-import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
-import org.apache.nifi.controller.reporting.StandardReportingTaskNode;
-import org.apache.nifi.controller.repository.ContentRepository;
-import org.apache.nifi.controller.repository.CounterRepository;
-import org.apache.nifi.controller.repository.FlowFileEvent;
-import org.apache.nifi.controller.repository.FlowFileEventRepository;
-import org.apache.nifi.controller.repository.FlowFileRecord;
-import org.apache.nifi.controller.repository.FlowFileRepository;
-import org.apache.nifi.controller.repository.FlowFileSwapManager;
-import org.apache.nifi.controller.repository.QueueProvider;
-import org.apache.nifi.controller.repository.RepositoryRecord;
-import org.apache.nifi.controller.repository.RepositoryStatusReport;
-import org.apache.nifi.controller.repository.StandardCounterRepository;
-import org.apache.nifi.controller.repository.StandardFlowFileRecord;
-import org.apache.nifi.controller.repository.StandardRepositoryRecord;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
-import org.apache.nifi.controller.repository.claim.ContentDirection;
-import org.apache.nifi.controller.repository.claim.StandardContentClaimManager;
-import org.apache.nifi.controller.repository.io.LimitedInputStream;
-import org.apache.nifi.controller.scheduling.EventDrivenSchedulingAgent;
-import org.apache.nifi.controller.scheduling.ProcessContextFactory;
-import org.apache.nifi.controller.scheduling.QuartzSchedulingAgent;
-import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
-import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent;
-import org.apache.nifi.controller.service.ControllerServiceNode;
-import org.apache.nifi.controller.service.ControllerServiceProvider;
-import org.apache.nifi.controller.service.StandardControllerServiceProvider;
-import org.apache.nifi.controller.status.ConnectionStatus;
-import org.apache.nifi.controller.status.PortStatus;
-import org.apache.nifi.controller.status.ProcessGroupStatus;
-import org.apache.nifi.controller.status.ProcessorStatus;
-import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
-import org.apache.nifi.controller.status.RunStatus;
-import org.apache.nifi.controller.status.TransmissionStatus;
-import org.apache.nifi.controller.status.history.ComponentStatusRepository;
-import org.apache.nifi.controller.status.history.StatusHistoryUtil;
-import org.apache.nifi.controller.tasks.ExpireFlowFiles;
-import org.apache.nifi.diagnostics.SystemDiagnostics;
-import org.apache.nifi.diagnostics.SystemDiagnosticsFactory;
-import org.apache.nifi.encrypt.StringEncryptor;
-import org.apache.nifi.engine.FlowEngine;
-import org.apache.nifi.events.BulletinFactory;
-import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.events.NodeBulletinProcessingStrategy;
-import org.apache.nifi.events.VolatileBulletinRepository;
-import org.apache.nifi.flowfile.FlowFilePrioritizer;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.framework.security.util.SslContextFactory;
-import org.apache.nifi.groups.ProcessGroup;
-import org.apache.nifi.groups.RemoteProcessGroup;
-import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
-import org.apache.nifi.groups.StandardProcessGroup;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.logging.LogLevel;
-import org.apache.nifi.logging.LogRepository;
-import org.apache.nifi.logging.LogRepositoryFactory;
-import org.apache.nifi.logging.ProcessorLog;
-import org.apache.nifi.logging.ProcessorLogObserver;
-import org.apache.nifi.nar.ExtensionManager;
-import org.apache.nifi.nar.NarCloseable;
-import org.apache.nifi.nar.NarThreadContextClassLoader;
-import org.apache.nifi.processor.Processor;
-import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.QueueSize;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.SimpleProcessLogger;
-import org.apache.nifi.processor.StandardProcessorInitializationContext;
-import org.apache.nifi.processor.StandardValidationContextFactory;
-import org.apache.nifi.processor.annotation.OnAdded;
-import org.apache.nifi.provenance.ProvenanceEventRecord;
-import org.apache.nifi.provenance.ProvenanceEventRepository;
-import org.apache.nifi.provenance.ProvenanceEventType;
-import org.apache.nifi.provenance.StandardProvenanceEventRecord;
-import org.apache.nifi.remote.RemoteGroupPort;
-import org.apache.nifi.remote.RemoteResourceManager;
-import org.apache.nifi.remote.RemoteSiteListener;
-import org.apache.nifi.remote.RootGroupPort;
-import org.apache.nifi.remote.SocketRemoteSiteListener;
-import org.apache.nifi.remote.StandardRemoteProcessGroup;
-import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor;
-import org.apache.nifi.remote.StandardRootGroupPort;
-import org.apache.nifi.remote.TransferDirection;
-import org.apache.nifi.remote.protocol.socket.SocketFlowFileServerProtocol;
-import org.apache.nifi.reporting.Bulletin;
-import org.apache.nifi.reporting.BulletinRepository;
-import org.apache.nifi.reporting.EventAccess;
-import org.apache.nifi.reporting.ReportingTask;
-import org.apache.nifi.reporting.Severity;
-import org.apache.nifi.scheduling.SchedulingStrategy;
-import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.util.ReflectionUtils;
-import org.apache.nifi.web.api.dto.ConnectableDTO;
-import org.apache.nifi.web.api.dto.ConnectionDTO;
-import org.apache.nifi.web.api.dto.FlowSnippetDTO;
-import org.apache.nifi.web.api.dto.FunnelDTO;
-import org.apache.nifi.web.api.dto.LabelDTO;
-import org.apache.nifi.web.api.dto.PortDTO;
-import org.apache.nifi.web.api.dto.PositionDTO;
-import org.apache.nifi.web.api.dto.ProcessGroupDTO;
-import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
-import org.apache.nifi.web.api.dto.ProcessorDTO;
-import org.apache.nifi.web.api.dto.RelationshipDTO;
-import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
-import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
-import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
-import org.apache.nifi.web.api.dto.TemplateDTO;
-import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.sun.jersey.api.client.ClientHandlerException;
-
-public class FlowController implements EventAccess, ControllerServiceProvider, Heartbeater, QueueProvider {
-
-    // default repository implementations
-    public static final String DEFAULT_FLOWFILE_REPO_IMPLEMENTATION = "org.apache.nifi.controller.repository.WriteAheadFlowFileRepository";
-    public static final String DEFAULT_CONTENT_REPO_IMPLEMENTATION = "org.apache.nifi.controller.repository.FileSystemRepository";
-    public static final String DEFAULT_PROVENANCE_REPO_IMPLEMENTATION = "org.apache.nifi.provenance.VolatileProvenanceRepository";
-    public static final String DEFAULT_SWAP_MANAGER_IMPLEMENTATION = "org.apache.nifi.controller.FileSystemSwapManager";
-    public static final String DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION = "org.apache.nifi.controller.status.history.VolatileComponentStatusRepository";
-
-    public static final String SCHEDULE_MINIMUM_NANOSECONDS = "flowcontroller.minimum.nanoseconds";
-    public static final String GRACEFUL_SHUTDOWN_PERIOD = "nifi.flowcontroller.graceful.shutdown.seconds";
-    public static final long DEFAULT_GRACEFUL_SHUTDOWN_SECONDS = 10;
-    public static final int METRICS_RESERVOIR_SIZE = 288;   // 1 day worth of 5-minute captures
-
-    public static final String ROOT_GROUP_ID_ALIAS = "root";
-    public static final String DEFAULT_ROOT_GROUP_NAME = "NiFi Flow";
-
-    private final AtomicInteger maxTimerDrivenThreads;
-    private final AtomicInteger maxEventDrivenThreads;
-    private final AtomicReference<FlowEngine> timerDrivenEngineRef;
-    private final AtomicReference<FlowEngine> eventDrivenEngineRef;
-
-    private final ContentRepository contentRepository;
-    private final FlowFileRepository flowFileRepository;
-    private final FlowFileEventRepository flowFileEventRepository;
-    private final ProvenanceEventRepository provenanceEventRepository;
-    private final VolatileBulletinRepository bulletinRepository;
-    private final StandardProcessScheduler processScheduler;
-    private final TemplateManager templateManager;
-    private final SnippetManager snippetManager;
-    private final long gracefulShutdownSeconds;
-    private final ExtensionManager extensionManager;
-    private final NiFiProperties properties;
-    private final SSLContext sslContext;
-    private final RemoteSiteListener externalSiteListener;
-    private final AtomicReference<CounterRepository> counterRepositoryRef;
-    private final AtomicBoolean initialized = new AtomicBoolean(false);
-    private final ControllerServiceProvider controllerServiceProvider;
-    private final UserService userService;
-    private final EventDrivenWorkerQueue eventDrivenWorkerQueue;
-    private final ComponentStatusRepository componentStatusRepository;
-    private final long systemStartTime = System.currentTimeMillis();    // time at which the node was started
-    private final ConcurrentMap<String, ReportingTaskNode> reportingTasks = new ConcurrentHashMap<>();
-
-    // The Heartbeat Bean is used to provide an Atomic Reference to data that is used in heartbeats that may
-    // change while the instance is running. We do this because we want to generate heartbeats even if we
-    // are unable to obtain a read lock on the entire FlowController.
-    private final AtomicReference<HeartbeatBean> heartbeatBeanRef = new AtomicReference<>();
-    private final AtomicBoolean heartbeatsSuspended = new AtomicBoolean(false);
-
-    private final Integer remoteInputSocketPort;
-    private final Boolean isSiteToSiteSecure;
-    private Integer clusterManagerRemoteSitePort = null;
-    private Boolean clusterManagerRemoteSiteCommsSecure = null;
-
-    private ProcessGroup rootGroup;
-    private final List<Connectable> startConnectablesAfterInitialization;
-    private final List<RemoteGroupPort> startRemoteGroupPortsAfterInitialization;
-
-    /**
-     * true if controller is configured to operate in a clustered environment
-     */
-    private final boolean configuredForClustering;
-
-    /**
-     * the time to wait between heartbeats
-     */
-    private final int heartbeatDelaySeconds;
-
-    /**
-     * The sensitive property string encryptor
-     */
-    private final StringEncryptor encryptor;
-
-    /**
-     * cluster protocol sender
-     */
-    private final NodeProtocolSender protocolSender;
-
-    private final ScheduledExecutorService clusterTaskExecutor = new FlowEngine(3, "Clustering Tasks");
-    private final ContentClaimManager contentClaimManager = new StandardContentClaimManager();
-
-    // guarded by rwLock
-    /**
-     * timer to periodically send heartbeats to the cluster
-     */
-    private ScheduledFuture<?> bulletinFuture;
-    private ScheduledFuture<?> heartbeatGeneratorFuture;
-    private ScheduledFuture<?> heartbeatSenderFuture;
-
-    // guarded by FlowController lock
-    /**
-     * timer task to generate heartbeats
-     */
-    private final AtomicReference<HeartbeatMessageGeneratorTask> heartbeatMessageGeneratorTaskRef = new AtomicReference<>(null);
-
-    private AtomicReference<NodeBulletinProcessingStrategy> nodeBulletinSubscriber;
-
-    // guarded by rwLock
-    /**
-     * the node identifier
-     */
-    private NodeIdentifier nodeId;
-
-    // guarded by rwLock
-    /**
-     * true if controller is connected or trying to connect to the cluster
-     */
-    private boolean clustered;
-    private String clusterManagerDN;
-
-    // guarded by rwLock
-    /**
-     * true if controller is the primary of the cluster
-     */
-    private boolean primary;
-
-    // guarded by rwLock
-    /**
-     * true if connected to a cluster
-     */
-    private boolean connected;
-
-    // guarded by rwLock
-    private String instanceId;
-
-    private volatile boolean shutdown = false;
-
-    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
-    private final Lock readLock = rwLock.readLock();
-    private final Lock writeLock = rwLock.writeLock();
-
-    private FlowFileSwapManager flowFileSwapManager;    // guarded by read/write lock
-
-    private static final Logger LOG = LoggerFactory.getLogger(FlowController.class);
-    private static final Logger heartbeatLogger = LoggerFactory.getLogger("org.apache.nifi.cluster.heartbeat");
-
-    /**
-     * Creates a FlowController that is not configured for clustering. The
-     * controller is constructed with the clustering flag set to false and
-     * without a cluster protocol sender.
-     *
-     * @param flowFileEventRepo the FlowFile event repository handed to the controller
-     * @param properties the NiFi properties handed to the controller
-     * @param userService the user service handed to the controller
-     * @param encryptor the string encryptor handed to the controller
-     * @return the newly constructed controller
-     */
-    public static FlowController createStandaloneInstance(
-            final FlowFileEventRepository flowFileEventRepo,
-            final NiFiProperties properties,
-            final UserService userService,
-            final StringEncryptor encryptor) {
-        final boolean clusteringEnabled = false;
-        final NodeProtocolSender noProtocolSender = null;
-
-        final FlowController standalone = new FlowController(
-                flowFileEventRepo, properties, userService, encryptor, clusteringEnabled, noProtocolSender);
-        return standalone;
-    }
-
-    /**
-     * Creates a FlowController that is configured for clustering. After
-     * construction, the remote input port and the Site-to-Site security flag
-     * read from the given properties are passed to
-     * setClusterManagerRemoteSiteInfo on the new controller.
-     *
-     * @param flowFileEventRepo the FlowFile event repository handed to the controller
-     * @param properties the NiFi properties handed to the controller
-     * @param userService the user service handed to the controller
-     * @param encryptor the string encryptor handed to the controller
-     * @param protocolSender the cluster protocol sender handed to the controller
-     * @return the newly constructed controller
-     */
-    public static FlowController createClusteredInstance(
-            final FlowFileEventRepository flowFileEventRepo,
-            final NiFiProperties properties,
-            final UserService userService,
-            final StringEncryptor encryptor,
-            final NodeProtocolSender protocolSender) {
-        final boolean clusteringEnabled = true;
-        final FlowController clustered = new FlowController(
-                flowFileEventRepo, properties, userService, encryptor, clusteringEnabled, protocolSender);
-
-        clustered.setClusterManagerRemoteSiteInfo(properties.getRemoteInputPort(), properties.isSiteToSiteSecure());
-        return clustered;
-    }
-
-    private FlowController(
-            final FlowFileEventRepository flowFileEventRepo,
-            final NiFiProperties properties,
-            final UserService userService,
-            final StringEncryptor encryptor,
-            final boolean configuredForClustering,
-            final NodeProtocolSender protocolSender) {
-
-        maxTimerDrivenThreads = new AtomicInteger(10);
-        maxEventDrivenThreads = new AtomicInteger(5);
-
-        this.encryptor = encryptor;
-        this.properties = properties;
-        sslContext = SslContextFactory.createSslContext(properties, false);
-        extensionManager = new ExtensionManager();
-        controllerServiceProvider = new StandardControllerServiceProvider();
-
-        timerDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxTimerDrivenThreads.get(), "Timer-Driven Process"));
-        eventDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxEventDrivenThreads.get(), "Event-Driven Process"));
-
-        final FlowFileRepository flowFileRepo = createFlowFileRepository(properties, contentClaimManager);
-        flowFileRepository = flowFileRepo;
-        flowFileEventRepository = flowFileEventRepo;
-        counterRepositoryRef = new AtomicReference<CounterRepository>(new StandardCounterRepository());
-
-        bulletinRepository = new VolatileBulletinRepository();
-        nodeBulletinSubscriber = new AtomicReference<>();
-
-        try {
-            this.provenanceEventRepository = createProvenanceRepository(properties);
-            this.provenanceEventRepository.initialize(createEventReporter(bulletinRepository));
-
-            this.contentRepository = createContentRepository(properties);
-        } catch (final Exception e) {
-            throw new RuntimeException("Unable to create Provenance Repository", e);
-        }
-
-        processScheduler = new StandardProcessScheduler(this, this, encryptor);
-        eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler);
-
-        final ProcessContextFactory contextFactory = new ProcessContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceEventRepository);
-        processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, new EventDrivenSchedulingAgent(
-                eventDrivenEngineRef.get(), this, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor));
-
-        final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor);
-        final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor);
-        processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, timerDrivenAgent);
-        processScheduler.setSchedulingAgent(SchedulingStrategy.PRIMARY_NODE_ONLY, timerDrivenAgent);
-        processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, quartzSchedulingAgent);
-        processScheduler.scheduleFrameworkTask(new ExpireFlowFiles(this, contextFactory), "Expire FlowFiles", 30L, 30L, TimeUnit.SECONDS);
-
-        startConnectablesAfterInitialization = new ArrayList<>();
-        startRemoteGroupPortsAfterInitialization = new ArrayList<>();
-        this.userService = userService;
-
-        final String gracefulShutdownSecondsVal = properties.getProperty(GRACEFUL_SHUTDOWN_PERIOD);
-        long shutdownSecs;
-        try {
-            shutdownSecs = Long.parseLong(gracefulShutdownSecondsVal);
-            if (shutdownSecs < 1) {
-                shutdownSecs = DEFAULT_GRACEFUL_SHUTDOWN_SECONDS;
-            }
-        } catch (final NumberFormatException nfe) {
-            shutdownSecs = DEFAULT_GRACEFUL_SHUTDOWN_SECONDS;
-        }
-        gracefulShutdownSeconds = shutdownSecs;
-
-        remoteInputSocketPort = properties.getRemoteInputPort();
-        isSiteToSiteSecure = properties.isSiteToSiteSecure();
-
-        if (isSiteToSiteSecure && sslContext == null && remoteInputSocketPort != null) {
-            throw new IllegalStateException("NiFi Configured to allow Secure Site-to-Site communications but the Keystore/Truststore properties are not configured");
-        }
-
-        this.configuredForClustering = configuredForClustering;
-        this.heartbeatDelaySeconds = (int) FormatUtils.getTimeDuration(properties.getNodeHeartbeatInterval(), TimeUnit.SECONDS);
-        this.protocolSender = protocolSender;
-        try {
-            this.templateManager = new TemplateManager(properties.getTemplateDirectory());
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-
-        this.snippetManager = new SnippetManager();
-
-        rootGroup = new StandardProcessGroup(UUID.randomUUID().toString(), this, processScheduler, properties, encryptor);
-        rootGroup.setName(DEFAULT_ROOT_GROUP_NAME);
-        instanceId = UUID.randomUUID().toString();
-
-        if (remoteInputSocketPort == null){
-            LOG.info("Not enabling Site-to-Site functionality because nifi.remote.input.socket.port is not set");
-            externalSiteListener = null;
-        } else if (isSiteToSiteSecure && sslContext == null) {
-            LOG.error("Unable to create Secure Site-to-Site Listener because not all required Keystore/Truststore Properties are set. Site-to-Site functionality will be disabled until this problem is has been fixed.");
-            externalSiteListener = null;
-        } else {
-            // Register the SocketFlowFileServerProtocol as the appropriate resource for site-to-site Server Protocol
-            RemoteResourceManager.setServerProtocolImplementation(SocketFlowFileServerProtocol.RESOURCE_NAME, SocketFlowFileServerProtocol.class);
-            externalSiteListener = new SocketRemoteSiteListener(remoteInputSocketPort, isSiteToSiteSecure ? sslContext : null);
-            externalSiteListener.setRootGroup(rootGroup);
-        }
-
-        // Determine frequency for obtaining component status snapshots
-        final String snapshotFrequency = properties.getProperty(NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY);
-        long snapshotMillis;
-        try {
-            snapshotMillis = FormatUtils.getTimeDuration(snapshotFrequency, TimeUnit.MILLISECONDS);
-        } catch (final Exception e) {
-            snapshotMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY, TimeUnit.MILLISECONDS);
-        }
-
-        componentStatusRepository = createComponentStatusRepository();
-        timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
-            @Override
-            public void run() {
-                componentStatusRepository.capture(getControllerStatus());
-            }
-        }, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS);
-
-        heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false, false));
-    }
-
-    /**
-     * Instantiates the FlowFile repository named by the
-     * FLOWFILE_REPOSITORY_IMPLEMENTATION property (the default implementation
-     * name is supplied to the lookup) and initializes it with the given
-     * claim manager.
-     *
-     * @param properties the NiFi properties to read the implementation class name from
-     * @param contentClaimManager the claim manager passed to the repository's initialize method
-     * @return the created and initialized repository
-     * @throws RuntimeException if no class name is available, or wrapping any exception raised while creating or initializing the repository
-     */
-    private static FlowFileRepository createFlowFileRepository(final NiFiProperties properties, final ContentClaimManager contentClaimManager) {
-        final String repoClassName = properties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION, DEFAULT_FLOWFILE_REPO_IMPLEMENTATION);
-
-        if (repoClassName != null) {
-            try {
-                final FlowFileRepository repository = NarThreadContextClassLoader.createInstance(repoClassName, FlowFileRepository.class);
-                // initialization is performed while holding the new instance's monitor
-                synchronized (repository) {
-                    repository.initialize(contentClaimManager);
-                }
-                return repository;
-            } catch (final Exception cause) {
-                throw new RuntimeException(cause);
-            }
-        }
-
-        throw new RuntimeException("Cannot create FlowFile Repository because the NiFi Properties is missing the following property: "
-                + NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION);
-    }
-
-    /**
-     * Instantiates the FlowFile swap manager named by the
-     * FLOWFILE_SWAP_MANAGER_IMPLEMENTATION property (the default
-     * implementation name is supplied to the lookup).
-     *
-     * @param properties the NiFi properties to read the implementation class name from
-     * @return the created swap manager, or <code>null</code> if no class name is available
-     * @throws RuntimeException wrapping any exception raised while creating the instance
-     */
-    private static FlowFileSwapManager createSwapManager(final NiFiProperties properties) {
-        final String swapManagerClassName = properties.getProperty(NiFiProperties.FLOWFILE_SWAP_MANAGER_IMPLEMENTATION, DEFAULT_SWAP_MANAGER_IMPLEMENTATION);
-
-        FlowFileSwapManager swapManager = null;
-        if (swapManagerClassName != null) {
-            try {
-                swapManager = NarThreadContextClassLoader.createInstance(swapManagerClassName, FlowFileSwapManager.class);
-            } catch (final Exception cause) {
-                throw new RuntimeException(cause);
-            }
-        }
-        return swapManager;
-    }
-
-    private static EventReporter createEventReporter(final BulletinRepository bulletinRepository) {
-        return new EventReporter() {
-            @Override
-            public void reportEvent(final Severity severity, final String category, final String message) {
-                final Bulletin bulletin = BulletinFactory.createBulletin(category, severity.name(), message);
-                bulletinRepository.addBulletin(bulletin);
-            }
-        };
-    }
-    
-    /**
-     * Performs the start-up work of the controller while holding the write
-     * lock: creates the swap manager, purges or recovers swapped FlowFiles,
-     * loads FlowFiles into the FlowFile repository, cleans up the content
-     * repository, starts the swap manager and the Site-to-Site listener when
-     * present, schedules the periodic update of Remote Process Groups, and
-     * finally marks the controller as initialized.
-     *
-     * @throws IOException declared for failures raised by the repository and swap manager calls
-     */
-    public void initializeFlow() throws IOException {
-        writeLock.lock();
-        try {
-            flowFileSwapManager = createSwapManager(properties);
-
-            // stays at -1 unless swapped FlowFiles are recovered below
-            long highestSwappedId = -1L;
-            if (flowFileSwapManager != null) {
-                if (!flowFileRepository.isVolatile()) {
-                    highestSwappedId = flowFileSwapManager.recoverSwappedFlowFiles(this, contentClaimManager);
-                } else {
-                    flowFileSwapManager.purge();
-                }
-            }
-
-            final long minimumNextId = highestSwappedId + 1;
-            flowFileRepository.loadFlowFiles(this, minimumNextId);
-
-            // loading the FlowFiles has restored the state of our ContentClaims, so the
-            // ContentRepository can now be told to purge superfluous files
-            contentRepository.cleanup();
-
-            if (flowFileSwapManager != null) {
-                flowFileSwapManager.start(flowFileRepository, this, contentClaimManager, createEventReporter(bulletinRepository));
-            }
-
-            if (externalSiteListener != null) {
-                externalSiteListener.start();
-            }
-
-            // a failed update is only logged, so it never propagates out of the scheduled task
-            final Runnable remoteGroupUpdater = new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        updateRemoteProcessGroups();
-                    } catch (final Throwable failure) {
-                        LOG.warn("Unable to update Remote Process Groups due to " + failure);
-                        if (LOG.isDebugEnabled()) {
-                            LOG.warn("", failure);
-                        }
-                    }
-                }
-            };
-            timerDrivenEngineRef.get().scheduleWithFixedDelay(remoteGroupUpdater, 0L, 30L, TimeUnit.SECONDS);
-
-            initialized.set(true);
-        } finally {
-            writeLock.unlock();
-        }
-    }
-
-    /**
-     * <p>
-     * Causes any processors that were added to the flow with a 'delayStart'
-     * flag of true to now start
-     * </p>
-     */
-    public void startDelayed() {
-        writeLock.lock();
-        try {
-            LOG.info("Starting {} processors/ports/funnels", (startConnectablesAfterInitialization.size() + startRemoteGroupPortsAfterInitialization.size()));
-            for (final Connectable connectable : startConnectablesAfterInitialization) {
-                if (connectable.getScheduledState() == ScheduledState.DISABLED) {
-                    continue;
-                }
-
-                try {
-                    if (connectable instanceof ProcessorNode) {
-                        connectable.getProcessGroup().startProcessor((ProcessorNode) connectable);
-                    } else {
-                        startConnectable(connectable);
-                    }
-                } catch (final Throwable t) {
-                    LOG.error("Unable to start {} due to {}", new Object[]{connectable, t});
-                }
-            }
-
-            startConnectablesAfterInitialization.clear();
-
-            int startedTransmitting = 0;
-            for (final RemoteGroupPort remoteGroupPort : startRemoteGroupPortsAfterInitialization) {
-                try {
-                    remoteGroupPort.getRemoteProcessGroup().startTransmitting(remoteGroupPort);
-                    startedTransmitting++;
-                } catch (final Throwable t) {
-                    LOG.error("Unable to start transmitting with {} due to {}", new Object[]{remoteGroupPort, t});
-                }
-            }
-
-            LOG.info("Started {} Remote Group Ports transmitting", startedTransmitting);
-            startRemoteGroupPortsAfterInitialization.clear();
-        } finally {
-            writeLock.unlock();
-        }
-    }
-
-    /**
-     * Instantiates the content repository named by the
-     * CONTENT_REPOSITORY_IMPLEMENTATION property (the default implementation
-     * name is supplied to the lookup) and initializes it with this
-     * controller's content claim manager.
-     *
-     * @param properties the NiFi properties to read the implementation class name from
-     * @return the created and initialized content repository
-     * @throws RuntimeException if no class name is available, or wrapping any exception raised while creating or initializing the repository
-     * @throws InstantiationException declared by the signature; failures inside the body are wrapped in RuntimeException
-     * @throws IllegalAccessException declared by the signature; failures inside the body are wrapped in RuntimeException
-     * @throws ClassNotFoundException declared by the signature; failures inside the body are wrapped in RuntimeException
-     */
-    private ContentRepository createContentRepository(final NiFiProperties properties) throws InstantiationException, IllegalAccessException, ClassNotFoundException {
-        final String implementationClassName = properties.getProperty(NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION, DEFAULT_CONTENT_REPO_IMPLEMENTATION);
-        if (implementationClassName == null) {
-            // the message names the Content Repository so it matches the property being reported
-            throw new RuntimeException("Cannot create Content Repository because the NiFi Properties is missing the following property: "
-                    + NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION);
-        }
-
-        try {
-            final ContentRepository contentRepo = NarThreadContextClassLoader.createInstance(implementationClassName, ContentRepository.class);
-            // initialization is performed while holding the new instance's monitor
-            synchronized (contentRepo) {
-                contentRepo.initialize(contentClaimManager);
-            }
-            return contentRepo;
-        } catch (final Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Instantiates the Provenance Event Repository implementation named by
-     * {@link NiFiProperties#PROVENANCE_REPO_IMPLEMENTATION_CLASS}, falling back
-     * to {@code DEFAULT_PROVENANCE_REPO_IMPLEMENTATION} when the property is unset.
-     *
-     * @param properties the properties from which the implementation class name is read
-     * @return the newly created (not yet initialized by this method) repository
-     * @throws RuntimeException if no implementation class name can be
-     * determined or if instantiation fails
-     */
-    private ProvenanceEventRepository createProvenanceRepository(final NiFiProperties properties) throws InstantiationException, IllegalAccessException, ClassNotFoundException {
-        final String repoClassName = properties.getProperty(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS, DEFAULT_PROVENANCE_REPO_IMPLEMENTATION);
-        if (repoClassName != null) {
-            try {
-                return NarThreadContextClassLoader.createInstance(repoClassName, ProvenanceEventRepository.class);
-            } catch (final Exception cause) {
-                throw new RuntimeException(cause);
-            }
-        }
-
-        throw new RuntimeException("Cannot create Provenance Repository because the NiFi Properties is missing the following property: "
-                + NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS);
-    }
-
-    /**
-     * Instantiates the Component Status Repository implementation named by
-     * {@link NiFiProperties#COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION}, read
-     * from this controller's properties, falling back to
-     * {@code DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION} when unset.
-     *
-     * @return the newly created repository
-     * @throws RuntimeException if no implementation class name can be
-     * determined or if instantiation fails
-     */
-    private ComponentStatusRepository createComponentStatusRepository() {
-        final String repoClassName = properties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION, DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION);
-        if (repoClassName != null) {
-            try {
-                return NarThreadContextClassLoader.createInstance(repoClassName, ComponentStatusRepository.class);
-            } catch (final Exception cause) {
-                throw new RuntimeException(cause);
-            }
-        }
-
-        throw new RuntimeException("Cannot create Component Status Repository because the NiFi Properties is missing the following property: "
-                + NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION);
-    }
-
-    /**
-     * Creates a connection between two Connectable objects.
-     *
-     * @param id required ID of the connection
-     * @param name the name of the connection, or <code>null</code> to leave the
-     * connection unnamed
-     * @param source required source
-     * @param destination required destination
-     * @param relationshipNames required collection of relationship names
-     * @return the newly built Connection; it is not added to any group by this method
-     *
-     * @throws NullPointerException if the ID, source, destination, or set of
-     * relationships is null.
-     * @throws IllegalArgumentException if <code>relationships</code> is an
-     * empty collection (not checked in this method; presumably enforced by the
-     * connection builder -- confirm)
-     */
-    public Connection createConnection(final String id, final String name, final Connectable source, final Connectable destination, final Collection<String> relationshipNames) {
-        // Validate every documented precondition up front so that a bad argument
-        // fails with the documented NullPointerException before any work is done.
-        // Previously the destination was never checked here at all.
-        requireNonNull(id);
-        requireNonNull(source);
-        requireNonNull(destination);
-        requireNonNull(relationshipNames);
-
-        final List<Relationship> relationships = new ArrayList<>(relationshipNames.size());
-        for (final String relationshipName : relationshipNames) {
-            relationships.add(new Relationship.Builder().name(relationshipName).build());
-        }
-
-        final StandardConnection.Builder builder = new StandardConnection.Builder(processScheduler);
-        return builder
-                .id(id.intern())
-                .name(name == null ? null : name.intern())
-                .relationships(relationships)
-                .source(source)
-                .destination(destination)
-                .build();
-    }
-
-    /**
-     * Creates a new Label
-     *
-     * @param id the ID of the label; must not be null
-     * @param text the text of the label; passed to the label unchecked
-     * @return a new {@link StandardLabel}
-     * @throws NullPointerException if the ID is null
-     */
-    public Label createLabel(final String id, final String text) {
-        final String internedId = requireNonNull(id).intern();
-        return new StandardLabel(internedId, text);
-    }
-
-    /**
-     * Creates a funnel
-     *
-     * @param id the ID of the funnel; must not be null
-     * @return a new {@link StandardFunnel} that has no process group assigned
-     * @throws NullPointerException if the ID is null
-     */
-    public Funnel createFunnel(final String id) {
-        final String internedId = id.intern();
-        return new StandardFunnel(internedId, null, processScheduler);
-    }
-
-    /**
-     * Creates a Port to use as an Input Port for a Process Group
-     *
-     * @param id the ID of the port; must not be null
-     * @param name the name of the port; must not be null
-     * @return a new {@link LocalPort} of type {@link ConnectableType#INPUT_PORT}
-     * @throws NullPointerException if the ID or name is null
-     * @throws IllegalStateException if an Input Port or Output Port with the
-     * same ID already exists
-     */
-    public Port createLocalInputPort(String id, String name) {
-        final String portId = requireNonNull(id).intern();
-        final String portName = requireNonNull(name).intern();
-        verifyPortIdDoesNotExist(portId);
-        return new LocalPort(portId, portName, null, ConnectableType.INPUT_PORT, processScheduler);
-    }
-
-    /**
-     * Creates a Port to use as an Output Port for a Process Group
-     *
-     * @param id the ID of the port; must not be null
-     * @param name the name of the port; must not be null
-     * @return a new {@link LocalPort} of type {@link ConnectableType#OUTPUT_PORT}
-     * @throws NullPointerException if the ID or name is null
-     * @throws IllegalStateException if an Input Port or Output Port with the
-     * same ID already exists
-     */
-    public Port createLocalOutputPort(String id, String name) {
-        final String portId = requireNonNull(id).intern();
-        final String portName = requireNonNull(name).intern();
-        verifyPortIdDoesNotExist(portId);
-        return new LocalPort(portId, portName, null, ConnectableType.OUTPUT_PORT, processScheduler);
-    }
-
-    /**
-     * Creates a ProcessGroup with the given ID
-     *
-     * @param id the ID of the group; must not be null
-     * @return a new {@link StandardProcessGroup} bound to this controller
-     * @throws NullPointerException if the argument is null
-     */
-    public ProcessGroup createProcessGroup(final String id) {
-        final String groupId = requireNonNull(id).intern();
-        return new StandardProcessGroup(groupId, this, processScheduler, properties, encryptor);
-    }
-
-    /**
-     * <p>
-     * Creates a new ProcessorNode with the given type and identifier, treating
-     * it as added for the first time so that methods annotated with
-     * {@link OnAdded} are invoked.
-     * </p>
-     *
-     * @param type the fully qualified Processor class name
-     * @param id the unique ID of the Processor
-     * @return the newly created ProcessorNode
-     * @throws NullPointerException if the ID is null
-     * @throws ProcessorInstantiationException if the processor cannot be
-     * instantiated for any reason
-     */
-    public ProcessorNode createProcessor(final String type, String id) throws ProcessorInstantiationException {
-        final boolean firstTimeAdded = true;
-        return createProcessor(type, id, firstTimeAdded);
-    }
-    
-    /**
-     * <p>
-     * Creates a new ProcessorNode with the given type and identifier, registers
-     * a bulletin log observer for it, and optionally runs its {@link OnAdded}
-     * methods.
-     * </p>
-     *
-     * @param type the fully qualified Processor class name
-     * @param id the unique ID of the Processor
-     * @param firstTimeAdded whether or not this is the first time this Processor is added to the graph. If {@code true},
-     *                       will invoke methods annotated with the {@link OnAdded} annotation.
-     * @return the newly created ProcessorNode
-     * @throws NullPointerException if the ID is null
-     * @throws ProcessorInstantiationException if the processor cannot be
-     * instantiated for any reason
-     * @throws ProcessorLifeCycleException if an {@link OnAdded} method fails; the
-     * bulletin observer registered for the processor is removed first
-     */
-    public ProcessorNode createProcessor(final String type, String id, final boolean firstTimeAdded) throws ProcessorInstantiationException {
-        final String processorId = id.intern();
-        final Processor processor = instantiateProcessor(type, processorId);
-        final ValidationContextFactory contextFactory = new StandardValidationContextFactory(controllerServiceProvider);
-        final ProcessorNode procNode = new StandardProcessorNode(processor, processorId, contextFactory, processScheduler, controllerServiceProvider);
-
-        // Route WARN-and-above log messages of this processor to the bulletin repository.
-        final LogRepository logRepo = LogRepositoryFactory.getRepository(processorId);
-        logRepo.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, new ProcessorLogObserver(getBulletinRepository(), procNode));
-
-        if (!firstTimeAdded) {
-            return procNode;
-        }
-
-        try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
-            ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, processor);
-        } catch (final Exception e) {
-            // Undo the observer registration so a failed processor leaves nothing behind.
-            logRepo.removeObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID);
-            throw new ProcessorLifeCycleException("Failed to invoke @OnAdded methods of " + procNode.getProcessor(), e);
-        }
-
-        return procNode;
-    }
-
-    /**
-     * Loads the Processor class of the given type, creates an instance of it,
-     * and initializes the instance. While the instance is created and
-     * initialized, the calling thread's context class loader is switched to the
-     * class loader registered for the type; the previous context class loader
-     * is always restored before this method returns.
-     *
-     * @param type the fully qualified Processor class name
-     * @param identifier the ID handed to the processor's logger and initialization context
-     * @return the initialized Processor
-     * @throws ProcessorInstantiationException if anything at all goes wrong;
-     * the original failure is preserved as the cause
-     */
-    private Processor instantiateProcessor(final String type, final String identifier) throws ProcessorInstantiationException {
-        final Thread currentThread = Thread.currentThread();
-        final ClassLoader ctxClassLoader = currentThread.getContextClassLoader();
-        try {
-            final ClassLoader detectedClassLoaderForType = ExtensionManager.getClassLoader(type);
-            final Class<?> rawClass;
-            if (detectedClassLoaderForType == null) {
-                // try to find from the current class loader
-                rawClass = Class.forName(type);
-            } else {
-                // try to find from the registered classloader for that type
-                rawClass = Class.forName(type, true, detectedClassLoaderForType);
-            }
-
-            currentThread.setContextClassLoader(detectedClassLoaderForType);
-            final Class<? extends Processor> processorClass = rawClass.asSubclass(Processor.class);
-            final Processor processor = processorClass.newInstance();
-            final ProcessorLog processorLogger = new SimpleProcessLogger(identifier, processor);
-            final ProcessorInitializationContext ctx = new StandardProcessorInitializationContext(identifier, processorLogger, this);
-            processor.initialize(ctx);
-            return processor;
-        } catch (final Throwable t) {
-            throw new ProcessorInstantiationException(type, t);
-        } finally {
-            // Restore unconditionally: skipping the restore when the original loader
-            // was null would leave the processor's class loader installed on this thread.
-            currentThread.setContextClassLoader(ctxClassLoader);
-        }
-    }
-
-    /**
-     * Returns the ExtensionManager held by this controller.
-     *
-     * @return the ExtensionManager used for instantiating Processors,
-     * Prioritizers, etc.
-     */
-    public ExtensionManager getExtensionManager() {
-        return this.extensionManager;
-    }
-
-    /**
-     * Returns the instance ID of this controller, read while holding the read lock.
-     *
-     * @return the instance ID
-     */
-    public String getInstanceId() {
-        final String currentInstanceId;
-        readLock.lock();
-        try {
-            currentInstanceId = instanceId;
-        } finally {
-            readLock.unlock();
-        }
-        return currentInstanceId;
-    }
-
-    /**
-     * Gets the BulletinRepository for storing and retrieving Bulletins.
-     *
-     * @return the BulletinRepository held by this controller
-     */
-    public BulletinRepository getBulletinRepository() {
-        return this.bulletinRepository;
-    }
-
-    /**
-     * @return the SnippetManager held by this controller
-     */
-    public SnippetManager getSnippetManager() {
-        return this.snippetManager;
-    }
-
-    /**
-     * Creates a Port to use as an Input Port for the root Process Group, which
-     * is used for Site-to-Site communications
-     *
-     * @param id the ID of the port; must not be null
-     * @param name the name of the port; must not be null
-     * @return a new {@link StandardRootGroupPort} with transfer direction
-     * {@link TransferDirection#RECEIVE}
-     * @throws NullPointerException if the ID or name is null
-     * @throws IllegalStateException if an Input Port or Output Port with the
-     * same ID already exists
-     */
-    public Port createRemoteInputPort(String id, String name) {
-        final String portId = requireNonNull(id).intern();
-        final String portName = requireNonNull(name).intern();
-        verifyPortIdDoesNotExist(portId);
-
-        // A null "secure" setting is treated as not secure.
-        final boolean secure = Boolean.TRUE.equals(isSiteToSiteSecure);
-        return new StandardRootGroupPort(portId, portName, null, TransferDirection.RECEIVE, ConnectableType.INPUT_PORT, userService, getBulletinRepository(), processScheduler, secure);
-    }
-
-    /**
-     * Creates a Port to use as an Output Port for the root Process Group, which
-     * is used for Site-to-Site communications and will queue flow files waiting
-     * to be delivered to remote instances
-     *
-     * @param id the ID of the port; must not be null
-     * @param name the name of the port; must not be null
-     * @return a new {@link StandardRootGroupPort} with transfer direction
-     * {@link TransferDirection#SEND}
-     * @throws NullPointerException if the ID or name is null
-     * @throws IllegalStateException if an Input Port or Output Port with the
-     * same ID already exists
-     */
-    public Port createRemoteOutputPort(String id, String name) {
-        final String portId = requireNonNull(id).intern();
-        final String portName = requireNonNull(name).intern();
-        verifyPortIdDoesNotExist(portId);
-
-        // A null "secure" setting is treated as not secure.
-        final boolean secure = Boolean.TRUE.equals(isSiteToSiteSecure);
-        return new StandardRootGroupPort(portId, portName, null, TransferDirection.SEND, ConnectableType.OUTPUT_PORT, userService, getBulletinRepository(), processScheduler, secure);
-    }
-
-    /**
-     * Creates a new Remote Process Group with the given ID that points to the
-     * given URI
-     *
-     * @param id the ID of the remote process group; must not be null
-     * @param uri the URI of the remote instance; must not be null
-     * @return a new {@link StandardRemoteProcessGroup} with no parent group assigned
-     *
-     * @throws NullPointerException if either argument is null
-     * @throws IllegalArgumentException if <code>uri</code> is not a valid URI
-     * (not checked in this method; presumably raised by the
-     * StandardRemoteProcessGroup constructor -- confirm)
-     */
-    public RemoteProcessGroup createRemoteProcessGroup(final String id, final String uri) {
-        final String groupId = requireNonNull(id).intern();
-        final String targetUri = requireNonNull(uri).intern();
-        return new StandardRemoteProcessGroup(groupId, targetUri, null, this, sslContext);
-    }
-
-    /**
-     * Verifies that neither an Output Port nor an Input Port with the given ID
-     * can be found from the root group. If this does not hold true, throws an
-     * IllegalStateException
-     *
-     * @param id the port ID to look for
-     * @throws IllegalStateException if a port with the given ID already exists
-     */
-    private void verifyPortIdDoesNotExist(final String id) {
-        // The Output Port collision used to be reported as an "Input Port" collision.
-        if (rootGroup.findOutputPort(id) != null) {
-            throw new IllegalStateException("An Output Port already exists with ID " + id);
-        }
-        if (rootGroup.findInputPort(id) != null) {
-            throw new IllegalStateException("An Input Port already exists with ID " + id);
-        }
-    }
-
-    /**
-     * Reads the name of the Root Group while holding the read lock.
-     *
-     * @return the name of this controller, which is also the name of the Root
-     * Group.
-     */
-    public String getName() {
-        final String groupName;
-        readLock.lock();
-        try {
-            groupName = rootGroup.getName();
-        } finally {
-            readLock.unlock();
-        }
-        return groupName;
-    }
-
-    /**
-     * Sets the name for the Root Group, which also changes the name for the
-     * controller.
-     *
-     * @param name the new name, handed to the Root Group as-is
-     */
-    public void setName(final String name) {
-        // NOTE(review): only the read lock is taken although a value is being set;
-        // presumably the lock merely guards the rootGroup reference -- confirm.
-        this.readLock.lock();
-        try {
-            this.rootGroup.setName(name);
-        } finally {
-            this.readLock.unlock();
-        }
-    }
-
-    /**
-     * Gets the comments of this controller, which is also the comment of the
-     * Root Group. The value is read while holding the read lock.
-     *
-     * @return the comments of the Root Group
-     */
-    public String getComments() {
-        final String groupComments;
-        readLock.lock();
-        try {
-            groupComments = rootGroup.getComments();
-        } finally {
-            readLock.unlock();
-        }
-        return groupComments;
-    }
-
-    /**
-     * Sets the comment for the Root Group, which also changes the comment for
-     * the controller.
-     *
-     * @param comments the new comments, handed to the Root Group as-is
-     */
-    public void setComments(final String comments) {
-        // NOTE(review): only the read lock is taken although a value is being set;
-        // presumably the lock merely guards the rootGroup reference -- confirm.
-        this.readLock.lock();
-        try {
-            this.rootGroup.setComments(comments);
-        } finally {
-            this.readLock.unlock();
-        }
-    }
-
-    /**
-     * Reports whether the timer-driven scheduling engine is gone or terminated.
-     *
-     * @return <code>true</code> if the scheduling engine for this controller
-     * has been terminated (or no timer-driven engine is set).
-     */
-    public boolean isTerminated() {
-        this.readLock.lock();
-        try {
-            // Read the reference once so the null check and the call see the same engine.
-            final FlowEngine timerDrivenEngine = this.timerDrivenEngineRef.get();
-            return timerDrivenEngine == null || timerDrivenEngine.isTerminated();
-        } finally {
-            this.readLock.unlock();
-        }
-    }
-
-    /**
-     * Triggers the controller to begin shutdown, stopping all processors and
-     * terminating the scheduling engine. After calling this method, the
-     * {@link #isTerminated()} method will indicate whether or not the shutdown
-     * has finished.
-     *
-     * <p>
-     * The shutdown flag is set and all processors are asked to stop before the
-     * "already stopped" check is made. Both scheduling engines are then shut
-     * down, each is awaited for up to half of the graceful shutdown period,
-     * and finally the repositories and supporting services are closed. If the
-     * calling thread is interrupted while waiting, the remaining cleanup still
-     * runs and the thread's interrupt status is restored before returning.
-     * </p>
-     *
-     * @param kill if <code>true</code>, attempts to stop all active threads,
-     * but makes no guarantee that this will happen
-     *
-     * @throws IllegalStateException if the controller is already stopped or
-     * currently in the process of stopping
-     */
-    public void shutdown(final boolean kill) {
-        this.shutdown = true;
-        stopAllProcessors();
-
-        // Set when the wait below is interrupted. The interrupt status is restored only
-        // after cleanup so that closing the repositories is not disturbed by it.
-        boolean interrupted = false;
-
-        writeLock.lock();
-        try {
-            if (isTerminated() || timerDrivenEngineRef.get().isTerminating()) {
-                throw new IllegalStateException("Controller already stopped or still stopping...");
-            }
-
-            if (kill) {
-                this.timerDrivenEngineRef.get().shutdownNow();
-                this.eventDrivenEngineRef.get().shutdownNow();
-                LOG.info("Initiated immediate shutdown of flow controller...");
-            } else {
-                this.timerDrivenEngineRef.get().shutdown();
-                this.eventDrivenEngineRef.get().shutdown();
-                LOG.info("Initiated graceful shutdown of flow controller...waiting up to " + gracefulShutdownSeconds + " seconds");
-            }
-
-            clusterTaskExecutor.shutdown();
-
-            // Trigger any processors' methods marked with @OnShutdown to be called
-            rootGroup.shutdown();
-
-            try {
-                // Each engine gets half of the graceful period.
-                this.timerDrivenEngineRef.get().awaitTermination(gracefulShutdownSeconds / 2, TimeUnit.SECONDS);
-                this.eventDrivenEngineRef.get().awaitTermination(gracefulShutdownSeconds / 2, TimeUnit.SECONDS);
-            } catch (final InterruptedException ie) {
-                LOG.info("Interrupted while waiting for controller termination.");
-                interrupted = true;
-            }
-
-            try {
-                flowFileRepository.close();
-            } catch (final Throwable t) {
-                LOG.warn("Unable to shut down FlowFileRepository due to {}", new Object[]{t});
-            }
-
-            if (this.timerDrivenEngineRef.get().isTerminated() && eventDrivenEngineRef.get().isTerminated()) {
-                LOG.info("Controller has been terminated successfully.");
-            } else {
-                LOG.warn("Controller hasn't terminated properly.  There exists an uninterruptable thread that will take an indeterminate amount of time to stop.  Might need to kill the program manually.");
-            }
-
-            if (externalSiteListener != null) {
-                externalSiteListener.stop();
-            }
-
-            if (flowFileSwapManager != null) {
-                flowFileSwapManager.shutdown();
-            }
-
-            if (processScheduler != null) {
-                processScheduler.shutdown();
-            }
-
-            if (contentRepository != null) {
-                contentRepository.shutdown();
-            }
-
-            if (provenanceEventRepository != null) {
-                try {
-                    provenanceEventRepository.close();
-                } catch (final IOException ioe) {
-                    LOG.warn("There was a problem shutting down the Provenance Repository: " + ioe.toString());
-                    // The stack trace is only emitted when debug logging is enabled.
-                    if (LOG.isDebugEnabled()) {
-                        LOG.warn("", ioe);
-                    }
-                }
-            }
-        } finally {
-            writeLock.unlock();
-            if (interrupted) {
-                // Let callers further up the stack observe the interruption.
-                Thread.currentThread().interrupt();
-            }
-        }
-    }
-
-    /**
-     * Serializes the current state of the controller to the given OutputStream
-     * while holding the read lock.
-     *
-     * @param serializer the serializer that writes this controller
-     * @param os the stream the serializer writes to
-     * @throws FlowSerializationException if serialization of the flow fails for
-     * any reason
-     */
-    public void serialize(final FlowSerializer serializer, final OutputStream os) throws FlowSerializationException {
-        this.readLock.lock();
-        try {
-            serializer.serialize(this, os);
-        } finally {
-            this.readLock.unlock();
-        }
-    }
-
-    /**
-     * Synchronizes this controller with the proposed flow while holding the
-     * write lock. The work is delegated to the given synchronizer, which is
-     * also handed this controller's encryptor.
-     *
-     * For more details, see {@link FlowSynchronizer}.
-     *
-     * @param synchronizer the synchronizer that performs the work
-     * @param dataFlow the flow to load the controller with. If the flow is null
-     * or zero length, then the controller must not have a flow or else an
-     * UninheritableFlowException will be thrown.
-     *
-     * @throws FlowSerializationException if proposed flow is not a valid flow
-     * configuration file
-     * @throws UninheritableFlowException if the proposed flow cannot be loaded
-     * by the controller because in doing so would risk orphaning flow files
-     * @throws FlowSynchronizationException if updates to the controller failed.
-     * If this exception is thrown, then the controller should be considered
-     * unsafe to be used
-     */
-    public void synchronize(final FlowSynchronizer synchronizer, final DataFlow dataFlow)
-            throws FlowSerializationException, FlowSynchronizationException, UninheritableFlowException {
-        this.writeLock.lock();
-        try {
-            LOG.debug("Synchronizing controller with proposed flow");
-            synchronizer.sync(this, dataFlow, this.encryptor);
-            LOG.info("Successfully synchronized controller with proposed flow");
-        } finally {
-            this.writeLock.unlock();
-        }
-    }
-
-    /**
-     * Returns the configured thread limit for timer-driven scheduling.
-     *
-     * @return the currently configured maximum number of threads that can be
-     * used for executing processors at any given time.
-     */
-    public int getMaxTimerDrivenThreadCount() {
-        return this.maxTimerDrivenThreads.intValue();
-    }
-
-    /**
-     * @return the currently configured maximum number of event-driven threads
-     */
-    public int getMaxEventDrivenThreadCount() {
-        return this.maxEventDrivenThreads.intValue();
-    }
-
-    /**
-     * Updates the maximum number of timer-driven threads while holding the
-     * write lock.
-     *
-     * @param maxThreadCount the new maximum
-     * @throws IllegalArgumentException if the given count is less than 1
-     */
-    public void setMaxTimerDrivenThreadCount(final int maxThreadCount) {
-        this.writeLock.lock();
-        try {
-            final FlowEngine timerDrivenEngine = this.timerDrivenEngineRef.get();
-            setMaxThreadCount(maxThreadCount, timerDrivenEngine, this.maxTimerDrivenThreads);
-        } finally {
-            this.writeLock.unlock();
-        }
-    }
-
-    /**
-     * Updates the maximum number of event-driven threads while holding the
-     * write lock, and passes the new limit on to the process scheduler.
-     *
-     * @param maxThreadCount the new maximum
-     * @throws IllegalArgumentException if the given count is less than 1
-     */
-    public void setMaxEventDrivenThreadCount(final int maxThreadCount) {
-        this.writeLock.lock();
-        try {
-            final FlowEngine eventDrivenEngine = this.eventDrivenEngineRef.get();
-            setMaxThreadCount(maxThreadCount, eventDrivenEngine, this.maxEventDrivenThreads);
-            this.processScheduler.setMaxThreadCount(SchedulingStrategy.EVENT_DRIVEN, maxThreadCount);
-        } finally {
-            this.writeLock.unlock();
-        }
-    }
-
-    /**
-     * Updates the number of threads that can be simultaneously used for
-     * executing processors. The engine's core pool size is only ever raised by
-     * this method, never lowered.
-     *
-     * This method must be called while holding the write lock!
-     *
-     * @param maxThreadCount the new maximum; must be at least 1
-     * @param engine the engine whose core pool size is raised if it is
-     * currently smaller than the new maximum; may be <code>null</code>
-     * @param maxThreads the counter that records the configured maximum
-     * @throws IllegalArgumentException if <code>maxThreadCount</code> is less than 1
-     */
-    private void setMaxThreadCount(final int maxThreadCount, final FlowEngine engine, final AtomicInteger maxThreads) {
-        if (maxThreadCount < 1) {
-            throw new IllegalArgumentException("Maximum thread count must be at least 1 but was " + maxThreadCount);
-        }
-
-        // The previous value is not needed, so a plain set is sufficient.
-        maxThreads.set(maxThreadCount);
-        if (null != engine && engine.getCorePoolSize() < maxThreadCount) {
-            engine.setCorePoolSize(maxThreadCount);
-        }
-    }
-
-    /**
-     * Reads the identifier of the root group while holding the read lock.
-     *
-     * @return the ID of the root group
-     */
-    public String getRootGroupId() {
-        final String rootId;
-        readLock.lock();
-        try {
-            rootId = rootGroup.getIdentifier();
-        } finally {
-            readLock.unlock();
-        }
-        return rootId;
-    }
-
-    /**
-     * Sets the root group to the given group. While holding the write lock, the
-     * external site listener (if any) is told about the new root group and the
-     * heartbeat bean is replaced with one built from the new group.
-     *
-     * @param group the ProcessGroup that is to become the new Root Group
-     *
-     * @throws NullPointerException if the group is null
-     * @throws IllegalArgumentException if the ProcessGroup has a parent
-     */
-    void setRootGroup(final ProcessGroup group) {
-        final boolean hasParent = requireNonNull(group).getParent() != null;
-        if (hasParent) {
-            throw new IllegalArgumentException("A ProcessGroup that has a parent cannot be the Root Group");
-        }
-
-        this.writeLock.lock();
-        try {
-            this.rootGroup = group;
-
-            if (this.externalSiteListener != null) {
-                this.externalSiteListener.setRootGroup(group);
-            }
-
-            // update the heartbeat bean
-            this.heartbeatBeanRef.set(new HeartbeatBean(group, primary, connected));
-        } finally {
-            this.writeLock.unlock();
-        }
-    }
-
-    /**
-     * Builds system diagnostics from this controller's FlowFile and Content
-     * repositories using a fresh {@link SystemDiagnosticsFactory}.
-     *
-     * @return the diagnostics produced by the factory
-     */
-    public SystemDiagnostics getSystemDiagnostics() {
-        return new SystemDiagnosticsFactory().create(flowFileRepository, contentRepository);
-    }
-
-    //
-    // ProcessGroup access
-    //
-    /**
-     * Updates the process group corresponding to the specified DTO. Only the
-     * name, position, and comments are applied; any of those that is
-     * <code>null</code> in the DTO is ignored.
-     *
-     * @param dto the DTO carrying the ID of the group to update and the new values
-     * @throws ProcessorInstantiationException declared by the signature; nothing
-     * in this method's visible body raises it directly
-     *
-     * @throws IllegalStateException if no process group can be found with the
-     * ID of DTO (presumably raised by the group lookup -- confirm)
-     *
-     * @throws NullPointerException if the DTO is null
-     */
-    public void updateProcessGroup(final ProcessGroupDTO dto) throws ProcessorInstantiationException {
-        final ProcessGroup group = lookupGroup(requireNonNull(dto).getId());
-
-        // Read every proposed value before the first one is applied.
-        final String newName = dto.getName();
-        final PositionDTO newPosition = dto.getPosition();
-        final String newComments = dto.getComments();
-
-        if (newName != null) {
-            group.setName(newName);
-        }
-
-        if (newPosition != null) {
-            group.setPosition(toPosition(newPosition));
-        }
-
-        if (newComments != null) {
-            group.setComments(newComments);
-        }
-    }
-
-    //
-    // Template access
-    //
-    /**
-     * Adds a template to this controller. The contents of this template must be
-     * part of the current flow. This is going to create a template based on a
-     * snippet of this flow. The work is delegated to the template manager.
-     *
-     * @param dto the DTO describing the template to add
-     * @return the Template returned by the template manager
-     * @throws IOException if an I/O error occurs when persisting the Template
-     * @throws NullPointerException if the DTO is null
-     * @throws IllegalArgumentException if does not contain all required
-     * information, such as the template name or a processor's configuration
-     * element
-     */
-    public Template addTemplate(final TemplateDTO dto) throws IOException {
-        final Template added = templateManager.addTemplate(dto);
-        return added;
-    }
-
-    /**
-     * Removes all templates from this controller by clearing the template
-     * manager.
-     *
-     * @throws IOException if the template manager fails to clear its templates
-     */
-    public void clearTemplates() throws IOException {
-        this.templateManager.clear();
-    }
-
-    /**
-     * Imports the specified template into this controller. The contents of this
-     * template may have come from another NiFi instance. The work is delegated
-     * to the template manager.
-     *
-     * @param dto the DTO describing the template to import
-     * @return the Template returned by the template manager
-     * @throws IOException if the template manager fails to import the template
-     */
-    public Template importTemplate(final TemplateDTO dto) throws IOException {
-        final Template imported = templateManager.importTemplate(dto);
-        return imported;
-    }
-
-    /**
-     * Returns the template with the given ID, or <code>null</code> if no
-     * template exists with the given ID.
-     *
-     * @param id the ID of the template to look up
-     * @return whatever the template manager returns for the given ID
-     */
-    public Template getTemplate(final String id) {
-        final Template found = templateManager.getTemplate(id);
-        return found;
-    }
-
-    /**
-     * @return the TemplateManager held by this controller
-     */
-    public TemplateManager getTemplateManager() {
-        return this.templateManager;
-    }
-
-    /**
-     * Returns all templates that this controller knows about.
-     *
-     * @return the collection returned by the template manager
-     */
-    public Collection<Template> getTemplates() {
-        final Collection<Template> knownTemplates = templateManager.getTemplates();
-        return knownTemplates;
-    }
-
-    /**
-     * Removes the template with the given ID by delegating to the template
-     * manager.
-     *
-     * @param id the ID of the template to remove
-     * @throws NullPointerException if the argument is null
-     * @throws IllegalStateException if no template exists with the given ID
-     * @throws IOException if template could not be removed
-     */
-    public void removeTemplate(final String id) throws IOException, IllegalStateException {
-        this.templateManager.removeTemplate(id);
-    }
-
-    /**
-     * Converts a PositionDTO into a Position carrying the same X and Y values.
-     *
-     * @param dto the DTO to convert; must not be null, since its getters are
-     * invoked unconditionally
-     * @return a new Position built from the DTO's X and Y values
-     */
-    private Position toPosition(final PositionDTO dto) {
-        return new Position(dto.getX(), dto.getY());
-    }
-
-    //
-    // Snippet
-    //
-    /**
-     * Creates an instance of the given snippet and adds the components to the
-     * given group
-     *
-     * @param group
-     * @param dto
-     *
-     * @throws NullPointerException if either argument is null
-     * @throws IllegalStateException if the snippet is not valid because a
-     * component in the snippet has an ID that is not unique to this flow, or
-     * because it shares an Input Port or Output Port at the root level whose
-     * name already exists in the given ProcessGroup, or because the Template
-     * contains a Processor or a Prioritizer whose class is not valid within
-     * this instance of NiFi.
-     * @throws ProcessorInstantiationException if unable to instantiate a
-     * processor
-     */
-    public void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto) throws ProcessorInstantiationException {
-        writeLock.lock();
-        try {
-            validateSnippetContents(requireNonNull(group), dto);
-
-            //
-            // Instantiate the labels
-            //
-            for (final LabelDTO labelDTO : dto.getLabels()) {
-                final Label label = createLabel(labelDTO.getId(), labelDTO.getLabel());
-                label.setPosition(toPosition(labelDTO.getPosition()));
-                if (labelDTO.getWidth() != null && labelDTO.getHeight() != null) {
-                    label.setSize(new Size(labelDTO.getWidth(), labelDTO.getHeight()));
-                }
-
-                // TODO: Update the label's "style"
-                group.addLabel(label);
-            }
-
-            // 
-            // Instantiate the funnels
-            for (final FunnelDTO funnelDTO : dto.getFunnels()) {
-                final Funnel funnel = createFunnel(funnelDTO.getId());
-                funnel.setPosition(toPosition(funnelDTO.getPosition()));
-                group.addFunnel(funnel);
-            }
-
-            //
-            // Instantiate Input Ports & Output Ports
-            //
-            for (final PortDTO portDTO : dto.getInputPorts()) {
-                final Port inputPort;
-                if (group.isRootGroup()) {
-                    inputPort = createRemoteInputPort(portDTO.getId(), portDTO.getName());
-                    inputPort.setMaxConcurrentTasks(portDTO.getConcurrentlySchedulableTaskCount());
-                    if (portDTO.getGroupAccessControl() != null) {
-                        ((RootGroupPort) inputPort).setGroupAccessControl(portDTO.getGroupAccessControl());
-                    }
-                    if (portDTO.getUserAccessControl() != null) {
-                        ((RootGroupPort) inputPort).setUserAccessControl(portDTO.getUserAccessControl());
-                    }
-                } else {
-                    inputPort = createLocalInputPort(portDTO.getId(), portDTO.getName());
-                }
-
-                inputPort.setPosition(toPosition(portDTO.getPosition()));
-                inputPort.setProcessGroup(group);
-                inputPort.setComments(portDTO.getComments());
-                group.addInputPort(inputPort);
-            }
-
-            for (final PortDTO portDTO : dto.getOutputPorts()) {
-                final Port outputPort;
-                if (group.isRootGroup()) {
-                    outputPort = createRemoteOutputPort(portDTO.getId(), portDTO.getName());
-                    outputPort.setMaxConcurrentTasks(portDTO.getConcurrentlySchedulableTaskCount());
-                    if (portDTO.getGroupAccessControl() != null) {
-                        ((RootGroupPort) outputPort).setGroupAccessControl(portDTO.getGroupAccessControl());
-                    }
-                    if (portDTO.getUserAccessControl() != null) {
-                        ((RootGroupPort) outputPort).setUserAccessControl(portDTO.getUserAccessControl());
-                    }
-                } else {
-                    outputPort = createLocalOutputPort(portDTO.getId(), portDTO.getName());
-                }
-
-                outputPort.setPosition(toPosition(portDTO.getPosition()));
-                outputPort.setProcessGroup(group);
-                outputPort.setComments(portDTO.getComments());
-                group.addOutputPort(outputPort);
-            }
-
-            //
-            // Instantiate the processors
-            //
-            for (final ProcessorDTO processorDTO : dto.getProcessors()) {
-                final ProcessorNode procNode = createProcessor(processorDTO.getType(), processorDTO.getId());
-
-                procNode.setPosition(toPosition(processorDTO.getPosition()));
-                procNode.setProcessGroup(group);
-
-                final ProcessorConfigDTO config = processorDTO.getConfig();
-                procNode.setComments(config.getComments());
-                if (config.isLossTolerant() != null) {
-                    procNode.setLossTolerant(config.isLossTolerant());
-                }
-                procNode.setName(processorDTO.getName());
-
-                procNode.setYieldPeriod(config.getYieldDuration());
-                procNode.setPenalizationPeriod(config.getPenaltyDuration());
-                procNode.setBulletinLevel(LogLevel.valueOf(config.getBulletinLevel()));
-                procNode.setAnnotationData(config.getAnnotationData());
-                procNode.setStyle(processorDTO.getStyle());
-
-                if (config.getRunDurationMillis() != null) {
-                    procNode.setRunDuration(config.getRunDurationMillis(), TimeUnit.MILLISECONDS);
-                }
-
-                if (config.getSchedulingStrategy() != null) {
-                    procNode.setSchedulingStrategy(SchedulingStrategy.valueOf(config.getSchedulingStrategy()));
-                }
-
-                // ensure that the scheduling strategy is set prior to these values
-                procNode.setMaxConcurrentTasks(config.getConcurrentlySchedulableTaskCount());
-                procNode.setScheduldingPeriod(config.getSchedulingPeriod());
-
-                final Set<Relationship> relationships = new HashSet<>();
-                if (processorDTO.getRelationships() != null) {
-                    for (final RelationshipDTO rel : processorDTO.getRelationships()) {
-                        if (rel.isAutoTerminate()) {
-                            relationships.add(procNode.getRelationship(rel.getName()));
-                        }
-                    }
-                    procNode.setAutoTerminatedRelationships(relationships);
-                }
-
-                if (config.getProperties() != null) {
-                    for (Map.Entry<String, String> entry : config.getProperties().entrySet()) {
-                        if (entry.getValue() != null) {
-                            procNode.setProperty(entry.getKey(), entry.getValue());
-                        }
-                    }
-                }
-
-                group.addProcessor(procNode);
-            }
-
-            //
-            // Instantiate Remote Process Groups
-            //
-            for (final RemoteProcessGroupDTO remoteGroupDTO : dto.getRemoteProcessGroups()) {
-                final RemoteProcessGroup remoteGroup = createRemoteProcessGroup(remoteGroupDTO.getId(), remoteGroupDTO.getTargetUri());
-                remoteGroup.setComments(remoteGroupDTO.getComments());
-                remoteGroup.setPosition(toPosition(remoteGroupDTO.getPosition()));
-                remoteGroup.setCommunicationsTimeout(remoteGroupDTO.getCommunicationsTimeout());
-                remoteGroup.setYieldDuration(remoteGroupDTO.getYieldDuration());
-                remoteGroup.setProcessGroup(group);
-
-                // set the input/output ports
-                if (remoteGroupDTO.getContents() != null) {
-                    final RemoteProcessGroupContentsDTO contents = remoteGroupDTO.getContents();
-
-                    // ensure there input ports
-                    if (contents.getInputPorts() != null) {
-                        remoteGroup.setInputPorts(convertRemotePort(contents.getInputPorts()));
-                    }
-
-                    // ensure there are output ports
-                    if (contents.getOutputPorts() != null) {
-                        remoteGroup.setOutputPorts(convertRemotePort(contents.getOutputPorts()));
-                    }
-                }
-
-                group.addRemoteProcessGroup(remoteGroup);
-            }
-
-            // 
-            // Instantiate ProcessGroups
-            //
-            for (final ProcessGroupDTO groupDTO : dto.getProcessGroups()) {
-                final ProcessGroup childGroup = createProcessGroup(groupDTO.getId());
-                childGroup.setParent(group);
-                childGroup.setPosition(toPosition(groupDTO.getPosition()));
-                childGroup.setComments(groupDTO.getComments());
-                childGroup.setName(groupDTO.getName());
-                group.addProcessGroup(childGroup);
-
-                final FlowSnippetDTO contents = groupDTO.getContents();
-
-                // we want this to be recursive, so we will create a new template that contains only
-                // the contents of this child group and recursively call ourselves.
-                final FlowSnippetDTO childTemplateDTO = new FlowSnippetDTO();
-                childTemplateDTO.setConnections(contents.getConnections());
-                childTemplateDTO.setInputPorts(contents.getInputPorts());
-                childTemplateDTO.setLabels(contents.getLabels());
-                childTemplateDTO.setOutputPorts(contents.getOutputPorts());
-                childTemplateDTO.setProcessGroups(contents.getProcessGroups());
-                childTemplateDTO.setProcessors(contents.getProcessors());
-                childTemplateDTO.setFunnels(contents.getFunnels());
-                childTemplateDTO.setRemoteProcessGroups(contents.getRemoteProcessGroups());
-                instantiateSnippet(childGroup, childTemplateDTO);
-            }
-
-            //
-            // Instantiate Connections
-            //
-            for (final ConnectionDTO connectionDTO : dto.getConnections()) {
-                final ConnectableDTO sourceDTO = connectionDTO.getSource();
-                final ConnectableDTO destinationDTO = connectionDTO.getDestination();
-                final Connectable source;
-                final Connectable destination;
-
-                // locate the source and destination connectable. if this is a remote port 
-                // we need to locate the remote process groups. otherwise we need to 
-                // find the connectable given its parent group.
-                // NOTE: (getConnectable returns ANY connectable, when the parent is
-                // not this group only input ports or output ports should be returned. if something 
-                // other than a port is returned, an exception will be thrown when adding the 
-                // connection below.)
-                // see if the source connectable is a remote port
-                if (ConnectableType.REMOTE_OUTPUT_PORT.name().equals(sourceDTO.getType())) {
-                    final RemoteProcessGroup remoteGroup = group.getRemoteProcessGroup(sourceDTO.getGroupId());
-                    source = remoteGroup.getOutputPort(sourceDTO.getId());
-                } else {
-                    final ProcessGroup sourceGroup = getConnectableParent(group, sourceDTO.getGroupId());
-                    source = sourceGroup.getConnectable(sourceDTO.getId());
-                }
-
-                // see if the destination connectable is a remote port
-                if (ConnectableType.REMOTE_INPUT_PORT.name().equals(destinationDTO.getType())) {
-                    final RemoteProcessGroup remoteGroup = group.getRemoteProcessGroup(destinationDTO.getGroupId());
-                    destination = remoteGroup.getInputPort(destinationDTO.getId());
-                } else {
-                    final ProcessGroup destinationGroup = getConnectableParent(group, destinationDTO.getGroupId());
-                    destination = destinationGroup.getConnectable(destinationDTO.getId());
-                }
-
-                // determine the selection relationships for this connection
-                final Set<String> relationships = new HashSet<>();
-                if (connectionDTO.getSelectedRelationships() != null) {
-                    relationships.addAll(connectionDTO.getSelectedRelationships());
-                }
-
-                final Connection connection = createConnection(connectionDTO.getId(), connectionDTO.getName(), source, destination, relationships);
-
-                if (connectionDTO.getBends() != null) {
-                    final List<Position> bendPoints = new ArrayList<>();
-                    for (final PositionDTO bend : connectionDTO.getBends()) {
-                        bendPoints.add(new Position(bend.getX(), bend.getY()));
-                    }
-                    connection.setBendPoints(bendPoints);
-                }
-
-                final FlowFileQueue queue = connection.getFlowFileQueue();
-                queue.setBackPressureDataSizeThreshold(connectionDTO.getBackPressureDataSizeThreshold());
-                queue.setBackPressureObjectThreshold(connectionDTO.getBackPressureObjectThreshold());
-                queue.setFlowFileExpiration(connectionDTO.getFlowFileExpiration());
-
-                final List<String> prioritizers = connectionDTO.getPrioritizers();
-                if (prioritizers != null) {
-                    final List<String> newPrioritizersClasses = new ArrayList<>(prioritizers);
-                    final List<FlowFilePrioritizer> newPrioritizers = new ArrayList<>();
-                    for (final String className : newPrioritizersClasses) {
-                        try {
-                            newPrioritizers.add(createPrioritizer(className));
-                        } catch (final ClassNotFoundException | InstantiationException | IllegalAccessException e) {
-                            throw new IllegalArgumentException("Unable to set prioritizer " + className + ": " + e);
-                        }
-                    }
-                    queue.setPriorities(newPrioritizers);
-                }
-
-                connection.setProcessGroup(group);
-                group.addConnection(connection);
-            }
-        } finally {
-            writeLock.unlock();
-        }
-    }
-
-    /**
-     * Converts a set of remote process group port DTOs into the equivalent
-     * set of remote process group port descriptors. The returned set iterates
-     * in the same order as the given set.
-     *
-     * @param ports the port DTOs to convert; may be null
-     * @return one descriptor per given port, or null if <code>ports</code> is
-     * null
-     */
-    private Set<RemoteProcessGroupPortDescriptor> convertRemotePort(final Set<RemoteProcessGroupPortDTO> ports) {
-        if (ports == null) {
-            return null;
-        }
-
-        final Set<RemoteProcessGroupPortDescriptor> descriptors = new LinkedHashSet<>(ports.size());
-        for (final RemoteProcessGroupPortDTO portDTO : ports) {
-            // copy every field of the DTO onto a fresh descriptor
-            final StandardRemoteProcessGroupPortDescriptor converted = new StandardRemoteProcessGroupPortDescriptor();
-            converted.setId(portDTO.getId());
-            converted.setName(portDTO.getName());
-            converted.setComments(portDTO.getComments());
-            converted.setTargetRunning(portDTO.isTargetRunning());
-            converted.setConnected(portDTO.isConnected());
-            converted.setConcurrentlySchedulableTaskCount(portDTO.getConcurrentlySchedulableTaskCount());
-            converted.setTransmitting(portDTO.isTransmitting());
-            converted.setUseCompression(portDTO.getUseCompression());
-            descriptors.add(converted);
-        }
-        return descriptors;
-    }
-
-    /**
-     * Resolves the ProcessGroup that a Connectable belongs to. Only the given
-     * group itself and the group's own lookup of the given ID are consulted.
-     *
-     * @param group the group in which the lookup is performed
-     * @param parentGroupId the ID of the Connectable's parent group
-     * @return <code>group</code> if the ID refers to it (per
-     * {@link #areGroupsSame(String, String)}); otherwise the result of
-     * <code>group.getProcessGroup(parentGroupId)</code>
-     */
-    private ProcessGroup getConnectableParent(final ProcessGroup group, final String parentGroupId) {
-        final boolean parentIsGivenGroup = areGroupsSame(group.getIdentifier(), parentGroupId);
-        return parentIsGivenGroup ? group : group.getProcessGroup(parentGroupId);
-    }
-
-    /**
-     * Verifies that the given snippet can be added to the given group. The
-     * following checks are performed:
-     *
-     * <ul>
-     * <li>No Input Port at the root level of the snippet has the same name as
-     * an Input Port that already exists in the group.</li>
-     * <li>No Output Port at the root level of the snippet has the same name as
-     * an Output Port that already exists in the group.</li>
-     * <li>The type of every Processor in the snippet, including those in
-     * nested Process Groups, is one of the Processor extensions known to the
-     * ExtensionManager.</li>
-     * <li>Every FlowFile Prioritizer named by any Connection in the snippet,
-     * including those in nested Process Groups, is one of the
-     * FlowFilePrioritizer extensions known to the ExtensionManager.</li>
-     * </ul>
-     *
-     * @param group the group to which the snippet's contents will be added
-     * @param templateContents the snippet to validate
-     * @throws IllegalStateException if any of the checks above fails
-     */
-    private void validateSnippetContents(final ProcessGroup group, final FlowSnippetDTO templateContents) {
-        // validate the names of Input Ports
-        for (final PortDTO port : templateContents.getInputPorts()) {
-            if (group.getInputPortByName(port.getName()) != null) {
-                throw new IllegalStateException("ProcessGroup already has an Input Port with name " + port.getName());
-            }
-        }
-
-        // validate the names of Output Ports
-        for (final PortDTO port : templateContents.getOutputPorts()) {
-            if (group.getOutputPortByName(port.getName()) != null) {
-                throw new IllegalStateException("ProcessGroup already has an Output Port with name " + port.getName());
-            }
-        }
-
-        // collect the known extension class names into hash sets so that each
-        // membership test below is a constant-time lookup rather than a list scan
-        final Set<String> processorClasses = new HashSet<>();
-        for (final Class<?> c : ExtensionManager.getExtensions(Processor.class)) {
-            processorClasses.add(c.getName());
-        }
-        final Set<String> prioritizerClasses = new HashSet<>();
-        for (final Class<?> c : ExtensionManager.getExtensions(FlowFilePrioritizer.class)) {
-            prioritizerClasses.add(c.getName());
-        }
-
-        // gather the processors and connections of the snippet and of all nested groups
-        final Set<ProcessorDTO> allProcs = new HashSet<>();
-        final Set<ConnectionDTO> allConns = new HashSet<>();
-        allProcs.addAll(templateContents.getProcessors());
-        allConns.addAll(templateContents.getConnections());
-        for (final ProcessGroupDTO childGroup : templateContents.getProcessGroups()) {
-            allProcs.addAll(findAllProcessors(childGroup));
-            allConns.addAll(findAllConnections(childGroup));
-        }
-
-        // validate that all Processor Types are valid
-        for (final ProcessorDTO proc : allProcs) {
-            if (!processorClasses.contains(proc.getType())) {
-                throw new IllegalStateException("Invalid Processor Type: " + proc.getType());
-            }
-        }
-
-        // validate that all Prioritizer Types are valid; a connection may have no prioritizers
-        for (final ConnectionDTO conn : allConns) {
-            final List<String> prioritizers = conn.getPrioritizers();
-            if (prioritizers != null) {
-                for (final String prioritizer : prioritizers) {
-                    if (!prioritizerClasses.contains(prioritizer)) {
-                        throw new IllegalStateException("Invalid FlowFile Prioritizer Type: " + prioritizer);
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * Collects the ProcessorDTOs contained in the given group and, recursively,
-     * in all of its descendant groups.
-     *
-     * @param group the group whose contents are searched
-     * @return the processors found in the group and in every nested group
-     */
-    private Set<ProcessorDTO> findAllProcessors(final ProcessGroupDTO group) {
-        final Set<ProcessorDTO> found = new HashSet<>();
-
-        // processors that live directly in this group
-        found.addAll(group.getContents().getProcessors());
-
-        // processors that live in any nested group
-        for (final ProcessGroupDTO nestedGroup : group.getContents().getProcessGroups()) {
-            found.addAll(findAllProcessors(nestedGroup));
-        }
-        return found;
-    }
-
-    /**
-     * Collects the ConnectionDTOs contained in the given group and,
-     * recursively, in all of its descendant groups.
-     *
-     * @param group the group whose contents are searched
-     * @return the connections found in the group and in every nested group
-     */
-    private Set<ConnectionDTO> findAllConnections(final ProcessGroupDTO group) {
-        final Set<ConnectionDTO> found = new HashSet<>();
-
-        // connections that live directly in this group
-        found.addAll(group.getContents().getConnections());
-
-        // connections that live in any nested group
-        for (final ProcessGroupDTO nestedGroup : group.getContents().getProcessGroups()) {
-            found.addAll(findAllConnections(nestedGroup));
-        }
-        return found;
-    }
-
-    //
-    // Processor access
-    //
-    /**
-     * Indicates whether or not the two IDs refer to the same ProcessGroup. An
-     * ID equal to the root group alias is treated as the ID of the root group.
-     * If either ID is null, <code>false</code> is returned.
-     *
-     * @param id1 the first group ID; may be null
-     * @param id2 the second group ID; may be null
-     * @return <code>true</code> if both IDs are non-null and identify the same
-     * group, <code>false</code> otherwise
-     */
-    public boolean areGroupsSame(final String id1, final String id2) {
-        if (id1 == null || id2 == null) {
-            return false;
-        }
-        if (id1.equals(id2)) {
-            return true;
-        }
-
-        // the IDs differ textually; translate the root alias before the final comparison
-        final String resolved1 = id1.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id1;
-        final String resolved2 = id2.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id2;
-        return resolved1.equals(resolved2);
-    }
-
-    /**
-     * Creates a new instance of the FlowFilePrioritizer with the given class
-     * name. The class is loaded with the class loader that the
-     * ExtensionManager has registered for the type or, if none is registered,
-     * via <code>Class.forName(type)</code>. While the prioritizer is being
-     * instantiated, the thread's context class loader is set to the loader
-     * returned by the ExtensionManager; the previous context class loader is
-     * always restored before this method returns, even if it was null.
-     *
-     * @param type the fully qualified class name of the prioritizer
-     * @return a new instance of the prioritizer
-     * @throws ClassNotFoundException if the class cannot be found
-     * @throws InstantiationException if the class cannot be instantiated
-     * @throws IllegalAccessException if the class or its no-arg constructor is
-     * not accessible
-     */
-    public FlowFilePrioritizer createPrioritizer(final String type) throws InstantiationException, IllegalAccessException, ClassNotFoundException {
-        final Thread currentThread = Thread.currentThread();
-        final ClassLoader originalContextClassLoader = currentThread.getContextClassLoader();
-        try {
-            final ClassLoader detectedClassLoaderForType = ExtensionManager.getClassLoader(type);
-            final Class<?> rawClass;
-            if (detectedClassLoaderForType == null) {
-                // try to find from the current class loader
-                rawClass = Class.forName(type);
-            } else {
-                // try to find from the registered classloader for that type
-                rawClass = Class.forName(type, true, detectedClassLoaderForType);
-            }
-
-            // NOTE(review): as before, this sets a null context class loader when no
-            // loader is registered for the type -- confirm that this is intended
-            currentThread.setContextClassLoader(detectedClassLoaderForType);
-            final Class<? extends FlowFilePrioritizer> prioritizerClass = rawClass.asSubclass(FlowFilePrioritizer.class);
-            return prioritizerClass.newInstance();
-        } finally {
-            // restore unconditionally so that a thread whose original context class
-            // loader was null is not left pointing at the extension's class loader
-            currentThread.setContextClassLoader(originalContextClassLoader);
-        }
-    }
-
-    //
-    // InputPort access
-    //
-    /**
-     * Updates the position and/or name of an existing Input Port. Only the
-     * values that are non-null in the given DTO are applied.
-     *
-     * @param parentGroupId the ID of the ProcessGroup that contains the port
-     * @param dto identifies the port and carries the new values
-     * @return a DTO describing the port after the update
-     * @throws IllegalStateException if the group does not exist or has no
-     * Input Port with the DTO's ID
-     */
-    public PortDTO updateInputPort(final String parentGroupId, final PortDTO dto) {
-        final Port inputPort = lookupGroup(parentGroupId).getInputPort(dto.getId());
-        if (inputPort == null) {
-            throw new IllegalStateException("No Input Port with ID " + dto.getId() + " is known as a child of ProcessGroup with ID " + parentGroupId);
-        }
-
-        final PositionDTO requestedPosition = dto.getPosition();
-        if (requestedPosition != null) {
-            inputPort.setPosition(toPosition(requestedPosition));
-        }
-
-        final String requestedName = dto.getName();
-        if (requestedName != null) {
-            inputPort.setName(requestedName);
-        }
-
-        return createDTO(inputPort);
-    }
-
-    /**
-     * Builds a DTO carrying the identifier, position, name and parent group ID
-     * of the given port.
-     *
-     * @param port the port to describe; may be null
-     * @return the populated DTO, or null if <code>port</code> is null
-     */
-    private PortDTO createDTO(final Port port) {
-        if (port != null) {
-            final PortDTO portDTO = new PortDTO();
-            portDTO.setId(port.getIdentifier());
-
-            final Position location = port.getPosition();
-            portDTO.setPosition(new PositionDTO(location.getX(), location.getY()));
-
-            portDTO.setName(port.getName());
-            portDTO.setParentGroupId(port.getProcessGroup().getIdentifier());
-            return portDTO;
-        }
-
-        return null;
-    }
-
-    //
-    // OutputPort access
-    //
-    /**
-     * Updates the name and/or position of an existing Output Port. Only the
-     * values that are non-null in the given DTO are applied.
-     *
-     * @param parentGroupId the ID of the ProcessGroup that contains the port
-     * @param dto identifies the port and carries the new values
-     * @return a DTO describing the port after the update
-     * @throws IllegalStateException if the group does not exist or has no
-     * Output Port with the DTO's ID
-     */
-    public PortDTO updateOutputPort(final String parentGroupId, final PortDTO dto) {
-        final Port outputPort = lookupGroup(parentGroupId).getOutputPort(dto.getId());
-        if (outputPort == null) {
-            throw new IllegalStateException("No Output Port with ID " + dto.getId() + " is known as a child of ProcessGroup with ID " + parentGroupId);
-        }
-
-        final String requestedName = dto.getName();
-        if (requestedName != null) {
-            outputPort.setName(requestedName);
-        }
-
-        final PositionDTO requestedPosition = dto.getPosition();
-        if (requestedPosition != null) {
-            outputPort.setPosition(toPosition(requestedPosition));
-        }
-
-        return createDTO(outputPort);
-    }
-
-    //
-    // Processor/Prioritizer/Filter Class Access
-    //
-    /**
-     * @return the classes that the ExtensionManager reports as extensions of
-     * type Processor
-     */
-    @SuppressWarnings("rawtypes")
-    public Set<Class> getFlowFileProcessorClasses() {
-        final Set<Class> processorTypes = ExtensionManager.getExtensions(Processor.class);
-        return processorTypes;
-    }
-
-    /**
-     * @return the classes that the ExtensionManager reports as extensions of
-     * type FlowFilePrioritizer
-     */
-    @SuppressWarnings("rawtypes")
-    public Set<Class> getFlowFileComparatorClasses() {
-        final Set<Class> prioritizerTypes = ExtensionManager.getExtensions(FlowFilePrioritizer.class);
-        return prioritizerTypes;
-    }
-
-    /**
-     * Returns the ProcessGroup with the given ID, failing if
-     * {@link #getGroup(String)} does not yield one.
-     *
-     * @param id the ID of the group to look up
-     * @return the process group; never null
-     * @throws IllegalStateException if no group with the given ID exists
-     */
-    private ProcessGroup lookupGroup(final String id) {
-        final ProcessGroup found = getGroup(id);
-        if (found != null) {
-            return found;
-        }
-
-        throw new IllegalStateException("No Group with ID " + id + " exists");
-    }
-
-    /**
-     * Returns the ProcessGroup with the given ID by searching from the root
-     * group. The root group alias is translated to the root group's ID before
-     * searching.
-     *
-     * @param id the ID of the group to find; must not be null
-     * @return null if there is no root group; otherwise the result of the
-     * root group's <code>findProcessGroup</code> lookup (callers in this class
-     * treat a null result as "no such group")
-     */
-    public ProcessGroup getGroup(final String id) {
-        requireNonNull(id);
-
-        // take a snapshot of the root group reference while holding the read lock
-        final ProcessGroup currentRoot;
-        readLock.lock();
-        try {
-            currentRoot = rootGroup;
-        } finally {
-            readLock.unlock();
-        }
-
-        final String resolvedId;
-        if (id.equals(ROOT_GROUP_ID_ALIAS)) {
-            resolvedId = getRootGroupId();
-        } else {
-            resolvedId = id;
-        }
-
-        if (currentRoot == null) {
-            return null;
-        }
-        return currentRoot.findProcessGroup(resolvedId);
-    }
-
-    /**
-     * @return the status of the root ProcessGroup
-     */
-    @Override
-    public ProcessGroupStatus getControllerStatus() {
-        final String rootId = getRootGroupId();
-        return getGroupStatus(rootId);
-    }
-
-    /**
-     * Returns the status of the group with the given ID, computed from the
-     * report returned by <code>getProcessorStats()</code>.
-     *
-     * @param groupId the ID of the group
-     * @return the group's status, or null if the group cannot be found
-     */
-    public ProcessGroupStatus getGroupStatus(final String groupId) {
-        final RepositoryStatusReport currentStats = getProcessorStats();
-        return getGroupStatus(groupId, currentStats);
-    }
-
-    /**
-     * Returns the status of the group with the given ID, computed from the
-     * given status report.
-     *
-     * @param groupId the ID of the group
-     * @param statusReport the report from which the status is derived
-     * @return the group's status, or null if the group cannot be found
-     */
-    public ProcessGroupStatus getGroupStatus(final String groupId, final RepositoryStatusReport statusReport) {
-        return getGroupStatus(getGroup(groupId), statusReport);
-    }
-
-    public ProcessGroupStatus getGroupStatus(final ProcessGroup group, final RepositoryStatusReport statusReport) {
-        if (group == null) {
-            return null;
-        }
-
-        final ProcessGroupStatus status = new ProcessGroupStatus();
-        status.setId(group.getIdentifier());
-        status.setName(group.getName());
-        status.setCreationTimestamp(new Date().getTime());
-        int activeGroupThreads = 0;
-        long bytesRead = 0L;
-        long bytesWritten = 0L;
-        int queuedCount = 0;
-        long queuedContentSize = 0L;
-        int flowFilesIn = 0;
-        long bytesIn = 0L;
-        int flowFilesOut = 0;
-        long bytesOut = 0L;
-        int flowFilesReceived = 0;
-        long bytesReceived = 0L;
-        int flowFilesSent = 0;
-        long bytesSent = 0L;
-
-        // set status for processors
-        final Collection<ProcessorStatus> processorStatusCollection = new ArrayList<>();
-        status.setProcessorStatus(processorStatusCollection);
-        for (final ProcessorNode procNode : group.getProcessors()) {
-            final ProcessorStatus procStat = getProcessorStatus(statusReport, procNode);
-            processorStatusCollection.add(procStat);
-            activeGroupThreads += procStat.getActiveThreadCount();
-            bytesRead += procStat.getBytesRead();
-            bytesWritten += procStat.getBytesWritten();
-
-            flowFilesReceived += procStat.getFlowFilesReceived();
-            bytesReceived += procStat.getBytesReceived();
-            flowFilesSent += procStat.getFlowFilesSent();
-            bytesSent += procStat.getBytesSent();
-        }
-
-        // set status for local child groups     
-        final Collection<ProcessGroupStatus> localChildGroupStatusCollection = new ArrayList<>();
-        status.setProcessGroupStatus(localChildGroupStatusCollection);
-        for (final ProcessGroup childGroup : group.getProcessGroups()) {
-            final ProcessGroupStatus childGroupStatus = getGroupStatus(childGroup, statusReport);
-            localChildGroupStatusCollection.add(childGroupStatus);
-            activeGroupThreads += childGroupStatus.getActiveThreadCount();
-            bytesRead += childGroupStatus.getBytesRead();
-            bytesWritten += childGroupStatus.getBytesWritten();
-            queuedCount += childGroupStatus.getQueuedCount();
-            queuedContentSize += childGroupStatus.getQueuedContentSize();
-
-            flowFilesReceived += childGroupStatus.getFlowFilesReceived();
-            bytesReceived += childGroupStatus.getBytesReceived();
-            flowFilesSent += childGroupStatus.getFlowFilesSent();
-            bytesSent += childGroupStatus.getBytesSent();
-        }
-
-        // set status for remote child groups
-        final Collection<RemoteProcessGroupStatus> remoteProcessGroupStatusCollection = new ArrayList<>();
-        status.setRemoteProcessGroupStatus(remoteProcessGroupStatusCollection);
-        for (final RemoteProcessGroup remoteGroup : group.getRemoteProcessGroups()) {
-            final RemoteProcessGroupStatus remoteStatus = createRemoteGroupStatus(remoteGroup, statusReport);
-            if (remoteStatus != null) {
-                remoteProcessGroupStatusCollection.add(remoteStatus);
-
-                flowFilesReceived += remoteStatus.getReceivedCount();
-                bytesReceived += remoteStatus.getReceivedContentSize();
-                flowFilesSent += remoteStatus.getSentCo

<TRUNCATED>