You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/11 17:12:05 UTC

incubator-nifi git commit: NIFI-250: Move activate/deactive methods for controller services' referencing components to ControllerServiceProvider

Repository: incubator-nifi
Updated Branches:
  refs/heads/NIFI-250 ed4d22c1f -> 371e0100d


NIFI-250: Move activate/deactive methods for controller services' referencing components to ControllerServiceProvider


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/371e0100
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/371e0100
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/371e0100

Branch: refs/heads/NIFI-250
Commit: 371e0100d35864f0092b133a7decac34ffb4be33
Parents: ed4d22c
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Feb 11 11:11:49 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Feb 11 11:11:49 2015 -0500

----------------------------------------------------------------------
 .../cluster/manager/impl/WebClusterManager.java |  12 +-
 .../nifi/controller/ProcessScheduler.java       |  13 ++
 .../service/ControllerServiceNode.java          |  24 ++-
 .../service/ControllerServiceProvider.java      |  24 +++
 .../apache/nifi/controller/FlowController.java  |  86 ++---------
 .../scheduling/StandardProcessScheduler.java    |   4 +
 .../service/StandardControllerServiceNode.java  |  45 ++++--
 .../StandardControllerServiceProvider.java      | 146 ++++++++++++++++++-
 .../StandardControllerServiceReference.java     |   5 +-
 .../TestStandardControllerServiceProvider.java  | 107 ++++++++++++++
 .../nifi/controller/service/mock/ServiceA.java  |  49 +++++++
 .../nifi/controller/service/mock/ServiceB.java  |  23 +++
 12 files changed, 437 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/371e0100/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 8a431ad..ee3c621 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -357,7 +357,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         this.httpResponseMapper = httpResponseMapper;
         this.dataFlowManagementService = dataFlowManagementService;
         this.properties = properties;
-        this.controllerServiceProvider = new StandardControllerServiceProvider();
         this.bulletinRepository = new VolatileBulletinRepository();
         this.instanceId = UUID.randomUUID().toString();
         this.senderListener = senderListener;
@@ -410,6 +409,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, new QuartzSchedulingAgent(null, reportingTaskEngine, null, encryptor));
         processScheduler.setMaxThreadCount(SchedulingStrategy.TIMER_DRIVEN, 10);
         processScheduler.setMaxThreadCount(SchedulingStrategy.CRON_DRIVEN, 10);
+        
+        controllerServiceProvider = new StandardControllerServiceProvider(processScheduler);
     }
 
     public void start() throws IOException {
@@ -1356,6 +1357,15 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     	return controllerServiceProvider.getAllControllerServices();
     }
     
