You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/04/10 15:43:54 UTC

[19/62] [abbrv] incubator-nifi git commit: Squashed commit of the following:

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateManager.java
index aa095d1..30d4365 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateManager.java
@@ -42,24 +42,25 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.persistence.TemplateDeserializer;
+import org.apache.nifi.persistence.TemplateSerializer;
 import org.apache.nifi.stream.io.ByteArrayInputStream;
 import org.apache.nifi.stream.io.ByteArrayOutputStream;
 import org.apache.nifi.stream.io.DataOutputStream;
 import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.persistence.TemplateDeserializer;
-import org.apache.nifi.persistence.TemplateSerializer;
 import org.apache.nifi.web.api.dto.ConnectableDTO;
 import org.apache.nifi.web.api.dto.ConnectionDTO;
+import org.apache.nifi.web.api.dto.ControllerServiceDTO;
 import org.apache.nifi.web.api.dto.FlowSnippetDTO;
 import org.apache.nifi.web.api.dto.ProcessGroupDTO;
 import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
-import org.apache.nifi.web.api.dto.ProcessorConfigDTO.PropertyDescriptorDTO;
 import org.apache.nifi.web.api.dto.ProcessorDTO;
+import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
 import org.apache.nifi.web.api.dto.TemplateDTO;
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -272,6 +273,11 @@ public class TemplateManager {
             if (snippet.getProcessGroups() != null) {
                 scrubProcessGroups(snippet.getProcessGroups());
             }
+            
+            // go through each controller service if specified
+            if (snippet.getControllerServices() != null) {
+                scrubControllerServices(snippet.getControllerServices());
+            }
         }
     }
 
@@ -315,7 +321,6 @@ public class TemplateManager {
                     }
                 }
 
-                processorConfig.setDescriptors(null);
                 processorConfig.setCustomUiUrl(null);
             }
 
@@ -323,6 +328,24 @@ public class TemplateManager {
             processorDTO.setValidationErrors(null);
         }
     }
+    
+    private void scrubControllerServices(final Set<ControllerServiceDTO> controllerServices) {
+        // null out the value of every property whose descriptor is flagged sensitive
+        for (final ControllerServiceDTO service : controllerServices) {
+            final Map<String, String> propertyValues = service.getProperties();
+            final Map<String, PropertyDescriptorDTO> descriptorsByName = service.getDescriptors();
+            if (propertyValues != null && descriptorsByName != null) {
+                for (final PropertyDescriptorDTO candidate : descriptorsByName.values()) {
+                    if (candidate.isSensitive()) {
+                        propertyValues.put(candidate.getName(), null);
+                    }
+                }
+            }
+            // these two fields are cleared unconditionally, whether or not properties were scrubbed
+            service.setCustomUiUrl(null);
+            service.setValidationErrors(null);
+        }
+    }
 
     /**
      * Scrubs connections prior to saving. This includes removing available

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
index 7c3734a..05b3a06 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
@@ -16,11 +16,14 @@
  */
 package org.apache.nifi.controller.reporting;
 
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.AbstractConfiguredComponent;
-import org.apache.nifi.controller.Availability;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ControllerServiceLookup;
 import org.apache.nifi.controller.ProcessScheduler;
@@ -29,6 +32,7 @@ import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.ValidationContextFactory;
 import org.apache.nifi.controller.annotation.OnConfigured;
 import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
+import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.controller.service.StandardConfigurationContext;
 import org.apache.nifi.nar.NarCloseable;
@@ -45,8 +49,8 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
 
     private final AtomicReference<SchedulingStrategy> schedulingStrategy = new AtomicReference<>(SchedulingStrategy.TIMER_DRIVEN);
     private final AtomicReference<String> schedulingPeriod = new AtomicReference<>("5 mins");
-    private final AtomicReference<Availability> availability = new AtomicReference<>(Availability.NODE_ONLY);
-
+    
+    private volatile String comment;
     private volatile ScheduledState scheduledState = ScheduledState.STOPPED;
     
     public AbstractReportingTaskNode(final ReportingTask reportingTask, final String id,
@@ -59,16 +63,6 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
     }
 
     @Override
-    public Availability getAvailability() {
-        return availability.get();
-    }
-
-    @Override
-    public void setAvailability(final Availability availability) {
-        this.availability.set(availability);
-    }
-
-    @Override
     public void setSchedulingStrategy(final SchedulingStrategy schedulingStrategy) {
         this.schedulingStrategy.set(schedulingStrategy);
     }
@@ -102,6 +96,11 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
     public boolean isRunning() {
         return processScheduler.isScheduled(this) || processScheduler.getActiveThreadCount(this) > 0;
     }
+    
+    @Override
+    public int getActiveThreadCount() {
+        return processScheduler.getActiveThreadCount(this); // the count is obtained from the process scheduler, keyed by this node
+    }
 
     @Override
     public ConfigurationContext getConfigurationContext() {
@@ -142,7 +141,8 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
         return removed;
     }
     
