You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/11/06 16:24:09 UTC

[8/9] nifi git commit: NIFI-5769: Refactored FlowController to use Composition over Inheritance - Ensure that when root group is set, that we register its ID in FlowManager

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 680962e..c538044 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -16,28 +16,19 @@
  */
 package org.apache.nifi.controller;
 
-import org.apache.commons.collections4.Predicate;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.action.Action;
 import org.apache.nifi.admin.service.AuditService;
-import org.apache.nifi.annotation.configuration.DefaultSettings;
-import org.apache.nifi.annotation.lifecycle.OnAdded;
 import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
-import org.apache.nifi.annotation.lifecycle.OnRemoved;
 import org.apache.nifi.annotation.lifecycle.OnShutdown;
 import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
 import org.apache.nifi.annotation.notification.PrimaryNodeState;
 import org.apache.nifi.authorization.Authorizer;
-import org.apache.nifi.authorization.RequestAction;
 import org.apache.nifi.authorization.Resource;
 import org.apache.nifi.authorization.resource.Authorizable;
-import org.apache.nifi.authorization.resource.DataAuthorizable;
-import org.apache.nifi.authorization.resource.ProvenanceDataAuthorizable;
 import org.apache.nifi.authorization.resource.ResourceFactory;
 import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.authorization.util.IdentityMapping;
 import org.apache.nifi.authorization.util.IdentityMappingUtil;
-import org.apache.nifi.bundle.Bundle;
 import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
@@ -52,8 +43,6 @@ import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.NodeProtocolSender;
 import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.components.state.StateManagerProvider;
 import org.apache.nifi.components.validation.StandardValidationTrigger;
 import org.apache.nifi.components.validation.TriggerValidationTask;
@@ -63,19 +52,15 @@ import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.connectable.Funnel;
-import org.apache.nifi.connectable.LocalPort;
 import org.apache.nifi.connectable.Port;
 import org.apache.nifi.connectable.Position;
-import org.apache.nifi.connectable.Size;
 import org.apache.nifi.connectable.StandardConnection;
 import org.apache.nifi.controller.cluster.ClusterProtocolHeartbeater;
 import org.apache.nifi.controller.cluster.Heartbeater;
 import org.apache.nifi.controller.exception.CommunicationsException;
-import org.apache.nifi.controller.exception.ComponentLifeCycleException;
-import org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
-import org.apache.nifi.controller.exception.ProcessorInstantiationException;
-import org.apache.nifi.controller.label.Label;
-import org.apache.nifi.controller.label.StandardLabel;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.flow.StandardFlowManager;
+import org.apache.nifi.controller.kerberos.KerberosConfig;
 import org.apache.nifi.controller.leader.election.LeaderElectionManager;
 import org.apache.nifi.controller.leader.election.LeaderElectionStateChangeListener;
 import org.apache.nifi.controller.queue.ConnectionEventListener;
@@ -97,19 +82,15 @@ import org.apache.nifi.controller.queue.clustered.server.LoadBalanceProtocol;
 import org.apache.nifi.controller.queue.clustered.server.StandardLoadBalanceProtocol;
 import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
 import org.apache.nifi.controller.reporting.ReportingTaskProvider;
-import org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
-import org.apache.nifi.controller.reporting.StandardReportingTaskNode;
 import org.apache.nifi.controller.repository.ContentRepository;
 import org.apache.nifi.controller.repository.CounterRepository;
-import org.apache.nifi.controller.repository.FlowFileEvent;
 import org.apache.nifi.controller.repository.FlowFileEventRepository;
 import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.controller.repository.FlowFileRepository;
 import org.apache.nifi.controller.repository.FlowFileSwapManager;
-import org.apache.nifi.controller.repository.QueueProvider;
-import org.apache.nifi.controller.repository.RepositoryStatusReport;
 import org.apache.nifi.controller.repository.StandardCounterRepository;
 import org.apache.nifi.controller.repository.StandardFlowFileRecord;
+import org.apache.nifi.controller.repository.StandardQueueProvider;
 import org.apache.nifi.controller.repository.StandardRepositoryRecord;
 import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
 import org.apache.nifi.controller.repository.SwapSummary;
@@ -120,7 +101,6 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.controller.repository.claim.StandardContentClaim;
 import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
 import org.apache.nifi.controller.repository.io.LimitedInputStream;
-import org.apache.nifi.controller.repository.metrics.EmptyFlowFileEvent;
 import org.apache.nifi.controller.scheduling.EventDrivenSchedulingAgent;
 import org.apache.nifi.controller.scheduling.QuartzSchedulingAgent;
 import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
@@ -131,20 +111,12 @@ import org.apache.nifi.controller.serialization.FlowSerializer;
 import org.apache.nifi.controller.serialization.FlowSynchronizationException;
 import org.apache.nifi.controller.serialization.FlowSynchronizer;
 import org.apache.nifi.controller.serialization.ScheduledStateLookup;
-import org.apache.nifi.controller.service.ControllerServiceInvocationHandler;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.controller.service.StandardConfigurationContext;
 import org.apache.nifi.controller.service.StandardControllerServiceProvider;
 import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
 import org.apache.nifi.controller.state.server.ZooKeeperStateServer;
-import org.apache.nifi.controller.status.ConnectionStatus;
-import org.apache.nifi.controller.status.PortStatus;
-import org.apache.nifi.controller.status.ProcessGroupStatus;
-import org.apache.nifi.controller.status.ProcessorStatus;
-import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
-import org.apache.nifi.controller.status.RunStatus;
-import org.apache.nifi.controller.status.TransmissionStatus;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
 import org.apache.nifi.controller.status.history.GarbageCollectionHistory;
 import org.apache.nifi.controller.status.history.GarbageCollectionStatus;
@@ -162,92 +134,47 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.framework.security.util.SslContextFactory;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroup;
-import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
 import org.apache.nifi.groups.StandardProcessGroup;
-import org.apache.nifi.history.History;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.logging.ControllerServiceLogObserver;
-import org.apache.nifi.logging.LogLevel;
-import org.apache.nifi.logging.LogRepository;
-import org.apache.nifi.logging.LogRepositoryFactory;
-import org.apache.nifi.logging.ProcessorLogObserver;
-import org.apache.nifi.logging.ReportingTaskLogObserver;
 import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.nar.NarThreadContextClassLoader;
-import org.apache.nifi.processor.GhostProcessor;
 import org.apache.nifi.processor.Processor;
-import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.SimpleProcessLogger;
-import org.apache.nifi.processor.StandardProcessContext;
-import org.apache.nifi.processor.StandardProcessorInitializationContext;
-import org.apache.nifi.processor.StandardValidationContextFactory;
+import org.apache.nifi.provenance.ComponentIdentifierLookup;
 import org.apache.nifi.provenance.IdentifierLookup;
 import org.apache.nifi.provenance.ProvenanceAuthorizableFactory;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.provenance.StandardProvenanceAuthorizableFactory;
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
-import org.apache.nifi.registry.ComponentVariableRegistry;
 import org.apache.nifi.registry.VariableRegistry;
 import org.apache.nifi.registry.flow.FlowRegistryClient;
-import org.apache.nifi.registry.flow.StandardVersionControlInformation;
-import org.apache.nifi.registry.flow.VersionControlInformation;
 import org.apache.nifi.registry.flow.VersionedConnection;
 import org.apache.nifi.registry.flow.VersionedProcessGroup;
 import org.apache.nifi.registry.variable.MutableVariableRegistry;
-import org.apache.nifi.registry.variable.StandardComponentVariableRegistry;
 import org.apache.nifi.remote.HttpRemoteSiteListener;
 import org.apache.nifi.remote.RemoteGroupPort;
 import org.apache.nifi.remote.RemoteResourceManager;
 import org.apache.nifi.remote.RemoteSiteListener;
-import org.apache.nifi.remote.RootGroupPort;
 import org.apache.nifi.remote.SocketRemoteSiteListener;
-import org.apache.nifi.remote.StandardRemoteProcessGroup;
-import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor;
-import org.apache.nifi.remote.StandardRootGroupPort;
-import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.cluster.NodeInformant;
-import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 import org.apache.nifi.remote.protocol.socket.SocketFlowFileServerProtocol;
 import org.apache.nifi.reporting.Bulletin;
 import org.apache.nifi.reporting.BulletinRepository;
-import org.apache.nifi.reporting.EventAccess;
-import org.apache.nifi.reporting.GhostReportingTask;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.reporting.ReportingInitializationContext;
 import org.apache.nifi.reporting.ReportingTask;
 import org.apache.nifi.reporting.Severity;
-import org.apache.nifi.scheduling.ExecutionNode;
+import org.apache.nifi.reporting.StandardEventAccess;
+import org.apache.nifi.reporting.UserAwareEventAccess;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.stream.io.LimitingInputStream;
 import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.util.BundleUtils;
 import org.apache.nifi.util.ComponentIdGenerator;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.ReflectionUtils;
-import org.apache.nifi.util.SnippetUtils;
 import org.apache.nifi.util.concurrency.TimedLock;
