You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2017/08/17 14:43:22 UTC

[5/9] nifi git commit: NIFI-4224: - Initial implementation of Process Group level Variable Registry - Updated to incorporate PR Feedback - Changed log message because slf4j-simple apparently has a memory leak; passing a String instead of passing in the C

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
index 9611b73..07923c6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
@@ -16,6 +16,14 @@
  */
 package org.apache.nifi.controller.reporting;
 
+import java.net.URL;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.nifi.annotation.configuration.DefaultSchedule;
 import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.components.ConfigurableComponent;
@@ -33,7 +41,7 @@ import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.controller.service.StandardConfigurationContext;
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.ComponentVariableRegistry;
 import org.apache.nifi.reporting.ReportingTask;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.util.CharacterFilterUtils;
@@ -42,14 +50,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.core.annotation.AnnotationUtils;
 
-import java.net.URL;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
 public abstract class AbstractReportingTaskNode extends AbstractConfiguredComponent implements ReportingTaskNode {
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractReportingTaskNode.class);
@@ -66,7 +66,7 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
 
     public AbstractReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id,
                                      final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler,
-                                     final ValidationContextFactory validationContextFactory, final VariableRegistry variableRegistry,
+                                     final ValidationContextFactory validationContextFactory, final ComponentVariableRegistry variableRegistry,
                                      final ReloadComponent reloadComponent) {
 
         this(reportingTask, id, controllerServiceProvider, processScheduler, validationContextFactory,
@@ -77,7 +77,7 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
 
     public AbstractReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id, final ControllerServiceProvider controllerServiceProvider,
                                      final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory,
-                                     final String componentType, final String componentCanonicalClass, final VariableRegistry variableRegistry,
+                                     final String componentType, final String componentCanonicalClass, final ComponentVariableRegistry variableRegistry,
                                      final ReloadComponent reloadComponent, final boolean isExtensionMissing) {
 
         super(id, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass, variableRegistry, reloadComponent, isExtensionMissing);

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
index 53b7d15..40bdf8b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
@@ -22,13 +22,13 @@ import org.apache.nifi.authorization.Resource;
 import org.apache.nifi.authorization.resource.Authorizable;
 import org.apache.nifi.authorization.resource.ResourceFactory;
 import org.apache.nifi.authorization.resource.ResourceType;
-import org.apache.nifi.controller.ReloadComponent;
 import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.LoggableComponent;
 import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.ReloadComponent;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.ValidationContextFactory;
-import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.ComponentVariableRegistry;
 import org.apache.nifi.reporting.ReportingContext;
 import org.apache.nifi.reporting.ReportingTask;
 
@@ -38,14 +38,14 @@ public class StandardReportingTaskNode extends AbstractReportingTaskNode impleme
 
     public StandardReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id, final FlowController controller,
                                      final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory,
-                                     final VariableRegistry variableRegistry, final ReloadComponent reloadComponent) {
+                                     final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent) {
         super(reportingTask, id, controller, processScheduler, validationContextFactory, variableRegistry, reloadComponent);
         this.flowController = controller;
     }
 
     public StandardReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id, final FlowController controller,
                                      final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory,
-                                     final String componentType, final String canonicalClassName, final VariableRegistry variableRegistry,
+                                     final String componentType, final String canonicalClassName, final ComponentVariableRegistry variableRegistry,
                                      final ReloadComponent reloadComponent, final boolean isExtensionMissing) {
         super(reportingTask, id, controller, processScheduler, validationContextFactory, componentType, canonicalClassName,
                 variableRegistry, reloadComponent, isExtensionMissing);

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
index 0af8657..58d25bb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
@@ -45,7 +45,6 @@ import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.SimpleProcessLogger;
 import org.apache.nifi.processor.StandardProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.registry.VariableRegistry;
 import org.apache.nifi.util.Connectables;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.ReflectionUtils;
@@ -62,7 +61,6 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
     private final ProcessContextFactory contextFactory;
     private final AtomicInteger maxThreadCount;
     private final StringEncryptor encryptor;
-    private final VariableRegistry variableRegistry;
 
     private volatile String adminYieldDuration = "1 sec";
 
@@ -70,8 +68,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
     private final ConcurrentMap<Connectable, ScheduleState> scheduleStates = new ConcurrentHashMap<>();
 
     public EventDrivenSchedulingAgent(final FlowEngine flowEngine, final ControllerServiceProvider serviceProvider, final StateManagerProvider stateManagerProvider,
-                                      final EventDrivenWorkerQueue workerQueue, final ProcessContextFactory contextFactory, final int maxThreadCount, final StringEncryptor encryptor,
-                                      final VariableRegistry variableRegistry) {
+        final EventDrivenWorkerQueue workerQueue, final ProcessContextFactory contextFactory, final int maxThreadCount, final StringEncryptor encryptor) {
         super(flowEngine);
         this.serviceProvider = serviceProvider;
         this.stateManagerProvider = stateManagerProvider;
@@ -79,7 +76,6 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
         this.contextFactory = contextFactory;
         this.maxThreadCount = new AtomicInteger(maxThreadCount);
         this.encryptor = encryptor;
-        this.variableRegistry = variableRegistry;
 
         for (int i = 0; i < maxThreadCount; i++) {
             final Runnable eventDrivenTask = new EventDrivenTask(workerQueue);
@@ -188,8 +184,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
 
                 if (connectable instanceof ProcessorNode) {
                     final ProcessorNode procNode = (ProcessorNode) connectable;
-                    final StandardProcessContext standardProcessContext = new StandardProcessContext(procNode, serviceProvider,
-                        encryptor, getStateManager(connectable.getIdentifier()), variableRegistry);
+                    final StandardProcessContext standardProcessContext = new StandardProcessContext(procNode, serviceProvider, encryptor, getStateManager(connectable.getIdentifier()));
 
                     final long runNanos = procNode.getRunDuration(TimeUnit.NANOSECONDS);
                     final ProcessSessionFactory sessionFactory;

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
index bd3c2bb..eb855d4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
@@ -38,7 +38,6 @@ import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.processor.StandardProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.registry.VariableRegistry;
 import org.apache.nifi.util.FormatUtils;
 import org.quartz.CronExpression;
 import org.slf4j.Logger;
@@ -51,18 +50,15 @@ public class QuartzSchedulingAgent extends AbstractSchedulingAgent {
     private final FlowController flowController;
     private final ProcessContextFactory contextFactory;
     private final StringEncryptor encryptor;
-    private final VariableRegistry variableRegistry;
 
     private volatile String adminYieldDuration = "1 sec";
     private final Map<Object, List<AtomicBoolean>> canceledTriggers = new HashMap<>();
 
-    public QuartzSchedulingAgent(final FlowController flowController, final FlowEngine flowEngine, final ProcessContextFactory contextFactory, final StringEncryptor enryptor,
-                                 final VariableRegistry variableRegistry) {
+    public QuartzSchedulingAgent(final FlowController flowController, final FlowEngine flowEngine, final ProcessContextFactory contextFactory, final StringEncryptor enryptor) {
         super(flowEngine);
         this.flowController = flowController;
         this.contextFactory = contextFactory;
         this.encryptor = enryptor;
-        this.variableRegistry = variableRegistry;
     }
 
     private StateManager getStateManager(final String componentId) {
@@ -145,7 +141,7 @@ public class QuartzSchedulingAgent extends AbstractSchedulingAgent {
             if (connectable.getConnectableType() == ConnectableType.PROCESSOR) {
                 final ProcessorNode procNode = (ProcessorNode) connectable;
 
-                final StandardProcessContext standardProcContext = new StandardProcessContext(procNode, flowController, encryptor, getStateManager(connectable.getIdentifier()), variableRegistry);
+                final StandardProcessContext standardProcContext = new StandardProcessContext(procNode, flowController, encryptor, getStateManager(connectable.getIdentifier()));
                 ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, flowController, contextFactory, scheduleState, standardProcContext);
                 continuallyRunTask = runnableTask;
             } else {

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 5368d37..122e50c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -53,7 +53,6 @@ import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processor.SimpleProcessLogger;
 import org.apache.nifi.processor.StandardProcessContext;
-import org.apache.nifi.registry.VariableRegistry;
 import org.apache.nifi.reporting.ReportingTask;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.util.FormatUtils;
@@ -84,19 +83,16 @@ public final class StandardProcessScheduler implements ProcessScheduler {
     private final ScheduledExecutorService componentMonitoringThreadPool = new FlowEngine(8, "StandardProcessScheduler", true);
 
     private final StringEncryptor encryptor;
-    private final VariableRegistry variableRegistry;
 
     public StandardProcessScheduler(
             final ControllerServiceProvider controllerServiceProvider,
             final StringEncryptor encryptor,
             final StateManagerProvider stateManagerProvider,
-            final VariableRegistry variableRegistry,
             final NiFiProperties nifiProperties
     ) {
         this.controllerServiceProvider = controllerServiceProvider;
         this.encryptor = encryptor;
         this.stateManagerProvider = stateManagerProvider;
-        this.variableRegistry = variableRegistry;
 
         administrativeYieldDuration = nifiProperties.getAdministrativeYieldDuration();
         administrativeYieldMillis = FormatUtils.getTimeDuration(administrativeYieldDuration, TimeUnit.MILLISECONDS);
@@ -301,15 +297,17 @@ public final class StandardProcessScheduler implements ProcessScheduler {
      * @see StandardProcessorNode#start(ScheduledExecutorService, long, org.apache.nifi.processor.ProcessContext, Runnable).
      */
     @Override
-    public synchronized void startProcessor(final ProcessorNode procNode) {
+    public synchronized CompletableFuture<Void> startProcessor(final ProcessorNode procNode) {
         StandardProcessContext processContext = new StandardProcessContext(procNode, this.controllerServiceProvider,
-                this.encryptor, getStateManager(procNode.getIdentifier()), variableRegistry);
+            this.encryptor, getStateManager(procNode.getIdentifier()));
         final ScheduleState scheduleState = getScheduleState(requireNonNull(procNode));
 
+        final CompletableFuture<Void> future = new CompletableFuture<>();
         SchedulingAgentCallback callback = new SchedulingAgentCallback() {
             @Override
             public void trigger() {
                 getSchedulingAgent(procNode).schedule(procNode, scheduleState);
+                future.complete(null);
             }
 
             @Override
@@ -324,7 +322,9 @@ public final class StandardProcessScheduler implements ProcessScheduler {
             }
         };
 
+        LOG.info("Starting {}", procNode);
         procNode.start(this.componentLifeCycleThreadPool, this.administrativeYieldMillis, processContext, callback);
+        return future;
     }
 
     /**
@@ -335,12 +335,13 @@ public final class StandardProcessScheduler implements ProcessScheduler {
      * @see StandardProcessorNode#stop(ScheduledExecutorService, org.apache.nifi.processor.ProcessContext, SchedulingAgent, ScheduleState)
      */
     @Override
-    public synchronized void stopProcessor(final ProcessorNode procNode) {
+    public synchronized CompletableFuture<Void> stopProcessor(final ProcessorNode procNode) {
         StandardProcessContext processContext = new StandardProcessContext(procNode, this.controllerServiceProvider,
-                this.encryptor, getStateManager(procNode.getIdentifier()), variableRegistry);
+            this.encryptor, getStateManager(procNode.getIdentifier()));
         final ScheduleState state = getScheduleState(procNode);
 
-        procNode.stop(this.componentLifeCycleThreadPool, processContext, getSchedulingAgent(procNode), state);
+        LOG.info("Stopping {}", procNode);
+        return procNode.stop(this.componentLifeCycleThreadPool, processContext, getSchedulingAgent(procNode), state);
     }
 
     @Override
@@ -537,20 +538,35 @@ public final class StandardProcessScheduler implements ProcessScheduler {
 
     @Override
     public CompletableFuture<Void> enableControllerService(final ControllerServiceNode service) {
+        LOG.info("Enabling " + service);
         return service.enable(this.componentLifeCycleThreadPool, this.administrativeYieldMillis);
     }
 
     @Override
-    public void disableControllerService(final ControllerServiceNode service) {
-        service.disable(this.componentLifeCycleThreadPool);
+    public CompletableFuture<Void> disableControllerService(final ControllerServiceNode service) {
+        LOG.info("Disabling {}", service);
+        return service.disable(this.componentLifeCycleThreadPool);
     }
 
     @Override
-    public void disableControllerServices(final List<ControllerServiceNode> services) {
+    public CompletableFuture<Void> disableControllerServices(final List<ControllerServiceNode> services) {
+        if (services == null || services.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        CompletableFuture<Void> future = null;
         if (!requireNonNull(services).isEmpty()) {
             for (ControllerServiceNode controllerServiceNode : services) {
-                this.disableControllerService(controllerServiceNode);
+                final CompletableFuture<Void> serviceFuture = this.disableControllerService(controllerServiceNode);
+
+                if (future == null) {
+                    future = serviceFuture;
+                } else {
+                    future = CompletableFuture.allOf(future, serviceFuture);
+                }
             }
         }
+
+        return future;
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
index a82fde4..9dc329a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
@@ -37,7 +37,6 @@ import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.StandardProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.registry.VariableRegistry;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.slf4j.Logger;
@@ -51,7 +50,6 @@ public class TimerDrivenSchedulingAgent extends AbstractSchedulingAgent {
     private final FlowController flowController;
     private final ProcessContextFactory contextFactory;
     private final StringEncryptor encryptor;
-    private final VariableRegistry variableRegistry;
 
     private volatile String adminYieldDuration = "1 sec";
 
@@ -60,13 +58,11 @@ public class TimerDrivenSchedulingAgent extends AbstractSchedulingAgent {
             final FlowEngine flowEngine,
             final ProcessContextFactory contextFactory,
             final StringEncryptor encryptor,
-            final VariableRegistry variableRegistry,
             final NiFiProperties nifiProperties) {
         super(flowEngine);
         this.flowController = flowController;
         this.contextFactory = contextFactory;
         this.encryptor = encryptor;
-        this.variableRegistry = variableRegistry;
 
         final String boredYieldDuration = nifiProperties.getBoredYieldDuration();
         try {
@@ -109,7 +105,7 @@ public class TimerDrivenSchedulingAgent extends AbstractSchedulingAgent {
             // Determine the task to run and create it.
             if (connectable.getConnectableType() == ConnectableType.PROCESSOR) {
                 final ProcessorNode procNode = (ProcessorNode) connectable;
-                final StandardProcessContext standardProcContext = new StandardProcessContext(procNode, flowController, encryptor, getStateManager(connectable.getIdentifier()), variableRegistry);
+                final StandardProcessContext standardProcContext = new StandardProcessContext(procNode, flowController, encryptor, getStateManager(connectable.getIdentifier()));
                 final ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, flowController,
                         contextFactory, scheduleState, standardProcContext);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
index 61d9d29..f30a71e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
@@ -144,6 +144,16 @@ public class FlowFromDOMFactory {
         dto.setPosition(getPosition(DomUtils.getChild(element, "position")));
         dto.setComments(getString(element, "comment"));
 
+        final Map<String, String> variables = new HashMap<>();
+        final NodeList variableList = DomUtils.getChildNodesByTagName(element, "variable");
+        for (int i = 0; i < variableList.getLength(); i++) {
+            final Element variableElement = (Element) variableList.item(i);
+            final String name = variableElement.getAttribute("name");
+            final String value = variableElement.getAttribute("value");
+            variables.put(name, value);
+        }
+        dto.setVariables(variables);
+
         final Set<ProcessorDTO> processors = new HashSet<>();
         final Set<ConnectionDTO> connections = new HashSet<>();
         final Set<FunnelDTO> funnels = new HashSet<>();

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
index 4a9a0f7..2a8df96 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
@@ -37,6 +37,8 @@ import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.persistence.TemplateSerializer;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.registry.VariableDescriptor;
+import org.apache.nifi.registry.VariableRegistry;
 import org.apache.nifi.remote.RemoteGroupPort;
 import org.apache.nifi.remote.RootGroupPort;
 import org.apache.nifi.util.CharacterFilterUtils;
@@ -70,7 +72,7 @@ import java.util.concurrent.TimeUnit;
  */
 public class StandardFlowSerializer implements FlowSerializer {
 
-    private static final String MAX_ENCODING_VERSION = "1.1";
+    private static final String MAX_ENCODING_VERSION = "1.2";
 
     private final StringEncryptor encryptor;
 
@@ -202,6 +204,18 @@ public class StandardFlowSerializer implements FlowSerializer {
         for (final Template template : group.getTemplates()) {
             addTemplate(element, template);
         }
+
+        final VariableRegistry variableRegistry = group.getVariableRegistry();
+        for (final Map.Entry<VariableDescriptor, String> entry : variableRegistry.getVariableMap().entrySet()) {
+            addVariable(element, entry.getKey().getName(), entry.getValue());
+        }
+    }
+
+    private static void addVariable(final Element parentElement, final String variableName, final String variableValue) {
+        final Element variableElement = parentElement.getOwnerDocument().createElement("variable");
+        variableElement.setAttribute("name", variableName);
+        variableElement.setAttribute("value", variableValue);
+        parentElement.appendChild(variableElement);
     }
 
     private static void addBundle(final Element parentElement, final BundleCoordinate coordinate) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java
new file mode 100644
index 0000000..148b847
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.service;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Holds the lifecycle state of a Controller Service together with the futures that are waiting
+ * for an enable or disable transition to finish. Every method is declared synchronized, so
+ * checking the current state and changing it happen as a single step.
+ */
+public class ServiceStateTransition {
+    private ControllerServiceState state = ControllerServiceState.DISABLED;
+    private final List<CompletableFuture<?>> enabledFutures = new ArrayList<>();
+    private final List<CompletableFuture<?>> disabledFutures = new ArrayList<>();
+
+    /**
+     * Moves the state to ENABLING, but only when the current state equals {@code expectedState}.
+     *
+     * @param expectedState the state the caller expects the service to be in
+     * @param enabledFuture future to complete when {@link #enable()} succeeds; it is registered only if the transition happens
+     * @return {@code true} if the state was changed, {@code false} if the current state did not match
+     */
+    public synchronized boolean transitionToEnabling(final ControllerServiceState expectedState, final CompletableFuture<?> enabledFuture) {
+        if (expectedState != state) {
+            return false;
+        }
+
+        state = ControllerServiceState.ENABLING;
+        enabledFutures.add(enabledFuture);
+        return true;
+    }
+
+    /**
+     * Moves the state from ENABLING to ENABLED and completes every registered enable future.
+     *
+     * @return {@code true} if the service is now ENABLED, {@code false} if it was not ENABLING
+     */
+    public synchronized boolean enable() {
+        if (state != ControllerServiceState.ENABLING) {
+            return false;
+        }
+
+        state = ControllerServiceState.ENABLED;
+        completeAndClear(enabledFutures);
+        return true;
+    }
+
+    /**
+     * Moves the state to DISABLING, but only when the current state equals {@code expectedState}.
+     *
+     * @param expectedState the state the caller expects the service to be in
+     * @param disabledFuture future to complete when {@link #disable()} runs; it is registered only if the transition happens
+     * @return {@code true} if the state was changed, {@code false} if the current state did not match
+     */
+    public synchronized boolean transitionToDisabling(final ControllerServiceState expectedState, final CompletableFuture<?> disabledFuture) {
+        if (expectedState != state) {
+            return false;
+        }
+
+        state = ControllerServiceState.DISABLING;
+        disabledFutures.add(disabledFuture);
+        return true;
+    }
+
+    /**
+     * Unconditionally sets the state to DISABLED and completes every registered disable future.
+     */
+    public synchronized void disable() {
+        state = ControllerServiceState.DISABLED;
+        completeAndClear(disabledFutures);
+    }
+
+    /**
+     * @return the current state
+     */
+    public synchronized ControllerServiceState getState() {
+        return state;
+    }
+
+    /**
+     * Completes each future with {@code null} and then empties the list, so futures that have already
+     * been notified are not retained (and re-visited) on every later transition of the same service.
+     */
+    private static void completeAndClear(final List<CompletableFuture<?>> futures) {
+        futures.forEach(future -> future.complete(null));
+        futures.clear();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index fa4ab84..216a996 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -16,6 +16,24 @@
  */
 package org.apache.nifi.controller.service;
 
+import java.lang.reflect.InvocationTargetException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.Restricted;
 import org.apache.nifi.annotation.documentation.DeprecationNotice;
@@ -30,48 +48,30 @@ import org.apache.nifi.components.ConfigurableComponent;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.AbstractConfiguredComponent;
-import org.apache.nifi.controller.ReloadComponent;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ConfiguredComponent;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.LoggableComponent;
+import org.apache.nifi.controller.ReloadComponent;
 import org.apache.nifi.controller.ValidationContextFactory;
 import org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.processor.SimpleProcessLogger;
-import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.ComponentVariableRegistry;
 import org.apache.nifi.util.CharacterFilterUtils;
 import org.apache.nifi.util.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.reflect.InvocationTargetException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 public class StandardControllerServiceNode extends AbstractConfiguredComponent implements ControllerServiceNode {
 
     private static final Logger LOG = LoggerFactory.getLogger(StandardControllerServiceNode.class);
 
     private final AtomicReference<ControllerServiceDetails> controllerServiceHolder = new AtomicReference<>(null);
     private final ControllerServiceProvider serviceProvider;
-    private final AtomicReference<ControllerServiceState> stateRef = new AtomicReference<>(ControllerServiceState.DISABLED);
+    private final ServiceStateTransition stateTransition = new ServiceStateTransition();
 
     private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
     private final Lock readLock = rwLock.readLock();
@@ -85,7 +85,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
 
     public StandardControllerServiceNode(final LoggableComponent<ControllerService> implementation, final LoggableComponent<ControllerService> proxiedControllerService,
                                          final ControllerServiceInvocationHandler invocationHandler, final String id, final ValidationContextFactory validationContextFactory,
-                                         final ControllerServiceProvider serviceProvider, final VariableRegistry variableRegistry, final ReloadComponent reloadComponent) {
+                                         final ControllerServiceProvider serviceProvider, final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent) {
 
         this(implementation, proxiedControllerService, invocationHandler, id, validationContextFactory, serviceProvider,
             implementation.getComponent().getClass().getSimpleName(), implementation.getComponent().getClass().getCanonicalName(), variableRegistry, reloadComponent, false);
@@ -94,7 +94,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
     public StandardControllerServiceNode(final LoggableComponent<ControllerService> implementation, final LoggableComponent<ControllerService> proxiedControllerService,
                                          final ControllerServiceInvocationHandler invocationHandler, final String id, final ValidationContextFactory validationContextFactory,
                                          final ControllerServiceProvider serviceProvider, final String componentType, final String componentCanonicalClass,
-                                         final VariableRegistry variableRegistry, final ReloadComponent reloadComponent, final boolean isExtensionMissing) {
+                                         final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent, final boolean isExtensionMissing) {
 
         super(id, validationContextFactory, serviceProvider, componentType, componentCanonicalClass, variableRegistry, reloadComponent, isExtensionMissing);
         this.serviceProvider = serviceProvider;
@@ -363,7 +363,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
 
     @Override
     public ControllerServiceState getState() {
-        return stateRef.get();
+        return stateTransition.getState();
     }
 
     @Override
@@ -394,7 +394,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
     public CompletableFuture<Void> enable(final ScheduledExecutorService scheduler, final long administrativeYieldMillis) {
         final CompletableFuture<Void> future = new CompletableFuture<>();
 
-        if (this.stateRef.compareAndSet(ControllerServiceState.DISABLED, ControllerServiceState.ENABLING)) {
+        if (this.stateTransition.transitionToEnabling(ControllerServiceState.DISABLED, future)) {
             synchronized (active) {
                 this.active.set(true);
             }
@@ -410,17 +410,15 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
 
                         boolean shouldEnable = false;
                         synchronized (active) {
-                            shouldEnable = active.get() && stateRef.compareAndSet(ControllerServiceState.ENABLING, ControllerServiceState.ENABLED);
+                            shouldEnable = active.get() && stateTransition.enable();
                         }
 
-                        future.complete(null);
-
                         if (!shouldEnable) {
                             LOG.debug("Disabling service " + this + " after it has been enabled due to disable action being initiated.");
                             // Can only happen if user initiated DISABLE operation before service finished enabling. It's state will be
                             // set to DISABLING (see disable() operation)
                             invokeDisable(configContext);
-                            stateRef.set(ControllerServiceState.DISABLED);
+                            stateTransition.disable();
                         }
                     } catch (Exception e) {
                         future.completeExceptionally(e);
@@ -437,7 +435,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
                             try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getControllerServiceImplementation().getClass(), getIdentifier())) {
                                 ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
                             }
-                            stateRef.set(ControllerServiceState.DISABLED);
+                            stateTransition.disable();
                         }
                     }
                 }
@@ -464,7 +462,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
      * DISABLED state.
      */
     @Override
-    public void disable(ScheduledExecutorService scheduler) {
+    public CompletableFuture<Void> disable(ScheduledExecutorService scheduler) {
         /*
          * The reason for synchronization is to ensure consistency of the
          * service state when another thread is in the middle of enabling this
@@ -475,7 +473,8 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
             this.active.set(false);
         }
 
-        if (this.stateRef.compareAndSet(ControllerServiceState.ENABLED, ControllerServiceState.DISABLING)) {
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+        if (this.stateTransition.transitionToDisabling(ControllerServiceState.ENABLED, future)) {
             final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null, getVariableRegistry());
             scheduler.execute(new Runnable() {
                 @Override
@@ -483,13 +482,15 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
                     try {
                         invokeDisable(configContext);
                     } finally {
-                        stateRef.set(ControllerServiceState.DISABLED);
+                        stateTransition.disable();
                     }
                 }
             });
         } else {
-            this.stateRef.compareAndSet(ControllerServiceState.ENABLING, ControllerServiceState.DISABLING);
+            this.stateTransition.transitionToDisabling(ControllerServiceState.ENABLING, future);
         }
+
+        return future;
     }
 
     /**
@@ -515,7 +516,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
     @Override
     public Collection<ValidationResult> getValidationErrors(Set<String> serviceIdentifiersNotToValidate) {
         Collection<ValidationResult> results = null;
-        if (stateRef.get() == ControllerServiceState.DISABLED) {
+        if (getState() == ControllerServiceState.DISABLED) {
             results = super.getValidationErrors(serviceIdentifiersNotToValidate);
         }
         return results != null ? results : Collections.emptySet();

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index 5c4e394..0745ed0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -30,6 +30,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Future;
@@ -62,7 +63,9 @@ import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.processor.SimpleProcessLogger;
 import org.apache.nifi.processor.StandardValidationContextFactory;
+import org.apache.nifi.registry.ComponentVariableRegistry;
 import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.variable.StandardComponentVariableRegistry;
 import org.apache.nifi.reporting.BulletinRepository;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.util.NiFiProperties;
@@ -148,8 +151,9 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
             final LoggableComponent<ControllerService> originalLoggableComponent = new LoggableComponent<>(originalService, bundleCoordinate, serviceLogger);
             final LoggableComponent<ControllerService> proxiedLoggableComponent = new LoggableComponent<>(proxiedService, bundleCoordinate, serviceLogger);
 
+            final ComponentVariableRegistry componentVarRegistry = new StandardComponentVariableRegistry(this.variableRegistry);
             final ControllerServiceNode serviceNode = new StandardControllerServiceNode(originalLoggableComponent, proxiedLoggableComponent, invocationHandler,
-                    id, validationContextFactory, this, variableRegistry, flowController);
+                    id, validationContextFactory, this, componentVarRegistry, flowController);
             serviceNode.setName(rawClass.getSimpleName());
 
             invocationHandler.setServiceNode(serviceNode);
@@ -226,8 +230,9 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
 
         final LoggableComponent<ControllerService> proxiedLoggableComponent = new LoggableComponent<>(proxiedService, bundleCoordinate, null);
 
+        final ComponentVariableRegistry componentVarRegistry = new StandardComponentVariableRegistry(this.variableRegistry);
         final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedLoggableComponent, proxiedLoggableComponent, invocationHandler, id,
-                new StandardValidationContextFactory(this, variableRegistry), this, componentType, type, variableRegistry, flowController, true);
+                new StandardValidationContextFactory(this, variableRegistry), this, componentType, type, componentVarRegistry, flowController, true);
 
         serviceCache.putIfAbsent(id, serviceNode);
         return serviceNode;
@@ -235,9 +240,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
 
     @Override
     public Set<ConfiguredComponent> disableReferencingServices(final ControllerServiceNode serviceNode) {
-        // Get a list of all Controller Services that need to be disabled, in the order that they need to be
-        // disabled.
-        final List<ControllerServiceNode> toDisable = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
+        // Get a list of all Controller Services that need to be disabled, in the order that they need to be disabled.
+        final List<ControllerServiceNode> toDisable = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class);
 
         final Set<ControllerServiceNode> serviceSet = new HashSet<>(toDisable);
 
@@ -258,8 +262,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
     public Set<ConfiguredComponent> scheduleReferencingComponents(final ControllerServiceNode serviceNode) {
         // find all of the schedulable components (processors, reporting tasks) that refer to this Controller Service,
         // or a service that references this controller service, etc.
-        final List<ProcessorNode> processors = findRecursiveReferences(serviceNode, ProcessorNode.class);
-        final List<ReportingTaskNode> reportingTasks = findRecursiveReferences(serviceNode, ReportingTaskNode.class);
+        final List<ProcessorNode> processors = serviceNode.getReferences().findRecursiveReferences(ProcessorNode.class);
+        final List<ReportingTaskNode> reportingTasks = serviceNode.getReferences().findRecursiveReferences(ReportingTaskNode.class);
 
         final Set<ConfiguredComponent> updated = new HashSet<>();
 
@@ -298,8 +302,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
     public Set<ConfiguredComponent> unscheduleReferencingComponents(final ControllerServiceNode serviceNode) {
         // find all of the schedulable components (processors, reporting tasks) that refer to this Controller Service,
         // or a service that references this controller service, etc.
-        final List<ProcessorNode> processors = findRecursiveReferences(serviceNode, ProcessorNode.class);
-        final List<ReportingTaskNode> reportingTasks = findRecursiveReferences(serviceNode, ReportingTaskNode.class);
+        final List<ProcessorNode> processors = serviceNode.getReferences().findRecursiveReferences(ProcessorNode.class);
+        final List<ReportingTaskNode> reportingTasks = serviceNode.getReferences().findRecursiveReferences(ReportingTaskNode.class);
 
         final Set<ConfiguredComponent> updated = new HashSet<>();
 
@@ -333,7 +337,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
     }
 
     @Override
-    public Future<Void> enableControllerService(final ControllerServiceNode serviceNode) {
+    public CompletableFuture<Void> enableControllerService(final ControllerServiceNode serviceNode) {
         serviceNode.verifyCanEnable();
         return processScheduler.enableControllerService(serviceNode);
     }
@@ -450,9 +454,9 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
     }
 
     @Override
-    public void disableControllerService(final ControllerServiceNode serviceNode) {
+    public CompletableFuture<Void> disableControllerService(final ControllerServiceNode serviceNode) {
         serviceNode.verifyCanDisable();
-        processScheduler.disableControllerService(serviceNode);
+        return processScheduler.disableControllerService(serviceNode);
     }
 
     @Override
@@ -589,43 +593,10 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
         return allServices;
     }
 
-    /**
-     * Returns a List of all components that reference the given referencedNode
-     * (either directly or indirectly through another service) that are also of
-     * the given componentType. The list that is returned is in the order in
-     * which they will need to be 'activated' (enabled/started).
-     *
-     * @param referencedNode node
-     * @param componentType type
-     * @return list of components
-     */
-    private <T> List<T> findRecursiveReferences(final ControllerServiceNode referencedNode, final Class<T> componentType) {
-        final List<T> references = new ArrayList<>();
-
-        for (final ConfiguredComponent referencingComponent : referencedNode.getReferences().getReferencingComponents()) {
-            if (componentType.isAssignableFrom(referencingComponent.getClass())) {
-                references.add(componentType.cast(referencingComponent));
-            }
-
-            if (referencingComponent instanceof ControllerServiceNode) {
-                final ControllerServiceNode referencingNode = (ControllerServiceNode) referencingComponent;
-
-                // find components recursively that depend on referencingNode.
-                final List<T> recursive = findRecursiveReferences(referencingNode, componentType);
-
-                // For anything that depends on referencing node, we want to add it to the list, but we know
-                // that it must come after the referencing node, so we first remove any existing occurrence.
-                references.removeAll(recursive);
-                references.addAll(recursive);
-            }
-        }
-
-        return references;
-    }
 
     @Override
     public Set<ConfiguredComponent> enableReferencingServices(final ControllerServiceNode serviceNode) {
-        final List<ControllerServiceNode> recursiveReferences = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
+        final List<ControllerServiceNode> recursiveReferences = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class);
         logger.debug("Enabling the following Referencing Services for {}: {}", serviceNode, recursiveReferences);
         return enableReferencingServices(serviceNode, recursiveReferences);
     }
@@ -658,7 +629,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
 
     @Override
     public void verifyCanEnableReferencingServices(final ControllerServiceNode serviceNode) {
-        final List<ControllerServiceNode> referencingServices = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
+        final List<ControllerServiceNode> referencingServices = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class);
         final Set<ControllerServiceNode> referencingServiceSet = new HashSet<>(referencingServices);
 
         for (final ControllerServiceNode referencingService : referencingServices) {
@@ -668,9 +639,9 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
 
     @Override
     public void verifyCanScheduleReferencingComponents(final ControllerServiceNode serviceNode) {
-        final List<ControllerServiceNode> referencingServices = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
-        final List<ReportingTaskNode> referencingReportingTasks = findRecursiveReferences(serviceNode, ReportingTaskNode.class);
-        final List<ProcessorNode> referencingProcessors = findRecursiveReferences(serviceNode, ProcessorNode.class);
+        final List<ControllerServiceNode> referencingServices = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class);
+        final List<ReportingTaskNode> referencingReportingTasks = serviceNode.getReferences().findRecursiveReferences(ReportingTaskNode.class);
+        final List<ProcessorNode> referencingProcessors = serviceNode.getReferences().findRecursiveReferences(ProcessorNode.class);
 
         final Set<ControllerServiceNode> referencingServiceSet = new HashSet<>(referencingServices);
 
@@ -689,9 +660,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
 
     @Override
     public void verifyCanDisableReferencingServices(final ControllerServiceNode serviceNode) {
-        // Get a list of all Controller Services that need to be disabled, in the order that they need to be
-        // disabled.
-        final List<ControllerServiceNode> toDisable = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
+        // Get a list of all Controller Services that need to be disabled, in the order that they need to be disabled.
+        final List<ControllerServiceNode> toDisable = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class);
         final Set<ControllerServiceNode> serviceSet = new HashSet<>(toDisable);
 
         for (final ControllerServiceNode nodeToDisable : toDisable) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
index d2f3833..285b8dc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
@@ -16,8 +16,10 @@
  */
 package org.apache.nifi.controller.service;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.nifi.controller.ConfiguredComponent;
@@ -101,4 +103,35 @@ public class StandardControllerServiceReference implements ControllerServiceRefe
 
         return references;
     }
+
+
+    @Override
+    public <T> List<T> findRecursiveReferences(final Class<T> componentType) {
+        return findRecursiveReferences(referenced, componentType); // delegates to the recursive helper, starting at 'referenced' (declared outside this hunk)
+    }
+
+    private <T> List<T> findRecursiveReferences(final ControllerServiceNode referencedNode, final Class<T> componentType) {
+        final List<T> references = new ArrayList<>(); // result list; element order is significant (see the removeAll/addAll step below)
+
+        for (final ConfiguredComponent referencingComponent : referencedNode.getReferences().getReferencingComponents()) {
+            if (componentType.isAssignableFrom(referencingComponent.getClass())) { // collect only components of the requested type
+                references.add(componentType.cast(referencingComponent));
+            }
+
+            if (referencingComponent instanceof ControllerServiceNode) {
+                final ControllerServiceNode referencingNode = (ControllerServiceNode) referencingComponent;
+
+                // Find, recursively, the components that depend on referencingNode.
+                final List<T> recursive = findRecursiveReferences(referencingNode, componentType); // NOTE(review): no visited-set is kept, so a reference cycle would recurse without end - confirm cycles cannot occur
+
+                // For anything that depends on referencing node, we want to add it to the list, but we know
+                // that it must come after the referencing node, so we first remove any existing occurrence.
+                references.removeAll(recursive);
+                references.addAll(recursive);
+            }
+        }
+
+        return references;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
index 1ef3e8b..3957955 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
@@ -16,6 +16,24 @@
  */
 package org.apache.nifi.fingerprint;
 
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import javax.xml.XMLConstants;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.components.ConfigurableComponent;
@@ -38,23 +56,6 @@ import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
 import org.xml.sax.SAXException;
 
-import javax.xml.XMLConstants;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.validation.Schema;
-import javax.xml.validation.SchemaFactory;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
 /**
  * <p>Creates a fingerprint of a flow.xml. The order of elements or attributes in the flow.xml does not influence the fingerprint generation.
  *
@@ -324,9 +325,22 @@ public class FingerprintFactory {
             addFunnelFingerprint(builder, funnelElem);
         }
 
+        // add variables
+        final NodeList variableElems = DomUtils.getChildNodesByTagName(processGroupElem, "variable");
+        final List<Element> sortedVarList = sortElements(variableElems, getVariableNameComparator());
+        for (final Element varElem : sortedVarList) {
+            addVariableFingerprint(builder, varElem);
+        }
+
         return builder;
     }
 
+    // Appends the element's "name" and "value" attributes to the fingerprint in the form name=value.
+    private void addVariableFingerprint(final StringBuilder builder, final Element variableElement) {
+        final String nameValuePair = variableElement.getAttribute("name") + "=" + variableElement.getAttribute("value");
+        builder.append(nameValuePair);
+    }
+
     private StringBuilder addFlowFileProcessorFingerprint(final StringBuilder builder, final Element processorElem) throws FingerprintException {
         // id
         appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "id"));
@@ -662,6 +676,27 @@ public class FingerprintFactory {
         };
     }
 
+    /**
+     * Builds the comparator that sortElements uses to put a Process Group's "variable" elements
+     * into a deterministic order before they are fingerprinted.
+     * Non-null elements are ordered by the natural String ordering of their "name" attribute;
+     * a null element sorts after any non-null element, and two null elements compare as equal.
+     *
+     * @return comparator ordering variable elements by name, nulls last
+     */
+    private Comparator<Element> getVariableNameComparator() {
+        return new Comparator<Element>() {
+            @Override
+            public int compare(final Element first, final Element second) {
+                if (first == null || second == null) {
+                    // equal when both are missing, otherwise the missing one goes last
+                    return first == second ? 0 : (first == null ? 1 : -1);
+                }
+                return first.getAttribute("name").compareTo(second.getAttribute("name"));
+            }
+        };
+    }
+
     private Comparator<Element> getProcessorPropertiesComparator() {
         return new Comparator<Element>() {
             @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 2907704..2b7b51d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -16,13 +16,31 @@
  */
 package org.apache.nifi.groups;
 
-import com.google.common.collect.Sets;
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
 import org.apache.nifi.annotation.lifecycle.OnRemoved;
 import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.attribute.expression.language.Query;
+import org.apache.nifi.attribute.expression.language.VariableImpact;
 import org.apache.nifi.authorization.Resource;
 import org.apache.nifi.authorization.resource.Authorizable;
 import org.apache.nifi.authorization.resource.ResourceFactory;
@@ -39,6 +57,7 @@ import org.apache.nifi.connectable.Port;
 import org.apache.nifi.connectable.Position;
 import org.apache.nifi.connectable.Positionable;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ConfiguredComponent;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.ProcessorNode;
@@ -50,13 +69,15 @@ import org.apache.nifi.controller.label.Label;
 import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.service.ControllerServiceReference;
 import org.apache.nifi.controller.service.StandardConfigurationContext;
 import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.logging.LogRepositoryFactory;
 import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.processor.StandardProcessContext;
-import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.VariableDescriptor;
+import org.apache.nifi.registry.variable.MutableVariableRegistry;
 import org.apache.nifi.remote.RemoteGroupPort;
 import org.apache.nifi.remote.RootGroupPort;
 import org.apache.nifi.util.NiFiProperties;
@@ -66,20 +87,7 @@ import org.apache.nifi.web.api.dto.TemplateDTO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static java.util.Objects.requireNonNull;
+import com.google.common.collect.Sets;
 
 public final class StandardProcessGroup implements ProcessGroup {
 
@@ -104,7 +112,7 @@ public final class StandardProcessGroup implements ProcessGroup {
     private final Map<String, ControllerServiceNode> controllerServices = new HashMap<>();
     private final Map<String, Template> templates = new HashMap<>();
     private final StringEncryptor encryptor;
-    private final VariableRegistry variableRegistry;
+    private final MutableVariableRegistry variableRegistry;
 
     private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
     private final Lock readLock = rwLock.readLock();
@@ -114,7 +122,7 @@ public final class StandardProcessGroup implements ProcessGroup {
 
     public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final StandardProcessScheduler scheduler,
             final NiFiProperties nifiProps, final StringEncryptor encryptor, final FlowController flowController,
-            final VariableRegistry variableRegistry) {
+            final MutableVariableRegistry variableRegistry) {
         this.id = id;
         this.controllerServiceProvider = serviceProvider;
         this.parent = new AtomicReference<>();
@@ -361,7 +369,7 @@ public final class StandardProcessGroup implements ProcessGroup {
     private void shutdown(final ProcessGroup procGroup) {
         for (final ProcessorNode node : procGroup.getProcessors()) {
             try (final NarCloseable x = NarCloseable.withComponentNarLoader(node.getProcessor().getClass(), node.getIdentifier())) {
-                final StandardProcessContext processContext = new StandardProcessContext(node, controllerServiceProvider, encryptor, getStateManager(node.getIdentifier()), variableRegistry);
+                final StandardProcessContext processContext = new StandardProcessContext(node, controllerServiceProvider, encryptor, getStateManager(node.getIdentifier()));
                 ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, node.getProcessor(), processContext);
             }
         }
@@ -548,6 +556,8 @@ public final class StandardProcessGroup implements ProcessGroup {
         writeLock.lock();
         try {
             group.setParent(this);
+            group.getVariableRegistry().setParent(getVariableRegistry());
+
             processGroups.put(Objects.requireNonNull(group).getIdentifier(), group);
             flowController.onProcessGroupAdded(group);
         } finally {
@@ -709,6 +719,7 @@ public final class StandardProcessGroup implements ProcessGroup {
             }
 
             processor.setProcessGroup(this);
+            processor.getVariableRegistry().setParent(getVariableRegistry());
             processors.put(processorId, processor);
             flowController.onProcessorAdded(processor);
         } finally {
@@ -732,7 +743,7 @@ public final class StandardProcessGroup implements ProcessGroup {
             }
 
             try (final NarCloseable x = NarCloseable.withComponentNarLoader(processor.getProcessor().getClass(), processor.getIdentifier())) {
-                final StandardProcessContext processContext = new StandardProcessContext(processor, controllerServiceProvider, encryptor, getStateManager(processor.getIdentifier()), variableRegistry);
+                final StandardProcessContext processContext = new StandardProcessContext(processor, controllerServiceProvider, encryptor, getStateManager(processor.getIdentifier()));
                 ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, processor.getProcessor(), processContext);
             } catch (final Exception e) {
                 throw new ComponentLifeCycleException("Failed to invoke 'OnRemoved' methods of processor with id " + processor.getIdentifier(), e);
@@ -1081,7 +1092,7 @@ public final class StandardProcessGroup implements ProcessGroup {
     }
 
     @Override
-    public void startProcessor(final ProcessorNode processor) {
+    public CompletableFuture<Void> startProcessor(final ProcessorNode processor) {
         readLock.lock();
         try {
             if (getProcessor(processor.getIdentifier()) == null) {
@@ -1092,10 +1103,10 @@ public final class StandardProcessGroup implements ProcessGroup {
             if (state == ScheduledState.DISABLED) {
                 throw new IllegalStateException("Processor is disabled");
             } else if (state == ScheduledState.RUNNING) {
-                return;
+                return CompletableFuture.completedFuture(null);
             }
 
-            scheduler.startProcessor(processor);
+            return scheduler.startProcessor(processor);
         } finally {
             readLock.unlock();
         }
@@ -1162,7 +1173,7 @@ public final class StandardProcessGroup implements ProcessGroup {
     }
 
     @Override
-    public void stopProcessor(final ProcessorNode processor) {
+    public CompletableFuture<Void> stopProcessor(final ProcessorNode processor) {
         readLock.lock();
         try {
             if (!processors.containsKey(processor.getIdentifier())) {
@@ -1173,10 +1184,10 @@ public final class StandardProcessGroup implements ProcessGroup {
             if (state == ScheduledState.DISABLED) {
                 throw new IllegalStateException("Processor is disabled");
             } else if (state == ScheduledState.STOPPED) {
-                return;
+                return CompletableFuture.completedFuture(null);
             }
 
-            scheduler.stopProcessor(processor);
+            return scheduler.stopProcessor(processor);
         } finally {
             readLock.unlock();
         }
@@ -1854,6 +1865,7 @@ public final class StandardProcessGroup implements ProcessGroup {
             }
 
             service.setProcessGroup(this);
+            service.getVariableRegistry().setParent(getVariableRegistry());
             this.controllerServices.put(service.getIdentifier(), service);
             LOG.info("{} added to {}", service, this);
         } finally {
@@ -2583,4 +2595,129 @@ public final class StandardProcessGroup implements ProcessGroup {
             readLock.unlock();
         }
     }
+
+    @Override
+    public MutableVariableRegistry getVariableRegistry() {
+        return variableRegistry; // also set as the parent of the registries of groups, processors and services added to this group
+    }
+
+    /**
+     * Verifies that no variable whose value would change is referenced by a running Processor or an active Controller Service.
+     *
+     * @param updatedVariables proposed variable names and values; null or empty is accepted without further checks
+     * @throws IllegalStateException if such a reference is found
+     */
+    @Override
+    public void verifyCanUpdateVariables(final Map<String, String> updatedVariables) {
+        if (updatedVariables == null || updatedVariables.isEmpty()) {
+            return;
+        }
+
+        readLock.lock();
+        try {
+            final Set<String> updatedVariableNames = getUpdatedVariables(updatedVariables);
+            if (updatedVariableNames.isEmpty()) {
+                return;
+            }
+            for (final ProcessorNode processor : findAllProcessors()) {
+                if (processor.isRunning()) {
+                    verifyNotReferenced(processor, updatedVariableNames);
+                }
+            }
+            for (final ControllerServiceNode service : findAllControllerServices()) {
+                if (service.isActive()) {
+                    verifyNotReferenced(service, updatedVariableNames);
+                }
+            }
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    // Throws IllegalStateException if any property of the component is impacted by one of the variable names.
+    private void verifyNotReferenced(final ConfiguredComponent component, final Set<String> variableNames) {
+        final List<VariableImpact> impacts = getVariableImpact(component); // prepared once, not once per variable
+        for (final String variableName : variableNames) {
+            for (final VariableImpact impact : impacts) {
+                if (impact.isImpacted(variableName)) {
+                    throw new IllegalStateException("Cannot update variable '" + variableName + "' because it is referenced by " + component + ", which is currently running");
+                }
+            }
+        }
+    }
+
+    /**
+     * Finds the components, among those returned by findAllProcessors() and findAllControllerServices(), affected by a variable.
+     *
+     * @param variableName the name of the variable
+     * @return every Processor and Controller Service with at least one property impacted by the variable,
+     *         plus every component that references such a Controller Service, directly or recursively
+     */
+    @Override
+    public Set<ConfiguredComponent> getComponentsAffectedByVariable(final String variableName) {
+        final Set<ConfiguredComponent> affected = new HashSet<>();
+
+        // Processors that reference the variable; one impacted property is enough, so stop at the first match
+        for (final ProcessorNode processor : findAllProcessors()) {
+            if (getVariableImpact(processor).stream().anyMatch(impact -> impact.isImpacted(variableName))) {
+                affected.add(processor);
+            }
+        }
+
+        // A service that references the variable also affects every component that references that service, recursively.
+        for (final ControllerServiceNode service : findAllControllerServices()) {
+            if (getVariableImpact(service).stream().anyMatch(impact -> impact.isImpacted(variableName))) {
+                affected.add(service);
+                final ControllerServiceReference reference = service.getReferences();
+                affected.addAll(reference.findRecursiveReferences(ConfiguredComponent.class));
+            }
+        }
+
+        return affected;
+    }
+
+
+    /**
+     * Determines which of the proposed variables differ from what the registry currently returns.
+     *
+     * @param newVariableValues variable names mapped to their proposed values (values may be null)
+     * @return the names whose proposed value is not equal, per Objects.equals, to
+     *         getVariableRegistry().getVariableValue(name); empty if nothing would change
+     */
+    private Set<String> getUpdatedVariables(final Map<String, String> newVariableValues) {
+        final MutableVariableRegistry registry = getVariableRegistry();
+
+        // collect into a HashSet explicitly; Collectors.toSet() makes no guarantee about the set type
+        return newVariableValues.entrySet().stream()
+            .filter(entry -> !Objects.equals(entry.getValue(), registry.getVariableValue(entry.getKey())))
+            .map(Map.Entry::getKey)
+            .collect(Collectors.toCollection(HashSet::new));
+    }
+
+    // Prepares each property value of the component as a query and collects the VariableImpact of each, one per property.
+    private List<VariableImpact> getVariableImpact(final ConfiguredComponent component) {
+        return component.getProperties().values().stream().map(Query::prepare)
+            .map(preparedQuery -> preparedQuery.getVariableImpact()).collect(Collectors.toList());
+    }
+
+    // Verifies the update and then passes the variables to this group's registry, all while holding the write lock.
+    @Override
+    public void setVariables(final Map<String, String> variables) {
+        writeLock.lock();
+        try {
+            verifyCanUpdateVariables(variables);
+            if (variables == null) {
+                return;
+            }
+            // copied entry by entry because Collectors.toMap cannot be used: a value may be null
+            final Map<VariableDescriptor, String> descriptorValues = new HashMap<>();
+            for (final Map.Entry<String, String> variable : variables.entrySet()) {
+                descriptorValues.put(new VariableDescriptor(variable.getKey()), variable.getValue());
+            }
+            variableRegistry.setVariables(descriptorValues);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
index 2714392..dfba330 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
@@ -26,8 +26,8 @@ import java.util.Set;
 
 import org.apache.nifi.attribute.expression.language.PreparedQuery;
 import org.apache.nifi.attribute.expression.language.Query;
-import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
 import org.apache.nifi.attribute.expression.language.Query.Range;
+import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.state.StateManager;
@@ -37,7 +37,6 @@ import org.apache.nifi.controller.ControllerServiceLookup;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.encrypt.StringEncryptor;
-import org.apache.nifi.registry.VariableRegistry;
 import org.apache.nifi.util.Connectables;
 
 public class StandardProcessContext implements ProcessContext, ControllerServiceLookup {
@@ -47,15 +46,12 @@ public class StandardProcessContext implements ProcessContext, ControllerService
     private final Map<PropertyDescriptor, PreparedQuery> preparedQueries;
     private final StringEncryptor encryptor;
     private final StateManager stateManager;
-    private final VariableRegistry variableRegistry;
 
-    public StandardProcessContext(final ProcessorNode processorNode, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor, final StateManager stateManager,
-                                  final VariableRegistry variableRegistry) {
+    public StandardProcessContext(final ProcessorNode processorNode, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor, final StateManager stateManager) {
         this.procNode = processorNode;
         this.controllerServiceProvider = controllerServiceProvider;
         this.encryptor = encryptor;
         this.stateManager = stateManager;
-        this.variableRegistry = variableRegistry;
 
         preparedQueries = new HashMap<>();
         for (final Map.Entry<PropertyDescriptor, String> entry : procNode.getProperties().entrySet()) {
@@ -93,12 +89,12 @@ public class StandardProcessContext implements ProcessContext, ControllerService
         final String setPropertyValue = procNode.getProperty(descriptor);
         final String propValue = (setPropertyValue == null) ? descriptor.getDefaultValue() : setPropertyValue;
 
-        return new StandardPropertyValue(propValue, this, preparedQueries.get(descriptor), variableRegistry);
+        return new StandardPropertyValue(propValue, this, preparedQueries.get(descriptor), procNode.getVariableRegistry());
     }
 
     @Override
     public PropertyValue newPropertyValue(final String rawValue) {
-        return new StandardPropertyValue(rawValue, this, Query.prepare(rawValue), variableRegistry);
+        return new StandardPropertyValue(rawValue, this, Query.prepare(rawValue), procNode.getVariableRegistry());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java
index 662169c..2f38aee 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java
@@ -53,8 +53,8 @@ public class StandardValidationContext implements ValidationContext {
     private final String componentId;
 
     public StandardValidationContext(final ControllerServiceProvider controllerServiceProvider, final Map<PropertyDescriptor, String> properties,
-            final String annotationData, final String groupId, final String componentId, VariableRegistry variableRegistry) {
-        this(controllerServiceProvider, Collections.<String> emptySet(), properties, annotationData, groupId, componentId,variableRegistry);
+            final String annotationData, final String groupId, final String componentId, final VariableRegistry variableRegistry) {
+        this(controllerServiceProvider, Collections.<String> emptySet(), properties, annotationData, groupId, componentId, variableRegistry);
     }
 
     public StandardValidationContext(
@@ -63,7 +63,8 @@ public class StandardValidationContext implements ValidationContext {
             final Map<PropertyDescriptor, String> properties,
             final String annotationData,
             final String groupId,
-            final String componentId, VariableRegistry variableRegistry) {
+            final String componentId,
+            final VariableRegistry variableRegistry) {
         this.controllerServiceProvider = controllerServiceProvider;
         this.properties = new HashMap<>(properties);
         this.annotationData = annotationData;