You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/22 18:05:03 UTC

[74/79] [abbrv] incubator-nifi git commit: NIFI-6: Rebase from develop to include renaming of directory structure

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 0000000,346e801..1b7a3c0
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@@ -1,0 -1,3579 +1,3643 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.controller;
+ 
+ import static java.util.Objects.requireNonNull;
+ 
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.text.DateFormat;
+ import java.text.SimpleDateFormat;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.Date;
+ import java.util.HashSet;
+ import java.util.LinkedHashSet;
+ import java.util.List;
+ import java.util.Locale;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.UUID;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.ConcurrentMap;
+ import java.util.concurrent.ScheduledExecutorService;
+ import java.util.concurrent.ScheduledFuture;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicInteger;
+ import java.util.concurrent.atomic.AtomicReference;
+ import java.util.concurrent.locks.Lock;
+ import java.util.concurrent.locks.ReentrantReadWriteLock;
+ 
+ import javax.net.ssl.SSLContext;
+ 
+ import org.apache.nifi.admin.service.UserService;
++import org.apache.nifi.annotation.lifecycle.OnAdded;
++import org.apache.nifi.annotation.lifecycle.OnRemoved;
+ import org.apache.nifi.cluster.BulletinsPayload;
+ import org.apache.nifi.cluster.HeartbeatPayload;
+ import org.apache.nifi.cluster.protocol.DataFlow;
+ import org.apache.nifi.cluster.protocol.Heartbeat;
+ import org.apache.nifi.cluster.protocol.NodeBulletins;
+ import org.apache.nifi.cluster.protocol.NodeIdentifier;
+ import org.apache.nifi.cluster.protocol.NodeProtocolSender;
+ import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
+ import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+ import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
+ import org.apache.nifi.connectable.Connectable;
+ import org.apache.nifi.connectable.ConnectableType;
+ import org.apache.nifi.connectable.Connection;
+ import org.apache.nifi.connectable.Funnel;
+ import org.apache.nifi.connectable.LocalPort;
+ import org.apache.nifi.connectable.Port;
+ import org.apache.nifi.connectable.Position;
+ import org.apache.nifi.connectable.Size;
+ import org.apache.nifi.connectable.StandardConnection;
+ import org.apache.nifi.controller.exception.CommunicationsException;
+ import org.apache.nifi.controller.exception.ProcessorInstantiationException;
+ import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
+ import org.apache.nifi.controller.label.Label;
+ import org.apache.nifi.controller.label.StandardLabel;
+ import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
+ import org.apache.nifi.controller.reporting.StandardReportingTaskNode;
+ import org.apache.nifi.controller.repository.ContentRepository;
+ import org.apache.nifi.controller.repository.CounterRepository;
+ import org.apache.nifi.controller.repository.FlowFileEvent;
+ import org.apache.nifi.controller.repository.FlowFileEventRepository;
+ import org.apache.nifi.controller.repository.FlowFileRecord;
+ import org.apache.nifi.controller.repository.FlowFileRepository;
+ import org.apache.nifi.controller.repository.FlowFileSwapManager;
+ import org.apache.nifi.controller.repository.QueueProvider;
+ import org.apache.nifi.controller.repository.RepositoryRecord;
+ import org.apache.nifi.controller.repository.RepositoryStatusReport;
+ import org.apache.nifi.controller.repository.StandardCounterRepository;
+ import org.apache.nifi.controller.repository.StandardFlowFileRecord;
+ import org.apache.nifi.controller.repository.StandardRepositoryRecord;
+ import org.apache.nifi.controller.repository.claim.ContentClaim;
+ import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+ import org.apache.nifi.controller.repository.claim.ContentDirection;
+ import org.apache.nifi.controller.repository.claim.StandardContentClaimManager;
+ import org.apache.nifi.controller.repository.io.LimitedInputStream;
+ import org.apache.nifi.controller.scheduling.EventDrivenSchedulingAgent;
+ import org.apache.nifi.controller.scheduling.ProcessContextFactory;
+ import org.apache.nifi.controller.scheduling.QuartzSchedulingAgent;
+ import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
+ import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent;
+ import org.apache.nifi.controller.service.ControllerServiceNode;
+ import org.apache.nifi.controller.service.ControllerServiceProvider;
+ import org.apache.nifi.controller.service.StandardControllerServiceProvider;
+ import org.apache.nifi.controller.status.ConnectionStatus;
+ import org.apache.nifi.controller.status.PortStatus;
+ import org.apache.nifi.controller.status.ProcessGroupStatus;
+ import org.apache.nifi.controller.status.ProcessorStatus;
+ import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+ import org.apache.nifi.controller.status.RunStatus;
+ import org.apache.nifi.controller.status.TransmissionStatus;
+ import org.apache.nifi.controller.status.history.ComponentStatusRepository;
+ import org.apache.nifi.controller.status.history.StatusHistoryUtil;
+ import org.apache.nifi.controller.tasks.ExpireFlowFiles;
+ import org.apache.nifi.diagnostics.SystemDiagnostics;
+ import org.apache.nifi.diagnostics.SystemDiagnosticsFactory;
+ import org.apache.nifi.encrypt.StringEncryptor;
+ import org.apache.nifi.engine.FlowEngine;
+ import org.apache.nifi.events.BulletinFactory;
+ import org.apache.nifi.events.EventReporter;
+ import org.apache.nifi.events.NodeBulletinProcessingStrategy;
+ import org.apache.nifi.events.VolatileBulletinRepository;
+ import org.apache.nifi.flowfile.FlowFilePrioritizer;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.framework.security.util.SslContextFactory;
+ import org.apache.nifi.groups.ProcessGroup;
+ import org.apache.nifi.groups.RemoteProcessGroup;
+ import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
+ import org.apache.nifi.groups.StandardProcessGroup;
+ import org.apache.nifi.stream.io.StreamUtils;
+ import org.apache.nifi.logging.LogLevel;
+ import org.apache.nifi.logging.LogRepository;
+ import org.apache.nifi.logging.LogRepositoryFactory;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.logging.ProcessorLogObserver;
+ import org.apache.nifi.nar.ExtensionManager;
++import org.apache.nifi.nar.NarClassLoader;
+ import org.apache.nifi.nar.NarCloseable;
+ import org.apache.nifi.nar.NarThreadContextClassLoader;
+ import org.apache.nifi.processor.Processor;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.QueueSize;
+ import org.apache.nifi.processor.Relationship;
+ import org.apache.nifi.processor.SimpleProcessLogger;
+ import org.apache.nifi.processor.StandardProcessorInitializationContext;
+ import org.apache.nifi.processor.StandardValidationContextFactory;
 -import org.apache.nifi.processor.annotation.OnAdded;