+    @Override
+    public void activateReferencingComponents(final ControllerServiceNode serviceNode) {
+        controllerServiceProvider.activateReferencingComponents(serviceNode);
+    }
+    
+    @Override
+    public void deactivateReferencingComponents(final ControllerServiceNode serviceNode) {
+        controllerServiceProvider.deactivateReferencingComponents(serviceNode);
+    }
     
     private byte[] serialize(final Document doc) throws TransformerException {
     	final ByteArrayOutputStream baos = new ByteArrayOutputStream();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/371e0100/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
index 303f540..724d1f2 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
@@ -143,4 +143,17 @@ public interface ProcessScheduler {
      * @param procNode
      */
     void yield(ProcessorNode procNode);
+    
+    /**
+     * Stops scheduling the given Reporting Task to run
+     * @param taskNode
+     */
+    void unschedule(ReportingTaskNode taskNode);
+    
+    /**
+     * Begins scheduling the given Reporting Task to run
+     * @param taskNode
+     */
+    void schedule(ReportingTaskNode taskNode);
+    
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/371e0100/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
index d737d04..4822958 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.controller.service;
 
+import java.util.Set;
+
 import org.apache.nifi.controller.ConfiguredComponent;
 import org.apache.nifi.controller.ControllerService;
 
@@ -27,7 +29,15 @@ public interface ControllerServiceNode extends ConfiguredComponent {
 
     boolean isDisabled();
 
-    void setDisabled(boolean disabled);
+    void enable();
+    void disable();
+    
+    /**
+     * Disables the Controller Service without requiring that the provided set of referencing
+     * Controller Services be disabled first
+     * @param ignoredReferences
+     */
+    void disable(Set<ControllerServiceNode> ignoredReferences);
 
     ControllerServiceReference getReferences();
 
@@ -40,6 +50,18 @@ public interface ControllerServiceNode extends ConfiguredComponent {
     
     void verifyCanEnable();
     void verifyCanDisable();
+    
+    /**
+     * Verifies that this Controller Service can be disabled if the provided set of 
+     * services are also disabled. This is introduced because we can have an instance
+     * where A references B, which references C, which references A and we want
+     * to disable service C. In this case, the cycle needs to not cause us to fail, 
+     * so we want to verify that C can be disabled if A and B also are. 
+     * 
+     * @param ignoredReferences
+     */
+    void verifyCanDisable(Set<ControllerServiceNode> ignoredReferences);
+    
     void verifyCanDelete();
     void verifyCanUpdate();
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/371e0100/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
index 4074e89..7a767bf 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
@@ -74,4 +74,28 @@ public interface ControllerServiceProvider extends ControllerServiceLookup {
      * @return
      */
     Set<ControllerServiceNode> getAllControllerServices();
+    
+    /**
+     * Recursively stops all Processors and Reporting Tasks that are referencing the given Controller Service,
+     * as well as disabling any Controller Service that references this Controller Service (and stops
+     * all Reporting Tasks or Controller Services that are referencing it, and so on).
+     * @param serviceNode
+     */
+    void deactivateReferencingComponents(ControllerServiceNode serviceNode);
+    
+    /**
+     * <p>
+     * Starts any enabled Processors and Reporting Tasks that are referencing this Controller Service. If other Controller
+     * Services reference this Controller Service, will also enable those services and 'activate' any components referencing
+     * them.
+     * </p>
+     * 
+     * <p>
+     * NOTE: If any component cannot be started, an IllegalStateException will be thrown and no more components will
+     * be activated. This method provides no atomicity.
+     * </p>
+     * 
+     * @param serviceNode
+     */
+    void activateReferencingComponents(ControllerServiceNode serviceNode);
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/371e0100/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 8bd1149..2825d5b 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -104,7 +104,6 @@ import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
 import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
-import org.apache.nifi.controller.service.ControllerServiceReference;
 import org.apache.nifi.controller.service.StandardControllerServiceProvider;
 import org.apache.nifi.controller.status.ConnectionStatus;
 import org.apache.nifi.controller.status.PortStatus;
@@ -376,7 +375,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
         this.properties = properties;
         sslContext = SslContextFactory.createSslContext(properties, false);
         extensionManager = new ExtensionManager();
-        controllerServiceProvider = new StandardControllerServiceProvider();
 
         timerDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxTimerDrivenThreads.get(), "Timer-Driven Process"));
         eventDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxEventDrivenThreads.get(), "Event-Driven Process"));
@@ -400,6 +398,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
 
         processScheduler = new StandardProcessScheduler(this, this, encryptor);
         eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler);
+        controllerServiceProvider = new StandardControllerServiceProvider(processScheduler);
 
         final ProcessContextFactory contextFactory = new ProcessContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceEventRepository);
         processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, new EventDrivenSchedulingAgent(
@@ -2571,82 +2570,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
         return reportingTasks.values();
     }
 
-    /**
-     * Recursively stops all Processors and Reporting Tasks that are referencing the given Controller Service,
-     * as well as disabling any Controller Service that references this Controller Service (and stops
-     * all Reporting Task or Controller Service that is referencing it, and so on).
-     * @param serviceNode
-     */
-    public void deactiveReferencingComponents(final ControllerServiceNode serviceNode) {
-    	final ControllerServiceReference reference = serviceNode.getReferences();
-    	
-    	final Set<ConfiguredComponent> components = reference.getActiveReferences();
-    	for (final ConfiguredComponent component : components) {
-    		if ( component instanceof ControllerServiceNode ) {
-    			deactiveReferencingComponents((ControllerServiceNode) component);
-    			
-    			if (isControllerServiceEnabled(serviceNode.getIdentifier())) {
-    				disableControllerService(serviceNode);
-    			}
-    		} else if ( component instanceof ReportingTaskNode ) {
-    			final ReportingTaskNode taskNode = (ReportingTaskNode) component;
-    			if (taskNode.isRunning()) {
-    				stopReportingTask((ReportingTaskNode) component);
-    			}
-    		} else if ( component instanceof ProcessorNode ) {
-    			final ProcessorNode procNode = (ProcessorNode) component;
-    			if ( procNode.isRunning() ) {
-    				stopProcessor(procNode.getProcessGroup().getIdentifier(), procNode.getIdentifier());
-    			}
-    		}
-    	}
-    }
     