-import org.apache.nifi.web.ResourceNotFoundException;
-import org.apache.nifi.web.api.dto.BatchSettingsDTO;
-import org.apache.nifi.web.api.dto.BundleDTO;
-import org.apache.nifi.web.api.dto.ConnectableDTO;
-import org.apache.nifi.web.api.dto.ConnectionDTO;
-import org.apache.nifi.web.api.dto.ControllerServiceDTO;
-import org.apache.nifi.web.api.dto.FlowSnippetDTO;
-import org.apache.nifi.web.api.dto.FunnelDTO;
-import org.apache.nifi.web.api.dto.LabelDTO;
-import org.apache.nifi.web.api.dto.PortDTO;
 import org.apache.nifi.web.api.dto.PositionDTO;
-import org.apache.nifi.web.api.dto.ProcessGroupDTO;
-import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
-import org.apache.nifi.web.api.dto.ProcessorDTO;
-import org.apache.nifi.web.api.dto.RelationshipDTO;
-import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
-import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
-import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
 import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
 import org.slf4j.Logger;
@@ -255,28 +182,25 @@ import org.slf4j.LoggerFactory;
 
 import javax.net.ssl.SSLContext;
 import java.io.ByteArrayInputStream;
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.lang.management.GarbageCollectorMXBean;
 import java.lang.management.ManagementFactory;
 import java.net.InetSocketAddress;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -288,8 +212,7 @@ import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
 
