You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/11/06 16:24:06 UTC
[5/9] nifi git commit: NIFI-5769: Refactored FlowController to use
Composition over Inheritance - Ensure that when the root group is set,
we register its ID in FlowManager
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index 06d32e2..e545558 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -16,11 +16,24 @@
*/
package org.apache.nifi.controller.service;
-import static java.util.Objects.requireNonNull;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ComponentNode;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
+import org.apache.nifi.events.BulletinFactory;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.logging.LogRepositoryFactory;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.Severity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -39,46 +52,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
-import org.apache.commons.lang3.ClassUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.annotation.lifecycle.OnAdded;
-import org.apache.nifi.bundle.Bundle;
-import org.apache.nifi.bundle.BundleCoordinate;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.state.StateManager;
-import org.apache.nifi.components.state.StateManagerProvider;
-import org.apache.nifi.components.validation.ValidationTrigger;
-import org.apache.nifi.controller.ComponentNode;
-import org.apache.nifi.controller.ControllerService;
-import org.apache.nifi.controller.FlowController;
-import org.apache.nifi.controller.LoggableComponent;
-import org.apache.nifi.controller.ProcessorNode;
-import org.apache.nifi.controller.ReportingTaskNode;
-import org.apache.nifi.controller.ScheduledState;
-import org.apache.nifi.controller.TerminationAwareLogger;
-import org.apache.nifi.controller.ValidationContextFactory;
-import org.apache.nifi.controller.exception.ComponentLifeCycleException;
-import org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
-import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
-import org.apache.nifi.events.BulletinFactory;
-import org.apache.nifi.groups.ProcessGroup;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.logging.LogRepositoryFactory;
-import org.apache.nifi.nar.ExtensionManager;
-import org.apache.nifi.nar.NarCloseable;
-import org.apache.nifi.processor.SimpleProcessLogger;
-import org.apache.nifi.processor.StandardValidationContextFactory;
-import org.apache.nifi.registry.ComponentVariableRegistry;
-import org.apache.nifi.registry.VariableRegistry;
-import org.apache.nifi.registry.variable.StandardComponentVariableRegistry;
-import org.apache.nifi.reporting.BulletinRepository;
-import org.apache.nifi.reporting.Severity;
-import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.util.ReflectionUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static java.util.Objects.requireNonNull;
public class StandardControllerServiceProvider implements ControllerServiceProvider {
@@ -86,170 +60,21 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
private final StandardProcessScheduler processScheduler;
private final BulletinRepository bulletinRepo;
- private final StateManagerProvider stateManagerProvider;
- private final VariableRegistry variableRegistry;
private final FlowController flowController;
- private final NiFiProperties nifiProperties;
+ private final FlowManager flowManager;
private final ConcurrentMap<String, ControllerServiceNode> serviceCache = new ConcurrentHashMap<>();
- private final ValidationTrigger validationTrigger;
-
- public StandardControllerServiceProvider(final FlowController flowController, final StandardProcessScheduler scheduler, final BulletinRepository bulletinRepo,
- final StateManagerProvider stateManagerProvider, final VariableRegistry variableRegistry, final NiFiProperties nifiProperties, final ValidationTrigger validationTrigger) {
+ public StandardControllerServiceProvider(final FlowController flowController, final StandardProcessScheduler scheduler, final BulletinRepository bulletinRepo) {
this.flowController = flowController;
this.processScheduler = scheduler;
this.bulletinRepo = bulletinRepo;
- this.stateManagerProvider = stateManagerProvider;
- this.variableRegistry = variableRegistry;
- this.nifiProperties = nifiProperties;
- this.validationTrigger = validationTrigger;
- }
-
- private StateManager getStateManager(final String componentId) {
- return stateManagerProvider.getStateManager(componentId);
+ this.flowManager = flowController.getFlowManager();
}
@Override
- public ControllerServiceNode createControllerService(final String type, final String id, final BundleCoordinate bundleCoordinate, final Set<URL> additionalUrls, final boolean firstTimeAdded) {
- if (type == null || id == null || bundleCoordinate == null) {
- throw new NullPointerException();
- }
-
- ClassLoader cl = null;
- final ClassLoader currentContextClassLoader = Thread.currentThread().getContextClassLoader();
- final ExtensionManager extensionManager = flowController.getExtensionManager();
- try {
- final Class<?> rawClass;
- try {
- final Bundle csBundle = extensionManager.getBundle(bundleCoordinate);
- if (csBundle == null) {
- throw new ControllerServiceInstantiationException("Unable to find bundle for coordinate " + bundleCoordinate.getCoordinate());
- }
-
- cl = extensionManager.createInstanceClassLoader(type, id, csBundle, additionalUrls);
- Thread.currentThread().setContextClassLoader(cl);
- rawClass = Class.forName(type, false, cl);
- } catch (final Exception e) {
- logger.error("Could not create Controller Service of type " + type + " for ID " + id + "; creating \"Ghost\" implementation", e);
- Thread.currentThread().setContextClassLoader(currentContextClassLoader);
- return createGhostControllerService(type, id, bundleCoordinate);
- }
-
- final Class<? extends ControllerService> controllerServiceClass = rawClass.asSubclass(ControllerService.class);
-
- final ControllerService originalService = controllerServiceClass.newInstance();
- final StandardControllerServiceInvocationHandler invocationHandler = new StandardControllerServiceInvocationHandler(extensionManager, originalService);
-
- // extract all interfaces... controllerServiceClass is non null so getAllInterfaces is non null
- final List<Class<?>> interfaceList = ClassUtils.getAllInterfaces(controllerServiceClass);
- final Class<?>[] interfaces = interfaceList.toArray(new Class<?>[interfaceList.size()]);
-
- final ControllerService proxiedService;
- if (cl == null) {
- proxiedService = (ControllerService) Proxy.newProxyInstance(getClass().getClassLoader(), interfaces, invocationHandler);
- } else {
- proxiedService = (ControllerService) Proxy.newProxyInstance(cl, interfaces, invocationHandler);
- }
- logger.info("Created Controller Service of type {} with identifier {}", type, id);
-
- final ComponentLog serviceLogger = new SimpleProcessLogger(id, originalService);
- final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(serviceLogger);
-
- originalService.initialize(new StandardControllerServiceInitializationContext(id, terminationAwareLogger, this, getStateManager(id), nifiProperties));
-
-
-
- final LoggableComponent<ControllerService> originalLoggableComponent = new LoggableComponent<>(originalService, bundleCoordinate, terminationAwareLogger);
- final LoggableComponent<ControllerService> proxiedLoggableComponent = new LoggableComponent<>(proxiedService, bundleCoordinate, terminationAwareLogger);
-
- final ComponentVariableRegistry componentVarRegistry = new StandardComponentVariableRegistry(this.variableRegistry);
- final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(this, componentVarRegistry);
- final ControllerServiceNode serviceNode = new StandardControllerServiceNode(originalLoggableComponent, proxiedLoggableComponent, invocationHandler,
- id, validationContextFactory, this, componentVarRegistry, flowController, flowController.getExtensionManager(), validationTrigger);
- serviceNode.setName(rawClass.getSimpleName());
-
- invocationHandler.setServiceNode(serviceNode);
-
- if (firstTimeAdded) {
- try (final NarCloseable x = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), originalService.getClass(), originalService.getIdentifier())) {
- ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, originalService);
- } catch (final Exception e) {
- throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + originalService, e);
- }
- }
-
- serviceCache.putIfAbsent(id, serviceNode);
-
- return serviceNode;
- } catch (final Throwable t) {
- throw new ControllerServiceInstantiationException(t);
- } finally {
- if (currentContextClassLoader != null) {
- Thread.currentThread().setContextClassLoader(currentContextClassLoader);
- }
- }
- }
-
- private ControllerServiceNode createGhostControllerService(final String type, final String id, final BundleCoordinate bundleCoordinate) {
- final ControllerServiceInvocationHandler invocationHandler = new ControllerServiceInvocationHandler() {
- @Override
- public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
- final String methodName = method.getName();
-
- if ("validate".equals(methodName)) {
- final ValidationResult result = new ValidationResult.Builder()
- .input("Any Property")
- .subject("Missing Controller Service")
- .valid(false)
- .explanation("Controller Service could not be created because the Controller Service Type (" + type + ") could not be found")
- .build();
- return Collections.singleton(result);
- } else if ("getPropertyDescriptor".equals(methodName)) {
- final String propertyName = (String) args[0];
- return new PropertyDescriptor.Builder()
- .name(propertyName)
- .description(propertyName)
- .sensitive(true)
- .required(true)
- .build();
- } else if ("getPropertyDescriptors".equals(methodName)) {
- return Collections.emptyList();
- } else if ("onPropertyModified".equals(methodName)) {
- return null;
- } else if ("getIdentifier".equals(methodName)) {
- return id;
- } else if ("toString".equals(methodName)) {
- return "GhostControllerService[id=" + id + ", type=" + type + "]";
- } else if ("hashCode".equals(methodName)) {
- return 91 * type.hashCode() + 41 * id.hashCode();
- } else if ("equals".equals(methodName)) {
- return proxy == args[0];
- } else {
- throw new IllegalStateException("Controller Service could not be created because the Controller Service Type (" + type + ") could not be found");
- }
- }
- @Override
- public void setServiceNode(ControllerServiceNode serviceNode) {
- // nothing to do
- }
- };
-
- final ControllerService proxiedService = (ControllerService) Proxy.newProxyInstance(getClass().getClassLoader(),
- new Class[]{ControllerService.class}, invocationHandler);
-
- final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type;
- final String componentType = "(Missing) " + simpleClassName;
-
- final LoggableComponent<ControllerService> proxiedLoggableComponent = new LoggableComponent<>(proxiedService, bundleCoordinate, null);
-
- final ComponentVariableRegistry componentVarRegistry = new StandardComponentVariableRegistry(this.variableRegistry);
- final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedLoggableComponent, proxiedLoggableComponent, invocationHandler, id,
- new StandardValidationContextFactory(this, variableRegistry), this, componentType, type, componentVarRegistry, flowController,
- flowController.getExtensionManager(), validationTrigger, true);
-
- serviceCache.putIfAbsent(id, serviceNode);
- return serviceNode;
+ public void onControllerServiceAdded(final ControllerServiceNode serviceNode) {
+ serviceCache.putIfAbsent(serviceNode.getIdentifier(), serviceNode);
}
@Override
@@ -364,7 +189,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
Iterator<ControllerServiceNode> serviceIter = serviceNodes.iterator();
while (serviceIter.hasNext() && shouldStart) {
ControllerServiceNode controllerServiceNode = serviceIter.next();
- List<ControllerServiceNode> requiredServices = ((StandardControllerServiceNode) controllerServiceNode).getRequiredControllerServices();
+ List<ControllerServiceNode> requiredServices = controllerServiceNode.getRequiredControllerServices();
for (ControllerServiceNode requiredService : requiredServices) {
if (!requiredService.isActive() && !serviceNodes.contains(requiredService)) {
shouldStart = false;
@@ -411,10 +236,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
private void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes, final CompletableFuture<Void> completableFuture) {
// validate that we are able to start all of the services.
- Iterator<ControllerServiceNode> serviceIter = serviceNodes.iterator();
- while (serviceIter.hasNext()) {
- ControllerServiceNode controllerServiceNode = serviceIter.next();
- List<ControllerServiceNode> requiredServices = ((StandardControllerServiceNode) controllerServiceNode).getRequiredControllerServices();
+ for (final ControllerServiceNode controllerServiceNode : serviceNodes) {
+ List<ControllerServiceNode> requiredServices = controllerServiceNode.getRequiredControllerServices();
for (ControllerServiceNode requiredService : requiredServices) {
if (!requiredService.isActive() && !serviceNodes.contains(requiredService)) {
logger.error("Cannot enable {} because it has a dependency on {}, which is not enabled", controllerServiceNode, requiredService);
@@ -502,7 +325,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
for (final ControllerServiceNode node : serviceNodeMap.values()) {
final List<ControllerServiceNode> branch = new ArrayList<>();
- determineEnablingOrder(serviceNodeMap, node, branch, new HashSet<ControllerServiceNode>());
+ determineEnablingOrder(serviceNodeMap, node, branch, new HashSet<>());
orderedNodeLists.add(branch);
}
@@ -601,15 +424,15 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
}
private ProcessGroup getRootGroup() {
- return flowController.getGroup(flowController.getRootGroupId());
+ return flowManager.getRootGroup();
}
@Override
public ControllerService getControllerServiceForComponent(final String serviceIdentifier, final String componentId) {
// Find the Process Group that owns the component.
- ProcessGroup groupOfInterest = null;
+ ProcessGroup groupOfInterest;
- final ProcessorNode procNode = flowController.getProcessorNode(componentId);
+ final ProcessorNode procNode = flowManager.getProcessorNode(componentId);
if (procNode == null) {
final ControllerServiceNode serviceNode = getControllerServiceNode(componentId);
if (serviceNode == null) {
@@ -620,7 +443,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
// we have confirmed that the component is a reporting task. We can only reference Controller Services
// that are scoped at the FlowController level in this case.
- final ControllerServiceNode rootServiceNode = flowController.getRootControllerService(serviceIdentifier);
+ final ControllerServiceNode rootServiceNode = flowManager.getRootControllerService(serviceIdentifier);
return (rootServiceNode == null) ? null : rootServiceNode.getProxiedControllerService();
} else {
groupOfInterest = serviceNode.getProcessGroup();
@@ -630,7 +453,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
}
if (groupOfInterest == null) {
- final ControllerServiceNode rootServiceNode = flowController.getRootControllerService(serviceIdentifier);
+ final ControllerServiceNode rootServiceNode = flowManager.getRootControllerService(serviceIdentifier);
return (rootServiceNode == null) ? null : rootServiceNode.getProxiedControllerService();
}
@@ -663,7 +486,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
@Override
public ControllerServiceNode getControllerServiceNode(final String serviceIdentifier) {
- final ControllerServiceNode rootServiceNode = flowController.getRootControllerService(serviceIdentifier);
+ final ControllerServiceNode rootServiceNode = flowManager.getRootControllerService(serviceIdentifier);
if (rootServiceNode != null) {
return rootServiceNode;
}
@@ -675,10 +498,10 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType, final String groupId) {
final Set<ControllerServiceNode> serviceNodes;
if (groupId == null) {
- serviceNodes = flowController.getRootControllerServices();
+ serviceNodes = flowManager.getRootControllerServices();
} else {
ProcessGroup group = getRootGroup();
- if (!FlowController.ROOT_GROUP_ID_ALIAS.equals(groupId) && !group.getIdentifier().equals(groupId)) {
+ if (!FlowManager.ROOT_GROUP_ID_ALIAS.equals(groupId) && !group.getIdentifier().equals(groupId)) {
group = group.findProcessGroup(groupId);
}
@@ -706,7 +529,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
public void removeControllerService(final ControllerServiceNode serviceNode) {
final ProcessGroup group = requireNonNull(serviceNode).getProcessGroup();
if (group == null) {
- flowController.removeRootControllerService(serviceNode);
+ flowManager.removeRootControllerService(serviceNode);
return;
}
@@ -718,12 +541,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
}
@Override
- public Set<ControllerServiceNode> getAllControllerServices() {
- final Set<ControllerServiceNode> allServices = new HashSet<>();
- allServices.addAll(flowController.getRootControllerServices());
- allServices.addAll(serviceCache.values());
-
- return allServices;
+ public Collection<ControllerServiceNode> getNonRootControllerServices() {
+ return serviceCache.values();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java
index 639f8a2..6a60058 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java
@@ -17,9 +17,6 @@
package org.apache.nifi.controller.state;
-import java.io.IOException;
-import java.util.Map;
-
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
@@ -29,6 +26,9 @@ import org.apache.nifi.logging.LogRepository;
import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.processor.SimpleProcessLogger;
+import java.io.IOException;
+import java.util.Map;
+
public class StandardStateManager implements StateManager {
private final StateProvider localProvider;
private final StateProvider clusterProvider;
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
index 851aad3..5a49c72 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
@@ -19,10 +19,12 @@ package org.apache.nifi.controller.tasks;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.lifecycle.TaskTerminationAwareStateManager;
+import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.ActiveProcessSessionFactory;
import org.apache.nifi.controller.repository.BatchingSessionFactory;
import org.apache.nifi.controller.repository.RepositoryContext;
@@ -80,7 +82,7 @@ public class ConnectableTask {
final StateManager stateManager = new TaskTerminationAwareStateManager(flowController.getStateManagerProvider().getStateManager(connectable.getIdentifier()), scheduleState::isTerminated);
if (connectable instanceof ProcessorNode) {
- processContext = new StandardProcessContext((ProcessorNode) connectable, flowController, encryptor, stateManager, scheduleState::isTerminated);
+ processContext = new StandardProcessContext((ProcessorNode) connectable, flowController.getControllerServiceProvider(), encryptor, stateManager, scheduleState::isTerminated);
} else {
processContext = new ConnectableProcessContext(connectable, encryptor, stateManager);
}
@@ -142,8 +144,8 @@ public class ConnectableTask {
private boolean isBackPressureEngaged() {
return connectable.getIncomingConnections().stream()
.filter(con -> con.getSource() == connectable)
- .map(con -> con.getFlowFileQueue())
- .anyMatch(queue -> queue.isFull());
+ .map(Connection::getFlowFileQueue)
+ .anyMatch(FlowFileQueue::isFull);
}
public InvocationResult invoke() {
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ExpireFlowFiles.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ExpireFlowFiles.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ExpireFlowFiles.java
index 6e2ee4c..0c8e8a9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ExpireFlowFiles.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ExpireFlowFiles.java
@@ -16,9 +16,6 @@
*/
package org.apache.nifi.controller.tasks;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Funnel;
@@ -35,6 +32,9 @@ import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* This task runs through all Connectable Components and goes through its incoming queues, polling for FlowFiles and accepting none. This causes the desired side effect of expiring old FlowFiles.
*/
@@ -51,7 +51,7 @@ public class ExpireFlowFiles implements Runnable {
@Override
public void run() {
- final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
+ final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
try {
expireFlowFiles(rootGroup);
} catch (final Exception e) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index a27962a..61a902a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -52,6 +52,7 @@ import org.apache.nifi.controller.Snippet;
import org.apache.nifi.controller.Template;
import org.apache.nifi.controller.exception.ComponentLifeCycleException;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
+import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.LoadBalanceCompression;
@@ -64,6 +65,7 @@ import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.logging.LogRepository;
import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.Relationship;
@@ -160,6 +162,7 @@ public final class StandardProcessGroup implements ProcessGroup {
private final StandardProcessScheduler scheduler;
private final ControllerServiceProvider controllerServiceProvider;
private final FlowController flowController;
+ private final FlowManager flowManager;
private final Map<String, Port> inputPorts = new HashMap<>();
private final Map<String, Port> outputPorts = new HashMap<>();
@@ -192,6 +195,7 @@ public final class StandardProcessGroup implements ProcessGroup {
this.encryptor = encryptor;
this.flowController = flowController;
this.variableRegistry = variableRegistry;
+ this.flowManager = flowController.getFlowManager();
name = new AtomicReference<>();
position = new AtomicReference<>(new Position(0D, 0D));
@@ -422,9 +426,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
});
- findAllInputPorts().stream().filter(START_PORTS_FILTER).forEach(port -> {
- port.getProcessGroup().startInputPort(port);
- });
+ findAllInputPorts().stream().filter(START_PORTS_FILTER).forEach(port -> port.getProcessGroup().startInputPort(port));
findAllOutputPorts().stream().filter(START_PORTS_FILTER).forEach(port -> {
port.getProcessGroup().startOutputPort(port);
@@ -446,13 +448,8 @@ public final class StandardProcessGroup implements ProcessGroup {
}
});
- findAllInputPorts().stream().filter(STOP_PORTS_FILTER).forEach(port -> {
- port.getProcessGroup().stopInputPort(port);
- });
-
- findAllOutputPorts().stream().filter(STOP_PORTS_FILTER).forEach(port -> {
- port.getProcessGroup().stopOutputPort(port);
- });
+ findAllInputPorts().stream().filter(STOP_PORTS_FILTER).forEach(port -> port.getProcessGroup().stopInputPort(port));
+ findAllOutputPorts().stream().filter(STOP_PORTS_FILTER).forEach(port -> port.getProcessGroup().stopOutputPort(port));
} finally {
readLock.unlock();
}
@@ -513,7 +510,7 @@ public final class StandardProcessGroup implements ProcessGroup {
port.setProcessGroup(this);
inputPorts.put(requireNonNull(port).getIdentifier(), port);
- flowController.onInputPortAdded(port);
+ flowManager.onInputPortAdded(port);
onComponentModified();
} finally {
writeLock.unlock();
@@ -552,7 +549,7 @@ public final class StandardProcessGroup implements ProcessGroup {
scheduler.onPortRemoved(port);
onComponentModified();
- flowController.onInputPortRemoved(port);
+ flowManager.onInputPortRemoved(port);
LOG.info("Input Port {} removed from flow", port);
} finally {
writeLock.unlock();
@@ -598,7 +595,7 @@ public final class StandardProcessGroup implements ProcessGroup {
port.setProcessGroup(this);
outputPorts.put(port.getIdentifier(), port);
- flowController.onOutputPortAdded(port);
+ flowManager.onOutputPortAdded(port);
onComponentModified();
} finally {
writeLock.unlock();
@@ -628,7 +625,7 @@ public final class StandardProcessGroup implements ProcessGroup {
scheduler.onPortRemoved(port);
onComponentModified();
- flowController.onOutputPortRemoved(port);
+ flowManager.onOutputPortRemoved(port);
LOG.info("Output Port {} removed from flow", port);
} finally {
writeLock.unlock();
@@ -667,10 +664,10 @@ public final class StandardProcessGroup implements ProcessGroup {
group.getVariableRegistry().setParent(getVariableRegistry());
processGroups.put(Objects.requireNonNull(group).getIdentifier(), group);
- flowController.onProcessGroupAdded(group);
+ flowManager.onProcessGroupAdded(group);
- group.findAllControllerServices().stream().forEach(this::updateControllerServiceReferences);
- group.findAllProcessors().stream().forEach(this::updateControllerServiceReferences);
+ group.findAllControllerServices().forEach(this::updateControllerServiceReferences);
+ group.findAllProcessors().forEach(this::updateControllerServiceReferences);
onComponentModified();
} finally {
@@ -714,7 +711,7 @@ public final class StandardProcessGroup implements ProcessGroup {
processGroups.remove(group.getIdentifier());
onComponentModified();
- flowController.onProcessGroupRemoved(group);
+ flowManager.onProcessGroupRemoved(group);
LOG.info("{} removed from flow", group);
} finally {
writeLock.unlock();
@@ -752,7 +749,7 @@ public final class StandardProcessGroup implements ProcessGroup {
for (final ControllerServiceNode cs : group.getControllerServices(false)) {
// Must go through Controller Service here because we need to ensure that it is removed from the cache
- flowController.removeControllerService(cs);
+ flowController.getControllerServiceProvider().removeControllerService(cs);
}
for (final ProcessGroup childGroup : new ArrayList<>(group.getProcessGroups())) {
@@ -820,8 +817,8 @@ public final class StandardProcessGroup implements ProcessGroup {
LOG.warn("Failed to clean up resources for {} due to {}", remoteGroup, e);
}
- remoteGroup.getInputPorts().stream().forEach(scheduler::onPortRemoved);
- remoteGroup.getOutputPorts().stream().forEach(scheduler::onPortRemoved);
+ remoteGroup.getInputPorts().forEach(scheduler::onPortRemoved);
+ remoteGroup.getOutputPorts().forEach(scheduler::onPortRemoved);
remoteGroups.remove(remoteGroupId);
LOG.info("{} removed from flow", remoteProcessGroup);
@@ -843,7 +840,7 @@ public final class StandardProcessGroup implements ProcessGroup {
processor.setProcessGroup(this);
processor.getVariableRegistry().setParent(getVariableRegistry());
processors.put(processorId, processor);
- flowController.onProcessorAdded(processor);
+ flowManager.onProcessorAdded(processor);
updateControllerServiceReferences(processor);
onComponentModified();
} finally {
@@ -926,9 +923,12 @@ public final class StandardProcessGroup implements ProcessGroup {
onComponentModified();
scheduler.onProcessorRemoved(processor);
- flowController.onProcessorRemoved(processor);
+ flowManager.onProcessorRemoved(processor);
- LogRepositoryFactory.getRepository(processor.getIdentifier()).removeAllObservers();
+ final LogRepository logRepository = LogRepositoryFactory.getRepository(processor.getIdentifier());
+ if (logRepository != null) {
+ logRepository.removeAllObservers();
+ }
final StateManagerProvider stateManagerProvider = flowController.getStateManagerProvider();
scheduler.submitFrameworkTask(new Runnable() {
@@ -1068,7 +1068,7 @@ public final class StandardProcessGroup implements ProcessGroup {
destination.addConnection(connection);
}
connections.put(connection.getIdentifier(), connection);
- flowController.onConnectionAdded(connection);
+ flowManager.onConnectionAdded(connection);
onComponentModified();
} finally {
writeLock.unlock();
@@ -1133,7 +1133,7 @@ public final class StandardProcessGroup implements ProcessGroup {
LOG.info("{} removed from flow", connection);
onComponentModified();
- flowController.onConnectionRemoved(connection);
+ flowManager.onConnectionRemoved(connection);
} finally {
writeLock.unlock();
}
@@ -1161,7 +1161,7 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
public Connection findConnection(final String id) {
- final Connection connection = flowController.getConnection(id);
+ final Connection connection = flowManager.getConnection(id);
if (connection == null) {
return null;
}
@@ -1601,12 +1601,12 @@ public final class StandardProcessGroup implements ProcessGroup {
return this;
}
- final ProcessGroup group = flowController.getGroup(id);
+ final ProcessGroup group = flowManager.getGroup(id);
if (group == null) {
return null;
}
- // We found a Processor in the Controller, but we only want to return it if
+ // We found a Process Group in the Controller, but we only want to return it if
// the Process Group is this or is a child of this.
if (isOwner(group.getParent())) {
return group;
@@ -1664,7 +1664,7 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
public ProcessorNode findProcessor(final String id) {
- final ProcessorNode node = flowController.getProcessorNode(id);
+ final ProcessorNode node = flowManager.getProcessorNode(id);
if (node == null) {
return null;
}
@@ -1769,7 +1769,7 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
public Port findInputPort(final String id) {
- final Port port = flowController.getInputPort(id);
+ final Port port = flowManager.getInputPort(id);
if (port == null) {
return null;
}
@@ -1796,7 +1796,7 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
public Port findOutputPort(final String id) {
- final Port port = flowController.getOutputPort(id);
+ final Port port = flowManager.getOutputPort(id);
if (port == null) {
return null;
}
@@ -1904,7 +1904,7 @@ public final class StandardProcessGroup implements ProcessGroup {
funnel.setProcessGroup(this);
funnels.put(funnel.getIdentifier(), funnel);
- flowController.onFunnelAdded(funnel);
+ flowManager.onFunnelAdded(funnel);
if (autoStart) {
startFunnel(funnel);
@@ -1928,7 +1928,7 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
public Funnel findFunnel(final String id) {
- final Funnel funnel = flowController.getFunnel(id);
+ final Funnel funnel = flowManager.getFunnel(id);
if (funnel == null) {
return funnel;
}
@@ -2026,7 +2026,7 @@ public final class StandardProcessGroup implements ProcessGroup {
funnels.remove(funnel.getIdentifier());
onComponentModified();
- flowController.onFunnelRemoved(funnel);
+ flowManager.onFunnelRemoved(funnel);
LOG.info("{} removed from flow", funnel);
} finally {
writeLock.unlock();
@@ -2988,7 +2988,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
final Map<VariableDescriptor, String> variableMap = new HashMap<>();
- variables.entrySet().stream() // cannot use Collectors.toMap because value may be null
+ variables.entrySet() // cannot use Collectors.toMap because value may be null
.forEach(entry -> variableMap.put(new VariableDescriptor(entry.getKey()), entry.getValue()));
variableRegistry.setVariables(variableMap);
@@ -3199,30 +3199,27 @@ public final class StandardProcessGroup implements ProcessGroup {
private void applyVersionedComponentIds(final ProcessGroup processGroup, final Function<String, String> lookup) {
processGroup.setVersionedComponentId(lookup.apply(processGroup.getIdentifier()));
- processGroup.getConnections().stream()
+ processGroup.getConnections()
.forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
- processGroup.getProcessors().stream()
+ processGroup.getProcessors()
.forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
- processGroup.getInputPorts().stream()
+ processGroup.getInputPorts()
.forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
- processGroup.getOutputPorts().stream()
+ processGroup.getOutputPorts()
.forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
- processGroup.getLabels().stream()
+ processGroup.getLabels()
.forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
- processGroup.getFunnels().stream()
+ processGroup.getFunnels()
.forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
- processGroup.getControllerServices(false).stream()
+ processGroup.getControllerServices(false)
.forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
- processGroup.getRemoteProcessGroups().stream()
+ processGroup.getRemoteProcessGroups()
.forEach(rpg -> {
rpg.setVersionedComponentId(lookup.apply(rpg.getIdentifier()));
- rpg.getInputPorts().stream()
- .forEach(port -> port.setVersionedComponentId(lookup.apply(port.getIdentifier())));
-
- rpg.getOutputPorts().stream()
- .forEach(port -> port.setVersionedComponentId(lookup.apply(port.getIdentifier())));
+ rpg.getInputPorts().forEach(port -> port.setVersionedComponentId(lookup.apply(port.getIdentifier())));
+ rpg.getOutputPorts().forEach(port -> port.setVersionedComponentId(lookup.apply(port.getIdentifier())));
});
for (final ProcessGroup childGroup : processGroup.getProcessGroups()) {
@@ -3552,7 +3549,7 @@ public final class StandardProcessGroup implements ProcessGroup {
if (childGroup == null) {
final ProcessGroup added = addProcessGroup(group, proposedChildGroup, componentIdSeed, variablesToSkip);
- flowController.onProcessGroupAdded(added);
+ flowManager.onProcessGroupAdded(added);
added.findAllRemoteProcessGroups().stream().forEach(RemoteProcessGroup::initialize);
LOG.info("Added {} to {}", added, this);
} else if (childCoordinates == null || updateDescendantVersionedGroups) {
@@ -3572,7 +3569,7 @@ public final class StandardProcessGroup implements ProcessGroup {
final Funnel funnel = funnelsByVersionedId.get(proposedFunnel.getIdentifier());
if (funnel == null) {
final Funnel added = addFunnel(group, proposedFunnel, componentIdSeed);
- flowController.onFunnelAdded(added);
+ flowManager.onFunnelAdded(added);
LOG.info("Added {} to {}", added, this);
} else if (updatedVersionedComponentIds.contains(proposedFunnel.getIdentifier())) {
updateFunnel(funnel, proposedFunnel);
@@ -3594,7 +3591,7 @@ public final class StandardProcessGroup implements ProcessGroup {
final Port port = inputPortsByVersionedId.get(proposedPort.getIdentifier());
if (port == null) {
final Port added = addInputPort(group, proposedPort, componentIdSeed);
- flowController.onInputPortAdded(added);
+ flowManager.onInputPortAdded(added);
LOG.info("Added {} to {}", added, this);
} else if (updatedVersionedComponentIds.contains(proposedPort.getIdentifier())) {
updatePort(port, proposedPort);
@@ -3615,7 +3612,7 @@ public final class StandardProcessGroup implements ProcessGroup {
final Port port = outputPortsByVersionedId.get(proposedPort.getIdentifier());
if (port == null) {
final Port added = addOutputPort(group, proposedPort, componentIdSeed);
- flowController.onOutputPortAdded(added);
+ flowManager.onOutputPortAdded(added);
LOG.info("Added {} to {}", added, this);
} else if (updatedVersionedComponentIds.contains(proposedPort.getIdentifier())) {
updatePort(port, proposedPort);
@@ -3659,7 +3656,7 @@ public final class StandardProcessGroup implements ProcessGroup {
final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier());
if (processor == null) {
final ProcessorNode added = addProcessor(group, proposedProcessor, componentIdSeed);
- flowController.onProcessorAdded(added);
+ flowManager.onProcessorAdded(added);
final Set<Relationship> proposedAutoTerminated =
proposedProcessor.getAutoTerminatedRelationships() == null ? Collections.emptySet() : proposedProcessor.getAutoTerminatedRelationships().stream()
@@ -3718,7 +3715,7 @@ public final class StandardProcessGroup implements ProcessGroup {
final Connection connection = connectionsByVersionedId.get(proposedConnection.getIdentifier());
if (connection == null) {
final Connection added = addConnection(group, proposedConnection, componentIdSeed);
- flowController.onConnectionAdded(added);
+ flowManager.onConnectionAdded(added);
LOG.info("Added {} to {}", added, this);
} else if (isUpdateable(connection)) {
// If the connection needs to be updated, then the source and destination will already have
@@ -3739,7 +3736,7 @@ public final class StandardProcessGroup implements ProcessGroup {
final Connection connection = connectionsByVersionedId.get(removedVersionedId);
LOG.info("Removing {} from {}", connection, group);
group.removeConnection(connection);
- flowController.onConnectionRemoved(connection);
+ flowManager.onConnectionRemoved(connection);
}
// Once the appropriate connections have been removed, we may now update Processors' auto-terminated relationships.
@@ -3753,7 +3750,7 @@ public final class StandardProcessGroup implements ProcessGroup {
final ControllerServiceNode service = servicesByVersionedId.get(removedVersionedId);
LOG.info("Removing {} from {}", service, group);
// Must remove Controller Service through Flow Controller in order to remove from cache
- flowController.removeControllerService(service);
+ flowController.getControllerServiceProvider().removeControllerService(service);
}
for (final String removedVersionedId : funnelsRemoved) {
@@ -3832,7 +3829,7 @@ public final class StandardProcessGroup implements ProcessGroup {
private ProcessGroup addProcessGroup(final ProcessGroup destination, final VersionedProcessGroup proposed, final String componentIdSeed, final Set<String> variablesToSkip)
throws ProcessorInstantiationException {
- final ProcessGroup group = flowController.createProcessGroup(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed));
+ final ProcessGroup group = flowManager.createProcessGroup(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed));
group.setVersionedComponentId(proposed.getIdentifier());
group.setParent(destination);
updateProcessGroup(group, proposed, componentIdSeed, Collections.emptySet(), true, true, true, variablesToSkip);
@@ -3862,7 +3859,7 @@ public final class StandardProcessGroup implements ProcessGroup {
final List<FlowFilePrioritizer> prioritizers = proposed.getPrioritizers() == null ? Collections.emptyList() : proposed.getPrioritizers().stream()
.map(prioritizerName -> {
try {
- return flowController.createPrioritizer(prioritizerName);
+ return flowManager.createPrioritizer(prioritizerName);
} catch (final Exception e) {
throw new IllegalStateException("Failed to create Prioritizer of type " + prioritizerName + " for Connection with ID " + connection.getIdentifier());
}
@@ -3908,7 +3905,7 @@ public final class StandardProcessGroup implements ProcessGroup {
destinationGroup.addConnection(connection);
updateConnection(connection, proposed);
- flowController.onConnectionAdded(connection);
+ flowManager.onConnectionAdded(connection);
return connection;
}
@@ -4071,7 +4068,7 @@ public final class StandardProcessGroup implements ProcessGroup {
final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle());
final List<PropertyDescriptor> descriptors = new ArrayList<>(service.getProperties().keySet());
final Set<URL> additionalUrls = service.getAdditionalClasspathResources(descriptors);
- flowController.reload(service, proposed.getType(), newBundleCoordinate, additionalUrls);
+ flowController.getReloadComponent().reload(service, proposed.getType(), newBundleCoordinate, additionalUrls);
}
} finally {
service.resumeValidationTrigger();
@@ -4107,7 +4104,7 @@ public final class StandardProcessGroup implements ProcessGroup {
final boolean firstTimeAdded = true;
final Set<URL> additionalUrls = Collections.emptySet();
- final ControllerServiceNode newService = flowController.createControllerService(type, id, coordinate, additionalUrls, firstTimeAdded);
+ final ControllerServiceNode newService = flowManager.createControllerService(type, id, coordinate, additionalUrls, firstTimeAdded, true);
newService.setVersionedComponentId(proposed.getIdentifier());
destination.addControllerService(newService);
@@ -4121,7 +4118,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
private Funnel addFunnel(final ProcessGroup destination, final VersionedFunnel proposed, final String componentIdSeed) {
- final Funnel funnel = flowController.createFunnel(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed));
+ final Funnel funnel = flowManager.createFunnel(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed));
funnel.setVersionedComponentId(proposed.getIdentifier());
destination.addFunnel(funnel);
updateFunnel(funnel, proposed);
@@ -4136,7 +4133,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
private Port addInputPort(final ProcessGroup destination, final VersionedPort proposed, final String componentIdSeed) {
- final Port port = flowController.createLocalInputPort(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getName());
+ final Port port = flowManager.createLocalInputPort(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getName());
port.setVersionedComponentId(proposed.getIdentifier());
destination.addInputPort(port);
updatePort(port, proposed);
@@ -4145,7 +4142,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
private Port addOutputPort(final ProcessGroup destination, final VersionedPort proposed, final String componentIdSeed) {
- final Port port = flowController.createLocalOutputPort(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getName());
+ final Port port = flowManager.createLocalOutputPort(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getName());
port.setVersionedComponentId(proposed.getIdentifier());
destination.addOutputPort(port);
updatePort(port, proposed);
@@ -4154,7 +4151,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
private Label addLabel(final ProcessGroup destination, final VersionedLabel proposed, final String componentIdSeed) {
- final Label label = flowController.createLabel(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getLabel());
+ final Label label = flowManager.createLabel(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getLabel());
label.setVersionedComponentId(proposed.getIdentifier());
destination.addLabel(label);
updateLabel(label, proposed);
@@ -4171,7 +4168,7 @@ public final class StandardProcessGroup implements ProcessGroup {
private ProcessorNode addProcessor(final ProcessGroup destination, final VersionedProcessor proposed, final String componentIdSeed) throws ProcessorInstantiationException {
final BundleCoordinate coordinate = toCoordinate(proposed.getBundle());
- final ProcessorNode procNode = flowController.createProcessor(proposed.getType(), generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), coordinate, true);
+ final ProcessorNode procNode = flowManager.createProcessor(proposed.getType(), generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), coordinate, true);
procNode.setVersionedComponentId(proposed.getIdentifier());
destination.addProcessor(procNode);
@@ -4204,7 +4201,7 @@ public final class StandardProcessGroup implements ProcessGroup {
final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle());
final List<PropertyDescriptor> descriptors = new ArrayList<>(processor.getProperties().keySet());
final Set<URL> additionalUrls = processor.getAdditionalClasspathResources(descriptors);
- flowController.reload(processor, proposed.getType(), newBundleCoordinate, additionalUrls);
+ flowController.getReloadComponent().reload(processor, proposed.getType(), newBundleCoordinate, additionalUrls);
}
} finally {
processor.resumeValidationTrigger();
@@ -4276,7 +4273,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
private RemoteProcessGroup addRemoteProcessGroup(final ProcessGroup destination, final VersionedRemoteProcessGroup proposed, final String componentIdSeed) {
- final RemoteProcessGroup rpg = flowController.createRemoteProcessGroup(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getTargetUris());
+ final RemoteProcessGroup rpg = flowManager.createRemoteProcessGroup(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getTargetUris());
rpg.setVersionedComponentId(proposed.getIdentifier());
destination.addRemoteProcessGroup(rpg);
@@ -4439,7 +4436,7 @@ public final class StandardProcessGroup implements ProcessGroup {
.forEach(port -> removedInputPortsByVersionId.put(port.getVersionedComponentId().get(), port));
flowContents.getInputPorts().stream()
.map(VersionedPort::getIdentifier)
- .forEach(id -> removedInputPortsByVersionId.remove(id));
+ .forEach(removedInputPortsByVersionId::remove);
// Ensure that there are no incoming connections for any Input Port that was removed.
for (final Port inputPort : removedInputPortsByVersionId.values()) {
@@ -4457,7 +4454,7 @@ public final class StandardProcessGroup implements ProcessGroup {
.forEach(port -> removedOutputPortsByVersionId.put(port.getVersionedComponentId().get(), port));
flowContents.getOutputPorts().stream()
.map(VersionedPort::getIdentifier)
- .forEach(id -> removedOutputPortsByVersionId.remove(id));
+ .forEach(removedOutputPortsByVersionId::remove);
// Ensure that there are no outgoing connections for any Output Port that was removed.
for (final Port outputPort : removedOutputPortsByVersionId.values()) {
@@ -4544,7 +4541,7 @@ public final class StandardProcessGroup implements ProcessGroup {
if (connectionToAdd.getPrioritizers() != null) {
for (final String prioritizerType : connectionToAdd.getPrioritizers()) {
try {
- flowController.createPrioritizer(prioritizerType);
+ flowManager.createPrioritizer(prioritizerType);
} catch (Exception e) {
throw new IllegalArgumentException("Unable to create Prioritizer of type " + prioritizerType, e);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/repository/StandardLogRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/repository/StandardLogRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/repository/StandardLogRepository.java
index c610f8c..3287632 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/repository/StandardLogRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/repository/StandardLogRepository.java
@@ -96,11 +96,9 @@ public class StandardLogRepository implements LogRepository {
try {
final LogObserver observer = removeObserver(observerIdentifier);
- if (observer == null) {
- throw new IllegalArgumentException("The specified observer cannot be found.");
+ if (observer != null) {
+ addObserver(observerIdentifier, level, observer);
}
-
- addObserver(observerIdentifier, level, observer);
} finally {
writeLock.unlock();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessorInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessorInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessorInitializationContext.java
index a0300ee..d90535c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessorInitializationContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessorInitializationContext.java
@@ -16,12 +16,13 @@
*/
package org.apache.nifi.processor;
-import java.io.File;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.NodeTypeProvider;
+import org.apache.nifi.controller.kerberos.KerberosConfig;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.util.NiFiProperties;
+
+import java.io.File;
public class StandardProcessorInitializationContext implements ProcessorInitializationContext {
@@ -29,17 +30,16 @@ public class StandardProcessorInitializationContext implements ProcessorInitiali
private final ComponentLog logger;
private final ControllerServiceProvider serviceProvider;
private final NodeTypeProvider nodeTypeProvider;
- private final NiFiProperties nifiProperties;
+ private final KerberosConfig kerberosConfig;
- public StandardProcessorInitializationContext(
- final String identifier, final ComponentLog componentLog,
+ public StandardProcessorInitializationContext(final String identifier, final ComponentLog componentLog,
final ControllerServiceProvider serviceProvider, final NodeTypeProvider nodeTypeProvider,
- final NiFiProperties nifiProperties) {
+ final KerberosConfig kerberosConfig) {
this.identifier = identifier;
this.logger = componentLog;
this.serviceProvider = serviceProvider;
this.nodeTypeProvider = nodeTypeProvider;
- this.nifiProperties = nifiProperties;
+ this.kerberosConfig = kerberosConfig;
}
@Override
@@ -64,16 +64,16 @@ public class StandardProcessorInitializationContext implements ProcessorInitiali
@Override
public String getKerberosServicePrincipal() {
- return nifiProperties.getKerberosServicePrincipal();
+ return kerberosConfig.getPrincipal();
}
@Override
public File getKerberosServiceKeytab() {
- return nifiProperties.getKerberosServiceKeytabLocation() == null ? null : new File(nifiProperties.getKerberosServiceKeytabLocation());
+ return kerberosConfig.getKeytabLocation();
}
@Override
public File getKerberosConfigurationFile() {
- return nifiProperties.getKerberosConfigurationFile();
+ return kerberosConfig.getConfigFile();
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/provenance/ComponentIdentifierLookup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/provenance/ComponentIdentifierLookup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/provenance/ComponentIdentifierLookup.java
new file mode 100644
index 0000000..8dc1a9d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/provenance/ComponentIdentifierLookup.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance;
+
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.processor.Processor;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ComponentIdentifierLookup implements IdentifierLookup { // Answers identifier/type lookups by reading the flow held by a FlowController.
+ private final FlowController flowController;
+
+ public ComponentIdentifierLookup(final FlowController flowController) {
+ this.flowController = flowController;
+ }
+
+ @Override
+ public List<String> getComponentIdentifiers() { // IDs from rootGroup.findAllProcessors() plus the root group's getInputPorts()/getOutputPorts().
+ final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
+
+ final List<String> componentIds = new ArrayList<>();
+ rootGroup.findAllProcessors().forEach(proc -> componentIds.add(proc.getIdentifier())); // NOTE(review): findAllProcessors presumably descends into child groups - confirm
+ rootGroup.getInputPorts().forEach(port -> componentIds.add(port.getIdentifier())); // ports come from getInputPorts() on the root group, not from a findAll* call
+ rootGroup.getOutputPorts().forEach(port -> componentIds.add(port.getIdentifier()));
+
+ return componentIds;
+ }
+
+ @Override
+ public List<String> getComponentTypes() { // The two remote-port type constants, followed by the simple class name of each Processor extension.
+ final Set<Class> procClasses = flowController.getExtensionManager().getExtensions(Processor.class);
+
+ final List<String> componentTypes = new ArrayList<>(procClasses.size() + 2); // presized: one entry per processor class + 2 for the remote port types added next
+ componentTypes.add(ProvenanceEventRecord.REMOTE_INPUT_PORT_TYPE);
+ componentTypes.add(ProvenanceEventRecord.REMOTE_OUTPUT_PORT_TYPE);
+
+ procClasses.stream()
+ .map(Class::getSimpleName) // simple name only, not the fully-qualified class name
+ .forEach(componentTypes::add);
+
+ return componentTypes;
+ }
+
+ @Override
+ public List<String> getQueueIdentifiers() { // Identifier of every connection returned by rootGroup.findAllConnections().
+ final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
+
+ return rootGroup.findAllConnections().stream()
+ .map(Connection::getIdentifier)
+ .collect(Collectors.toList());
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/provenance/StandardProvenanceAuthorizableFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/provenance/StandardProvenanceAuthorizableFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/provenance/StandardProvenanceAuthorizableFactory.java
new file mode 100644
index 0000000..9eddf76
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/provenance/StandardProvenanceAuthorizableFactory.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance;
+
+import org.apache.nifi.authorization.resource.Authorizable;
+import org.apache.nifi.authorization.resource.DataAuthorizable;
+import org.apache.nifi.authorization.resource.ProvenanceDataAuthorizable;
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.remote.RemoteGroupPort;
+import org.apache.nifi.web.ResourceNotFoundException;
+
+/**
+ * {@link ProvenanceAuthorizableFactory} that resolves the component referenced by a
+ * provenance event against the flow held by a {@link FlowController} and wraps it in
+ * the authorizable type appropriate for the requested kind of access.
+ */
+public class StandardProvenanceAuthorizableFactory implements ProvenanceAuthorizableFactory {
+    // Single definition of the message used whenever an event's component cannot be found.
+    private static final String COMPONENT_NOT_FOUND = "The component that generated this event is no longer part of the data flow.";
+
+    private final FlowController flowController;
+
+    /**
+     * @param flowController the controller whose flow manager is used to look up components
+     */
+    public StandardProvenanceAuthorizableFactory(final FlowController flowController) {
+        this.flowController = flowController;
+    }
+
+    /**
+     * Creates the data authorizable for an event generated by a local component.
+     *
+     * @param componentId the component ID recorded on the provenance event
+     * @return a {@link DataAuthorizable} wrapping the resolved component
+     * @throws ResourceNotFoundException if the ID matches neither the root group, a connectable, nor a connection
+     */
+    @Override
+    public Authorizable createLocalDataAuthorizable(final String componentId) {
+        return new DataAuthorizable(findEventComponent(componentId));
+    }
+
+    /**
+     * Creates the data authorizable for an event generated by a remote group port.
+     *
+     * @param remoteGroupPortId the identifier of the remote group port
+     * @return a {@link DataAuthorizable} wrapping the port's remote process group
+     * @throws ResourceNotFoundException if no remote group port with the given ID is found under the root group
+     */
+    @Override
+    public Authorizable createRemoteDataAuthorizable(String remoteGroupPortId) {
+        final RemoteGroupPort remoteGroupPort = flowController.getFlowManager().getRootGroup().findRemoteGroupPort(remoteGroupPortId);
+        if (remoteGroupPort == null) {
+            throw new ResourceNotFoundException(COMPONENT_NOT_FOUND);
+        }
+
+        // authorizable for remote group ports should be the remote process group
+        return new DataAuthorizable(remoteGroupPort.getRemoteProcessGroup());
+    }
+
+    /**
+     * Creates the provenance-data authorizable for an event generated by a local component.
+     *
+     * @param componentId the component ID recorded on the provenance event
+     * @return a {@link ProvenanceDataAuthorizable} wrapping the resolved component
+     * @throws ResourceNotFoundException if the ID matches neither the root group, a connectable, nor a connection
+     */
+    @Override
+    public Authorizable createProvenanceDataAuthorizable(String componentId) {
+        return new ProvenanceDataAuthorizable(findEventComponent(componentId));
+    }
+
+    /**
+     * Resolves the flow component whose policies govern access to an event.
+     *
+     * Provenance Events are generated only by connectable components, with the exception of DOWNLOAD events,
+     * which have the root process group's identifier assigned as the component ID, and DROP events, which
+     * could have the connection identifier assigned as the component ID. So, we check if the component ID
+     * is set to the root group and otherwise assume that the ID is that of a connectable or connection.
+     *
+     * @param componentId the component ID recorded on the provenance event
+     * @return the root group, the matching connectable, or the source connectable of the matching connection
+     * @throws ResourceNotFoundException if the ID matches none of these
+     */
+    private Authorizable findEventComponent(final String componentId) {
+        final FlowManager flowManager = flowController.getFlowManager();
+        final String rootGroupId = flowManager.getRootGroupId();
+
+        if (rootGroupId.equals(componentId)) {
+            return flowManager.getRootGroup();
+        }
+
+        // check if the component is a connectable, this should be the case most often
+        final Connectable connectable = flowManager.findConnectable(componentId);
+        if (connectable != null) {
+            return connectable;
+        }
+
+        // if the component id is not a connectable then consider a connection
+        final Connection connection = flowManager.getRootGroup().findConnection(componentId);
+        if (connection == null) {
+            throw new ResourceNotFoundException(COMPONENT_NOT_FOUND);
+        }
+
+        // authorizable for connection data is associated with the source connectable
+        return connection.getSource();
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index c4621e6..fe78a59 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -16,40 +16,6 @@
*/
package org.apache.nifi.remote;
-import static java.util.Objects.requireNonNull;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import javax.net.ssl.SSLContext;
-import javax.ws.rs.core.Response;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable;
@@ -60,7 +26,6 @@ import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
-import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.exception.CommunicationsException;
@@ -84,10 +49,43 @@ import org.apache.nifi.web.api.dto.PortDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.net.ssl.SSLContext;
+import javax.ws.rs.core.Response;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+
/**
* Represents the Root Process Group of a remote NiFi Instance. Holds
- * information about that remote instance, as well as {@link IncomingPort}s and
- * {@link OutgoingPort}s for communicating with the remote instance.
+ * information about that remote instance, as well as Incoming Ports and
+ * Outgoing Ports for communicating with the remote instance.
*/
public class StandardRemoteProcessGroup implements RemoteProcessGroup {
@@ -148,8 +146,8 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
private final ScheduledExecutorService backgroundThreadExecutor;
- public StandardRemoteProcessGroup(final String id, final String targetUris, final ProcessGroup processGroup,
- final FlowController flowController, final SSLContext sslContext, final NiFiProperties nifiProperties) {
+ public StandardRemoteProcessGroup(final String id, final String targetUris, final ProcessGroup processGroup, final ProcessScheduler processScheduler,
+ final BulletinRepository bulletinRepository, final SSLContext sslContext, final NiFiProperties nifiProperties) {
this.nifiProperties = nifiProperties;
this.id = requireNonNull(id);
@@ -157,13 +155,12 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
this.targetId = null;
this.processGroup = new AtomicReference<>(processGroup);
this.sslContext = sslContext;
- this.scheduler = flowController.getProcessScheduler();
+ this.scheduler = processScheduler;
this.authorizationIssue = "Establishing connection to " + targetUris;
final String expirationPeriod = nifiProperties.getProperty(NiFiProperties.REMOTE_CONTENTS_CACHE_EXPIRATION, "30 secs");
remoteContentsCacheExpiration = FormatUtils.getTimeDuration(expirationPeriod, TimeUnit.MILLISECONDS);
- final BulletinRepository bulletinRepository = flowController.getBulletinRepository();
eventReporter = new EventReporter() {
private static final long serialVersionUID = 1L;
@@ -700,7 +697,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
/**
- * @return a set of {@link OutgoingPort}s used for transmitting FlowFiles to
+ * @return a set of Outgoing Ports used for transmitting FlowFiles to
* the remote instance
*/
@Override