You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/11/06 16:24:06 UTC

[5/9] nifi git commit: NIFI-5769: Refactored FlowController to use Composition over Inheritance - Ensure that when root group is set, that we register its ID in FlowManager

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index 06d32e2..e545558 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -16,11 +16,24 @@
  */
 package org.apache.nifi.controller.service;
 
-import static java.util.Objects.requireNonNull;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ComponentNode;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
+import org.apache.nifi.events.BulletinFactory;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.logging.LogRepositoryFactory;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.Severity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -39,46 +52,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
-import org.apache.commons.lang3.ClassUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.annotation.lifecycle.OnAdded;
-import org.apache.nifi.bundle.Bundle;
-import org.apache.nifi.bundle.BundleCoordinate;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.state.StateManager;
-import org.apache.nifi.components.state.StateManagerProvider;
-import org.apache.nifi.components.validation.ValidationTrigger;
-import org.apache.nifi.controller.ComponentNode;
-import org.apache.nifi.controller.ControllerService;
-import org.apache.nifi.controller.FlowController;
-import org.apache.nifi.controller.LoggableComponent;
-import org.apache.nifi.controller.ProcessorNode;
-import org.apache.nifi.controller.ReportingTaskNode;
-import org.apache.nifi.controller.ScheduledState;
-import org.apache.nifi.controller.TerminationAwareLogger;
-import org.apache.nifi.controller.ValidationContextFactory;
-import org.apache.nifi.controller.exception.ComponentLifeCycleException;
-import org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
-import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
-import org.apache.nifi.events.BulletinFactory;
-import org.apache.nifi.groups.ProcessGroup;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.logging.LogRepositoryFactory;
-import org.apache.nifi.nar.ExtensionManager;
-import org.apache.nifi.nar.NarCloseable;
-import org.apache.nifi.processor.SimpleProcessLogger;
-import org.apache.nifi.processor.StandardValidationContextFactory;
-import org.apache.nifi.registry.ComponentVariableRegistry;
-import org.apache.nifi.registry.VariableRegistry;
-import org.apache.nifi.registry.variable.StandardComponentVariableRegistry;
-import org.apache.nifi.reporting.BulletinRepository;
-import org.apache.nifi.reporting.Severity;
-import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.util.ReflectionUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static java.util.Objects.requireNonNull;
 
 public class StandardControllerServiceProvider implements ControllerServiceProvider {
 
@@ -86,170 +60,21 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
 
     private final StandardProcessScheduler processScheduler;
     private final BulletinRepository bulletinRepo;
-    private final StateManagerProvider stateManagerProvider;
-    private final VariableRegistry variableRegistry;
     private final FlowController flowController;
-    private final NiFiProperties nifiProperties;
+    private final FlowManager flowManager;
 
     private final ConcurrentMap<String, ControllerServiceNode> serviceCache = new ConcurrentHashMap<>();
-    private final ValidationTrigger validationTrigger;
-
-    public StandardControllerServiceProvider(final FlowController flowController, final StandardProcessScheduler scheduler, final BulletinRepository bulletinRepo,
-        final StateManagerProvider stateManagerProvider, final VariableRegistry variableRegistry, final NiFiProperties nifiProperties, final ValidationTrigger validationTrigger) {
 
+    public StandardControllerServiceProvider(final FlowController flowController, final StandardProcessScheduler scheduler, final BulletinRepository bulletinRepo) {
         this.flowController = flowController;
         this.processScheduler = scheduler;
         this.bulletinRepo = bulletinRepo;
-        this.stateManagerProvider = stateManagerProvider;
-        this.variableRegistry = variableRegistry;
-        this.nifiProperties = nifiProperties;
-        this.validationTrigger = validationTrigger;
-    }
-
-    private StateManager getStateManager(final String componentId) {
-        return stateManagerProvider.getStateManager(componentId);
+        this.flowManager = flowController.getFlowManager();
     }
 
     @Override
-    public ControllerServiceNode createControllerService(final String type, final String id, final BundleCoordinate bundleCoordinate, final Set<URL> additionalUrls, final boolean firstTimeAdded) {
-        if (type == null || id == null || bundleCoordinate == null) {
-            throw new NullPointerException();
-        }
-
-        ClassLoader cl = null;
-        final ClassLoader currentContextClassLoader = Thread.currentThread().getContextClassLoader();
-        final ExtensionManager extensionManager = flowController.getExtensionManager();
-        try {
-            final Class<?> rawClass;
-            try {
-                final Bundle csBundle = extensionManager.getBundle(bundleCoordinate);
-                if (csBundle == null) {
-                    throw new ControllerServiceInstantiationException("Unable to find bundle for coordinate " + bundleCoordinate.getCoordinate());
-                }
-
-                cl = extensionManager.createInstanceClassLoader(type, id, csBundle, additionalUrls);
-                Thread.currentThread().setContextClassLoader(cl);
-                rawClass = Class.forName(type, false, cl);
-            } catch (final Exception e) {
-                logger.error("Could not create Controller Service of type " + type + " for ID " + id + "; creating \"Ghost\" implementation", e);
-                Thread.currentThread().setContextClassLoader(currentContextClassLoader);
-                return createGhostControllerService(type, id, bundleCoordinate);
-            }
-
-            final Class<? extends ControllerService> controllerServiceClass = rawClass.asSubclass(ControllerService.class);
-
-            final ControllerService originalService = controllerServiceClass.newInstance();
-            final StandardControllerServiceInvocationHandler invocationHandler = new StandardControllerServiceInvocationHandler(extensionManager, originalService);
-
-            // extract all interfaces... controllerServiceClass is non null so getAllInterfaces is non null
-            final List<Class<?>> interfaceList = ClassUtils.getAllInterfaces(controllerServiceClass);
-            final Class<?>[] interfaces = interfaceList.toArray(new Class<?>[interfaceList.size()]);
-
-            final ControllerService proxiedService;
-            if (cl == null) {
-                proxiedService = (ControllerService) Proxy.newProxyInstance(getClass().getClassLoader(), interfaces, invocationHandler);
-            } else {
-                proxiedService = (ControllerService) Proxy.newProxyInstance(cl, interfaces, invocationHandler);
-            }
-            logger.info("Created Controller Service of type {} with identifier {}", type, id);
-
-            final ComponentLog serviceLogger = new SimpleProcessLogger(id, originalService);
-            final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(serviceLogger);
-
-            originalService.initialize(new StandardControllerServiceInitializationContext(id, terminationAwareLogger, this, getStateManager(id), nifiProperties));
-
-
-
-            final LoggableComponent<ControllerService> originalLoggableComponent = new LoggableComponent<>(originalService, bundleCoordinate, terminationAwareLogger);
-            final LoggableComponent<ControllerService> proxiedLoggableComponent = new LoggableComponent<>(proxiedService, bundleCoordinate, terminationAwareLogger);
-
-            final ComponentVariableRegistry componentVarRegistry = new StandardComponentVariableRegistry(this.variableRegistry);
-            final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(this, componentVarRegistry);
-            final ControllerServiceNode serviceNode = new StandardControllerServiceNode(originalLoggableComponent, proxiedLoggableComponent, invocationHandler,
-                id, validationContextFactory, this, componentVarRegistry, flowController, flowController.getExtensionManager(), validationTrigger);
-            serviceNode.setName(rawClass.getSimpleName());
-
-            invocationHandler.setServiceNode(serviceNode);
-
-            if (firstTimeAdded) {
-                try (final NarCloseable x = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), originalService.getClass(), originalService.getIdentifier())) {
-                    ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, originalService);
-                } catch (final Exception e) {
-                    throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + originalService, e);
-                }
-            }
-
-            serviceCache.putIfAbsent(id, serviceNode);
-
-            return serviceNode;
-        } catch (final Throwable t) {
-            throw new ControllerServiceInstantiationException(t);
-        } finally {
-            if (currentContextClassLoader != null) {
-                Thread.currentThread().setContextClassLoader(currentContextClassLoader);
-            }
-        }
-    }
-
-    private ControllerServiceNode createGhostControllerService(final String type, final String id, final BundleCoordinate bundleCoordinate) {
-        final ControllerServiceInvocationHandler invocationHandler = new ControllerServiceInvocationHandler() {
-            @Override
-            public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
-                final String methodName = method.getName();
-
-                if ("validate".equals(methodName)) {
-                    final ValidationResult result = new ValidationResult.Builder()
-                            .input("Any Property")
-                            .subject("Missing Controller Service")
-                            .valid(false)
-                            .explanation("Controller Service could not be created because the Controller Service Type (" + type + ") could not be found")
-                            .build();
-                    return Collections.singleton(result);
-                } else if ("getPropertyDescriptor".equals(methodName)) {
-                    final String propertyName = (String) args[0];
-                    return new PropertyDescriptor.Builder()
-                            .name(propertyName)
-                            .description(propertyName)
-                            .sensitive(true)
-                            .required(true)
-                            .build();
-                } else if ("getPropertyDescriptors".equals(methodName)) {
-                    return Collections.emptyList();
-                } else if ("onPropertyModified".equals(methodName)) {
-                    return null;
-                } else if ("getIdentifier".equals(methodName)) {
-                    return id;
-                } else if ("toString".equals(methodName)) {
-                    return "GhostControllerService[id=" + id + ", type=" + type + "]";
-                } else if ("hashCode".equals(methodName)) {
-                    return 91 * type.hashCode() + 41 * id.hashCode();
-                } else if ("equals".equals(methodName)) {
-                    return proxy == args[0];
-                } else {
-                    throw new IllegalStateException("Controller Service could not be created because the Controller Service Type (" + type + ") could not be found");
-                }
-            }
-            @Override
-            public void setServiceNode(ControllerServiceNode serviceNode) {
-                // nothing to do
-            }
-        };
-
-        final ControllerService proxiedService = (ControllerService) Proxy.newProxyInstance(getClass().getClassLoader(),
-                new Class[]{ControllerService.class}, invocationHandler);
-
-        final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type;
-        final String componentType = "(Missing) " + simpleClassName;
-
-        final LoggableComponent<ControllerService> proxiedLoggableComponent = new LoggableComponent<>(proxiedService, bundleCoordinate, null);
-
-        final ComponentVariableRegistry componentVarRegistry = new StandardComponentVariableRegistry(this.variableRegistry);
-        final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedLoggableComponent, proxiedLoggableComponent, invocationHandler, id,
-            new StandardValidationContextFactory(this, variableRegistry), this, componentType, type, componentVarRegistry, flowController,
-                flowController.getExtensionManager(), validationTrigger, true);
-
-        serviceCache.putIfAbsent(id, serviceNode);
-        return serviceNode;
+    public void onControllerServiceAdded(final ControllerServiceNode serviceNode) {
+        serviceCache.putIfAbsent(serviceNode.getIdentifier(), serviceNode);
     }
 
     @Override