-public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider,
-    QueueProvider, Authorizable, ProvenanceAuthorizableFactory, NodeTypeProvider, IdentifierLookup, ReloadComponent {
+public class FlowController implements ReportingTaskProvider, Authorizable, NodeTypeProvider {
 
     // default repository implementations
     public static final String DEFAULT_FLOWFILE_REPO_IMPLEMENTATION = "org.apache.nifi.controller.repository.WriteAheadFlowFileRepository";
@@ -303,8 +226,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     public static final long DEFAULT_GRACEFUL_SHUTDOWN_SECONDS = 10;
     public static final int METRICS_RESERVOIR_SIZE = 288; // 1 day worth of 5-minute captures
 
-    public static final String ROOT_GROUP_ID_ALIAS = "root";
-    public static final String DEFAULT_ROOT_GROUP_NAME = "NiFi Flow";
 
     // default properties for scaling the positions of components from pre-1.0 flow encoding versions.
     public static final double DEFAULT_POSITION_SCALE_FACTOR_X = 1.5;
@@ -338,9 +259,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     private final ComponentStatusRepository componentStatusRepository;
     private final StateManagerProvider stateManagerProvider;
     private final long systemStartTime = System.currentTimeMillis(); // time at which the node was started
-    private final ConcurrentMap<String, ReportingTaskNode> reportingTasks = new ConcurrentHashMap<>();
     private final VariableRegistry variableRegistry;
-    private final ConcurrentMap<String, ControllerServiceNode> rootControllerServices = new ConcurrentHashMap<>();
 
     private final ConnectionLoadBalanceServer loadBalanceServer;
     private final NioAsyncLoadBalanceClientRegistry loadBalanceClientRegistry;
@@ -366,7 +285,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     private final Integer remoteInputHttpPort;
     private final Boolean isSiteToSiteSecure;
 
-    private final AtomicReference<ProcessGroup> rootGroupRef = new AtomicReference<>();
     private final List<Connectable> startConnectablesAfterInitialization;
     private final List<RemoteGroupPort> startRemoteGroupPortsAfterInitialization;
     private final LeaderElectionManager leaderElectionManager;
@@ -374,6 +292,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     private final FlowRegistryClient flowRegistryClient;
     private final FlowEngine validationThreadPool;
     private final ValidationTrigger validationTrigger;
+    private final ReloadComponent reloadComponent;
+    private final ProvenanceAuthorizableFactory provenanceAuthorizableFactory;
+    private final UserAwareEventAccess eventAccess;
+    private final StandardFlowManager flowManager;
 
     /**
      * true if controller is configured to operate in a clustered environment
@@ -534,8 +456,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         this.variableRegistry = variableRegistry == null ? VariableRegistry.EMPTY_REGISTRY : variableRegistry;
 
         try {
+            this.provenanceAuthorizableFactory = new StandardProvenanceAuthorizableFactory(this);
             this.provenanceRepository = createProvenanceRepository(nifiProperties);
-            this.provenanceRepository.initialize(createEventReporter(bulletinRepository), authorizer, this, this);
+
+            final IdentifierLookup identifierLookup = new ComponentIdentifierLookup(this);
+
+            this.provenanceRepository.initialize(createEventReporter(), authorizer, provenanceAuthorizableFactory, identifierLookup);
         } catch (final Exception e) {
             throw new RuntimeException("Unable to create Provenance Repository", e);
         }
@@ -557,8 +483,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
 
         final RepositoryContextFactory contextFactory = new RepositoryContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository);
 
+        this.flowManager = new StandardFlowManager(nifiProperties, sslContext, this, flowFileEventRepository);
+
+        controllerServiceProvider = new StandardControllerServiceProvider(this, processScheduler, bulletinRepository);
+
         eventDrivenSchedulingAgent = new EventDrivenSchedulingAgent(
-            eventDrivenEngineRef.get(), this, stateManagerProvider, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor, extensionManager);
+            eventDrivenEngineRef.get(), controllerServiceProvider, stateManagerProvider, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor, extensionManager);
         processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, eventDrivenSchedulingAgent);
 
         final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor);
@@ -598,19 +528,17 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         this.heartbeatDelaySeconds = (int) FormatUtils.getTimeDuration(nifiProperties.getNodeHeartbeatInterval(), TimeUnit.SECONDS);
 
         this.snippetManager = new SnippetManager();
+        this.reloadComponent = new StandardReloadComponent(this);
 
-        final ProcessGroup rootGroup = new StandardProcessGroup(ComponentIdGenerator.generateId().toString(), this, processScheduler,
+        final ProcessGroup rootGroup = new StandardProcessGroup(ComponentIdGenerator.generateId().toString(), controllerServiceProvider, processScheduler,
             nifiProperties, encryptor, this, new MutableVariableRegistry(this.variableRegistry));
-        rootGroup.setName(DEFAULT_ROOT_GROUP_NAME);
+        rootGroup.setName(FlowManager.DEFAULT_ROOT_GROUP_NAME);
         setRootGroup(rootGroup);
         instanceId = ComponentIdGenerator.generateId().toString();
 
         this.validationThreadPool = new FlowEngine(5, "Validate Components", true);
         this.validationTrigger = new StandardValidationTrigger(validationThreadPool, this::isInitialized);
 
-        controllerServiceProvider = new StandardControllerServiceProvider(this, processScheduler, bulletinRepository, stateManagerProvider,
-            this.variableRegistry, this.nifiProperties, validationTrigger);
-
         if (remoteInputSocketPort == null) {
             LOG.info("Not enabling RAW Socket Site-to-Site functionality because nifi.remote.input.socket.port is not set");
         } else if (isSiteToSiteSecure && sslContext == null) {
@@ -655,12 +583,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             zooKeeperStateServer = null;
         }
 
+        eventAccess = new StandardEventAccess(this, flowFileEventRepository);
         componentStatusRepository = createComponentStatusRepository();
         timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
             @Override
             public void run() {
                 try {
-                    componentStatusRepository.capture(getControllerStatus(), getGarbageCollectionStatus());
+                    componentStatusRepository.capture(eventAccess.getControllerStatus(), getGarbageCollectionStatus());
                 } catch (final Exception e) {
                     LOG.error("Failed to capture component stats for Stats History", e);
                 }
@@ -704,7 +633,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
 
             final InetSocketAddress loadBalanceAddress = nifiProperties.getClusterLoadBalanceAddress();
             // Setup Load Balancing Server
-            final EventReporter eventReporter = createEventReporter(bulletinRepository);
+            final EventReporter eventReporter = createEventReporter();
             final List<IdentityMapping> identityMappings = IdentityMappingUtil.getIdentityMappings(nifiProperties);
             final LoadBalanceAuthorizer authorizeConnection = new ClusterLoadBalanceAuthorizer(clusterCoordinator, eventReporter);
             final LoadBalanceProtocol loadBalanceProtocol = new StandardLoadBalanceProtocol(flowFileRepo, contentRepository, provenanceRepository, this, authorizeConnection);
@@ -766,20 +695,44 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         }
     }
 
-    private static FlowFileSwapManager createSwapManager(final NiFiProperties properties, final ExtensionManager extensionManager) {
-        final String implementationClassName = properties.getProperty(NiFiProperties.FLOWFILE_SWAP_MANAGER_IMPLEMENTATION, DEFAULT_SWAP_MANAGER_IMPLEMENTATION);
+    public FlowFileSwapManager createSwapManager() {
+        final String implementationClassName = nifiProperties.getProperty(NiFiProperties.FLOWFILE_SWAP_MANAGER_IMPLEMENTATION, DEFAULT_SWAP_MANAGER_IMPLEMENTATION);
         if (implementationClassName == null) {
             return null;
         }
 
         try {
-            return NarThreadContextClassLoader.createInstance(extensionManager, implementationClassName, FlowFileSwapManager.class, properties);
+            final FlowFileSwapManager swapManager = NarThreadContextClassLoader.createInstance(extensionManager, implementationClassName, FlowFileSwapManager.class, nifiProperties);
+
+            final EventReporter eventReporter = createEventReporter();
+            try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+                final SwapManagerInitializationContext initializationContext = new SwapManagerInitializationContext() {
+                    @Override
+                    public ResourceClaimManager getResourceClaimManager() {
+                        return resourceClaimManager;
+                    }
+
+                    @Override
+                    public FlowFileRepository getFlowFileRepository() {
+                        return flowFileRepository;
+                    }
+
+                    @Override
+                    public EventReporter getEventReporter() {
+                        return eventReporter;
+                    }
+                };
+
+                swapManager.initialize(initializationContext);
+            }
+
+            return swapManager;
         } catch (final Exception e) {
             throw new RuntimeException(e);
         }
     }
 
-    private static EventReporter createEventReporter(final BulletinRepository bulletinRepository) {
+    public EventReporter createEventReporter() {
         return new EventReporter() {
             private static final long serialVersionUID = 1L;
 
@@ -795,7 +748,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         writeLock.lock();
         try {
             // get all connections/queues and recover from swap files.
-            final List<Connection> connections = getGroup(getRootGroupId()).findAllConnections();
+            final List<Connection> connections = flowManager.getRootGroup().findAllConnections();
 
             long maxIdFromSwapFiles = -1L;
             if (flowFileRepository.isVolatile()) {
@@ -820,7 +773,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 }
             }
 
-            flowFileRepository.loadFlowFiles(this, maxIdFromSwapFiles + 1);
+            flowFileRepository.loadFlowFiles(new StandardQueueProvider(this), maxIdFromSwapFiles + 1);
 
             // Begin expiring FlowFiles that are old
             final RepositoryContextFactory contextFactory = new RepositoryContextFactory(contentRepository, flowFileRepository,
@@ -858,7 +811,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
                 @Override
                 public void run() {
-                    final ProcessGroup rootGroup = getRootGroup();
+                    final ProcessGroup rootGroup = flowManager.getRootGroup();
                     final List<ProcessGroup> allGroups = rootGroup.findAllProcessGroups();
                     allGroups.add(rootGroup);
 
@@ -879,14 +832,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     }
 
     private void notifyComponentsConfigurationRestored() {
-        for (final ProcessorNode procNode : getGroup(getRootGroupId()).findAllProcessors()) {
+        for (final ProcessorNode procNode : flowManager.getRootGroup().findAllProcessors()) {
             final Processor processor = procNode.getProcessor();
             try (final NarCloseable nc = NarCloseable.withComponentNarLoader(extensionManager, processor.getClass(), processor.getIdentifier())) {
                 ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, processor);
             }
         }
 
-        for (final ControllerServiceNode serviceNode : getAllControllerServices()) {
+        for (final ControllerServiceNode serviceNode : flowManager.getAllControllerServices()) {
             final ControllerService service = serviceNode.getControllerServiceImplementation();
 
             try (final NarCloseable nc = NarCloseable.withComponentNarLoader(extensionManager, service.getClass(), service.getIdentifier())) {
@@ -944,13 +897,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 }
             };
 
-            new TriggerValidationTask(this, triggerIfValidating).run();
+            new TriggerValidationTask(flowManager, triggerIfValidating).run();
 
             final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
             LOG.info("Performed initial validation of all components in {} milliseconds", millis);
 
             // Trigger component validation to occur every 5 seconds.
-            validationThreadPool.scheduleWithFixedDelay(new TriggerValidationTask(this, validationTrigger), 5, 5, TimeUnit.SECONDS);
+            validationThreadPool.scheduleWithFixedDelay(new TriggerValidationTask(flowManager, validationTrigger), 5, 5, TimeUnit.SECONDS);
 
             if (startDelayedComponents) {
                 LOG.info("Starting {} processors/ports/funnels", startConnectablesAfterInitialization.size() + startRemoteGroupPortsAfterInitialization.size());
@@ -1005,7 +958,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 startRemoteGroupPortsAfterInitialization.clear();
             }
 
-            for (final Connection connection : getRootGroup().findAllConnections()) {
+            for (final Connection connection : flowManager.getRootGroup().findAllConnections()) {
                 connection.getFlowFileQueue().startLoadBalancing();
             }
         } finally {
@@ -1063,377 +1016,29 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         }
     }
 
-    /**
-     * Creates a connection between two Connectable objects.
-     *
-     * @param id required ID of the connection
-     * @param name the name of the connection, or <code>null</code> to leave the
-     * connection unnamed
-     * @param source required source
-     * @param destination required destination
-     * @param relationshipNames required collection of relationship names
-     * @return
-     *
-     * @throws NullPointerException if the ID, source, destination, or set of
-     * relationships is null.
-     * @throws IllegalArgumentException if <code>relationships</code> is an
-     * empty collection
-     */
-    public Connection createConnection(final String id, final String name, final Connectable source, final Connectable destination, final Collection<String> relationshipNames) {
-        final StandardConnection.Builder builder = new StandardConnection.Builder(processScheduler);
-
-        final List<Relationship> relationships = new ArrayList<>();
-        for (final String relationshipName : requireNonNull(relationshipNames)) {
-            relationships.add(new Relationship.Builder().name(relationshipName).build());
-        }
-
-        // Create and initialize a FlowFileSwapManager for this connection
-        final FlowFileSwapManager swapManager = createSwapManager(nifiProperties, extensionManager);
-        final EventReporter eventReporter = createEventReporter(getBulletinRepository());
-
-        try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
-            final SwapManagerInitializationContext initializationContext = new SwapManagerInitializationContext() {
-                @Override
-                public ResourceClaimManager getResourceClaimManager() {
-                    return resourceClaimManager;
-                }
-
-                @Override
-                public FlowFileRepository getFlowFileRepository() {
-                    return flowFileRepository;
-                }
-
-                @Override
-                public EventReporter getEventReporter() {
-                    return eventReporter;
-                }
-            };
-
-            swapManager.initialize(initializationContext);
-        }
-
-        final FlowFileQueueFactory flowFileQueueFactory = new FlowFileQueueFactory() {
-            @Override
-            public FlowFileQueue createFlowFileQueue(final LoadBalanceStrategy loadBalanceStrategy, final String partitioningAttribute, final ConnectionEventListener eventListener) {
-                final FlowFileQueue flowFileQueue;
-
-                if (clusterCoordinator == null) {
-                    flowFileQueue = new StandardFlowFileQueue(id, eventListener, flowFileRepository, provenanceRepository, resourceClaimManager, processScheduler, swapManager,
-                            eventReporter, nifiProperties.getQueueSwapThreshold(), nifiProperties.getDefaultBackPressureObjectThreshold(), nifiProperties.getDefaultBackPressureDataSizeThreshold());
-                } else {
-                    flowFileQueue = new SocketLoadBalancedFlowFileQueue(id, eventListener, processScheduler, flowFileRepository, provenanceRepository, contentRepository, resourceClaimManager,
-                            clusterCoordinator, loadBalanceClientRegistry, swapManager, nifiProperties.getQueueSwapThreshold(), eventReporter);
-
-                    flowFileQueue.setBackPressureObjectThreshold(nifiProperties.getDefaultBackPressureObjectThreshold());
-                    flowFileQueue.setBackPressureDataSizeThreshold(nifiProperties.getDefaultBackPressureDataSizeThreshold());
-                }
-
-                return flowFileQueue;
-            }
-        };
-
-        final Connection connection = builder.id(requireNonNull(id).intern())
-                .name(name == null ? null : name.intern())
-                .relationships(relationships)
-                .source(requireNonNull(source))
-                .destination(destination)
-                .flowFileQueueFactory(flowFileQueueFactory)
-                .build();
-
-        return connection;
-    }
-
-    /**
-     * Creates a new Label
-     *
-     * @param id identifier
-     * @param text label text
-     * @return new label
-     * @throws NullPointerException if either argument is null
-     */
-    public Label createLabel(final String id, final String text) {
-        return new StandardLabel(requireNonNull(id).intern(), text);
-    }
-
-    /**
-     * Creates a funnel
-     *
-     * @param id funnel id
-     * @return new funnel
-     */
-    public Funnel createFunnel(final String id) {
-        return new StandardFunnel(id.intern(), null, processScheduler);
-    }
-
-    /**
-     * Creates a Port to use as an Input Port for a Process Group
-     *
-     * @param id port identifier
-     * @param name port name
-     * @return new port
-     * @throws NullPointerException if the ID or name is not unique
-     * @throws IllegalStateException if an Input Port already exists with the
-     * same name or id.
-     */
-    public Port createLocalInputPort(String id, String name) {
-        id = requireNonNull(id).intern();
-        name = requireNonNull(name).intern();
-        verifyPortIdDoesNotExist(id);
-        return new LocalPort(id, name, null, ConnectableType.INPUT_PORT, processScheduler);
-    }
-
-    /**
-     * Creates a Port to use as an Output Port for a Process Group
-     *
-     * @param id port id
-     * @param name port name
-     * @return new port
-     * @throws NullPointerException if the ID or name is not unique
-     * @throws IllegalStateException if an Input Port already exists with the
-     * same name or id.
-     */
-    public Port createLocalOutputPort(String id, String name) {
-        id = requireNonNull(id).intern();
-        name = requireNonNull(name).intern();
-        verifyPortIdDoesNotExist(id);
-        return new LocalPort(id, name, null, ConnectableType.OUTPUT_PORT, processScheduler);
-    }
-
-    /**
-     * Creates a ProcessGroup with the given ID
-     *
-     * @param id group id
-     * @return new group
-     * @throws NullPointerException if the argument is null
-     */
-    public ProcessGroup createProcessGroup(final String id) {
-        return new StandardProcessGroup(requireNonNull(id).intern(), this, processScheduler, nifiProperties, encryptor, this, new MutableVariableRegistry(variableRegistry));
-    }
-
-    /**
-     * <p>
-     * Creates a new ProcessorNode with the given type and identifier and
-     * initializes it invoking the methods annotated with {@link OnAdded}.
-     * </p>
-     *
-     * @param type processor type
-     * @param id processor id
-     * @param coordinate the coordinate of the bundle for this processor
-     * @return new processor
-     * @throws NullPointerException if either arg is null
-     * @throws ProcessorInstantiationException if the processor cannot be
-     * instantiated for any reason
-     */
-    public ProcessorNode createProcessor(final String type, final String id, final BundleCoordinate coordinate) throws ProcessorInstantiationException {
-        return createProcessor(type, id, coordinate, true);
-    }
-
-    /**
-     * <p>
-     * Creates a new ProcessorNode with the given type and identifier and
-     * optionally initializes it.
-     * </p>
-     *
-     * @param type the fully qualified Processor class name
-     * @param id the unique ID of the Processor
-     * @param coordinate the bundle coordinate for this processor
-     * @param firstTimeAdded whether or not this is the first time this
-     * Processor is added to the graph. If {@code true}, will invoke methods
-     * annotated with the {@link OnAdded} annotation.
-     * @return new processor node
-     * @throws NullPointerException if either arg is null
-     * @throws ProcessorInstantiationException if the processor cannot be
-     * instantiated for any reason
-     */
-    public ProcessorNode createProcessor(final String type, String id, final BundleCoordinate coordinate, final boolean firstTimeAdded) throws ProcessorInstantiationException {
-        return createProcessor(type, id, coordinate, Collections.emptySet(), firstTimeAdded, true);
-    }
-
-    /**
-     * <p>
-     * Creates a new ProcessorNode with the given type and identifier and
-     * optionally initializes it.
-     * </p>
-     *
-     * @param type the fully qualified Processor class name
-     * @param id the unique ID of the Processor
-     * @param coordinate the bundle coordinate for this processor
-     * @param firstTimeAdded whether or not this is the first time this
-     * Processor is added to the graph. If {@code true}, will invoke methods
-     * annotated with the {@link OnAdded} annotation.
-     * @return new processor node
-     * @throws NullPointerException if either arg is null
-     * @throws ProcessorInstantiationException if the processor cannot be
-     * instantiated for any reason
-     */
-    public ProcessorNode createProcessor(final String type, String id, final BundleCoordinate coordinate, final Set<URL> additionalUrls,
-                                         final boolean firstTimeAdded, final boolean registerLogObserver) throws ProcessorInstantiationException {
-        id = id.intern();
-
-        boolean creationSuccessful;
-        LoggableComponent<Processor> processor;
-
-        // make sure the first reference to LogRepository happens outside of a NarCloseable so that we use the framework's ClassLoader
-        final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
-
-        try {
-            processor = instantiateProcessor(type, id, coordinate, additionalUrls);
-            creationSuccessful = true;
-        } catch (final ProcessorInstantiationException pie) {
-            LOG.error("Could not create Processor of type " + type + " for ID " + id + "; creating \"Ghost\" implementation", pie);
-            final GhostProcessor ghostProc = new GhostProcessor();
-            ghostProc.setIdentifier(id);
-            ghostProc.setCanonicalClassName(type);
-            processor = new LoggableComponent<>(ghostProc, coordinate, null);
-            creationSuccessful = false;
-        }
-
-        final ComponentVariableRegistry componentVarRegistry = new StandardComponentVariableRegistry(this.variableRegistry);
-        final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider, componentVarRegistry);
-        final ProcessorNode procNode;
-        if (creationSuccessful) {
-            procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider,
-                nifiProperties, componentVarRegistry, this, extensionManager, validationTrigger);
-        } else {
-            final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type;
-            final String componentType = "(Missing) " + simpleClassName;
-            procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider,
-                componentType, type, nifiProperties, componentVarRegistry, this, extensionManager, validationTrigger, true);
-        }
-
-        if (registerLogObserver) {
-            logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, new ProcessorLogObserver(getBulletinRepository(), procNode));
-        }
-
-        try {
-            final Class<?> procClass = procNode.getProcessor().getClass();
-            if(procClass.isAnnotationPresent(DefaultSettings.class)) {
-                DefaultSettings ds = procClass.getAnnotation(DefaultSettings.class);
-                try {
-                    procNode.setYieldPeriod(ds.yieldDuration());
-                } catch(Throwable ex) {
-                    LOG.error(String.format("Error while setting yield period from DefaultSettings annotation:%s",ex.getMessage()),ex);
-                }
-                try {
 
-                    procNode.setPenalizationPeriod(ds.penaltyDuration());
-                } catch(Throwable ex) {
-                    LOG.error(String.format("Error while setting penalty duration from DefaultSettings annotation:%s",ex.getMessage()),ex);
-                }
 
-                // calling setBulletinLevel changes the level in the LogRepository so we only want to do this when
-                // the caller said to register the log observer, otherwise we could be changing the level when we didn't mean to
-                if (registerLogObserver) {
-                    try {
-                        procNode.setBulletinLevel(ds.bulletinLevel());
-                    } catch (Throwable ex) {
-                        LOG.error(String.format("Error while setting bulletin level from DefaultSettings annotation:%s", ex.getMessage()), ex);
-                    }
-                }
-            }
-        } catch (Throwable ex) {
-            LOG.error(String.format("Error while setting default settings from DefaultSettings annotation: %s",ex.getMessage()),ex);
-        }
 
