You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2017/08/17 14:43:22 UTC
[5/9] nifi git commit: NIFI-4224: - Initial implementation of Process
Group level Variable Registry - Updated to incorporate PR Feedback - Changed
log message because slf4j-simple apparently has a memory leak;
passing a String instead of passing in the C
http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
index 9611b73..07923c6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
@@ -16,6 +16,14 @@
*/
package org.apache.nifi.controller.reporting;
+import java.net.URL;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.ConfigurableComponent;
@@ -33,7 +41,7 @@ import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.CharacterFilterUtils;
@@ -42,14 +50,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.AnnotationUtils;
-import java.net.URL;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
public abstract class AbstractReportingTaskNode extends AbstractConfiguredComponent implements ReportingTaskNode {
private static final Logger LOG = LoggerFactory.getLogger(AbstractReportingTaskNode.class);
@@ -66,7 +66,7 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
public AbstractReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id,
final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler,
- final ValidationContextFactory validationContextFactory, final VariableRegistry variableRegistry,
+ final ValidationContextFactory validationContextFactory, final ComponentVariableRegistry variableRegistry,
final ReloadComponent reloadComponent) {
this(reportingTask, id, controllerServiceProvider, processScheduler, validationContextFactory,
@@ -77,7 +77,7 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
public AbstractReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id, final ControllerServiceProvider controllerServiceProvider,
final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory,
- final String componentType, final String componentCanonicalClass, final VariableRegistry variableRegistry,
+ final String componentType, final String componentCanonicalClass, final ComponentVariableRegistry variableRegistry,
final ReloadComponent reloadComponent, final boolean isExtensionMissing) {
super(id, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass, variableRegistry, reloadComponent, isExtensionMissing);
http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
index 53b7d15..40bdf8b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
@@ -22,13 +22,13 @@ import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.resource.ResourceType;
-import org.apache.nifi.controller.ReloadComponent;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.LoggableComponent;
import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.ReloadComponent;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ValidationContextFactory;
-import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.ReportingTask;
@@ -38,14 +38,14 @@ public class StandardReportingTaskNode extends AbstractReportingTaskNode impleme
public StandardReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id, final FlowController controller,
final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory,
- final VariableRegistry variableRegistry, final ReloadComponent reloadComponent) {
+ final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent) {
super(reportingTask, id, controller, processScheduler, validationContextFactory, variableRegistry, reloadComponent);
this.flowController = controller;
}
public StandardReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id, final FlowController controller,
final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory,
- final String componentType, final String canonicalClassName, final VariableRegistry variableRegistry,
+ final String componentType, final String canonicalClassName, final ComponentVariableRegistry variableRegistry,
final ReloadComponent reloadComponent, final boolean isExtensionMissing) {
super(reportingTask, id, controller, processScheduler, validationContextFactory, componentType, canonicalClassName,
variableRegistry, reloadComponent, isExtensionMissing);
http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
index 0af8657..58d25bb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
@@ -45,7 +45,6 @@ import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.util.Connectables;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.ReflectionUtils;
@@ -62,7 +61,6 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
private final ProcessContextFactory contextFactory;
private final AtomicInteger maxThreadCount;
private final StringEncryptor encryptor;
- private final VariableRegistry variableRegistry;
private volatile String adminYieldDuration = "1 sec";
@@ -70,8 +68,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
private final ConcurrentMap<Connectable, ScheduleState> scheduleStates = new ConcurrentHashMap<>();
public EventDrivenSchedulingAgent(final FlowEngine flowEngine, final ControllerServiceProvider serviceProvider, final StateManagerProvider stateManagerProvider,
- final EventDrivenWorkerQueue workerQueue, final ProcessContextFactory contextFactory, final int maxThreadCount, final StringEncryptor encryptor,
- final VariableRegistry variableRegistry) {
+ final EventDrivenWorkerQueue workerQueue, final ProcessContextFactory contextFactory, final int maxThreadCount, final StringEncryptor encryptor) {
super(flowEngine);
this.serviceProvider = serviceProvider;
this.stateManagerProvider = stateManagerProvider;
@@ -79,7 +76,6 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
this.contextFactory = contextFactory;
this.maxThreadCount = new AtomicInteger(maxThreadCount);
this.encryptor = encryptor;
- this.variableRegistry = variableRegistry;
for (int i = 0; i < maxThreadCount; i++) {
final Runnable eventDrivenTask = new EventDrivenTask(workerQueue);
@@ -188,8 +184,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
if (connectable instanceof ProcessorNode) {
final ProcessorNode procNode = (ProcessorNode) connectable;
- final StandardProcessContext standardProcessContext = new StandardProcessContext(procNode, serviceProvider,
- encryptor, getStateManager(connectable.getIdentifier()), variableRegistry);
+ final StandardProcessContext standardProcessContext = new StandardProcessContext(procNode, serviceProvider, encryptor, getStateManager(connectable.getIdentifier()));
final long runNanos = procNode.getRunDuration(TimeUnit.NANOSECONDS);
final ProcessSessionFactory sessionFactory;
http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
index bd3c2bb..eb855d4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
@@ -38,7 +38,6 @@ import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.util.FormatUtils;
import org.quartz.CronExpression;
import org.slf4j.Logger;
@@ -51,18 +50,15 @@ public class QuartzSchedulingAgent extends AbstractSchedulingAgent {
private final FlowController flowController;
private final ProcessContextFactory contextFactory;
private final StringEncryptor encryptor;
- private final VariableRegistry variableRegistry;
private volatile String adminYieldDuration = "1 sec";
private final Map<Object, List<AtomicBoolean>> canceledTriggers = new HashMap<>();
- public QuartzSchedulingAgent(final FlowController flowController, final FlowEngine flowEngine, final ProcessContextFactory contextFactory, final StringEncryptor enryptor,
- final VariableRegistry variableRegistry) {
+ public QuartzSchedulingAgent(final FlowController flowController, final FlowEngine flowEngine, final ProcessContextFactory contextFactory, final StringEncryptor enryptor) {
super(flowEngine);
this.flowController = flowController;
this.contextFactory = contextFactory;
this.encryptor = enryptor;
- this.variableRegistry = variableRegistry;
}
private StateManager getStateManager(final String componentId) {
@@ -145,7 +141,7 @@ public class QuartzSchedulingAgent extends AbstractSchedulingAgent {
if (connectable.getConnectableType() == ConnectableType.PROCESSOR) {
final ProcessorNode procNode = (ProcessorNode) connectable;
- final StandardProcessContext standardProcContext = new StandardProcessContext(procNode, flowController, encryptor, getStateManager(connectable.getIdentifier()), variableRegistry);
+ final StandardProcessContext standardProcContext = new StandardProcessContext(procNode, flowController, encryptor, getStateManager(connectable.getIdentifier()));
ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, flowController, contextFactory, scheduleState, standardProcContext);
continuallyRunTask = runnableTask;
} else {
http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 5368d37..122e50c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -53,7 +53,6 @@ import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.processor.StandardProcessContext;
-import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.FormatUtils;
@@ -84,19 +83,16 @@ public final class StandardProcessScheduler implements ProcessScheduler {
private final ScheduledExecutorService componentMonitoringThreadPool = new FlowEngine(8, "StandardProcessScheduler", true);
private final StringEncryptor encryptor;
- private final VariableRegistry variableRegistry;
public StandardProcessScheduler(
final ControllerServiceProvider controllerServiceProvider,
final StringEncryptor encryptor,
final StateManagerProvider stateManagerProvider,
- final VariableRegistry variableRegistry,
final NiFiProperties nifiProperties
) {
this.controllerServiceProvider = controllerServiceProvider;
this.encryptor = encryptor;
this.stateManagerProvider = stateManagerProvider;
- this.variableRegistry = variableRegistry;
administrativeYieldDuration = nifiProperties.getAdministrativeYieldDuration();
administrativeYieldMillis = FormatUtils.getTimeDuration(administrativeYieldDuration, TimeUnit.MILLISECONDS);
@@ -301,15 +297,17 @@ public final class StandardProcessScheduler implements ProcessScheduler {
* @see StandardProcessorNode#start(ScheduledExecutorService, long, org.apache.nifi.processor.ProcessContext, Runnable).
*/
@Override
- public synchronized void startProcessor(final ProcessorNode procNode) {
+ public synchronized CompletableFuture<Void> startProcessor(final ProcessorNode procNode) {
StandardProcessContext processContext = new StandardProcessContext(procNode, this.controllerServiceProvider,
- this.encryptor, getStateManager(procNode.getIdentifier()), variableRegistry);
+ this.encryptor, getStateManager(procNode.getIdentifier()));
final ScheduleState scheduleState = getScheduleState(requireNonNull(procNode));
+ final CompletableFuture<Void> future = new CompletableFuture<>();
SchedulingAgentCallback callback = new SchedulingAgentCallback() {
@Override
public void trigger() {
getSchedulingAgent(procNode).schedule(procNode, scheduleState);
+ future.complete(null);
}
@Override
@@ -324,7 +322,9 @@ public final class StandardProcessScheduler implements ProcessScheduler {
}
};
+ LOG.info("Starting {}", procNode);
procNode.start(this.componentLifeCycleThreadPool, this.administrativeYieldMillis, processContext, callback);
+ return future;
}
/**
@@ -335,12 +335,13 @@ public final class StandardProcessScheduler implements ProcessScheduler {
* @see StandardProcessorNode#stop(ScheduledExecutorService, org.apache.nifi.processor.ProcessContext, SchedulingAgent, ScheduleState)
*/
@Override
- public synchronized void stopProcessor(final ProcessorNode procNode) {
+ public synchronized CompletableFuture<Void> stopProcessor(final ProcessorNode procNode) {
StandardProcessContext processContext = new StandardProcessContext(procNode, this.controllerServiceProvider,
- this.encryptor, getStateManager(procNode.getIdentifier()), variableRegistry);
+ this.encryptor, getStateManager(procNode.getIdentifier()));
final ScheduleState state = getScheduleState(procNode);
- procNode.stop(this.componentLifeCycleThreadPool, processContext, getSchedulingAgent(procNode), state);
+ LOG.info("Stopping {}", procNode);
+ return procNode.stop(this.componentLifeCycleThreadPool, processContext, getSchedulingAgent(procNode), state);
}
@Override
@@ -537,20 +538,35 @@ public final class StandardProcessScheduler implements ProcessScheduler {
@Override
public CompletableFuture<Void> enableControllerService(final ControllerServiceNode service) {
+ LOG.info("Enabling " + service);
return service.enable(this.componentLifeCycleThreadPool, this.administrativeYieldMillis);
}
@Override
- public void disableControllerService(final ControllerServiceNode service) {
- service.disable(this.componentLifeCycleThreadPool);
+ public CompletableFuture<Void> disableControllerService(final ControllerServiceNode service) {
+ LOG.info("Disabling {}", service);
+ return service.disable(this.componentLifeCycleThreadPool);
}
@Override
- public void disableControllerServices(final List<ControllerServiceNode> services) {
+ public CompletableFuture<Void> disableControllerServices(final List<ControllerServiceNode> services) {
+ if (services == null || services.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ CompletableFuture<Void> future = null;
if (!requireNonNull(services).isEmpty()) {
for (ControllerServiceNode controllerServiceNode : services) {
- this.disableControllerService(controllerServiceNode);
+ final CompletableFuture<Void> serviceFuture = this.disableControllerService(controllerServiceNode);
+
+ if (future == null) {
+ future = serviceFuture;
+ } else {
+ future = CompletableFuture.allOf(future, serviceFuture);
+ }
}
}
+
+ return future;
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
index a82fde4..9dc329a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
@@ -37,7 +37,6 @@ import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
@@ -51,7 +50,6 @@ public class TimerDrivenSchedulingAgent extends AbstractSchedulingAgent {
private final FlowController flowController;
private final ProcessContextFactory contextFactory;
private final StringEncryptor encryptor;
- private final VariableRegistry variableRegistry;
private volatile String adminYieldDuration = "1 sec";
@@ -60,13 +58,11 @@ public class TimerDrivenSchedulingAgent extends AbstractSchedulingAgent {
final FlowEngine flowEngine,
final ProcessContextFactory contextFactory,
final StringEncryptor encryptor,
- final VariableRegistry variableRegistry,
final NiFiProperties nifiProperties) {
super(flowEngine);
this.flowController = flowController;
this.contextFactory = contextFactory;
this.encryptor = encryptor;
- this.variableRegistry = variableRegistry;
final String boredYieldDuration = nifiProperties.getBoredYieldDuration();
try {
@@ -109,7 +105,7 @@ public class TimerDrivenSchedulingAgent extends AbstractSchedulingAgent {
// Determine the task to run and create it.
if (connectable.getConnectableType() == ConnectableType.PROCESSOR) {
final ProcessorNode procNode = (ProcessorNode) connectable;
- final StandardProcessContext standardProcContext = new StandardProcessContext(procNode, flowController, encryptor, getStateManager(connectable.getIdentifier()), variableRegistry);
+ final StandardProcessContext standardProcContext = new StandardProcessContext(procNode, flowController, encryptor, getStateManager(connectable.getIdentifier()));
final ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, flowController,
contextFactory, scheduleState, standardProcContext);
http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
index 61d9d29..f30a71e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
@@ -144,6 +144,16 @@ public class FlowFromDOMFactory {
dto.setPosition(getPosition(DomUtils.getChild(element, "position")));
dto.setComments(getString(element, "comment"));
+ final Map<String, String> variables = new HashMap<>();
+ final NodeList variableList = DomUtils.getChildNodesByTagName(element, "variable");
+ for (int i = 0; i < variableList.getLength(); i++) {
+ final Element variableElement = (Element) variableList.item(i);
+ final String name = variableElement.getAttribute("name");
+ final String value = variableElement.getAttribute("value");
+ variables.put(name, value);
+ }
+ dto.setVariables(variables);
+
final Set<ProcessorDTO> processors = new HashSet<>();
final Set<ConnectionDTO> connections = new HashSet<>();
final Set<FunnelDTO> funnels = new HashSet<>();
http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
index 4a9a0f7..2a8df96 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
@@ -37,6 +37,8 @@ import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.persistence.TemplateSerializer;
import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.registry.VariableDescriptor;
+import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.util.CharacterFilterUtils;
@@ -70,7 +72,7 @@ import java.util.concurrent.TimeUnit;
*/
public class StandardFlowSerializer implements FlowSerializer {
- private static final String MAX_ENCODING_VERSION = "1.1";
+ private static final String MAX_ENCODING_VERSION = "1.2";
private final StringEncryptor encryptor;
@@ -202,6 +204,18 @@ public class StandardFlowSerializer implements FlowSerializer {
for (final Template template : group.getTemplates()) {
addTemplate(element, template);
}
+
+ final VariableRegistry variableRegistry = group.getVariableRegistry();
+ for (final Map.Entry<VariableDescriptor, String> entry : variableRegistry.getVariableMap().entrySet()) {
+ addVariable(element, entry.getKey().getName(), entry.getValue());
+ }
+ }
+
+ private static void addVariable(final Element parentElement, final String variableName, final String variableValue) {
+ final Element variableElement = parentElement.getOwnerDocument().createElement("variable");
+ variableElement.setAttribute("name", variableName);
+ variableElement.setAttribute("value", variableValue);
+ parentElement.appendChild(variableElement);
}
private static void addBundle(final Element parentElement, final BundleCoordinate coordinate) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java
new file mode 100644
index 0000000..148b847
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.service;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+public class ServiceStateTransition {
+ private ControllerServiceState state = ControllerServiceState.DISABLED;
+ private final List<CompletableFuture<?>> enabledFutures = new ArrayList<>();
+ private final List<CompletableFuture<?>> disabledFutures = new ArrayList<>();
+
+
+ public synchronized boolean transitionToEnabling(final ControllerServiceState expectedState, final CompletableFuture<?> enabledFuture) {
+ if (expectedState != state) {
+ return false;
+ }
+
+ state = ControllerServiceState.ENABLING;
+ enabledFutures.add(enabledFuture);
+ return true;
+ }
+
+ public synchronized boolean enable() {
+ if (state != ControllerServiceState.ENABLING) {
+ return false;
+ }
+
+ state = ControllerServiceState.ENABLED;
+ enabledFutures.stream().forEach(future -> future.complete(null));
+ return true;
+ }
+
+ public synchronized boolean transitionToDisabling(final ControllerServiceState expectedState, final CompletableFuture<?> disabledFuture) {
+ if (expectedState != state) {
+ return false;
+ }
+
+ state = ControllerServiceState.DISABLING;
+ disabledFutures.add(disabledFuture);
+ return true;
+ }
+
+ public synchronized void disable() {
+ state = ControllerServiceState.DISABLED;
+ disabledFutures.stream().forEach(future -> future.complete(null));
+ }
+
+ public synchronized ControllerServiceState getState() {
+ return state;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index fa4ab84..216a996 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -16,6 +16,24 @@
*/
package org.apache.nifi.controller.service;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
@@ -30,48 +48,30 @@ import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractConfiguredComponent;
-import org.apache.nifi.controller.ReloadComponent;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.LoggableComponent;
+import org.apache.nifi.controller.ReloadComponent;
import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.SimpleProcessLogger;
-import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.util.CharacterFilterUtils;
import org.apache.nifi.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.lang.reflect.InvocationTargetException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
public class StandardControllerServiceNode extends AbstractConfiguredComponent implements ControllerServiceNode {
private static final Logger LOG = LoggerFactory.getLogger(StandardControllerServiceNode.class);
private final AtomicReference<ControllerServiceDetails> controllerServiceHolder = new AtomicReference<>(null);
private final ControllerServiceProvider serviceProvider;
- private final AtomicReference<ControllerServiceState> stateRef = new AtomicReference<>(ControllerServiceState.DISABLED);
+ private final ServiceStateTransition stateTransition = new ServiceStateTransition();
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
@@ -85,7 +85,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
public StandardControllerServiceNode(final LoggableComponent<ControllerService> implementation, final LoggableComponent<ControllerService> proxiedControllerService,
final ControllerServiceInvocationHandler invocationHandler, final String id, final ValidationContextFactory validationContextFactory,
- final ControllerServiceProvider serviceProvider, final VariableRegistry variableRegistry, final ReloadComponent reloadComponent) {
+ final ControllerServiceProvider serviceProvider, final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent) {
this(implementation, proxiedControllerService, invocationHandler, id, validationContextFactory, serviceProvider,
implementation.getComponent().getClass().getSimpleName(), implementation.getComponent().getClass().getCanonicalName(), variableRegistry, reloadComponent, false);
@@ -94,7 +94,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
public StandardControllerServiceNode(final LoggableComponent<ControllerService> implementation, final LoggableComponent<ControllerService> proxiedControllerService,
final ControllerServiceInvocationHandler invocationHandler, final String id, final ValidationContextFactory validationContextFactory,
final ControllerServiceProvider serviceProvider, final String componentType, final String componentCanonicalClass,
- final VariableRegistry variableRegistry, final ReloadComponent reloadComponent, final boolean isExtensionMissing) {
+ final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent, final boolean isExtensionMissing) {
super(id, validationContextFactory, serviceProvider, componentType, componentCanonicalClass, variableRegistry, reloadComponent, isExtensionMissing);
this.serviceProvider = serviceProvider;
@@ -363,7 +363,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
@Override
public ControllerServiceState getState() {
- return stateRef.get();
+ return stateTransition.getState();
}
@Override
@@ -394,7 +394,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
public CompletableFuture<Void> enable(final ScheduledExecutorService scheduler, final long administrativeYieldMillis) {
final CompletableFuture<Void> future = new CompletableFuture<>();
- if (this.stateRef.compareAndSet(ControllerServiceState.DISABLED, ControllerServiceState.ENABLING)) {
+ if (this.stateTransition.transitionToEnabling(ControllerServiceState.DISABLED, future)) {
synchronized (active) {
this.active.set(true);
}
@@ -410,17 +410,15 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
boolean shouldEnable = false;
synchronized (active) {
- shouldEnable = active.get() && stateRef.compareAndSet(ControllerServiceState.ENABLING, ControllerServiceState.ENABLED);
+ shouldEnable = active.get() && stateTransition.enable();
}
- future.complete(null);
-
if (!shouldEnable) {
LOG.debug("Disabling service " + this + " after it has been enabled due to disable action being initiated.");
// Can only happen if user initiated DISABLE operation before service finished enabling. It's state will be
// set to DISABLING (see disable() operation)
invokeDisable(configContext);
- stateRef.set(ControllerServiceState.DISABLED);
+ stateTransition.disable();
}
} catch (Exception e) {
future.completeExceptionally(e);
@@ -437,7 +435,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getControllerServiceImplementation().getClass(), getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
}
- stateRef.set(ControllerServiceState.DISABLED);
+ stateTransition.disable();
}
}
}
@@ -464,7 +462,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
* DISABLED state.
*/
@Override
- public void disable(ScheduledExecutorService scheduler) {
+ public CompletableFuture<Void> disable(ScheduledExecutorService scheduler) {
/*
* The reason for synchronization is to ensure consistency of the
* service state when another thread is in the middle of enabling this
@@ -475,7 +473,8 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
this.active.set(false);
}
- if (this.stateRef.compareAndSet(ControllerServiceState.ENABLED, ControllerServiceState.DISABLING)) {
+ final CompletableFuture<Void> future = new CompletableFuture<>();
+ if (this.stateTransition.transitionToDisabling(ControllerServiceState.ENABLED, future)) {
final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null, getVariableRegistry());
scheduler.execute(new Runnable() {
@Override
@@ -483,13 +482,15 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
try {
invokeDisable(configContext);
} finally {
- stateRef.set(ControllerServiceState.DISABLED);
+ stateTransition.disable();
}
}
});
} else {
- this.stateRef.compareAndSet(ControllerServiceState.ENABLING, ControllerServiceState.DISABLING);
+ this.stateTransition.transitionToDisabling(ControllerServiceState.ENABLING, future);
}
+
+ return future;
}
/**
@@ -515,7 +516,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
@Override
public Collection<ValidationResult> getValidationErrors(Set<String> serviceIdentifiersNotToValidate) {
Collection<ValidationResult> results = null;
- if (stateRef.get() == ControllerServiceState.DISABLED) {
+ if (getState() == ControllerServiceState.DISABLED) {
results = super.getValidationErrors(serviceIdentifiersNotToValidate);
}
return results != null ? results : Collections.emptySet();
http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index 5c4e394..0745ed0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -30,6 +30,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
@@ -62,7 +63,9 @@ import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.processor.StandardValidationContextFactory;
+import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.variable.StandardComponentVariableRegistry;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.NiFiProperties;
@@ -148,8 +151,9 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
final LoggableComponent<ControllerService> originalLoggableComponent = new LoggableComponent<>(originalService, bundleCoordinate, serviceLogger);
final LoggableComponent<ControllerService> proxiedLoggableComponent = new LoggableComponent<>(proxiedService, bundleCoordinate, serviceLogger);
+ final ComponentVariableRegistry componentVarRegistry = new StandardComponentVariableRegistry(this.variableRegistry);
final ControllerServiceNode serviceNode = new StandardControllerServiceNode(originalLoggableComponent, proxiedLoggableComponent, invocationHandler,
- id, validationContextFactory, this, variableRegistry, flowController);
+ id, validationContextFactory, this, componentVarRegistry, flowController);
serviceNode.setName(rawClass.getSimpleName());
invocationHandler.setServiceNode(serviceNode);
@@ -226,8 +230,9 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
final LoggableComponent<ControllerService> proxiedLoggableComponent = new LoggableComponent<>(proxiedService, bundleCoordinate, null);
+ final ComponentVariableRegistry componentVarRegistry = new StandardComponentVariableRegistry(this.variableRegistry);
final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedLoggableComponent, proxiedLoggableComponent, invocationHandler, id,
- new StandardValidationContextFactory(this, variableRegistry), this, componentType, type, variableRegistry, flowController, true);
+ new StandardValidationContextFactory(this, variableRegistry), this, componentType, type, componentVarRegistry, flowController, true);
serviceCache.putIfAbsent(id, serviceNode);
return serviceNode;
@@ -235,9 +240,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
@Override
public Set<ConfiguredComponent> disableReferencingServices(final ControllerServiceNode serviceNode) {
- // Get a list of all Controller Services that need to be disabled, in the order that they need to be
- // disabled.
- final List<ControllerServiceNode> toDisable = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
+ // Get a list of all Controller Services that need to be disabled, in the order that they need to be disabled.
+ final List<ControllerServiceNode> toDisable = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class);
final Set<ControllerServiceNode> serviceSet = new HashSet<>(toDisable);
@@ -258,8 +262,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
public Set<ConfiguredComponent> scheduleReferencingComponents(final ControllerServiceNode serviceNode) {
// find all of the schedulable components (processors, reporting tasks) that refer to this Controller Service,
// or a service that references this controller service, etc.
- final List<ProcessorNode> processors = findRecursiveReferences(serviceNode, ProcessorNode.class);
- final List<ReportingTaskNode> reportingTasks = findRecursiveReferences(serviceNode, ReportingTaskNode.class);
+ final List<ProcessorNode> processors = serviceNode.getReferences().findRecursiveReferences(ProcessorNode.class);
+ final List<ReportingTaskNode> reportingTasks = serviceNode.getReferences().findRecursiveReferences(ReportingTaskNode.class);
final Set<ConfiguredComponent> updated = new HashSet<>();
@@ -298,8 +302,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
public Set<ConfiguredComponent> unscheduleReferencingComponents(final ControllerServiceNode serviceNode) {
// find all of the schedulable components (processors, reporting tasks) that refer to this Controller Service,
// or a service that references this controller service, etc.
- final List<ProcessorNode> processors = findRecursiveReferences(serviceNode, ProcessorNode.class);
- final List<ReportingTaskNode> reportingTasks = findRecursiveReferences(serviceNode, ReportingTaskNode.class);
+ final List<ProcessorNode> processors = serviceNode.getReferences().findRecursiveReferences(ProcessorNode.class);
+ final List<ReportingTaskNode> reportingTasks = serviceNode.getReferences().findRecursiveReferences(ReportingTaskNode.class);
final Set<ConfiguredComponent> updated = new HashSet<>();
@@ -333,7 +337,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
}
@Override
- public Future<Void> enableControllerService(final ControllerServiceNode serviceNode) {
+ public CompletableFuture<Void> enableControllerService(final ControllerServiceNode serviceNode) {
serviceNode.verifyCanEnable();
return processScheduler.enableControllerService(serviceNode);
}
@@ -450,9 +454,9 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
}
@Override
- public void disableControllerService(final ControllerServiceNode serviceNode) {
+ public CompletableFuture<Void> disableControllerService(final ControllerServiceNode serviceNode) {
serviceNode.verifyCanDisable();
- processScheduler.disableControllerService(serviceNode);
+ return processScheduler.disableControllerService(serviceNode);
}
@Override
@@ -589,43 +593,10 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
return allServices;
}
- /**
- * Returns a List of all components that reference the given referencedNode
- * (either directly or indirectly through another service) that are also of
- * the given componentType. The list that is returned is in the order in
- * which they will need to be 'activated' (enabled/started).
- *
- * @param referencedNode node
- * @param componentType type
- * @return list of components
- */
- private <T> List<T> findRecursiveReferences(final ControllerServiceNode referencedNode, final Class<T> componentType) {
- final List<T> references = new ArrayList<>();
-
- for (final ConfiguredComponent referencingComponent : referencedNode.getReferences().getReferencingComponents()) {
- if (componentType.isAssignableFrom(referencingComponent.getClass())) {
- references.add(componentType.cast(referencingComponent));
- }
-
- if (referencingComponent instanceof ControllerServiceNode) {
- final ControllerServiceNode referencingNode = (ControllerServiceNode) referencingComponent;
-
- // find components recursively that depend on referencingNode.
- final List<T> recursive = findRecursiveReferences(referencingNode, componentType);
-
- // For anything that depends on referencing node, we want to add it to the list, but we know
- // that it must come after the referencing node, so we first remove any existing occurrence.
- references.removeAll(recursive);
- references.addAll(recursive);
- }
- }
-
- return references;
- }
@Override
public Set<ConfiguredComponent> enableReferencingServices(final ControllerServiceNode serviceNode) {
- final List<ControllerServiceNode> recursiveReferences = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
+ final List<ControllerServiceNode> recursiveReferences = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class);
logger.debug("Enabling the following Referencing Services for {}: {}", serviceNode, recursiveReferences);
return enableReferencingServices(serviceNode, recursiveReferences);
}
@@ -658,7 +629,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
@Override
public void verifyCanEnableReferencingServices(final ControllerServiceNode serviceNode) {
- final List<ControllerServiceNode> referencingServices = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
+ final List<ControllerServiceNode> referencingServices = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class);
final Set<ControllerServiceNode> referencingServiceSet = new HashSet<>(referencingServices);
for (final ControllerServiceNode referencingService : referencingServices) {
@@ -668,9 +639,9 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
@Override
public void verifyCanScheduleReferencingComponents(final ControllerServiceNode serviceNode) {
- final List<ControllerServiceNode> referencingServices = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
- final List<ReportingTaskNode> referencingReportingTasks = findRecursiveReferences(serviceNode, ReportingTaskNode.class);
- final List<ProcessorNode> referencingProcessors = findRecursiveReferences(serviceNode, ProcessorNode.class);
+ final List<ControllerServiceNode> referencingServices = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class);
+ final List<ReportingTaskNode> referencingReportingTasks = serviceNode.getReferences().findRecursiveReferences(ReportingTaskNode.class);
+ final List<ProcessorNode> referencingProcessors = serviceNode.getReferences().findRecursiveReferences(ProcessorNode.class);
final Set<ControllerServiceNode> referencingServiceSet = new HashSet<>(referencingServices);
@@ -689,9 +660,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
@Override
public void verifyCanDisableReferencingServices(final ControllerServiceNode serviceNode) {
- // Get a list of all Controller Services that need to be disabled, in the order that they need to be
- // disabled.
- final List<ControllerServiceNode> toDisable = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
+ // Get a list of all Controller Services that need to be disabled, in the order that they need to be disabled.
+ final List<ControllerServiceNode> toDisable = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class);
final Set<ControllerServiceNode> serviceSet = new HashSet<>(toDisable);
for (final ControllerServiceNode nodeToDisable : toDisable) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
index d2f3833..285b8dc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
@@ -16,8 +16,10 @@
*/
package org.apache.nifi.controller.service;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import org.apache.nifi.controller.ConfiguredComponent;
@@ -101,4 +103,35 @@ public class StandardControllerServiceReference implements ControllerServiceRefe
return references;
}
+
+
+ @Override
+ public <T> List<T> findRecursiveReferences(final Class<T> componentType) {
+ return findRecursiveReferences(referenced, componentType);
+ }
+
+ private <T> List<T> findRecursiveReferences(final ControllerServiceNode referencedNode, final Class<T> componentType) {
+ final List<T> references = new ArrayList<>();
+
+ for (final ConfiguredComponent referencingComponent : referencedNode.getReferences().getReferencingComponents()) {
+ if (componentType.isAssignableFrom(referencingComponent.getClass())) {
+ references.add(componentType.cast(referencingComponent));
+ }
+
+ if (referencingComponent instanceof ControllerServiceNode) {
+ final ControllerServiceNode referencingNode = (ControllerServiceNode) referencingComponent;
+
+ // find components recursively that depend on referencingNode.
+ final List<T> recursive = findRecursiveReferences(referencingNode, componentType);
+
+ // For anything that depends on referencing node, we want to add it to the list, but we know
+ // that it must come after the referencing node, so we first remove any existing occurrence.
+ references.removeAll(recursive);
+ references.addAll(recursive);
+ }
+ }
+
+ return references;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
index 1ef3e8b..3957955 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
@@ -16,6 +16,24 @@
*/
package org.apache.nifi.fingerprint;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import javax.xml.XMLConstants;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.ConfigurableComponent;
@@ -38,23 +56,6 @@ import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;
-import javax.xml.XMLConstants;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.validation.Schema;
-import javax.xml.validation.SchemaFactory;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
/**
* <p>Creates a fingerprint of a flow.xml. The order of elements or attributes in the flow.xml does not influence the fingerprint generation.
*
@@ -324,9 +325,22 @@ public class FingerprintFactory {
addFunnelFingerprint(builder, funnelElem);
}
+ // add variables
+ final NodeList variableElems = DomUtils.getChildNodesByTagName(processGroupElem, "variable");
+ final List<Element> sortedVarList = sortElements(variableElems, getVariableNameComparator());
+ for (final Element varElem : sortedVarList) {
+ addVariableFingerprint(builder, varElem);
+ }
+
return builder;
}
+ private void addVariableFingerprint(final StringBuilder builder, final Element variableElement) {
+ final String variableName = variableElement.getAttribute("name");
+ final String variableValue = variableElement.getAttribute("value");
+ builder.append(variableName).append("=").append(variableValue);
+ }
+
private StringBuilder addFlowFileProcessorFingerprint(final StringBuilder builder, final Element processorElem) throws FingerprintException {
// id
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "id"));
@@ -662,6 +676,27 @@ public class FingerprintFactory {
};
}
+ private Comparator<Element> getVariableNameComparator() {
+ return new Comparator<Element>() {
+ @Override
+ public int compare(final Element e1, final Element e2) {
+ if (e1 == null && e2 == null) {
+ return 0;
+ }
+ if (e1 == null) {
+ return 1;
+ }
+ if (e2 == null) {
+ return -1;
+ }
+
+ final String varName1 = e1.getAttribute("name");
+ final String varName2 = e2.getAttribute("name");
+ return varName1.compareTo(varName2);
+ }
+ };
+ }
+
private Comparator<Element> getProcessorPropertiesComparator() {
return new Comparator<Element>() {
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 2907704..2b7b51d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -16,13 +16,31 @@
*/
package org.apache.nifi.groups;
-import com.google.common.collect.Sets;
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.attribute.expression.language.Query;
+import org.apache.nifi.attribute.expression.language.VariableImpact;
import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.ResourceFactory;
@@ -39,6 +57,7 @@ import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.connectable.Positionable;
import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
@@ -50,13 +69,15 @@ import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.service.ControllerServiceReference;
import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.StandardProcessContext;
-import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.VariableDescriptor;
+import org.apache.nifi.registry.variable.MutableVariableRegistry;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.util.NiFiProperties;
@@ -66,20 +87,7 @@ import org.apache.nifi.web.api.dto.TemplateDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static java.util.Objects.requireNonNull;
+import com.google.common.collect.Sets;
public final class StandardProcessGroup implements ProcessGroup {
@@ -104,7 +112,7 @@ public final class StandardProcessGroup implements ProcessGroup {
private final Map<String, ControllerServiceNode> controllerServices = new HashMap<>();
private final Map<String, Template> templates = new HashMap<>();
private final StringEncryptor encryptor;
- private final VariableRegistry variableRegistry;
+ private final MutableVariableRegistry variableRegistry;
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
@@ -114,7 +122,7 @@ public final class StandardProcessGroup implements ProcessGroup {
public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final StandardProcessScheduler scheduler,
final NiFiProperties nifiProps, final StringEncryptor encryptor, final FlowController flowController,
- final VariableRegistry variableRegistry) {
+ final MutableVariableRegistry variableRegistry) {
this.id = id;
this.controllerServiceProvider = serviceProvider;
this.parent = new AtomicReference<>();
@@ -361,7 +369,7 @@ public final class StandardProcessGroup implements ProcessGroup {
private void shutdown(final ProcessGroup procGroup) {
for (final ProcessorNode node : procGroup.getProcessors()) {
try (final NarCloseable x = NarCloseable.withComponentNarLoader(node.getProcessor().getClass(), node.getIdentifier())) {
- final StandardProcessContext processContext = new StandardProcessContext(node, controllerServiceProvider, encryptor, getStateManager(node.getIdentifier()), variableRegistry);
+ final StandardProcessContext processContext = new StandardProcessContext(node, controllerServiceProvider, encryptor, getStateManager(node.getIdentifier()));
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, node.getProcessor(), processContext);
}
}
@@ -548,6 +556,8 @@ public final class StandardProcessGroup implements ProcessGroup {
writeLock.lock();
try {
group.setParent(this);
+ group.getVariableRegistry().setParent(getVariableRegistry());
+
processGroups.put(Objects.requireNonNull(group).getIdentifier(), group);
flowController.onProcessGroupAdded(group);
} finally {
@@ -709,6 +719,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
processor.setProcessGroup(this);
+ processor.getVariableRegistry().setParent(getVariableRegistry());
processors.put(processorId, processor);
flowController.onProcessorAdded(processor);
} finally {
@@ -732,7 +743,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
try (final NarCloseable x = NarCloseable.withComponentNarLoader(processor.getProcessor().getClass(), processor.getIdentifier())) {
- final StandardProcessContext processContext = new StandardProcessContext(processor, controllerServiceProvider, encryptor, getStateManager(processor.getIdentifier()), variableRegistry);
+ final StandardProcessContext processContext = new StandardProcessContext(processor, controllerServiceProvider, encryptor, getStateManager(processor.getIdentifier()));
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, processor.getProcessor(), processContext);
} catch (final Exception e) {
throw new ComponentLifeCycleException("Failed to invoke 'OnRemoved' methods of processor with id " + processor.getIdentifier(), e);
@@ -1081,7 +1092,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
@Override
- public void startProcessor(final ProcessorNode processor) {
+ public CompletableFuture<Void> startProcessor(final ProcessorNode processor) {
readLock.lock();
try {
if (getProcessor(processor.getIdentifier()) == null) {
@@ -1092,10 +1103,10 @@ public final class StandardProcessGroup implements ProcessGroup {
if (state == ScheduledState.DISABLED) {
throw new IllegalStateException("Processor is disabled");
} else if (state == ScheduledState.RUNNING) {
- return;
+ return CompletableFuture.completedFuture(null);
}
- scheduler.startProcessor(processor);
+ return scheduler.startProcessor(processor);
} finally {
readLock.unlock();
}
@@ -1162,7 +1173,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
@Override
- public void stopProcessor(final ProcessorNode processor) {
+ public CompletableFuture<Void> stopProcessor(final ProcessorNode processor) {
readLock.lock();
try {
if (!processors.containsKey(processor.getIdentifier())) {
@@ -1173,10 +1184,10 @@ public final class StandardProcessGroup implements ProcessGroup {
if (state == ScheduledState.DISABLED) {
throw new IllegalStateException("Processor is disabled");
} else if (state == ScheduledState.STOPPED) {
- return;
+ return CompletableFuture.completedFuture(null);
}
- scheduler.stopProcessor(processor);
+ return scheduler.stopProcessor(processor);
} finally {
readLock.unlock();
}
@@ -1854,6 +1865,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
service.setProcessGroup(this);
+ service.getVariableRegistry().setParent(getVariableRegistry());
this.controllerServices.put(service.getIdentifier(), service);
LOG.info("{} added to {}", service, this);
} finally {
@@ -2583,4 +2595,129 @@ public final class StandardProcessGroup implements ProcessGroup {
readLock.unlock();
}
}
+
+ @Override
+ public MutableVariableRegistry getVariableRegistry() {
+ return variableRegistry;
+ }
+
+ @Override
+ public void verifyCanUpdateVariables(final Map<String, String> updatedVariables) {
+ if (updatedVariables == null || updatedVariables.isEmpty()) {
+ return;
+ }
+
+ readLock.lock();
+ try {
+ final Set<String> updatedVariableNames = getUpdatedVariables(updatedVariables);
+ if (updatedVariableNames.isEmpty()) {
+ return;
+ }
+
+ for (final ProcessorNode processor : findAllProcessors()) {
+ if (!processor.isRunning()) {
+ continue;
+ }
+
+ for (final String variableName : updatedVariableNames) {
+ for (final VariableImpact impact : getVariableImpact(processor)) {
+ if (impact.isImpacted(variableName)) {
+ throw new IllegalStateException("Cannot update variable '" + variableName + "' because it is referenced by " + processor + ", which is currently running");
+ }
+ }
+ }
+ }
+
+ for (final ControllerServiceNode service : findAllControllerServices()) {
+ if (!service.isActive()) {
+ continue;
+ }
+
+ for (final String variableName : updatedVariableNames) {
+ for (final VariableImpact impact : getVariableImpact(service)) {
+ if (impact.isImpacted(variableName)) {
+ throw new IllegalStateException("Cannot update variable '" + variableName + "' because it is referenced by " + service + ", which is currently running");
+ }
+ }
+ }
+ }
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public Set<ConfiguredComponent> getComponentsAffectedByVariable(final String variableName) {
+ final Set<ConfiguredComponent> affected = new HashSet<>();
+
+ // Determine any Processors that references the variable
+ for (final ProcessorNode processor : findAllProcessors()) {
+ for (final VariableImpact impact : getVariableImpact(processor)) {
+ if (impact.isImpacted(variableName)) {
+ affected.add(processor);
+ }
+ }
+ }
+
+ // Determine any Controller Service that references the variable. If Service A references a variable,
+ // then that means that any other component that references that service is also affected, so recursively
+ // find any references to that service and add it.
+ for (final ControllerServiceNode service : findAllControllerServices()) {
+ for (final VariableImpact impact : getVariableImpact(service)) {
+ if (impact.isImpacted(variableName)) {
+ affected.add(service);
+
+ final ControllerServiceReference reference = service.getReferences();
+ affected.addAll(reference.findRecursiveReferences(ConfiguredComponent.class));
+ }
+ }
+ }
+
+ return affected;
+ }
+
+
+ private Set<String> getUpdatedVariables(final Map<String, String> newVariableValues) {
+ final Set<String> updatedVariableNames = new HashSet<>();
+
+ final MutableVariableRegistry registry = getVariableRegistry();
+ for (final Map.Entry<String, String> entry : newVariableValues.entrySet()) {
+ final String varName = entry.getKey();
+ final String newValue = entry.getValue();
+
+ final String curValue = registry.getVariableValue(varName);
+ if (!Objects.equals(newValue, curValue)) {
+ updatedVariableNames.add(varName);
+ }
+ }
+
+ return updatedVariableNames;
+ }
+
+ private List<VariableImpact> getVariableImpact(final ConfiguredComponent component) {
+ return component.getProperties().values().stream()
+ .map(propVal -> Query.prepare(propVal).getVariableImpact())
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public void setVariables(final Map<String, String> variables) {
+ writeLock.lock();
+ try {
+ verifyCanUpdateVariables(variables);
+
+ if (variables == null) {
+ return;
+ }
+
+ final Map<VariableDescriptor, String> variableMap = new HashMap<>();
+ variables.entrySet().stream() // cannot use Collectors.toMap because value may be null
+ .forEach(entry -> variableMap.put(new VariableDescriptor(entry.getKey()), entry.getValue()));
+
+ variableRegistry.setVariables(variableMap);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
index 2714392..dfba330 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
@@ -26,8 +26,8 @@ import java.util.Set;
import org.apache.nifi.attribute.expression.language.PreparedQuery;
import org.apache.nifi.attribute.expression.language.Query;
-import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.attribute.expression.language.Query.Range;
+import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.state.StateManager;
@@ -37,7 +37,6 @@ import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.encrypt.StringEncryptor;
-import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.util.Connectables;
public class StandardProcessContext implements ProcessContext, ControllerServiceLookup {
@@ -47,15 +46,12 @@ public class StandardProcessContext implements ProcessContext, ControllerService
private final Map<PropertyDescriptor, PreparedQuery> preparedQueries;
private final StringEncryptor encryptor;
private final StateManager stateManager;
- private final VariableRegistry variableRegistry;
- public StandardProcessContext(final ProcessorNode processorNode, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor, final StateManager stateManager,
- final VariableRegistry variableRegistry) {
+ public StandardProcessContext(final ProcessorNode processorNode, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor, final StateManager stateManager) {
this.procNode = processorNode;
this.controllerServiceProvider = controllerServiceProvider;
this.encryptor = encryptor;
this.stateManager = stateManager;
- this.variableRegistry = variableRegistry;
preparedQueries = new HashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : procNode.getProperties().entrySet()) {
@@ -93,12 +89,12 @@ public class StandardProcessContext implements ProcessContext, ControllerService
final String setPropertyValue = procNode.getProperty(descriptor);
final String propValue = (setPropertyValue == null) ? descriptor.getDefaultValue() : setPropertyValue;
- return new StandardPropertyValue(propValue, this, preparedQueries.get(descriptor), variableRegistry);
+ return new StandardPropertyValue(propValue, this, preparedQueries.get(descriptor), procNode.getVariableRegistry());
}
@Override
public PropertyValue newPropertyValue(final String rawValue) {
- return new StandardPropertyValue(rawValue, this, Query.prepare(rawValue), variableRegistry);
+ return new StandardPropertyValue(rawValue, this, Query.prepare(rawValue), procNode.getVariableRegistry());
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java
index 662169c..2f38aee 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java
@@ -53,8 +53,8 @@ public class StandardValidationContext implements ValidationContext {
private final String componentId;
public StandardValidationContext(final ControllerServiceProvider controllerServiceProvider, final Map<PropertyDescriptor, String> properties,
- final String annotationData, final String groupId, final String componentId, VariableRegistry variableRegistry) {
- this(controllerServiceProvider, Collections.<String> emptySet(), properties, annotationData, groupId, componentId,variableRegistry);
+ final String annotationData, final String groupId, final String componentId, final VariableRegistry variableRegistry) {
+ this(controllerServiceProvider, Collections.<String> emptySet(), properties, annotationData, groupId, componentId, variableRegistry);
}
public StandardValidationContext(
@@ -63,7 +63,8 @@ public class StandardValidationContext implements ValidationContext {
final Map<PropertyDescriptor, String> properties,
final String annotationData,
final String groupId,
- final String componentId, VariableRegistry variableRegistry) {
+ final String componentId,
+ final VariableRegistry variableRegistry) {
this.controllerServiceProvider = controllerServiceProvider;
this.properties = new HashMap<>(properties);
this.annotationData = annotationData;