-    
-    /**
-     * <p>
-     * Starts any enabled Processors and Reporting Tasks that are referencing this Controller Service. If other Controller
-     * Services reference this Controller Service, will also enable those services and 'active' any components referencing
-     * them.
-     * </p>
-     * 
-     * <p>
-     * NOTE: If any component cannot be started, an IllegalStateException will be thrown an no more components will
-     * be activated. This method provides no atomicity.
-     * </p>
-     * 
-     * @param serviceNode
-     */
+    @Override
     public void activateReferencingComponents(final ControllerServiceNode serviceNode) {
-    	final ControllerServiceReference ref = serviceNode.getReferences();
-    	final Set<ConfiguredComponent> components = ref.getReferencingComponents();
-    	
-    	// First, activate any other controller services. We do this first so that we can
-    	// avoid the situation where Processor X depends on Controller Services Y and Z; and
-    	// Controller Service Y depends on Controller Service Z. In this case, if we first attempted
-    	// to start Processor X, we would fail because Controller Service Y is disabled. THis way, we
-    	// can recursively enable everything.
-    	for ( final ConfiguredComponent component : components ) {
-    		if (component instanceof ControllerServiceNode) {
-    			final ControllerServiceNode componentNode = (ControllerServiceNode) component;
-    			enableControllerService(componentNode);
-    			activateReferencingComponents(componentNode);
-    		}
-    	}
-    	
-    	for ( final ConfiguredComponent component : components ) {
-    		if (component instanceof ProcessorNode) {
-    			final ProcessorNode procNode = (ProcessorNode) component;
-    			if ( !procNode.isRunning() ) {
-    				startProcessor(procNode.getProcessGroup().getIdentifier(), procNode.getIdentifier());
-    			}
-    		} else if (component instanceof ReportingTaskNode) {
-    			final ReportingTaskNode taskNode = (ReportingTaskNode) component;
-    			if ( !taskNode.isRunning() ) {
-    				startReportingTask(taskNode);
-    			}
-    		}
-    	}
+        controllerServiceProvider.activateReferencingComponents(serviceNode);
     }
     
     @Override
@@ -2664,6 +2591,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
         processScheduler.disableReportingTask(reportingTaskNode);
     }
     
+    
+    @Override
+    public void deactivateReferencingComponents(final ControllerServiceNode serviceNode) {
+        controllerServiceProvider.deactivateReferencingComponents(serviceNode);
+    }
+    
     @Override
     public void enableControllerService(final ControllerServiceNode serviceNode) {
         serviceNode.verifyCanEnable();
@@ -2675,6 +2608,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
         serviceNode.verifyCanDisable();
         controllerServiceProvider.disableControllerService(serviceNode);
     }
+    
 
     @Override
     public ControllerService getControllerService(final String serviceIdentifier) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/371e0100/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 1627994..b6699d2 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -145,6 +145,8 @@ public final class StandardProcessScheduler implements ProcessScheduler {
         componentLifeCycleThreadPool.shutdown();
     }
 