-        if (firstTimeAdded) {
-            try (final NarCloseable x = NarCloseable.withComponentNarLoader(extensionManager, procNode.getProcessor().getClass(), procNode.getProcessor().getIdentifier())) {
-                ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, procNode.getProcessor());
-            } catch (final Exception e) {
-                if (registerLogObserver) {
-                    logRepository.removeObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID);
-                }
-                throw new ComponentLifeCycleException("Failed to invoke @OnAdded methods of " + procNode.getProcessor(), e);
-            }
+    public KerberosConfig createKerberosConfig(final NiFiProperties nifiProperties) {
+        final String principal = nifiProperties.getKerberosServicePrincipal();
+        final String keytabLocation = nifiProperties.getKerberosServiceKeytabLocation();
+        final File kerberosConfigFile = nifiProperties.getKerberosConfigurationFile();
 
-            if (firstTimeAdded) {
-                try (final NarCloseable nc = NarCloseable.withComponentNarLoader(extensionManager, procNode.getProcessor().getClass(), procNode.getProcessor().getIdentifier())) {
-                    ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, procNode.getProcessor());
-                }
-            }
+        if (principal == null && keytabLocation == null && kerberosConfigFile == null) {
+            return KerberosConfig.NOT_CONFIGURED;
         }
 
-        return procNode;
+        final File keytabFile = keytabLocation == null ? null : new File(keytabLocation);
+        return new KerberosConfig(principal, keytabFile, kerberosConfigFile);
     }
 