-    private void onConfigured() {
+    @SuppressWarnings("deprecation")
+	private void onConfigured() {
         // We need to invoke any method annotation with the OnConfigured annotation in order to
         // maintain backward compatibility. This will be removed when we remove the old, deprecated annotations.
         try (final NarCloseable x = NarCloseable.withNarLoader()) {
@@ -158,6 +158,16 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
     }
     
     @Override
+    public String getComments() {
+        return comment; // the text most recently stored through setComments; null until one is set
+    }
+
+    @Override
+    public void setComments(final String comment) {
+        this.comment = comment; // the backing field is declared volatile, so no extra locking is used here
+    }
+
+	@Override
     public void verifyCanDelete() {
         if (isRunning()) {
             throw new IllegalStateException("Cannot delete " + reportingTask + " because it is currently running");
@@ -207,4 +217,38 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
             throw new IllegalStateException("Cannot update " + reportingTask + " because it is currently running");
         }
     }
+    
+    @Override
+    public void verifyCanStart(final Set<ControllerServiceNode> ignoredReferences) {
+        // DISABLED and RUNNING are rejected; STOPPED (and any state not listed) falls through
+        switch (getScheduledState()) {
+            case STOPPED:
+                break;
+            case DISABLED:
+                throw new IllegalStateException(this + " cannot be started because it is disabled");
+            case RUNNING:
+                throw new IllegalStateException(this + " cannot be started because it is already running");
+        }
+        final int threadsInUse = getActiveThreadCount();
+        if (threadsInUse > 0) {
+            throw new IllegalStateException(this + " cannot be started because it has " + threadsInUse + " active threads already");
+        }
+        // validation is handed the identifiers of the controller services passed in as ignored
+        final Set<String> ignoredServiceIds = new HashSet<>();
+        for (final ControllerServiceNode ignoredService : ignoredReferences) {
+            ignoredServiceIds.add(ignoredService.getIdentifier());
+        }
+        // the first result that is not valid aborts the start attempt
+        for (final ValidationResult outcome : getValidationErrors(ignoredServiceIds)) {
+            if (!outcome.isValid()) {
+                throw new IllegalStateException(this + " cannot be started because it is not valid: " + outcome);
+            }
+        }
+    }
+    
+    
+    @Override
+    public String toString() {
+        return new StringBuilder("ReportingTask[id=").append(getIdentifier()).append(", name=").append(getName()).append("]").toString(); // identifier and name only
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
index ed48e20..3d57533 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
@@ -124,9 +124,20 @@ public class StandardReportingContext implements ReportingContext, ControllerSer
     public boolean isControllerServiceEnabled(final String serviceIdentifier) {
         return serviceProvider.isControllerServiceEnabled(serviceIdentifier);
     }
+    
+    @Override
+    public boolean isControllerServiceEnabling(final String serviceIdentifier) {
+        return serviceProvider.isControllerServiceEnabling(serviceIdentifier); // answered by the wrapped service provider
+    }
 
     @Override
     public ControllerServiceLookup getControllerServiceLookup() {
         return this;
     }
+
+    @Override
+    public String getControllerServiceName(final String serviceIdentifier) {
+        return serviceProvider.getControllerServiceName(serviceIdentifier); // name lookup is delegated to the service provider
+    }
+    
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java
index d576f9c..435dbd0 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ControllerServiceLookup;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.reporting.ReportingInitializationContext;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.util.FormatUtils;
@@ -33,13 +34,16 @@ public class StandardReportingInitializationContext implements ReportingInitiali
     private final String schedulingPeriod;
     private final SchedulingStrategy schedulingStrategy;
     private final ControllerServiceProvider serviceProvider;
-
-    public StandardReportingInitializationContext(final String id, final String name, final SchedulingStrategy schedulingStrategy, final String schedulingPeriod, final ControllerServiceProvider serviceProvider) {
+    private final ComponentLog logger;
+    
+    public StandardReportingInitializationContext(final String id, final String name, final SchedulingStrategy schedulingStrategy, 
+            final String schedulingPeriod, final ComponentLog logger, final ControllerServiceProvider serviceProvider) {
         this.id = id;
         this.name = name;
         this.schedulingPeriod = schedulingPeriod;
         this.serviceProvider = serviceProvider;
         this.schedulingStrategy = schedulingStrategy;
+        this.logger = logger; // stored as-is and handed back by getLogger()
     }
 
     @Override
@@ -90,7 +94,22 @@ public class StandardReportingInitializationContext implements ReportingInitiali
     }
 
     @Override
+    public boolean isControllerServiceEnabling(final String serviceIdentifier) {
+        return serviceProvider.isControllerServiceEnabling(serviceIdentifier); // answered by the wrapped service provider
+    }
+    
+    @Override
     public ControllerServiceLookup getControllerServiceLookup() {
         return this;
     }
+    
+    @Override
+    public String getControllerServiceName(final String serviceIdentifier) {
+        return serviceProvider.getControllerServiceName(serviceIdentifier); // name lookup is delegated to the service provider
+    }
+    
+    @Override
+    public ComponentLog getLogger() {
+        return logger; // the ComponentLog instance supplied to the constructor
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 4407451..89850cc 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -19,6 +19,8 @@ package org.apache.nifi.controller.scheduling;
 import static java.util.Objects.requireNonNull;
 
 import java.lang.reflect.InvocationTargetException;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
@@ -32,20 +34,26 @@ import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.Funnel;
 import org.apache.nifi.connectable.Port;
 import org.apache.nifi.controller.AbstractPort;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.Heartbeater;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.annotation.OnConfigured;
+import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.controller.service.StandardConfigurationContext;
 import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.engine.FlowEngine;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.logging.ProcessorLog;
 import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.processor.SchedulingContext;
@@ -144,6 +152,8 @@ public final class StandardProcessScheduler implements ProcessScheduler {
         componentLifeCycleThreadPool.shutdown();
     }
 
+    
+    @Override
     public void schedule(final ReportingTaskNode taskNode) {
         final ScheduleState scheduleState = getScheduleState(requireNonNull(taskNode));
         if (scheduleState.isScheduled()) {
@@ -176,16 +186,11 @@ public final class StandardProcessScheduler implements ProcessScheduler {
                         }
                         
                         break;
-                    } catch (final InvocationTargetException ite) {
-                        LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
-                                new Object[]{reportingTask, ite.getTargetException(), administrativeYieldDuration});
-                        LOG.error("", ite.getTargetException());
-
-                        try {
-                            Thread.sleep(administrativeYieldMillis);
-                        } catch (final InterruptedException ie) {
-                        }
                     } catch (final Exception e) {
+                        final Throwable cause = (e instanceof InvocationTargetException) ? e.getCause() : e; // report the real failure, not the reflection wrapper
+                        final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask);
+                        componentLog.error("Failed to invoke @OnScheduled method due to {}", cause);
+                        
                         LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
                                 new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e);
                         try {
@@ -200,18 +205,23 @@ public final class StandardProcessScheduler implements ProcessScheduler {
         };
 
         componentLifeCycleThreadPool.execute(startReportingTaskRunnable);
+        taskNode.setScheduledState(ScheduledState.RUNNING);
     }
 
+    
+    @Override
     public void unschedule(final ReportingTaskNode taskNode) {
         final ScheduleState scheduleState = getScheduleState(requireNonNull(taskNode));
         if (!scheduleState.isScheduled()) {
             return;
         }
-
+        
+        taskNode.verifyCanStop();
         final SchedulingAgent agent = getSchedulingAgent(taskNode.getSchedulingStrategy());
         final ReportingTask reportingTask = taskNode.getReportingTask();
         scheduleState.setScheduled(false);
-
+        taskNode.setScheduledState(ScheduledState.STOPPED);
+        
         final Runnable unscheduleReportingTaskRunnable = new Runnable() {
             @SuppressWarnings("deprecation")
             @Override
@@ -222,18 +232,15 @@ public final class StandardProcessScheduler implements ProcessScheduler {
                     try (final NarCloseable x = NarCloseable.withNarLoader()) {
                         ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, org.apache.nifi.processor.annotation.OnUnscheduled.class, reportingTask, configurationContext);
                     }
-                } catch (final InvocationTargetException ite) {
-                    LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
-                            new Object[]{reportingTask, ite.getTargetException(), administrativeYieldDuration});
-                    LOG.error("", ite.getTargetException());
-
-                    try {
-                        Thread.sleep(administrativeYieldMillis);
-                    } catch (final InterruptedException ie) {
-                    }
                 } catch (final Exception e) {
-                    LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
-                            new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e);
+                    final Throwable cause = (e instanceof InvocationTargetException) ? e.getCause() : e;
+                    final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask);
+                    componentLog.error("Failed to invoke @OnUnscheduled method due to {}", cause);
+
+                    LOG.error("Failed to invoke the @OnUnscheduled methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
+                            reportingTask, cause.toString(), administrativeYieldDuration);
+                    LOG.error("", cause);
+                    
                     try {
                         Thread.sleep(administrativeYieldMillis);
                     } catch (final InterruptedException ie) {
@@ -274,20 +281,38 @@ public final class StandardProcessScheduler implements ProcessScheduler {
         }
 
         if (!procNode.isValid()) {
-            throw new IllegalStateException("Processor " + procNode.getName() + " is not in a valid state");
+            throw new IllegalStateException("Processor " + procNode.getName() + " is not in a valid state due to " + procNode.getValidationErrors());
         }
 
         final Runnable startProcRunnable = new Runnable() {
-            @SuppressWarnings("deprecation")
             @Override
+            @SuppressWarnings("deprecation")
             public void run() {
                 try (final NarCloseable x = NarCloseable.withNarLoader()) {
                     long lastStopTime = scheduleState.getLastStopTime();
                     final StandardProcessContext processContext = new StandardProcessContext(procNode, controllerServiceProvider, encryptor);
 
-                    while (true) {
+                    final Set<String> serviceIds = new HashSet<>();
+                    for ( final PropertyDescriptor descriptor : processContext.getProperties().keySet() ) {
+                        final Class<? extends ControllerService> serviceDefinition = descriptor.getControllerServiceDefinition();
+                        if ( serviceDefinition != null ) {
+                            final String serviceId = processContext.getProperty(descriptor).getValue();
+                            serviceIds.add(serviceId);
+                        }
+                    }
+                    
+                    attemptOnScheduled: while (true) {
                         try {
                             synchronized (scheduleState) {
+                                for ( final String serviceId : serviceIds ) {
+                                    final boolean enabled = processContext.isControllerServiceEnabled(serviceId);
+                                    if ( !enabled ) {
+                                        LOG.debug("Controller Service with ID {} is not yet enabled, so will not start {} yet", serviceId, procNode);
+                                        Thread.sleep(administrativeYieldMillis);
+                                        continue attemptOnScheduled;
+                                    }
+                                }
+                                
                                 // if no longer scheduled to run, then we're finished. This can happen, for example,
                                 // if the @OnScheduled method throws an Exception and the user stops the processor 
                                 // while we're administratively yielded.
@@ -308,11 +333,12 @@ public final class StandardProcessScheduler implements ProcessScheduler {
                                 return;
                             }
                         } catch (final Exception e) {
+                            final Throwable cause = (e instanceof InvocationTargetException) ? e.getCause() : e;
                             final ProcessorLog procLog = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor());
 
                             procLog.error("{} failed to invoke @OnScheduled method due to {}; processor will not be scheduled to run for {}",
-                                    new Object[]{procNode.getProcessor(), e.getCause(), administrativeYieldDuration}, e.getCause());
-                            LOG.error("Failed to invoke @OnScheduled method due to {}", e.getCause().toString(), e.getCause());
+                                    new Object[]{procNode.getProcessor(), cause, administrativeYieldDuration}, cause);
+                            LOG.error("Failed to invoke @OnScheduled method due to {}", cause.toString(), cause); // cause is already unwrapped; cause.getCause() may be null
 
                             ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, procNode.getProcessor(), processContext);
                             ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, procNode.getProcessor(), processContext);
@@ -535,11 +561,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
         }
         
         procNode.setScheduledState(ScheduledState.STOPPED);
-        
-        try (final NarCloseable x = NarCloseable.withNarLoader()) {
-            final ProcessorLog processorLog = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor());
-            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnEnabled.class, procNode.getProcessor(), processorLog);
-        }
     }
 
     @Override
@@ -549,11 +570,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
         }
         
         procNode.setScheduledState(ScheduledState.DISABLED);
-        
-        try (final NarCloseable x = NarCloseable.withNarLoader()) {
-            final ProcessorLog processorLog = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor());
-            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, procNode.getProcessor(), processorLog);
-        }
     }
 
     public synchronized void enableReportingTask(final ReportingTaskNode taskNode) {
@@ -562,10 +578,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
         }
 
         taskNode.setScheduledState(ScheduledState.STOPPED);