@@ -364,7 +189,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
         Iterator<ControllerServiceNode> serviceIter = serviceNodes.iterator();
         while (serviceIter.hasNext() && shouldStart) {
             ControllerServiceNode controllerServiceNode = serviceIter.next();
-            List<ControllerServiceNode> requiredServices = ((StandardControllerServiceNode) controllerServiceNode).getRequiredControllerServices();
+            List<ControllerServiceNode> requiredServices = controllerServiceNode.getRequiredControllerServices();
             for (ControllerServiceNode requiredService : requiredServices) {
                 if (!requiredService.isActive() && !serviceNodes.contains(requiredService)) {
                     shouldStart = false;
@@ -411,10 +236,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
 
     private void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes, final CompletableFuture<Void> completableFuture) {
         // validate that we are able to start all of the services.
-        Iterator<ControllerServiceNode> serviceIter = serviceNodes.iterator();
-        while (serviceIter.hasNext()) {
-            ControllerServiceNode controllerServiceNode = serviceIter.next();
-            List<ControllerServiceNode> requiredServices = ((StandardControllerServiceNode) controllerServiceNode).getRequiredControllerServices();
+        for (final ControllerServiceNode controllerServiceNode : serviceNodes) {
+            List<ControllerServiceNode> requiredServices = controllerServiceNode.getRequiredControllerServices();
             for (ControllerServiceNode requiredService : requiredServices) {
                 if (!requiredService.isActive() && !serviceNodes.contains(requiredService)) {
                     logger.error("Cannot enable {} because it has a dependency on {}, which is not enabled", controllerServiceNode, requiredService);
@@ -502,7 +325,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
 
         for (final ControllerServiceNode node : serviceNodeMap.values()) {
             final List<ControllerServiceNode> branch = new ArrayList<>();
-            determineEnablingOrder(serviceNodeMap, node, branch, new HashSet<ControllerServiceNode>());
+            determineEnablingOrder(serviceNodeMap, node, branch, new HashSet<>());
             orderedNodeLists.add(branch);
         }
 
@@ -601,15 +424,15 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
     }
 
     private ProcessGroup getRootGroup() {
-        return flowController.getGroup(flowController.getRootGroupId());
+        return flowManager.getRootGroup();
     }
 
     @Override
     public ControllerService getControllerServiceForComponent(final String serviceIdentifier, final String componentId) {
         // Find the Process Group that owns the component.
-        ProcessGroup groupOfInterest = null;
+        ProcessGroup groupOfInterest;
 
-        final ProcessorNode procNode = flowController.getProcessorNode(componentId);
+        final ProcessorNode procNode = flowManager.getProcessorNode(componentId);
         if (procNode == null) {
             final ControllerServiceNode serviceNode = getControllerServiceNode(componentId);
             if (serviceNode == null) {
@@ -620,7 +443,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
 
                 // we have confirmed that the component is a reporting task. We can only reference Controller Services
                 // that are scoped at the FlowController level in this case.
-                final ControllerServiceNode rootServiceNode = flowController.getRootControllerService(serviceIdentifier);
+                final ControllerServiceNode rootServiceNode = flowManager.getRootControllerService(serviceIdentifier);
                 return (rootServiceNode == null) ? null : rootServiceNode.getProxiedControllerService();
             } else {
                 groupOfInterest = serviceNode.getProcessGroup();
@@ -630,7 +453,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
         }
 
         if (groupOfInterest == null) {
-            final ControllerServiceNode rootServiceNode = flowController.getRootControllerService(serviceIdentifier);
+            final ControllerServiceNode rootServiceNode = flowManager.getRootControllerService(serviceIdentifier);
             return (rootServiceNode == null) ? null : rootServiceNode.getProxiedControllerService();
         }
 
@@ -663,7 +486,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
 
     @Override
     public ControllerServiceNode getControllerServiceNode(final String serviceIdentifier) {
-        final ControllerServiceNode rootServiceNode = flowController.getRootControllerService(serviceIdentifier);
+        final ControllerServiceNode rootServiceNode = flowManager.getRootControllerService(serviceIdentifier);
         if (rootServiceNode != null) {
             return rootServiceNode;
         }
@@ -675,10 +498,10 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
     public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType, final String groupId) {
         final Set<ControllerServiceNode> serviceNodes;
         if (groupId == null) {
-            serviceNodes = flowController.getRootControllerServices();
+            serviceNodes = flowManager.getRootControllerServices();
         } else {
             ProcessGroup group = getRootGroup();
-            if (!FlowController.ROOT_GROUP_ID_ALIAS.equals(groupId) && !group.getIdentifier().equals(groupId)) {
+            if (!FlowManager.ROOT_GROUP_ID_ALIAS.equals(groupId) && !group.getIdentifier().equals(groupId)) {
                 group = group.findProcessGroup(groupId);
             }
 
@@ -706,7 +529,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
     public void removeControllerService(final ControllerServiceNode serviceNode) {
         final ProcessGroup group = requireNonNull(serviceNode).getProcessGroup();
         if (group == null) {
-            flowController.removeRootControllerService(serviceNode);
+            flowManager.removeRootControllerService(serviceNode);
             return;
         }
 
@@ -718,12 +541,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
     }
 
     @Override
-    public Set<ControllerServiceNode> getAllControllerServices() {
-        final Set<ControllerServiceNode> allServices = new HashSet<>();
-        allServices.addAll(flowController.getRootControllerServices());
-        allServices.addAll(serviceCache.values());
-
-        return allServices;
+    public Collection<ControllerServiceNode> getNonRootControllerServices() {
+        return serviceCache.values();
     }
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java
index 639f8a2..6a60058 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java
@@ -17,9 +17,6 @@
 
 package org.apache.nifi.controller.state;
 
-import java.io.IOException;
-import java.util.Map;
-
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.components.state.StateMap;
@@ -29,6 +26,9 @@ import org.apache.nifi.logging.LogRepository;
 import org.apache.nifi.logging.LogRepositoryFactory;
 import org.apache.nifi.processor.SimpleProcessLogger;
 
+import java.io.IOException;
+import java.util.Map;
+
 public class StandardStateManager implements StateManager {
     private final StateProvider localProvider;
     private final StateProvider clusterProvider;

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
index 851aad3..5a49c72 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
@@ -19,10 +19,12 @@ package org.apache.nifi.controller.tasks;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.lifecycle.TaskTerminationAwareStateManager;
+import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.ActiveProcessSessionFactory;
 import org.apache.nifi.controller.repository.BatchingSessionFactory;
 import org.apache.nifi.controller.repository.RepositoryContext;
@@ -80,7 +82,7 @@ public class ConnectableTask {
 
         final StateManager stateManager = new TaskTerminationAwareStateManager(flowController.getStateManagerProvider().getStateManager(connectable.getIdentifier()), scheduleState::isTerminated);
         if (connectable instanceof ProcessorNode) {
-            processContext = new StandardProcessContext((ProcessorNode) connectable, flowController, encryptor, stateManager, scheduleState::isTerminated);
+            processContext = new StandardProcessContext((ProcessorNode) connectable, flowController.getControllerServiceProvider(), encryptor, stateManager, scheduleState::isTerminated);
         } else {
             processContext = new ConnectableProcessContext(connectable, encryptor, stateManager);
         }
@@ -142,8 +144,8 @@ public class ConnectableTask {
     private boolean isBackPressureEngaged() {
         return connectable.getIncomingConnections().stream()
             .filter(con -> con.getSource() == connectable)
-            .map(con -> con.getFlowFileQueue())
-            .anyMatch(queue -> queue.isFull());
+            .map(Connection::getFlowFileQueue)
+            .anyMatch(FlowFileQueue::isFull);
     }
 
     public InvocationResult invoke() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ExpireFlowFiles.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ExpireFlowFiles.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ExpireFlowFiles.java
index 6e2ee4c..0c8e8a9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ExpireFlowFiles.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ExpireFlowFiles.java
@@ -16,9 +16,6 @@
  */
 package org.apache.nifi.controller.tasks;
 
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.connectable.Funnel;
@@ -35,6 +32,9 @@ import org.apache.nifi.util.FormatUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * This task runs through all Connectable Components and goes through its incoming queues, polling for FlowFiles and accepting none. This causes the desired side effect of expiring old FlowFiles.
  */
@@ -51,7 +51,7 @@ public class ExpireFlowFiles implements Runnable {
 
     @Override
     public void run() {
-        final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
+        final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
         try {
             expireFlowFiles(rootGroup);
         } catch (final Exception e) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index a27962a..61a902a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -52,6 +52,7 @@ import org.apache.nifi.controller.Snippet;
 import org.apache.nifi.controller.Template;
 import org.apache.nifi.controller.exception.ComponentLifeCycleException;
 import org.apache.nifi.controller.exception.ProcessorInstantiationException;
+import org.apache.nifi.controller.flow.FlowManager;
 import org.apache.nifi.controller.label.Label;
 import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.queue.LoadBalanceCompression;
@@ -64,6 +65,7 @@ import org.apache.nifi.controller.service.StandardConfigurationContext;
 import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
 import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.logging.LogRepository;
 import org.apache.nifi.logging.LogRepositoryFactory;
 import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.processor.Relationship;
@@ -160,6 +162,7 @@ public final class StandardProcessGroup implements ProcessGroup {
     private final StandardProcessScheduler scheduler;
     private final ControllerServiceProvider controllerServiceProvider;
     private final FlowController flowController;
+    private final FlowManager flowManager;
 
     private final Map<String, Port> inputPorts = new HashMap<>();
     private final Map<String, Port> outputPorts = new HashMap<>();
@@ -192,6 +195,7 @@ public final class StandardProcessGroup implements ProcessGroup {
         this.encryptor = encryptor;
         this.flowController = flowController;
         this.variableRegistry = variableRegistry;
+        this.flowManager = flowController.getFlowManager();
 
         name = new AtomicReference<>();
         position = new AtomicReference<>(new Position(0D, 0D));
@@ -422,9 +426,7 @@ public final class StandardProcessGroup implements ProcessGroup {
                 }
             });
 
-            findAllInputPorts().stream().filter(START_PORTS_FILTER).forEach(port -> {
-                port.getProcessGroup().startInputPort(port);
-            });
+            findAllInputPorts().stream().filter(START_PORTS_FILTER).forEach(port -> port.getProcessGroup().startInputPort(port));
 
             findAllOutputPorts().stream().filter(START_PORTS_FILTER).forEach(port -> {
                 port.getProcessGroup().startOutputPort(port);
@@ -446,13 +448,8 @@ public final class StandardProcessGroup implements ProcessGroup {
                 }
             });
 
-            findAllInputPorts().stream().filter(STOP_PORTS_FILTER).forEach(port -> {
-                port.getProcessGroup().stopInputPort(port);
-            });
-
-            findAllOutputPorts().stream().filter(STOP_PORTS_FILTER).forEach(port -> {
-                port.getProcessGroup().stopOutputPort(port);
-            });
+            findAllInputPorts().stream().filter(STOP_PORTS_FILTER).forEach(port -> port.getProcessGroup().stopInputPort(port));
+            findAllOutputPorts().stream().filter(STOP_PORTS_FILTER).forEach(port -> port.getProcessGroup().stopOutputPort(port));
         } finally {
             readLock.unlock();
         }
@@ -513,7 +510,7 @@ public final class StandardProcessGroup implements ProcessGroup {
 
             port.setProcessGroup(this);
             inputPorts.put(requireNonNull(port).getIdentifier(), port);
-            flowController.onInputPortAdded(port);
+            flowManager.onInputPortAdded(port);
             onComponentModified();
         } finally {
             writeLock.unlock();
@@ -552,7 +549,7 @@ public final class StandardProcessGroup implements ProcessGroup {
             scheduler.onPortRemoved(port);
             onComponentModified();
 
-            flowController.onInputPortRemoved(port);
+            flowManager.onInputPortRemoved(port);
             LOG.info("Input Port {} removed from flow", port);
         } finally {
             writeLock.unlock();
@@ -598,7 +595,7 @@ public final class StandardProcessGroup implements ProcessGroup {
 
             port.setProcessGroup(this);
             outputPorts.put(port.getIdentifier(), port);
-            flowController.onOutputPortAdded(port);
+            flowManager.onOutputPortAdded(port);
             onComponentModified();
         } finally {
             writeLock.unlock();
@@ -628,7 +625,7 @@ public final class StandardProcessGroup implements ProcessGroup {
             scheduler.onPortRemoved(port);
             onComponentModified();
 
-            flowController.onOutputPortRemoved(port);
+            flowManager.onOutputPortRemoved(port);
             LOG.info("Output Port {} removed from flow", port);
         } finally {
             writeLock.unlock();
@@ -667,10 +664,10 @@ public final class StandardProcessGroup implements ProcessGroup {
             group.getVariableRegistry().setParent(getVariableRegistry());
 
             processGroups.put(Objects.requireNonNull(group).getIdentifier(), group);
-            flowController.onProcessGroupAdded(group);
+            flowManager.onProcessGroupAdded(group);
 
-            group.findAllControllerServices().stream().forEach(this::updateControllerServiceReferences);
-            group.findAllProcessors().stream().forEach(this::updateControllerServiceReferences);
+            group.findAllControllerServices().forEach(this::updateControllerServiceReferences);
+            group.findAllProcessors().forEach(this::updateControllerServiceReferences);
 
             onComponentModified();
         } finally {
@@ -714,7 +711,7 @@ public final class StandardProcessGroup implements ProcessGroup {
             processGroups.remove(group.getIdentifier());
             onComponentModified();
 
-            flowController.onProcessGroupRemoved(group);
+            flowManager.onProcessGroupRemoved(group);
             LOG.info("{} removed from flow", group);
         } finally {
             writeLock.unlock();
@@ -752,7 +749,7 @@ public final class StandardProcessGroup implements ProcessGroup {
 
         for (final ControllerServiceNode cs : group.getControllerServices(false)) {
             // Must go through Controller Service here because we need to ensure that it is removed from the cache
-            flowController.removeControllerService(cs);
+            flowController.getControllerServiceProvider().removeControllerService(cs);
         }
 
         for (final ProcessGroup childGroup : new ArrayList<>(group.getProcessGroups())) {
@@ -820,8 +817,8 @@ public final class StandardProcessGroup implements ProcessGroup {
                 LOG.warn("Failed to clean up resources for {} due to {}", remoteGroup, e);
             }
 
-            remoteGroup.getInputPorts().stream().forEach(scheduler::onPortRemoved);
-            remoteGroup.getOutputPorts().stream().forEach(scheduler::onPortRemoved);
+            remoteGroup.getInputPorts().forEach(scheduler::onPortRemoved);
+            remoteGroup.getOutputPorts().forEach(scheduler::onPortRemoved);
 
             remoteGroups.remove(remoteGroupId);
             LOG.info("{} removed from flow", remoteProcessGroup);
@@ -843,7 +840,7 @@ public final class StandardProcessGroup implements ProcessGroup {
             processor.setProcessGroup(this);
             processor.getVariableRegistry().setParent(getVariableRegistry());
             processors.put(processorId, processor);
-            flowController.onProcessorAdded(processor);
+            flowManager.onProcessorAdded(processor);
             updateControllerServiceReferences(processor);
             onComponentModified();
         } finally {
@@ -926,9 +923,12 @@ public final class StandardProcessGroup implements ProcessGroup {
             onComponentModified();
 
             scheduler.onProcessorRemoved(processor);
-            flowController.onProcessorRemoved(processor);
+            flowManager.onProcessorRemoved(processor);
 
-            LogRepositoryFactory.getRepository(processor.getIdentifier()).removeAllObservers();
+            final LogRepository logRepository = LogRepositoryFactory.getRepository(processor.getIdentifier());
+            if (logRepository != null) {
+                logRepository.removeAllObservers();
+            }
 
             final StateManagerProvider stateManagerProvider = flowController.getStateManagerProvider();
             scheduler.submitFrameworkTask(new Runnable() {
@@ -1068,7 +1068,7 @@ public final class StandardProcessGroup implements ProcessGroup {
                 destination.addConnection(connection);
             }
             connections.put(connection.getIdentifier(), connection);
-            flowController.onConnectionAdded(connection);
+            flowManager.onConnectionAdded(connection);
             onComponentModified();
         } finally {
             writeLock.unlock();
@@ -1133,7 +1133,7 @@ public final class StandardProcessGroup implements ProcessGroup {
             LOG.info("{} removed from flow", connection);
             onComponentModified();
 
-            flowController.onConnectionRemoved(connection);
+            flowManager.onConnectionRemoved(connection);
         } finally {
             writeLock.unlock();
         }
@@ -1161,7 +1161,7 @@ public final class StandardProcessGroup implements ProcessGroup {
 
     @Override
     public Connection findConnection(final String id) {
-        final Connection connection = flowController.getConnection(id);
+        final Connection connection = flowManager.getConnection(id);
         if (connection == null) {
             return null;
         }
@@ -1601,12 +1601,12 @@ public final class StandardProcessGroup implements ProcessGroup {
             return this;
         }
 
-        final ProcessGroup group = flowController.getGroup(id);
+        final ProcessGroup group = flowManager.getGroup(id);
         if (group == null) {
             return null;
         }
 
-        // We found a Processor in the Controller, but we only want to return it if
+        // We found a Process Group in the Controller, but we only want to return it if
         // the Process Group is this or is a child of this.
         if (isOwner(group.getParent())) {
             return group;
@@ -1664,7 +1664,7 @@ public final class StandardProcessGroup implements ProcessGroup {
 
     @Override
     public ProcessorNode findProcessor(final String id) {
-        final ProcessorNode node = flowController.getProcessorNode(id);
+        final ProcessorNode node = flowManager.getProcessorNode(id);
         if (node == null) {
             return null;
         }
@@ -1769,7 +1769,7 @@ public final class StandardProcessGroup implements ProcessGroup {
 
     @Override
     public Port findInputPort(final String id) {
-        final Port port = flowController.getInputPort(id);
+        final Port port = flowManager.getInputPort(id);
         if (port == null) {
             return null;
         }
@@ -1796,7 +1796,7 @@ public final class StandardProcessGroup implements ProcessGroup {
 
     @Override
     public Port findOutputPort(final String id) {
-        final Port port = flowController.getOutputPort(id);
+        final Port port = flowManager.getOutputPort(id);
         if (port == null) {
             return null;
         }
@@ -1904,7 +1904,7 @@ public final class StandardProcessGroup implements ProcessGroup {
 
             funnel.setProcessGroup(this);
             funnels.put(funnel.getIdentifier(), funnel);
-            flowController.onFunnelAdded(funnel);
+            flowManager.onFunnelAdded(funnel);
 
             if (autoStart) {
                 startFunnel(funnel);
@@ -1928,7 +1928,7 @@ public final class StandardProcessGroup implements ProcessGroup {
 
     @Override
     public Funnel findFunnel(final String id) {
-        final Funnel funnel = flowController.getFunnel(id);
+        final Funnel funnel = flowManager.getFunnel(id);
         if (funnel == null) {
             return funnel;
         }
@@ -2026,7 +2026,7 @@ public final class StandardProcessGroup implements ProcessGroup {
             funnels.remove(funnel.getIdentifier());
             onComponentModified();
 
-            flowController.onFunnelRemoved(funnel);
+            flowManager.onFunnelRemoved(funnel);
             LOG.info("{} removed from flow", funnel);
         } finally {
             writeLock.unlock();
@@ -2988,7 +2988,7 @@ public final class StandardProcessGroup implements ProcessGroup {
             }
 
             final Map<VariableDescriptor, String> variableMap = new HashMap<>();
-            variables.entrySet().stream() // cannot use Collectors.toMap because value may be null
+            variables.entrySet() // cannot use Collectors.toMap because value may be null
                 .forEach(entry -> variableMap.put(new VariableDescriptor(entry.getKey()), entry.getValue()));
 
             variableRegistry.setVariables(variableMap);
@@ -3199,30 +3199,27 @@ public final class StandardProcessGroup implements ProcessGroup {
     private void applyVersionedComponentIds(final ProcessGroup processGroup, final Function<String, String> lookup) {
         processGroup.setVersionedComponentId(lookup.apply(processGroup.getIdentifier()));
 
-        processGroup.getConnections().stream()
+        processGroup.getConnections()
             .forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
-        processGroup.getProcessors().stream()
+        processGroup.getProcessors()
             .forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
-        processGroup.getInputPorts().stream()
+        processGroup.getInputPorts()
             .forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
-        processGroup.getOutputPorts().stream()
+        processGroup.getOutputPorts()
             .forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
-        processGroup.getLabels().stream()
+        processGroup.getLabels()
             .forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
-        processGroup.getFunnels().stream()
+        processGroup.getFunnels()
             .forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
-        processGroup.getControllerServices(false).stream()
+        processGroup.getControllerServices(false)
             .forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
 
-        processGroup.getRemoteProcessGroups().stream()
+        processGroup.getRemoteProcessGroups()
             .forEach(rpg -> {
                 rpg.setVersionedComponentId(lookup.apply(rpg.getIdentifier()));
 
-                rpg.getInputPorts().stream()
-                    .forEach(port -> port.setVersionedComponentId(lookup.apply(port.getIdentifier())));
-
-                rpg.getOutputPorts().stream()
-                    .forEach(port -> port.setVersionedComponentId(lookup.apply(port.getIdentifier())));
+                rpg.getInputPorts().forEach(port -> port.setVersionedComponentId(lookup.apply(port.getIdentifier())));
+                rpg.getOutputPorts().forEach(port -> port.setVersionedComponentId(lookup.apply(port.getIdentifier())));
             });
 
         for (final ProcessGroup childGroup : processGroup.getProcessGroups()) {
@@ -3552,7 +3549,7 @@ public final class StandardProcessGroup implements ProcessGroup {
 
             if (childGroup == null) {
                 final ProcessGroup added = addProcessGroup(group, proposedChildGroup, componentIdSeed, variablesToSkip);
-                flowController.onProcessGroupAdded(added);
+                flowManager.onProcessGroupAdded(added);
                 added.findAllRemoteProcessGroups().stream().forEach(RemoteProcessGroup::initialize);
                 LOG.info("Added {} to {}", added, this);
             } else if (childCoordinates == null || updateDescendantVersionedGroups) {
@@ -3572,7 +3569,7 @@ public final class StandardProcessGroup implements ProcessGroup {
             final Funnel funnel = funnelsByVersionedId.get(proposedFunnel.getIdentifier());
             if (funnel == null) {
                 final Funnel added = addFunnel(group, proposedFunnel, componentIdSeed);
-                flowController.onFunnelAdded(added);
+                flowManager.onFunnelAdded(added);
                 LOG.info("Added {} to {}", added, this);
             } else if (updatedVersionedComponentIds.contains(proposedFunnel.getIdentifier())) {
                 updateFunnel(funnel, proposedFunnel);
@@ -3594,7 +3591,7 @@ public final class StandardProcessGroup implements ProcessGroup {
             final Port port = inputPortsByVersionedId.get(proposedPort.getIdentifier());
             if (port == null) {
                 final Port added = addInputPort(group, proposedPort, componentIdSeed);
-                flowController.onInputPortAdded(added);
+                flowManager.onInputPortAdded(added);
                 LOG.info("Added {} to {}", added, this);
             } else if (updatedVersionedComponentIds.contains(proposedPort.getIdentifier())) {
                 updatePort(port, proposedPort);
@@ -3615,7 +3612,7 @@ public final class StandardProcessGroup implements ProcessGroup {
             final Port port = outputPortsByVersionedId.get(proposedPort.getIdentifier());
             if (port == null) {
                 final Port added = addOutputPort(group, proposedPort, componentIdSeed);
-                flowController.onOutputPortAdded(added);
+                flowManager.onOutputPortAdded(added);
                 LOG.info("Added {} to {}", added, this);
             } else if (updatedVersionedComponentIds.contains(proposedPort.getIdentifier())) {
                 updatePort(port, proposedPort);
@@ -3659,7 +3656,7 @@ public final class StandardProcessGroup implements ProcessGroup {
             final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier());
             if (processor == null) {
                 final ProcessorNode added = addProcessor(group, proposedProcessor, componentIdSeed);
-                flowController.onProcessorAdded(added);
+                flowManager.onProcessorAdded(added);
 
                 final Set<Relationship> proposedAutoTerminated =
                     proposedProcessor.getAutoTerminatedRelationships() == null ? Collections.emptySet() : proposedProcessor.getAutoTerminatedRelationships().stream()
@@ -3718,7 +3715,7 @@ public final class StandardProcessGroup implements ProcessGroup {
             final Connection connection = connectionsByVersionedId.get(proposedConnection.getIdentifier());
             if (connection == null) {
                 final Connection added = addConnection(group, proposedConnection, componentIdSeed);
-                flowController.onConnectionAdded(added);
+                flowManager.onConnectionAdded(added);
                 LOG.info("Added {} to {}", added, this);
             } else if (isUpdateable(connection)) {
                 // If the connection needs to be updated, then the source and destination will already have
@@ -3739,7 +3736,7 @@ public final class StandardProcessGroup implements ProcessGroup {
             final Connection connection = connectionsByVersionedId.get(removedVersionedId);
             LOG.info("Removing {} from {}", connection, group);
             group.removeConnection(connection);
-            flowController.onConnectionRemoved(connection);
+            flowManager.onConnectionRemoved(connection);
         }
 
         // Once the appropriate connections have been removed, we may now update Processors' auto-terminated relationships.
@@ -3753,7 +3750,7 @@ public final class StandardProcessGroup implements ProcessGroup {
             final ControllerServiceNode service = servicesByVersionedId.get(removedVersionedId);
             LOG.info("Removing {} from {}", service, group);
             // Must remove Controller Service through Flow Controller in order to remove from cache
-            flowController.removeControllerService(service);
+            flowController.getControllerServiceProvider().removeControllerService(service);
         }
 
         for (final String removedVersionedId : funnelsRemoved) {
@@ -3832,7 +3829,7 @@ public final class StandardProcessGroup implements ProcessGroup {
 
     private ProcessGroup addProcessGroup(final ProcessGroup destination, final VersionedProcessGroup proposed, final String componentIdSeed, final Set<String> variablesToSkip)
             throws ProcessorInstantiationException {
-        final ProcessGroup group = flowController.createProcessGroup(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed));
+        final ProcessGroup group = flowManager.createProcessGroup(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed));
         group.setVersionedComponentId(proposed.getIdentifier());
         group.setParent(destination);
         updateProcessGroup(group, proposed, componentIdSeed, Collections.emptySet(), true, true, true, variablesToSkip);
@@ -3862,7 +3859,7 @@ public final class StandardProcessGroup implements ProcessGroup {
         final List<FlowFilePrioritizer> prioritizers = proposed.getPrioritizers() == null ? Collections.emptyList() : proposed.getPrioritizers().stream()
             .map(prioritizerName -> {
                 try {
-                    return flowController.createPrioritizer(prioritizerName);
+                    return flowManager.createPrioritizer(prioritizerName);
                 } catch (final Exception e) {
                     throw new IllegalStateException("Failed to create Prioritizer of type " + prioritizerName + " for Connection with ID " + connection.getIdentifier());
                 }
@@ -3908,7 +3905,7 @@ public final class StandardProcessGroup implements ProcessGroup {
         destinationGroup.addConnection(connection);
         updateConnection(connection, proposed);
 
-        flowController.onConnectionAdded(connection);
+        flowManager.onConnectionAdded(connection);
         return connection;
     }
 
@@ -4071,7 +4068,7 @@ public final class StandardProcessGroup implements ProcessGroup {
                 final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle());
                 final List<PropertyDescriptor> descriptors = new ArrayList<>(service.getProperties().keySet());
                 final Set<URL> additionalUrls = service.getAdditionalClasspathResources(descriptors);
-                flowController.reload(service, proposed.getType(), newBundleCoordinate, additionalUrls);
+                flowController.getReloadComponent().reload(service, proposed.getType(), newBundleCoordinate, additionalUrls);
             }
         } finally {
             service.resumeValidationTrigger();
@@ -4107,7 +4104,7 @@ public final class StandardProcessGroup implements ProcessGroup {
         final boolean firstTimeAdded = true;
         final Set<URL> additionalUrls = Collections.emptySet();
 
-        final ControllerServiceNode newService = flowController.createControllerService(type, id, coordinate, additionalUrls, firstTimeAdded);
+        final ControllerServiceNode newService = flowManager.createControllerService(type, id, coordinate, additionalUrls, firstTimeAdded, true);
         newService.setVersionedComponentId(proposed.getIdentifier());
 
         destination.addControllerService(newService);
@@ -4121,7 +4118,7 @@ public final class StandardProcessGroup implements ProcessGroup {
     }
 
     private Funnel addFunnel(final ProcessGroup destination, final VersionedFunnel proposed, final String componentIdSeed) {
-        final Funnel funnel = flowController.createFunnel(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed));
+        final Funnel funnel = flowManager.createFunnel(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed));
         funnel.setVersionedComponentId(proposed.getIdentifier());
         destination.addFunnel(funnel);
         updateFunnel(funnel, proposed);
@@ -4136,7 +4133,7 @@ public final class StandardProcessGroup implements ProcessGroup {
     }
 
     private Port addInputPort(final ProcessGroup destination, final VersionedPort proposed, final String componentIdSeed) {
-        final Port port = flowController.createLocalInputPort(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getName());
+        final Port port = flowManager.createLocalInputPort(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getName());
         port.setVersionedComponentId(proposed.getIdentifier());
         destination.addInputPort(port);
         updatePort(port, proposed);
@@ -4145,7 +4142,7 @@ public final class StandardProcessGroup implements ProcessGroup {
     }
 
     private Port addOutputPort(final ProcessGroup destination, final VersionedPort proposed, final String componentIdSeed) {
-        final Port port = flowController.createLocalOutputPort(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getName());
+        final Port port = flowManager.createLocalOutputPort(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getName());
         port.setVersionedComponentId(proposed.getIdentifier());
         destination.addOutputPort(port);
         updatePort(port, proposed);
@@ -4154,7 +4151,7 @@ public final class StandardProcessGroup implements ProcessGroup {
     }
 
     private Label addLabel(final ProcessGroup destination, final VersionedLabel proposed, final String componentIdSeed) {
-        final Label label = flowController.createLabel(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getLabel());
+        final Label label = flowManager.createLabel(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getLabel());
         label.setVersionedComponentId(proposed.getIdentifier());
         destination.addLabel(label);
         updateLabel(label, proposed);
@@ -4171,7 +4168,7 @@ public final class StandardProcessGroup implements ProcessGroup {
 
     private ProcessorNode addProcessor(final ProcessGroup destination, final VersionedProcessor proposed, final String componentIdSeed) throws ProcessorInstantiationException {
         final BundleCoordinate coordinate = toCoordinate(proposed.getBundle());
-        final ProcessorNode procNode = flowController.createProcessor(proposed.getType(), generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), coordinate, true);
+        final ProcessorNode procNode = flowManager.createProcessor(proposed.getType(), generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), coordinate, true);
         procNode.setVersionedComponentId(proposed.getIdentifier());
 
         destination.addProcessor(procNode);
@@ -4204,7 +4201,7 @@ public final class StandardProcessGroup implements ProcessGroup {
                 final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle());
                 final List<PropertyDescriptor> descriptors = new ArrayList<>(processor.getProperties().keySet());
                 final Set<URL> additionalUrls = processor.getAdditionalClasspathResources(descriptors);
-                flowController.reload(processor, proposed.getType(), newBundleCoordinate, additionalUrls);
+                flowController.getReloadComponent().reload(processor, proposed.getType(), newBundleCoordinate, additionalUrls);
             }
         } finally {
             processor.resumeValidationTrigger();
@@ -4276,7 +4273,7 @@ public final class StandardProcessGroup implements ProcessGroup {
     }
 
     private RemoteProcessGroup addRemoteProcessGroup(final ProcessGroup destination, final VersionedRemoteProcessGroup proposed, final String componentIdSeed) {
-        final RemoteProcessGroup rpg = flowController.createRemoteProcessGroup(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getTargetUris());
+        final RemoteProcessGroup rpg = flowManager.createRemoteProcessGroup(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getTargetUris());
         rpg.setVersionedComponentId(proposed.getIdentifier());
 
         destination.addRemoteProcessGroup(rpg);
@@ -4439,7 +4436,7 @@ public final class StandardProcessGroup implements ProcessGroup {
                 .forEach(port -> removedInputPortsByVersionId.put(port.getVersionedComponentId().get(), port));
             flowContents.getInputPorts().stream()
                 .map(VersionedPort::getIdentifier)
-                .forEach(id -> removedInputPortsByVersionId.remove(id));
+                .forEach(removedInputPortsByVersionId::remove);
 
             // Ensure that there are no incoming connections for any Input Port that was removed.
             for (final Port inputPort : removedInputPortsByVersionId.values()) {
@@ -4457,7 +4454,7 @@ public final class StandardProcessGroup implements ProcessGroup {
                 .forEach(port -> removedOutputPortsByVersionId.put(port.getVersionedComponentId().get(), port));
             flowContents.getOutputPorts().stream()
                 .map(VersionedPort::getIdentifier)
-                .forEach(id -> removedOutputPortsByVersionId.remove(id));
+                .forEach(removedOutputPortsByVersionId::remove);
 
             // Ensure that there are no outgoing connections for any Output Port that was removed.
             for (final Port outputPort : removedOutputPortsByVersionId.values()) {
@@ -4544,7 +4541,7 @@ public final class StandardProcessGroup implements ProcessGroup {
                 if (connectionToAdd.getPrioritizers() != null) {
                     for (final String prioritizerType : connectionToAdd.getPrioritizers()) {
                         try {
-                            flowController.createPrioritizer(prioritizerType);
+                            flowManager.createPrioritizer(prioritizerType);
                         } catch (Exception e) {
                             throw new IllegalArgumentException("Unable to create Prioritizer of type " + prioritizerType, e);
                         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/repository/StandardLogRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/repository/StandardLogRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/repository/StandardLogRepository.java
index c610f8c..3287632 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/repository/StandardLogRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/repository/StandardLogRepository.java
@@ -96,11 +96,9 @@ public class StandardLogRepository implements LogRepository {
         try {
             final LogObserver observer = removeObserver(observerIdentifier);
 
-            if (observer == null) {
-                throw new IllegalArgumentException("The specified observer cannot be found.");
+            if (observer != null) {
+                addObserver(observerIdentifier, level, observer);
             }
-
-            addObserver(observerIdentifier, level, observer);
         } finally {
             writeLock.unlock();
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessorInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessorInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessorInitializationContext.java
index a0300ee..d90535c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessorInitializationContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessorInitializationContext.java
@@ -16,12 +16,13 @@
  */
 package org.apache.nifi.processor;
 
-import java.io.File;
 import org.apache.nifi.controller.ControllerServiceLookup;
 import org.apache.nifi.controller.NodeTypeProvider;
+import org.apache.nifi.controller.kerberos.KerberosConfig;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.util.NiFiProperties;
+
+import java.io.File;
 
 public class StandardProcessorInitializationContext implements ProcessorInitializationContext {
 
@@ -29,17 +30,16 @@ public class StandardProcessorInitializationContext implements ProcessorInitiali
     private final ComponentLog logger;
     private final ControllerServiceProvider serviceProvider;
     private final NodeTypeProvider nodeTypeProvider;
-    private final NiFiProperties nifiProperties;
+    private final KerberosConfig kerberosConfig;
 
-    public StandardProcessorInitializationContext(
-            final String identifier, final ComponentLog componentLog,
+    public StandardProcessorInitializationContext(final String identifier, final ComponentLog componentLog,
             final ControllerServiceProvider serviceProvider, final NodeTypeProvider nodeTypeProvider,
-            final NiFiProperties nifiProperties) {
+            final KerberosConfig kerberosConfig) {
         this.identifier = identifier;
         this.logger = componentLog;
         this.serviceProvider = serviceProvider;
         this.nodeTypeProvider = nodeTypeProvider;
-        this.nifiProperties = nifiProperties;
+        this.kerberosConfig = kerberosConfig;
     }
 
     @Override
@@ -64,16 +64,16 @@ public class StandardProcessorInitializationContext implements ProcessorInitiali
 
     @Override
     public String getKerberosServicePrincipal() {
-        return nifiProperties.getKerberosServicePrincipal();
+        return kerberosConfig.getPrincipal();
     }
 
     @Override
     public File getKerberosServiceKeytab() {
-        return nifiProperties.getKerberosServiceKeytabLocation() == null ? null : new File(nifiProperties.getKerberosServiceKeytabLocation());
+        return kerberosConfig.getKeytabLocation();
     }
 
     @Override
     public File getKerberosConfigurationFile() {
-        return nifiProperties.getKerberosConfigurationFile();
+        return kerberosConfig.getConfigFile();
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/provenance/ComponentIdentifierLookup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/provenance/ComponentIdentifierLookup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/provenance/ComponentIdentifierLookup.java
new file mode 100644
index 0000000..8dc1a9d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/provenance/ComponentIdentifierLookup.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance;
+
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.processor.Processor;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ComponentIdentifierLookup implements IdentifierLookup {
+    private final FlowController flowController;
+
+    public ComponentIdentifierLookup(final FlowController flowController) {
+        this.flowController = flowController;
+    }
+
+    @Override
+    public List<String> getComponentIdentifiers() {
+        final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
+
+        final List<String> componentIds = new ArrayList<>();
+        rootGroup.findAllProcessors().forEach(proc -> componentIds.add(proc.getIdentifier()));
+        rootGroup.getInputPorts().forEach(port -> componentIds.add(port.getIdentifier()));
+        rootGroup.getOutputPorts().forEach(port -> componentIds.add(port.getIdentifier()));
+
+        return componentIds;
+    }
+
+    @Override
+    public List<String> getComponentTypes() {
+        final Set<Class> procClasses = flowController.getExtensionManager().getExtensions(Processor.class);
+
+        final List<String> componentTypes = new ArrayList<>(procClasses.size() + 2);
+        componentTypes.add(ProvenanceEventRecord.REMOTE_INPUT_PORT_TYPE);
+        componentTypes.add(ProvenanceEventRecord.REMOTE_OUTPUT_PORT_TYPE);
+
+        procClasses.stream()
+            .map(Class::getSimpleName)
+            .forEach(componentTypes::add);
+
+        return componentTypes;
+    }
+
+    @Override
+    public List<String> getQueueIdentifiers() {
+        final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
+
+        return rootGroup.findAllConnections().stream()
+            .map(Connection::getIdentifier)
+            .collect(Collectors.toList());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/provenance/StandardProvenanceAuthorizableFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/provenance/StandardProvenanceAuthorizableFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/provenance/StandardProvenanceAuthorizableFactory.java
new file mode 100644
index 0000000..9eddf76
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/provenance/StandardProvenanceAuthorizableFactory.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance;
+
+import org.apache.nifi.authorization.resource.Authorizable;
+import org.apache.nifi.authorization.resource.DataAuthorizable;
+import org.apache.nifi.authorization.resource.ProvenanceDataAuthorizable;
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.remote.RemoteGroupPort;
+import org.apache.nifi.web.ResourceNotFoundException;
+
+public class StandardProvenanceAuthorizableFactory implements ProvenanceAuthorizableFactory {
+    private final FlowController flowController;
+
+    public StandardProvenanceAuthorizableFactory(final FlowController flowController) {
+        this.flowController = flowController;
+    }
+
+
+    @Override
+    public Authorizable createLocalDataAuthorizable(final String componentId) {
+        final FlowManager flowManager = flowController.getFlowManager();
+        final String rootGroupId = flowManager.getRootGroupId();
+
+        // Provenance Events are generated only by connectable components, with the exception of DOWNLOAD events,
+        // which have the root process group's identifier assigned as the component ID, and DROP events, which
+        // could have the connection identifier assigned as the component ID. So, we check if the component ID
+        // is set to the root group and otherwise assume that the ID is that of a connectable or connection.
+        final DataAuthorizable authorizable;
+        if (rootGroupId.equals(componentId)) {
+            authorizable = new DataAuthorizable(flowManager.getRootGroup());
+        } else {
+            // check if the component is a connectable, this should be the case most often
+            final Connectable connectable = flowManager.findConnectable(componentId);
+            if (connectable == null) {
+                // if the component id is not a connectable then consider a connection
+                final Connection connection = flowManager.getRootGroup().findConnection(componentId);
+
+                if (connection == null) {
+                    throw new ResourceNotFoundException("The component that generated this event is no longer part of the data flow.");
+                } else {
+                    // authorizable for connection data is associated with the source connectable
+                    authorizable = new DataAuthorizable(connection.getSource());
+                }
+            } else {
+                authorizable = new DataAuthorizable(connectable);
+            }
+        }
+
+        return authorizable;
+    }
+
+
+
+    @Override
+    public Authorizable createRemoteDataAuthorizable(String remoteGroupPortId) {
+        final DataAuthorizable authorizable;
+
+        final RemoteGroupPort remoteGroupPort = flowController.getFlowManager().getRootGroup().findRemoteGroupPort(remoteGroupPortId);
+        if (remoteGroupPort == null) {
+            throw new ResourceNotFoundException("The component that generated this event is no longer part of the data flow.");
+        } else {
+            // authorizable for remote group ports should be the remote process group
+            authorizable = new DataAuthorizable(remoteGroupPort.getRemoteProcessGroup());
+        }
+
+        return authorizable;
+    }
+
+    @Override
+    public Authorizable createProvenanceDataAuthorizable(String componentId) {
+        final FlowManager flowManager = flowController.getFlowManager();
+        final String rootGroupId = flowManager.getRootGroupId();
+
+        // Provenance Events are generated only by connectable components, with the exception of DOWNLOAD events,
+        // which have the root process group's identifier assigned as the component ID, and DROP events, which
+        // could have the connection identifier assigned as the component ID. So, we check if the component ID
+        // is set to the root group and otherwise assume that the ID is that of a connectable or connection.
+        final ProvenanceDataAuthorizable authorizable;
+        if (rootGroupId.equals(componentId)) {
+            authorizable = new ProvenanceDataAuthorizable(flowManager.getRootGroup());
+        } else {
+            // check if the component is a connectable, this should be the case most often
+            final Connectable connectable = flowManager.findConnectable(componentId);
+            if (connectable == null) {
+                // if the component id is not a connectable then consider a connection
+                final Connection connection = flowManager.getRootGroup().findConnection(componentId);
+
+                if (connection == null) {
+                    throw new ResourceNotFoundException("The component that generated this event is no longer part of the data flow.");
+                } else {
+                    // authorizable for connection data is associated with the source connectable
+                    authorizable = new ProvenanceDataAuthorizable(connection.getSource());
+                }
+            } else {
+                authorizable = new ProvenanceDataAuthorizable(connectable);
+            }
+        }
+
+        return authorizable;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index c4621e6..fe78a59 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -16,40 +16,6 @@
  */
 package org.apache.nifi.remote;
 
-import static java.util.Objects.requireNonNull;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import javax.net.ssl.SSLContext;
-import javax.ws.rs.core.Response;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.authorization.Resource;
 import org.apache.nifi.authorization.resource.Authorizable;
@@ -60,7 +26,6 @@ import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.connectable.Port;
 import org.apache.nifi.connectable.Position;
-import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.exception.CommunicationsException;
@@ -84,10 +49,43 @@ import org.apache.nifi.web.api.dto.PortDTO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.net.ssl.SSLContext;
+import javax.ws.rs.core.Response;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+
 /**
  * Represents the Root Process Group of a remote NiFi Instance. Holds
- * information about that remote instance, as well as {@link IncomingPort}s and
- * {@link OutgoingPort}s for communicating with the remote instance.
+ * information about that remote instance, as well as Incoming Ports and
+ * Outgoing Ports for communicating with the remote instance.
  */
 public class StandardRemoteProcessGroup implements RemoteProcessGroup {
 
@@ -148,8 +146,8 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
 
     private final ScheduledExecutorService backgroundThreadExecutor;
 
-    public StandardRemoteProcessGroup(final String id, final String targetUris, final ProcessGroup processGroup,
-                                      final FlowController flowController, final SSLContext sslContext, final NiFiProperties nifiProperties) {
+    public StandardRemoteProcessGroup(final String id, final String targetUris, final ProcessGroup processGroup, final ProcessScheduler processScheduler,
+                                      final BulletinRepository bulletinRepository, final SSLContext sslContext, final NiFiProperties nifiProperties) {
         this.nifiProperties = nifiProperties;
         this.id = requireNonNull(id);
 
@@ -157,13 +155,12 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
         this.targetId = null;
         this.processGroup = new AtomicReference<>(processGroup);
         this.sslContext = sslContext;
-        this.scheduler = flowController.getProcessScheduler();
+        this.scheduler = processScheduler;
         this.authorizationIssue = "Establishing connection to " + targetUris;
 
         final String expirationPeriod = nifiProperties.getProperty(NiFiProperties.REMOTE_CONTENTS_CACHE_EXPIRATION, "30 secs");
         remoteContentsCacheExpiration = FormatUtils.getTimeDuration(expirationPeriod, TimeUnit.MILLISECONDS);
 
-        final BulletinRepository bulletinRepository = flowController.getBulletinRepository();
         eventReporter = new EventReporter() {
             private static final long serialVersionUID = 1L;
 
@@ -700,7 +697,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
     }
 
     /**
-     * @return a set of {@link OutgoingPort}s used for transmitting FlowFiles to
+     * @return a set of Outgoing Ports used for transmitting FlowFiles to
      * the remote instance
      */
     @Override