-    private LoggableComponent<Processor> instantiateProcessor(final String type, final String identifier, final BundleCoordinate bundleCoordinate, final Set<URL> additionalUrls)
-            throws ProcessorInstantiationException {
-
-        final Bundle processorBundle = extensionManager.getBundle(bundleCoordinate);
-        if (processorBundle == null) {
-            throw new ProcessorInstantiationException("Unable to find bundle for coordinate " + bundleCoordinate.getCoordinate());
-        }
-
-        final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
-        try {
-            final ClassLoader detectedClassLoaderForInstance = extensionManager.createInstanceClassLoader(type, identifier, processorBundle, additionalUrls);
-            final Class<?> rawClass = Class.forName(type, true, detectedClassLoaderForInstance);
-            Thread.currentThread().setContextClassLoader(detectedClassLoaderForInstance);
-
-            final Class<? extends Processor> processorClass = rawClass.asSubclass(Processor.class);
-            final Processor processor = processorClass.newInstance();
-
-            final ComponentLog componentLogger = new SimpleProcessLogger(identifier, processor);
-            final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(componentLogger);
-            final ProcessorInitializationContext ctx = new StandardProcessorInitializationContext(identifier, terminationAwareLogger, this, this, nifiProperties);
-            processor.initialize(ctx);
-
-            LogRepositoryFactory.getRepository(identifier).setLogger(terminationAwareLogger);
 
-            return new LoggableComponent<>(processor, bundleCoordinate, terminationAwareLogger);
-        } catch (final Throwable t) {
-            throw new ProcessorInstantiationException(type, t);
-        } finally {
-            if (ctxClassLoader != null) {
-                Thread.currentThread().setContextClassLoader(ctxClassLoader);
-            }
-        }
+    public ValidationTrigger getValidationTrigger() {
+        return validationTrigger;
     }
 
-    @Override
-    public void reload(final ProcessorNode existingNode, final String newType, final BundleCoordinate bundleCoordinate, final Set<URL> additionalUrls)
-            throws ProcessorInstantiationException {
-        if (existingNode == null) {
-            throw new IllegalStateException("Existing ProcessorNode cannot be null");
-        }
-
-        final String id = existingNode.getProcessor().getIdentifier();
-
-        // ghost components will have a null logger
-        if (existingNode.getLogger() != null) {
-            existingNode.getLogger().debug("Reloading component {} to type {} from bundle {}", new Object[]{id, newType, bundleCoordinate});
-        }
-
-        // createProcessor will create a new instance class loader for the same id so
-        // save the instance class loader to use it for calling OnRemoved on the existing processor
-        final ClassLoader existingInstanceClassLoader = extensionManager.getInstanceClassLoader(id);
-
-        // create a new node with firstTimeAdded as true so lifecycle methods get fired
-        // attempt the creation to make sure it works before firing the OnRemoved methods below
-        final ProcessorNode newNode = createProcessor(newType, id, bundleCoordinate, additionalUrls, true, false);
-
-        // call OnRemoved for the existing processor using the previous instance class loader
-        try (final NarCloseable x = NarCloseable.withComponentNarLoader(existingInstanceClassLoader)) {
-            final StateManager stateManager = getStateManagerProvider().getStateManager(id);
-            final StandardProcessContext processContext = new StandardProcessContext(existingNode, controllerServiceProvider, encryptor, stateManager, () -> false);
-            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, existingNode.getProcessor(), processContext);
-        } finally {
-            extensionManager.closeURLClassLoader(id, existingInstanceClassLoader);
-        }
-
-        // set the new processor in the existing node
-        final ComponentLog componentLogger = new SimpleProcessLogger(id, newNode.getProcessor());
-        final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(componentLogger);
-        LogRepositoryFactory.getRepository(id).setLogger(terminationAwareLogger);
-
-        final LoggableComponent<Processor> newProcessor = new LoggableComponent<>(newNode.getProcessor(), newNode.getBundleCoordinate(), terminationAwareLogger);
-        existingNode.setProcessor(newProcessor);
-        existingNode.setExtensionMissing(newNode.isExtensionMissing());
-
-        // need to refresh the properties in case we are changing from ghost component to real component
-        existingNode.refreshProperties();
-
-        LOG.debug("Triggering async validation of {} due to processor reload", existingNode);
-        validationTrigger.triggerAsync(existingNode);
+    public StringEncryptor getEncryptor() {
+        return encryptor;
     }
 
     /**
@@ -1476,117 +1081,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         return authorizer;
     }
 
-    /**
-     * Creates a Port to use as an Input Port for the root Process Group, which
-     * is used for Site-to-Site communications
-     *
-     * @param id port id
-     * @param name port name
-     * @return new port
-     * @throws NullPointerException if the ID or name is not unique
-     * @throws IllegalStateException if an Input Port already exists with the
-     * same name or id.
-     */
-    public Port createRemoteInputPort(String id, String name) {
-        id = requireNonNull(id).intern();
-        name = requireNonNull(name).intern();
-        verifyPortIdDoesNotExist(id);
-        return new StandardRootGroupPort(id, name, null, TransferDirection.RECEIVE, ConnectableType.INPUT_PORT,
-                authorizer, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure), nifiProperties);
-    }
-
-    /**
-     * Creates a Port to use as an Output Port for the root Process Group, which
-     * is used for Site-to-Site communications and will queue flow files waiting
-     * to be delivered to remote instances
-     *
-     * @param id port id
-     * @param name port name
-     * @return new port
-     * @throws NullPointerException if the ID or name is not unique
-     * @throws IllegalStateException if an Input Port already exists with the
-     * same name or id.
-     */
-    public Port createRemoteOutputPort(String id, String name) {
-        id = requireNonNull(id).intern();
-        name = requireNonNull(name).intern();
-        verifyPortIdDoesNotExist(id);
-        return new StandardRootGroupPort(id, name, null, TransferDirection.SEND, ConnectableType.OUTPUT_PORT,
-                authorizer, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure), nifiProperties);
-    }
-
-    /**
-     * Creates a new Remote Process Group with the given ID that points to the
-     * given URI
-     *
-     * @param id group id
-     * @param uris group uris, multiple url can be specified in comma-separated format
-     * @return new group
-     * @throws NullPointerException if either argument is null
-     * @throws IllegalArgumentException if <code>uri</code> is not a valid URI.
-     */
-    public RemoteProcessGroup createRemoteProcessGroup(final String id, final String uris) {
-        return new StandardRemoteProcessGroup(requireNonNull(id).intern(), uris, null, this, sslContext, nifiProperties);
-    }
-
-    public ProcessGroup getRootGroup() {
-        return rootGroupRef.get();
-    }
-
-    /**
-     * Verifies that no output port exists with the given id or name. If this
-     * does not hold true, throws an IllegalStateException
-     *
-     * @param id port identifier
-     * @throws IllegalStateException port already exists
-     */
-    private void verifyPortIdDoesNotExist(final String id) {
-        final ProcessGroup rootGroup = getRootGroup();
-        Port port = rootGroup.findOutputPort(id);
-        if (port != null) {
-            throw new IllegalStateException("An Input Port already exists with ID " + id);
-        }
-        port = rootGroup.findInputPort(id);
-        if (port != null) {
-            throw new IllegalStateException("An Input Port already exists with ID " + id);
-        }
-    }
-
-    /**
-     * @return the name of this controller, which is also the name of the Root
-     * Group.
-     */
-    public String getName() {
-        return getRootGroup().getName();
-    }
-
-    /**
-     * Sets the name for the Root Group, which also changes the name for the
-     * controller.
-     *
-     * @param name of root group
-     */
-    public void setName(final String name) {
-        getRootGroup().setName(name);
-    }
-
-    /**
-     * @return the comments of this controller, which is also the comment of the
-     * Root Group
-     */
-    public String getComments() {
-        return getRootGroup().getComments();
-    }
-
-    /**
-     * Sets the comments
-     *
-     * @param comments for the Root Group, which also changes the comment for
-     * the controller
-     */
-    public void setComments(final String comments) {
-        getRootGroup().setComments(comments);
-    }
 
     /**
      * @return <code>true</code> if the scheduling engine for this controller
@@ -1615,7 +1109,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
      */
     public void shutdown(final boolean kill) {
         this.shutdown = true;
-        stopAllProcessors();
+        flowManager.getRootGroup().stopProcessing();
 
         readLock.lock();
         try {
@@ -1654,14 +1148,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             loadBalanceClientTasks.forEach(NioAsyncLoadBalanceClientTask::stop);
 
             // Trigger any processors' methods marked with @OnShutdown to be called
-            getRootGroup().shutdown();
+            flowManager.getRootGroup().shutdown();
 
             stateManagerProvider.shutdown();
 
             // invoke any methods annotated with @OnShutdown on Controller Services
-            for (final ControllerServiceNode serviceNode : getAllControllerServices()) {
-                try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(
-                        extensionManager, serviceNode.getControllerServiceImplementation().getClass(), serviceNode.getIdentifier())) {
+            for (final ControllerServiceNode serviceNode : flowManager.getAllControllerServices()) {
+                final Class<?> serviceImplClass = serviceNode.getControllerServiceImplementation().getClass();
+                try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, serviceImplClass, serviceNode.getIdentifier())) {
                     final ConfigurationContext configContext = new StandardConfigurationContext(serviceNode, controllerServiceProvider, null, variableRegistry);
                     ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, serviceNode.getControllerServiceImplementation(), configContext);
                 }
@@ -1864,11 +1358,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         }
     }
 