-        
-        try (final NarCloseable x = NarCloseable.withNarLoader()) {
-            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnEnabled.class, taskNode.getReportingTask());
-        }
     }
     
     public synchronized void disableReportingTask(final ReportingTaskNode taskNode) {
@@ -574,10 +586,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
         }
 
         taskNode.setScheduledState(ScheduledState.DISABLED);
-        
-        try (final NarCloseable x = NarCloseable.withNarLoader()) {
-            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, taskNode.getReportingTask());
-        }
     }
 
     @Override
@@ -605,4 +613,114 @@ public final class StandardProcessScheduler implements ProcessScheduler {
         }
         return scheduleState;
     }
+    /** Marks the service as ENABLING and hands its {@code @OnEnabled} invocation to the component life-cycle thread pool, where it is retried until it succeeds or the service is no longer scheduled. */
+    @Override
+    public void enableControllerService(final ControllerServiceNode service) {
+        service.setState(ControllerServiceState.ENABLING);
+        final ScheduleState scheduleState = getScheduleState(service);
+        // The task is only defined here; it is submitted at the end of the method, after the schedule state is flagged as scheduled.
+        final Runnable enableRunnable = new Runnable() {
+            @Override
+            public void run() {
+                try (final NarCloseable x = NarCloseable.withNarLoader()) {
+                    long lastStopTime = scheduleState.getLastStopTime();
+                    final ConfigurationContext configContext = new StandardConfigurationContext(service, controllerServiceProvider);
+                    // Keep trying until the @OnEnabled methods complete or the checks below tell us to stop.
+                    while (true) {
+                        try {
+                            synchronized (scheduleState) {
+                                // if no longer enabled, then we're finished. This can happen, for example,
+                                // if the @OnEnabled method throws an Exception and the user disables the service
+                                // while we're administratively yielded.
+                                // 
+                                // we also check if the schedule state's last stop time is equal to what it was before.
+                                // if not, then that means that the service has been disabled and enabled again, so we should just
+                                // bail; another thread will be responsible for invoking the @OnEnabled methods.
+                                if (!scheduleState.isScheduled() || scheduleState.getLastStopTime() != lastStopTime) {
+                                    return;
+                                }
+
+                                ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, service.getControllerServiceImplementation(), configContext);
+                                heartbeater.heartbeat();
+                                service.setState(ControllerServiceState.ENABLED);
+                                return;
+                            }
+                        } catch (final Exception e) {
+                            final Throwable cause = (e instanceof InvocationTargetException) ? e.getCause() : e;
+                            // Report the failure through a logger created for this service and through the class-level LOG.
+                            final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service);
+                            componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
+                            LOG.error("Failed to invoke @OnEnabled method of {} due to {}", service.getControllerServiceImplementation(), cause.toString());
+                            if ( LOG.isDebugEnabled() ) {
+                                LOG.error("", cause);
+                            }
+                            // Run the @OnDisabled methods via the "quietly" variant (presumably failures there are suppressed - confirm), then sleep for administrativeYieldMillis before the next attempt.
+                            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext);
+                            Thread.sleep(administrativeYieldMillis);
+                            continue;
+                        }
+                    }
+                } catch (final Throwable t) {
+                    final Throwable cause = (t instanceof InvocationTargetException) ? t.getCause() : t;
+                    final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service);
+                    componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
+                    // NOTE(review): this handler is also reached when the retry sleep above is interrupted; nothing here moves the service out of the ENABLING state or restores the interrupt flag.
+                    LOG.error("Failed to invoke @OnEnabled method on {} due to {}", service.getControllerServiceImplementation(), cause.toString());
+                    if ( LOG.isDebugEnabled() ) {
+                        LOG.error("", cause);
+                    }
+                }
+            }
+        };
+        // Flag the schedule state first: the task returns immediately if it finds isScheduled() to be false.
+        scheduleState.setScheduled(true);
+        componentLifeCycleThreadPool.execute(enableRunnable);
+    }
+
+    @Override
+    public void disableControllerService(final ControllerServiceNode service) {
+        service.verifyCanDisable();
+        
+        final ScheduleState state = getScheduleState(requireNonNull(service));
+        final Runnable disableRunnable = new Runnable() {
+            @Override
+            public void run() {
+                synchronized (state) {
+                    state.setScheduled(false);
+                }
+
+                try (final NarCloseable x = NarCloseable.withNarLoader()) {
+                    final ConfigurationContext configContext = new StandardConfigurationContext(service, controllerServiceProvider);
+                    
+                    while(true) {
+                        try {
+                            ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext);
+                            heartbeater.heartbeat();
+                            service.setState(ControllerServiceState.DISABLED);
+                            return;
+                        } catch (final Exception e) {
+                            final Throwable cause = (e instanceof InvocationTargetException) ? e.getCause() : e;
+                            final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service);
+                            componentLog.error("Failed to invoke @OnDisabled method due to {}", cause);
+                            
+                            LOG.error("Failed to invoke @OnDisabled method of {} due to {}", service.getControllerServiceImplementation(), cause.toString());
+                            if ( LOG.isDebugEnabled() ) {
+                                LOG.error("", cause);
+                            }
+        
+                            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext);
+                            try {
+                                Thread.sleep(administrativeYieldMillis);
+                            } catch (final InterruptedException ie) {}
+                            
+                            continue;
+                        }
+                    }
+                }
+            }
+        };
+
+        service.setState(ControllerServiceState.DISABLING);
+        componentLifeCycleThreadPool.execute(disableRunnable);        
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
index db44b5f..1fde670 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
@@ -17,30 +17,29 @@
 package org.apache.nifi.controller.service;
 
 import java.io.BufferedInputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
-import javax.xml.XMLConstants;
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.validation.Schema;
-import javax.xml.validation.SchemaFactory;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.nifi.controller.FlowFromDOMFactory;
+import org.apache.nifi.encrypt.StringEncryptor;
+import org.apache.nifi.reporting.BulletinRepository;
 import org.apache.nifi.util.DomUtils;
-import org.apache.nifi.util.file.FileUtils;
+import org.apache.nifi.web.api.dto.ControllerServiceDTO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
-import org.w3c.dom.NodeList;
 import org.xml.sax.SAXException;
 import org.xml.sax.SAXParseException;
 