+    
+    @Override
     public void schedule(final ReportingTaskNode taskNode) {
         final ScheduleState scheduleState = getScheduleState(requireNonNull(taskNode));
         if (scheduleState.isScheduled()) {
@@ -203,6 +205,8 @@ public final class StandardProcessScheduler implements ProcessScheduler {
         componentLifeCycleThreadPool.execute(startReportingTaskRunnable);
     }
 
+    
+    @Override
     public void unschedule(final ReportingTaskNode taskNode) {
         final ScheduleState scheduleState = getScheduleState(requireNonNull(taskNode));
         if (!scheduleState.isScheduled()) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/371e0100/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index ff4704d..b29f86b 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.controller.service;
 
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -60,22 +61,27 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
     public boolean isDisabled() {
         return disabled.get();
     }
-
+    
+    
     @Override
-    public void setDisabled(final boolean disabled) {
-        if (!disabled && !isValid()) {
+    public void enable() {
+        if ( !isValid() ) {
             throw new IllegalStateException("Cannot enable Controller Service " + implementation + " because it is not valid");
         }
-
-        if (disabled) {
-            // do not allow a Controller Service to be disabled if it's currently being used.
-            final Set<ConfiguredComponent> runningRefs = getReferences().getActiveReferences();
-            if (!runningRefs.isEmpty()) {
-                throw new IllegalStateException("Cannot disable Controller Service because it is referenced (either directly or indirectly) by " + runningRefs.size() + " different components that are currently running");
-            }
-        }
-
-        this.disabled.set(disabled);
+        
+        this.disabled.set(false);
+    }
+    
+    @Override
+    public void disable() {
+        verifyCanDisable();
+        this.disabled.set(true);
+    }
+    
+    @Override
+    public void disable(final Set<ControllerServiceNode> ignoredReferences) {
+        verifyCanDisable(ignoredReferences);
+        this.disabled.set(true);
     }
 
     @Override
@@ -161,10 +167,17 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
     
     @Override
     public void verifyCanDisable() {
+        verifyCanDisable(Collections.<ControllerServiceNode>emptySet());
+    }
+    
+    @Override
+    public void verifyCanDisable(final Set<ControllerServiceNode> ignoreReferences) {
         final ControllerServiceReference references = getReferences();
-        final int numRunning = references.getActiveReferences().size();
-        if ( numRunning > 0 ) {
-            throw new IllegalStateException(implementation + " cannot be disabled because it is referenced by " + numRunning + " components that are currently running");
+        
+        for ( final ConfiguredComponent activeReference : references.getActiveReferences() ) {
+            if ( !ignoreReferences.contains(activeReference) ) {
+                throw new IllegalStateException(implementation + " cannot be disabled because it is referenced by at least one component that is currently running");
+            }
         }
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/371e0100/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index 208da30..c584188 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -37,7 +37,11 @@ import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.annotation.lifecycle.OnRemoved;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ConfiguredComponent;
 import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.ValidationContextFactory;
 import org.apache.nifi.controller.exception.ControllerServiceNotFoundException;
 import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
@@ -56,6 +60,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
 
     private static final Logger logger = LoggerFactory.getLogger(StandardControllerServiceProvider.class);
 
+    private final ProcessScheduler processScheduler;
     private final ConcurrentMap<String, ControllerServiceNode> controllerServices;
     private static final Set<Method> validDisabledMethods;
 
@@ -71,10 +76,11 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
         validDisabledMethods = Collections.unmodifiableSet(validMethods);
     }
 
-    public StandardControllerServiceProvider() {
+    public StandardControllerServiceProvider(final ProcessScheduler scheduler) {
         // the following 2 maps must be updated atomically, but we do not lock around them because they are modified
         // only in the createControllerService method, and both are modified before the method returns
         this.controllerServices = new ConcurrentHashMap<>();
+        this.processScheduler = scheduler;
     }
 
     private Class<?>[] getInterfaces(final Class<?> cls) {
@@ -106,8 +112,14 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
         final ClassLoader currentContextClassLoader = Thread.currentThread().getContextClassLoader();
         try {
             final ClassLoader cl = ExtensionManager.getClassLoader(type);
-            Thread.currentThread().setContextClassLoader(cl);
-            final Class<?> rawClass = Class.forName(type, false, cl);
+            final Class<?> rawClass;
+            if ( cl == null ) {
+                rawClass = Class.forName(type);
+            } else {
+                Thread.currentThread().setContextClassLoader(cl);
+                rawClass = Class.forName(type, false, cl);
+            }
+            
             final Class<? extends ControllerService> controllerServiceClass = rawClass.asSubclass(ControllerService.class);
 
             final ControllerService originalService = controllerServiceClass.newInstance();
@@ -135,7 +147,12 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
                 }
             };
 
-            final ControllerService proxiedService = (ControllerService) Proxy.newProxyInstance(cl, getInterfaces(controllerServiceClass), invocationHandler);
+            final ControllerService proxiedService;
+            if ( cl == null ) {
+                proxiedService = (ControllerService) Proxy.newProxyInstance(getClass().getClassLoader(), getInterfaces(controllerServiceClass), invocationHandler);
+            } else {
+                proxiedService = (ControllerService) Proxy.newProxyInstance(cl, getInterfaces(controllerServiceClass), invocationHandler);
+            }
             logger.info("Create Controller Service of type {} with identifier {}", type, id);
 
             originalService.initialize(new StandardControllerServiceInitializationContext(id, this));
@@ -174,7 +191,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
             ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnEnabled.class, serviceNode.getControllerServiceImplementation(), configContext);
         }
         
-        serviceNode.setDisabled(false);
+        serviceNode.enable();
     }
     
     @Override
@@ -183,7 +200,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
 
         // We must set the service to disabled before we invoke the OnDisabled methods because the service node
         // can throw Exceptions if we attempt to disable the service while it's known to be in use.
-        serviceNode.setDisabled(true);
+        serviceNode.disable();
         
         try (final NarCloseable x = NarCloseable.withNarLoader()) {
             ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, serviceNode.getControllerServiceImplementation());
@@ -263,4 +280,121 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
     public Set<ControllerServiceNode> getAllControllerServices() {
     	return new HashSet<>(controllerServices.values());
     }
+    
+    @Override
+    public void deactivateReferencingComponents(final ControllerServiceNode serviceNode) {
+        deactivateReferencingComponents(serviceNode, new HashSet<ControllerServiceNode>());
+    }
+    
+    // Stops the running Processors, unschedules the running Reporting Tasks and disables the enabled Controller Services
+    // that reference serviceNode. 'visited' holds the services whose referencing components are already being deactivated.
+    private void deactivateReferencingComponents(final ControllerServiceNode serviceNode, final Set<ControllerServiceNode> visited) {
+        final Set<ConfiguredComponent> components = serviceNode.getReferences().getActiveReferences();
+        for (final ConfiguredComponent component : components) {
+            if ( component instanceof ControllerServiceNode ) {
+                // If we've already visited this component (there is a loop such that
+                // we are disabling Controller Service A, but B depends on A and A depends on B)
+                // we don't need to disable this component because it will be disabled after we return
+                if ( visited.contains(component) ) {
+                    continue;
+                }
+                final ControllerServiceNode referencingService = (ControllerServiceNode) component;
+                visited.add(serviceNode);
+                deactivateReferencingComponents(referencingService, visited);
+                // Everything that depends on the referencing service is now down, so disable that service itself (not serviceNode).
+                if (isControllerServiceEnabled(referencingService.getIdentifier())) {
+                    referencingService.verifyCanDisable(visited);
+                    referencingService.disable(visited);
+                }
+            } else if ( component instanceof ReportingTaskNode ) {
+                final ReportingTaskNode taskNode = (ReportingTaskNode) component;
+                if (taskNode.isRunning()) {
+                    taskNode.verifyCanStop();
+                    processScheduler.unschedule(taskNode);
+                }
+            } else if ( component instanceof ProcessorNode ) {
+                final ProcessorNode procNode = (ProcessorNode) component;
+                if ( procNode.isRunning() ) {
+                    procNode.getProcessGroup().stopProcessor(procNode);
+                }
+            }
+        }
+    }
+    
+    // Activates every component that references serviceNode, starting a fresh traversal with an empty visited set.
+    @Override
+    public void activateReferencingComponents(final ControllerServiceNode serviceNode) {
+        activateReferencingComponents(serviceNode, new HashSet<ControllerServiceNode>());
+    }
+    
+    
+    /**
+     * Recursively enables every controller service referenced by the given service's properties (a referenced service's own references are enabled before it); the given service itself is not enabled here.
+     * @param serviceNode the service whose referenced controller services should be enabled
+     */
+    private void activateReferencedComponents(final ControllerServiceNode serviceNode) {
+        for ( final Map.Entry<PropertyDescriptor, String> entry : serviceNode.getProperties().entrySet() ) {
+            final PropertyDescriptor key = entry.getKey();
+            if ( key.getControllerServiceDefinition() == null ) {
+                continue;
+            }
+            // An unset property falls back to its descriptor's default value.
+            final String serviceId = entry.getValue() == null ? key.getDefaultValue() : entry.getValue();
+            if ( serviceId == null ) {
+                continue;
+            }
+            // A configured ID that does not resolve to a known service is reported as an IllegalStateException.
+            final ControllerServiceNode referencedNode = getControllerServiceNode(serviceId);
+            if ( referencedNode == null ) {
+                throw new IllegalStateException("Cannot activate referenced component of " + serviceNode + " because no service exists with ID " + serviceId);
+            }
+            // NOTE(review): no cycle guard here; mutually-referencing services would presumably recurse without bound - confirm.
+            activateReferencedComponents(referencedNode);
+            // The referenced service's own references were handled above, so enable it now if it is still disabled.
+            if ( referencedNode.isDisabled() ) {
+                enableControllerService(referencedNode);
+            }
+        }
+    }
+    
+    private void activateReferencingComponents(final ControllerServiceNode serviceNode, final Set<ControllerServiceNode> visited) {
+        // Enables every Controller Service that references serviceNode (recursing into their own referencing
+        // components), then starts the Processors and schedules the Reporting Tasks that reference it.
+        if ( serviceNode.isDisabled() ) {
+            throw new IllegalStateException("Cannot activate referencing components of " + serviceNode.getControllerServiceImplementation() + " because the Controller Service is disabled");
+        }
+        if ( !visited.add(serviceNode) ) {
+            return; // this service's referencing components were already activated earlier in this traversal
+        }
+        final Set<ConfiguredComponent> components = serviceNode.getReferences().getReferencingComponents();
+        // First, activate any other controller services. We do this first so that we can
+        // avoid the situation where Processor X depends on Controller Services Y and Z; and
+        // Controller Service Y depends on Controller Service Z. In this case, if we first attempted
+        // to start Processor X, we would fail because Controller Service Y is disabled. This way, we
+        // can recursively enable everything.
+        for ( final ConfiguredComponent component : components ) {
+            if (component instanceof ControllerServiceNode) {
+                final ControllerServiceNode componentNode = (ControllerServiceNode) component;
+                activateReferencedComponents(componentNode);
+                if ( componentNode.isDisabled() ) {
+                    enableControllerService(componentNode);
+                }
+                activateReferencingComponents(componentNode, visited);
+            }
+        }
+        
+        for ( final ConfiguredComponent component : components ) {
+            if (component instanceof ProcessorNode) {
+                final ProcessorNode procNode = (ProcessorNode) component;
+                if ( !procNode.isRunning() ) {
+                    procNode.getProcessGroup().startProcessor(procNode);
+                }
+            } else if (component instanceof ReportingTaskNode) {
+                final ReportingTaskNode taskNode = (ReportingTaskNode) component;
+                if ( !taskNode.isRunning() ) {
+                    processScheduler.schedule(taskNode);
+                }
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/371e0100/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
index a8468ff..97921d6 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
@@ -65,7 +65,10 @@ public class StandardControllerServiceReference implements ControllerServiceRefe
         for (final ConfiguredComponent component : components) {
             if (component instanceof ControllerServiceNode) {
                 serviceNodes.add((ControllerServiceNode) component);
-                activeReferences.add(component);
+                
+                if ( !((ControllerServiceNode) component).isDisabled() ) {
+                    activeReferences.add(component);
+                }
             } else if (isRunning(component)) {
                 activeReferences.add(component);
             }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/371e0100/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
new file mode 100644
index 0000000..fbd3dd7
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.service;
+
+import static org.junit.Assert.assertFalse;
+
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.service.mock.ServiceA;
+import org.apache.nifi.controller.service.mock.ServiceB;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestStandardControllerServiceProvider {
+    // Checks that a service without references can be enabled and then disabled; the test passes if no exception is thrown.
+    @Test
+    public void testDisableControllerService() {
+        final ProcessScheduler scheduler = Mockito.mock(ProcessScheduler.class);
+        final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler);
+        
+        final ControllerServiceNode serviceNode = provider.createControllerService(ServiceB.class.getName(), "B", false);
+        provider.enableControllerService(serviceNode);
+        provider.disableControllerService(serviceNode);
+    }
+    // Checks that A (which references B) cannot be enabled while B is disabled, and that B cannot be disabled while A is enabled.
+    @Test
+    public void testEnableDisableWithReference() {
+        final ProcessScheduler scheduler = Mockito.mock(ProcessScheduler.class);
+        final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler);
+        
+        final ControllerServiceNode serviceNodeB = provider.createControllerService(ServiceB.class.getName(), "B", false);
+        final ControllerServiceNode serviceNodeA = provider.createControllerService(ServiceA.class.getName(), "A", false);
+        
+        serviceNodeA.setProperty(ServiceA.OTHER_SERVICE.getName(), "B");
+        // A references B, which has not been enabled yet, so enabling A is expected to throw.
+        try {
+            provider.enableControllerService(serviceNodeA);
+            Assert.fail("Was able to enable Service A but Service B is disabled.");
+        } catch (final IllegalStateException expected) {
+        }
+        // Enabling in dependency order (B, then A) is expected to succeed.
+        provider.enableControllerService(serviceNodeB);
+        provider.enableControllerService(serviceNodeA);
+        // B is still referenced by the enabled A, so disabling B is expected to throw.
+        try {
+            provider.disableControllerService(serviceNodeB);
+            Assert.fail("Was able to disable Service B but Service A is enabled and references B");
+        } catch (final IllegalStateException expected) {
+        }
+        // Disabling in reverse dependency order (A, then B) is expected to succeed.
+        provider.disableControllerService(serviceNodeA);
+        provider.disableControllerService(serviceNodeB);
+    }
+    
+    // Checks that activating the referencing components of an enabled service enables the services that reference it, directly or indirectly.
+    @Test
+    public void testActivateReferencingComponentsGraph() {
+        final ProcessScheduler scheduler = Mockito.mock(ProcessScheduler.class);
+        final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler);
+        
+        // build a graph of components with dependencies as such:
+        //
+        // A -> B -> D
+        // C ---^----^
+        //
+        // In other words, A references B, which references D.
+        // AND
+        // C references B and D.
+        // (Below, A, B, C and D are the services created with the IDs "1", "2", "3" and "4" respectively.)
+        // So we have to verify that if D is enabled, when we enable its referencing components,
+        // we enable C and B, even if we attempt to enable C before B... i.e., if we try to enable C, we cannot do so
+        // until B is first enabled so ensure that we enable B first.
+        
+        final ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", false);
+        final ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceA.class.getName(), "2", false);
+        final ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false);
+        final ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false);
+        
+        serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
+        serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4");
+        serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
+        serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4");
+        
+        provider.enableControllerService(serviceNode4);
+        provider.activateReferencingComponents(serviceNode4);
+        // Only "4" (D) was enabled explicitly; "3" (C), "2" (B) and "1" (A) must have been enabled by the activation call.
+        assertFalse(serviceNode3.isDisabled());
+        assertFalse(serviceNode2.isDisabled());
+        assertFalse(serviceNode1.isDisabled());
+        
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/371e0100/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceA.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceA.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceA.java
new file mode 100644
index 0000000..4918468
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceA.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.service.mock;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ControllerService;
+
+public class ServiceA extends AbstractControllerService {
+    // Mock controller service for tests: it can reference up to two other controller services (one required, one optional).
+    public static final PropertyDescriptor OTHER_SERVICE = new PropertyDescriptor.Builder()
+        .name("Other Service")
+        .identifiesControllerService(ControllerService.class)
+        .required(true)
+        .build();
+    // Optional second reference, so that a single service can be made to depend on two other services.
+    public static final PropertyDescriptor OTHER_SERVICE_2 = new PropertyDescriptor.Builder()
+        .name("Other Service 2")
+        .identifiesControllerService(ControllerService.class)
+        .required(false)
+        .build();
+
+    // Returns a new list holding both reference properties, required one first.
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(OTHER_SERVICE);
+        descriptors.add(OTHER_SERVICE_2);
+        return descriptors;
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/371e0100/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceB.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceB.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceB.java
new file mode 100644
index 0000000..070b156
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceB.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.service.mock;
+
+import org.apache.nifi.controller.AbstractControllerService;
+
+public class ServiceB extends AbstractControllerService {
+    // Intentionally empty: a mock service that declares no properties of its own, used in tests as the target of other services' references.
+}