-    /**
-     * @return the ID of the root group
-     */
-    public String getRootGroupId() {
-        return getRootGroup().getIdentifier();
+    public UserAwareEventAccess getEventAccess() {
+        return eventAccess;
     }
 
     /**
@@ -1887,7 +1378,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
 
         writeLock.lock();
         try {
-            rootGroupRef.set(group);
+            flowManager.setRootGroup(group);
             for (final RemoteSiteListener listener : externalSiteListeners) {
                 listener.setRootGroup(group);
             }
@@ -1895,7 +1386,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             // update the heartbeat bean
             this.heartbeatBeanRef.set(new HeartbeatBean(group, isPrimary()));
             allProcessGroups.put(group.getIdentifier(), group);
-            allProcessGroups.put(ROOT_GROUP_ID_ALIAS, group);
+            allProcessGroups.put(FlowManager.ROOT_GROUP_ID_ALIAS, group);
         } finally {
             writeLock.unlock("setRootGroup");
         }
@@ -1921,38 +1412,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     //
     // ProcessGroup access
     //
-    /**
-     * Updates the process group corresponding to the specified DTO. Any field
-     * in DTO that is <code>null</code> (with the exception of the required ID)
-     * will be ignored.
-     *
-     * @param dto group
-     * @throws ProcessorInstantiationException
-     *
-     * @throws IllegalStateException if no process group can be found with the
-     * ID of DTO or with the ID of the DTO's parentGroupId, if the template ID
-     * specified is invalid, or if the DTO's Parent Group ID changes but the
-     * parent group has incoming or outgoing connections
-     *
-     * @throws NullPointerException if the DTO or its ID is null
-     */
-    public void updateProcessGroup(final ProcessGroupDTO dto) throws ProcessorInstantiationException {
-        final ProcessGroup group = lookupGroup(requireNonNull(dto).getId());
-
-        final String name = dto.getName();
-        final PositionDTO position = dto.getPosition();
-        final String comments = dto.getComments();
-
-        if (name != null) {
-            group.setName(name);
-        }
-        if (position != null) {
-            group.setPosition(toPosition(position));
-        }
-        if (comments != null) {
-            group.setComments(comments);
-        }
-    }
 
     private Position toPosition(final PositionDTO dto) {
         return new Position(dto.getX(), dto.getY());
@@ -1961,1549 +1420,153 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     //
     // Snippet
     //
-    /**
-     * Creates an instance of the given snippet and adds the components to the
-     * given group
-     *
-     * @param group group
-     * @param dto dto
-     *
-     * @throws NullPointerException if either argument is null
-     * @throws IllegalStateException if the snippet is not valid because a
-     * component in the snippet has an ID that is not unique to this flow, or
-     * because it shares an Input Port or Output Port at the root level whose
-     * name already exists in the given ProcessGroup, or because the Template
-     * contains a Processor or a Prioritizer whose class is not valid within
-     * this instance of NiFi.
-     * @throws ProcessorInstantiationException if unable to instantiate a
-     * processor
-     */
-    public void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto) throws ProcessorInstantiationException {
-        instantiateSnippet(group, dto, true);
-        group.findAllRemoteProcessGroups().stream().forEach(RemoteProcessGroup::initialize);
+
+    private void verifyBundleInVersionedFlow(final org.apache.nifi.registry.flow.Bundle requiredBundle, final Set<BundleCoordinate> supportedBundles) {
+        final BundleCoordinate requiredCoordinate = new BundleCoordinate(requiredBundle.getGroup(), requiredBundle.getArtifact(), requiredBundle.getVersion());
+        if (!supportedBundles.contains(requiredCoordinate)) {
+            throw new IllegalStateException("Unsupported bundle: " + requiredCoordinate);
+        }
     }
 
-    private void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto, final boolean topLevel) throws ProcessorInstantiationException {
-        validateSnippetContents(requireNonNull(group), dto);
-        writeLock.lock();
-        try {
-            //
-            // Instantiate Controller Services
-            //
-            final List<ControllerServiceNode> serviceNodes = new ArrayList<>();
-            try {
-                for (final ControllerServiceDTO controllerServiceDTO : dto.getControllerServices()) {
-                    final BundleCoordinate bundleCoordinate = BundleUtils.getBundle(extensionManager, controllerServiceDTO.getType(), controllerServiceDTO.getBundle());
-                    final ControllerServiceNode serviceNode = createControllerService(controllerServiceDTO.getType(), controllerServiceDTO.getId(), bundleCoordinate, Collections.emptySet(), true);
-                    serviceNode.pauseValidationTrigger();
-                    serviceNodes.add(serviceNode);
-
-                    serviceNode.setAnnotationData(controllerServiceDTO.getAnnotationData());
-                    serviceNode.setComments(controllerServiceDTO.getComments());
-                    serviceNode.setName(controllerServiceDTO.getName());
-                    if (!topLevel) {
-                        serviceNode.setVersionedComponentId(controllerServiceDTO.getVersionedComponentId());
-                    }
 
-                    group.addControllerService(serviceNode);
+    private void verifyProcessorsInVersionedFlow(final VersionedProcessGroup versionedFlow, final Map<String, Set<BundleCoordinate>> supportedTypes) {
+        if (versionedFlow.getProcessors() != null) {
+            versionedFlow.getProcessors().forEach(processor -> {
+                if (processor.getBundle() == null) {
+                    throw new IllegalArgumentException("Processor bundle must be specified.");
                 }
 
-                // configure controller services. We do this after creating all of them in case 1 service
-                // references another service.
-                for (final ControllerServiceDTO controllerServiceDTO : dto.getControllerServices()) {
-                    final String serviceId = controllerServiceDTO.getId();
-                    final ControllerServiceNode serviceNode = getControllerServiceNode(serviceId);
-                    serviceNode.setProperties(controllerServiceDTO.getProperties());
+                if (supportedTypes.containsKey(processor.getType())) {
+                    verifyBundleInVersionedFlow(processor.getBundle(), supportedTypes.get(processor.getType()));
+                } else {
+                    throw new IllegalStateException("Invalid Processor Type: " + processor.getType());
                 }
-            } finally {
-                serviceNodes.stream().forEach(ControllerServiceNode::resumeValidationTrigger);
-            }
-
-            //
-            // Instantiate the labels
-            //
-            for (final LabelDTO labelDTO : dto.getLabels()) {
-                final Label label = createLabel(labelDTO.getId(), labelDTO.getLabel());
-                label.setPosition(toPosition(labelDTO.getPosition()));
-                if (labelDTO.getWidth() != null && labelDTO.getHeight() != null) {
-                    label.setSize(new Size(labelDTO.getWidth(), labelDTO.getHeight()));
-                }
-
-                label.setStyle(labelDTO.getStyle());
-                if (!topLevel) {
-                    label.setVersionedComponentId(labelDTO.getVersionedComponentId());
-                }
-
-                group.addLabel(label);
-            }
-
-            // Instantiate the funnels
-            for (final FunnelDTO funnelDTO : dto.getFunnels()) {
-                final Funnel funnel = createFunnel(funnelDTO.getId());
-                funnel.setPosition(toPosition(funnelDTO.getPosition()));
-                if (!topLevel) {
-                    funnel.setVersionedComponentId(funnelDTO.getVersionedComponentId());
-                }
-
-                group.addFunnel(funnel);
-            }
-
-            //
-            // Instantiate Input Ports & Output Ports
-            //
-            for (final PortDTO portDTO : dto.getInputPorts()) {
-                final Port inputPort;
-                if (group.isRootGroup()) {
-                    inputPort = createRemoteInputPort(portDTO.getId(), portDTO.getName());
-                    inputPort.setMaxConcurrentTasks(portDTO.getConcurrentlySchedulableTaskCount());
-                    if (portDTO.getGroupAccessControl() != null) {
-                        ((RootGroupPort) inputPort).setGroupAccessControl(portDTO.getGroupAccessControl());
-                    }
-                    if (portDTO.getUserAccessControl() != null) {
-                        ((RootGroupPort) inputPort).setUserAccessControl(portDTO.getUserAccessControl());
-                    }
-                } else {
-                    inputPort = createLocalInputPort(portDTO.getId(), portDTO.getName());
-                }
-
-                if (!topLevel) {
-                    inputPort.setVersionedComponentId(portDTO.getVersionedComponentId());
-                }
-                inputPort.setPosition(toPosition(portDTO.getPosition()));
-                inputPort.setProcessGroup(group);
-                inputPort.setComments(portDTO.getComments());
-                group.addInputPort(inputPort);
-            }
-
-            for (final PortDTO portDTO : dto.getOutputPorts()) {
-                final Port outputPort;
-                if (group.isRootGroup()) {
-                    outputPort = createRemoteOutputPort(portDTO.getId(), portDTO.getName());
-                    outputPort.setMaxConcurrentTasks(portDTO.getConcurrentlySchedulableTaskCount());
-                    if (portDTO.getGroupAccessControl() != null) {
-                        ((RootGroupPort) outputPort).setGroupAccessControl(portDTO.getGroupAccessControl());
-                    }
-                    if (portDTO.getUserAccessControl() != null) {
-                        ((RootGroupPort) outputPort).setUserAccessControl(portDTO.getUserAccessControl());
-                    }
-                } else {
-                    outputPort = createLocalOutputPort(portDTO.getId(), portDTO.getName());
-                }
-
-                if (!topLevel) {
-                    outputPort.setVersionedComponentId(portDTO.getVersionedComponentId());
-                }
-                outputPort.setPosition(toPosition(portDTO.getPosition()));
-                outputPort.setProcessGroup(group);
-                outputPort.setComments(portDTO.getComments());
-                group.addOutputPort(outputPort);
-            }
-
-            //
-            // Instantiate the processors
-            //
-            for (final ProcessorDTO processorDTO : dto.getProcessors()) {
-                final BundleCoordinate bundleCoordinate = BundleUtils.getBundle(extensionManager, processorDTO.getType(), processorDTO.getBundle());
-                final ProcessorNode procNode = createProcessor(processorDTO.getType(), processorDTO.getId(), bundleCoordinate);
-                procNode.pauseValidationTrigger();
-
-                try {
-                    procNode.setPosition(toPosition(processorDTO.getPosition()));
-                    procNode.setProcessGroup(group);
-                    if (!topLevel) {
-                        procNode.setVersionedComponentId(processorDTO.getVersionedComponentId());
-                    }
-
-                    final ProcessorConfigDTO config = processorDTO.getConfig();
-                    procNode.setComments(config.getComments());
-                    if (config.isLossTolerant() != null) {
-                        procNode.setLossTolerant(config.isLossTolerant());
-                    }
-                    procNode.setName(processorDTO.getName());
-
-                    procNode.setYieldPeriod(config.getYieldDuration());
-                    procNode.setPenalizationPeriod(config.getPenaltyDuration());
-                    procNode.setBulletinLevel(LogLevel.valueOf(config.getBulletinLevel()));
-                    procNode.setAnnotationData(config.getAnnotationData());
-                    procNode.setStyle(processorDTO.getStyle());
-
-                    if (config.getRunDurationMillis() != null) {
-                        procNode.setRunDuration(config.getRunDurationMillis(), TimeUnit.MILLISECONDS);
-                    }
-
-                    if (config.getSchedulingStrategy() != null) {
-                        procNode.setSchedulingStrategy(SchedulingStrategy.valueOf(config.getSchedulingStrategy()));
-                    }
-
-                    if (config.getExecutionNode() != null) {
-                        procNode.setExecutionNode(ExecutionNode.valueOf(config.getExecutionNode()));
-                    }
-
-                    if (processorDTO.getState().equals(ScheduledState.DISABLED.toString())) {
-                        procNode.disable();
-                    }
-
-                    // ensure that the scheduling strategy is set prior to these values
-                    procNode.setMaxConcurrentTasks(config.getConcurrentlySchedulableTaskCount());
-                    procNode.setScheduldingPeriod(config.getSchedulingPeriod());
-
-                    final Set<Relationship> relationships = new HashSet<>();
-                    if (processorDTO.getRelationships() != null) {
-                        for (final RelationshipDTO rel : processorDTO.getRelationships()) {
-                            if (rel.isAutoTerminate()) {
-                                relationships.add(procNode.getRelationship(rel.getName()));
-                            }
-                        }
-                        procNode.setAutoTerminatedRelationships(relationships);
-                    }
-
-                    if (config.getProperties() != null) {
-                        procNode.setProperties(config.getProperties());
-                    }
-
-                    group.addProcessor(procNode);
-                } finally {
-                    procNode.resumeValidationTrigger();
-                }
-            }
-
-            //
-            // Instantiate Remote Process Groups
-            //
-            for (final RemoteProcessGroupDTO remoteGroupDTO : dto.getRemoteProcessGroups()) {
-                final RemoteProcessGroup remoteGroup = createRemoteProcessGroup(remoteGroupDTO.getId(), remoteGroupDTO.getTargetUris());
-                remoteGroup.setComments(remoteGroupDTO.getComments());
-                remoteGroup.setPosition(toPosition(remoteGroupDTO.getPosition()));
-                remoteGroup.setCommunicationsTimeout(remoteGroupDTO.getCommunicationsTimeout());
-                remoteGroup.setYieldDuration(remoteGroupDTO.getYieldDuration());
-                if (!topLevel) {
-                    remoteGroup.setVersionedComponentId(remoteGroupDTO.getVersionedComponentId());
-                }
-
-                if (remoteGroupDTO.getTransportProtocol() == null) {
-                    remoteGroup.setTransportProtocol(SiteToSiteTransportProtocol.RAW);
-                } else {
-                    remoteGroup.setTransportProtocol(SiteToSiteTransportProtocol.valueOf(remoteGroupDTO.getTransportProtocol()));
-                }
-
-                remoteGroup.setProxyHost(remoteGroupDTO.getProxyHost());
-                remoteGroup.setProxyPort(remoteGroupDTO.getProxyPort());
-                remoteGroup.setProxyUser(remoteGroupDTO.getProxyUser());
-                remoteGroup.setProxyPassword(remoteGroupDTO.getProxyPassword());
-                remoteGroup.setProcessGroup(group);
-
-                // set the input/output ports
-                if (remoteGroupDTO.getContents() != null) {
-                    final RemoteProcessGroupContentsDTO contents = remoteGroupDTO.getContents();
-
-                    // ensure there are input ports
-                    if (contents.getInputPorts() != null) {
-                        remoteGroup.setInputPorts(convertRemotePort(contents.getInputPorts()), false);
-                    }
-
-                    // ensure there are output ports
-                    if (contents.getOutputPorts() != null) {
-                        remoteGroup.setOutputPorts(convertRemotePort(contents.getOutputPorts()), false);
-                    }
-                }
-
-                group.addRemoteProcessGroup(remoteGroup);
-            }
-
-            //
-            // Instantiate ProcessGroups
-            //
-            for (final ProcessGroupDTO groupDTO : dto.getProcessGroups()) {
-                final ProcessGroup childGroup = createProcessGroup(groupDTO.getId());
-                childGroup.setParent(group);
-                childGroup.setPosition(toPosition(groupDTO.getPosition()));
-                childGroup.setComments(groupDTO.getComments());
-                childGroup.setName(groupDTO.getName());
-                if (groupDTO.getVariables() != null) {
-                    childGroup.setVariables(groupDTO.getVariables());
-                }
-
-                // If this Process Group is 'top level' then we do not set versioned component ID's.
-                // We do this only if this component is the child of a Versioned Component.
-                if (!topLevel) {
-                    childGroup.setVersionedComponentId(groupDTO.getVersionedComponentId());
-                }
-
-                group.addProcessGroup(childGroup);
-
-                final FlowSnippetDTO contents = groupDTO.getContents();
-
-                // we want this to be recursive, so we will create a new template that contains only
-                // the contents of this child group and recursively call ourselves.
-                final FlowSnippetDTO childTemplateDTO = new FlowSnippetDTO();
-                childTemplateDTO.setConnections(contents.getConnections());
-                childTemplateDTO.setInputPorts(contents.getInputPorts());
-                childTemplateDTO.setLabels(contents.getLabels());
-                childTemplateDTO.setOutputPorts(contents.getOutputPorts());
-                childTemplateDTO.setProcessGroups(contents.getProcessGroups());
-                childTemplateDTO.setProcessors(contents.getProcessors());
-                childTemplateDTO.setFunnels(contents.getFunnels());
-                childTemplateDTO.setRemoteProcessGroups(contents.getRemoteProcessGroups());
-                childTemplateDTO.setControllerServices(contents.getControllerServices());
-                instantiateSnippet(childGroup, childTemplateDTO, false);
-
-                if (groupDTO.getVersionControlInformation() != null) {
-                    final VersionControlInformation vci = StandardVersionControlInformation.Builder
-                        .fromDto(groupDTO.getVersionControlInformation())
-                        .build();
-                    childGroup.setVersionControlInformation(vci, Collections.emptyMap());
-                }
-            }
-
-            //
-            // Instantiate Connections
-            //
-            for (final ConnectionDTO connectionDTO : dto.getConnections()) {
-                final ConnectableDTO sourceDTO = connectionDTO.getSource();
-                final ConnectableDTO destinationDTO = connectionDTO.getDestination();
-                final Connectable source;
-                final Connectable destination;
-
-                // locate the source and destination connectable. if this is a remote port
-                // we need to locate the remote process groups. otherwise we need to
-                // find the connectable given its parent group.
-                // NOTE: (getConnectable returns ANY connectable, when the parent is
-                // not this group only input ports or output ports should be returned. if something
-                // other than a port is returned, an exception will be thrown when adding the
-                // connection below.)
-                // see if the source connectable is a remote port
-                if (ConnectableType.REMOTE_OUTPUT_PORT.name().equals(sourceDTO.getType())) {
-                    final RemoteProcessGroup remoteGroup = group.getRemoteProcessGroup(sourceDTO.getGroupId());
-                    source = remoteGroup.getOutputPort(sourceDTO.getId());
-                } else {
-                    final ProcessGroup sourceGroup = getConnectableParent(group, sourceDTO.getGroupId());
-                    source = sourceGroup.getConnectable(sourceDTO.getId());
-                }
-
-                // see if the destination connectable is a remote port
-                if (ConnectableType.REMOTE_INPUT_PORT.name().equals(destinationDTO.getType())) {
-                    final RemoteProcessGroup remoteGroup = group.getRemoteProcessGroup(destinationDTO.getGroupId());
-                    destination = remoteGroup.getInputPort(destinationDTO.getId());
-                } else {
-                    final ProcessGroup destinationGroup = getConnectableParent(group, destinationDTO.getGroupId());
-                    destination = destinationGroup.getConnectable(destinationDTO.getId());
-                }
-
-                // determine the selection relationships for this connection
-                final Set<String> relationships = new HashSet<>();
-                if (connectionDTO.getSelectedRelationships() != null) {
-                    relationships.addAll(connectionDTO.getSelectedRelationships());
-                }
-
-                final Connection connection = createConnection(connectionDTO.getId(), connectionDTO.getName(), source, destination, relationships);
-                if (!topLevel) {
-                    connection.setVersionedComponentId(connectionDTO.getVersionedComponentId());
-                }
-
-                if (connectionDTO.getBends() != null) {
-                    final List<Position> bendPoints = new ArrayList<>();
-                    for (final PositionDTO bend : connectionDTO.getBends()) {
-                        bendPoints.add(new Position(bend.getX(), bend.getY()));
-                    }
-                    connection.setBendPoints(bendPoints);
-                }
-
-                final FlowFileQueue queue = connection.getFlowFileQueue();
-                queue.setBackPressureDataSizeThreshold(connectionDTO.getBackPressureDataSizeThreshold());
-                queue.setBackPressureObjectThreshold(connectionDTO.getBackPressureObjectThreshold());
-                queue.setFlowFileExpiration(connectionDTO.getFlowFileExpiration());
-
-                final List<String> prioritizers = connectionDTO.getPrioritizers();
-                if (prioritizers != null) {
-                    final List<String> newPrioritizersClasses = new ArrayList<>(prioritizers);
-                    final List<FlowFilePrioritizer> newPrioritizers = new ArrayList<>();
-                    for (final String className : newPrioritizersClasses) {
-                        try {
-                            newPrioritizers.add(createPrioritizer(className));
-                        } catch (final ClassNotFoundException | InstantiationException | IllegalAccessException e) {
-                            throw new IllegalArgumentException("Unable to set prioritizer " + className + ": " + e);
-                        }
-                    }
-                    queue.setPriorities(newPrioritizers);
-                }
-
-                final String loadBalanceStrategyName = connectionDTO.getLoadBalanceStrategy();
-                if (loadBalanceStrategyName != null) {
-                    final LoadBalanceStrategy loadBalanceStrategy = LoadBalanceStrategy.valueOf(loadBalanceStrategyName);
-                    final String partitioningAttribute = connectionDTO.getLoadBalancePartitionAttribute();
-                    queue.setLoadBalanceStrategy(loadBalanceStrategy, partitioningAttribute);
-                }
-
-                connection.setProcessGroup(group);
-                group.addConnection(connection);
-            }
-        } finally {
-            writeLock.unlock("instantiateSnippet");
-        }
-    }
-
-    /**
-     * Converts a set of ports into a set of remote process group ports.
-     *
-     * @param ports ports
-     * @return group descriptors
-     */
-    private Set<RemoteProcessGroupPortDescriptor> convertRemotePort(final Set<RemoteProcessGroupPortDTO> ports) {
-        Set<RemoteProcessGroupPortDescriptor> remotePorts = null;
-        if (ports != null) {
-            remotePorts = new LinkedHashSet<>(ports.size());
-            for (final RemoteProcessGroupPortDTO port : ports) {
-                final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor();
-                descriptor.setId(port.getId());
-                descriptor.setVersionedComponentId(port.getVersionedComponentId());
-                descriptor.setTargetId(port.getTargetId());
-                descriptor.setName(port.getName());
-                descriptor.setComments(port.getComments());
-                descriptor.setTargetRunning(port.isTargetRunning());
-                descriptor.setConnected(port.isConnected());
-                descriptor.setConcurrentlySchedulableTaskCount(port.getConcurrentlySchedulableTaskCount());
-                descriptor.setTransmitting(port.isTransmitting());
-                descriptor.setUseCompression(port.getUseCompression());
-                final BatchSettingsDTO batchSettings = port.getBatchSettings();
-                if (batchSettings != null) {
-                    descriptor.setBatchCount(batchSettings.getCount());
-                    descriptor.setBatchSize(batchSettings.getSize());
-                    descriptor.setBatchDuration(batchSettings.getDuration());
-                }
-                remotePorts.add(descriptor);
-            }
-        }
-        return remotePorts;
-    }
-
-    /**
-     * Returns the parent of the specified Connectable. This only considers this
-     * group and any direct child sub groups.
-     *
-     * @param parentGroupId group id
-     * @return parent group
-     */
-    private ProcessGroup getConnectableParent(final ProcessGroup group, final String parentGroupId) {
-        if (areGroupsSame(group.getIdentifier(), parentGroupId)) {
-            return group;
-        } else {
-            return group.getProcessGroup(parentGroupId);
-        }
-    }
-
-    private void verifyBundleInSnippet(final BundleDTO requiredBundle, final Set<BundleCoordinate> supportedBundles) {
-        final BundleCoordinate requiredCoordinate = new BundleCoordinate(requiredBundle.getGroup(), requiredBundle.getArtifact(), requiredBundle.getVersion());
-        if (!supportedBundles.contains(requiredCoordinate)) {
-            throw new IllegalStateException("Unsupported bundle: " + requiredCoordinate);
-        }
-    }
-
-    private void verifyBundleInVersionedFlow(final org.apache.nifi.registry.flow.Bundle requiredBundle, final Set<BundleCoordinate> supportedBundles) {
-        final BundleCoordinate requiredCoordinate = new BundleCoordinate(requiredBundle.getGroup(), requiredBundle.getArtifact(), requiredBundle.getVersion());
-        if (!supportedBundles.contains(requiredCoordinate)) {
-            throw new IllegalStateException("Unsupported bundle: " + requiredCoordinate);
-        }
-    }
-
-    private void verifyProcessorsInSnippet(final FlowSnippetDTO templateContents, final Map<String, Set<BundleCoordinate>> supportedTypes) {
-        if (templateContents.getProcessors() != null) {
-            templateContents.getProcessors().forEach(processor -> {
-                if (processor.getBundle() == null) {
-                    throw new IllegalArgumentException("Processor bundle must be specified.");
-                }
-
-                if (supportedTypes.containsKey(processor.getType())) {
-                    verifyBundleInSnippet(processor.getBundle(), supportedTypes.get(processor.getType()));
-                } else {
-                    throw new IllegalStateException("Invalid Processor Type: " + processor.getType());
-                }
-            });
-        }
-
-        if (templateContents.getProcessGroups() != null) {
-            templateContents.getProcessGroups().forEach(processGroup -> {
-                verifyProcessorsInSnippet(processGroup.getContents(), supportedTypes);
-            });
-        }
-    }
-
-    private void verifyProcessorsInVersionedFlow(final VersionedProcessGroup versionedFlow, final Map<String, Set<BundleCoordinate>> supportedTypes) {
-        if (versionedFlow.getProcessors() != null) {
-            versionedFlow.getProcessors().forEach(processor -> {
-                if (processor.getBundle() == null) {
-                    throw new IllegalArgumentException("Processor bundle must be specified.");
-                }
-
-                if (supportedTypes.containsKey(processor.getType())) {
-                    verifyBundleInVersionedFlow(processor.getBundle(), supportedTypes.get(processor.getType()));
-                } else {
-                    throw new IllegalStateException("Invalid Processor Type: " + processor.getType());
-                }
-            });
-        }
-
-        if (versionedFlow.getProcessGroups() != null) {
-            versionedFlow.getProcessGroups().forEach(processGroup -> {
-                verifyProcessorsInVersionedFlow(processGroup, supportedTypes);
-            });
-        }
-    }
-
-    private void verifyControllerServicesInSnippet(final FlowSnippetDTO templateContents, final Map<String, Set<BundleCoordinate>> supportedTypes) {
-        if (templateContents.getControllerServices() != null) {
-            templateContents.getControllerServices().forEach(controllerService -> {
-                if (supportedTypes.containsKey(controllerService.getType())) {
-                    if (controllerService.getBundle() == null) {
-                        throw new IllegalArgumentException("Controller Service bundle must be specified.");
-                    }
-
-                    verifyBundleInSnippet(controllerService.getBundle(), supportedTypes.get(controllerService.getType()));
-                } else {
-                    throw new IllegalStateException("Invalid Controller Service Type: " + controllerService.getType());
-                }
-            });
-        }
-
-        if (templateContents.getProcessGroups() != null) {
-            templateContents.getProcessGroups().forEach(processGroup -> {
-                verifyControllerServicesInSnippet(processGroup.getContents(), supportedTypes);
-            });
-        }
-    }
-
-    private void verifyControllerServicesInVersionedFlow(final VersionedProcessGroup versionedFlow, final Map<String, Set<BundleCoordinate>> supportedTypes) {
-        if (versionedFlow.getControllerServices() != null) {
-            versionedFlow.getControllerServices().forEach(controllerService -> {
-                if (supportedTypes.containsKey(controllerService.getType())) {
-                    if (controllerService.getBundle() == null) {
-                        throw new IllegalArgumentException("Controller Service bundle must be specified.");
-                    }
-
-                    verifyBundleInVersionedFlow(controllerService.getBundle(), supportedTypes.get(controllerService.getType()));
-                } else {
-                    throw new IllegalStateException("Invalid Controller Service Type: " + controllerService.getType());
-                }
-            });
-        }
-
-        if (versionedFlow.getProcessGroups() != null) {
-            versionedFlow.getProcessGroups().forEach(processGroup -> {
-                verifyControllerServicesInVersionedFlow(processGroup, supportedTypes);
-            });
-        }
-    }
-
-    public void verifyComponentTypesInSnippet(final FlowSnippetDTO templateContents) {
-        final Map<String, Set<BundleCoordinate>> processorClasses = new HashMap<>();
-        for (final Class<?> c : extensionManager.getExtensions(Processor.class)) {
-            final String name = c.getName();
-            processorClasses.put(name, extensionManager.getBundles(name).stream().map(bundle -> bundle.getBundleDetails().getCoordinate()).collect(Collectors.toSet()));
-        }
-        verifyProcessorsInSnippet(templateContents, processorClasses);
-
-        final Map<String, Set<BundleCoordinate>> controllerServiceClasses = new HashMap<>();
-        for (final Class<?> c : extensionManager.getExtensions(ControllerService.class)) {
-            final String name = c.getName();
-            controllerServiceClasses.put(name, extensionManager.getBundles(name).stream().map(bundle -> bundle.getBundleDetails().getCoordinate()).collect(Collectors.toSet()));
-        }
-        verifyControllerServicesInSnippet(templateContents, controllerServiceClasses);
-
-        final Set<String> prioritizerClasses = new HashSet<>();
-        for (final Class<?> c : extensionManager.getExtensions(FlowFilePrioritizer.class)) {
-            prioritizerClasses.add(c.ge

<TRUNCATED>