@@ -49,35 +48,14 @@ import org.xml.sax.SAXParseException;
  */
 public class ControllerServiceLoader {
 
-    private static final Log logger = LogFactory.getLog(ControllerServiceLoader.class);
+    private static final Logger logger = LoggerFactory.getLogger(ControllerServiceLoader.class);
 
-    private final Path serviceConfigXmlPath;
 
-    public ControllerServiceLoader(final Path serviceConfigXmlPath) throws IOException {
-        final File serviceConfigXmlFile = serviceConfigXmlPath.toFile();
-        if (!serviceConfigXmlFile.exists() || !serviceConfigXmlFile.canRead()) {
-            throw new IOException(serviceConfigXmlPath + " does not appear to exist or cannot be read. Cannot load configuration.");
-        }
-
-        this.serviceConfigXmlPath = serviceConfigXmlPath;
-    }
-
-    public List<ControllerServiceNode> loadControllerServices(final ControllerServiceProvider provider) throws IOException {
-        final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
+    public static List<ControllerServiceNode> loadControllerServices(final ControllerServiceProvider provider, final InputStream serializedStream, final StringEncryptor encryptor, final BulletinRepository bulletinRepo, final boolean autoResumeState) throws IOException {
         final DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance();
-        InputStream fis = null;
-        BufferedInputStream bis = null;
         documentBuilderFactory.setNamespaceAware(true);
 
-        final List<ControllerServiceNode> services = new ArrayList<>();
-
-        try {
-            final URL configurationResource = this.getClass().getResource("/ControllerServiceConfiguration.xsd");
-            if (configurationResource == null) {
-                throw new NullPointerException("Unable to load XML Schema for ControllerServiceConfiguration");
-            }
-            final Schema schema = schemaFactory.newSchema(configurationResource);
-            documentBuilderFactory.setSchema(schema);
+        try (final InputStream in = new BufferedInputStream(serializedStream)) {
             final DocumentBuilder builder = documentBuilderFactory.newDocumentBuilder();
 
             builder.setErrorHandler(new org.xml.sax.ErrorHandler() {
@@ -109,43 +87,72 @@ public class ControllerServiceLoader {
                     throw err;
                 }
             });
-
-            //if controllerService.xml does not exist, create an empty file...
-            fis = Files.newInputStream(this.serviceConfigXmlPath, StandardOpenOption.READ);
-            bis = new BufferedInputStream(fis);
-            if (Files.size(this.serviceConfigXmlPath) > 0) {
-                final Document document = builder.parse(bis);
-                final NodeList servicesNodes = document.getElementsByTagName("services");
-                final Element servicesElement = (Element) servicesNodes.item(0);
-
-                final List<Element> serviceNodes = DomUtils.getChildElementsByTagName(servicesElement, "service");
-                for (final Element serviceElement : serviceNodes) {
-                    //get properties for the specific controller task - id, name, class,
-                    //and schedulingPeriod must be set
-                    final String serviceId = DomUtils.getChild(serviceElement, "identifier").getTextContent().trim();
-                    final String serviceClass = DomUtils.getChild(serviceElement, "class").getTextContent().trim();
-
-                    //set the class to be used for the configured controller task
-                    final ControllerServiceNode serviceNode = provider.createControllerService(serviceClass, serviceId, false);
-
-                    //optional task-specific properties
-                    for (final Element optionalProperty : DomUtils.getChildElementsByTagName(serviceElement, "property")) {
-                        final String name = optionalProperty.getAttribute("name").trim();
-                        final String value = optionalProperty.getTextContent().trim();
-                        serviceNode.setProperty(name, value);
-                    }
-
-                    services.add(serviceNode);
-                    provider.enableControllerService(serviceNode);
-                }
-            }
+            
+            final Document document = builder.parse(in);
+            final Element controllerServices = document.getDocumentElement();
+            final List<Element> serviceElements = DomUtils.getChildElementsByTagName(controllerServices, "controllerService");
+            return new ArrayList<ControllerServiceNode>(loadControllerServices(serviceElements, provider, encryptor, bulletinRepo, autoResumeState));
         } catch (SAXException | ParserConfigurationException sxe) {
             throw new IOException(sxe);
-        } finally {
-            FileUtils.closeQuietly(fis);
-            FileUtils.closeQuietly(bis);
         }
-
-        return services;
+    }
+    /** Creates a ControllerServiceNode for every element, then configures each node from a deep copy of its element, and - only when autoResumeState is true - asks the provider to enable the nodes whose parsed state is ENABLED; returns the key set of the internal HashMap holding the created nodes. */
+    public static Collection<ControllerServiceNode> loadControllerServices(final List<Element> serviceElements, final ControllerServiceProvider provider, final StringEncryptor encryptor, final BulletinRepository bulletinRepo, final boolean autoResumeState) {
+        final Map<ControllerServiceNode, Element> elementsByNode = new HashMap<>();
+        for ( final Element serviceElement : serviceElements ) {
+            final ControllerServiceNode created = createControllerService(provider, serviceElement, encryptor);
+            // We need to clone the node because it will be used in a separate thread below, and 
+            // Element is not thread-safe.
+            elementsByNode.put(created, (Element) serviceElement.cloneNode(true));
+        }
+        for ( final Map.Entry<ControllerServiceNode, Element> created : elementsByNode.entrySet() ) {
+            configureControllerService(created.getKey(), created.getValue(), encryptor);
+        }
+        
+        // Nothing is started unless the caller asked for the previous state to be resumed.
+        if ( !autoResumeState ) {
+            return elementsByNode.keySet();
+        }
+        
+        // Collect every node whose parsed state says it was enabled.
+        final Set<ControllerServiceNode> toEnable = new HashSet<>();
+        for ( final Map.Entry<ControllerServiceNode, Element> candidate : elementsByNode.entrySet() ) {
+            final Element storedElement = candidate.getValue();
+            final ControllerServiceDTO parsed;
+            synchronized (storedElement.getOwnerDocument()) {
+                parsed = FlowFromDOMFactory.getControllerService(storedElement, encryptor);
+            }
+            
+            // valueOf rejects a missing or unknown state name by throwing, exactly as before.
+            if (ControllerServiceState.valueOf(parsed.getState()) == ControllerServiceState.ENABLED) {
+                toEnable.add(candidate.getKey());
+            }
+        }
+        
+        provider.enableControllerServices(toEnable);
+        return elementsByNode.keySet();
+    }
+    
+    /** Parses the element into a DTO, asks the provider to create a node with the DTO's type and id (passing false as the third argument), and copies the DTO's name and comments onto the new node. */
+    private static ControllerServiceNode createControllerService(final ControllerServiceProvider provider, final Element controllerServiceElement, final StringEncryptor encryptor) {
+        final ControllerServiceDTO parsed = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
+        // Annotation data and properties are not applied here; see configureControllerService.
+        final ControllerServiceNode created = provider.createControllerService(parsed.getType(), parsed.getId(), false);
+        created.setName(parsed.getName());
+        created.setComments(parsed.getComments());
+        return created;
+    }
+    /** Applies the annotation data and the properties parsed from the element to the node; a property whose parsed value is null is removed from the node instead of being set. */
+    private static void configureControllerService(final ControllerServiceNode node, final Element controllerServiceElement, final StringEncryptor encryptor) {
+        final ControllerServiceDTO parsed = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
+        node.setAnnotationData(parsed.getAnnotationData());
+        // Walk the parsed properties in the order the DTO's map yields them.
+        for (final Map.Entry<String, String> property : parsed.getProperties().entrySet()) {
+            if (property.getValue() != null) {
+                node.setProperty(property.getKey(), property.getValue());
+            } else {
+                node.removeProperty(property.getKey());
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java
index 8b5f27f..8d46b05 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java
@@ -21,14 +21,17 @@ import java.util.Set;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ControllerServiceInitializationContext;
 import org.apache.nifi.controller.ControllerServiceLookup;
+import org.apache.nifi.logging.ComponentLog;
 
 public class StandardControllerServiceInitializationContext implements ControllerServiceInitializationContext, ControllerServiceLookup {
 
     private final String id;
     private final ControllerServiceProvider serviceProvider;
+    private final ComponentLog logger;
 
-    public StandardControllerServiceInitializationContext(final String identifier, final ControllerServiceProvider serviceProvider) {
+    public StandardControllerServiceInitializationContext(final String identifier, final ComponentLog logger, final ControllerServiceProvider serviceProvider) {
         this.id = identifier;
+        this.logger = logger;
         this.serviceProvider = serviceProvider;
     }
 
@@ -61,4 +64,19 @@ public class StandardControllerServiceInitializationContext implements Controlle
     public boolean isControllerServiceEnabled(final ControllerService service) {
         return serviceProvider.isControllerServiceEnabled(service);
     }
+    /** Forwards to {@code serviceProvider.isControllerServiceEnabling} for the given service identifier. */
+    @Override
+    public boolean isControllerServiceEnabling(String serviceIdentifier) {
+        return serviceProvider.isControllerServiceEnabling(serviceIdentifier);
+    }
+    /** Forwards to {@code serviceProvider.getControllerServiceName} for the given service identifier. */
+    @Override
+    public String getControllerServiceName(final String serviceIdentifier) {
+    	return serviceProvider.getControllerServiceName(serviceIdentifier);
+    }
+    /** Returns the ComponentLog that was supplied to the constructor. */
+    @Override
+    public ComponentLog getLogger() {
+        return logger;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index 741caec..c8c7ec9 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -16,16 +16,17 @@
  */
 package org.apache.nifi.controller.service;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.AbstractConfiguredComponent;
-import org.apache.nifi.controller.Availability;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ConfiguredComponent;
 import org.apache.nifi.controller.ControllerService;
@@ -41,14 +42,14 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
     private final ControllerService implementation;
     private final ControllerServiceProvider serviceProvider;
 
-    private final AtomicReference<Availability> availability = new AtomicReference<>(Availability.NODE_ONLY);
-    private final AtomicBoolean disabled = new AtomicBoolean(true);
+    private final AtomicReference<ControllerServiceState> stateRef = new AtomicReference<>(ControllerServiceState.DISABLED);
 
     private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
     private final Lock readLock = rwLock.readLock();
     private final Lock writeLock = rwLock.writeLock();
 
     private final Set<ConfiguredComponent> referencingComponents = new HashSet<>();
+    private String comment;
 
     public StandardControllerServiceNode(final ControllerService proxiedControllerService, final ControllerService implementation, final String id,
             final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) {
@@ -58,38 +59,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
         this.serviceProvider = serviceProvider;
     }
 
-    @Override
-    public boolean isDisabled() {
-        return disabled.get();
-    }
-
-    @Override
-    public void setDisabled(final boolean disabled) {
-        if (!disabled && !isValid()) {
-            throw new IllegalStateException("Cannot enable Controller Service " + implementation + " because it is not valid");
-        }
-
-        if (disabled) {
-            // do not allow a Controller Service to be disabled if it's currently being used.
-            final Set<ConfiguredComponent> runningRefs = getReferences().getRunningReferences();
-            if (!runningRefs.isEmpty()) {
-                throw new IllegalStateException("Cannot disable Controller Service because it is referenced (either directly or indirectly) by " + runningRefs.size() + " different components that are currently running");
-            }
-        }
-
-        this.disabled.set(disabled);
-    }
-
-    @Override
-    public Availability getAvailability() {
-        return availability.get();
-    }
-
-    @Override
-    public void setAvailability(final Availability availability) {
-        this.availability.set(availability);
-    }
-
+    
     @Override
     public ControllerService getProxiedControllerService() {
         return proxedControllerService;
@@ -132,7 +102,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
 
     @Override
     public void verifyModifiable() throws IllegalStateException {
-        if (!isDisabled()) {
+        if (getState() != ControllerServiceState.DISABLED) {
             throw new IllegalStateException("Cannot modify Controller Service configuration because it is currently enabled. Please disable the Controller Service first.");
         }
     }
@@ -140,7 +110,6 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
     @Override
     public void setProperty(final String name, final String value) {
         super.setProperty(name, value);
-        
         onConfigured();
     }
     
@@ -166,31 +135,96 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
     
     @Override
     public void verifyCanDelete() {
-        if ( !isDisabled() ) {
+        if ( getState() != ControllerServiceState.DISABLED ) {
             throw new IllegalStateException(implementation + " cannot be deleted because it is not disabled");
         }
     }
     
     @Override
     public void verifyCanDisable() {
+        verifyCanDisable(Collections.<ControllerServiceNode>emptySet());
+    }
+    
+    @Override
+    public void verifyCanDisable(final Set<ControllerServiceNode> ignoreReferences) {
+        final ControllerServiceState state = getState();
+        if ( state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING ) {
+            throw new IllegalStateException("Cannot disable " + getControllerServiceImplementation() + " because it is not enabled");
+        }
+        
         final ControllerServiceReference references = getReferences();
-        final int numRunning = references.getRunningReferences().size();
-        if ( numRunning > 0 ) {
-            throw new IllegalStateException(implementation + " cannot be disabled because it is referenced by " + numRunning + " components that are currently running");
+        
+        for ( final ConfiguredComponent activeReference : references.getActiveReferences() ) {
+            if ( !ignoreReferences.contains(activeReference) ) {
+                throw new IllegalStateException(implementation + " cannot be disabled because it is referenced by at least one component that is currently running");
+            }
         }
     }
     
     @Override
     public void verifyCanEnable() {
-        if ( !isDisabled() ) {
+        if ( getState() != ControllerServiceState.DISABLED ) {
             throw new IllegalStateException(implementation + " cannot be enabled because it is not disabled");
         }
+        
+        if ( !isValid() ) {
+            throw new IllegalStateException(implementation + " cannot be enabled because it is not valid: " + getValidationErrors());
+        }
+    }
+    /** Throws IllegalStateException unless this service is DISABLED and getValidationErrors, given the identifiers of the ignored references, yields no invalid result. */
+    @Override
+    public void verifyCanEnable(final Set<ControllerServiceNode> ignoredReferences) {
+        if (ControllerServiceState.DISABLED != getState()) {
+            throw new IllegalStateException(implementation + " cannot be enabled because it is not disabled");
+        }
+        
+        // The ignored services are handed to getValidationErrors by id (presumably so they are left out of validation - confirm).
+        final Set<String> ignoredIds = new HashSet<>();
+        for ( final ControllerServiceNode ignored : ignoredReferences ) {
+            ignoredIds.add(ignored.getIdentifier());
+        }
+        
+        for ( final ValidationResult outcome : getValidationErrors(ignoredIds) ) {
+            if ( !outcome.isValid() ) {
+                throw new IllegalStateException(implementation + " cannot be enabled because it is not valid: " + outcome);
+            }
+        }
     }
     
     @Override
     public void verifyCanUpdate() {
-        if ( !isDisabled() ) {
+        if ( getState() != ControllerServiceState.DISABLED ) {
             throw new IllegalStateException(implementation + " cannot be updated because it is not disabled");
         }
     }
+    /** Returns the comment text held by this node, read while holding the read lock. */
+    @Override
+    public String getComments() {
+    	readLock.lock();
+    	try {
+    		return comment;
+    	} finally {
+    		readLock.unlock();
+    	}
+    }
+    /** Replaces the comment text held by this node, written while holding the write lock. */
+    @Override
+    public void setComments(final String comment) {
+    	writeLock.lock();
+    	try {
+    		this.comment = comment;
+    	} finally {
+    		writeLock.unlock();
+    	}
+    }
+    /** Returns the ControllerServiceState currently held in {@code stateRef}. */
+    @Override
+    public ControllerServiceState getState() {
+        return stateRef.get();
+    }
+    /** Stores the given state in {@code stateRef}; no transition checks are performed here. */
+    @Override
+    public void setState(final ControllerServiceState state) {
+        this.stateRef.set(state);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index 7a8e22f..dfbfca5 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -23,26 +23,39 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.nifi.annotation.lifecycle.OnAdded;
-import org.apache.nifi.annotation.lifecycle.OnEnabled;
-import org.apache.nifi.annotation.lifecycle.OnDisabled;
 import org.apache.nifi.annotation.lifecycle.OnRemoved;
+import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ConfiguredComponent;
 import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.ValidationContextFactory;
-import org.apache.nifi.controller.exception.ControllerServiceAlreadyExistsException;
-import org.apache.nifi.controller.exception.ControllerServiceNotFoundException;
+import org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
 import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
+import org.apache.nifi.events.BulletinFactory;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.processor.SimpleProcessLogger;
 import org.apache.nifi.processor.StandardValidationContextFactory;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.util.ObjectHolder;
 import org.apache.nifi.util.ReflectionUtils;
 import org.slf4j.Logger;
@@ -55,8 +68,10 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
 
     private static final Logger logger = LoggerFactory.getLogger(StandardControllerServiceProvider.class);
 
-    private final Map<String, ControllerServiceNode> controllerServices;
+    private final ProcessScheduler processScheduler;
+    private final ConcurrentMap<String, ControllerServiceNode> controllerServices;
     private static final Set<Method> validDisabledMethods;
+    private final BulletinRepository bulletinRepo;
 
     static {
         // methods that are okay to be called when the service is disabled.
@@ -70,10 +85,12 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
         validDisabledMethods = Collections.unmodifiableSet(validMethods);
     }
 
-    public StandardControllerServiceProvider() {
+    /**
+     * Creates a provider that starts with no Controller Services registered.
+     *
+     * @param scheduler the scheduler that this provider delegates to when enabling/disabling services and
+     *            when scheduling/unscheduling reporting tasks
+     * @param bulletinRepo the repository that receives a bulletin when a service fails to enable; the code that
+     *            uses it checks for null first, so null appears to be tolerated
+     */
+    public StandardControllerServiceProvider(final ProcessScheduler scheduler, final BulletinRepository bulletinRepo) {
         // the following 2 maps must be updated atomically, but we do not lock around them because they are modified
         // only in the createControllerService method, and both are modified before the method returns
         this.controllerServices = new ConcurrentHashMap<>();
+        this.processScheduler = scheduler;
+        this.bulletinRepo = bulletinRepo;
     }
 
     private Class<?>[] getInterfaces(final Class<?> cls) {
@@ -95,21 +112,24 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
             populateInterfaces(superClass, interfacesDefinedThusFar);
         }
     }
-
+    
     @Override
     public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
         if (type == null || id == null) {
             throw new NullPointerException();
         }
-        if (controllerServices.containsKey(id)) {
-            throw new ControllerServiceAlreadyExistsException(id);
-        }
-
+        
         final ClassLoader currentContextClassLoader = Thread.currentThread().getContextClassLoader();
         try {
             final ClassLoader cl = ExtensionManager.getClassLoader(type);
-            Thread.currentThread().setContextClassLoader(cl);
-            final Class<?> rawClass = Class.forName(type, false, cl);
+            final Class<?> rawClass;
+            if ( cl == null ) {
+                rawClass = Class.forName(type);
+            } else {
+                Thread.currentThread().setContextClassLoader(cl);
+                rawClass = Class.forName(type, false, cl);
+            }
+            
             final Class<? extends ControllerService> controllerServiceClass = rawClass.asSubclass(ControllerService.class);
 
             final ControllerService originalService = controllerServiceClass.newInstance();
@@ -124,7 +144,9 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
                 	}
                 	
                     final ControllerServiceNode node = serviceNodeHolder.get();
-                    if (node.isDisabled() && !validDisabledMethods.contains(method)) {
+                    final ControllerServiceState state = node.getState();
+                    final boolean disabled = (state != ControllerServiceState.ENABLED); // only allow method call if service state is ENABLED.
+                    if (disabled && !validDisabledMethods.contains(method)) {
                         // Use nar class loader here because we are implicitly calling toString() on the original implementation.
                         try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
                             throw new IllegalStateException("Cannot invoke method " + method + " on Controller Service " + originalService + " because the Controller Service is disabled");
@@ -143,17 +165,22 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
                 }
             };
 
-            final ControllerService proxiedService = (ControllerService) Proxy.newProxyInstance(cl, getInterfaces(controllerServiceClass), invocationHandler);
-            logger.info("Loaded service {} as configured.", type);
+            final ControllerService proxiedService;
+            if ( cl == null ) {
+                proxiedService = (ControllerService) Proxy.newProxyInstance(getClass().getClassLoader(), getInterfaces(controllerServiceClass), invocationHandler);
+            } else {
+                proxiedService = (ControllerService) Proxy.newProxyInstance(cl, getInterfaces(controllerServiceClass), invocationHandler);
+            }
+            logger.info("Created Controller Service of type {} with identifier {}", type, id);
 
-            originalService.initialize(new StandardControllerServiceInitializationContext(id, this));
+            final ComponentLog serviceLogger = new SimpleProcessLogger(id, originalService);
+            originalService.initialize(new StandardControllerServiceInitializationContext(id, serviceLogger, this));
 
             final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(this);
 
             final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, originalService, id, validationContextFactory, this);
             serviceNodeHolder.set(serviceNode);
-            serviceNode.setAnnotationData(null);
-            serviceNode.setName(id);
+            serviceNode.setName(rawClass.getSimpleName());
             
             if ( firstTimeAdded ) {
                 try (final NarCloseable x = NarCloseable.withNarLoader()) {
@@ -166,7 +193,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
             this.controllerServices.put(id, serviceNode);
             return serviceNode;
         } catch (final Throwable t) {
-            throw new ControllerServiceNotFoundException(t);
+            throw new ControllerServiceInstantiationException(t);
         } finally {
             if (currentContextClassLoader != null) {
                 Thread.currentThread().setContextClassLoader(currentContextClassLoader);
@@ -174,29 +201,242 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
         }
     }
     
+    
+    
+    /**
+     * Disables every Controller Service that references the given service, either directly or through a
+     * chain of other services. Verification runs for all candidates before any of them is disabled.
+     *
+     * @param serviceNode the service whose referencing services should be disabled
+     */
+    @Override
+    public void disableReferencingServices(final ControllerServiceNode serviceNode) {
+        // findRecursiveReferences yields the services in activation order; disabling walks that order backwards.
+        final List<ControllerServiceNode> referencing = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
+        final Set<ControllerServiceNode> referencingSet = new HashSet<>(referencing);
+
+        // Pass 1: verify each service that is neither disabled nor already being disabled.
+        for ( final ControllerServiceNode candidate : referencing ) {
+            final ControllerServiceState current = candidate.getState();
+            if ( current == ControllerServiceState.DISABLED || current == ControllerServiceState.DISABLING ) {
+                continue;
+            }
+
+            candidate.verifyCanDisable(referencingSet);
+        }
+
+        // Pass 2: disable, last-activated service first.
+        for ( int i = referencing.size() - 1; i >= 0; i-- ) {
+            final ControllerServiceNode candidate = referencing.get(i);
+            final ControllerServiceState current = candidate.getState();
+            if ( current == ControllerServiceState.DISABLED || current == ControllerServiceState.DISABLING ) {
+                continue;
+            }
+
+            disableControllerService(candidate);
+        }
+    }
+    
+    
+    /**
+     * Starts every Processor and Reporting Task that references the given service, directly or through
+     * other services, unless the component's scheduled state is DISABLED. All components are verified
+     * before the first one is started.
+     *
+     * @param serviceNode the service whose referencing components should be scheduled
+     */
+    @Override
+    public void scheduleReferencingComponents(final ControllerServiceNode serviceNode) {
+        final List<ProcessorNode> procNodes = findRecursiveReferences(serviceNode, ProcessorNode.class);
+        final List<ReportingTaskNode> taskNodes = findRecursiveReferences(serviceNode, ReportingTaskNode.class);
+
+        // Verification phase: nothing is started until every non-disabled component has been checked.
+        for ( final ProcessorNode procNode : procNodes ) {
+            if ( procNode.getScheduledState() == ScheduledState.DISABLED ) {
+                continue;
+            }
+            procNode.verifyCanStart();
+        }
+        for ( final ReportingTaskNode taskNode : taskNodes ) {
+            if ( taskNode.getScheduledState() == ScheduledState.DISABLED ) {
+                continue;
+            }
+            taskNode.verifyCanStart();
+        }
+
+        // Start phase: processors are started through their process group, reporting tasks through the scheduler.
+        for ( final ProcessorNode procNode : procNodes ) {
+            if ( procNode.getScheduledState() == ScheduledState.DISABLED ) {
+                continue;
+            }
+            procNode.getProcessGroup().startProcessor(procNode);
+        }
+        for ( final ReportingTaskNode taskNode : taskNodes ) {
+            if ( taskNode.getScheduledState() == ScheduledState.DISABLED ) {
+                continue;
+            }
+            processScheduler.schedule(taskNode);
+        }
+    }
+    
+    /**
+     * Stops every running Processor and Reporting Task that references the given service, directly or
+     * through other services. All running components are verified before the first one is stopped.
+     *
+     * @param serviceNode the service whose referencing components should be unscheduled
+     */
+    @Override
+    public void unscheduleReferencingComponents(final ControllerServiceNode serviceNode) {
+        final List<ProcessorNode> procNodes = findRecursiveReferences(serviceNode, ProcessorNode.class);
+        final List<ReportingTaskNode> taskNodes = findRecursiveReferences(serviceNode, ReportingTaskNode.class);
+
+        // Verification phase: only components whose state is RUNNING are considered.
+        for ( final ProcessorNode procNode : procNodes ) {
+            if ( procNode.getScheduledState() != ScheduledState.RUNNING ) {
+                continue;
+            }
+            procNode.verifyCanStop();
+        }
+        for ( final ReportingTaskNode taskNode : taskNodes ) {
+            if ( taskNode.getScheduledState() != ScheduledState.RUNNING ) {
+                continue;
+            }
+            taskNode.verifyCanStop();
+        }
+
+        // Stop phase: processors are stopped through their process group, reporting tasks through the scheduler.
+        for ( final ProcessorNode procNode : procNodes ) {
+            if ( procNode.getScheduledState() != ScheduledState.RUNNING ) {
+                continue;
+            }
+            procNode.getProcessGroup().stopProcessor(procNode);
+        }
+        for ( final ReportingTaskNode taskNode : taskNodes ) {
+            if ( taskNode.getScheduledState() != ScheduledState.RUNNING ) {
+                continue;
+            }
+            processScheduler.unschedule(taskNode);
+        }
+    }
+    
+    /**
+     * Enables a single Controller Service: the node is first asked to verify that it can be enabled, and
+     * the enabling itself is then delegated to the ProcessScheduler.
+     *
+     * @param serviceNode the service to enable
+     */
     @Override
     public void enableControllerService(final ControllerServiceNode serviceNode) {
         serviceNode.verifyCanEnable();
+        processScheduler.enableControllerService(serviceNode);
+    }
+    
+    /**
+     * Enables the given Controller Services in dependency order. Services that are not currently DISABLED
+     * are skipped with a warning. The remaining services are grouped into branches (see
+     * {@link #determineEnablingOrder(Map)}), marked ENABLING, and each branch is handed to a background
+     * thread that enables its services one after another. This method returns without waiting for the
+     * services to finish enabling.
+     *
+     * @param serviceNodes the services to enable
+     */
+    @Override
+    public void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes) {
+        final Set<ControllerServiceNode> servicesToEnable = new HashSet<>();
+        // Ensure that all nodes are already disabled
+        for ( final ControllerServiceNode serviceNode : serviceNodes ) {
+            final ControllerServiceState curState = serviceNode.getState();
+            if ( ControllerServiceState.DISABLED.equals(curState) ) {
+                servicesToEnable.add(serviceNode);
+            } else {
+                logger.warn("Cannot enable {} because it is not disabled; current state is {}", serviceNode, curState);
+            }
+        }
         
-        try (final NarCloseable x = NarCloseable.withNarLoader()) {
-            final ConfigurationContext configContext = new StandardConfigurationContext(serviceNode, this);
-            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnEnabled.class, serviceNode.getControllerServiceImplementation(), configContext);
+        // determine the order to load the services. We have to ensure that if service A references service B, then B
+        // is enabled first, and so on.
+        final Map<String, ControllerServiceNode> idToNodeMap = new HashMap<>();
+        for ( final ControllerServiceNode node : servicesToEnable ) {
+            idToNodeMap.put(node.getIdentifier(), node);
+        }
+
+        // We can have many Controller Services dependent on one another. We can have many of these
+        // disparate lists of Controller Services that are dependent on one another. We refer to each
+        // of these as a branch.
+        final List<List<ControllerServiceNode>> branches = determineEnablingOrder(idToNodeMap);
+
+        if ( branches.isEmpty() ) {
+            logger.info("No Controller Services to enable");
+            return;
+        } else {
+            logger.info("Will enable {} Controller Services", servicesToEnable.size());
+        }
+
+        // Mark all services that are configured to be enabled as 'ENABLING'. This allows Processors, reporting tasks
+        // to be valid so that they can be scheduled.
+        for ( final List<ControllerServiceNode> branch : branches ) {
+            for ( final ControllerServiceNode nodeToEnable : branch ) {
+                nodeToEnable.setState(ControllerServiceState.ENABLING);
+            }
+        }
+
+        // Shared across all branch threads: records which services some thread has already taken charge of.
+        final Set<ControllerServiceNode> enabledNodes = Collections.synchronizedSet(new HashSet<ControllerServiceNode>());
+        final ExecutorService executor = Executors.newFixedThreadPool(Math.min(10, branches.size()));
+        for ( final List<ControllerServiceNode> branch : branches ) {
+            final Runnable enableBranchRunnable = new Runnable() {
+                @Override
+                public void run() {
+                    logger.debug("Enabling Controller Service Branch {}", branch);
+
+                    for ( final ControllerServiceNode serviceNode : branch ) {
+                        try {
+                            // The same service can appear in several branches. add() on the synchronized set is a
+                            // single atomic claim, so exactly one thread enables it; a separate contains()/add()
+                            // pair would let two threads both pass the check.
+                            if ( enabledNodes.add(serviceNode) ) {
+                                logger.info("Enabling {}", serviceNode);
+                                try {
+                                    processScheduler.enableControllerService(serviceNode);
+                                } catch (final Exception e) {
+                                    logger.error("Failed to enable " + serviceNode + " due to " + e);
+                                    if ( logger.isDebugEnabled() ) {
+                                        logger.error("", e);
+                                    }
+
+                                    if ( bulletinRepo != null ) {
+                                        bulletinRepo.addBulletin(BulletinFactory.createBulletin(
+                                            "Controller Service", Severity.ERROR.name(), "Could not start " + serviceNode + " due to " + e));
+                                    }
+                                }
+                            }
+
+                            // wait for service to finish enabling (whichever thread claimed it).
+                            // NOTE(review): if the scheduler call above fails without moving the node out of
+                            // ENABLING, this loop has no exit other than interruption - confirm scheduler behavior.
+                            while ( ControllerServiceState.ENABLING.equals(serviceNode.getState()) ) {
+                                try {
+                                    Thread.sleep(100L);
+                                } catch (final InterruptedException ie) {
+                                    // Restore the flag and stop: continuing to loop would make every later
+                                    // sleep() throw immediately and spin.
+                                    Thread.currentThread().interrupt();
+                                    logger.warn("Interrupted while waiting for {} to finish enabling; abandoning the rest of this branch", serviceNode);
+                                    return;
+                                }
+                            }
+
+                            logger.info("State for {} is now {}", serviceNode, serviceNode.getState());
+                        } catch (final Exception e) {
+                            logger.error("Failed to enable {} due to {}", serviceNode, e.toString());
+                            if ( logger.isDebugEnabled() ) {
+                                logger.error("", e);
+                            }
+                        }
+                    }
+                }
+            };
+
+            executor.submit(enableBranchRunnable);
+        }
+
+        // No new tasks are accepted; already-submitted branches keep running in the background.
+        executor.shutdown();
+    }
+    
+    /**
+     * Builds one "branch" per service in the given map. A branch is the list of services that have to be
+     * enabled for that service, ordered so that a referenced service comes before the service that
+     * references it; the service itself is the last element of its branch. A service that is referenced
+     * by several others therefore shows up in several branches.
+     *
+     * @param serviceNodeMap the services to order, keyed by service identifier
+     * @return one ordered branch for every value of the map
+     */
+    static List<List<ControllerServiceNode>> determineEnablingOrder(final Map<String, ControllerServiceNode> serviceNodeMap) {
+        final List<List<ControllerServiceNode>> orderedNodeLists = new ArrayList<>();
+
+        // The former "orderedNodeLists.contains(node)" guard compared a node against a list of lists and so
+        // could never be true; every node has always received its own branch, and still does.
+        for ( final ControllerServiceNode node : serviceNodeMap.values() ) {
+            final List<ControllerServiceNode> branch = new ArrayList<>();
+            determineEnablingOrder(serviceNodeMap, node, branch, new HashSet<ControllerServiceNode>());
+            orderedNodeLists.add(branch);
+        }
+
+        return orderedNodeLists;
+    }
+    
+    
+    /**
+     * Appends to orderedNodes the services referenced by contextNode (recursively), followed by
+     * contextNode itself, skipping anything that is already in the list.
+     *
+     * @param serviceNodeMap the services taking part in the ordering, keyed by service identifier
+     * @param contextNode the service whose dependencies are being resolved
+     * @param orderedNodes the branch being built; receives nodes in the order they must be enabled
+     * @param visited nodes whose dependencies are currently being resolved; guards against reference cycles
+     */
+    private static void determineEnablingOrder(final Map<String, ControllerServiceNode> serviceNodeMap, final ControllerServiceNode contextNode, final List<ControllerServiceNode> orderedNodes, final Set<ControllerServiceNode> visited) {
+        if ( visited.contains(contextNode) ) {
+            return;
         }
         
-        serviceNode.setDisabled(false);
+        for ( final Map.Entry<PropertyDescriptor, String> entry : contextNode.getProperties().entrySet() ) {
+            if ( entry.getKey().getControllerServiceDefinition() != null ) {
+                final String referencedServiceId = entry.getValue();
+                if ( referencedServiceId != null ) {
+                    final ControllerServiceNode referencedNode = serviceNodeMap.get(referencedServiceId);
+                    // A referenced id that is absent from the map (for instance a service that is not part of
+                    // this enable request) imposes no ordering constraint here. Recursing with null would
+                    // fail on contextNode.getProperties().
+                    if ( referencedNode != null && !orderedNodes.contains(referencedNode) ) {
+                        visited.add(contextNode);
+                        determineEnablingOrder(serviceNodeMap, referencedNode, orderedNodes, visited);
+                    }
+                }
+            }
+        }
+
+        if ( !orderedNodes.contains(contextNode) ) {
+            orderedNodes.add(contextNode);
+        }
     }
     
+    
+    /**
+     * Disables a single Controller Service: the node is first asked to verify that it can be disabled, and
+     * the disabling itself is then delegated to the ProcessScheduler.
+     *
+     * @param serviceNode the service to disable
+     */
     @Override
     public void disableControllerService(final ControllerServiceNode serviceNode) {
         serviceNode.verifyCanDisable();
-
-        // We must set the service to disabled before we invoke the OnDisabled methods because the service node
-        // can throw Exceptions if we attempt to disable the service while it's known to be in use.
-        serviceNode.setDisabled(true);
-        
-        try (final NarCloseable x = NarCloseable.withNarLoader()) {
-            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, serviceNode.getControllerServiceImplementation());
-        }
+        processScheduler.disableControllerService(serviceNode);
     }
 
     @Override
@@ -213,10 +453,16 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
     @Override
     public boolean isControllerServiceEnabled(final String serviceIdentifier) {
         final ControllerServiceNode node = controllerServices.get(serviceIdentifier);
-        return (node == null) ? false : !node.isDisabled();
+        // An unknown identifier is reported as not enabled; for a known service only the ENABLED state counts.
+        return (node == null) ? false : (ControllerServiceState.ENABLED == node.getState());
     }
 
     @Override
+    public boolean isControllerServiceEnabling(final String serviceIdentifier) {
+        // True only for a registered service whose state is exactly ENABLING; unknown identifiers yield false.
+        final ControllerServiceNode serviceNode = controllerServices.get(serviceIdentifier);
+        return serviceNode != null && serviceNode.getState() == ControllerServiceState.ENABLING;
+    }
+    
+    /**
+     * Looks up a registered Controller Service by its identifier.
+     *
+     * @param serviceIdentifier the identifier of the service
+     * @return whatever the internal map holds for that identifier (null when there is no mapping)
+     */
+    @Override
     public ControllerServiceNode getControllerServiceNode(final String serviceIdentifier) {
         return controllerServices.get(serviceIdentifier);
     }
@@ -234,6 +480,11 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
     }
     
     @Override
+    public String getControllerServiceName(final String serviceIdentifier) {
+        // Resolve the node first; an identifier with no registered service has no name to report.
+        final ControllerServiceNode serviceNode = getControllerServiceNode(serviceIdentifier);
+        if ( serviceNode == null ) {
+            return null;
+        }
+
+        return serviceNode.getName();
+    }
+    
     public void removeControllerService(final ControllerServiceNode serviceNode) {
         final ControllerServiceNode existing = controllerServices.get(serviceNode.getIdentifier());
         if ( existing == null || existing != serviceNode ) {
@@ -247,6 +498,139 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
             ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, serviceNode.getControllerServiceImplementation(), configurationContext);
         }
         
+        for ( final Map.Entry<PropertyDescriptor, String> entry : serviceNode.getProperties().entrySet() ) {
+            final PropertyDescriptor descriptor = entry.getKey();
+            if (descriptor.getControllerServiceDefinition() != null ) {
+                final String value = entry.getValue() == null ? descriptor.getDefaultValue() : entry.getValue();
+                if ( value != null ) {
+                    final ControllerServiceNode referencedNode = getControllerServiceNode(value);
+                    if ( referencedNode != null ) {
+                        referencedNode.removeReference(serviceNode);
+                    }
+                }
+            }
+        }
+        
         controllerServices.remove(serviceNode.getIdentifier());
     }
+    
+    /**
+     * Returns the currently registered Controller Services as a new HashSet, so changes to the returned
+     * set do not touch this provider's internal map.
+     *
+     * @return a copy of all registered service nodes
+     */
+    @Override
+    public Set<ControllerServiceNode> getAllControllerServices() {
+        final Set<ControllerServiceNode> snapshot = new HashSet<>(controllerServices.values());
+        return snapshot;
+    }
+    
+    
+    /**
+     * Returns a List of all components that reference the given referencedNode (either directly or indirectly through
+     * another service) that are also of the given componentType. The list that is returned is in the order in which they will
+     * need to be 'activated' (enabled/started). Each component appears at most once, and services that reference each
+     * other in a cycle do not cause unbounded recursion.
+     *
+     * @param referencedNode the service whose referencing components are sought
+     * @param componentType the type of component to collect
+     * @return the referencing components of the given type, in activation order
+     */
+    private <T> List<T> findRecursiveReferences(final ControllerServiceNode referencedNode, final Class<T> componentType) {
+        return findRecursiveReferences(referencedNode, componentType, new HashSet<ControllerServiceNode>());
+    }
+
+    /**
+     * Worker for {@link #findRecursiveReferences(ControllerServiceNode, Class)}.
+     *
+     * @param referencedNode the service whose referencing components are sought
+     * @param componentType the type of component to collect
+     * @param inProgress the services on the current recursion path; a referencing service found in this set
+     *            closes a cycle and is not descended into again
+     * @return the referencing components of the given type, in activation order
+     */
+    private <T> List<T> findRecursiveReferences(final ControllerServiceNode referencedNode, final Class<T> componentType, final Set<ControllerServiceNode> inProgress) {
+        final List<T> references = new ArrayList<>();
+        inProgress.add(referencedNode);
+
+        for ( final ConfiguredComponent referencingComponent : referencedNode.getReferences().getReferencingComponents() ) {
+            if ( componentType.isInstance(referencingComponent) ) {
+                final T typed = componentType.cast(referencingComponent);
+                // The component may already be present because an earlier referencing service pulled it in;
+                // that earlier position is after its dependency, so it is kept and no duplicate is added.
+                if ( !references.contains(typed) ) {
+                    references.add(typed);
+                }
+            }
+
+            if ( referencingComponent instanceof ControllerServiceNode ) {
+                final ControllerServiceNode referencingNode = (ControllerServiceNode) referencingComponent;
+                if ( inProgress.contains(referencingNode) ) {
+                    continue;   // already being expanded further up the call chain: reference cycle
+                }
+
+                // find components recursively that depend on referencingNode.
+                final List<T> recursive = findRecursiveReferences(referencingNode, componentType, inProgress);
+
+                // For anything that depends on referencing node, we want to add it to the list, but we know
+                // that it must come after the referencing node, so we first remove any existing occurrence.
+                references.removeAll(recursive);
+                references.addAll(recursive);
+            }
+        }
+
+        inProgress.remove(referencedNode);
+        return references;
+    }
+
+    
+    /**
+     * Enables the Controller Services that reference the given service, directly or through other services.
+     *
+     * @param serviceNode the service whose referencing services should be enabled
+     */
+    @Override
+    public void enableReferencingServices(final ControllerServiceNode serviceNode) {
+        enableReferencingServices(serviceNode, findRecursiveReferences(serviceNode, ControllerServiceNode.class));
+    }
+    
+    /**
+     * Verifies and then enables the services that reference serviceNode. The given service itself is only
+     * verified (when it is neither ENABLED nor ENABLING); it is not enabled by this method.
+     *
+     * @param serviceNode the service whose referencing services should be enabled
+     * @param recursiveReferences the services that reference serviceNode; passed to the verification of
+     *            serviceNode itself
+     */
+    private void enableReferencingServices(final ControllerServiceNode serviceNode, final List<ControllerServiceNode> recursiveReferences) {
+        if ( !(serviceNode.getState() == ControllerServiceState.ENABLED || serviceNode.getState() == ControllerServiceState.ENABLING) ) {
+            serviceNode.verifyCanEnable(new HashSet<>(recursiveReferences));
+        }
+
+        final List<ControllerServiceNode> candidates = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
+
+        // Verification pass: each candidate is checked against the set of candidates verified before it.
+        final Set<ControllerServiceNode> verifiedSoFar = new HashSet<>();
+        for ( final ControllerServiceNode candidate : candidates ) {
+            final ControllerServiceState current = candidate.getState();
+            if ( current == ControllerServiceState.ENABLED || current == ControllerServiceState.ENABLING ) {
+                continue;
+            }
+
+            candidate.verifyCanEnable(verifiedSoFar);
+            verifiedSoFar.add(candidate);
+        }
+
+        // Enabling pass, in the order returned by findRecursiveReferences.
+        for ( final ControllerServiceNode candidate : candidates ) {
+            final ControllerServiceState current = candidate.getState();
+            if ( current == ControllerServiceState.ENABLED || current == ControllerServiceState.ENABLING ) {
+                continue;
+            }
+
+            enableControllerService(candidate);
+        }
+    }
+    
+    /**
+     * Asks every service that references the given service (directly or indirectly) to verify that it can
+     * be enabled, passing along the full set of those referencing services.
+     *
+     * @param serviceNode the service whose referencing services are verified
+     */
+    @Override
+    public void verifyCanEnableReferencingServices(final ControllerServiceNode serviceNode) {
+        final List<ControllerServiceNode> dependents = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
+        final Set<ControllerServiceNode> dependentSet = new HashSet<>(dependents);
+
+        for ( final ControllerServiceNode dependent : dependents ) {
+            dependent.verifyCanEnable(dependentSet);
+        }
+    }
+    
+    /**
+     * Asks every Reporting Task and Processor that references the given service (directly or indirectly)
+     * to verify that it can be started, skipping components whose scheduled state is DISABLED. The set of
+     * referencing services is handed to each verification call.
+     *
+     * @param serviceNode the service whose referencing components are verified
+     */
+    @Override
+    public void verifyCanScheduleReferencingComponents(final ControllerServiceNode serviceNode) {
+        final List<ControllerServiceNode> dependentServices = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
+        final List<ReportingTaskNode> dependentTasks = findRecursiveReferences(serviceNode, ReportingTaskNode.class);
+        final List<ProcessorNode> dependentProcessors = findRecursiveReferences(serviceNode, ProcessorNode.class);
+
+        final Set<ControllerServiceNode> dependentServiceSet = new HashSet<>(dependentServices);
+
+        for ( final ReportingTaskNode task : dependentTasks ) {
+            if ( task.getScheduledState() == ScheduledState.DISABLED ) {
+                continue;
+            }
+            task.verifyCanStart(dependentServiceSet);
+        }
+
+        for ( final ProcessorNode processor : dependentProcessors ) {
+            if ( processor.getScheduledState() == ScheduledState.DISABLED ) {
+                continue;
+            }
+            processor.verifyCanStart(dependentServiceSet);
+        }
+    }
+    
+    /**
+     * Asks every service that references the given service (directly or indirectly) to verify that it can
+     * be disabled. Services that are already DISABLED or DISABLING are not checked.
+     *
+     * @param serviceNode the service whose referencing services are verified
+     */
+    @Override
+    public void verifyCanDisableReferencingServices(final ControllerServiceNode serviceNode) {
+        final List<ControllerServiceNode> dependents = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
+        final Set<ControllerServiceNode> dependentSet = new HashSet<>(dependents);
+
+        for ( final ControllerServiceNode dependent : dependents ) {
+            final ControllerServiceState current = dependent.getState();
+            if ( current == ControllerServiceState.DISABLED || current == ControllerServiceState.DISABLING ) {
+                continue;
+            }
+
+            dependent.verifyCanDisable(dependentSet);
+        }
+    }
+    
+    /**
+     * Intentionally performs no checks.
+     *
+     * @param serviceNode the service whose referencing components would be stopped
+     */
+    @Override
+    public void verifyCanStopReferencingComponents(final ControllerServiceNode serviceNode) {
+        // nothing to verify: stopping referencing components is always permitted
+    }
 }