+ import org.apache.nifi.provenance.ProvenanceEventRecord;
+ import org.apache.nifi.provenance.ProvenanceEventRepository;
+ import org.apache.nifi.provenance.ProvenanceEventType;
+ import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+ import org.apache.nifi.remote.RemoteGroupPort;
+ import org.apache.nifi.remote.RemoteResourceManager;
+ import org.apache.nifi.remote.RemoteSiteListener;
+ import org.apache.nifi.remote.RootGroupPort;
+ import org.apache.nifi.remote.SocketRemoteSiteListener;
+ import org.apache.nifi.remote.StandardRemoteProcessGroup;
+ import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor;
+ import org.apache.nifi.remote.StandardRootGroupPort;
+ import org.apache.nifi.remote.TransferDirection;
+ import org.apache.nifi.remote.protocol.socket.SocketFlowFileServerProtocol;
+ import org.apache.nifi.reporting.Bulletin;
+ import org.apache.nifi.reporting.BulletinRepository;
+ import org.apache.nifi.reporting.EventAccess;
+ import org.apache.nifi.reporting.ReportingTask;
+ import org.apache.nifi.reporting.Severity;
+ import org.apache.nifi.scheduling.SchedulingStrategy;
+ import org.apache.nifi.util.FormatUtils;
+ import org.apache.nifi.util.NiFiProperties;
+ import org.apache.nifi.util.ReflectionUtils;
+ import org.apache.nifi.web.api.dto.ConnectableDTO;
+ import org.apache.nifi.web.api.dto.ConnectionDTO;
+ import org.apache.nifi.web.api.dto.FlowSnippetDTO;
+ import org.apache.nifi.web.api.dto.FunnelDTO;
+ import org.apache.nifi.web.api.dto.LabelDTO;
+ import org.apache.nifi.web.api.dto.PortDTO;
+ import org.apache.nifi.web.api.dto.PositionDTO;
+ import org.apache.nifi.web.api.dto.ProcessGroupDTO;
+ import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
+ import org.apache.nifi.web.api.dto.ProcessorDTO;
+ import org.apache.nifi.web.api.dto.RelationshipDTO;
+ import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
+ import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
+ import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
+ import org.apache.nifi.web.api.dto.TemplateDTO;
+ import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
+ import org.apache.commons.lang3.StringUtils;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import com.sun.jersey.api.client.ClientHandlerException;
+ 
+ public class FlowController implements EventAccess, ControllerServiceProvider, Heartbeater, QueueProvider {
+ 
+     // default repository implementations
+     public static final String DEFAULT_FLOWFILE_REPO_IMPLEMENTATION = "org.apache.nifi.controller.repository.WriteAheadFlowFileRepository";
+     public static final String DEFAULT_CONTENT_REPO_IMPLEMENTATION = "org.apache.nifi.controller.repository.FileSystemRepository";
+     public static final String DEFAULT_PROVENANCE_REPO_IMPLEMENTATION = "org.apache.nifi.provenance.VolatileProvenanceRepository";
+     public static final String DEFAULT_SWAP_MANAGER_IMPLEMENTATION = "org.apache.nifi.controller.FileSystemSwapManager";
+     public static final String DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION = "org.apache.nifi.controller.status.history.VolatileComponentStatusRepository";
+ 
+     public static final String SCHEDULE_MINIMUM_NANOSECONDS = "flowcontroller.minimum.nanoseconds";
+     public static final String GRACEFUL_SHUTDOWN_PERIOD = "nifi.flowcontroller.graceful.shutdown.seconds";
+     public static final long DEFAULT_GRACEFUL_SHUTDOWN_SECONDS = 10;
+     public static final int METRICS_RESERVOIR_SIZE = 288;   // 1 day worth of 5-minute captures
+ 
+     public static final String ROOT_GROUP_ID_ALIAS = "root";
+     public static final String DEFAULT_ROOT_GROUP_NAME = "NiFi Flow";
+ 
+     private final AtomicInteger maxTimerDrivenThreads;
+     private final AtomicInteger maxEventDrivenThreads;
+     private final AtomicReference<FlowEngine> timerDrivenEngineRef;
+     private final AtomicReference<FlowEngine> eventDrivenEngineRef;
+ 
+     private final ContentRepository contentRepository;
+     private final FlowFileRepository flowFileRepository;
+     private final FlowFileEventRepository flowFileEventRepository;
+     private final ProvenanceEventRepository provenanceEventRepository;
+     private final VolatileBulletinRepository bulletinRepository;
+     private final StandardProcessScheduler processScheduler;
+     private final TemplateManager templateManager;
+     private final SnippetManager snippetManager;
+     private final long gracefulShutdownSeconds;
+     private final ExtensionManager extensionManager;
+     private final NiFiProperties properties;
+     private final SSLContext sslContext;
+     private final RemoteSiteListener externalSiteListener;
+     private final AtomicReference<CounterRepository> counterRepositoryRef;
+     private final AtomicBoolean initialized = new AtomicBoolean(false);
+     private final ControllerServiceProvider controllerServiceProvider;
+     private final UserService userService;
+     private final EventDrivenWorkerQueue eventDrivenWorkerQueue;
+     private final ComponentStatusRepository componentStatusRepository;
+     private final long systemStartTime = System.currentTimeMillis();    // time at which the node was started
+     private final ConcurrentMap<String, ReportingTaskNode> reportingTasks = new ConcurrentHashMap<>();
+ 
+     // The Heartbeat Bean is used to provide an Atomic Reference to data that is used in heartbeats that may
+     // change while the instance is running. We do this because we want to generate heartbeats even if we
+     // are unable to obtain a read lock on the entire FlowController.
+     private final AtomicReference<HeartbeatBean> heartbeatBeanRef = new AtomicReference<>();
+     private final AtomicBoolean heartbeatsSuspended = new AtomicBoolean(false);
+ 
+     private final Integer remoteInputSocketPort;
+     private final Boolean isSiteToSiteSecure;
+     private Integer clusterManagerRemoteSitePort = null;
+     private Boolean clusterManagerRemoteSiteCommsSecure = null;
+ 
+     private ProcessGroup rootGroup;
+     private final List<Connectable> startConnectablesAfterInitialization;
+     private final List<RemoteGroupPort> startRemoteGroupPortsAfterInitialization;
+ 
+     /**
+      * true if controller is configured to operate in a clustered environment
+      */
+     private final boolean configuredForClustering;
+ 
+     /**
+      * the time to wait between heartbeats
+      */
+     private final int heartbeatDelaySeconds;
+ 
+     /**
+      * The sensitive property string encryptor *
+      */
+     private final StringEncryptor encryptor;
+ 
+     /**
+      * cluster protocol sender
+      */
+     private final NodeProtocolSender protocolSender;
+ 
+     private final ScheduledExecutorService clusterTaskExecutor = new FlowEngine(3, "Clustering Tasks");
+     private final ContentClaimManager contentClaimManager = new StandardContentClaimManager();
+ 
+     // guarded by rwLock
+     /**
+      * timer to periodically send heartbeats to the cluster
+      */
+     private ScheduledFuture<?> bulletinFuture;
+     private ScheduledFuture<?> heartbeatGeneratorFuture;
+     private ScheduledFuture<?> heartbeatSenderFuture;
+ 
+     // guarded by FlowController lock
+     /**
+      * timer task to generate heartbeats
+      */
+     private final AtomicReference<HeartbeatMessageGeneratorTask> heartbeatMessageGeneratorTaskRef = new AtomicReference<>(null);
+ 
+     private AtomicReference<NodeBulletinProcessingStrategy> nodeBulletinSubscriber;
+ 
+     // guarded by rwLock
+     /**
+      * the node identifier;
+      */
+     private NodeIdentifier nodeId;
+ 
+     // guarded by rwLock
+     /**
+      * true if controller is connected or trying to connect to the cluster
+      */
+     private boolean clustered;
+     private String clusterManagerDN;
+ 
+     // guarded by rwLock
+     /**
+      * true if controller is the primary of the cluster
+      */
+     private boolean primary;
+ 
+     // guarded by rwLock
+     /**
+      * true if connected to a cluster
+      */
+     private boolean connected;
+ 
+     // guarded by rwLock
+     private String instanceId;
+ 
+     private volatile boolean shutdown = false;
+ 
+     private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
+     private final Lock readLock = rwLock.readLock();
+     private final Lock writeLock = rwLock.writeLock();
+ 
+     private FlowFileSwapManager flowFileSwapManager;    // guarded by read/write lock
+ 
+     private static final Logger LOG = LoggerFactory.getLogger(FlowController.class);
+     private static final Logger heartbeatLogger = LoggerFactory.getLogger("org.apache.nifi.cluster.heartbeat");
+ 
+     public static FlowController createStandaloneInstance(
+             final FlowFileEventRepository flowFileEventRepo,
+             final NiFiProperties properties,
+             final UserService userService,
+             final StringEncryptor encryptor) {
+         return new FlowController(
+                 flowFileEventRepo,
+                 properties,
+                 userService,
+                 encryptor,
+                 /* configuredForClustering */ false,
+                 /* NodeProtocolSender */ null);
+     }
+ 
+     public static FlowController createClusteredInstance(
+             final FlowFileEventRepository flowFileEventRepo,
+             final NiFiProperties properties,
+             final UserService userService,
+             final StringEncryptor encryptor,
+             final NodeProtocolSender protocolSender) {
+         final FlowController flowController = new FlowController(
+                 flowFileEventRepo,
+                 properties,
+                 userService,
+                 encryptor,
+                 /* configuredForClustering */ true,
+                 /* NodeProtocolSender */ protocolSender);
+ 
+         flowController.setClusterManagerRemoteSiteInfo(properties.getRemoteInputPort(), properties.isSiteToSiteSecure());
+ 
+         return flowController;
+     }
+ 
+     private FlowController(
+             final FlowFileEventRepository flowFileEventRepo,
+             final NiFiProperties properties,
+             final UserService userService,
+             final StringEncryptor encryptor,
+             final boolean configuredForClustering,
+             final NodeProtocolSender protocolSender) {
+ 
+         maxTimerDrivenThreads = new AtomicInteger(10);
+         maxEventDrivenThreads = new AtomicInteger(5);
+ 
+         this.encryptor = encryptor;
+         this.properties = properties;
+         sslContext = SslContextFactory.createSslContext(properties, false);
+         extensionManager = new ExtensionManager();
+         controllerServiceProvider = new StandardControllerServiceProvider();
+ 
+         timerDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxTimerDrivenThreads.get(), "Timer-Driven Process"));
+         eventDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxEventDrivenThreads.get(), "Event-Driven Process"));
+ 
+         final FlowFileRepository flowFileRepo = createFlowFileRepository(properties, contentClaimManager);
+         flowFileRepository = flowFileRepo;
+         flowFileEventRepository = flowFileEventRepo;
+         counterRepositoryRef = new AtomicReference<CounterRepository>(new StandardCounterRepository());
+ 
+         bulletinRepository = new VolatileBulletinRepository();
+         nodeBulletinSubscriber = new AtomicReference<>();
+ 
+         try {
+             this.provenanceEventRepository = createProvenanceRepository(properties);
+             this.provenanceEventRepository.initialize(createEventReporter(bulletinRepository));
+ 
+             this.contentRepository = createContentRepository(properties);
+         } catch (final Exception e) {
+             throw new RuntimeException("Unable to create Provenance Repository", e);
+         }
+ 
+         processScheduler = new StandardProcessScheduler(this, this, encryptor);
+         eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler);
+ 
+         final ProcessContextFactory contextFactory = new ProcessContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceEventRepository);
+         processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, new EventDrivenSchedulingAgent(
+                 eventDrivenEngineRef.get(), this, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor));
+ 
+         final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor);
+         final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor);
+         processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, timerDrivenAgent);
+         processScheduler.setSchedulingAgent(SchedulingStrategy.PRIMARY_NODE_ONLY, timerDrivenAgent);
+         processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, quartzSchedulingAgent);
+         processScheduler.scheduleFrameworkTask(new ExpireFlowFiles(this, contextFactory), "Expire FlowFiles", 30L, 30L, TimeUnit.SECONDS);
+ 
+         startConnectablesAfterInitialization = new ArrayList<>();
+         startRemoteGroupPortsAfterInitialization = new ArrayList<>();
+         this.userService = userService;
+ 
+         final String gracefulShutdownSecondsVal = properties.getProperty(GRACEFUL_SHUTDOWN_PERIOD);
+         long shutdownSecs;
+         try {
+             shutdownSecs = Long.parseLong(gracefulShutdownSecondsVal);
+             if (shutdownSecs < 1) {
+                 shutdownSecs = DEFAULT_GRACEFUL_SHUTDOWN_SECONDS;
+             }
+         } catch (final NumberFormatException nfe) {
+             shutdownSecs = DEFAULT_GRACEFUL_SHUTDOWN_SECONDS;
+         }
+         gracefulShutdownSeconds = shutdownSecs;
+ 
+         remoteInputSocketPort = properties.getRemoteInputPort();
+         isSiteToSiteSecure = properties.isSiteToSiteSecure();
+ 
+         if (isSiteToSiteSecure && sslContext == null && remoteInputSocketPort != null) {
+             throw new IllegalStateException("NiFi Configured to allow Secure Site-to-Site communications but the Keystore/Truststore properties are not configured");
+         }
+ 
+         this.configuredForClustering = configuredForClustering;
+         this.heartbeatDelaySeconds = (int) FormatUtils.getTimeDuration(properties.getNodeHeartbeatInterval(), TimeUnit.SECONDS);
+         this.protocolSender = protocolSender;
+         try {
+             this.templateManager = new TemplateManager(properties.getTemplateDirectory());
+         } catch (IOException e) {
+             throw new RuntimeException(e);
+         }
+ 
+         this.snippetManager = new SnippetManager();
+ 
+         rootGroup = new StandardProcessGroup(UUID.randomUUID().toString(), this, processScheduler, properties, encryptor);
+         rootGroup.setName(DEFAULT_ROOT_GROUP_NAME);
+         instanceId = UUID.randomUUID().toString();
+ 
+         if (remoteInputSocketPort == null){
+             LOG.info("Not enabling Site-to-Site functionality because nifi.remote.input.socket.port is not set");
+             externalSiteListener = null;
+         } else if (isSiteToSiteSecure && sslContext == null) {
+             LOG.error("Unable to create Secure Site-to-Site Listener because not all required Keystore/Truststore Properties are set. Site-to-Site functionality will be disabled until this problem is has been fixed.");
+             externalSiteListener = null;
+         } else {
+             // Register the SocketFlowFileServerProtocol as the appropriate resource for site-to-site Server Protocol
+             RemoteResourceManager.setServerProtocolImplementation(SocketFlowFileServerProtocol.RESOURCE_NAME, SocketFlowFileServerProtocol.class);
+             externalSiteListener = new SocketRemoteSiteListener(remoteInputSocketPort, isSiteToSiteSecure ? sslContext : null);
+             externalSiteListener.setRootGroup(rootGroup);
+         }
+ 
+         // Determine frequency for obtaining component status snapshots
+         final String snapshotFrequency = properties.getProperty(NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY);
+         long snapshotMillis;
+         try {
+             snapshotMillis = FormatUtils.getTimeDuration(snapshotFrequency, TimeUnit.MILLISECONDS);
+         } catch (final Exception e) {
+             snapshotMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY, TimeUnit.MILLISECONDS);
+         }
+ 
+         componentStatusRepository = createComponentStatusRepository();
+         timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
+             @Override
+             public void run() {
+                 componentStatusRepository.capture(getControllerStatus());
+             }
+         }, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS);
+ 
+         heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false, false));
+     }
+ 
+     private static FlowFileRepository createFlowFileRepository(final NiFiProperties properties, final ContentClaimManager contentClaimManager) {
+         final String implementationClassName = properties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION, DEFAULT_FLOWFILE_REPO_IMPLEMENTATION);
+         if (implementationClassName == null) {
+             throw new RuntimeException("Cannot create FlowFile Repository because the NiFi Properties is missing the following property: "
+                     + NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION);
+         }
+ 
+         try {
+             final FlowFileRepository created = NarThreadContextClassLoader.createInstance(implementationClassName, FlowFileRepository.class);
+             synchronized (created) {
+                 created.initialize(contentClaimManager);
+             }
+             return created;
+         } catch (final Exception e) {
+             throw new RuntimeException(e);
+         }
+     }
+ 
+     private static FlowFileSwapManager createSwapManager(final NiFiProperties properties) {
+         final String implementationClassName = properties.getProperty(NiFiProperties.FLOWFILE_SWAP_MANAGER_IMPLEMENTATION, DEFAULT_SWAP_MANAGER_IMPLEMENTATION);
+         if (implementationClassName == null) {
+             return null;
+         }
+ 
+         try {
+             return NarThreadContextClassLoader.createInstance(implementationClassName, FlowFileSwapManager.class);
+         } catch (final Exception e) {
+             throw new RuntimeException(e);
+         }
+     }
+ 
+     private static EventReporter createEventReporter(final BulletinRepository bulletinRepository) {
+         return new EventReporter() {
+             @Override
+             public void reportEvent(final Severity severity, final String category, final String message) {
+                 final Bulletin bulletin = BulletinFactory.createBulletin(category, severity.name(), message);
+                 bulletinRepository.addBulletin(bulletin);
+             }
+         };
+     }
+     
+     public void initializeFlow() throws IOException {
+         writeLock.lock();
+         try {
+             flowFileSwapManager = createSwapManager(properties);
+ 
+             long maxIdFromSwapFiles = -1L;
+             if (flowFileSwapManager != null) {
+                 if (flowFileRepository.isVolatile()) {
+                     flowFileSwapManager.purge();
+                 } else {
+                     maxIdFromSwapFiles = flowFileSwapManager.recoverSwappedFlowFiles(this, contentClaimManager);
+                 }
+             }
+ 
+             flowFileRepository.loadFlowFiles(this, maxIdFromSwapFiles + 1);
+ 
+             // now that we've loaded the FlowFiles, this has restored our ContentClaims' states, so we can tell the
+             // ContentRepository to purge superfluous files
+             contentRepository.cleanup();
+ 
+             if (flowFileSwapManager != null) {
+                 flowFileSwapManager.start(flowFileRepository, this, contentClaimManager, createEventReporter(bulletinRepository));
+             }
+ 
+             if (externalSiteListener != null) {
+                 externalSiteListener.start();
+             }
+ 
+             timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
+                 @Override
+                 public void run() {
+                     try {
+                         updateRemoteProcessGroups();
+                     } catch (final Throwable t) {
+                         LOG.warn("Unable to update Remote Process Groups due to " + t);
+                         if (LOG.isDebugEnabled()) {
+                             LOG.warn("", t);
+                         }
+                     }
+                 }
+             }, 0L, 30L, TimeUnit.SECONDS);
+ 
+             initialized.set(true);
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     /**
+      * <p>
+      * Causes any processors that were added to the flow with a 'delayStart'
+      * flag of true to now start
+      * </p>
+      */
+     public void startDelayed() {
+         writeLock.lock();
+         try {
+             LOG.info("Starting {} processors/ports/funnels", (startConnectablesAfterInitialization.size() + startRemoteGroupPortsAfterInitialization.size()));
+             for (final Connectable connectable : startConnectablesAfterInitialization) {
+                 if (connectable.getScheduledState() == ScheduledState.DISABLED) {
+                     continue;
+                 }
+ 
+                 try {
+                     if (connectable instanceof ProcessorNode) {
+                         connectable.getProcessGroup().startProcessor((ProcessorNode) connectable);
+                     } else {
+                         startConnectable(connectable);
+                     }
+                 } catch (final Throwable t) {
+                     LOG.error("Unable to start {} due to {}", new Object[]{connectable, t});
+                 }
+             }
+ 
+             startConnectablesAfterInitialization.clear();
+ 
+             int startedTransmitting = 0;
+             for (final RemoteGroupPort remoteGroupPort : startRemoteGroupPortsAfterInitialization) {
+                 try {
+                     remoteGroupPort.getRemoteProcessGroup().startTransmitting(remoteGroupPort);
+                     startedTransmitting++;
+                 } catch (final Throwable t) {
+                     LOG.error("Unable to start transmitting with {} due to {}", new Object[]{remoteGroupPort, t});
+                 }
+             }
+ 
+             LOG.info("Started {} Remote Group Ports transmitting", startedTransmitting);
+             startRemoteGroupPortsAfterInitialization.clear();
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     private ContentRepository createContentRepository(final NiFiProperties properties) throws InstantiationException, IllegalAccessException, ClassNotFoundException {
+         final String implementationClassName = properties.getProperty(NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION, DEFAULT_CONTENT_REPO_IMPLEMENTATION);
+         if (implementationClassName == null) {
+             throw new RuntimeException("Cannot create Provenance Repository because the NiFi Properties is missing the following property: "
+                     + NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION);
+         }
+ 
+         try {
+             final ContentRepository contentRepo = NarThreadContextClassLoader.createInstance(implementationClassName, ContentRepository.class);
+             synchronized (contentRepo) {
+                 contentRepo.initialize(contentClaimManager);
+             }
+             return contentRepo;
+         } catch (final Exception e) {
+             throw new RuntimeException(e);
+         }
+     }
+ 
+     private ProvenanceEventRepository createProvenanceRepository(final NiFiProperties properties) throws InstantiationException, IllegalAccessException, ClassNotFoundException {
+         final String implementationClassName = properties.getProperty(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS, DEFAULT_PROVENANCE_REPO_IMPLEMENTATION);
+         if (implementationClassName == null) {
+             throw new RuntimeException("Cannot create Provenance Repository because the NiFi Properties is missing the following property: "
+                     + NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS);
+         }
+ 
+         try {
+             return NarThreadContextClassLoader.createInstance(implementationClassName, ProvenanceEventRepository.class);
+         } catch (final Exception e) {
+             throw new RuntimeException(e);
+         }
+     }
+ 
+     private ComponentStatusRepository createComponentStatusRepository() {
+         final String implementationClassName = properties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION, DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION);
+         if (implementationClassName == null) {
+             throw new RuntimeException("Cannot create Component Status Repository because the NiFi Properties is missing the following property: "
+                     + NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION);
+         }
+ 
+         try {
+             return NarThreadContextClassLoader.createInstance(implementationClassName, ComponentStatusRepository.class);
+         } catch (final Exception e) {
+             throw new RuntimeException(e);
+         }
+     }
+ 
+     /**
+      * Creates a connection between two Connectable objects.
+      *
+      * @param id required ID of the connection
+      * @param name the name of the connection, or <code>null</code> to leave the
+      * connection unnamed
+      * @param source required source
+      * @param destination required destination
+      * @param relationshipNames required collection of relationship names
+      * @return
+      *
+      * @throws NullPointerException if the ID, source, destination, or set of
+      * relationships is null.
+      * @throws IllegalArgumentException if <code>relationships</code> is an
+      * empty collection
+      */
+     public Connection createConnection(final String id, final String name, final Connectable source, final Connectable destination, final Collection<String> relationshipNames) {
+         final StandardConnection.Builder builder = new StandardConnection.Builder(processScheduler);
+ 
+         final List<Relationship> relationships = new ArrayList<>();
+         for (final String relationshipName : requireNonNull(relationshipNames)) {
+             relationships.add(new Relationship.Builder().name(relationshipName).build());
+         }
+ 
+         return builder.id(requireNonNull(id).intern()).name(name == null ? null : name.intern()).relationships(relationships).source(requireNonNull(source)).destination(destination).build();
+     }
+ 
+     /**
+      * Creates a new Label
+      *
+      * @param id
+      * @param text
+      * @return
+      * @throws NullPointerException if either argument is null
+      */
+     public Label createLabel(final String id, final String text) {
+         return new StandardLabel(requireNonNull(id).intern(), text);
+     }
+ 
+     /**
+      * Creates a funnel
+      *
+      * @param id
+      * @return
+      */
+     public Funnel createFunnel(final String id) {
+         return new StandardFunnel(id.intern(), null, processScheduler);
+     }
+ 
+     /**
+      * Creates a Port to use as an Input Port for a Process Group
+      *
+      * @param id
+      * @param name
+      * @return
+      * @throws NullPointerException if the ID or name is not unique
+      * @throws IllegalStateException if an Input Port already exists with the
+      * same name or id.
+      */
+     public Port createLocalInputPort(String id, String name) {
+         id = requireNonNull(id).intern();
+         name = requireNonNull(name).intern();
+         verifyPortIdDoesNotExist(id);
+         return new LocalPort(id, name, null, ConnectableType.INPUT_PORT, processScheduler);
+     }
+ 
+     /**
+      * Creates a Port to use as an Output Port for a Process Group
+      *
+      * @param id
+      * @param name
+      * @return
+      * @throws NullPointerException if the ID or name is not unique
+      * @throws IllegalStateException if an Input Port already exists with the
+      * same name or id.
+      */
+     public Port createLocalOutputPort(String id, String name) {
+         id = requireNonNull(id).intern();
+         name = requireNonNull(name).intern();
+         verifyPortIdDoesNotExist(id);
+         return new LocalPort(id, name, null, ConnectableType.OUTPUT_PORT, processScheduler);
+     }
+ 
+     /**
+      * Creates a ProcessGroup with the given ID
+      *
+      * @param id
+      * @return
+      * @throws NullPointerException if the argument is null
+      */
+     public ProcessGroup createProcessGroup(final String id) {
+         return new StandardProcessGroup(requireNonNull(id).intern(), this, processScheduler, properties, encryptor);
+     }
+ 
+     /**
+      * <p>
+      * Creates a new ProcessorNode with the given type and identifier and initializes it invoking the
+      * methods annotated with {@link OnAdded}.
+      * </p>
+      *
+      * @param type
+      * @param id
+      * @return
+      * @throws NullPointerException if either arg is null
+      * @throws ProcessorInstantiationException if the processor cannot be
+      * instantiated for any reason
+      */
+     public ProcessorNode createProcessor(final String type, String id) throws ProcessorInstantiationException {
+         return createProcessor(type, id, true);
+     }
+     
+     /**
+      * <p>
+      * Creates a new ProcessorNode with the given type and identifier and optionally initializes it.
+      * </p>
+      *
+      * @param type the fully qualified Processor class name
+      * @param id the unique ID of the Processor
+      * @param firstTimeAdded whether or not this is the first time this Processor is added to the graph. If {@code true},
+      *                       will invoke methods annotated with the {@link OnAdded} annotation.
+      * @return
+      * @throws NullPointerException if either arg is null
+      * @throws ProcessorInstantiationException if the processor cannot be
+      * instantiated for any reason
+      */
++    @SuppressWarnings("deprecation")
+     public ProcessorNode createProcessor(final String type, String id, final boolean firstTimeAdded) throws ProcessorInstantiationException {
+         id = id.intern();
+         final Processor processor = instantiateProcessor(type, id);
+         final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider);
+         final ProcessorNode procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider);
+ 
+         final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
+         logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, new ProcessorLogObserver(getBulletinRepository(), procNode));
+ 
+         if ( firstTimeAdded ) {
+             try (final NarCloseable x = NarCloseable.withNarLoader()) {
 -                ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, processor);
++                ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, org.apache.nifi.processor.annotation.OnAdded.class, processor);
+             } catch (final Exception e) {
+                 logRepository.removeObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID);
+                 throw new ProcessorLifeCycleException("Failed to invoke @OnAdded methods of " + procNode.getProcessor(), e);
+             }
+         }
+ 
+         return procNode;
+     }
+ 
+     private Processor instantiateProcessor(final String type, final String identifier) throws ProcessorInstantiationException {
+         Processor processor;
+ 
+         final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
+         try {
+             final ClassLoader detectedClassLoaderForType = ExtensionManager.getClassLoader(type);
+             final Class<?> rawClass;
+             if (detectedClassLoaderForType == null) {
+                 // try to find from the current class loader
+                 rawClass = Class.forName(type);
+             } else {
+                 // try to find from the registered classloader for that type
+                 rawClass = Class.forName(type, true, ExtensionManager.getClassLoader(type));
+             }
+ 
+             Thread.currentThread().setContextClassLoader(detectedClassLoaderForType);
+             final Class<? extends Processor> processorClass = rawClass.asSubclass(Processor.class);
+             processor = processorClass.newInstance();
+             final ProcessorLog processorLogger = new SimpleProcessLogger(identifier, processor);
+             final ProcessorInitializationContext ctx = new StandardProcessorInitializationContext(identifier, processorLogger, this);
+             processor.initialize(ctx);
+             return processor;
+         } catch (final Throwable t) {
+             throw new ProcessorInstantiationException(type, t);
+         } finally {
+             if (ctxClassLoader != null) {
+                 Thread.currentThread().setContextClassLoader(ctxClassLoader);
+             }
+         }
+     }
+ 
+     /**
+      * @return the ExtensionManager used for instantiating Processors,
+      * Prioritizers, etc.
+      */
+     public ExtensionManager getExtensionManager() {
+         return extensionManager;
+     }
+ 
+     public String getInstanceId() {
+         readLock.lock();
+         try {
+             return instanceId;
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Gets the BulletinRepository for storing and retrieving Bulletins.
+      *
+      * @return
+      */
+     public BulletinRepository getBulletinRepository() {
+         return bulletinRepository;
+     }
+ 
+     public SnippetManager getSnippetManager() {
+         return snippetManager;
+     }
+ 
+     /**
+      * Creates a Port to use as an Input Port for the root Process Group, which
+      * is used for Site-to-Site communications
+      *
+      * @param id
+      * @param name
+      * @return
+      * @throws NullPointerException if the ID or name is not unique
+      * @throws IllegalStateException if an Input Port already exists with the
+      * same name or id.
+      */
+     public Port createRemoteInputPort(String id, String name) {
+         id = requireNonNull(id).intern();
+         name = requireNonNull(name).intern();
+         verifyPortIdDoesNotExist(id);
+         return new StandardRootGroupPort(id, name, null, TransferDirection.RECEIVE, ConnectableType.INPUT_PORT, userService, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure));
+     }
+ 
+     /**
+      * Creates a Port to use as an Output Port for the root Process Group, which
+      * is used for Site-to-Site communications and will queue flow files waiting
+      * to be delivered to remote instances
+      *
+      * @param id
+      * @param name
+      * @return
+      * @throws NullPointerException if the ID or name is not unique
+      * @throws IllegalStateException if an Input Port already exists with the
+      * same name or id.
+      */
+     public Port createRemoteOutputPort(String id, String name) {
+         id = requireNonNull(id).intern();
+         name = requireNonNull(name).intern();
+         verifyPortIdDoesNotExist(id);
+         return new StandardRootGroupPort(id, name, null, TransferDirection.SEND, ConnectableType.OUTPUT_PORT, userService, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure));
+     }
+ 
+     /**
+      * Creates a new Remote Process Group with the given ID that points to the
+      * given URI
+      *
+      * @param id
+      * @param uri
+      * @return
+      *
+      * @throws NullPointerException if either argument is null
+      * @throws IllegalArgumentException if <code>uri</code> is not a valid URI.
+      */
+     public RemoteProcessGroup createRemoteProcessGroup(final String id, final String uri) {
+         return new StandardRemoteProcessGroup(requireNonNull(id).intern(), requireNonNull(uri).intern(), null, this, sslContext);
+     }
+ 
+     /**
+      * Verifies that no output port exists with the given id or name. If this
+      * does not hold true, throws an IllegalStateException
+      *
+      * @param id
+      * @throws IllegalStateException
+      */
+     private void verifyPortIdDoesNotExist(final String id) {
+         Port port = rootGroup.findOutputPort(id);
+         if (port != null) {
+             throw new IllegalStateException("An Input Port already exists with ID " + id);
+         }
+         port = rootGroup.findInputPort(id);
+         if (port != null) {
+             throw new IllegalStateException("An Input Port already exists with ID " + id);
+         }
+     }
+ 
+     /**
+      * @return the name of this controller, which is also the name of the Root
+      * Group.
+      */
+     public String getName() {
+         readLock.lock();
+         try {
+             return rootGroup.getName();
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Sets the name for the Root Group, which also changes the name for the
+      * controller.
+      *
+      * @param name
+      */
+     public void setName(final String name) {
+         readLock.lock();
+         try {
+             rootGroup.setName(name);
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Gets the comments of this controller, which is also the comment of the
+      * Root Group.
+      *
+      * @return
+      */
+     public String getComments() {
+         readLock.lock();
+         try {
+             return rootGroup.getComments();
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Sets the comment for the Root Group, which also changes the comment for
+      * the controller.
+      *
+      * @param comments
+      */
+     public void setComments(final String comments) {
+         readLock.lock();
+         try {
+             rootGroup.setComments(comments);
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * @return <code>true</code> if the scheduling engine for this controller
+      * has been terminated.
+      */
+     public boolean isTerminated() {
+         this.readLock.lock();
+         try {
+             return (null == this.timerDrivenEngineRef.get() || this.timerDrivenEngineRef.get().isTerminated());
+         } finally {
+             this.readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Triggers the controller to begin shutdown, stopping all processors and
+      * terminating the scheduling engine. After calling this method, the
+      * {@link #isTerminated()} method will indicate whether or not the shutdown
+      * has finished.
+      *
+      * @param kill if <code>true</code>, attempts to stop all active threads,
+      * but makes no guarantee that this will happen
+      *
+      * @throws IllegalStateException if the controller is already stopped or
+      * currently in the processor of stopping
+      */
+     public void shutdown(final boolean kill) {
+         this.shutdown = true;
+         stopAllProcessors();
+ 
+         writeLock.lock();
+         try {
+             if (isTerminated() || timerDrivenEngineRef.get().isTerminating()) {
+                 throw new IllegalStateException("Controller already stopped or still stopping...");
+             }
+ 
+             if (kill) {
+                 this.timerDrivenEngineRef.get().shutdownNow();
+                 this.eventDrivenEngineRef.get().shutdownNow();
+                 LOG.info("Initiated immediate shutdown of flow controller...");
+             } else {
+                 this.timerDrivenEngineRef.get().shutdown();
+                 this.eventDrivenEngineRef.get().shutdown();
+                 LOG.info("Initiated graceful shutdown of flow controller...waiting up to " + gracefulShutdownSeconds + " seconds");
+             }
+ 
+             clusterTaskExecutor.shutdown();
+ 
+             // Trigger any processors' methods marked with @OnShutdown to be called
+             rootGroup.shutdown();
+ 
+             try {
+                 this.timerDrivenEngineRef.get().awaitTermination(gracefulShutdownSeconds / 2, TimeUnit.SECONDS);
+                 this.eventDrivenEngineRef.get().awaitTermination(gracefulShutdownSeconds / 2, TimeUnit.SECONDS);
+             } catch (final InterruptedException ie) {
+                 LOG.info("Interrupted while waiting for controller termination.");
+             }
+ 
+             try {
+                 flowFileRepository.close();
+             } catch (final Throwable t) {
+                 LOG.warn("Unable to shut down FlowFileRepository due to {}", new Object[]{t});
+             }
+ 
+             if (this.timerDrivenEngineRef.get().isTerminated() && eventDrivenEngineRef.get().isTerminated()) {
+                 LOG.info("Controller has been terminated successfully.");
+             } else {
+                 LOG.warn("Controller hasn't terminated properly.  There exists an uninterruptable thread that will take an indeterminate amount of time to stop.  Might need to kill the program manually.");
+             }
+ 
+             if (externalSiteListener != null) {
+                 externalSiteListener.stop();
+             }
+ 
+             if (flowFileSwapManager != null) {
+                 flowFileSwapManager.shutdown();
+             }
+             
+             if ( processScheduler != null ) {
+             	processScheduler.shutdown();
+             }
+             
+             if ( contentRepository != null ) {
+                 contentRepository.shutdown();
+             }
+             
+             if ( provenanceEventRepository != null ) {
+             	try {
+             		provenanceEventRepository.close();
+             	} catch (final IOException ioe) {
+             		LOG.warn("There was a problem shutting down the Provenance Repository: " + ioe.toString());
+             		if ( LOG.isDebugEnabled() ) {
+             			LOG.warn("", ioe);
+             		}
+             	}
+             }
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     /**
+      * Serializes the current state of the controller to the given OutputStream
+      *
+      * @param serializer
+      * @param os
+      * @throws FlowSerializationException if serialization of the flow fails for
+      * any reason
+      */
+     public void serialize(final FlowSerializer serializer, final OutputStream os) throws FlowSerializationException {
+         readLock.lock();
+         try {
+             serializer.serialize(this, os);
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Synchronizes this controller with the proposed flow.
+      *
+      * For more details, see
+      * {@link FlowSynchronizer#sync(FlowController, DataFlow)}.
+      *
+      * @param synchronizer
+      * @param dataFlow the flow to load the controller with. If the flow is null
+      * or zero length, then the controller must not have a flow or else an
+      * UninheritableFlowException will be thrown.
+      *
+      * @throws FlowSerializationException if proposed flow is not a valid flow
+      * configuration file
+      * @throws UninheritableFlowException if the proposed flow cannot be loaded
+      * by the controller because in doing so would risk orphaning flow files
+      * @throws FlowSynchronizationException if updates to the controller failed.
+      * If this exception is thrown, then the controller should be considered
+      * unsafe to be used
+      */
+     public void synchronize(final FlowSynchronizer synchronizer, final DataFlow dataFlow)
+             throws FlowSerializationException, FlowSynchronizationException, UninheritableFlowException {
+         writeLock.lock();
+         try {
+             LOG.debug("Synchronizing controller with proposed flow");
+             synchronizer.sync(this, dataFlow, encryptor);
+             LOG.info("Successfully synchronized controller with proposed flow");
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     /**
+      * @return the currently configured maximum number of threads that can be
+      * used for executing processors at any given time.
+      */
+     public int getMaxTimerDrivenThreadCount() {
+         return maxTimerDrivenThreads.get();
+     }
+ 
+     public int getMaxEventDrivenThreadCount() {
+         return maxEventDrivenThreads.get();
+     }
+ 
+     public void setMaxTimerDrivenThreadCount(final int maxThreadCount) {
+         writeLock.lock();
+         try {
+             setMaxThreadCount(maxThreadCount, this.timerDrivenEngineRef.get(), this.maxTimerDrivenThreads);
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     public void setMaxEventDrivenThreadCount(final int maxThreadCount) {
+         writeLock.lock();
+         try {
+             setMaxThreadCount(maxThreadCount, this.eventDrivenEngineRef.get(), this.maxEventDrivenThreads);
+             processScheduler.setMaxThreadCount(SchedulingStrategy.EVENT_DRIVEN, maxThreadCount);
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     /**
+      * Updates the number of threads that can be simultaneously used for
+      * executing processors.
+      *
+      * @param maxThreadCount
+      *
+      * This method must be called while holding the write lock!
+      */
+     private void setMaxThreadCount(final int maxThreadCount, final FlowEngine engine, final AtomicInteger maxThreads) {
+         if (maxThreadCount < 1) {
+             throw new IllegalArgumentException();
+         }
+ 
+         maxThreads.getAndSet(maxThreadCount);
+         if (null != engine && engine.getCorePoolSize() < maxThreadCount) {
+             engine.setCorePoolSize(maxThreads.intValue());
+         }
+     }
+ 
+     /**
+      * @return the ID of the root group
+      */
+     public String getRootGroupId() {
+         readLock.lock();
+         try {
+             return rootGroup.getIdentifier();
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Sets the root group to the given group
+      *
+      * @param group the ProcessGroup that is to become the new Root Group
+      *
+      * @throws IllegalArgumentException if the ProcessGroup has a parent
+      * @throws IllegalStateException if the FlowController does not know about
+      * the given process group
+      */
+     void setRootGroup(final ProcessGroup group) {
+         if (requireNonNull(group).getParent() != null) {
+             throw new IllegalArgumentException("A ProcessGroup that has a parent cannot be the Root Group");
+         }
+ 
+         writeLock.lock();
+         try {
+             rootGroup = group;
+ 
+             if (externalSiteListener != null) {
+                 externalSiteListener.setRootGroup(group);
+             }
+ 
+             // update the heartbeat bean
+             this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, primary, connected));
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     public SystemDiagnostics getSystemDiagnostics() {
+         final SystemDiagnosticsFactory factory = new SystemDiagnosticsFactory();
+         return factory.create(flowFileRepository, contentRepository);
+     }
+ 
+     //
+     // ProcessGroup access
+     //
+     /**
+      * Updates the process group corresponding to the specified DTO. Any field
+      * in DTO that is <code>null</code> (with the exception of the required ID)
+      * will be ignored.
+      *
+      * @param dto
+      * @return a fully-populated DTO representing the newly updated ProcessGroup
+      * @throws ProcessorInstantiationException
+      *
+      * @throws IllegalStateException if no process group can be found with the
+      * ID of DTO or with the ID of the DTO's parentGroupId, if the template ID
+      * specified is invalid, or if the DTO's Parent Group ID changes but the
+      * parent group has incoming or outgoing connections
+      *
+      * @throws NullPointerException if the DTO or its ID is null
+      */
+     public void updateProcessGroup(final ProcessGroupDTO dto) throws ProcessorInstantiationException {
+         final ProcessGroup group = lookupGroup(requireNonNull(dto).getId());
+ 
+         final String name = dto.getName();
+         final PositionDTO position = dto.getPosition();
+         final String comments = dto.getComments();
+ 
+         if (name != null) {
+             group.setName(name);
+         }
+         if (position != null) {
+             group.setPosition(toPosition(position));
+         }
+         if (comments != null) {
+             group.setComments(comments);
+         }
+     }
+ 
+     //
+     // Template access
+     //
+     /**
+      * Adds a template to this controller. The contents of this template must be
+      * part of the current flow. This is going create a template based on a
+      * snippet of this flow.
+      *
+      * @param dto
+      * @return a copy of the given DTO
+      * @throws IOException if an I/O error occurs when persisting the Template
+      * @throws NullPointerException if the DTO is null
+      * @throws IllegalArgumentException if does not contain all required
+      * information, such as the template name or a processor's configuration
+      * element
+      */
+     public Template addTemplate(final TemplateDTO dto) throws IOException {
+         return templateManager.addTemplate(dto);
+     }
+ 
+     /**
+      * Removes all templates from this controller
+      *
+      * @throws IOException
+      */
+     public void clearTemplates() throws IOException {
+         templateManager.clear();
+     }
+ 
+     /**
+      * Imports the specified template into this controller. The contents of this
+      * template may have come from another NiFi instance.
+      *
+      * @param dto
+      * @return
+      * @throws IOException
+      */
+     public Template importTemplate(final TemplateDTO dto) throws IOException {
+         return templateManager.importTemplate(dto);
+     }
+ 
+     /**
+      * Returns the template with the given ID, or <code>null</code> if no
+      * template exists with the given ID.
+      *
+      * @param id
+      * @return
+      */
+     public Template getTemplate(final String id) {
+         return templateManager.getTemplate(id);
+     }
+ 
+     public TemplateManager getTemplateManager() {
+         return templateManager;
+     }
+ 
+     /**
+      * Returns all templates that this controller knows about.
+      *
+      * @return
+      */
+     public Collection<Template> getTemplates() {
+         return templateManager.getTemplates();
+     }
+ 
+     /**
+      * Removes the template with the given ID.
+      *
+      * @param id the ID of the template to remove
+      * @throws NullPointerException if the argument is null
+      * @throws IllegalStateException if no template exists with the given ID
+      * @throws IOException if template could not be removed
+      */
+     public void removeTemplate(final String id) throws IOException, IllegalStateException {
+         templateManager.removeTemplate(id);
+     }
+ 
+     private Position toPosition(final PositionDTO dto) {
+         return new Position(dto.getX(), dto.getY());
+     }
+ 
+     //
+     // Snippet
+     //
+     /**
+      * Creates an instance of the given snippet and adds the components to the
+      * given group
+      *
+      * @param group
+      * @param dto
+      *
+      * @throws NullPointerException if either argument is null
+      * @throws IllegalStateException if the snippet is not valid because a
+      * component in the snippet has an ID that is not unique to this flow, or
+      * because it shares an Input Port or Output Port at the root level whose
+      * name already exists in the given ProcessGroup, or because the Template
+      * contains a Processor or a Prioritizer whose class is not valid within
+      * this instance of NiFi.
+      * @throws ProcessorInstantiationException if unable to instantiate a
+      * processor
+      */
+     public void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto) throws ProcessorInstantiationException {
+         writeLock.lock();
+         try {
+             validateSnippetContents(requireNonNull(group), dto);
+ 
+             //
+             // Instantiate the labels
+             //
+             for (final LabelDTO labelDTO : dto.getLabels()) {
+                 final Label label = createLabel(labelDTO.getId(), labelDTO.getLabel());
+                 label.setPosition(toPosition(labelDTO.getPosition()));
+                 if (labelDTO.getWidth() != null && labelDTO.getHeight() != null) {
+                     label.setSize(new Size(labelDTO.getWidth(), labelDTO.getHeight()));
+                 }
+ 
+                 // TODO: Update the label's "style"
+                 group.addLabel(label);
+             }
+ 
+             // 
+             // Instantiate the funnels
+             for (final FunnelDTO funnelDTO : dto.getFunnels()) {
+                 final Funnel funnel = createFunnel(funnelDTO.getId());
+                 funnel.setPosition(toPosition(funnelDTO.getPosition()));
+                 group.addFunnel(funnel);
+             }
+ 
+             //
+             // Instantiate Input Ports & Output Ports
+             //
+             for (final PortDTO portDTO : dto.getInputPorts()) {
+                 final Port inputPort;
+                 if (group.isRootGroup()) {
+                     inputPort = createRemoteInputPort(portDTO.getId(), portDTO.getName());
+                     inputPort.setMaxConcurrentTasks(portDTO.getConcurrentlySchedulableTaskCount());
+                     if (portDTO.getGroupAccessControl() != null) {
+                         ((RootGroupPort) inputPort).setGroupAccessControl(portDTO.getGroupAccessControl());
+                     }
+                     if (portDTO.getUserAccessControl() != null) {
+                         ((RootGroupPort) inputPort).setUserAccessControl(portDTO.getUserAccessControl());
+                     }
+                 } else {
+                     inputPort = createLocalInputPort(portDTO.getId(), portDTO.getName());
+                 }
+ 
+                 inputPort.setPosition(toPosition(portDTO.getPosition()));
+                 inputPort.setProcessGroup(group);
+                 inputPort.setComments(portDTO.getComments());
+                 group.addInputPort(inputPort);
+             }
+ 
+             for (final PortDTO portDTO : dto.getOutputPorts()) {
+                 final Port outputPort;
+                 if (group.isRootGroup()) {
+                     outputPort = createRemoteOutputPort(portDTO.getId(), portDTO.getName());
+                     outputPort.setMaxConcurrentTasks(portDTO.getConcurrentlySchedulableTaskCount());
+                     if (portDTO.getGroupAccessControl() != null) {
+                         ((RootGroupPort) outputPort).setGroupAccessControl(portDTO.getGroupAccessControl());
+                     }
+                     if (portDTO.getUserAccessControl() != null) {
+                         ((RootGroupPort) outputPort).setUserAccessControl(portDTO.getUserAccessControl());
+                     }
+                 } else {
+                     outputPort = createLocalOutputPort(portDTO.getId(), portDTO.getName());
+                 }
+ 
+                 outputPort.setPosition(toPosition(portDTO.getPosition()));
+                 outputPort.setProcessGroup(group);
+                 outputPort.setComments(portDTO.getComments());
+                 group.addOutputPort(outputPort);
+             }
+ 
+             //
+             // Instantiate the processors
+             //
+             for (final ProcessorDTO processorDTO : dto.getProcessors()) {
+                 final ProcessorNode procNode = createProcessor(processorDTO.getType(), processorDTO.getId());
+ 
+                 procNode.setPosition(toPosition(processorDTO.getPosition()));
+                 procNode.setProcessGroup(group);
+ 
+                 final ProcessorConfigDTO config = processorDTO.getConfig();
+                 procNode.setComments(config.getComments());
+                 if (config.isLossTolerant() != null) {
+                     procNode.setLossTolerant(config.isLossTolerant());
+                 }
+                 procNode.setName(processorDTO.getName());
+ 
+                 procNode.setYieldPeriod(config.getYieldDuration());
+                 procNode.setPenalizationPeriod(config.getPenaltyDuration());
+                 procNode.setBulletinLevel(LogLevel.valueOf(config.getBulletinLevel()));
+                 procNode.setAnnotationData(config.getAnnotationData());
+                 procNode.setStyle(processorDTO.getStyle());
+ 
+                 if (config.getRunDurationMillis() != null) {
+                     procNode.setRunDuration(config.getRunDurationMillis(), TimeUnit.MILLISECONDS);
+                 }
+ 
+                 if (config.getSchedulingStrategy() != null) {
+                     procNode.setSchedulingStrategy(SchedulingStrategy.valueOf(config.getSchedulingStrategy()));
+                 }
+ 
+                 // ensure that the scheduling strategy is set prior to these values
+                 procNode.setMaxConcurrentTasks(config.getConcurrentlySchedulableTaskCount());
+                 procNode.setScheduldingPeriod(config.getSchedulingPeriod());
+ 
+                 final Set<Relationship> relationships = new HashSet<>();
+                 if (processorDTO.getRelationships() != null) {
+                     for (final RelationshipDTO rel : processorDTO.getRelationships()) {
+                         if (rel.isAutoTerminate()) {
+                             relationships.add(procNode.getRelationship(rel.getName()));
+                         }
+                     }
+                     procNode.setAutoTerminatedRelationships(relationships);
+                 }
+ 
+                 if (config.getProperties() != null) {
+                     for (Map.Entry<String, String> entry : config.getProperties().entrySet()) {
+                         if (entry.getValue() != null) {
+                             procNode.setProperty(entry.getKey(), entry.getValue());
+                         }
+                     }
+                 }
+ 
+                 group.addProcessor(procNode);
+             }
+ 
+             //
+             // Instantiate Remote Process Groups
+             //
+             for (final RemoteProcessGroupDTO remoteGroupDTO : dto.getRemoteProcessGroups()) {
+                 final RemoteProcessGroup remoteGroup = createRemoteProcessGroup(remoteGroupDTO.getId(), remoteGroupDTO.getTargetUri());
+                 remoteGroup.setComments(remoteGroupDTO.getComments());
+                 remoteGroup.setPosition(toPosition(remoteGroupDTO.getPosition()));
+                 remoteGroup.setCommunicationsTimeout(remoteGroupDTO.getCommunicationsTimeout());
+                 remoteGroup.setYieldDuration(remoteGroupDTO.getYieldDuration());
+                 remoteGroup.setProcessGroup(group);
+ 
+                 // set the input/output ports
+                 if (remoteGroupDTO.getContents() != null) {
+                     final RemoteProcessGroupContentsDTO contents = remoteGroupDTO.getContents();
+ 
+                     // ensure there input ports
+                     if (contents.getInputPorts() != null) {
+                         remoteGroup.setInputPorts(convertRemotePort(contents.getInputPorts()));
+                     }
+ 
+                     // ensure there are output ports
+                     if (contents.getOutputPorts() != null) {
+                         remoteGroup.setOutputPorts(convertRemotePort(contents.getOutputPorts()));
+                     }
+                 }
+ 
+                 group.addRemoteProcessGroup(remoteGroup);
+             }
+ 
+             // 
+             // Instantiate ProcessGroups
+             //
+             for (final ProcessGroupDTO groupDTO : dto.getProcessGroups()) {
+                 final ProcessGroup childGroup = createProcessGroup(groupDTO.getId());
+                 childGroup.setParent(group);
+                 childGroup.setPosition(toPosition(groupDTO.getPosition()));
+                 childGroup.setComments(groupDTO.getComments());
+                 childGroup.setName(groupDTO.getName());
+                 group.addProcessGroup(childGroup);
+ 
+                 final FlowSnippetDTO contents = groupDTO.getContents();
+ 
+                 // we want this to be recursive, so we will create a new template that contains only
+                 // the contents of this child group and recursively call ourselves.
+                 final FlowSnippetDTO childTemplateDTO = new FlowSnippetDTO();
+                 childTemplateDTO.setConnections(contents.getConnections());
+                 childTemplateDTO.setInputPorts(contents.getInputPorts());
+                 childTemplateDTO.setLabels(contents.getLabels());
+                 childTemplateDTO.setOutputPorts(contents.getOutputPorts());
+                 childTemplateDTO.setProcessGroups(contents.getProcessGroups());
+                 childTemplateDTO.setProcessors(contents.getProcessors());
+                 childTemplateDTO.setFunnels(contents.getFunnels());
+                 childTemplateDTO.setRemoteProcessGroups(contents.getRemoteProcessGroups());
+                 instantiateSnippet(childGroup, childTemplateDTO);
+             }
+ 
+             //
+             // Instantiate Connections
+             //
+             for (final ConnectionDTO connectionDTO : dto.getConnections()) {
+                 final ConnectableDTO sourceDTO = connectionDTO.getSource();
+                 final ConnectableDTO destinationDTO = connectionDTO.getDestination();
+                 final Connectable source;
+                 final Connectable destination;
+ 
+                 // locate the source and destination connectable. if this is a remote port 
+                 // we need to locate the remote process groups. otherwise we need to 
+                 // find the connectable given its parent group.
+                 // NOTE: (getConnectable returns ANY connectable, when the parent is
+                 // not this group only input ports or output ports should be returned. if something 
+                 // other than a port is returned, an exception will be thrown when adding the 
+                 // connection below.)
+                 // see if the source connectable is a remote port
+                 if (ConnectableType.REMOTE_OUTPUT_PORT.name().equals(sourceDTO.getType())) {
+                     final RemoteProcessGroup remoteGroup = group.getRemoteProcessGroup(sourceDTO.getGroupId());
+                     source = remoteGroup.getOutputPort(sourceDTO.getId());
+                 } else {
+                     final ProcessGroup sourceGroup = getConnectableParent(group, sourceDTO.getGroupId());
+                     source = sourceGroup.getConnectable(sourceDTO.getId());
+                 }
+ 
+                 // see if the destination connectable is a remote port
+                 if (ConnectableType.REMOTE_INPUT_PORT.name().equals(destinationDTO.getType())) {
+                     final RemoteProcessGroup remoteGroup = group.getRemoteProcessGroup(destinationDTO.getGroupId());
+                     destination = remoteGroup.getInputPort(destinationDTO.getId());
+                 } else {
+                     final ProcessGroup destinationGroup = getConnectableParent(group, destinationDTO.getGroupId());
+                     destination = destinationGroup.getConnectable(destinationDTO.getId());
+                 }
+ 
+                 // determine the selection relationships for this connection
+                 final Set<String> relationships = new HashSet<>();
+                 if (connectionDTO.getSelectedRelationships() != null) {
+                     relationships.addAll(connectionDTO.getSelectedRelationships());
+                 }
+ 
+                 final Connection connection = createConnection(connectionDTO.getId(), connectionDTO.getName(), source, destination, relationships);
+ 
+                 if (connectionDTO.getBends() != null) {
+                     final List<Position> bendPoints = new ArrayList<>();
+                     for (final PositionDTO bend : connectionDTO.getBends()) {
+                         bendPoints.add(new Position(bend.getX(), bend.getY()));
+                     }
+                     connection.setBendPoints(bendPoints);
+                 }
+ 
+                 final FlowFileQueue queue = connection.getFlowFileQueue();
+                 queue.setBackPressureDataSizeThreshold(connectionDTO.getBackPressureDataSizeThreshold());
+                 queue.setBackPressureObjectThreshold(connectionDTO.getBackPressureObjectThreshold());
+                 queue.setFlowFileExpiration(connectionDTO.getFlowFileExpiration());
+ 
+                 final List<String> prioritizers = connectionDTO.getPrioritizers();
+                 if (prioritizers != null) {
+                     final List<String> newPrioritizersClasses = new ArrayList<>(prioritizers);
+                     final List<FlowFilePrioritizer> newPrioritizers = new ArrayList<>();
+                     for (final String className : newPrioritizersClasses) {
+                         try {
+                             newPrioritizers.add(createPrioritizer(className));
+                         } catch (final ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+                             throw new IllegalArgumentException("Unable to set prioritizer " + className + ": " + e);
+                         }
+                     }
+                     queue.setPriorities(newPrioritizers);
+                 }
+ 
+                 connection.setProcessGroup(group);
+                 group.addConnection(connection);
+             }
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     /**
+      * Converts a set of ports into a set of remote process group ports.
+      *
+      * @param ports
+      * @return
+      */
+     private Set<RemoteProcessGroupPortDescriptor> convertRemotePort(final Set<RemoteProcessGroupPortDTO> ports) {
+         Set<RemoteProcessGroupPortDescriptor> remotePorts = null;
+         if (ports != null) {
+             remotePorts = new LinkedHashSet<>(ports.size());
+             for (RemoteProcessGroupPortDTO port : ports) {
+                 final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor();
+                 descriptor.setId(port.getId());
+                 descriptor.setName(port.getName());
+                 descriptor.setComments(port.getComments());
+                 descriptor.setTargetRunning(port.isTargetRunning());
+                 descriptor.setConnected(port.isConnected());
+                 descriptor.setConcurrentlySchedulableTaskCount(port.getConcurrentlySchedulableTaskCount());
+                 descriptor.setTransmitting(port.isTransmitting());
+                 descriptor.setUseCompression(port.getUseCompression());
+                 remotePorts.add(descriptor);
+             }
+         }
+         return remotePorts;
+     }
+ 
+     /**
+      * Returns the parent of the specified Connectable. This only considers this
+      * group and any direct child sub groups.
+      *
+      * @param parentGroupId
+      * @return
+      */
+     private ProcessGroup getConnectableParent(final ProcessGroup group, final String parentGroupId) {
+         if (areGroupsSame(group.getIdentifier(), parentGroupId)) {
+             return group;
+         } else {
+             return group.getProcessGroup(parentGroupId);
+         }
+     }
+ 
+     /**
+      * <p>
+      * Verifies that the given DTO is valid, according to the following:
+      *
+      * <ul>
+      * <li>None of the ID's in any component of the DTO can be used in this
+      * flow.</li>
+      * <li>The ProcessGroup to which the template's contents will be added must
+      * not contain any InputPort or OutputPort with the same name as one of the
+      * corresponding components in the root level of the template.</li>
+      * <li>All Processors' classes must exist in this instance.</li>
+      * <li>All Flow File Prioritizers' classes must exist in this instance.</li>
+      * </ul>
+      * </p>
+      *
+      * <p>
+      * If any of the above statements does not hold true, an
+      * {@link IllegalStateException} or a
+      * {@link ProcessorInstantiationException} will be thrown.
+      * </p>
+      *
+      * @param group
+      * @param templateContents
+      */
+     private void validateSnippetContents(final ProcessGroup group, final FlowSnippetDTO templateContents) {
+         // validate the names of Input Ports
+         for (final PortDTO port : templateContents.getInputPorts()) {
+             if (group.getInputPortByName(port.getName()) != null) {
+                 throw new IllegalStateException("ProcessGroup already has an Input Port with name " + port.getName());
+             }
+         }
+ 
+         // validate the names of Output Ports
+         for (final PortDTO port : templateContents.getOutputPorts()) {
+             if (group.getOutputPortByName(port.getName()) != null) {
+                 throw new IllegalStateException("ProcessGroup already has an Output Port with name " + port.getName());
+             }
+         }
+ 
+         // validate that all Processor Types and Prioritizer Types are valid
+         final List<String> processorClasses = new ArrayList<>();
+         for (final Class<?> c : ExtensionManager.getExtensions(Processor.class)) {
+             processorClasses.add(c.getName());
+         }
+         final List<String> prioritizerClasses = new ArrayList<>();
+         for (final Class<?> c : ExtensionManager.getExtensions(FlowFilePrioritizer.class)) {
+             prioritizerClasses.add(c.getName());
+         }
+ 
+         final Set<ProcessorDTO> allProcs = new HashSet<>();
+         final Set<ConnectionDTO> allConns = new HashSet<>();
+         allProcs.addAll(templateContents.getProcessors());
+         allConns.addAll(templateContents.getConnections());
+         for (final ProcessGroupDTO childGroup : templateContents.getProcessGroups()) {
+             allProcs.addAll(findAllProcessors(childGroup));
+             allConns.addAll(findAllConnections(childGroup));
+         }
+ 
+         for (final ProcessorDTO proc : allProcs) {
+             if (!processorClasses.contains(proc.getType())) {
+                 throw new IllegalStateException("Invalid Processor Type: " + proc.getType());
+             }
+         }
+ 
+         for (final ConnectionDTO conn : allConns) {
+             final List<String> prioritizers = conn.getPrioritizers();
+             if (prioritizers != null) {
+                 for (final String prioritizer : prioritizers) {
+                     if (!prioritizerClasses.contains(prioritizer)) {
+                         throw new IllegalStateException("Invalid FlowFile Prioritizer Type: " + prioritizer);
+                     }
+                 }
+             }
+         }
+     }
+ 
+     /**
+      * Recursively finds all ProcessorDTO's
+      *
+      * @param group
+      * @return
+      */
+     private Set<ProcessorDTO> findAllProcessors(final ProcessGroupDTO group) {
+         final Set<ProcessorDTO> procs = new HashSet<>();
+         for (final ProcessorDTO dto : group.getContents().getProcessors()) {
+             procs.add(dto);
+         }
+ 
+         for (final ProcessGroupDTO childGroup : group.getContents().getProcessGroups()) {
+             procs.addAll(findAllProcessors(childGroup));
+         }
+         return procs;
+     }
+ 
+     /**
+      * Recursively finds all ConnectionDTO's
+      *
+      * @param group
+      * @return
+      */
+     private Set<ConnectionDTO> findAllConnections(final ProcessGroupDTO group) {
+         final Set<ConnectionDTO> conns = new HashSet<>();
+         for (final ConnectionDTO dto : group.getContents().getConnections()) {
+             conns.add(dto);
+         }
+ 
+         for (final ProcessGroupDTO childGroup : group.getContents().getProcessGroups()) {
+             conns.addAll(findAllConnections(childGroup));
+         }
+         return conns;
+     }
+ 
+     //
+     // Processor access
+     //
+     /**
+      * Indicates whether or not the two ID's point to the same ProcessGroup. If
+      * either id is null, will return <code>false</code.
+      *
+      * @param id1
+      * @param id2
+      * @return
+      */
+     public boolean areGroupsSame(final String id1, final String id2) {
+         if (id1 == null || id2 == null) {
+             return false;
+         } else if (id1.equals(id2)) {
+             return true;
+         } else {
+             final String comparable1 = (id1.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id1);
+             final String comparable2 = (id2.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id2);
+             return (comparable1.equals(comparable2));
+         }
+     }
+ 
+     public FlowFilePrioritizer createPrioritizer(final String type) throws InstantiationException, IllegalAccessException, ClassNotFoundException {
+         FlowFilePrioritizer prioritizer;
+ 
+         final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
+         try {
+             final ClassLoader detectedClassLoaderForType = ExtensionManager.getClassLoader(type);
+             final Class<?> rawClass;
+             if (detectedClassLoaderForType == null) {
+                 // try to find from the current class loader
+                 rawClass = Class.forName(type);
+             } else {
+                 // try to find from the registered classloader for that type
+                 rawClass = Class.forName(type, true, ExtensionManager.getClassLoader(type));
+             }
+ 
+             Thread.currentThread().setContextClassLoader(detectedClassLoaderForType);
+             final Class<? extends FlowFilePrioritizer> prioritizerClass = rawClass.asSubclass(FlowFilePrioritizer.class);
+             final Object processorObj = prioritizerClass.newInstance();
+             prioritizer = prioritizerClass.cast(processorObj);
+ 
+             return prioritizer;
+         } finally {
+             if (ctxClassLoader != null) {
+                 Thread.currentThread().setContextClassLoader(ctxClassLoader);
+             }
+         }
+     }
+ 
+     //
+     // InputPort access
+     //
+     public PortDTO updateInputPort(final String parentGroupId, final PortDTO dto) {
+         final ProcessGroup parentGroup = lookupGroup(parentGroupId);
+         final Port port = parentGroup.getInputPort(dto.getId());
+         if (port == null) {
+             throw new IllegalStateException("No Input Port with ID " + dto.getId() + " is known as a child of ProcessGroup with ID " + parentGroupId);
+         }
+ 
+         final String name = dto.getName();
+         if (dto.getPosition() != null) {
+             port.setPosition(toPosition(dto.getPosition()));
+         }
+ 
+         if (name != null) {
+             port.setName(name);
+         }
+ 
+         return createDTO(port);
+     }
+ 
+     private PortDTO createDTO(final Port port) {
+         if (port == null) {
+             return null;
+         }
+ 
+         final PortDTO dto = new PortDTO();
+         dto.setId(port.getIdentifier());
+         dto.setPosition(new PositionDTO(port.getPosition().getX(), port.getPosition().getY()));
+         dto.setName(port.getName());
+         dto.setParentGroupId(port.getProcessGroup().getIdentifier());
+ 
+         return dto;
+     }
+ 
+     //
+     // OutputPort access
+     //
+     public PortDTO updateOutputPort(final String parentGroupId, final PortDTO dto) {
+         final ProcessGroup parentGroup = lookupGroup(parentGroupId);
+         final Port port = parentGroup.getOutputPort(dto.getId());
+         if (port == null) {
+             throw new IllegalStateException("No Output Port with ID " + dto.getId() + " is known as a child of ProcessGroup with ID " + parentGroupId);
+         }
+ 
+         final String name = dto.getName();
+         if (name != null) {
+             port.setName(name);
+         }
+ 
+         if (dto.getPosition() != null) {
+             port.setPosition(toPosition(dto.getPosition()));
+         }
+ 
+         return createDTO(port);
+     }
+ 
+     //
+     // Processor/Prioritizer/Filter Class Access
+     //
+     @SuppressWarnings("rawtypes")
+     public Set<Class> getFlowFileProcessorClasses() {
+         return ExtensionManager.getExtensions(Processor.class);
+     }
+ 
+     @SuppressWarnings("rawtypes")
+     public Set<Class> getFlowFileComparatorClasses() {
+         return ExtensionManager.getExtensions(FlowFilePrioritizer.class);
+     }
+ 
+     /**
+      * Returns the ProcessGroup with the given ID
+      *
+      * @param id
+      * @return the process group or null if not group is found
+      */
+     private ProcessGroup lookupGroup(final String id) {
+         final ProcessGroup group = getGroup(id);
+         if (group == null) {
+             throw new IllegalStateException("No Group with ID " + id + " exists");
+         }
+         return group;
+     }
+ 
+     /**
+      * Returns the ProcessGroup with the given ID
+      *
+      * @param id
+      * @return the process group or null if not group is found
+      */
+     public ProcessGroup getGroup(final String id) {
+         requireNonNull(id);
+         final ProcessGroup root;
+         readLock.lock();
+         try {
+             root = rootGroup;
+         } finally {
+             readLock.unlock();
+         }
+ 
+         final String searchId = id.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id;
+         return (root == null) ? null : root.findProcessGroup(searchId);
+     }
+ 
+     @Override
+     public ProcessGroupStatus getControllerStatus() {
+         return getGroupStatus(getRootGroupId());
+     }
+ 
+     public ProcessGroupStatus getGroupStatus(final String groupId) {
+         return getGroupStatus(groupId, getProcessorStats());
+     }
+ 
+     public ProcessGroupStatus getGroupStatus(final String groupId, final RepositoryStatusReport statusReport) {
+         final ProcessGroup group = getGroup(groupId);
+         return getGroupStatus(group, statusReport);
+     }
+ 
+     public ProcessGroupStatus getGroupStatus(final ProcessGroup group, final RepositoryStatusReport statusReport) {
+         if (group == null) {
+             return null;
+         }
+ 
+         final ProcessGroupStatus status = new ProcessGroupStatus();
+         status.setId(group.getIdentifier());
+         status.setName(group.getName());
+         status.setCreationTimestamp(new Date().getTime());
+         int activeGroupThreads = 0;
+         long bytesRead = 0L;
+         long bytesWritten = 0L;
+         int queuedCount = 0;
+         long queuedContentSize = 0L;
+         int flowFilesIn = 0;
+         long bytesIn = 0L;
+         int flowFilesOut = 0;
+         long bytesOut = 0L;
+         int flowFilesReceived = 0;
+         long bytesReceived = 0L;
+         int flowFilesSent = 0;
+         long bytesSent = 0L;
+ 
+         // set status for processors
+         final Collection<ProcessorStatus> processorStatusCollection = new ArrayList<>();
+         status.setProcessorStatus(processorStatusCollection);
+

<TRUNCATED>