You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/11/06 16:24:07 UTC
[6/9] nifi git commit: NIFI-5769: Refactored FlowController to use
Composition over Inheritance - Ensure that when root group is set,
that we register its ID in FlowManager
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java
new file mode 100644
index 0000000..6a579e9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import org.apache.nifi.annotation.lifecycle.OnRemoved;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
+import org.apache.nifi.controller.exception.ProcessorInstantiationException;
+import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
+import org.apache.nifi.controller.service.ControllerServiceInvocationHandler;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.StandardConfigurationContext;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.logging.LogRepositoryFactory;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.SimpleProcessLogger;
+import org.apache.nifi.processor.StandardProcessContext;
+import org.apache.nifi.reporting.ReportingTask;
+import org.apache.nifi.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.util.Set;
+
+/**
+ * {@link ReloadComponent} implementation that swaps the underlying Processor, Controller Service
+ * or Reporting Task of an existing node for a freshly created instance of the requested type and
+ * bundle, keeping the existing node object (and its identifier) in place.
+ *
+ * Each reload follows the same sequence: capture the instance class loader currently registered
+ * for the component id, create the replacement node, invoke the {@link OnRemoved} methods of the
+ * old component under the captured class loader, close that class loader, install a new logger
+ * and the new component into the existing node, refresh its properties and trigger validation.
+ */
+public class StandardReloadComponent implements ReloadComponent {
+ private static final Logger logger = LoggerFactory.getLogger(StandardReloadComponent.class);
+
+ private final FlowController flowController;
+
+ /**
+ * @param flowController the controller used to reach the extension manager, flow manager,
+ * state manager provider, controller service provider and validation trigger
+ */
+ public StandardReloadComponent(final FlowController flowController) {
+ this.flowController = flowController;
+ }
+
+
+ /**
+ * Replaces the Processor held by the given node with a new instance of the given type.
+ *
+ * @param existingNode the node whose processor is replaced
+ * @param newType the type passed to the flow manager when creating the replacement
+ * @param bundleCoordinate the bundle coordinate passed to the flow manager
+ * @param additionalUrls additional classpath URLs passed to the flow manager
+ * @throws ProcessorInstantiationException declared for failures creating the replacement
+ * @throws IllegalStateException if existingNode is null
+ */
+ @Override
+ public void reload(final ProcessorNode existingNode, final String newType, final BundleCoordinate bundleCoordinate, final Set<URL> additionalUrls)
+ throws ProcessorInstantiationException {
+ if (existingNode == null) {
+ throw new IllegalStateException("Existing ProcessorNode cannot be null");
+ }
+
+ final String id = existingNode.getProcessor().getIdentifier();
+
+ // ghost components will have a null logger
+ if (existingNode.getLogger() != null) {
+ existingNode.getLogger().debug("Reloading component {} to type {} from bundle {}", new Object[]{id, newType, bundleCoordinate});
+ }
+
+ final ExtensionManager extensionManager = flowController.getExtensionManager();
+
+ // createProcessor will create a new instance class loader for the same id so
+ // save the instance class loader to use it for calling OnRemoved on the existing processor
+ final ClassLoader existingInstanceClassLoader = extensionManager.getInstanceClassLoader(id);
+
+ // create a new node with firstTimeAdded as true so lifecycle methods get fired
+ // attempt the creation to make sure it works before firing the OnRemoved methods below
+ final ProcessorNode newNode = flowController.getFlowManager().createProcessor(newType, id, bundleCoordinate, additionalUrls, true, false);
+
+ // call OnRemoved for the existing processor using the previous instance class loader
+ try (final NarCloseable x = NarCloseable.withComponentNarLoader(existingInstanceClassLoader)) {
+ final StateManager stateManager = flowController.getStateManagerProvider().getStateManager(id);
+ final StandardProcessContext processContext = new StandardProcessContext(existingNode, flowController.getControllerServiceProvider(),
+ flowController.getEncryptor(), stateManager, () -> false);
+
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, existingNode.getProcessor(), processContext);
+ } finally {
+ // the old class loader is closed even if the OnRemoved invocation fails
+ extensionManager.closeURLClassLoader(id, existingInstanceClassLoader);
+ }
+
+ // set the new processor in the existing node
+ final ComponentLog componentLogger = new SimpleProcessLogger(id, newNode.getProcessor());
+ final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(componentLogger);
+ LogRepositoryFactory.getRepository(id).setLogger(terminationAwareLogger);
+
+ final LoggableComponent<Processor> newProcessor = new LoggableComponent<>(newNode.getProcessor(), newNode.getBundleCoordinate(), terminationAwareLogger);
+ existingNode.setProcessor(newProcessor);
+ existingNode.setExtensionMissing(newNode.isExtensionMissing());
+
+ // need to refresh the properties in case we are changing from ghost component to real component
+ existingNode.refreshProperties();
+
+ // NOTE(review): the message says "async" and the other two reload methods call triggerAsync,
+ // but this path calls trigger - confirm whether the difference is intentional
+ logger.debug("Triggering async validation of {} due to processor reload", existingNode);
+ flowController.getValidationTrigger().trigger(existingNode);
+ }
+
+ /**
+ * Replaces the Controller Service implementation, proxy and invocation handler held by the
+ * given node with those of a new instance of the given type.
+ *
+ * @param existingNode the node whose controller service is replaced
+ * @param newType the type passed to the flow manager when creating the replacement
+ * @param bundleCoordinate the bundle coordinate passed to the flow manager and recorded on the new components
+ * @param additionalUrls additional classpath URLs passed to the flow manager
+ * @throws ControllerServiceInstantiationException declared for failures creating the replacement
+ * @throws IllegalStateException if existingNode is null
+ */
+ @Override
+ public void reload(final ControllerServiceNode existingNode, final String newType, final BundleCoordinate bundleCoordinate, final Set<URL> additionalUrls)
+ throws ControllerServiceInstantiationException {
+ if (existingNode == null) {
+ throw new IllegalStateException("Existing ControllerServiceNode cannot be null");
+ }
+
+ final String id = existingNode.getIdentifier();
+
+ // ghost components will have a null logger
+ if (existingNode.getLogger() != null) {
+ existingNode.getLogger().debug("Reloading component {} to type {} from bundle {}", new Object[]{id, newType, bundleCoordinate});
+ }
+
+ final ExtensionManager extensionManager = flowController.getExtensionManager();
+
+ // createControllerService will create a new instance class loader for the same id so
+ // save the instance class loader to use it for calling OnRemoved on the existing service
+ final ClassLoader existingInstanceClassLoader = extensionManager.getInstanceClassLoader(id);
+
+ // create a new node with firstTimeAdded as true so lifecycle methods get called
+ // attempt the creation to make sure it works before firing the OnRemoved methods below
+ final ControllerServiceNode newNode = flowController.getFlowManager().createControllerService(newType, id, bundleCoordinate, additionalUrls, true, false);
+
+ // call OnRemoved for the existing service using the previous instance class loader
+ try (final NarCloseable x = NarCloseable.withComponentNarLoader(existingInstanceClassLoader)) {
+ final ConfigurationContext configurationContext = new StandardConfigurationContext(existingNode, flowController.getControllerServiceProvider(),
+ null, flowController.getVariableRegistry());
+
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, existingNode.getControllerServiceImplementation(), configurationContext);
+ } finally {
+ // the old class loader is closed even if the OnRemoved invocation fails
+ extensionManager.closeURLClassLoader(id, existingInstanceClassLoader);
+ }
+
+ // take the invocation handler that was created for new proxy and is set to look at the new node,
+ // and set it to look at the existing node
+ final ControllerServiceInvocationHandler invocationHandler = newNode.getInvocationHandler();
+ invocationHandler.setServiceNode(existingNode);
+
+ // create LoggableComponents for the proxy and implementation
+ final ComponentLog componentLogger = new SimpleProcessLogger(id, newNode.getControllerServiceImplementation());
+ final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(componentLogger);
+ LogRepositoryFactory.getRepository(id).setLogger(terminationAwareLogger);
+
+ final LoggableComponent<ControllerService> loggableProxy = new LoggableComponent<>(newNode.getProxiedControllerService(), bundleCoordinate, terminationAwareLogger);
+ final LoggableComponent<ControllerService> loggableImplementation = new LoggableComponent<>(newNode.getControllerServiceImplementation(), bundleCoordinate, terminationAwareLogger);
+
+ // set the new impl, proxy, and invocation handler into the existing node
+ existingNode.setControllerServiceAndProxy(loggableImplementation, loggableProxy, invocationHandler);
+ existingNode.setExtensionMissing(newNode.isExtensionMissing());
+
+ // need to refresh the properties in case we are changing from ghost component to real component
+ existingNode.refreshProperties();
+
+ logger.debug("Triggering async validation of {} due to controller service reload", existingNode);
+ flowController.getValidationTrigger().triggerAsync(existingNode);
+ }
+
+ /**
+ * Replaces the Reporting Task held by the given node with a new instance of the given type.
+ *
+ * @param existingNode the node whose reporting task is replaced
+ * @param newType the type passed to the flow manager when creating the replacement
+ * @param bundleCoordinate the bundle coordinate passed to the flow manager
+ * @param additionalUrls additional classpath URLs passed to the flow manager
+ * @throws ReportingTaskInstantiationException declared for failures creating the replacement
+ * @throws IllegalStateException if existingNode is null
+ */
+ @Override
+ public void reload(final ReportingTaskNode existingNode, final String newType, final BundleCoordinate bundleCoordinate, final Set<URL> additionalUrls)
+ throws ReportingTaskInstantiationException {
+ if (existingNode == null) {
+ throw new IllegalStateException("Existing ReportingTaskNode cannot be null");
+ }
+
+ final String id = existingNode.getReportingTask().getIdentifier();
+
+ // ghost components will have a null logger
+ if (existingNode.getLogger() != null) {
+ existingNode.getLogger().debug("Reloading component {} to type {} from bundle {}", new Object[]{id, newType, bundleCoordinate});
+ }
+
+ final ExtensionManager extensionManager = flowController.getExtensionManager();
+
+ // createReportingTask will create a new instance class loader for the same id so
+ // save the instance class loader to use it for calling OnRemoved on the existing reporting task
+ final ClassLoader existingInstanceClassLoader = extensionManager.getInstanceClassLoader(id);
+
+ // set firstTimeAdded to true so lifecycle annotations get fired, but don't register this node
+ // attempt the creation to make sure it works before firing the OnRemoved methods below
+ final ReportingTaskNode newNode = flowController.getFlowManager().createReportingTask(newType, id, bundleCoordinate, additionalUrls, true, false);
+
+ // call OnRemoved for the existing reporting task using the previous instance class loader
+ try (final NarCloseable x = NarCloseable.withComponentNarLoader(existingInstanceClassLoader)) {
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, existingNode.getReportingTask(), existingNode.getConfigurationContext());
+ } finally {
+ // the old class loader is closed even if the OnRemoved invocation fails
+ extensionManager.closeURLClassLoader(id, existingInstanceClassLoader);
+ }
+
+ // set the new reporting task into the existing node; the logger is bound to the NEW task
+ // (as the processor and controller service reloads do), not to the instance being replaced
+ final ComponentLog componentLogger = new SimpleProcessLogger(id, newNode.getReportingTask());
+ final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(componentLogger);
+ LogRepositoryFactory.getRepository(id).setLogger(terminationAwareLogger);
+
+ final LoggableComponent<ReportingTask> newReportingTask = new LoggableComponent<>(newNode.getReportingTask(), newNode.getBundleCoordinate(), terminationAwareLogger);
+ existingNode.setReportingTask(newReportingTask);
+ existingNode.setExtensionMissing(newNode.isExtensionMissing());
+
+ // need to refresh the properties in case we are changing from ghost component to real component
+ existingNode.refreshProperties();
+
+ logger.debug("Triggering async validation of {} due to reporting task reload", existingNode);
+ flowController.getValidationTrigger().triggerAsync(existingNode);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
new file mode 100644
index 0000000..f100092
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
@@ -0,0 +1,656 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.flow;
+
+import org.apache.nifi.annotation.lifecycle.OnAdded;
+import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
+import org.apache.nifi.annotation.lifecycle.OnRemoved;
+import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Funnel;
+import org.apache.nifi.connectable.LocalPort;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.ExtensionBuilder;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.FlowSnippet;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.StandardFlowSnippet;
+import org.apache.nifi.controller.StandardFunnel;
+import org.apache.nifi.controller.StandardProcessorNode;
+import org.apache.nifi.controller.exception.ComponentLifeCycleException;
+import org.apache.nifi.controller.exception.ProcessorInstantiationException;
+import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.controller.label.StandardLabel;
+import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.service.StandardConfigurationContext;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.groups.StandardProcessGroup;
+import org.apache.nifi.logging.ControllerServiceLogObserver;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.logging.LogRepository;
+import org.apache.nifi.logging.LogRepositoryFactory;
+import org.apache.nifi.logging.ProcessorLogObserver;
+import org.apache.nifi.logging.ReportingTaskLogObserver;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.variable.MutableVariableRegistry;
+import org.apache.nifi.remote.RemoteGroupPort;
+import org.apache.nifi.remote.StandardRemoteProcessGroup;
+import org.apache.nifi.remote.StandardRootGroupPort;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.ReflectionUtils;
+import org.apache.nifi.web.api.dto.FlowSnippetDTO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+import java.net.URL;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static java.util.Objects.requireNonNull;
+
+public class StandardFlowManager implements FlowManager {
+ private static final Logger logger = LoggerFactory.getLogger(StandardFlowManager.class);
+
+ private final NiFiProperties nifiProperties;
+ private final BulletinRepository bulletinRepository;
+ private final StandardProcessScheduler processScheduler;
+ private final Authorizer authorizer;
+ private final SSLContext sslContext;
+ private final FlowController flowController;
+ private final FlowFileEventRepository flowFileEventRepository;
+
+ private final boolean isSiteToSiteSecure;
+
+ private volatile ProcessGroup rootGroup;
+ private final ConcurrentMap<String, ProcessGroup> allProcessGroups = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, ProcessorNode> allProcessors = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, ReportingTaskNode> allReportingTasks = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, ControllerServiceNode> rootControllerServices = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, Connection> allConnections = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, Port> allInputPorts = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, Port> allOutputPorts = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, Funnel> allFunnels = new ConcurrentHashMap<>();
+
+ public StandardFlowManager(final NiFiProperties nifiProperties, final SSLContext sslContext, final FlowController flowController, final FlowFileEventRepository flowFileEventRepository) {
+ this.nifiProperties = nifiProperties;
+ this.flowController = flowController;
+ this.bulletinRepository = flowController.getBulletinRepository();
+ this.processScheduler = flowController.getProcessScheduler();
+ this.authorizer = flowController.getAuthorizer();
+ this.sslContext = sslContext;
+ this.flowFileEventRepository = flowFileEventRepository;
+
+ this.isSiteToSiteSecure = Boolean.TRUE.equals(nifiProperties.isSiteToSiteSecure());
+ }
+
+ /**
+ * Creates a root-group port that receives data (input port) after verifying that no
+ * port with the same id already exists in the root group.
+ *
+ * @param id identifier of the new port; must not be null
+ * @param name name of the new port; must not be null
+ * @return the newly created port
+ * @throws NullPointerException if id or name is null
+ * @throws IllegalStateException if a port with the given id already exists
+ */
+ public Port createRemoteInputPort(String id, String name) {
+ final String portId = requireNonNull(id).intern();
+ final String portName = requireNonNull(name).intern();
+
+ verifyPortIdDoesNotExist(portId);
+
+ return new StandardRootGroupPort(portId, portName, null, TransferDirection.RECEIVE, ConnectableType.INPUT_PORT,
+ authorizer, bulletinRepository, processScheduler, isSiteToSiteSecure, nifiProperties);
+ }
+
+ /**
+ * Creates a root-group port that sends data (output port) after verifying that no
+ * port with the same id already exists in the root group.
+ *
+ * @param id identifier of the new port; must not be null
+ * @param name name of the new port; must not be null
+ * @return the newly created port
+ * @throws NullPointerException if id or name is null
+ * @throws IllegalStateException if a port with the given id already exists
+ */
+ public Port createRemoteOutputPort(String id, String name) {
+ final String portId = requireNonNull(id).intern();
+ final String portName = requireNonNull(name).intern();
+
+ verifyPortIdDoesNotExist(portId);
+
+ return new StandardRootGroupPort(portId, portName, null, TransferDirection.SEND, ConnectableType.OUTPUT_PORT,
+ authorizer, bulletinRepository, processScheduler, isSiteToSiteSecure, nifiProperties);
+ }
+
+ /**
+ * Creates a remote process group with the given id and target URIs.
+ *
+ * @param id identifier of the new remote process group; must not be null
+ * @param uris target URIs handed to the remote process group
+ * @return the newly created remote process group
+ * @throws NullPointerException if id is null
+ */
+ public RemoteProcessGroup createRemoteProcessGroup(final String id, final String uris) {
+ final String groupId = requireNonNull(id);
+ return new StandardRemoteProcessGroup(groupId, uris, null, processScheduler, bulletinRepository, sslContext, nifiProperties);
+ }
+
+ /**
+ * Sets the root process group and registers it in the group map under both the
+ * root-group alias and its own identifier.
+ *
+ * @param rootGroup the new root process group
+ */
+ public void setRootGroup(final ProcessGroup rootGroup) {
+ this.rootGroup = rootGroup;
+
+ // the group is reachable through the alias as well as through its real id
+ allProcessGroups.put(ROOT_GROUP_ID_ALIAS, rootGroup);
+ final String realId = rootGroup.getIdentifier();
+ allProcessGroups.put(realId, rootGroup);
+ }
+
+ /**
+ * @return the current root process group, as last set through setRootGroup
+ */
+ public ProcessGroup getRootGroup() {
+ final ProcessGroup currentRoot = this.rootGroup;
+ return currentRoot;
+ }
+
+ /**
+ * @return the identifier of the current root process group
+ */
+ @Override
+ public String getRootGroupId() {
+ final ProcessGroup currentRoot = rootGroup;
+ return currentRoot.getIdentifier();
+ }
+
+ /**
+ * Determines whether two group identifiers refer to the same group, treating the
+ * root-group alias as equivalent to the root group's real identifier.
+ *
+ * @param id1 first group identifier
+ * @param id2 second group identifier
+ * @return false if either id is null; true if the ids are equal directly or after
+ * resolving the root-group alias
+ */
+ public boolean areGroupsSame(final String id1, final String id2) {
+ if (id1 == null || id2 == null) {
+ return false;
+ }
+
+ if (id1.equals(id2)) {
+ return true;
+ }
+
+ // resolve the alias to the real root id before comparing
+ final String resolvedFirst;
+ if (id1.equals(ROOT_GROUP_ID_ALIAS)) {
+ resolvedFirst = getRootGroupId();
+ } else {
+ resolvedFirst = id1;
+ }
+
+ final String resolvedSecond;
+ if (id2.equals(ROOT_GROUP_ID_ALIAS)) {
+ resolvedSecond = getRootGroupId();
+ } else {
+ resolvedSecond = id2;
+ }
+
+ return resolvedFirst.equals(resolvedSecond);
+ }
+
+ /**
+ * Verifies that neither an output port nor an input port with the given id can be
+ * found from the root group.
+ *
+ * @param id the port identifier to check
+ * @throws IllegalStateException if an output port or input port with the id already exists
+ */
+ private void verifyPortIdDoesNotExist(final String id) {
+ final ProcessGroup rootGroup = getRootGroup();
+
+ final Port existingOutputPort = rootGroup.findOutputPort(id);
+ if (existingOutputPort != null) {
+ // report the kind of port that actually clashes
+ throw new IllegalStateException("An Output Port already exists with ID " + id);
+ }
+
+ final Port existingInputPort = rootGroup.findInputPort(id);
+ if (existingInputPort != null) {
+ throw new IllegalStateException("An Input Port already exists with ID " + id);
+ }
+ }
+
+ /**
+ * Creates a label with the given id and text.
+ *
+ * @param id identifier of the new label; must not be null
+ * @param text text of the label
+ * @return the newly created label
+ * @throws NullPointerException if id is null
+ */
+ public Label createLabel(final String id, final String text) {
+ final String labelId = requireNonNull(id).intern();
+ return new StandardLabel(labelId, text);
+ }
+
+ /**
+ * Creates a funnel with the given id.
+ *
+ * @param id identifier of the new funnel; must not be null
+ * @return the newly created funnel
+ * @throws NullPointerException if id is null
+ */
+ public Funnel createFunnel(final String id) {
+ // explicit null check, matching the other create* factory methods of this class
+ final String funnelId = requireNonNull(id).intern();
+ return new StandardFunnel(funnelId, null, processScheduler);
+ }
+
+ /**
+ * Creates a local input port after verifying that no port with the same id already
+ * exists in the root group.
+ *
+ * @param id identifier of the new port; must not be null
+ * @param name name of the new port; must not be null
+ * @return the newly created port
+ * @throws NullPointerException if id or name is null
+ * @throws IllegalStateException if a port with the given id already exists
+ */
+ public Port createLocalInputPort(String id, String name) {
+ final String portId = requireNonNull(id).intern();
+ final String portName = requireNonNull(name).intern();
+
+ verifyPortIdDoesNotExist(portId);
+
+ return new LocalPort(portId, portName, null, ConnectableType.INPUT_PORT, processScheduler);
+ }
+
+ /**
+ * Creates a local output port after verifying that no port with the same id already
+ * exists in the root group.
+ *
+ * @param id identifier of the new port; must not be null
+ * @param name name of the new port; must not be null
+ * @return the newly created port
+ * @throws NullPointerException if id or name is null
+ * @throws IllegalStateException if a port with the given id already exists
+ */
+ public Port createLocalOutputPort(String id, String name) {
+ final String portId = requireNonNull(id).intern();
+ final String portName = requireNonNull(name).intern();
+
+ verifyPortIdDoesNotExist(portId);
+
+ return new LocalPort(portId, portName, null, ConnectableType.OUTPUT_PORT, processScheduler);
+ }
+
+ /**
+ * Creates a process group with its own mutable variable registry, built on top of the
+ * controller's variable registry, and records the group in the group map.
+ *
+ * @param id identifier of the new process group; must not be null
+ * @return the newly created process group
+ * @throws NullPointerException if id is null
+ */
+ public ProcessGroup createProcessGroup(final String id) {
+ final MutableVariableRegistry groupRegistry = new MutableVariableRegistry(flowController.getVariableRegistry());
+
+ final ProcessGroup created = new StandardProcessGroup(requireNonNull(id), flowController.getControllerServiceProvider(), processScheduler,
+ nifiProperties, flowController.getEncryptor(), flowController, groupRegistry);
+
+ final String createdId = created.getIdentifier();
+ allProcessGroups.put(createdId, created);
+ return created;
+ }
+
+ /**
+ * Validates the given snippet against the group, instantiates it into the group and
+ * then initializes every remote process group found under the group.
+ *
+ * @param group the group to instantiate the snippet into; must not be null
+ * @param dto the snippet definition; must not be null
+ * @throws ProcessorInstantiationException declared for failures instantiating the snippet
+ * @throws NullPointerException if group or dto is null
+ */
+ public void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto) throws ProcessorInstantiationException {
+ requireNonNull(group);
+ requireNonNull(dto);
+
+ final ExtensionManager extensionManager = flowController.getExtensionManager();
+ final FlowSnippet flowSnippet = new StandardFlowSnippet(dto, extensionManager);
+
+ // validate before instantiating
+ flowSnippet.validate(group);
+ flowSnippet.instantiate(this, group);
+
+ group.findAllRemoteProcessGroups().forEach(remoteGroup -> remoteGroup.initialize());
+ }
+
+ /**
+ * Instantiates the FlowFilePrioritizer of the given class name using the class loader
+ * of the single bundle that provides it. The thread's context class loader is switched
+ * to the bundle's class loader for the instantiation and restored afterwards.
+ *
+ * @param type fully qualified class name of the prioritizer
+ * @return a new prioritizer instance
+ * @throws IllegalStateException if no bundle, or more than one bundle, provides the type
+ * @throws ClassNotFoundException if the class cannot be loaded from the bundle's class loader
+ * @throws InstantiationException if the class cannot be instantiated
+ * @throws IllegalAccessException if the class or its no-arg constructor is not accessible
+ */
+ public FlowFilePrioritizer createPrioritizer(final String type) throws InstantiationException, IllegalAccessException, ClassNotFoundException {
+ final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ final List<Bundle> prioritizerBundles = flowController.getExtensionManager().getBundles(type);
+ if (prioritizerBundles.isEmpty()) {
+ throw new IllegalStateException(String.format("The specified class '%s' is not known to this nifi.", type));
+ }
+ if (prioritizerBundles.size() > 1) {
+ throw new IllegalStateException(String.format("Multiple bundles found for the specified class '%s', only one is allowed.", type));
+ }
+
+ final Bundle bundle = prioritizerBundles.get(0);
+ final ClassLoader detectedClassLoaderForType = bundle.getClassLoader();
+ final Class<?> rawClass = Class.forName(type, true, detectedClassLoaderForType);
+
+ Thread.currentThread().setContextClassLoader(detectedClassLoaderForType);
+ final Class<? extends FlowFilePrioritizer> prioritizerClass = rawClass.asSubclass(FlowFilePrioritizer.class);
+ return prioritizerClass.newInstance();
+ } finally {
+ // always restore, even when the original context class loader was null; otherwise
+ // the bundle class loader would stay installed on the calling thread
+ Thread.currentThread().setContextClassLoader(ctxClassLoader);
+ }
+ }
+
+ /**
+ * Looks up a process group by identifier.
+ *
+ * @param id identifier of the group; must not be null
+ * @return the group registered under the id, or null if none is registered
+ * @throws NullPointerException if id is null
+ */
+ public ProcessGroup getGroup(final String id) {
+ final String groupId = requireNonNull(id);
+ return allProcessGroups.get(groupId);
+ }
+
+ /**
+ * Records the given process group in the group map under its identifier.
+ *
+ * @param group the group that was added
+ */
+ public void onProcessGroupAdded(final ProcessGroup group) {
+ final String groupId = group.getIdentifier();
+ allProcessGroups.put(groupId, group);
+ }
+
+ /**
+ * Removes the given process group's identifier from the group map.
+ *
+ * @param group the group that was removed
+ */
+ public void onProcessGroupRemoved(final ProcessGroup group) {
+ final String groupId = group.getIdentifier();
+ allProcessGroups.remove(groupId);
+ }
+
+ /**
+ * Creates a processor node, treating it as added for the first time.
+ *
+ * @param type the processor type
+ * @param id the identifier of the new processor
+ * @param coordinate the bundle coordinate of the processor
+ * @return the newly created processor node
+ */
+ public ProcessorNode createProcessor(final String type, final String id, final BundleCoordinate coordinate) {
+ final boolean firstTimeAdded = true;
+ return createProcessor(type, id, coordinate, firstTimeAdded);
+ }
+
+ /**
+ * Creates a processor node with no additional classpath URLs and with the bulletin
+ * log observer registered.
+ *
+ * @param type the processor type
+ * @param id the identifier of the new processor
+ * @param coordinate the bundle coordinate of the processor
+ * @param firstTimeAdded whether the lifecycle methods for a newly added component should be invoked
+ * @return the newly created processor node
+ */
+ public ProcessorNode createProcessor(final String type, String id, final BundleCoordinate coordinate, final boolean firstTimeAdded) {
+ final Set<URL> noAdditionalUrls = Collections.emptySet();
+ final boolean registerLogObserver = true;
+ return createProcessor(type, id, coordinate, noAdditionalUrls, firstTimeAdded, registerLogObserver);
+ }
+
+ /**
+ * Builds a processor node through the ExtensionBuilder, installs its logger into the
+ * log repository, optionally registers the bulletin log observer and, when the
+ * processor is added for the first time, invokes its {@link OnAdded} and
+ * {@link OnConfigurationRestored} methods.
+ *
+ * @param type the processor type
+ * @param id the identifier of the new processor
+ * @param coordinate the bundle coordinate of the processor
+ * @param additionalUrls additional classpath URLs handed to the builder
+ * @param firstTimeAdded whether the OnAdded / OnConfigurationRestored methods should be invoked
+ * @param registerLogObserver whether the bulletin log observer should be registered
+ * @return the newly created processor node
+ * @throws ComponentLifeCycleException if invoking the OnAdded methods fails
+ */
+ public ProcessorNode createProcessor(final String type, String id, final BundleCoordinate coordinate, final Set<URL> additionalUrls,
+ final boolean firstTimeAdded, final boolean registerLogObserver) {
+
+ // make sure the first reference to LogRepository happens outside of a NarCloseable so that we use the framework's ClassLoader
+ final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
+ final ExtensionManager extensionManager = flowController.getExtensionManager();
+
+ final ProcessorNode procNode = new ExtensionBuilder()
+ .identifier(id)
+ .type(type)
+ .bundleCoordinate(coordinate)
+ .extensionManager(extensionManager)
+ .controllerServiceProvider(flowController.getControllerServiceProvider())
+ .processScheduler(processScheduler)
+ .nodeTypeProvider(flowController)
+ .validationTrigger(flowController.getValidationTrigger())
+ .reloadComponent(flowController.getReloadComponent())
+ .variableRegistry(flowController.getVariableRegistry())
+ .addClasspathUrls(additionalUrls)
+ .kerberosConfig(flowController.createKerberosConfig(nifiProperties))
+ .buildProcessor();
+
+ LogRepositoryFactory.getRepository(procNode.getIdentifier()).setLogger(procNode.getLogger());
+ if (registerLogObserver) {
+ logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, procNode.getBulletinLevel(), new ProcessorLogObserver(bulletinRepository, procNode));
+ }
+
+ if (firstTimeAdded) {
+ try (final NarCloseable x = NarCloseable.withComponentNarLoader(extensionManager, procNode.getProcessor().getClass(), procNode.getProcessor().getIdentifier())) {
+ ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, procNode.getProcessor());
+ } catch (final Exception e) {
+ // undo the observer registration performed above before propagating the failure
+ if (registerLogObserver) {
+ logRepository.removeObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID);
+ }
+ throw new ComponentLifeCycleException("Failed to invoke @OnAdded methods of " + procNode.getProcessor(), e);
+ }
+
+ // only reached when OnAdded succeeded; failures of these methods are not propagated
+ try (final NarCloseable nc = NarCloseable.withComponentNarLoader(extensionManager, procNode.getProcessor().getClass(), procNode.getProcessor().getIdentifier())) {
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, procNode.getProcessor());
+ }
+ }
+
+ return procNode;
+ }
+
+ /**
+ * Records the given processor node in the processor map under its identifier.
+ *
+ * @param procNode the processor that was added
+ */
+ public void onProcessorAdded(final ProcessorNode procNode) {
+ final String processorId = procNode.getIdentifier();
+ allProcessors.put(processorId, procNode);
+ }
+
+ /**
+ * Purges the transfer events recorded for the given processor and removes it from
+ * the processor map.
+ *
+ * @param procNode the processor that was removed
+ */
+ public void onProcessorRemoved(final ProcessorNode procNode) {
+ final String processorId = procNode.getIdentifier();
+
+ flowFileEventRepository.purgeTransferEvents(processorId);
+ allProcessors.remove(processorId);
+ }
+
+ public Connectable findConnectable(final String id) {
+ final ProcessorNode procNode = getProcessorNode(id);
+ if (procNode != null) {
+ return procNode;
+ }
+
+ final Port inPort = getInputPort(id);
+ if (inPort != null) {
+ return inPort;
+ }
+
+ final Port outPort = getOutputPort(id);
+ if (outPort != null) {
+ return outPort;
+ }
+
+ final Funnel funnel = getFunnel(id);
+ if (funnel != null) {
+ return funnel;
+ }
+
+ final RemoteGroupPort remoteGroupPort = getRootGroup().findRemoteGroupPort(id);
+ if (remoteGroupPort != null) {
+ return remoteGroupPort;
+ }
+
+ return null;
+ }
+
+ /**
+ * @param id identifier of the processor
+ * @return the processor node registered under the id, or null if none is registered
+ */
+ public ProcessorNode getProcessorNode(final String id) {
+ final ProcessorNode registered = allProcessors.get(id);
+ return registered;
+ }
+
+ /**
+ * Records the given connection in the connection map and, if the flow controller is
+ * already initialized, starts load balancing on the connection's FlowFile queue.
+ *
+ * @param connection the connection that was added
+ */
+ public void onConnectionAdded(final Connection connection) {
+ final String connectionId = connection.getIdentifier();
+ allConnections.put(connectionId, connection);
+
+ if (!flowController.isInitialized()) {
+ return;
+ }
+
+ connection.getFlowFileQueue().startLoadBalancing();
+ }
+
+ /**
+ * Purges the transfer events recorded for the given connection and removes it from
+ * the connection map.
+ *
+ * @param connection the connection that was removed
+ */
+ public void onConnectionRemoved(final Connection connection) {
+ final String connectionId = connection.getIdentifier();
+
+ flowFileEventRepository.purgeTransferEvents(connectionId);
+ allConnections.remove(connectionId);
+ }
+
+ /**
+ * @param id identifier of the connection
+ * @return the connection registered under the id, or null if none is registered
+ */
+ public Connection getConnection(final String id) {
+ final Connection registered = allConnections.get(id);
+ return registered;
+ }
+
+ /**
+ * Creates a connection by delegating to the flow controller.
+ *
+ * @param id identifier of the new connection
+ * @param name name of the new connection
+ * @param source the source component
+ * @param destination the destination component
+ * @param relationshipNames names of the relationships the connection applies to
+ * @return the connection created by the flow controller
+ */
+ public Connection createConnection(final String id, final String name, final Connectable source, final Connectable destination, final Collection<String> relationshipNames) {
+ final Connection created = flowController.createConnection(id, name, source, destination, relationshipNames);
+ return created;
+ }
+
+ /**
+ * @return a new set holding the connections currently present in the connection map
+ */
+ public Set<Connection> findAllConnections() {
+ final Collection<Connection> current = allConnections.values();
+ final Set<Connection> copy = new HashSet<>(current);
+ return copy;
+ }
+
+ /**
+ * Records the given input port in the input port map under its identifier.
+ *
+ * @param inputPort the input port that was added
+ */
+ public void onInputPortAdded(final Port inputPort) {
+ final String portId = inputPort.getIdentifier();
+ allInputPorts.put(portId, inputPort);
+ }
+
+ /**
+ * Purges the transfer events recorded for the given input port and removes it from
+ * the input port map.
+ *
+ * @param inputPort the input port that was removed
+ */
+ public void onInputPortRemoved(final Port inputPort) {
+ final String portId = inputPort.getIdentifier();
+
+ flowFileEventRepository.purgeTransferEvents(portId);
+ allInputPorts.remove(portId);
+ }
+
+ /**
+ * @param id identifier of the input port
+ * @return the input port registered under the id, or null if none is registered
+ */
+ public Port getInputPort(final String id) {
+ final Port registered = allInputPorts.get(id);
+ return registered;
+ }
+
+ /**
+ * Records the given output port in the output port map under its identifier.
+ *
+ * @param outputPort the output port that was added
+ */
+ public void onOutputPortAdded(final Port outputPort) {
+ final String portId = outputPort.getIdentifier();
+ allOutputPorts.put(portId, outputPort);
+ }
+
+ /**
+ * Purges the transfer events recorded for the given output port and removes it from
+ * the output port map.
+ *
+ * @param outputPort the output port that was removed
+ */
+ public void onOutputPortRemoved(final Port outputPort) {
+ final String portId = outputPort.getIdentifier();
+
+ flowFileEventRepository.purgeTransferEvents(portId);
+ allOutputPorts.remove(portId);
+ }
+
+ /**
+ * @param id identifier of the output port
+ * @return the output port registered under the id, or null if none is registered
+ */
+ public Port getOutputPort(final String id) {
+ final Port registered = allOutputPorts.get(id);
+ return registered;
+ }
+
+ /**
+ * Records the given funnel in the funnel map under its identifier.
+ *
+ * @param funnel the funnel that was added
+ */
+ public void onFunnelAdded(final Funnel funnel) {
+ final String funnelId = funnel.getIdentifier();
+ allFunnels.put(funnelId, funnel);
+ }
+
+ /**
+ * Purges the transfer events recorded for the given funnel and removes it from the
+ * funnel map.
+ *
+ * @param funnel the funnel that was removed
+ */
+ public void onFunnelRemoved(final Funnel funnel) {
+ final String funnelId = funnel.getIdentifier();
+
+ flowFileEventRepository.purgeTransferEvents(funnelId);
+ allFunnels.remove(funnelId);
+ }
+
+ /**
+ * @param id identifier of the funnel
+ * @return the funnel registered under the id, or null if none is registered
+ */
+ public Funnel getFunnel(final String id) {
+ final Funnel registered = allFunnels.get(id);
+ return registered;
+ }
+
+ /**
+ * Creates a reporting task node, treating it as added for the first time.
+ *
+ * @param type the reporting task type
+ * @param bundleCoordinate the bundle coordinate of the reporting task
+ * @return the newly created reporting task node
+ */
+ public ReportingTaskNode createReportingTask(final String type, final BundleCoordinate bundleCoordinate) {
+ final boolean firstTimeAdded = true;
+ return createReportingTask(type, bundleCoordinate, firstTimeAdded);
+ }
+
+ /**
+ * Creates a reporting task node under a randomly generated UUID.
+ *
+ * @param type the reporting task type
+ * @param bundleCoordinate the bundle coordinate of the reporting task
+ * @param firstTimeAdded whether the lifecycle methods for a newly added component should be invoked
+ * @return the newly created reporting task node
+ */
+ public ReportingTaskNode createReportingTask(final String type, final BundleCoordinate bundleCoordinate, final boolean firstTimeAdded) {
+ final String generatedId = UUID.randomUUID().toString();
+ return createReportingTask(type, generatedId, bundleCoordinate, firstTimeAdded);
+ }
+
+ /**
+ * Creates a reporting task node with no additional classpath URLs and registers it
+ * with this flow manager.
+ *
+ * @param type the reporting task type
+ * @param id the identifier of the new reporting task
+ * @param bundleCoordinate the bundle coordinate of the reporting task
+ * @param firstTimeAdded whether the lifecycle methods for a newly added component should be invoked
+ * @return the newly created reporting task node
+ */
+ @Override
+ public ReportingTaskNode createReportingTask(final String type, final String id, final BundleCoordinate bundleCoordinate, final boolean firstTimeAdded) {
+ final Set<URL> noAdditionalUrls = Collections.emptySet();
+ final boolean register = true;
+ return createReportingTask(type, id, bundleCoordinate, noAdditionalUrls, firstTimeAdded, register);
+ }
+
+ /**
+ * Builds a Reporting Task node of the given type and, optionally, registers it with this flow.
+ *
+ * @param type the reporting task type handed to the {@code ExtensionBuilder}
+ * @param id identifier for the new reporting task
+ * @param bundleCoordinate coordinate of the bundle that supplies the type
+ * @param additionalUrls extra classpath URLs handed to the builder
+ * @param firstTimeAdded if {@code true}, the task's {@code @OnAdded} and then {@code @OnConfigurationRestored} methods are invoked
+ * @param register if {@code true}, the node is stored in {@code allReportingTasks} and a WARN-level bulletin observer is attached to its log repository
+ * @return the newly built node
+ * @throws NullPointerException if {@code type}, {@code id} or {@code bundleCoordinate} is {@code null}
+ * @throws ComponentLifeCycleException if invoking the On-Added lifecycle methods fails
+ */
+ public ReportingTaskNode createReportingTask(final String type, final String id, final BundleCoordinate bundleCoordinate, final Set<URL> additionalUrls,
+ final boolean firstTimeAdded, final boolean register) {
+ requireNonNull(type, "Reporting Task type must not be null");
+ requireNonNull(id, "Reporting Task identifier must not be null");
+ requireNonNull(bundleCoordinate, "Reporting Task bundle coordinate must not be null");
+
+ // make sure the first reference to LogRepository happens outside of a NarCloseable so that we use the framework's ClassLoader
+ final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
+ final ExtensionManager extensionManager = flowController.getExtensionManager();
+
+ // The extension manager is supplied to the builder exactly once, using the local fetched above.
+ final ReportingTaskNode taskNode = new ExtensionBuilder()
+ .identifier(id)
+ .type(type)
+ .bundleCoordinate(bundleCoordinate)
+ .extensionManager(extensionManager)
+ .controllerServiceProvider(flowController.getControllerServiceProvider())
+ .processScheduler(processScheduler)
+ .nodeTypeProvider(flowController)
+ .validationTrigger(flowController.getValidationTrigger())
+ .reloadComponent(flowController.getReloadComponent())
+ .variableRegistry(flowController.getVariableRegistry())
+ .addClasspathUrls(additionalUrls)
+ .kerberosConfig(flowController.createKerberosConfig(nifiProperties))
+ .flowController(flowController)
+ .buildReportingTask();
+
+ LogRepositoryFactory.getRepository(taskNode.getIdentifier()).setLogger(taskNode.getLogger());
+
+ if (firstTimeAdded) {
+ final Class<?> taskClass = taskNode.getReportingTask().getClass();
+ final String identifier = taskNode.getReportingTask().getIdentifier();
+
+ // Lifecycle methods run inside a NarCloseable obtained for the task's own class and identifier.
+ try (final NarCloseable x = NarCloseable.withComponentNarLoader(extensionManager, taskClass, identifier)) {
+ ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, taskNode.getReportingTask());
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, taskNode.getReportingTask());
+ } catch (final Exception e) {
+ throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + taskNode.getReportingTask(), e);
+ }
+ }
+
+ if (register) {
+ allReportingTasks.put(id, taskNode);
+
+ // Register log observer to provide bulletins when reporting task logs anything at WARN level or above
+ logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN,
+ new ReportingTaskLogObserver(bulletinRepository, taskNode));
+ }
+
+ return taskNode;
+ }
+
+ /**
+ * Looks up a Reporting Task node by identifier in {@code allReportingTasks}.
+ *
+ * @param taskId identifier of the reporting task
+ * @return the node held for {@code taskId}; presumably {@code null} when nothing is registered under that identifier (verify against the registry's type)
+ */
+ public ReportingTaskNode getReportingTaskNode(final String taskId) {
+ final ReportingTaskNode taskNode = allReportingTasks.get(taskId);
+ return taskNode;
+ }
+
+ /**
+ * Removes the given Reporting Task from this flow.
+ * <p>
+ * After checking that the node is the very instance registered under its identifier and that it can be deleted, this invokes the
+ * task's {@code @OnRemoved} methods, removes the task as a reference from every Controller Service named by its properties, and
+ * finally unregisters the task, its log repository and its instance class loader and notifies the process scheduler.
+ *
+ * @param reportingTaskNode the node to remove
+ * @throws IllegalStateException if the node is not the instance registered in this flow under its identifier
+ */
+ @Override
+ public void removeReportingTask(final ReportingTaskNode reportingTaskNode) {
+ final String taskId = reportingTaskNode.getIdentifier();
+
+ // An absent entry is null and therefore also fails this identity comparison.
+ final ReportingTaskNode registered = allReportingTasks.get(taskId);
+ if (registered != reportingTaskNode) {
+ throw new IllegalStateException("Reporting Task " + reportingTaskNode + " does not exist in this Flow");
+ }
+
+ reportingTaskNode.verifyCanDelete();
+
+ final ExtensionManager extensionManager = flowController.getExtensionManager();
+ final Class<?> taskClass = reportingTaskNode.getReportingTask().getClass();
+ try (final NarCloseable x = NarCloseable.withComponentNarLoader(extensionManager, taskClass, reportingTaskNode.getReportingTask().getIdentifier())) {
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, reportingTaskNode.getReportingTask(), reportingTaskNode.getConfigurationContext());
+ }
+
+ for (final Map.Entry<PropertyDescriptor, String> property : reportingTaskNode.getProperties().entrySet()) {
+ final PropertyDescriptor descriptor = property.getKey();
+ if (descriptor.getControllerServiceDefinition() == null) {
+ // Only properties that identify a Controller Service are of interest here.
+ continue;
+ }
+
+ final String configuredValue = property.getValue();
+ final String serviceId = configuredValue == null ? descriptor.getDefaultValue() : configuredValue;
+ if (serviceId == null) {
+ continue;
+ }
+
+ final ControllerServiceNode serviceNode = flowController.getControllerServiceProvider().getControllerServiceNode(serviceId);
+ if (serviceNode != null) {
+ serviceNode.removeReference(reportingTaskNode);
+ }
+ }
+
+ allReportingTasks.remove(taskId);
+ LogRepositoryFactory.removeRepository(taskId);
+ processScheduler.onReportingTaskRemoved(reportingTaskNode);
+
+ extensionManager.removeInstanceClassLoader(taskId);
+ }
+
+ /**
+ * Returns the currently registered Reporting Task nodes.
+ *
+ * @return a new {@link HashSet} filled from the values of {@code allReportingTasks}
+ */
+ @Override
+ public Set<ReportingTaskNode> getAllReportingTasks() {
+ final Set<ReportingTaskNode> tasks = new HashSet<>();
+ tasks.addAll(allReportingTasks.values());
+ return tasks;
+ }
+
+ /**
+ * Returns the Controller Services registered at the root (Controller) level.
+ *
+ * @return a new {@link HashSet} filled from the values of {@code rootControllerServices}
+ */
+ public Set<ControllerServiceNode> getRootControllerServices() {
+ final Set<ControllerServiceNode> rootServices = new HashSet<>();
+ rootServices.addAll(rootControllerServices.values());
+ return rootServices;
+ }
+
+ /**
+ * Registers a Controller Service at the Controller level unless one is already stored under the same identifier.
+ *
+ * @param serviceNode the service to register
+ * @throws IllegalStateException if {@code rootControllerServices} already holds a service with that identifier
+ */
+ public void addRootControllerService(final ControllerServiceNode serviceNode) {
+ final String serviceId = serviceNode.getIdentifier();
+
+ // putIfAbsent leaves an existing entry untouched and returns it, which signals the duplicate.
+ if (rootControllerServices.putIfAbsent(serviceId, serviceNode) != null) {
+ throw new IllegalStateException("Controller Service with ID " + serviceId + " already exists at the Controller level");
+ }
+ }
+
+ /**
+ * Looks up a root-level Controller Service by identifier in {@code rootControllerServices}.
+ *
+ * @param serviceIdentifier identifier of the service
+ * @return the node held for {@code serviceIdentifier}; presumably {@code null} when nothing is registered under that identifier (verify against the registry's type)
+ */
+ public ControllerServiceNode getRootControllerService(final String serviceIdentifier) {
+ final ControllerServiceNode rootService = rootControllerServices.get(serviceIdentifier);
+ return rootService;
+ }
+
+ /**
+ * Removes a Controller Service that was registered at the root (Controller) level.
+ * <p>
+ * After checking that a service is registered under the identifier and that the given service can be deleted, this invokes the
+ * implementation's {@code @OnRemoved} methods, removes the service as a reference from every root-level Controller Service named
+ * by its properties, and finally unregisters the service, notifies the state manager provider and releases the instance class loader.
+ *
+ * @param service the service to remove
+ * @throws NullPointerException if {@code service} is {@code null}
+ * @throws IllegalStateException if no service is registered under the service's identifier
+ */
+ public void removeRootControllerService(final ControllerServiceNode service) {
+ final ControllerServiceNode existing = rootControllerServices.get(requireNonNull(service, "Controller Service must not be null").getIdentifier());
+ if (existing == null) {
+ throw new IllegalStateException(service + " is not a member of this Process Group");
+ }
+
+ service.verifyCanDelete();
+
+ final ExtensionManager extensionManager = flowController.getExtensionManager();
+ final VariableRegistry variableRegistry = flowController.getVariableRegistry();
+
+ try (final NarCloseable x = NarCloseable.withComponentNarLoader(extensionManager, service.getControllerServiceImplementation().getClass(), service.getIdentifier())) {
+ final ConfigurationContext configurationContext = new StandardConfigurationContext(service, flowController.getControllerServiceProvider(), null, variableRegistry);
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, service.getControllerServiceImplementation(), configurationContext);
+ }
+
+ for (final Map.Entry<PropertyDescriptor, String> entry : service.getProperties().entrySet()) {
+ final PropertyDescriptor descriptor = entry.getKey();
+ if (descriptor.getControllerServiceDefinition() != null) {
+ // Fall back to the descriptor's default when the property has no explicit value.
+ final String value = entry.getValue() == null ? descriptor.getDefaultValue() : entry.getValue();
+ if (value != null) {
+ final ControllerServiceNode referencedNode = getRootControllerService(value);
+ if (referencedNode != null) {
+ referencedNode.removeReference(service);
+ }
+ }
+ }
+ }
+
+ rootControllerServices.remove(service.getIdentifier());
+ flowController.getStateManagerProvider().onComponentRemoved(service.getIdentifier());
+
+ extensionManager.removeInstanceClassLoader(service.getIdentifier());
+
+ // One placeholder, one argument: the former extra argument had no matching placeholder.
+ logger.info("{} removed from Flow Controller", service);
+ }
+
+ /**
+ * Builds a Controller Service node of the given type and announces it to the Controller Service provider.
+ *
+ * @param type the controller service type handed to the {@code ExtensionBuilder}
+ * @param id identifier for the new controller service
+ * @param bundleCoordinate coordinate of the bundle that supplies the type
+ * @param additionalUrls extra classpath URLs handed to the builder
+ * @param firstTimeAdded if {@code true}, the implementation's {@code @OnConfigurationRestored} and then {@code @OnAdded} methods are invoked
+ * @param registerLogObserver if {@code true}, a WARN-level bulletin observer is attached to the service's log repository
+ * @return the newly built node, after {@code onControllerServiceAdded} has been called on the provider
+ * @throws ComponentLifeCycleException if invoking the On-Added lifecycle methods fails
+ */
+ public ControllerServiceNode createControllerService(final String type, final String id, final BundleCoordinate bundleCoordinate, final Set<URL> additionalUrls, final boolean firstTimeAdded,
+ final boolean registerLogObserver) {
+ // make sure the first reference to LogRepository happens outside of a NarCloseable so that we use the framework's ClassLoader
+ final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
+ final ExtensionManager extensionManager = flowController.getExtensionManager();
+ final ControllerServiceProvider controllerServiceProvider = flowController.getControllerServiceProvider();
+
+ final ControllerServiceNode serviceNode = new ExtensionBuilder()
+ .identifier(id)
+ .type(type)
+ .bundleCoordinate(bundleCoordinate)
+ .controllerServiceProvider(controllerServiceProvider)
+ .processScheduler(processScheduler)
+ .nodeTypeProvider(flowController)
+ .validationTrigger(flowController.getValidationTrigger())
+ .reloadComponent(flowController.getReloadComponent())
+ .variableRegistry(flowController.getVariableRegistry())
+ .addClasspathUrls(additionalUrls)
+ .kerberosConfig(flowController.createKerberosConfig(nifiProperties))
+ .stateManagerProvider(flowController.getStateManagerProvider())
+ .extensionManager(extensionManager)
+ .buildControllerService();
+
+ LogRepositoryFactory.getRepository(serviceNode.getIdentifier()).setLogger(serviceNode.getLogger());
+ if (registerLogObserver) {
+ // Register log observer to provide bulletins when controller service logs anything at WARN level or above
+ logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, new ControllerServiceLogObserver(bulletinRepository, serviceNode));
+ }
+
+ if (firstTimeAdded) {
+ final ControllerService service = serviceNode.getControllerServiceImplementation();
+
+ // Failures in @OnConfigurationRestored are handled by the "quiet" invocation and do not abort creation.
+ try (final NarCloseable nc = NarCloseable.withComponentNarLoader(extensionManager, service.getClass(), service.getIdentifier())) {
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, service);
+ }
+
+ // Failures in @OnAdded are surfaced to the caller as a ComponentLifeCycleException.
+ try (final NarCloseable x = NarCloseable.withComponentNarLoader(extensionManager, service.getClass(), service.getIdentifier())) {
+ ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, service);
+ } catch (final Exception e) {
+ throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + service, e);
+ }
+ }
+
+ controllerServiceProvider.onControllerServiceAdded(serviceNode);
+
+ return serviceNode;
+ }
+
+ /**
+ * Collects every Controller Service known to the flow.
+ *
+ * @return a new {@link HashSet} holding the provider's non-root Controller Services plus the values of {@code rootControllerServices}
+ */
+ public Set<ControllerServiceNode> getAllControllerServices() {
+ final Set<ControllerServiceNode> combined = new HashSet<>(flowController.getControllerServiceProvider().getNonRootControllerServices());
+ combined.addAll(rootControllerServices.values());
+ return combined;
+ }
+
+ /**
+ * Delegates the lookup of a Controller Service node to the flow controller's Controller Service provider.
+ *
+ * @param id identifier of the controller service
+ * @return whatever the provider returns for {@code id}
+ */
+ public ControllerServiceNode getControllerServiceNode(final String id) {
+ final ControllerServiceProvider serviceProvider = flowController.getControllerServiceProvider();
+ return serviceProvider.getControllerServiceNode(id);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/kerberos/KerberosConfig.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/kerberos/KerberosConfig.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/kerberos/KerberosConfig.java
new file mode 100644
index 0000000..8a6f939
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/kerberos/KerberosConfig.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.kerberos;
+
+import java.io.File;
+
+/**
+ * Immutable holder for a Kerberos principal, keytab location and Kerberos configuration file.
+ */
+public class KerberosConfig {
+ /** Shared instance whose principal, keytab location and configuration file are all {@code null}. */
+ public static final KerberosConfig NOT_CONFIGURED = new KerberosConfig(null, null, null);
+
+ private final String servicePrincipal;
+ private final File keytabFile;
+ private final File kerberosConfigFile;
+
+ /**
+ * @param principal the Kerberos principal; stored as given
+ * @param keytabLocation the keytab file; stored as given
+ * @param kerberosConfigurationFile the Kerberos configuration file; stored as given
+ */
+ public KerberosConfig(final String principal, final File keytabLocation, final File kerberosConfigurationFile) {
+ this.servicePrincipal = principal;
+ this.keytabFile = keytabLocation;
+ this.kerberosConfigFile = kerberosConfigurationFile;
+ }
+
+ /**
+ * @return the principal passed to the constructor
+ */
+ public String getPrincipal() {
+ return servicePrincipal;
+ }
+
+ /**
+ * @return the keytab file passed to the constructor
+ */
+ public File getKeytabLocation() {
+ return keytabFile;
+ }
+
+ /**
+ * @return the Kerberos configuration file passed to the constructor
+ */
+ public File getConfigFile() {
+ return kerberosConfigFile;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
index dc780db..5c1f8e9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
@@ -236,7 +236,7 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol {
return;
}
- final Connection connection = flowController.getConnection(connectionId);
+ final Connection connection = flowController.getFlowManager().getConnection(connectionId);
if (connection == null) {
logger.error("Attempted to receive FlowFiles from Peer {} for Connection with ID {} but no connection exists with that ID", peerDescription, connectionId);
throw new TransactionAbortedException("Attempted to receive FlowFiles from Peer " + peerDescription + " for Connection with ID " + connectionId + " but no Connection exists with that ID");
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
index d95a220..66ff5f7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
@@ -55,13 +55,13 @@ public class StandardReportingContext implements ReportingContext, ControllerSer
private final VariableRegistry variableRegistry;
public StandardReportingContext(final FlowController flowController, final BulletinRepository bulletinRepository,
- final Map<PropertyDescriptor, String> properties, final ControllerServiceProvider serviceProvider, final ReportingTask reportingTask,
+ final Map<PropertyDescriptor, String> properties, final ReportingTask reportingTask,
final VariableRegistry variableRegistry) {
this.flowController = flowController;
- this.eventAccess = flowController;
+ this.eventAccess = flowController.getEventAccess();
this.bulletinRepository = bulletinRepository;
this.properties = Collections.unmodifiableMap(properties);
- this.serviceProvider = serviceProvider;
+ this.serviceProvider = flowController.getControllerServiceProvider();
this.reportingTask = reportingTask;
this.variableRegistry = variableRegistry;
preparedQueries = new HashMap<>();
@@ -94,7 +94,7 @@ public class StandardReportingContext implements ReportingContext, ControllerSer
@Override
public Bulletin createBulletin(final String componentId, final String category, final Severity severity, final String message) {
- final Connectable connectable = flowController.findLocalConnectable(componentId);
+ final Connectable connectable = flowController.getFlowManager().findConnectable(componentId);
if (connectable == null) {
throw new IllegalStateException("Cannot create Component-Level Bulletin because no component can be found with ID " + componentId);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java
index ebe774b..d66ed35 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java
@@ -16,19 +16,19 @@
*/
package org.apache.nifi.controller.reporting;
-import java.io.File;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.NodeTypeProvider;
+import org.apache.nifi.controller.kerberos.KerberosConfig;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.util.NiFiProperties;
+
+import java.io.File;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
public class StandardReportingInitializationContext implements ReportingInitializationContext, ControllerServiceLookup {
@@ -38,21 +38,19 @@ public class StandardReportingInitializationContext implements ReportingInitiali
private final SchedulingStrategy schedulingStrategy;
private final ControllerServiceProvider serviceProvider;
private final ComponentLog logger;
- private final NiFiProperties nifiProperties;
+ private final KerberosConfig kerberosConfig;
private final NodeTypeProvider nodeTypeProvider;
- public StandardReportingInitializationContext(
- final String id, final String name, final SchedulingStrategy schedulingStrategy,
- final String schedulingPeriod, final ComponentLog logger,
- final ControllerServiceProvider serviceProvider, final NiFiProperties nifiProperties,
- final NodeTypeProvider nodeTypeProvider) {
+ public StandardReportingInitializationContext(final String id, final String name, final SchedulingStrategy schedulingStrategy, final String schedulingPeriod,
+ final ComponentLog logger, final ControllerServiceProvider serviceProvider, final KerberosConfig kerberosConfig,
+ final NodeTypeProvider nodeTypeProvider) {
this.id = id;
this.name = name;
this.schedulingPeriod = schedulingPeriod;
this.serviceProvider = serviceProvider;
this.schedulingStrategy = schedulingStrategy;
this.logger = logger;
- this.nifiProperties = nifiProperties;
+ this.kerberosConfig = kerberosConfig;
this.nodeTypeProvider = nodeTypeProvider;
}
@@ -126,17 +124,17 @@ public class StandardReportingInitializationContext implements ReportingInitiali
@Override
public String getKerberosServicePrincipal() {
- return nifiProperties.getKerberosServicePrincipal();
+ return kerberosConfig.getPrincipal();
}
@Override
public File getKerberosServiceKeytab() {
- return nifiProperties.getKerberosServiceKeytabLocation() == null ? null : new File(nifiProperties.getKerberosServiceKeytabLocation());
+ return kerberosConfig.getKeytabLocation();
}
@Override
public File getKerberosConfigurationFile() {
- return nifiProperties.getKerberosConfigurationFile();
+ return kerberosConfig.getConfigFile();
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
index 1cc5325..b63fffd 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
@@ -40,18 +40,17 @@ public class StandardReportingTaskNode extends AbstractReportingTaskNode impleme
public StandardReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id, final FlowController controller,
final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory,
- final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent,
- final ExtensionManager extensionManager, final ValidationTrigger validationTrigger) {
- super(reportingTask, id, controller, processScheduler, validationContextFactory, variableRegistry, reloadComponent, extensionManager, validationTrigger);
+ final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent, final ExtensionManager extensionManager,
+ final ValidationTrigger validationTrigger) {
+ super(reportingTask, id, controller.getControllerServiceProvider(), processScheduler, validationContextFactory, variableRegistry, reloadComponent, extensionManager, validationTrigger);
this.flowController = controller;
}
public StandardReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id, final FlowController controller,
final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory,
final String componentType, final String canonicalClassName, final ComponentVariableRegistry variableRegistry,
- final ReloadComponent reloadComponent, final ExtensionManager extensionManager, final ValidationTrigger validationTrigger,
- final boolean isExtensionMissing) {
- super(reportingTask, id, controller, processScheduler, validationContextFactory, componentType, canonicalClassName,
+ final ReloadComponent reloadComponent, final ExtensionManager extensionManager, final ValidationTrigger validationTrigger, final boolean isExtensionMissing) {
+ super(reportingTask, id, controller.getControllerServiceProvider(), processScheduler, validationContextFactory, componentType, canonicalClassName,
variableRegistry, reloadComponent, extensionManager, validationTrigger, isExtensionMissing);
this.flowController = controller;
}
@@ -83,6 +82,6 @@ public class StandardReportingTaskNode extends AbstractReportingTaskNode impleme
@Override
public ReportingContext getReportingContext() {
- return new StandardReportingContext(flowController, flowController.getBulletinRepository(), getProperties(), flowController, getReportingTask(), getVariableRegistry());
+ return new StandardReportingContext(flowController, flowController.getBulletinRepository(), getProperties(), getReportingTask(), getVariableRegistry());
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardQueueProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardQueueProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardQueueProvider.java
new file mode 100644
index 0000000..55e36c6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardQueueProvider.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository;
+
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * {@link QueueProvider} that derives its queues from the connections known to a {@link FlowController}'s flow manager.
+ */
+public class StandardQueueProvider implements QueueProvider {
+ private final FlowController flowController;
+
+ /**
+ * @param flowController the controller whose flow manager is queried for connections
+ */
+ public StandardQueueProvider(final FlowController flowController) {
+ this.flowController = flowController;
+ }
+
+
+ /**
+ * Gathers the FlowFile queue of every connection returned by the flow manager's {@code findAllConnections()}.
+ *
+ * @return a new list holding one queue per connection, in the iteration order of the connection collection
+ */
+ @Override
+ public Collection<FlowFileQueue> getAllQueues() {
+ final Collection<Connection> allConnections = flowController.getFlowManager().findAllConnections();
+
+ // Pre-size the result: exactly one queue is added per connection.
+ final List<FlowFileQueue> flowFileQueues = new ArrayList<>(allConnections.size());
+ allConnections.forEach(connection -> flowFileQueues.add(connection.getFlowFileQueue()));
+ return flowFileQueues;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 4e396fd..6313097 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -77,6 +77,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
private final long administrativeYieldMillis;
private final String administrativeYieldDuration;
private final StateManagerProvider stateManagerProvider;
+ private final long processorStartTimeoutMillis;
private final ConcurrentMap<Object, LifecycleState> lifecycleStates = new ConcurrentHashMap<>();
private final ScheduledExecutorService frameworkTaskExecutor;
@@ -91,7 +92,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
public StandardProcessScheduler(final FlowEngine componentLifecycleThreadPool, final FlowController flowController, final StringEncryptor encryptor,
final StateManagerProvider stateManagerProvider, final NiFiProperties nifiProperties) {
this.componentLifeCycleThreadPool = componentLifecycleThreadPool;
- this.controllerServiceProvider = flowController;
+ this.controllerServiceProvider = flowController.getControllerServiceProvider();
this.flowController = flowController;
this.encryptor = encryptor;
this.stateManagerProvider = stateManagerProvider;
@@ -99,6 +100,9 @@ public final class StandardProcessScheduler implements ProcessScheduler {
administrativeYieldDuration = nifiProperties.getAdministrativeYieldDuration();
administrativeYieldMillis = FormatUtils.getTimeDuration(administrativeYieldDuration, TimeUnit.MILLISECONDS);
+ final String timeoutString = nifiProperties.getProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT);
+ processorStartTimeoutMillis = timeoutString == null ? 60000 : FormatUtils.getTimeDuration(timeoutString.trim(), TimeUnit.MILLISECONDS);
+
frameworkTaskExecutor = new FlowEngine(4, "Framework Task Thread");
}
@@ -283,10 +287,10 @@ public final class StandardProcessScheduler implements ProcessScheduler {
/**
* Starts the given {@link Processor} by invoking its
- * {@link ProcessorNode#start(ScheduledExecutorService, long, ProcessContext, SchedulingAgentCallback, boolean)}
+ * {@link ProcessorNode#start(ScheduledExecutorService, long, long, ProcessContext, SchedulingAgentCallback, boolean)}
* method.
*
- * @see StandardProcessorNode#start(ScheduledExecutorService, long, ProcessContext, SchedulingAgentCallback, boolean)
+ * @see StandardProcessorNode#start(ScheduledExecutorService, long, long, ProcessContext, SchedulingAgentCallback, boolean)
*/
@Override
public synchronized CompletableFuture<Void> startProcessor(final ProcessorNode procNode, final boolean failIfStopping) {
@@ -317,7 +321,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
};
LOG.info("Starting {}", procNode);
- procNode.start(this.componentMonitoringThreadPool, this.administrativeYieldMillis, processContext, callback, failIfStopping);
+ procNode.start(componentMonitoringThreadPool, administrativeYieldMillis, processorStartTimeoutMillis, processContext, callback, failIfStopping);
return future;
}
@@ -359,7 +363,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
getSchedulingAgent(procNode).incrementMaxThreadCount(tasksTerminated);
try {
- flowController.reload(procNode, procNode.getProcessor().getClass().getName(), procNode.getBundleCoordinate(), Collections.emptySet());
+ flowController.getReloadComponent().reload(procNode, procNode.getProcessor().getClass().getName(), procNode.getBundleCoordinate(), Collections.emptySet());
} catch (final ProcessorInstantiationException e) {
// This shouldn't happen because we already have been able to instantiate the processor before
LOG.error("Failed to replace instance of Processor for {} when terminating Processor", procNode);
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
index 597c8fb..87b6540 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
@@ -106,12 +106,12 @@ public class StandardFlowSerializer implements FlowSerializer<Document> {
rootNode.appendChild(registriesElement);
addFlowRegistries(registriesElement, controller.getFlowRegistryClient());
- addProcessGroup(rootNode, controller.getGroup(controller.getRootGroupId()), "rootGroup", scheduledStateLookup);
+ addProcessGroup(rootNode, controller.getFlowManager().getRootGroup(), "rootGroup", scheduledStateLookup);
// Add root-level controller services
final Element controllerServicesNode = doc.createElement("controllerServices");
rootNode.appendChild(controllerServicesNode);
- for (final ControllerServiceNode serviceNode : controller.getRootControllerServices()) {
+ for (final ControllerServiceNode serviceNode : controller.getFlowManager().getRootControllerServices()) {
addControllerService(controllerServicesNode, serviceNode);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
index e82c436..f226bb6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
@@ -113,7 +113,7 @@ public class ControllerServiceLoader {
for (final Element serviceElement : serviceElements) {
final ControllerServiceNode serviceNode = createControllerService(controller, serviceElement, encryptor);
if (parentGroup == null) {
- controller.addRootControllerService(serviceNode);
+ controller.getFlowManager().addRootControllerService(serviceNode);
} else {
parentGroup.addControllerService(serviceNode);
}
@@ -162,19 +162,20 @@ public class ControllerServiceLoader {
// Start services
if (autoResumeState) {
logger.debug("Enabling Controller Services {}", nodesToEnable);
- nodesToEnable.stream().forEach(ControllerServiceNode::performValidation); // validate services before attempting to enable them
- controller.enableControllerServices(nodesToEnable);
+ nodesToEnable.forEach(ControllerServiceNode::performValidation); // validate services before attempting to enable them
+
+ controller.getControllerServiceProvider().enableControllerServices(nodesToEnable);
} else {
logger.debug("Will not enable the following Controller Services because 'auto-resume state' flag is false: {}", nodesToEnable);
}
}
- public static ControllerServiceNode cloneControllerService(final ControllerServiceProvider provider, final ControllerServiceNode controllerService) {
+ public static ControllerServiceNode cloneControllerService(final FlowController flowController, final ControllerServiceNode controllerService) {
// create a new id for the clone seeded from the original id so that it is consistent in a cluster
final UUID id = UUID.nameUUIDFromBytes(controllerService.getIdentifier().getBytes(StandardCharsets.UTF_8));
- final ControllerServiceNode clone = provider.createControllerService(controllerService.getCanonicalClassName(), id.toString(),
- controllerService.getBundleCoordinate(), Collections.emptySet(), false);
+ final ControllerServiceNode clone = flowController.getFlowManager().createControllerService(controllerService.getCanonicalClassName(), id.toString(),
+ controllerService.getBundleCoordinate(), Collections.emptySet(), false, true);
clone.setName(controllerService.getName());
clone.setComments(controllerService.getComments());
@@ -189,12 +190,12 @@ public class ControllerServiceLoader {
return clone;
}
- private static ControllerServiceNode createControllerService(final ControllerServiceProvider provider, final Element controllerServiceElement, final StringEncryptor encryptor) {
+ private static ControllerServiceNode createControllerService(final FlowController flowController, final Element controllerServiceElement, final StringEncryptor encryptor) {
final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
BundleCoordinate coordinate;
try {
- coordinate = BundleUtils.getCompatibleBundle(provider.getExtensionManager(), dto.getType(), dto.getBundle());
+ coordinate = BundleUtils.getCompatibleBundle(flowController.getExtensionManager(), dto.getType(), dto.getBundle());
} catch (final IllegalStateException e) {
final BundleDTO bundleDTO = dto.getBundle();
if (bundleDTO == null) {
@@ -204,7 +205,7 @@ public class ControllerServiceLoader {
}
}
- final ControllerServiceNode node = provider.createControllerService(dto.getType(), dto.getId(), coordinate, Collections.emptySet(), false);
+ final ControllerServiceNode node = flowController.getFlowManager().createControllerService(dto.getType(), dto.getId(), coordinate, Collections.emptySet(), false, true);
node.setName(dto.getName());
node.setComments(dto.getComments());
node.setVersionedComponentId(dto.getVersionedComponentId());
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/GhostControllerService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/GhostControllerService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/GhostControllerService.java
new file mode 100644
index 0000000..ff0086b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/GhostControllerService.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.service;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.reporting.InitializationException;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Placeholder {@link ControllerService} that is used in place of a Controller Service whose
+ * declared type is not a valid Controller Service type. It retains the identifier and class name
+ * it was created with and always reports itself as invalid.
+ */
+public class GhostControllerService implements ControllerService {
+
+    private final String identifier;
+    private final String canonicalClassName;
+
+    /**
+     * @param identifier the identifier returned by {@link #getIdentifier()}
+     * @param canonicalClassName the class name of the service type this instance stands in for
+     */
+    public GhostControllerService(final String identifier, final String canonicalClassName) {
+        this.identifier = identifier;
+        this.canonicalClassName = canonicalClassName;
+    }
+
+    /** No-op. */
+    @Override
+    public void initialize(final ControllerServiceInitializationContext context) throws InitializationException {
+    }
+
+    /**
+     * Always returns exactly one invalid {@link ValidationResult} that names the unavailable type.
+     */
+    @Override
+    public Collection<ValidationResult> validate(final ValidationContext context) {
+        return Collections.singleton(new ValidationResult.Builder()
+            .input("Any Property")
+            .subject("Missing Controller Service")
+            .valid(false)
+            .explanation("Controller Service is of type " + canonicalClassName + ", but this is not a valid Controller Service type")
+            .build());
+    }
+
+    /**
+     * Builds a descriptor on demand for whatever property name is requested; every such
+     * property is marked required and sensitive.
+     */
+    @Override
+    public PropertyDescriptor getPropertyDescriptor(final String propertyName) {
+        // NOTE(review): presumably sensitive so values set for the unknown type are never shown - confirm
+        return new PropertyDescriptor.Builder()
+            .name(propertyName)
+            .description(propertyName)
+            .required(true)
+            .sensitive(true)
+            .build();
+    }
+
+    /** No-op. */
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+    }
+
+    /** Returns an empty list; no descriptors are declared up front. */
+    @Override
+    public List<PropertyDescriptor> getPropertyDescriptors() {
+        return Collections.emptyList();
+    }
+
+    /** Returns the identifier supplied at construction. */
+    @Override
+    public String getIdentifier() {
+        return identifier;
+    }
+
+    /** Returns a description containing the identifier and the class name. */
+    @Override
+    public String toString() {
+        return "GhostControllerService[id=" + identifier + ", type=" + canonicalClassName + "]";
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java
index f169662..4d2bbee 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java
@@ -16,15 +16,15 @@
*/
package org.apache.nifi.controller.service;
-import java.io.File;
-import java.util.Set;
-
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.controller.ControllerServiceLookup;
+import org.apache.nifi.controller.kerberos.KerberosConfig;
import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.util.NiFiProperties;
+
+import java.io.File;
+import java.util.Set;
public class StandardControllerServiceInitializationContext implements ControllerServiceInitializationContext, ControllerServiceLookup {
@@ -32,17 +32,17 @@ public class StandardControllerServiceInitializationContext implements Controlle
private final ControllerServiceProvider serviceProvider;
private final ComponentLog logger;
private final StateManager stateManager;
- private final NiFiProperties nifiProperties;
+ private final KerberosConfig kerberosConfig;
public StandardControllerServiceInitializationContext(
final String identifier, final ComponentLog logger,
final ControllerServiceProvider serviceProvider, final StateManager stateManager,
- final NiFiProperties nifiProperties) {
+ final KerberosConfig kerberosConfig) {
this.id = identifier;
this.logger = logger;
this.serviceProvider = serviceProvider;
this.stateManager = stateManager;
- this.nifiProperties = nifiProperties;
+ this.kerberosConfig = kerberosConfig;
}
@Override
@@ -97,16 +97,16 @@ public class StandardControllerServiceInitializationContext implements Controlle
@Override
public String getKerberosServicePrincipal() {
- return nifiProperties.getKerberosServicePrincipal();
+ return kerberosConfig.getPrincipal();
}
@Override
public File getKerberosServiceKeytab() {
- return nifiProperties.getKerberosServiceKeytabLocation() == null ? null : new File(nifiProperties.getKerberosServiceKeytabLocation());
+ return kerberosConfig.getKeytabLocation();
}
@Override
public File getKerberosConfigurationFile() {
- return nifiProperties.getKerberosConfigurationFile();
+ return kerberosConfig.getConfigFile();
}
}