You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/11/06 16:24:07 UTC

[6/9] nifi git commit: NIFI-5769: Refactored FlowController to use Composition over Inheritance - Ensure that when root group is set, that we register its ID in FlowManager

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java
new file mode 100644
index 0000000..6a579e9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import org.apache.nifi.annotation.lifecycle.OnRemoved;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
+import org.apache.nifi.controller.exception.ProcessorInstantiationException;
+import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
+import org.apache.nifi.controller.service.ControllerServiceInvocationHandler;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.StandardConfigurationContext;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.logging.LogRepositoryFactory;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.SimpleProcessLogger;
+import org.apache.nifi.processor.StandardProcessContext;
+import org.apache.nifi.reporting.ReportingTask;
+import org.apache.nifi.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.util.Set;
+
+public class StandardReloadComponent implements ReloadComponent {
+    private static final Logger logger = LoggerFactory.getLogger(StandardReloadComponent.class);
+
+    private final FlowController flowController;
+
+    /**
+     * Creates a ReloadComponent that uses the given FlowController to look up the collaborators
+     * (extension manager, flow manager, validation trigger, etc.) needed when reloading components.
+     *
+     * @param flowController the FlowController that the reload operations delegate to
+     */
+    public StandardReloadComponent(final FlowController flowController) {
+        this.flowController = flowController;
+    }
+
+
+    /**
+     * Swaps the Processor held by {@code existingNode} for a newly created instance of {@code newType}
+     * from the given bundle, keeping the node itself (and its identifier) in place.
+     * <p>
+     * The replacement is created before {@code @OnRemoved} is fired on the current Processor, so the
+     * creation is attempted before the existing Processor is told it has been removed. Once the new
+     * Processor has been set on the node, validation is triggered asynchronously, matching the
+     * controller service and reporting task overloads.
+     *
+     * @param existingNode the node whose Processor is to be replaced; must not be null
+     * @param newType the type of the Processor to create
+     * @param bundleCoordinate the coordinate of the bundle that provides {@code newType}
+     * @param additionalUrls additional classpath URLs passed to the FlowManager when creating the new Processor
+     * @throws ProcessorInstantiationException declared for failures while instantiating the new Processor
+     * @throws IllegalStateException if {@code existingNode} is null
+     */
+    @Override
+    public void reload(final ProcessorNode existingNode, final String newType, final BundleCoordinate bundleCoordinate, final Set<URL> additionalUrls)
+        throws ProcessorInstantiationException {
+        if (existingNode == null) {
+            throw new IllegalStateException("Existing ProcessorNode cannot be null");
+        }
+
+        final String id = existingNode.getProcessor().getIdentifier();
+
+        // ghost components will have a null logger
+        if (existingNode.getLogger() != null) {
+            existingNode.getLogger().debug("Reloading component {} to type {} from bundle {}", new Object[]{id, newType, bundleCoordinate});
+        }
+
+        final ExtensionManager extensionManager = flowController.getExtensionManager();
+
+        // createProcessor will create a new instance class loader for the same id so
+        // save the instance class loader to use it for calling OnRemoved on the existing processor
+        final ClassLoader existingInstanceClassLoader = extensionManager.getInstanceClassLoader(id);
+
+        // create a new node with firstTimeAdded as true so lifecycle methods get fired
+        // attempt the creation to make sure it works before firing the OnRemoved methods below
+        final ProcessorNode newNode = flowController.getFlowManager().createProcessor(newType, id, bundleCoordinate, additionalUrls, true, false);
+
+        // call OnRemoved for the existing processor using the previous instance class loader
+        try (final NarCloseable x = NarCloseable.withComponentNarLoader(existingInstanceClassLoader)) {
+            final StateManager stateManager = flowController.getStateManagerProvider().getStateManager(id);
+            final StandardProcessContext processContext = new StandardProcessContext(existingNode, flowController.getControllerServiceProvider(),
+                flowController.getEncryptor(), stateManager, () -> false);
+
+            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, existingNode.getProcessor(), processContext);
+        } finally {
+            // the previous class loader is no longer needed once OnRemoved has been attempted
+            extensionManager.closeURLClassLoader(id, existingInstanceClassLoader);
+        }
+
+        // set the new processor in the existing node
+        final ComponentLog componentLogger = new SimpleProcessLogger(id, newNode.getProcessor());
+        final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(componentLogger);
+        LogRepositoryFactory.getRepository(id).setLogger(terminationAwareLogger);
+
+        final LoggableComponent<Processor> newProcessor = new LoggableComponent<>(newNode.getProcessor(), newNode.getBundleCoordinate(), terminationAwareLogger);
+        existingNode.setProcessor(newProcessor);
+        existingNode.setExtensionMissing(newNode.isExtensionMissing());
+
+        // need to refresh the properties in case we are changing from ghost component to real component
+        existingNode.refreshProperties();
+
+        // validate asynchronously, as the log message states and as the other reload overloads do
+        logger.debug("Triggering async validation of {} due to processor reload", existingNode);
+        flowController.getValidationTrigger().triggerAsync(existingNode);
+    }
+
+    /**
+     * Replaces the controller service implementation and proxy held by {@code existingNode} with
+     * newly created ones of {@code newType}, keeping the node itself (and its identifier) in place.
+     * <p>
+     * The replacement node is created first; only afterwards is {@code @OnRemoved} invoked on the
+     * current implementation, under the instance class loader that was registered before the
+     * replacement was created. The invocation handler of the replacement is re-pointed at
+     * {@code existingNode} and validation is triggered asynchronously at the end.
+     *
+     * @param existingNode the node whose controller service is to be replaced; must not be null
+     * @param newType the type of the controller service to create
+     * @param bundleCoordinate the coordinate of the bundle that provides {@code newType}
+     * @param additionalUrls additional classpath URLs passed to the FlowManager when creating the new service
+     * @throws ControllerServiceInstantiationException declared for failures while instantiating the new service
+     * @throws IllegalStateException if {@code existingNode} is null
+     */
+    @Override
+    public void reload(final ControllerServiceNode existingNode, final String newType, final BundleCoordinate bundleCoordinate, final Set<URL> additionalUrls)
+        throws ControllerServiceInstantiationException {
+        if (existingNode == null) {
+            throw new IllegalStateException("Existing ControllerServiceNode cannot be null");
+        }
+
+        final String serviceId = existingNode.getIdentifier();
+
+        // a ghost component has no logger, so only log when one is available
+        if (existingNode.getLogger() != null) {
+            existingNode.getLogger().debug("Reloading component {} to type {} from bundle {}", new Object[]{serviceId, newType, bundleCoordinate});
+        }
+
+        final ExtensionManager extensions = flowController.getExtensionManager();
+
+        // Creating the replacement registers a new instance class loader under the same id, so the
+        // current one has to be captured first in order to run OnRemoved on the existing service.
+        final ClassLoader previousClassLoader = extensions.getInstanceClassLoader(serviceId);
+
+        // firstTimeAdded = true so lifecycle methods are called; creation is attempted before OnRemoved is fired
+        final ControllerServiceNode replacement = flowController.getFlowManager().createControllerService(newType, serviceId, bundleCoordinate, additionalUrls, true, false);
+
+        // fire OnRemoved on the existing service under its previous instance class loader
+        try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(previousClassLoader)) {
+            final ConfigurationContext removalContext = new StandardConfigurationContext(existingNode, flowController.getControllerServiceProvider(),
+                null, flowController.getVariableRegistry());
+
+            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, existingNode.getControllerServiceImplementation(), removalContext);
+        } finally {
+            extensions.closeURLClassLoader(serviceId, previousClassLoader);
+        }
+
+        // the handler built for the new proxy points at the replacement node; redirect it to the existing node
+        final ControllerServiceInvocationHandler handler = replacement.getInvocationHandler();
+        handler.setServiceNode(existingNode);
+
+        // wrap the new proxy and implementation together with a fresh logger
+        final ComponentLog baseLogger = new SimpleProcessLogger(serviceId, replacement.getControllerServiceImplementation());
+        final TerminationAwareLogger serviceLogger = new TerminationAwareLogger(baseLogger);
+        LogRepositoryFactory.getRepository(serviceId).setLogger(serviceLogger);
+
+        final LoggableComponent<ControllerService> proxyComponent = new LoggableComponent<>(replacement.getProxiedControllerService(), bundleCoordinate, serviceLogger);
+        final LoggableComponent<ControllerService> implementationComponent = new LoggableComponent<>(replacement.getControllerServiceImplementation(), bundleCoordinate, serviceLogger);
+
+        // install the new implementation, proxy and handler on the existing node
+        existingNode.setControllerServiceAndProxy(implementationComponent, proxyComponent, handler);
+        existingNode.setExtensionMissing(replacement.isExtensionMissing());
+
+        // properties may differ when moving from a ghost component to a real one
+        existingNode.refreshProperties();
+
+        logger.debug("Triggering async validation of {} due to controller service reload", existingNode);
+        flowController.getValidationTrigger().triggerAsync(existingNode);
+    }
+
+    /**
+     * Swaps the ReportingTask held by {@code existingNode} for a newly created instance of
+     * {@code newType} from the given bundle, keeping the node itself (and its identifier) in place.
+     * <p>
+     * The replacement is created before {@code @OnRemoved} is fired on the current task. The logger
+     * installed for the component is built from the new task instance, in line with the processor
+     * and controller service overloads. Validation is triggered asynchronously at the end.
+     *
+     * @param existingNode the node whose ReportingTask is to be replaced; must not be null
+     * @param newType the type of the ReportingTask to create
+     * @param bundleCoordinate the coordinate of the bundle that provides {@code newType}
+     * @param additionalUrls additional classpath URLs passed to the FlowManager when creating the new task
+     * @throws ReportingTaskInstantiationException declared for failures while instantiating the new task
+     * @throws IllegalStateException if {@code existingNode} is null
+     */
+    @Override
+    public void reload(final ReportingTaskNode existingNode, final String newType, final BundleCoordinate bundleCoordinate, final Set<URL> additionalUrls)
+        throws ReportingTaskInstantiationException {
+        if (existingNode == null) {
+            throw new IllegalStateException("Existing ReportingTaskNode cannot be null");
+        }
+
+        final String id = existingNode.getReportingTask().getIdentifier();
+
+        // ghost components will have a null logger
+        if (existingNode.getLogger() != null) {
+            existingNode.getLogger().debug("Reloading component {} to type {} from bundle {}", new Object[]{id, newType, bundleCoordinate});
+        }
+
+        final ExtensionManager extensionManager = flowController.getExtensionManager();
+
+        // createReportingTask will create a new instance class loader for the same id so
+        // save the instance class loader to use it for calling OnRemoved on the existing reporting task
+        final ClassLoader existingInstanceClassLoader = extensionManager.getInstanceClassLoader(id);
+
+        // set firstTimeAdded to true so lifecycle annotations get fired, but don't register this node
+        // attempt the creation to make sure it works before firing the OnRemoved methods below
+        final ReportingTaskNode newNode = flowController.getFlowManager().createReportingTask(newType, id, bundleCoordinate, additionalUrls, true, false);
+
+        // call OnRemoved for the existing reporting task using the previous instance class loader
+        try (final NarCloseable x = NarCloseable.withComponentNarLoader(existingInstanceClassLoader)) {
+            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, existingNode.getReportingTask(), existingNode.getConfigurationContext());
+        } finally {
+            extensionManager.closeURLClassLoader(id, existingInstanceClassLoader);
+        }
+
+        // set the new reporting task into the existing node; the logger must be built from the NEW task
+        // (existingNode still holds the old instance at this point)
+        final ComponentLog componentLogger = new SimpleProcessLogger(id, newNode.getReportingTask());
+        final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(componentLogger);
+        LogRepositoryFactory.getRepository(id).setLogger(terminationAwareLogger);
+
+        final LoggableComponent<ReportingTask> newReportingTask = new LoggableComponent<>(newNode.getReportingTask(), newNode.getBundleCoordinate(), terminationAwareLogger);
+        existingNode.setReportingTask(newReportingTask);
+        existingNode.setExtensionMissing(newNode.isExtensionMissing());
+
+        // need to refresh the properties in case we are changing from ghost component to real component
+        existingNode.refreshProperties();
+
+        logger.debug("Triggering async validation of {} due to reporting task reload", existingNode);
+        flowController.getValidationTrigger().triggerAsync(existingNode);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
new file mode 100644
index 0000000..f100092
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
@@ -0,0 +1,656 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.flow;
+
+import org.apache.nifi.annotation.lifecycle.OnAdded;
+import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
+import org.apache.nifi.annotation.lifecycle.OnRemoved;
+import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Funnel;
+import org.apache.nifi.connectable.LocalPort;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.ExtensionBuilder;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.FlowSnippet;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.StandardFlowSnippet;
+import org.apache.nifi.controller.StandardFunnel;
+import org.apache.nifi.controller.StandardProcessorNode;
+import org.apache.nifi.controller.exception.ComponentLifeCycleException;
+import org.apache.nifi.controller.exception.ProcessorInstantiationException;
+import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.controller.label.StandardLabel;
+import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.service.StandardConfigurationContext;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.groups.StandardProcessGroup;
+import org.apache.nifi.logging.ControllerServiceLogObserver;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.logging.LogRepository;
+import org.apache.nifi.logging.LogRepositoryFactory;
+import org.apache.nifi.logging.ProcessorLogObserver;
+import org.apache.nifi.logging.ReportingTaskLogObserver;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.variable.MutableVariableRegistry;
+import org.apache.nifi.remote.RemoteGroupPort;
+import org.apache.nifi.remote.StandardRemoteProcessGroup;
+import org.apache.nifi.remote.StandardRootGroupPort;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.ReflectionUtils;
+import org.apache.nifi.web.api.dto.FlowSnippetDTO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+import java.net.URL;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static java.util.Objects.requireNonNull;
+
+public class StandardFlowManager implements FlowManager {
+    private static final Logger logger = LoggerFactory.getLogger(StandardFlowManager.class);
+
+    private final NiFiProperties nifiProperties;
+    private final BulletinRepository bulletinRepository;
+    private final StandardProcessScheduler processScheduler;
+    private final Authorizer authorizer;
+    private final SSLContext sslContext;
+    private final FlowController flowController;
+    private final FlowFileEventRepository flowFileEventRepository;
+
+    private final boolean isSiteToSiteSecure;
+
+    private volatile ProcessGroup rootGroup;
+    private final ConcurrentMap<String, ProcessGroup> allProcessGroups = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, ProcessorNode> allProcessors = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, ReportingTaskNode> allReportingTasks = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, ControllerServiceNode> rootControllerServices = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, Connection> allConnections = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, Port> allInputPorts = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, Port> allOutputPorts = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, Funnel> allFunnels = new ConcurrentHashMap<>();
+
+    public StandardFlowManager(final NiFiProperties nifiProperties, final SSLContext sslContext, final FlowController flowController, final FlowFileEventRepository flowFileEventRepository) {
+        this.nifiProperties = nifiProperties;
+        this.flowController = flowController;
+        this.bulletinRepository = flowController.getBulletinRepository();
+        this.processScheduler = flowController.getProcessScheduler();
+        this.authorizer = flowController.getAuthorizer();
+        this.sslContext = sslContext;
+        this.flowFileEventRepository = flowFileEventRepository;
+
+        this.isSiteToSiteSecure = Boolean.TRUE.equals(nifiProperties.isSiteToSiteSecure());
+    }
+
+    /**
+     * Creates a root-group input port that receives data (TransferDirection.RECEIVE).
+     * The port is created without a Process Group.
+     *
+     * @param id the identifier of the port; must not be null
+     * @param name the name of the port; must not be null
+     * @return the newly created port
+     * @throws NullPointerException if {@code id} or {@code name} is null
+     * @throws IllegalStateException if the root group already contains a port with the given id
+     */
+    public Port createRemoteInputPort(String id, String name) {
+        final String portId = requireNonNull(id).intern();
+        final String portName = requireNonNull(name).intern();
+        verifyPortIdDoesNotExist(portId);
+
+        final Port inputPort = new StandardRootGroupPort(portId, portName, null, TransferDirection.RECEIVE, ConnectableType.INPUT_PORT,
+            authorizer, bulletinRepository, processScheduler, isSiteToSiteSecure, nifiProperties);
+        return inputPort;
+    }
+
+    /**
+     * Creates a root-group output port that sends data (TransferDirection.SEND).
+     * The port is created without a Process Group.
+     *
+     * @param id the identifier of the port; must not be null
+     * @param name the name of the port; must not be null
+     * @return the newly created port
+     * @throws NullPointerException if {@code id} or {@code name} is null
+     * @throws IllegalStateException if the root group already contains a port with the given id
+     */
+    public Port createRemoteOutputPort(String id, String name) {
+        final String portId = requireNonNull(id).intern();
+        final String portName = requireNonNull(name).intern();
+        verifyPortIdDoesNotExist(portId);
+
+        final Port outputPort = new StandardRootGroupPort(portId, portName, null, TransferDirection.SEND, ConnectableType.OUTPUT_PORT,
+            authorizer, bulletinRepository, processScheduler, isSiteToSiteSecure, nifiProperties);
+        return outputPort;
+    }
+
+    /**
+     * Creates a Remote Process Group for the given target URIs. The group is created without a
+     * parent Process Group.
+     *
+     * @param id the identifier of the Remote Process Group; must not be null
+     * @param uris the target URIs, passed through unchanged to the Remote Process Group
+     * @return the newly created Remote Process Group
+     * @throws NullPointerException if {@code id} is null
+     */
+    public RemoteProcessGroup createRemoteProcessGroup(final String id, final String uris) {
+        final String groupId = requireNonNull(id);
+        return new StandardRemoteProcessGroup(groupId, uris, null, processScheduler, bulletinRepository, sslContext, nifiProperties);
+    }
+
+    /**
+     * Sets the root Process Group and registers it under both the root-group alias and its own
+     * identifier, so that it can be looked up by either.
+     *
+     * @param rootGroup the new root Process Group; must not be null
+     * @throws NullPointerException if {@code rootGroup} is null
+     */
+    public void setRootGroup(final ProcessGroup rootGroup) {
+        // Validate before touching any state. A null group always ended in a NullPointerException
+        // (ConcurrentHashMap rejects null values), but only after the field had been overwritten.
+        requireNonNull(rootGroup, "Root Process Group must not be null");
+
+        this.rootGroup = rootGroup;
+        allProcessGroups.put(ROOT_GROUP_ID_ALIAS, rootGroup);
+        allProcessGroups.put(rootGroup.getIdentifier(), rootGroup);
+    }
+
+    /**
+     * Returns the Process Group most recently passed to {@code setRootGroup}, or {@code null} if
+     * none has been set yet.
+     *
+     * @return the current root Process Group, possibly null
+     */
+    public ProcessGroup getRootGroup() {
+        final ProcessGroup currentRoot = this.rootGroup;
+        return currentRoot;
+    }
+
+    /**
+     * Returns the identifier of the current root Process Group.
+     *
+     * @return the root group's identifier
+     * @throws NullPointerException if no root group has been set
+     */
+    @Override
+    public String getRootGroupId() {
+        final String rootId = rootGroup.getIdentifier();
+        return rootId;
+    }
+
+    /**
+     * Determines whether two Process Group identifiers refer to the same group, treating the
+     * root-group alias as equivalent to the actual identifier of the root group.
+     *
+     * @param id1 the first group identifier
+     * @param id2 the second group identifier
+     * @return {@code false} if either id is null; otherwise {@code true} if the ids are equal once
+     *         the root-group alias has been resolved to the root group's identifier
+     */
+    public boolean areGroupsSame(final String id1, final String id2) {
+        if (id1 == null || id2 == null) {
+            return false;
+        }
+
+        if (id1.equals(id2)) {
+            return true;
+        }
+
+        // resolve the alias on either side before comparing
+        final String resolvedFirst;
+        if (id1.equals(ROOT_GROUP_ID_ALIAS)) {
+            resolvedFirst = getRootGroupId();
+        } else {
+            resolvedFirst = id1;
+        }
+
+        final String resolvedSecond;
+        if (id2.equals(ROOT_GROUP_ID_ALIAS)) {
+            resolvedSecond = getRootGroupId();
+        } else {
+            resolvedSecond = id2;
+        }
+
+        return resolvedFirst.equals(resolvedSecond);
+    }
+
+    /**
+     * Verifies that neither an output port nor an input port with the given id can be found from
+     * the root group.
+     *
+     * @param id the port identifier to check
+     * @throws IllegalStateException if an output port or an input port with the given id is found;
+     *         the message names the kind of port that caused the conflict
+     */
+    private void verifyPortIdDoesNotExist(final String id) {
+        final ProcessGroup rootGroup = getRootGroup();
+        Port port = rootGroup.findOutputPort(id);
+        if (port != null) {
+            // report the conflicting port type accurately (this branch found an OUTPUT port)
+            throw new IllegalStateException("An Output Port already exists with ID " + id);
+        }
+        port = rootGroup.findInputPort(id);
+        if (port != null) {
+            throw new IllegalStateException("An Input Port already exists with ID " + id);
+        }
+    }
+
+    /**
+     * Creates a Label with the given identifier and text.
+     *
+     * @param id the identifier of the label; must not be null
+     * @param text the text of the label
+     * @return the newly created label
+     * @throws NullPointerException if {@code id} is null
+     */
+    public Label createLabel(final String id, final String text) {
+        final String labelId = requireNonNull(id).intern();
+        return new StandardLabel(labelId, text);
+    }
+
+    /**
+     * Creates a Funnel with the given identifier. The funnel is created without a Process Group.
+     *
+     * @param id the identifier of the funnel
+     * @return the newly created funnel
+     * @throws NullPointerException if {@code id} is null
+     */
+    public Funnel createFunnel(final String id) {
+        final String funnelId = id.intern();
+        return new StandardFunnel(funnelId, null, processScheduler);
+    }
+
+    /**
+     * Creates a local input port. The port is created without a Process Group.
+     *
+     * @param id the identifier of the port; must not be null
+     * @param name the name of the port; must not be null
+     * @return the newly created port
+     * @throws NullPointerException if {@code id} or {@code name} is null
+     * @throws IllegalStateException if the root group already contains a port with the given id
+     */
+    public Port createLocalInputPort(String id, String name) {
+        final String portId = requireNonNull(id).intern();
+        final String portName = requireNonNull(name).intern();
+        verifyPortIdDoesNotExist(portId);
+
+        final Port inputPort = new LocalPort(portId, portName, null, ConnectableType.INPUT_PORT, processScheduler);
+        return inputPort;
+    }
+
+    /**
+     * Creates a local output port. The port is created without a Process Group.
+     *
+     * @param id the identifier of the port; must not be null
+     * @param name the name of the port; must not be null
+     * @return the newly created port
+     * @throws NullPointerException if {@code id} or {@code name} is null
+     * @throws IllegalStateException if the root group already contains a port with the given id
+     */
+    public Port createLocalOutputPort(String id, String name) {
+        final String portId = requireNonNull(id).intern();
+        final String portName = requireNonNull(name).intern();
+        verifyPortIdDoesNotExist(portId);
+
+        final Port outputPort = new LocalPort(portId, portName, null, ConnectableType.OUTPUT_PORT, processScheduler);
+        return outputPort;
+    }
+
+    /**
+     * Creates a Process Group with its own mutable variable registry (constructed from the
+     * FlowController's variable registry) and registers the group with this manager under its
+     * identifier.
+     *
+     * @param id the identifier of the Process Group; must not be null
+     * @return the newly created Process Group
+     * @throws NullPointerException if {@code id} is null
+     */
+    public ProcessGroup createProcessGroup(final String id) {
+        final MutableVariableRegistry groupVariables = new MutableVariableRegistry(flowController.getVariableRegistry());
+
+        final ProcessGroup created = new StandardProcessGroup(requireNonNull(id), flowController.getControllerServiceProvider(), processScheduler, nifiProperties, flowController.getEncryptor(),
+            flowController, groupVariables);
+
+        // make the group discoverable via getGroup(..) right away
+        final String createdId = created.getIdentifier();
+        allProcessGroups.put(createdId, created);
+
+        return created;
+    }
+
+    /**
+     * Validates the given snippet against the given Process Group, instantiates it into that group
+     * and then initializes every Remote Process Group found from the group.
+     *
+     * @param group the Process Group to instantiate the snippet into; must not be null
+     * @param dto the snippet to instantiate; must not be null
+     * @throws ProcessorInstantiationException declared for failures while instantiating processors of the snippet
+     * @throws NullPointerException if {@code group} or {@code dto} is null
+     */
+    public void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto) throws ProcessorInstantiationException {
+        requireNonNull(group);
+        requireNonNull(dto);
+
+        final FlowSnippet flowSnippet = new StandardFlowSnippet(dto, flowController.getExtensionManager());
+        flowSnippet.validate(group);
+        flowSnippet.instantiate(this, group);
+
+        for (final RemoteProcessGroup remoteGroup : group.findAllRemoteProcessGroups()) {
+            remoteGroup.initialize();
+        }
+    }
+
+    /**
+     * Instantiates the FlowFilePrioritizer of the given type using the class loader of the single
+     * bundle that provides it.
+     * <p>
+     * While the instance is being created the thread's context class loader is switched to the
+     * bundle's class loader; the previous context class loader is restored afterwards if it was
+     * not null.
+     *
+     * @param type the fully qualified class name of the prioritizer
+     * @return a new instance of the prioritizer
+     * @throws IllegalStateException if no bundle, or more than one bundle, provides {@code type}
+     * @throws InstantiationException if the class cannot be instantiated
+     * @throws IllegalAccessException if the class or its no-arg constructor is not accessible
+     * @throws ClassNotFoundException if the class cannot be found in the bundle's class loader
+     */
+    public FlowFilePrioritizer createPrioritizer(final String type) throws InstantiationException, IllegalAccessException, ClassNotFoundException {
+        final Thread currentThread = Thread.currentThread();
+        final ClassLoader originalContextLoader = currentThread.getContextClassLoader();
+        try {
+            final List<Bundle> candidateBundles = flowController.getExtensionManager().getBundles(type);
+            if (candidateBundles.isEmpty()) {
+                throw new IllegalStateException(String.format("The specified class '%s' is not known to this nifi.", type));
+            }
+            if (candidateBundles.size() > 1) {
+                throw new IllegalStateException(String.format("Multiple bundles found for the specified class '%s', only one is allowed.", type));
+            }
+
+            final ClassLoader bundleClassLoader = candidateBundles.get(0).getClassLoader();
+            final Class<?> loadedClass = Class.forName(type, true, bundleClassLoader);
+
+            // instantiate under the bundle's class loader
+            currentThread.setContextClassLoader(bundleClassLoader);
+            final Class<? extends FlowFilePrioritizer> prioritizerClass = loadedClass.asSubclass(FlowFilePrioritizer.class);
+            final Object instance = prioritizerClass.newInstance();
+            return prioritizerClass.cast(instance);
+        } finally {
+            if (originalContextLoader != null) {
+                currentThread.setContextClassLoader(originalContextLoader);
+            }
+        }
+    }
+
+    /**
+     * Looks up a Process Group registered with this manager.
+     *
+     * @param id the identifier (or root-group alias, once a root group has been set) to look up; must not be null
+     * @return the registered Process Group, or {@code null} if none is registered under {@code id}
+     * @throws NullPointerException if {@code id} is null
+     */
+    public ProcessGroup getGroup(final String id) {
+        final String groupId = requireNonNull(id);
+        return allProcessGroups.get(groupId);
+    }
+
+    /**
+     * Registers the given Process Group with this manager under its identifier.
+     *
+     * @param group the Process Group that was added
+     */
+    public void onProcessGroupAdded(final ProcessGroup group) {
+        final String groupId = group.getIdentifier();
+        allProcessGroups.put(groupId, group);
+    }
+
+    /**
+     * Removes the registration held under the given Process Group's identifier.
+     *
+     * @param group the Process Group that was removed
+     */
+    public void onProcessGroupRemoved(final ProcessGroup group) {
+        final String groupId = group.getIdentifier();
+        allProcessGroups.remove(groupId);
+    }
+
+    /**
+     * Creates a processor, treating it as added for the first time.
+     *
+     * @param type the type of the processor
+     * @param id the identifier of the processor
+     * @param coordinate the coordinate of the bundle that provides {@code type}
+     * @return the newly created ProcessorNode
+     */
+    public ProcessorNode createProcessor(final String type, final String id, final BundleCoordinate coordinate) {
+        final boolean firstTimeAdded = true;
+        return createProcessor(type, id, coordinate, firstTimeAdded);
+    }
+
+    /**
+     * Creates a processor with no additional classpath URLs and with its log observer registered.
+     *
+     * @param type the type of the processor
+     * @param id the identifier of the processor
+     * @param coordinate the coordinate of the bundle that provides {@code type}
+     * @param firstTimeAdded whether the lifecycle methods for a newly added processor should be invoked
+     * @return the newly created ProcessorNode
+     */
+    public ProcessorNode createProcessor(final String type, String id, final BundleCoordinate coordinate, final boolean firstTimeAdded) {
+        final Set<URL> noAdditionalUrls = Collections.emptySet();
+        final boolean registerLogObserver = true;
+        return createProcessor(type, id, coordinate, noAdditionalUrls, firstTimeAdded, registerLogObserver);
+    }
+
+    /**
+     * Builds a ProcessorNode of the given type, installs its logger in the log repository and,
+     * when requested, registers a bulletin log observer and fires the first-time lifecycle methods.
+     * <p>
+     * When {@code firstTimeAdded} is true, {@code @OnAdded} methods are invoked first; if they fail,
+     * the log observer (if one was registered) is removed again and a
+     * {@link ComponentLifeCycleException} is thrown. Otherwise {@code @OnConfigurationRestored}
+     * methods are invoked quietly afterwards.
+     *
+     * @param type the type of the processor
+     * @param id the identifier of the processor
+     * @param coordinate the coordinate of the bundle that provides {@code type}
+     * @param additionalUrls additional classpath URLs for the processor
+     * @param firstTimeAdded whether {@code @OnAdded} and {@code @OnConfigurationRestored} should be invoked
+     * @param registerLogObserver whether a bulletin log observer should be registered for the processor
+     * @return the newly created ProcessorNode
+     * @throws ComponentLifeCycleException if invoking the {@code @OnAdded} methods fails
+     */
+    public ProcessorNode createProcessor(final String type, String id, final BundleCoordinate coordinate, final Set<URL> additionalUrls,
+                                         final boolean firstTimeAdded, final boolean registerLogObserver) {
+
+        // make sure the first reference to LogRepository happens outside of a NarCloseable so that we use the framework's ClassLoader
+        final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
+        final ExtensionManager extensionManager = flowController.getExtensionManager();
+
+        final ProcessorNode procNode = new ExtensionBuilder()
+            .identifier(id)
+            .type(type)
+            .bundleCoordinate(coordinate)
+            .extensionManager(extensionManager)
+            .controllerServiceProvider(flowController.getControllerServiceProvider())
+            .processScheduler(processScheduler)
+            .nodeTypeProvider(flowController)
+            .validationTrigger(flowController.getValidationTrigger())
+            .reloadComponent(flowController.getReloadComponent())
+            .variableRegistry(flowController.getVariableRegistry())
+            .addClasspathUrls(additionalUrls)
+            .kerberosConfig(flowController.createKerberosConfig(nifiProperties))
+            .buildProcessor();
+
+        LogRepositoryFactory.getRepository(procNode.getIdentifier()).setLogger(procNode.getLogger());
+        if (registerLogObserver) {
+            logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, procNode.getBulletinLevel(), new ProcessorLogObserver(bulletinRepository, procNode));
+        }
+
+        if (firstTimeAdded) {
+            try (final NarCloseable x = NarCloseable.withComponentNarLoader(extensionManager, procNode.getProcessor().getClass(), procNode.getProcessor().getIdentifier())) {
+                ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, procNode.getProcessor());
+            } catch (final Exception e) {
+                // undo the observer registration so a failed creation leaves nothing behind in the log repository
+                if (registerLogObserver) {
+                    logRepository.removeObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID);
+                }
+                throw new ComponentLifeCycleException("Failed to invoke @OnAdded methods of " + procNode.getProcessor(), e);
+            }
+
+            // only reached when @OnAdded succeeded; failures here are swallowed by the "quietly" variant
+            try (final NarCloseable nc = NarCloseable.withComponentNarLoader(extensionManager, procNode.getProcessor().getClass(), procNode.getProcessor().getIdentifier())) {
+                ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, procNode.getProcessor());
+            }
+        }
+
+        return procNode;
+    }
+
+    /**
+     * Registers the given processor with this manager under its identifier.
+     *
+     * @param procNode the processor that was added
+     */
+    public void onProcessorAdded(final ProcessorNode procNode) {
+        final String processorId = procNode.getIdentifier();
+        allProcessors.put(processorId, procNode);
+    }
+
+    /**
+     * Purges the transfer events recorded for the given processor and removes its registration
+     * from this manager.
+     *
+     * @param procNode the processor that was removed
+     */
+    public void onProcessorRemoved(final ProcessorNode procNode) {
+        final String processorId = procNode.getIdentifier();
+
+        // purge events first, then drop the registration
+        flowFileEventRepository.purgeTransferEvents(processorId);
+        allProcessors.remove(processorId);
+    }
+
+    public Connectable findConnectable(final String id) {
+        final ProcessorNode procNode = getProcessorNode(id);
+        if (procNode != null) {
+            return procNode;
+        }
+
+        final Port inPort = getInputPort(id);
+        if (inPort != null) {
+            return inPort;
+        }
+
+        final Port outPort = getOutputPort(id);
+        if (outPort != null) {
+            return outPort;
+        }
+
+        final Funnel funnel = getFunnel(id);
+        if (funnel != null) {
+            return funnel;
+        }
+
+        final RemoteGroupPort remoteGroupPort = getRootGroup().findRemoteGroupPort(id);
+        if (remoteGroupPort != null) {
+            return remoteGroupPort;
+        }
+
+        return null;
+    }
+
+    /**
+     * Looks up a processor registered with this manager.
+     *
+     * @param id the identifier of the processor
+     * @return the registered processor, or {@code null} if none is registered under {@code id}
+     */
+    public ProcessorNode getProcessorNode(final String id) {
+        final ProcessorNode registered = allProcessors.get(id);
+        return registered;
+    }
+
+    /**
+     * Registers the given connection with this manager and, if the FlowController reports that it
+     * is initialized, starts load balancing on the connection's FlowFile queue.
+     *
+     * @param connection the connection that was added
+     */
+    public void onConnectionAdded(final Connection connection) {
+        final String connectionId = connection.getIdentifier();
+        allConnections.put(connectionId, connection);
+
+        if (!flowController.isInitialized()) {
+            return;
+        }
+
+        connection.getFlowFileQueue().startLoadBalancing();
+    }
+
+    /**
+     * Purges the transfer events recorded for the given connection and removes its registration
+     * from this manager.
+     *
+     * @param connection the connection that was removed
+     */
+    public void onConnectionRemoved(final Connection connection) {
+        final String connectionId = connection.getIdentifier();
+
+        // purge events first, then drop the registration
+        flowFileEventRepository.purgeTransferEvents(connectionId);
+        allConnections.remove(connectionId);
+    }
+
+    /**
+     * Looks up a connection registered with this manager.
+     *
+     * @param id the identifier of the connection
+     * @return the registered connection, or {@code null} if none is registered under {@code id}
+     */
+    public Connection getConnection(final String id) {
+        final Connection registered = allConnections.get(id);
+        return registered;
+    }
+
+    /**
+     * Creates a connection by delegating, with the arguments unchanged, to the flow controller.
+     *
+     * @param id the identifier for the new connection
+     * @param name the name for the new connection
+     * @param source the component the connection starts from
+     * @param destination the component the connection leads to
+     * @param relationshipNames the names of the relationships for the connection
+     * @return the connection returned by the flow controller
+     */
+    public Connection createConnection(final String id, final String name, final Connectable source, final Connectable destination, final Collection<String> relationshipNames) {
+        final Connection created = flowController.createConnection(id, name, source, destination, relationshipNames);
+        return created;
+    }
+
+    /**
+     * @return a new {@link HashSet} holding every value currently in {@code allConnections};
+     *         changes to the returned set do not affect the index
+     */
+    public Set<Connection> findAllConnections() {
+        final Set<Connection> snapshot = new HashSet<>();
+        snapshot.addAll(allConnections.values());
+        return snapshot;
+    }
+
+    /**
+     * Indexes the given input port in {@code allInputPorts} under its identifier.
+     *
+     * @param inputPort the input port that was added
+     */
+    public void onInputPortAdded(final Port inputPort) {
+        final String portId = inputPort.getIdentifier();
+        allInputPorts.put(portId, inputPort);
+    }
+
+    /**
+     * Asks the FlowFile event repository to purge the transfer events kept for the
+     * port's identifier, then drops the port from {@code allInputPorts}.
+     *
+     * @param inputPort the input port that was removed
+     */
+    public void onInputPortRemoved(final Port inputPort) {
+        final String portId = inputPort.getIdentifier();
+
+        flowFileEventRepository.purgeTransferEvents(portId);
+        allInputPorts.remove(portId);
+    }
+
+    /**
+     * @param id the identifier of the input port
+     * @return the entry stored in {@code allInputPorts} for the identifier, or {@code null} if there is none
+     */
+    public Port getInputPort(final String id) {
+        final Port inputPort = allInputPorts.get(id);
+        return inputPort;
+    }
+
+    /**
+     * Indexes the given output port in {@code allOutputPorts} under its identifier.
+     *
+     * @param outputPort the output port that was added
+     */
+    public void onOutputPortAdded(final Port outputPort) {
+        final String portId = outputPort.getIdentifier();
+        allOutputPorts.put(portId, outputPort);
+    }
+
+    /**
+     * Asks the FlowFile event repository to purge the transfer events kept for the
+     * port's identifier, then drops the port from {@code allOutputPorts}.
+     *
+     * @param outputPort the output port that was removed
+     */
+    public void onOutputPortRemoved(final Port outputPort) {
+        final String portId = outputPort.getIdentifier();
+
+        flowFileEventRepository.purgeTransferEvents(portId);
+        allOutputPorts.remove(portId);
+    }
+
+    /**
+     * @param id the identifier of the output port
+     * @return the entry stored in {@code allOutputPorts} for the identifier, or {@code null} if there is none
+     */
+    public Port getOutputPort(final String id) {
+        final Port outputPort = allOutputPorts.get(id);
+        return outputPort;
+    }
+
+    /**
+     * Indexes the given funnel in {@code allFunnels} under its identifier.
+     *
+     * @param funnel the funnel that was added
+     */
+    public void onFunnelAdded(final Funnel funnel) {
+        final String funnelId = funnel.getIdentifier();
+        allFunnels.put(funnelId, funnel);
+    }
+
+    /**
+     * Asks the FlowFile event repository to purge the transfer events kept for the
+     * funnel's identifier, then drops the funnel from {@code allFunnels}.
+     *
+     * @param funnel the funnel that was removed
+     */
+    public void onFunnelRemoved(final Funnel funnel) {
+        final String funnelId = funnel.getIdentifier();
+
+        flowFileEventRepository.purgeTransferEvents(funnelId);
+        allFunnels.remove(funnelId);
+    }
+
+    /**
+     * @param id the identifier of the funnel
+     * @return the entry stored in {@code allFunnels} for the identifier, or {@code null} if there is none
+     */
+    public Funnel getFunnel(final String id) {
+        final Funnel funnel = allFunnels.get(id);
+        return funnel;
+    }
+
+    /**
+     * Convenience overload that creates a reporting task with {@code firstTimeAdded} set to {@code true}.
+     *
+     * @param type the type of the reporting task
+     * @param bundleCoordinate the coordinate of the bundle providing the type
+     * @return the node produced by the three-argument overload
+     */
+    public ReportingTaskNode createReportingTask(final String type, final BundleCoordinate bundleCoordinate) {
+        final boolean firstTimeAdded = true;
+        return createReportingTask(type, bundleCoordinate, firstTimeAdded);
+    }
+
+    /**
+     * Convenience overload that creates a reporting task under a freshly generated random UUID.
+     *
+     * @param type the type of the reporting task
+     * @param bundleCoordinate the coordinate of the bundle providing the type
+     * @param firstTimeAdded passed through unchanged to the overload that takes an explicit identifier
+     * @return the node produced by that overload
+     */
+    public ReportingTaskNode createReportingTask(final String type, final BundleCoordinate bundleCoordinate, final boolean firstTimeAdded) {
+        final String generatedId = UUID.randomUUID().toString();
+        return createReportingTask(type, generatedId, bundleCoordinate, firstTimeAdded);
+    }
+
+    /**
+     * Creates a reporting task with no additional classpath URLs and with registration enabled.
+     *
+     * @param type the type of the reporting task
+     * @param id the identifier for the new reporting task
+     * @param bundleCoordinate the coordinate of the bundle providing the type
+     * @param firstTimeAdded passed through unchanged to the six-argument overload
+     * @return the node produced by the six-argument overload
+     */
+    @Override
+    public ReportingTaskNode createReportingTask(final String type, final String id, final BundleCoordinate bundleCoordinate, final boolean firstTimeAdded) {
+        final Set<URL> noAdditionalUrls = Collections.emptySet();
+        return createReportingTask(type, id, bundleCoordinate, noAdditionalUrls, firstTimeAdded, true);
+    }
+
+    /**
+     * Builds a reporting task node through {@link ExtensionBuilder}, wires its logger into the
+     * log repository and optionally runs its lifecycle hooks and registers it with this manager.
+     *
+     * @param type the type of the reporting task; must not be {@code null}
+     * @param id the identifier for the new reporting task; must not be {@code null}
+     * @param bundleCoordinate the coordinate of the bundle providing the type; must not be {@code null}
+     * @param additionalUrls extra classpath URLs handed to the builder
+     * @param firstTimeAdded when {@code true}, the {@code OnAdded} and then the {@code OnConfigurationRestored}
+     *                       annotated methods of the reporting task are invoked
+     * @param register when {@code true}, the node is stored in {@code allReportingTasks} and a WARN-level
+     *                 bulletin log observer is attached
+     * @return the newly built reporting task node
+     * @throws NullPointerException if {@code type}, {@code id} or {@code bundleCoordinate} is {@code null}
+     * @throws ComponentLifeCycleException if invoking the lifecycle methods fails
+     */
+    public ReportingTaskNode createReportingTask(final String type, final String id, final BundleCoordinate bundleCoordinate, final Set<URL> additionalUrls,
+                                                 final boolean firstTimeAdded, final boolean register) {
+        // Name the offending argument so that a failure is diagnosable from the stack trace alone
+        requireNonNull(type, "type");
+        requireNonNull(id, "id");
+        requireNonNull(bundleCoordinate, "bundleCoordinate");
+
+        // make sure the first reference to LogRepository happens outside of a NarCloseable so that we use the framework's ClassLoader
+        final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
+        final ExtensionManager extensionManager = flowController.getExtensionManager();
+
+        // The extension manager is supplied exactly once; the original chain set it twice
+        final ReportingTaskNode taskNode = new ExtensionBuilder()
+            .identifier(id)
+            .type(type)
+            .bundleCoordinate(bundleCoordinate)
+            .extensionManager(extensionManager)
+            .controllerServiceProvider(flowController.getControllerServiceProvider())
+            .processScheduler(processScheduler)
+            .nodeTypeProvider(flowController)
+            .validationTrigger(flowController.getValidationTrigger())
+            .reloadComponent(flowController.getReloadComponent())
+            .variableRegistry(flowController.getVariableRegistry())
+            .addClasspathUrls(additionalUrls)
+            .kerberosConfig(flowController.createKerberosConfig(nifiProperties))
+            .flowController(flowController)
+            .buildReportingTask();
+
+        LogRepositoryFactory.getRepository(taskNode.getIdentifier()).setLogger(taskNode.getLogger());
+
+        if (firstTimeAdded) {
+            final Class<?> taskClass = taskNode.getReportingTask().getClass();
+            final String identifier = taskNode.getReportingTask().getIdentifier();
+
+            // Lifecycle hooks run with the component's NAR class loader in effect
+            try (final NarCloseable x = NarCloseable.withComponentNarLoader(extensionManager, taskClass, identifier)) {
+                ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, taskNode.getReportingTask());
+                ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, taskNode.getReportingTask());
+            } catch (final Exception e) {
+                throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + taskNode.getReportingTask(), e);
+            }
+        }
+
+        if (register) {
+            allReportingTasks.put(id, taskNode);
+
+            // Register log observer to provide bulletins when reporting task logs anything at WARN level or above
+            logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN,
+                new ReportingTaskLogObserver(bulletinRepository, taskNode));
+        }
+
+        return taskNode;
+    }
+
+    /**
+     * @param taskId the identifier of the reporting task
+     * @return the entry stored in {@code allReportingTasks} for the identifier, or {@code null} if there is none
+     */
+    public ReportingTaskNode getReportingTaskNode(final String taskId) {
+        final ReportingTaskNode taskNode = allReportingTasks.get(taskId);
+        return taskNode;
+    }
+
+    /**
+     * Removes the given reporting task from this manager. After checking that the exact node
+     * instance is registered and that it can be deleted, the task's {@code OnRemoved} methods are
+     * invoked, its references to controller services are released, and the node is dropped from
+     * {@code allReportingTasks}, the log repository factory, the process scheduler and the
+     * extension manager's instance class loaders.
+     *
+     * @param reportingTaskNode the reporting task node to remove
+     * @throws IllegalStateException if the given node is not the instance registered under its identifier
+     */
+    @Override
+    public void removeReportingTask(final ReportingTaskNode reportingTaskNode) {
+        // A missing entry and a different instance under the same identifier are both rejected
+        final ReportingTaskNode registered = allReportingTasks.get(reportingTaskNode.getIdentifier());
+        if (registered != reportingTaskNode) {
+            throw new IllegalStateException("Reporting Task " + reportingTaskNode + " does not exist in this Flow");
+        }
+
+        reportingTaskNode.verifyCanDelete();
+
+        final Class<?> taskClass = reportingTaskNode.getReportingTask().getClass();
+        try (final NarCloseable x = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), taskClass, reportingTaskNode.getReportingTask().getIdentifier())) {
+            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, reportingTaskNode.getReportingTask(), reportingTaskNode.getConfigurationContext());
+        }
+
+        // Release the reference held on every controller service named by a property of the task
+        for (final Map.Entry<PropertyDescriptor, String> property : reportingTaskNode.getProperties().entrySet()) {
+            final PropertyDescriptor descriptor = property.getKey();
+            if (descriptor.getControllerServiceDefinition() == null) {
+                continue;
+            }
+
+            String serviceId = property.getValue();
+            if (serviceId == null) {
+                serviceId = descriptor.getDefaultValue();
+            }
+            if (serviceId == null) {
+                continue;
+            }
+
+            final ControllerServiceNode referenced = flowController.getControllerServiceProvider().getControllerServiceNode(serviceId);
+            if (referenced != null) {
+                referenced.removeReference(reportingTaskNode);
+            }
+        }
+
+        allReportingTasks.remove(reportingTaskNode.getIdentifier());
+        LogRepositoryFactory.removeRepository(reportingTaskNode.getIdentifier());
+        processScheduler.onReportingTaskRemoved(reportingTaskNode);
+
+        flowController.getExtensionManager().removeInstanceClassLoader(reportingTaskNode.getIdentifier());
+    }
+
+    /**
+     * @return a new {@link HashSet} holding every value currently in {@code allReportingTasks};
+     *         changes to the returned set do not affect the index
+     */
+    @Override
+    public Set<ReportingTaskNode> getAllReportingTasks() {
+        final Set<ReportingTaskNode> snapshot = new HashSet<>();
+        snapshot.addAll(allReportingTasks.values());
+        return snapshot;
+    }
+
+    /**
+     * @return a new {@link HashSet} holding every value currently in {@code rootControllerServices};
+     *         changes to the returned set do not affect the index
+     */
+    public Set<ControllerServiceNode> getRootControllerServices() {
+        final Set<ControllerServiceNode> snapshot = new HashSet<>();
+        snapshot.addAll(rootControllerServices.values());
+        return snapshot;
+    }
+
+    /**
+     * Stores the given controller service in {@code rootControllerServices} unless an entry with
+     * the same identifier is already present.
+     *
+     * @param serviceNode the controller service to add
+     * @throws IllegalStateException if an entry already exists for the service's identifier
+     */
+    public void addRootControllerService(final ControllerServiceNode serviceNode) {
+        final ControllerServiceNode previous = rootControllerServices.putIfAbsent(serviceNode.getIdentifier(), serviceNode);
+        if (previous == null) {
+            return;
+        }
+
+        throw new IllegalStateException("Controller Service with ID " + serviceNode.getIdentifier() + " already exists at the Controller level");
+    }
+
+    /**
+     * @param serviceIdentifier the identifier of the controller service
+     * @return the entry stored in {@code rootControllerServices} for the identifier, or {@code null} if there is none
+     */
+    public ControllerServiceNode getRootControllerService(final String serviceIdentifier) {
+        final ControllerServiceNode rootService = rootControllerServices.get(serviceIdentifier);
+        return rootService;
+    }
+
+    /**
+     * Removes the given controller service from {@code rootControllerServices}. After checking
+     * that an entry exists for its identifier and that it can be deleted, the service's
+     * {@code OnRemoved} methods are invoked, its references to other root controller services are
+     * released, and the state manager provider and extension manager are notified.
+     *
+     * @param service the controller service to remove; must not be {@code null}
+     * @throws NullPointerException if {@code service} is {@code null}
+     * @throws IllegalStateException if no entry exists for the service's identifier
+     */
+    public void removeRootControllerService(final ControllerServiceNode service) {
+        final ControllerServiceNode existing = rootControllerServices.get(requireNonNull(service).getIdentifier());
+        if (existing == null) {
+            throw new IllegalStateException(service + " is not a member of this Process Group");
+        }
+
+        service.verifyCanDelete();
+
+        final ExtensionManager extensionManager = flowController.getExtensionManager();
+        final VariableRegistry variableRegistry = flowController.getVariableRegistry();
+
+        // The OnRemoved hooks run with the component's NAR class loader in effect
+        try (final NarCloseable x = NarCloseable.withComponentNarLoader(extensionManager, service.getControllerServiceImplementation().getClass(), service.getIdentifier())) {
+            final ConfigurationContext configurationContext = new StandardConfigurationContext(service, flowController.getControllerServiceProvider(), null, variableRegistry);
+            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, service.getControllerServiceImplementation(), configurationContext);
+        }
+
+        // Release the reference held on every root controller service named by a property of this service
+        for (final Map.Entry<PropertyDescriptor, String> entry : service.getProperties().entrySet()) {
+            final PropertyDescriptor descriptor = entry.getKey();
+            if (descriptor.getControllerServiceDefinition() != null) {
+                final String value = entry.getValue() == null ? descriptor.getDefaultValue() : entry.getValue();
+                if (value != null) {
+                    final ControllerServiceNode referencedNode = getRootControllerService(value);
+                    if (referencedNode != null) {
+                        referencedNode.removeReference(service);
+                    }
+                }
+            }
+        }
+
+        rootControllerServices.remove(service.getIdentifier());
+        flowController.getStateManagerProvider().onComponentRemoved(service.getIdentifier());
+
+        extensionManager.removeInstanceClassLoader(service.getIdentifier());
+
+        // One placeholder, one argument: the stray trailing argument was never rendered
+        logger.info("{} removed from Flow Controller", service);
+    }
+
+    /**
+     * Builds a controller service node through {@link ExtensionBuilder}, wires its logger into
+     * the log repository, optionally runs its lifecycle hooks and finally notifies the controller
+     * service provider that the service was added.
+     *
+     * @param type the type of the controller service
+     * @param id the identifier for the new controller service
+     * @param bundleCoordinate the coordinate of the bundle providing the type
+     * @param additionalUrls extra classpath URLs handed to the builder
+     * @param firstTimeAdded when {@code true}, the {@code OnConfigurationRestored} and then the {@code OnAdded}
+     *                       annotated methods of the service implementation are invoked
+     * @param registerLogObserver when {@code true}, a WARN-level bulletin log observer is attached
+     * @return the newly built controller service node
+     * @throws ComponentLifeCycleException if invoking the {@code OnAdded} methods fails
+     */
+    public ControllerServiceNode createControllerService(final String type, final String id, final BundleCoordinate bundleCoordinate, final Set<URL> additionalUrls, final boolean firstTimeAdded,
+                                                         final boolean registerLogObserver) {
+        // make sure the first reference to LogRepository happens outside of a NarCloseable so that we use the framework's ClassLoader
+        final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
+        final ExtensionManager extensionManager = flowController.getExtensionManager();
+        final ControllerServiceProvider controllerServiceProvider = flowController.getControllerServiceProvider();
+
+        final ControllerServiceNode serviceNode = new ExtensionBuilder()
+            .identifier(id)
+            .type(type)
+            .bundleCoordinate(bundleCoordinate)
+            .controllerServiceProvider(controllerServiceProvider)
+            .processScheduler(processScheduler)
+            .nodeTypeProvider(flowController)
+            .validationTrigger(flowController.getValidationTrigger())
+            .reloadComponent(flowController.getReloadComponent())
+            .variableRegistry(flowController.getVariableRegistry())
+            .addClasspathUrls(additionalUrls)
+            .kerberosConfig(flowController.createKerberosConfig(nifiProperties))
+            .stateManagerProvider(flowController.getStateManagerProvider())
+            .extensionManager(extensionManager)
+            .buildControllerService();
+
+        LogRepositoryFactory.getRepository(serviceNode.getIdentifier()).setLogger(serviceNode.getLogger());
+        if (registerLogObserver) {
+            // Register log observer to provide bulletins when the controller service logs anything at WARN level or above
+            logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, new ControllerServiceLogObserver(bulletinRepository, serviceNode));
+        }
+
+        if (firstTimeAdded) {
+            final ControllerService service = serviceNode.getControllerServiceImplementation();
+
+            // Kept as two separate scopes: only a failure of the OnAdded step is wrapped in a ComponentLifeCycleException
+            try (final NarCloseable nc = NarCloseable.withComponentNarLoader(extensionManager, service.getClass(), service.getIdentifier())) {
+                ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, service);
+            }
+
+            try (final NarCloseable x = NarCloseable.withComponentNarLoader(extensionManager, service.getClass(), service.getIdentifier())) {
+                ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, service);
+            } catch (final Exception e) {
+                throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + service, e);
+            }
+        }
+
+        controllerServiceProvider.onControllerServiceAdded(serviceNode);
+
+        return serviceNode;
+    }
+
+    /**
+     * @return a new set combining the non-root controller services reported by the controller
+     *         service provider with every value in {@code rootControllerServices}
+     */
+    public Set<ControllerServiceNode> getAllControllerServices() {
+        final ControllerServiceProvider serviceProvider = flowController.getControllerServiceProvider();
+
+        final Set<ControllerServiceNode> combined = new HashSet<>();
+        combined.addAll(serviceProvider.getNonRootControllerServices());
+        combined.addAll(rootControllerServices.values());
+
+        return combined;
+    }
+
+    /**
+     * Looks up a controller service node through the flow controller's controller service provider.
+     *
+     * @param id the identifier of the controller service
+     * @return whatever the provider returns for the identifier
+     */
+    public ControllerServiceNode getControllerServiceNode(final String id) {
+        final ControllerServiceProvider serviceProvider = flowController.getControllerServiceProvider();
+        return serviceProvider.getControllerServiceNode(id);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/kerberos/KerberosConfig.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/kerberos/KerberosConfig.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/kerberos/KerberosConfig.java
new file mode 100644
index 0000000..8a6f939
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/kerberos/KerberosConfig.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.kerberos;
+
+import java.io.File;
+
+/**
+ * Immutable holder for three Kerberos-related settings: a principal, a keytab location and a
+ * configuration file. None of the values is validated; each may be {@code null}.
+ */
+public class KerberosConfig {
+    /** Shared instance whose principal, keytab location and configuration file are all {@code null}. */
+    public static final KerberosConfig NOT_CONFIGURED = new KerberosConfig(null, null, null);
+
+    private final File configFile;
+    private final File keytabLocation;
+    private final String principal;
+
+    /**
+     * @param principal the principal to expose through {@link #getPrincipal()}
+     * @param keytabLocation the file to expose through {@link #getKeytabLocation()}
+     * @param kerberosConfigurationFile the file to expose through {@link #getConfigFile()}
+     */
+    public KerberosConfig(final String principal, final File keytabLocation, final File kerberosConfigurationFile) {
+        this.configFile = kerberosConfigurationFile;
+        this.keytabLocation = keytabLocation;
+        this.principal = principal;
+    }
+
+    /**
+     * @return the configuration file given at construction time
+     */
+    public File getConfigFile() {
+        return this.configFile;
+    }
+
+    /**
+     * @return the keytab location given at construction time
+     */
+    public File getKeytabLocation() {
+        return this.keytabLocation;
+    }
+
+    /**
+     * @return the principal given at construction time
+     */
+    public String getPrincipal() {
+        return this.principal;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
index dc780db..5c1f8e9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
@@ -236,7 +236,7 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol {
             return;
         }
 
-        final Connection connection = flowController.getConnection(connectionId);
+        final Connection connection = flowController.getFlowManager().getConnection(connectionId);
         if (connection == null) {
             logger.error("Attempted to receive FlowFiles from Peer {} for Connection with ID {} but no connection exists with that ID", peerDescription, connectionId);
             throw new TransactionAbortedException("Attempted to receive FlowFiles from Peer " + peerDescription + " for Connection with ID " + connectionId + " but no Connection exists with that ID");

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
index d95a220..66ff5f7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
@@ -55,13 +55,13 @@ public class StandardReportingContext implements ReportingContext, ControllerSer
     private final VariableRegistry variableRegistry;
 
     public StandardReportingContext(final FlowController flowController, final BulletinRepository bulletinRepository,
-                                    final Map<PropertyDescriptor, String> properties, final ControllerServiceProvider serviceProvider, final ReportingTask reportingTask,
+                                    final Map<PropertyDescriptor, String> properties, final ReportingTask reportingTask,
                                     final VariableRegistry variableRegistry) {
         this.flowController = flowController;
-        this.eventAccess = flowController;
+        this.eventAccess = flowController.getEventAccess();
         this.bulletinRepository = bulletinRepository;
         this.properties = Collections.unmodifiableMap(properties);
-        this.serviceProvider = serviceProvider;
+        this.serviceProvider = flowController.getControllerServiceProvider();
         this.reportingTask = reportingTask;
         this.variableRegistry = variableRegistry;
         preparedQueries = new HashMap<>();
@@ -94,7 +94,7 @@ public class StandardReportingContext implements ReportingContext, ControllerSer
 
     @Override
     public Bulletin createBulletin(final String componentId, final String category, final Severity severity, final String message) {
-        final Connectable connectable = flowController.findLocalConnectable(componentId);
+        final Connectable connectable = flowController.getFlowManager().findConnectable(componentId);
         if (connectable == null) {
             throw new IllegalStateException("Cannot create Component-Level Bulletin because no component can be found with ID " + componentId);
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java
index ebe774b..d66ed35 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java
@@ -16,19 +16,19 @@
  */
 package org.apache.nifi.controller.reporting;
 
-import java.io.File;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ControllerServiceLookup;
 import org.apache.nifi.controller.NodeTypeProvider;
+import org.apache.nifi.controller.kerberos.KerberosConfig;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.reporting.ReportingInitializationContext;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.util.NiFiProperties;
+
+import java.io.File;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 public class StandardReportingInitializationContext implements ReportingInitializationContext, ControllerServiceLookup {
 
@@ -38,21 +38,19 @@ public class StandardReportingInitializationContext implements ReportingInitiali
     private final SchedulingStrategy schedulingStrategy;
     private final ControllerServiceProvider serviceProvider;
     private final ComponentLog logger;
-    private final NiFiProperties nifiProperties;
+    private final KerberosConfig kerberosConfig;
     private final NodeTypeProvider nodeTypeProvider;
 
-    public StandardReportingInitializationContext(
-            final String id, final String name, final SchedulingStrategy schedulingStrategy,
-            final String schedulingPeriod, final ComponentLog logger,
-            final ControllerServiceProvider serviceProvider, final NiFiProperties nifiProperties,
-            final NodeTypeProvider nodeTypeProvider) {
+    public StandardReportingInitializationContext(final String id, final String name, final SchedulingStrategy schedulingStrategy, final String schedulingPeriod,
+                                                  final ComponentLog logger, final ControllerServiceProvider serviceProvider, final KerberosConfig kerberosConfig,
+                                                  final NodeTypeProvider nodeTypeProvider) {
         this.id = id;
         this.name = name;
         this.schedulingPeriod = schedulingPeriod;
         this.serviceProvider = serviceProvider;
         this.schedulingStrategy = schedulingStrategy;
         this.logger = logger;
-        this.nifiProperties = nifiProperties;
+        this.kerberosConfig = kerberosConfig;
         this.nodeTypeProvider = nodeTypeProvider;
     }
 
@@ -126,17 +124,17 @@ public class StandardReportingInitializationContext implements ReportingInitiali
 
     @Override
     public String getKerberosServicePrincipal() {
-        return nifiProperties.getKerberosServicePrincipal();
+        return kerberosConfig.getPrincipal();
     }
 
     @Override
     public File getKerberosServiceKeytab() {
-        return nifiProperties.getKerberosServiceKeytabLocation() == null ? null : new File(nifiProperties.getKerberosServiceKeytabLocation());
+        return kerberosConfig.getKeytabLocation();
     }
 
     @Override
     public File getKerberosConfigurationFile() {
-        return nifiProperties.getKerberosConfigurationFile();
+        return kerberosConfig.getConfigFile();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
index 1cc5325..b63fffd 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
@@ -40,18 +40,17 @@ public class StandardReportingTaskNode extends AbstractReportingTaskNode impleme
 
     public StandardReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id, final FlowController controller,
                                      final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory,
-                                     final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent,
-                                     final ExtensionManager extensionManager, final ValidationTrigger validationTrigger) {
-        super(reportingTask, id, controller, processScheduler, validationContextFactory, variableRegistry, reloadComponent, extensionManager, validationTrigger);
+                                     final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent, final ExtensionManager extensionManager,
+                                     final ValidationTrigger validationTrigger) {
+        super(reportingTask, id, controller.getControllerServiceProvider(), processScheduler, validationContextFactory, variableRegistry, reloadComponent, extensionManager, validationTrigger);
         this.flowController = controller;
     }
 
     public StandardReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id, final FlowController controller,
                                      final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory,
                                      final String componentType, final String canonicalClassName, final ComponentVariableRegistry variableRegistry,
-                                     final ReloadComponent reloadComponent, final ExtensionManager extensionManager, final ValidationTrigger validationTrigger,
-                                     final boolean isExtensionMissing) {
-        super(reportingTask, id, controller, processScheduler, validationContextFactory, componentType, canonicalClassName,
+                                     final ReloadComponent reloadComponent, final ExtensionManager extensionManager, final ValidationTrigger validationTrigger, final boolean isExtensionMissing) {
+        super(reportingTask, id, controller.getControllerServiceProvider(), processScheduler, validationContextFactory, componentType, canonicalClassName,
             variableRegistry, reloadComponent, extensionManager, validationTrigger, isExtensionMissing);
         this.flowController = controller;
     }
@@ -83,6 +82,6 @@ public class StandardReportingTaskNode extends AbstractReportingTaskNode impleme
 
     @Override
     public ReportingContext getReportingContext() {
-        return new StandardReportingContext(flowController, flowController.getBulletinRepository(), getProperties(), flowController, getReportingTask(), getVariableRegistry());
+        return new StandardReportingContext(flowController, flowController.getBulletinRepository(), getProperties(), getReportingTask(), getVariableRegistry());
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardQueueProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardQueueProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardQueueProvider.java
new file mode 100644
index 0000000..55e36c6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardQueueProvider.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository;
+
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * {@link QueueProvider} that obtains its queues from the connections known to a
+ * {@link FlowController}'s flow manager.
+ */
+public class StandardQueueProvider implements QueueProvider {
+    private final FlowController flowController;
+
+    /**
+     * @param flowController the controller whose flow manager is asked for connections
+     */
+    public StandardQueueProvider(final FlowController flowController) {
+        this.flowController = flowController;
+    }
+
+    /**
+     * @return a new list with the FlowFile queue of every connection that the flow manager's
+     *         {@code findAllConnections()} reports at the time of the call
+     */
+    @Override
+    public Collection<FlowFileQueue> getAllQueues() {
+        final Collection<Connection> allConnections = flowController.getFlowManager().findAllConnections();
+
+        // Pre-size the result: exactly one queue is collected per connection
+        final List<FlowFileQueue> collected = new ArrayList<>(allConnections.size());
+        allConnections.forEach(conn -> collected.add(conn.getFlowFileQueue()));
+
+        return collected;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 4e396fd..6313097 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -77,6 +77,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
     private final long administrativeYieldMillis;
     private final String administrativeYieldDuration;
     private final StateManagerProvider stateManagerProvider;
+    private final long processorStartTimeoutMillis;
 
     private final ConcurrentMap<Object, LifecycleState> lifecycleStates = new ConcurrentHashMap<>();
     private final ScheduledExecutorService frameworkTaskExecutor;
@@ -91,7 +92,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
     public StandardProcessScheduler(final FlowEngine componentLifecycleThreadPool, final FlowController flowController, final StringEncryptor encryptor,
         final StateManagerProvider stateManagerProvider, final NiFiProperties nifiProperties) {
         this.componentLifeCycleThreadPool = componentLifecycleThreadPool;
-        this.controllerServiceProvider = flowController;
+        this.controllerServiceProvider = flowController.getControllerServiceProvider();
         this.flowController = flowController;
         this.encryptor = encryptor;
         this.stateManagerProvider = stateManagerProvider;
@@ -99,6 +100,9 @@ public final class StandardProcessScheduler implements ProcessScheduler {
         administrativeYieldDuration = nifiProperties.getAdministrativeYieldDuration();
         administrativeYieldMillis = FormatUtils.getTimeDuration(administrativeYieldDuration, TimeUnit.MILLISECONDS);
 
+        final String timeoutString = nifiProperties.getProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT);
+        processorStartTimeoutMillis = timeoutString == null ? 60000 : FormatUtils.getTimeDuration(timeoutString.trim(), TimeUnit.MILLISECONDS);
+
         frameworkTaskExecutor = new FlowEngine(4, "Framework Task Thread");
     }
 
@@ -283,10 +287,10 @@ public final class StandardProcessScheduler implements ProcessScheduler {
 
     /**
      * Starts the given {@link Processor} by invoking its
-     * {@link ProcessorNode#start(ScheduledExecutorService, long, ProcessContext, SchedulingAgentCallback, boolean)}
+     * {@link ProcessorNode#start(ScheduledExecutorService, long, long, ProcessContext, SchedulingAgentCallback, boolean)}
      * method.
      *
-     * @see StandardProcessorNode#start(ScheduledExecutorService, long, ProcessContext, SchedulingAgentCallback, boolean)
+     * @see StandardProcessorNode#start(ScheduledExecutorService, long, long, ProcessContext, SchedulingAgentCallback, boolean)
      */
     @Override
     public synchronized CompletableFuture<Void> startProcessor(final ProcessorNode procNode, final boolean failIfStopping) {
@@ -317,7 +321,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
         };
 
         LOG.info("Starting {}", procNode);
-        procNode.start(this.componentMonitoringThreadPool, this.administrativeYieldMillis, processContext, callback, failIfStopping);
+        procNode.start(componentMonitoringThreadPool, administrativeYieldMillis, processorStartTimeoutMillis, processContext, callback, failIfStopping);
         return future;
     }
 
@@ -359,7 +363,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
         getSchedulingAgent(procNode).incrementMaxThreadCount(tasksTerminated);
 
         try {
-            flowController.reload(procNode, procNode.getProcessor().getClass().getName(), procNode.getBundleCoordinate(), Collections.emptySet());
+            flowController.getReloadComponent().reload(procNode, procNode.getProcessor().getClass().getName(), procNode.getBundleCoordinate(), Collections.emptySet());
         } catch (final ProcessorInstantiationException e) {
             // This shouldn't happen because we already have been able to instantiate the processor before
             LOG.error("Failed to replace instance of Processor for {} when terminating Processor", procNode);

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
index 597c8fb..87b6540 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
@@ -106,12 +106,12 @@ public class StandardFlowSerializer implements FlowSerializer<Document> {
             rootNode.appendChild(registriesElement);
 
             addFlowRegistries(registriesElement, controller.getFlowRegistryClient());
-            addProcessGroup(rootNode, controller.getGroup(controller.getRootGroupId()), "rootGroup", scheduledStateLookup);
+            addProcessGroup(rootNode, controller.getFlowManager().getRootGroup(), "rootGroup", scheduledStateLookup);
 
             // Add root-level controller services
             final Element controllerServicesNode = doc.createElement("controllerServices");
             rootNode.appendChild(controllerServicesNode);
-            for (final ControllerServiceNode serviceNode : controller.getRootControllerServices()) {
+            for (final ControllerServiceNode serviceNode : controller.getFlowManager().getRootControllerServices()) {
                 addControllerService(controllerServicesNode, serviceNode);
             }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
index e82c436..f226bb6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
@@ -113,7 +113,7 @@ public class ControllerServiceLoader {
         for (final Element serviceElement : serviceElements) {
             final ControllerServiceNode serviceNode = createControllerService(controller, serviceElement, encryptor);
             if (parentGroup == null) {
-                controller.addRootControllerService(serviceNode);
+                controller.getFlowManager().addRootControllerService(serviceNode);
             } else {
                 parentGroup.addControllerService(serviceNode);
             }
@@ -162,19 +162,20 @@ public class ControllerServiceLoader {
         // Start services
         if (autoResumeState) {
             logger.debug("Enabling Controller Services {}", nodesToEnable);
-            nodesToEnable.stream().forEach(ControllerServiceNode::performValidation); // validate services before attempting to enable them
-            controller.enableControllerServices(nodesToEnable);
+            nodesToEnable.forEach(ControllerServiceNode::performValidation); // validate services before attempting to enable them
+
+            controller.getControllerServiceProvider().enableControllerServices(nodesToEnable);
         } else {
             logger.debug("Will not enable the following Controller Services because 'auto-resume state' flag is false: {}", nodesToEnable);
         }
     }
 
-    public static ControllerServiceNode cloneControllerService(final ControllerServiceProvider provider, final ControllerServiceNode controllerService) {
+    public static ControllerServiceNode cloneControllerService(final FlowController flowController, final ControllerServiceNode controllerService) {
         // create a new id for the clone seeded from the original id so that it is consistent in a cluster
         final UUID id = UUID.nameUUIDFromBytes(controllerService.getIdentifier().getBytes(StandardCharsets.UTF_8));
 
-        final ControllerServiceNode clone = provider.createControllerService(controllerService.getCanonicalClassName(), id.toString(),
-                controllerService.getBundleCoordinate(), Collections.emptySet(), false);
+        final ControllerServiceNode clone = flowController.getFlowManager().createControllerService(controllerService.getCanonicalClassName(), id.toString(),
+                controllerService.getBundleCoordinate(), Collections.emptySet(), false, true);
         clone.setName(controllerService.getName());
         clone.setComments(controllerService.getComments());
 
@@ -189,12 +190,12 @@ public class ControllerServiceLoader {
         return clone;
     }
 
-    private static ControllerServiceNode createControllerService(final ControllerServiceProvider provider, final Element controllerServiceElement, final StringEncryptor encryptor) {
+    private static ControllerServiceNode createControllerService(final FlowController flowController, final Element controllerServiceElement, final StringEncryptor encryptor) {
         final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
 
         BundleCoordinate coordinate;
         try {
-            coordinate = BundleUtils.getCompatibleBundle(provider.getExtensionManager(), dto.getType(), dto.getBundle());
+            coordinate = BundleUtils.getCompatibleBundle(flowController.getExtensionManager(), dto.getType(), dto.getBundle());
         } catch (final IllegalStateException e) {
             final BundleDTO bundleDTO = dto.getBundle();
             if (bundleDTO == null) {
@@ -204,7 +205,7 @@ public class ControllerServiceLoader {
             }
         }
 
-        final ControllerServiceNode node = provider.createControllerService(dto.getType(), dto.getId(), coordinate, Collections.emptySet(), false);
+        final ControllerServiceNode node = flowController.getFlowManager().createControllerService(dto.getType(), dto.getId(), coordinate, Collections.emptySet(), false, true);
         node.setName(dto.getName());
         node.setComments(dto.getComments());
         node.setVersionedComponentId(dto.getVersionedComponentId());

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/GhostControllerService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/GhostControllerService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/GhostControllerService.java
new file mode 100644
index 0000000..ff0086b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/GhostControllerService.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.service;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.reporting.InitializationException;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/** ControllerService whose validate() always reports a single invalid result naming canonicalClassName. */
+public class GhostControllerService implements ControllerService {
+    private final String identifier;
+    private final String canonicalClassName;
+
+    public GhostControllerService(final String identifier, final String canonicalClassName) {
+        this.identifier = identifier;
+        this.canonicalClassName = canonicalClassName;
+    }
+
+    @Override
+    public void initialize(final ControllerServiceInitializationContext context) throws InitializationException { // intentionally a no-op
+    }
+
+    @Override
+    public Collection<ValidationResult> validate(final ValidationContext context) {
+        return Collections.singleton(new ValidationResult.Builder()
+            .input("Any Property")
+            .subject("Missing Controller Service")
+            .valid(false) // always invalid; the validation context is never consulted
+            .explanation("Controller Service is of type " + canonicalClassName + ", but this is not a valid Controller Service type")
+            .build());
+    }
+
+    @Override
+    public PropertyDescriptor getPropertyDescriptor(final String propertyName) {
+        return new PropertyDescriptor.Builder() // built on demand for whatever property name is requested
+            .name(propertyName)
+            .description(propertyName)
+            .required(true)
+            .sensitive(true)
+            .build();
+    }
+
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { // intentionally a no-op
+    }
+
+    @Override
+    public List<PropertyDescriptor> getPropertyDescriptors() {
+        return Collections.emptyList(); // no descriptors are declared up front
+    }
+
+    @Override
+    public String getIdentifier() {
+        return identifier;
+    }
+
+    @Override
+    public String toString() {
+        return "GhostControllerService[id=" + identifier + ", type=" + canonicalClassName + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java
index f169662..4d2bbee 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java
@@ -16,15 +16,15 @@
  */
 package org.apache.nifi.controller.service;
 
-import java.io.File;
-import java.util.Set;
-
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ControllerServiceInitializationContext;
 import org.apache.nifi.controller.ControllerServiceLookup;
+import org.apache.nifi.controller.kerberos.KerberosConfig;
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.util.NiFiProperties;
+
+import java.io.File;
+import java.util.Set;
 
 public class StandardControllerServiceInitializationContext implements ControllerServiceInitializationContext, ControllerServiceLookup {
 
@@ -32,17 +32,17 @@ public class StandardControllerServiceInitializationContext implements Controlle
     private final ControllerServiceProvider serviceProvider;
     private final ComponentLog logger;
     private final StateManager stateManager;
-    private final NiFiProperties nifiProperties;
+    private final KerberosConfig kerberosConfig;
 
     public StandardControllerServiceInitializationContext(
             final String identifier, final ComponentLog logger,
             final ControllerServiceProvider serviceProvider, final StateManager stateManager,
-            final NiFiProperties nifiProperties) {
+            final KerberosConfig kerberosConfig) {
         this.id = identifier;
         this.logger = logger;
         this.serviceProvider = serviceProvider;
         this.stateManager = stateManager;
-        this.nifiProperties = nifiProperties;
+        this.kerberosConfig = kerberosConfig;
     }
 
     @Override
@@ -97,16 +97,16 @@ public class StandardControllerServiceInitializationContext implements Controlle
 
     @Override
     public String getKerberosServicePrincipal() {
-        return nifiProperties.getKerberosServicePrincipal();
+        return kerberosConfig.getPrincipal();
     }
 
     @Override
     public File getKerberosServiceKeytab() {
-        return nifiProperties.getKerberosServiceKeytabLocation() == null ? null : new File(nifiProperties.getKerberosServiceKeytabLocation());
+        return kerberosConfig.getKeytabLocation();
     }
 
     @Override
     public File getKerberosConfigurationFile() {
-        return nifiProperties.getKerberosConfigurationFile();
+        return kerberosConfig.getConfigFile();
     }
 }