You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/04/10 15:43:56 UTC

[21/62] [abbrv] incubator-nifi git commit: Squashed commit of the following:

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 4d5455f..eff523a 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -16,12 +16,12 @@
  */
 package org.apache.nifi.cluster.manager.impl;
 
-import java.io.File;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.net.URI;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -41,7 +41,9 @@ import java.util.TimerTask;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.CompletionService;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -54,17 +56,20 @@ import javax.net.ssl.SSLContext;
 import javax.ws.rs.HttpMethod;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.StreamingOutput;
-import javax.xml.XMLConstants;
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
 import javax.xml.transform.dom.DOMSource;
-import javax.xml.validation.Schema;
-import javax.xml.validation.SchemaFactory;
-import javax.xml.validation.Validator;
+import javax.xml.transform.stream.StreamResult;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.admin.service.AuditService;
+import org.apache.nifi.annotation.lifecycle.OnAdded;
+import org.apache.nifi.annotation.lifecycle.OnRemoved;
 import org.apache.nifi.cluster.BulletinsPayload;
 import org.apache.nifi.cluster.HeartbeatPayload;
 import org.apache.nifi.cluster.context.ClusterContext;
@@ -122,12 +127,18 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.Heartbeater;
 import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.StandardFlowSerializer;
 import org.apache.nifi.controller.ValidationContextFactory;
+import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
 import org.apache.nifi.controller.reporting.ClusteredReportingTaskNode;
 import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
+import org.apache.nifi.controller.reporting.ReportingTaskProvider;
 import org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
+import org.apache.nifi.controller.scheduling.QuartzSchedulingAgent;
 import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
 import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent;
+import org.apache.nifi.controller.service.ControllerServiceLoader;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.controller.service.StandardControllerServiceProvider;
@@ -147,10 +158,12 @@ import org.apache.nifi.events.BulletinFactory;
 import org.apache.nifi.events.VolatileBulletinRepository;
 import org.apache.nifi.framework.security.util.SslContextFactory;
 import org.apache.nifi.io.socket.multicast.DiscoverableService;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.logging.NiFiLog;
 import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.nar.NarThreadContextClassLoader;
+import org.apache.nifi.processor.SimpleProcessLogger;
 import org.apache.nifi.processor.StandardValidationContextFactory;
 import org.apache.nifi.remote.RemoteResourceManager;
 import org.apache.nifi.remote.RemoteSiteListener;
@@ -168,7 +181,11 @@ import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.util.DomUtils;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.ObjectHolder;
+import org.apache.nifi.util.ReflectionUtils;
+import org.apache.nifi.web.OptimisticLockingManager;
 import org.apache.nifi.web.Revision;
+import org.apache.nifi.web.UpdateRevision;
 import org.apache.nifi.web.api.dto.FlowSnippetDTO;
 import org.apache.nifi.web.api.dto.NodeDTO;
 import org.apache.nifi.web.api.dto.ProcessGroupDTO;
@@ -204,6 +221,16 @@ import org.xml.sax.SAXParseException;
 
 import com.sun.jersey.api.client.ClientResponse;
 
+import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.web.api.dto.ControllerServiceDTO;
+import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
+import org.apache.nifi.web.api.dto.ReportingTaskDTO;
+import org.apache.nifi.web.api.entity.ControllerServiceEntity;
+import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
+import org.apache.nifi.web.api.entity.ControllerServicesEntity;
+import org.apache.nifi.web.api.entity.ReportingTaskEntity;
+import org.apache.nifi.web.api.entity.ReportingTasksEntity;
+
 /**
  * Provides a cluster manager implementation. The manager federates incoming
  * HTTP client requests to the nodes' external API using the HTTP protocol. The
@@ -222,7 +249,7 @@ import com.sun.jersey.api.client.ClientResponse;
  *
  * @author unattributed
  */
-public class WebClusterManager implements HttpClusterManager, ProtocolHandler, ControllerServiceProvider {
+public class WebClusterManager implements HttpClusterManager, ProtocolHandler, ControllerServiceProvider, ReportingTaskProvider {
 
     public static final String ROOT_GROUP_ID_ALIAS = "root";
     public static final String BULLETIN_CATEGORY = "Clustering";
@@ -279,6 +306,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
 
     public static final Pattern PROCESSORS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors");
     public static final Pattern PROCESSOR_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}");
+    public static final Pattern CLUSTER_PROCESSOR_URI_PATTERN = Pattern.compile("/nifi-api/cluster/processors/[a-f0-9\\-]{36}");
 
     public static final Pattern REMOTE_PROCESS_GROUPS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups");
     public static final Pattern REMOTE_PROCESS_GROUP_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups/[a-f0-9\\-]{36}");
@@ -290,12 +318,19 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     public static final String PROVENANCE_URI = "/nifi-api/controller/provenance";
     public static final Pattern PROVENANCE_QUERY_URI = Pattern.compile("/nifi-api/controller/provenance/[a-f0-9\\-]{36}");
     public static final Pattern PROVENANCE_EVENT_URI = Pattern.compile("/nifi-api/controller/provenance/events/[0-9]+");
-
+    
+    public static final String CONTROLLER_SERVICES_URI = "/nifi-api/controller/controller-services/node";
+    public static final Pattern CONTROLLER_SERVICE_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}");
+    public static final Pattern CONTROLLER_SERVICE_REFERENCES_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}/references");
+    public static final String REPORTING_TASKS_URI = "/nifi-api/controller/reporting-tasks/node";
+    public static final Pattern REPORTING_TASK_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}");
+    
     private final NiFiProperties properties;
     private final HttpRequestReplicator httpRequestReplicator;
     private final HttpResponseMapper httpResponseMapper;
     private final DataFlowManagementService dataFlowManagementService;
     private final ClusterManagerProtocolSenderListener senderListener;
+    private final OptimisticLockingManager optimisticLockingManager;
     private final StringEncryptor encryptor;
     private final Queue<Heartbeat> pendingHeartbeats = new ConcurrentLinkedQueue<>();
     private final ReentrantReadWriteLock resourceRWLock = new ReentrantReadWriteLock();
@@ -303,12 +338,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     private final ClusterManagerLock writeLock = new ClusterManagerLock(resourceRWLock.writeLock(), "Write");
 
     private final Set<Node> nodes = new HashSet<>();
-    private final Set<ReportingTaskNode> reportingTasks = new HashSet<>();
+    private final ConcurrentMap<String, ReportingTaskNode> reportingTasks = new ConcurrentHashMap<>();
 
     // null means the dataflow should be read from disk
     private StandardDataFlow cachedDataFlow = null;
     private NodeIdentifier primaryNodeId = null;
-    private Revision revision = new Revision(0L, "");
     private Timer heartbeatMonitor;
     private Timer heartbeatProcessor;
     private volatile ClusterServicesBroadcaster servicesBroadcaster = null;
@@ -329,7 +363,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
 
     public WebClusterManager(final HttpRequestReplicator httpRequestReplicator, final HttpResponseMapper httpResponseMapper,
             final DataFlowManagementService dataFlowManagementService, final ClusterManagerProtocolSenderListener senderListener,
-            final NiFiProperties properties, final StringEncryptor encryptor) {
+            final NiFiProperties properties, final StringEncryptor encryptor, final OptimisticLockingManager optimisticLockingManager) {
 
         if (httpRequestReplicator == null) {
             throw new IllegalArgumentException("HttpRequestReplicator may not be null.");
@@ -348,11 +382,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         this.httpResponseMapper = httpResponseMapper;
         this.dataFlowManagementService = dataFlowManagementService;
         this.properties = properties;
-        this.controllerServiceProvider = new StandardControllerServiceProvider();
         this.bulletinRepository = new VolatileBulletinRepository();
         this.instanceId = UUID.randomUUID().toString();
         this.senderListener = senderListener;
         this.encryptor = encryptor;
+        this.optimisticLockingManager = optimisticLockingManager;
         senderListener.addHandler(this);
         senderListener.setBulletinRepository(bulletinRepository);
 
@@ -393,9 +427,15 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
             public void heartbeat() {
             }
         }, this, encryptor);
+        
+        // When we construct the scheduling agents, we can pass null for a lot of the arguments because we are only
+        // going to be scheduling Reporting Tasks. Otherwise, it would not be okay.
         processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, new TimerDrivenSchedulingAgent(null, reportingTaskEngine, null, encryptor));
+        processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, new QuartzSchedulingAgent(null, reportingTaskEngine, null, encryptor));
         processScheduler.setMaxThreadCount(SchedulingStrategy.TIMER_DRIVEN, 10);
         processScheduler.setMaxThreadCount(SchedulingStrategy.CRON_DRIVEN, 10);
+        
+        controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository);
     }
 
     public void start() throws IOException {
@@ -429,14 +469,20 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                 }
 
                 // load flow
+                final ClusterDataFlow clusterDataFlow;
                 if (dataFlowManagementService.isFlowCurrent()) {
-                    final ClusterDataFlow clusterDataFlow = dataFlowManagementService.loadDataFlow();
+                    clusterDataFlow = dataFlowManagementService.loadDataFlow();
                     cachedDataFlow = clusterDataFlow.getDataFlow();
                     primaryNodeId = clusterDataFlow.getPrimaryNodeId();
                 } else {
                     throw new IOException("Flow is not current.");
                 }
 
+                final byte[] serializedServices = clusterDataFlow.getControllerServices();
+                if ( serializedServices != null && serializedServices.length > 0 ) {
+                	ControllerServiceLoader.loadControllerServices(this, new ByteArrayInputStream(serializedServices), encryptor, bulletinRepository, properties.getAutoResumeState());
+                }
+                
                 // start multicast broadcasting service, if configured
                 if (servicesBroadcaster != null) {
                     servicesBroadcaster.start();
@@ -446,8 +492,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                 executeSafeModeTask();
 
                 // Load and start running Reporting Tasks
-                final File taskFile = new File(properties.getProperty(NiFiProperties.TASK_CONFIGURATION_FILE));
-                reportingTasks.addAll(loadReportingTasks(taskFile));
+                final byte[] serializedReportingTasks = clusterDataFlow.getReportingTasks();
+                if ( serializedReportingTasks != null && serializedReportingTasks.length > 0 ) {
+                	loadReportingTasks(serializedReportingTasks);
+                }
             } catch (final IOException ioe) {
                 logger.warn("Failed to initialize cluster services due to: " + ioe, ioe);
                 stop();
@@ -861,22 +909,17 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         reconnectionThread.start();
     }
 
-    private List<ReportingTaskNode> loadReportingTasks(final File taskConfigXml) {
-        final List<ReportingTaskNode> tasks = new ArrayList<>();
-        if (taskConfigXml == null) {
-            logger.info("No controller tasks to start");
-            return tasks;
-        }
+    private Map<String, ReportingTaskNode> loadReportingTasks(final byte[] serialized) {
+        final Map<String, ReportingTaskNode> tasks = new HashMap<>();
 
         try {
-            final URL schemaUrl = getClass().getResource("/ReportingTaskConfiguration.xsd");
-            final Document document = parse(taskConfigXml, schemaUrl);
+            final Document document = parse(serialized);
 
-            final NodeList tasksNodes = document.getElementsByTagName("tasks");
+            final NodeList tasksNodes = document.getElementsByTagName("reportingTasks");
             final Element tasksElement = (Element) tasksNodes.item(0);
 
             //optional properties for all ReportingTasks
-            for (final Element taskElement : DomUtils.getChildElementsByTagName(tasksElement, "task")) {
+            for (final Element taskElement : DomUtils.getChildElementsByTagName(tasksElement, "reportingTask")) {
                 //add global properties common to all tasks
                 Map<String, String> properties = new HashMap<>();
 
@@ -901,17 +944,20 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                 final String taskSchedulingPeriod = DomUtils.getChild(taskElement, "schedulingPeriod").getTextContent().trim();
                 final String taskClass = DomUtils.getChild(taskElement, "class").getTextContent().trim();
 
-                //optional task-specific properties
-                for (final Element optionalProperty : DomUtils.getChildElementsByTagName(taskElement, "property")) {
-                    final String name = optionalProperty.getAttribute("name");
-                    final String value = optionalProperty.getTextContent().trim();
+                final String scheduleStateValue = DomUtils.getChild(taskElement, "scheduledState").getTextContent().trim();
+                final ScheduledState scheduledState = ScheduledState.valueOf(scheduleStateValue);
+                
+                // Reporting Task Properties
+                for (final Element property : DomUtils.getChildElementsByTagName(taskElement, "property")) {
+                    final String name = DomUtils.getChildText(property, "name");
+                    final String value = DomUtils.getChildText(property, "value");
                     properties.put(name, value);
                 }
 
                 //set the class to be used for the configured reporting task
                 final ReportingTaskNode reportingTaskNode;
                 try {
-                    reportingTaskNode = createReportingTask(taskClass, taskId);
+                    reportingTaskNode = createReportingTask(taskClass, taskId, false);
                 } catch (final ReportingTaskInstantiationException e) {
                     logger.error("Unable to load reporting task {} due to {}", new Object[]{taskId, e});
                     if (logger.isDebugEnabled()) {
@@ -922,27 +968,61 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
 
                 final ReportingTask reportingTask = reportingTaskNode.getReportingTask();
 
-                final ReportingInitializationContext config = new StandardReportingInitializationContext(taskId, taskName, schedulingStrategy, taskSchedulingPeriod, this);
+                final ComponentLog componentLog = new SimpleProcessLogger(taskId, reportingTask);
+                final ReportingInitializationContext config = new StandardReportingInitializationContext(taskId, taskName, 
+                        schedulingStrategy, taskSchedulingPeriod, componentLog, this);
                 reportingTask.initialize(config);
 
+                final String annotationData = DomUtils.getChildText(taskElement, "annotationData");
+                if ( annotationData != null ) {
+                    reportingTaskNode.setAnnotationData(annotationData.trim());
+                }
+                
                 final Map<PropertyDescriptor, String> resolvedProps;
                 try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
                     resolvedProps = new HashMap<>();
                     for (final Map.Entry<String, String> entry : properties.entrySet()) {
                         final PropertyDescriptor descriptor = reportingTask.getPropertyDescriptor(entry.getKey());
-                        resolvedProps.put(descriptor, entry.getValue());
+                        if ( entry.getValue() == null ) {
+                            resolvedProps.put(descriptor, descriptor.getDefaultValue());
+                        } else {
+                            resolvedProps.put(descriptor, entry.getValue());
+                        }
                     }
                 }
 
                 for (final Map.Entry<PropertyDescriptor, String> entry : resolvedProps.entrySet()) {
-                    reportingTaskNode.setProperty(entry.getKey().getName(), entry.getValue());
+                    if ( entry.getValue() != null ) {
+                        reportingTaskNode.setProperty(entry.getKey().getName(), entry.getValue());
+                    }
+                }
+                
+                final String comments = DomUtils.getChildText(taskElement, "comment");
+                if ( comments != null ) {
+                    reportingTaskNode.setComments(comments);
                 }
 
-                processScheduler.schedule(reportingTaskNode);
-                tasks.add(reportingTaskNode);
+                reportingTaskNode.setScheduledState(scheduledState);
+                if ( ScheduledState.RUNNING.equals(scheduledState) ) {
+                    if ( reportingTaskNode.isValid() ) {
+                        try {
+                            processScheduler.schedule(reportingTaskNode);
+                        } catch (final Exception e) {
+                            logger.error("Failed to start {} due to {}", reportingTaskNode, e);
+                            if ( logger.isDebugEnabled() ) {
+                                logger.error("", e);
+                            }
+                        }
+                    } else {
+                        logger.error("Failed to start {} because it is invalid due to {}", reportingTaskNode, reportingTaskNode.getValidationErrors());
+                    }
+                }
+                
+                
+                tasks.put(reportingTaskNode.getIdentifier(), reportingTaskNode);
             }
         } catch (final SAXException | ParserConfigurationException | IOException | DOMException | NumberFormatException | InitializationException t) {
-            logger.error("Unable to load reporting tasks from {} due to {}", new Object[]{taskConfigXml, t});
+            logger.error("Unable to load reporting tasks due to {}", new Object[]{t});
             if (logger.isDebugEnabled()) {
                 logger.error("", t);
             }
@@ -951,7 +1031,9 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         return tasks;
     }
 
-    private ReportingTaskNode createReportingTask(final String type, final String id) throws ReportingTaskInstantiationException {
+    
+    @Override
+    public ReportingTaskNode createReportingTask(final String type, final String id, final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
         if (type == null) {
             throw new NullPointerException();
         }
@@ -981,14 +1063,22 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(this);
         final ReportingTaskNode taskNode = new ClusteredReportingTaskNode(task, id, processScheduler,
                 new ClusteredEventAccess(this), bulletinRepository, controllerServiceProvider, validationContextFactory);
+        taskNode.setName(task.getClass().getSimpleName());
+        
+        reportingTasks.put(id, taskNode);
+        if ( firstTimeAdded ) {
+            try (final NarCloseable x = NarCloseable.withNarLoader()) {
+                ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, task);
+            } catch (final Exception e) {
+                throw new ProcessorLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + task, e);
+            }
+        }
+        
         return taskNode;
     }
 
-    private Document parse(final File xmlFile, final URL schemaUrl) throws SAXException, ParserConfigurationException, IOException {
-        final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
-        final Schema schema = schemaFactory.newSchema(schemaUrl);
+    private Document parse(final byte[] serialized) throws SAXException, ParserConfigurationException, IOException {
         final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
-        docFactory.setSchema(schema);
         final DocumentBuilder builder = docFactory.newDocumentBuilder();
 
         builder.setErrorHandler(new org.xml.sax.ErrorHandler() {
@@ -1021,12 +1111,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         });
 
         // build the docuemnt
-        final Document document = builder.parse(xmlFile);
-
-        // ensure schema compliance
-        final Validator validator = schema.newValidator();
-        validator.validate(new DOMSource(document));
-
+        final Document document = builder.parse(new ByteArrayInputStream(serialized));
         return document;
     }
 
@@ -1287,7 +1372,19 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
             writeLock.unlock("handleControllerStartupFailure");
         }
     }
-
+    
+    /**
+     * Adds an instance of a specified controller service.
+     *
+     * @param type
+     * @param id
+     * @param properties
+     * @return
+     */
+    @Override
+    public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
+    	return controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
+    }
 
     @Override
     public ControllerService getControllerService(String serviceIdentifier) {
@@ -1310,11 +1407,16 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     }
 
     @Override
-    public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
-        return controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
+    public boolean isControllerServiceEnabling(final String serviceIdentifier) {
+        return controllerServiceProvider.isControllerServiceEnabling(serviceIdentifier);
     }
     
     @Override
+    public String getControllerServiceName(final String serviceIdentifier) {
+    	return controllerServiceProvider.getControllerServiceName(serviceIdentifier);
+    }
+
+    @Override
     public void removeControllerService(final ControllerServiceNode serviceNode) {
         controllerServiceProvider.removeControllerService(serviceNode);
     }
@@ -1326,10 +1428,214 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     }
     
     @Override
+    public void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes) {
+        controllerServiceProvider.enableControllerServices(serviceNodes);
+    }
+    
+    @Override
     public void disableControllerService(final ControllerServiceNode serviceNode) {
         controllerServiceProvider.disableControllerService(serviceNode);
     }
     
+    @Override
+    public Set<ControllerServiceNode> getAllControllerServices() {
+    	return controllerServiceProvider.getAllControllerServices();
+    }
+    
+    
+    @Override
+    public void disableReferencingServices(final ControllerServiceNode serviceNode) {
+        controllerServiceProvider.disableReferencingServices(serviceNode);
+    }
+    
+    @Override
+    public void enableReferencingServices(final ControllerServiceNode serviceNode) {
+        controllerServiceProvider.enableReferencingServices(serviceNode);
+    }
+    
+    @Override
+    public void scheduleReferencingComponents(final ControllerServiceNode serviceNode) {
+        controllerServiceProvider.scheduleReferencingComponents(serviceNode);
+    }
+    
+    @Override
+    public void unscheduleReferencingComponents(final ControllerServiceNode serviceNode) {
+        controllerServiceProvider.unscheduleReferencingComponents(serviceNode);
+    }
+    
+    @Override
+    public void verifyCanEnableReferencingServices(final ControllerServiceNode serviceNode) {
+        controllerServiceProvider.verifyCanEnableReferencingServices(serviceNode);
+    }
+    
+    @Override
+    public void verifyCanScheduleReferencingComponents(final ControllerServiceNode serviceNode) {
+        controllerServiceProvider.verifyCanScheduleReferencingComponents(serviceNode);
+    }
+    
+    @Override
+    public void verifyCanDisableReferencingServices(final ControllerServiceNode serviceNode) {
+        controllerServiceProvider.verifyCanDisableReferencingServices(serviceNode);
+    }
+    
+    @Override
+    public void verifyCanStopReferencingComponents(final ControllerServiceNode serviceNode) {
+        controllerServiceProvider.verifyCanStopReferencingComponents(serviceNode);
+    }
+    
+    private byte[] serialize(final Document doc) throws TransformerException {
+    	final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    	final DOMSource domSource = new DOMSource(doc);
+        final StreamResult streamResult = new StreamResult(baos);
+
+        // configure the transformer and convert the DOM
+        final TransformerFactory transformFactory = TransformerFactory.newInstance();
+        final Transformer transformer = transformFactory.newTransformer();
+        transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2");
+        transformer.setOutputProperty(OutputKeys.INDENT, "yes");
+
+        // transform the document to byte stream
+        transformer.transform(domSource, streamResult);
+        return baos.toByteArray();
+    }
+    
+    private byte[] serializeControllerServices() throws ParserConfigurationException, TransformerException {
+    	final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
+        final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
+        final Document document = docBuilder.newDocument();
+    	final Element rootElement = document.createElement("controllerServices");
+    	document.appendChild(rootElement);
+    	
+    	for ( final ControllerServiceNode serviceNode : getAllControllerServices() ) {
+    		StandardFlowSerializer.addControllerService(rootElement, serviceNode, encryptor);
+    	}
+    	
+    	return serialize(document);
+    }
+    
+    private byte[] serializeReportingTasks() throws ParserConfigurationException, TransformerException {
+    	final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
+        final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
+        final Document document = docBuilder.newDocument();
+    	final Element rootElement = document.createElement("reportingTasks");
+    	document.appendChild(rootElement);
+    	
+    	for ( final ReportingTaskNode taskNode : getAllReportingTasks() ) {
+    		StandardFlowSerializer.addReportingTask(rootElement, taskNode, encryptor);
+    	}
+    	
+    	return serialize(document);
+    }
+    
+    
+    public void saveControllerServices() {
+    	try {
+    		dataFlowManagementService.updateControllerServices(serializeControllerServices());
+    	} catch (final Exception e) {
+    		logger.error("Failed to save changes to NCM's Controller Services; changes may be lost on restart due to " + e);
+    		if ( logger.isDebugEnabled() ) {
+    			logger.error("", e);
+    		}
+    		
+    		getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Controller Services", Severity.ERROR.name(), 
+    				"Failed to save changes to NCM's Controller Services; changes may be lost on restart. See logs for more details."));
+    	}
+    }
+    
+    public void saveReportingTasks() {
+    	try {
+    		dataFlowManagementService.updateReportingTasks(serializeReportingTasks());
+    	} catch (final Exception e) {
+    		logger.error("Failed to save changes to NCM's Reporting Tasks; changes may be lost on restart due to " + e);
+    		if ( logger.isDebugEnabled() ) {
+    			logger.error("", e);
+    		}
+    		
+    		getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Reporting Tasks", Severity.ERROR.name(), 
+    				"Failed to save changes to NCM's Reporting Tasks; changes may be lost on restart. See logs for more details."));
+    	}
+    }
+
+    @Override
+    public Set<ReportingTaskNode> getAllReportingTasks() {
+    	readLock.lock();
+    	try {
+    		return new HashSet<>(reportingTasks.values());
+    	} finally {
+    		readLock.unlock("getReportingTasks");
+    	}
+    }
+
+    @Override
+    public ReportingTaskNode getReportingTaskNode(final String taskId) {
+    	readLock.lock();
+    	try {
+    		return reportingTasks.get(taskId);
+    	} finally {
+    		readLock.unlock("getReportingTaskNode");
+    	}
+    }
+
+    @Override
+    public void startReportingTask(final ReportingTaskNode reportingTaskNode) {
+        reportingTaskNode.verifyCanStart();
+       	processScheduler.schedule(reportingTaskNode);
+    }
+
+    
+    @Override
+    public void stopReportingTask(final ReportingTaskNode reportingTaskNode) {
+        reportingTaskNode.verifyCanStop();
+        processScheduler.unschedule(reportingTaskNode);
+    }
+
+    @Override
+    public void removeReportingTask(final ReportingTaskNode reportingTaskNode) {
+    	writeLock.lock();
+    	try {
+	        final ReportingTaskNode existing = reportingTasks.get(reportingTaskNode.getIdentifier());
+	        if ( existing == null || existing != reportingTaskNode ) {
+	            throw new IllegalStateException("Reporting Task " + reportingTaskNode + " does not exist in this Flow");
+	        }
+	        
+	        reportingTaskNode.verifyCanDelete();
+	        
+	        try (final NarCloseable x = NarCloseable.withNarLoader()) {
+	            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, reportingTaskNode.getReportingTask(), reportingTaskNode.getConfigurationContext());
+	        }
+	        
+	        for ( final Map.Entry<PropertyDescriptor, String> entry : reportingTaskNode.getProperties().entrySet() ) {
+	            final PropertyDescriptor descriptor = entry.getKey();
+	            if (descriptor.getControllerServiceDefinition() != null ) {
+	                final String value = entry.getValue() == null ? descriptor.getDefaultValue() : entry.getValue();
+	                if ( value != null ) {
+	                    final ControllerServiceNode serviceNode = controllerServiceProvider.getControllerServiceNode(value);
+	                    if ( serviceNode != null ) {
+	                        serviceNode.removeReference(reportingTaskNode);
+	                    }
+	                }
+	            }
+	        }
+	        
+	        reportingTasks.remove(reportingTaskNode.getIdentifier());
+    	} finally {
+    		writeLock.unlock("removeReportingTask");
+    	}
+    }
+    
+    
+    @Override
+    public void disableReportingTask(final ReportingTaskNode reportingTask) {
+        reportingTask.verifyCanDisable();
+        processScheduler.disableReportingTask(reportingTask);
+    }
+    
+    @Override
+    public void enableReportingTask(final ReportingTaskNode reportingTask) {
+        reportingTask.verifyCanEnable();
+        processScheduler.enableReportingTask(reportingTask);
+    }
+    
     
     /**
      * Handle a bulletins message.
@@ -1966,65 +2272,114 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         // check if this request can change the flow
         final boolean mutableRequest = canChangeNodeState(method, uri);
 
-        // update headers to contain cluster contextual information to send to the node
-        final Map<String, String> updatedHeaders = new HashMap<>(headers);
-        final ClusterContext clusterCtx = new ClusterContextImpl();
-        clusterCtx.setRequestSentByClusterManager(true);                 // indicate request is sent from cluster manager
-        clusterCtx.setRevision(revision);
+        final ObjectHolder<NodeResponse> holder = new ObjectHolder<>(null);
+        final UpdateRevision federateRequest = new UpdateRevision() {
+            @Override
+            public Revision execute(Revision currentRevision) {
+                // update headers to contain cluster contextual information to send to the node
+                final Map<String, String> updatedHeaders = new HashMap<>(headers);
+                final ClusterContext clusterCtx = new ClusterContextImpl();
+                clusterCtx.setRequestSentByClusterManager(true);                 // indicate request is sent from cluster manager
+                clusterCtx.setRevision(currentRevision);
+
+                // serialize cluster context and add to request header
+                final String serializedClusterCtx = WebUtils.serializeObjectToHex(clusterCtx);
+                updatedHeaders.put(CLUSTER_CONTEXT_HTTP_HEADER, serializedClusterCtx);
+
+                // if the request is mutable, we need to verify that it is a valid request for all nodes in the cluster.
+                if (mutableRequest) {
+                    updatedHeaders.put(NCM_EXPECTS_HTTP_HEADER, "150-NodeContinue");
 
-        // serialize cluster context and add to request header
-        final String serializedClusterCtx = WebUtils.serializeObjectToHex(clusterCtx);
-        updatedHeaders.put(CLUSTER_CONTEXT_HTTP_HEADER, serializedClusterCtx);
+                    final Set<NodeResponse> nodeResponses;
+                    if (entity == null) {
+                        nodeResponses = httpRequestReplicator.replicate(nodeIds, method, uri, parameters, updatedHeaders);
+                    } else {
+                        nodeResponses = httpRequestReplicator.replicate(nodeIds, method, uri, entity, updatedHeaders);
+                    }
 
-        // if the request is mutable, we need to verify that it is a valid request for all nodes in the cluster.
-        if (mutableRequest) {
-            updatedHeaders.put(NCM_EXPECTS_HTTP_HEADER, "150-NodeContinue");
+                    updatedHeaders.remove(NCM_EXPECTS_HTTP_HEADER);
 
-            final Set<NodeResponse> nodeResponses;
-            if (entity == null) {
-                nodeResponses = httpRequestReplicator.replicate(nodeIds, method, uri, parameters, updatedHeaders);
-            } else {
-                nodeResponses = httpRequestReplicator.replicate(nodeIds, method, uri, entity, updatedHeaders);
-            }
+                    for (final NodeResponse response : nodeResponses) {
+                        if (response.getStatus() != NODE_CONTINUE_STATUS_CODE) {
+                            final String nodeDescription = response.getNodeId().getApiAddress() + ":" + response.getNodeId().getApiPort();
+                            final ClientResponse clientResponse = response.getClientResponse();
+                            if (clientResponse == null) {
+                                throw new IllegalClusterStateException("Node " + nodeDescription + " is unable to fulfill this request due to: Unexpected Response Code " + response.getStatus());
+                            }
+                            final String nodeExplanation = clientResponse.getEntity(String.class);
+                            throw new IllegalClusterStateException("Node " + nodeDescription + " is unable to fulfill this request due to: " + nodeExplanation, response.getThrowable());
+                        }
+                    }
 
-            updatedHeaders.remove(NCM_EXPECTS_HTTP_HEADER);
+                    // set flow state to unknown to denote a mutable request replication in progress
+                    logger.debug("Setting Flow State to UNKNOWN due to mutable request to {} {}", method, uri);
+                    notifyDataFlowManagmentServiceOfFlowStateChange(PersistedFlowState.UNKNOWN);
+                }
 
-            for (final NodeResponse response : nodeResponses) {
-                if (response.getStatus() != NODE_CONTINUE_STATUS_CODE) {
-                    final String nodeDescription = response.getNodeId().getApiAddress() + ":" + response.getNodeId().getApiPort();
-                    final ClientResponse clientResponse = response.getClientResponse();
-                    if (clientResponse == null) {
-                        throw new IllegalClusterStateException("Node " + nodeDescription + " is unable to fulfill this request due to: Unexpected Response Code " + response.getStatus());
+                // replicate request
+                final Set<NodeResponse> nodeResponses;
+                try {
+                    if (entity == null) {
+                        nodeResponses = httpRequestReplicator.replicate(nodeIds, method, uri, parameters, updatedHeaders);
+                    } else {
+                        nodeResponses = httpRequestReplicator.replicate(nodeIds, method, uri, entity, updatedHeaders);
+                    }
+                } catch (final UriConstructionException uce) {
+                    // request was not replicated, so mark the flow with its original state
+                    if (mutableRequest) {
+                        notifyDataFlowManagmentServiceOfFlowStateChange(originalPersistedFlowState);
                     }
-                    final String nodeExplanation = clientResponse.getEntity(String.class);
-                    throw new IllegalClusterStateException("Node " + nodeDescription + " is unable to fulfill this request due to: " + nodeExplanation, response.getThrowable());
-                }
-            }
 
-            // set flow state to unknown to denote a mutable request replication in progress
-            logger.debug("Setting Flow State to UNKNOWN due to mutable request to {} {}", method, uri);
-            notifyDataFlowManagmentServiceOfFlowStateChange(PersistedFlowState.UNKNOWN);
-        }
+                    throw uce;
+                }
 
-        // replicate request
-        final Set<NodeResponse> nodeResponses;
-        try {
-            if (entity == null) {
-                nodeResponses = httpRequestReplicator.replicate(nodeIds, method, uri, parameters, updatedHeaders);
-            } else {
-                nodeResponses = httpRequestReplicator.replicate(nodeIds, method, uri, entity, updatedHeaders);
-            }
-        } catch (final UriConstructionException uce) {
-            // request was not replicated, so mark the flow with its original state
-            if (mutableRequest) {
-                notifyDataFlowManagmentServiceOfFlowStateChange(originalPersistedFlowState);
+                // merge the response
+                final NodeResponse clientResponse = mergeResponses(uri, method, nodeResponses, mutableRequest);
+                holder.set(clientResponse);
+                
+                // if we have a response get the updated cluster context for auditing and revision updating
+                Revision updatedRevision = null;
+                if (mutableRequest && clientResponse != null) {
+                    try {
+                        // get the cluster context from the response header
+                        final String serializedClusterContext = clientResponse.getClientResponse().getHeaders().getFirst(CLUSTER_CONTEXT_HTTP_HEADER);
+                        if (StringUtils.isNotBlank(serializedClusterContext)) {
+                            // deserialize object
+                            final Serializable clusterContextObj = WebUtils.deserializeHexToObject(serializedClusterContext);
+
+                            // if we have a valid object, audit the actions
+                            if (clusterContextObj instanceof ClusterContext) {
+                                final ClusterContext clusterContext = (ClusterContext) clusterContextObj;
+                                if (auditService != null) {
+                                    try {
+                                        auditService.addActions(clusterContext.getActions());
+                                    } catch (Throwable t) {
+                                        logger.warn("Unable to record actions: " + t.getMessage());
+                                        if (logger.isDebugEnabled()) {
+                                            logger.warn(StringUtils.EMPTY, t);
+                                        }
+                                    }
+                                }
+                                updatedRevision = clusterContext.getRevision();
+                            }
+                        }
+                    } catch (final ClassNotFoundException cnfe) {
+                        logger.warn("Classpath issue detected because failed to deserialize cluster context from node response due to: " + cnfe, cnfe);
+                    }
+                }
+                
+                return updatedRevision;
             }
-
-            throw uce;
+        };
+        
+        // federate the request and lock on the revision
+        if (mutableRequest) {
+            optimisticLockingManager.setRevision(federateRequest);
+        } else {
+            federateRequest.execute(optimisticLockingManager.getLastModification().getRevision());
         }
-
-        final NodeResponse clientResponse = mergeResponses(uri, method, nodeResponses, mutableRequest);
-        return clientResponse;
+        
+        return holder.get();
     }
 
     private static boolean isProcessorsEndpoint(final URI uri, final String method) {
@@ -2032,7 +2387,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     }
 
     private static boolean isProcessorEndpoint(final URI uri, final String method) {
-        if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches()) {
+        if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && (PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches() || CLUSTER_PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches()) ) {
             return true;
         } else if ("POST".equalsIgnoreCase(method) && PROCESSORS_URI_PATTERN.matcher(uri.getPath()).matches()) {
             return true;
@@ -2079,13 +2434,51 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     private static boolean isProvenanceEventEndpoint(final URI uri, final String method) {
         return "GET".equalsIgnoreCase(method) && PROVENANCE_EVENT_URI.matcher(uri.getPath()).matches();
     }
+    
+    private static boolean isControllerServicesEndpoint(final URI uri, final String method) {
+        return "GET".equalsIgnoreCase(method) && CONTROLLER_SERVICES_URI.equals(uri.getPath());
+    }
+    
+    private static boolean isControllerServiceEndpoint(final URI uri, final String method) {
+        if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && CONTROLLER_SERVICE_URI_PATTERN.matcher(uri.getPath()).matches()) {
+            return true;
+        } else if ("POST".equalsIgnoreCase(method) && CONTROLLER_SERVICES_URI.equals(uri.getPath())) {
+            return true;
+        }
+
+        return false;
+    }
+    
+    private static boolean isControllerServiceReferenceEndpoint(final URI uri, final String method) {
+        if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && CONTROLLER_SERVICE_REFERENCES_URI_PATTERN.matcher(uri.getPath()).matches()) {
+            return true;
+        }
+        
+        return false;
+    }
+    
+    private static boolean isReportingTasksEndpoint(final URI uri, final String method) {
+        return "GET".equalsIgnoreCase(method) && REPORTING_TASKS_URI.equals(uri.getPath());
+    }
+    
+    private static boolean isReportingTaskEndpoint(final URI uri, final String method) {
+        if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && REPORTING_TASK_URI_PATTERN.matcher(uri.getPath()).matches()) {
+            return true;
+        } else if ("POST".equalsIgnoreCase(method) && REPORTING_TASKS_URI.equals(uri.getPath())) {
+            return true;
+        }
+
+        return false;
+    }
 
     static boolean isResponseInterpreted(final URI uri, final String method) {
         return isProcessorsEndpoint(uri, method) || isProcessorEndpoint(uri, method)
                 || isRemoteProcessGroupsEndpoint(uri, method) || isRemoteProcessGroupEndpoint(uri, method)
                 || isProcessGroupEndpoint(uri, method)
                 || isTemplateEndpoint(uri, method) || isFlowSnippetEndpoint(uri, method)
-                || isProvenanceQueryEndpoint(uri, method) || isProvenanceEventEndpoint(uri, method);
+                || isProvenanceQueryEndpoint(uri, method) || isProvenanceEventEndpoint(uri, method)
+                || isControllerServicesEndpoint(uri, method) || isControllerServiceEndpoint(uri, method) || isControllerServiceReferenceEndpoint(uri, method)
+                || isReportingTasksEndpoint(uri, method) || isReportingTaskEndpoint(uri, method);
     }
 
     private void mergeProcessorValidationErrors(final ProcessorDTO processor, Map<NodeIdentifier, ProcessorDTO> processorMap) {
@@ -2095,37 +2488,12 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
             final NodeIdentifier nodeId = nodeEntry.getKey();
             final ProcessorDTO nodeProcessor = nodeEntry.getValue();
 
-            // get the processor's validation errors and put them into a map
-            // where the key is the validation error and the value is the set of all
-            // nodes that reported that validation error.
-            final Collection<String> nodeValidationErrors = nodeProcessor.getValidationErrors();
-            if (nodeValidationErrors != null) {
-                for (final String nodeValidationError : nodeValidationErrors) {
-                    Set<NodeIdentifier> nodeSet = validationErrorMap.get(nodeValidationError);
-                    if (nodeSet == null) {
-                        nodeSet = new HashSet<>();
-                        validationErrorMap.put(nodeValidationError, nodeSet);
-                    }
-                    nodeSet.add(nodeId);
-                }
-            }
-        }
-
-        final Set<String> normalizedValidationErrors = new HashSet<>();
-        for (final Map.Entry<String, Set<NodeIdentifier>> validationEntry : validationErrorMap.entrySet()) {
-            final String msg = validationEntry.getKey();
-            final Set<NodeIdentifier> nodeIds = validationEntry.getValue();
-
-            if (nodeIds.size() == processorMap.size()) {
-                normalizedValidationErrors.add(msg);
-            } else {
-                for (final NodeIdentifier nodeId : nodeIds) {
-                    normalizedValidationErrors.add(nodeId.getApiAddress() + ":" + nodeId.getApiPort() + " -- " + msg);
-                }
-            }
+            // merge the validation errors
+            mergeValidationErrors(validationErrorMap, nodeId, nodeProcessor.getValidationErrors());
         }
 
-        processor.setValidationErrors(normalizedValidationErrors);
+        // set the merged the validation errors
+        processor.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap, processorMap.size()));
     }
 
     private void mergeProvenanceQueryResults(final ProvenanceDTO provenanceDto, final Map<NodeIdentifier, ProvenanceDTO> resultMap, final Set<NodeResponse> problematicResponses) {
@@ -2293,7 +2661,158 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
             remoteProcessGroup.setAuthorizationIssues(mergedAuthorizationIssues);
         }
     }
+    
+    private void mergeControllerServiceReferences(final Set<ControllerServiceReferencingComponentDTO> referencingComponents, final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> referencingComponentMap) {
+        final Map<String, Integer> activeThreadCounts = new HashMap<>();
+        final Map<String, String> states = new HashMap<>();
+        for (final Map.Entry<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> nodeEntry : referencingComponentMap.entrySet()) {
+            final Set<ControllerServiceReferencingComponentDTO> nodeReferencingComponents = nodeEntry.getValue();
+
+            // go through all the nodes referencing components
+            if ( nodeReferencingComponents != null ) {
+                for (final ControllerServiceReferencingComponentDTO nodeReferencingComponent : nodeReferencingComponents) {
+                    // handle active thread counts
+                    if (nodeReferencingComponent.getActiveThreadCount() != null && nodeReferencingComponent.getActiveThreadCount() > 0) {
+                        final Integer current = activeThreadCounts.get(nodeReferencingComponent.getId());
+                        if (current == null) {
+                            activeThreadCounts.put(nodeReferencingComponent.getId(), nodeReferencingComponent.getActiveThreadCount());
+                        } else {
+                            activeThreadCounts.put(nodeReferencingComponent.getId(), nodeReferencingComponent.getActiveThreadCount() + current);
+                        }
+                    }
+                    
+                    // handle controller service state
+                    final String state = states.get(nodeReferencingComponent.getId());
+                    if (state == null) {
+                        if (ControllerServiceState.DISABLING.name().equals(nodeReferencingComponent.getState())) {
+                            states.put(nodeReferencingComponent.getId(), ControllerServiceState.DISABLING.name());
+                        } else if (ControllerServiceState.ENABLING.name().equals(nodeReferencingComponent.getState())) {
+                            states.put(nodeReferencingComponent.getId(), ControllerServiceState.ENABLING.name());
+                        }
+                    }
+                }
+            }
+        }            
+
+        // go through each referencing components
+        for (final ControllerServiceReferencingComponentDTO referencingComponent : referencingComponents) {
+            final Integer activeThreadCount = activeThreadCounts.get(referencingComponent.getId());
+            if (activeThreadCount != null) {
+                referencingComponent.setActiveThreadCount(activeThreadCount);
+            }
+            
+            final String state = states.get(referencingComponent.getId());
+            if (state != null) {
+                referencingComponent.setState(state);
+            }
+        }
+    }
+    
+    private void mergeControllerService(final ControllerServiceDTO controllerService, final Map<NodeIdentifier, ControllerServiceDTO> controllerServiceMap) {
+        final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>();
+        final Set<ControllerServiceReferencingComponentDTO> referencingComponents = controllerService.getReferencingComponents();
+        final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> nodeReferencingComponentsMap = new HashMap<>();
+        
+        String state = null;
+        for (final Map.Entry<NodeIdentifier, ControllerServiceDTO> nodeEntry : controllerServiceMap.entrySet()) {
+            final NodeIdentifier nodeId = nodeEntry.getKey();
+            final ControllerServiceDTO nodeControllerService = nodeEntry.getValue();
+            
+            if (state == null) {
+                if (ControllerServiceState.DISABLING.name().equals(nodeControllerService.getState())) {
+                    state = ControllerServiceState.DISABLING.name();
+                } else if (ControllerServiceState.ENABLING.name().equals(nodeControllerService.getState())) {
+                    state = ControllerServiceState.ENABLING.name();
+                }
+            }
+            
+            for (final ControllerServiceReferencingComponentDTO nodeReferencingComponents : nodeControllerService.getReferencingComponents()) {
+                nodeReferencingComponentsMap.put(nodeId, nodeReferencingComponents.getReferencingComponents());
+            }
+            
+            // merge the validation errors
+            mergeValidationErrors(validationErrorMap, nodeId, nodeControllerService.getValidationErrors());
+        }
+        
+        // merge the referencing components
+        mergeControllerServiceReferences(referencingComponents, nodeReferencingComponentsMap);
+        
+        // store the 'transition' state is applicable
+        if (state != null) {
+            controllerService.setState(state);
+        }
+        
+        // set the merged the validation errors
+        controllerService.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap, controllerServiceMap.size()));
+    }
+    
+    private void mergeReportingTask(final ReportingTaskDTO reportingTask, final Map<NodeIdentifier, ReportingTaskDTO> reportingTaskMap) {
+        final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>();
+
+        int activeThreadCount = 0;
+        for (final Map.Entry<NodeIdentifier, ReportingTaskDTO> nodeEntry : reportingTaskMap.entrySet()) {
+            final NodeIdentifier nodeId = nodeEntry.getKey();
+            final ReportingTaskDTO nodeReportingTask = nodeEntry.getValue();
+
+            if (nodeReportingTask.getActiveThreadCount() != null) {
+                activeThreadCount += nodeReportingTask.getActiveThreadCount();
+            }
+            
+            // merge the validation errors
+            mergeValidationErrors(validationErrorMap, nodeId, nodeReportingTask.getValidationErrors());
+        }
+
+        // set the merged active thread counts
+        reportingTask.setActiveThreadCount(activeThreadCount);
+        
+        // set the merged the validation errors
+        reportingTask.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap, reportingTaskMap.size()));
+    }
 
+    /**
+     * Merges the validation errors into the specified map, recording the corresponding node identifier.
+     * 
+     * @param validationErrorMap
+     * @param nodeId
+     * @param nodeValidationErrors 
+     */
+    public void mergeValidationErrors(final Map<String, Set<NodeIdentifier>> validationErrorMap, final NodeIdentifier nodeId, final Collection<String> nodeValidationErrors) {
+        if (nodeValidationErrors != null) {
+            for (final String nodeValidationError : nodeValidationErrors) {
+                Set<NodeIdentifier> nodeSet = validationErrorMap.get(nodeValidationError);
+                if (nodeSet == null) {
+                    nodeSet = new HashSet<>();
+                    validationErrorMap.put(nodeValidationError, nodeSet);
+                }
+                nodeSet.add(nodeId);
+            }
+        }
+    }
+    
+    /**
+     * Normalizes the validation errors by prepending the corresponding nodes when the error does not exist across all nodes.
+     * 
+     * @param validationErrorMap
+     * @param totalNodes
+     * @return 
+     */
+    public Set<String> normalizedMergedValidationErrors(final Map<String, Set<NodeIdentifier>> validationErrorMap, int totalNodes) {
+        final Set<String> normalizedValidationErrors = new HashSet<>();
+        for (final Map.Entry<String, Set<NodeIdentifier>> validationEntry : validationErrorMap.entrySet()) {
+            final String msg = validationEntry.getKey();
+            final Set<NodeIdentifier> nodeIds = validationEntry.getValue();
+
+            if (nodeIds.size() == totalNodes) {
+                normalizedValidationErrors.add(msg);
+            } else {
+                for (final NodeIdentifier nodeId : nodeIds) {
+                    normalizedValidationErrors.add(nodeId.getApiAddress() + ":" + nodeId.getApiPort() + " -- " + msg);
+                }
+            }
+        }
+        return normalizedValidationErrors;
+    }
+    
     // requires write lock to be already acquired unless request is not mutable
     private NodeResponse mergeResponses(final URI uri, final String method, final Set<NodeResponse> nodeResponses, final boolean mutableRequest) {
         // holds the one response of all the node responses to return to the client
@@ -2583,6 +3102,126 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
             event.setClusterNodeAddress(nodeId.getApiAddress() + ":" + nodeId.getApiPort());
 
             clientResponse = new NodeResponse(clientResponse, responseEntity);
+        } else if (hasSuccessfulClientResponse && isControllerServiceEndpoint(uri, method)) {
+            final ControllerServiceEntity responseEntity = clientResponse.getClientResponse().getEntity(ControllerServiceEntity.class);
+            final ControllerServiceDTO controllerService = responseEntity.getControllerService();
+            
+            final Map<NodeIdentifier, ControllerServiceDTO> resultsMap = new HashMap<>();
+            for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+                if (problematicNodeResponses.contains(nodeResponse)) {
+                    continue;
+                }
+
+                final ControllerServiceEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceEntity.class);
+                final ControllerServiceDTO nodeControllerService = nodeResponseEntity.getControllerService();
+
+                resultsMap.put(nodeResponse.getNodeId(), nodeControllerService);
+            }
+            mergeControllerService(controllerService, resultsMap);
+            
+            clientResponse = new NodeResponse(clientResponse, responseEntity);
+        } else if (hasSuccessfulClientResponse && isControllerServicesEndpoint(uri, method)) {
+            final ControllerServicesEntity responseEntity = clientResponse.getClientResponse().getEntity(ControllerServicesEntity.class);
+            final Set<ControllerServiceDTO> controllerServices = responseEntity.getControllerServices();
+            
+            final Map<String, Map<NodeIdentifier, ControllerServiceDTO>> controllerServiceMap = new HashMap<>();
+            for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+                if (problematicNodeResponses.contains(nodeResponse)) {
+                    continue;
+                }
+
+                final ControllerServicesEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServicesEntity.class);
+                final Set<ControllerServiceDTO> nodeControllerServices = nodeResponseEntity.getControllerServices();
+
+                for (final ControllerServiceDTO nodeControllerService : nodeControllerServices) {
+                    Map<NodeIdentifier, ControllerServiceDTO> innerMap = controllerServiceMap.get(nodeControllerService.getId());
+                    if (innerMap == null) {
+                        innerMap = new HashMap<>();
+                        controllerServiceMap.put(nodeControllerService.getId(), innerMap);
+                    }
+
+                    innerMap.put(nodeResponse.getNodeId(), nodeControllerService);
+                }
+            }
+
+            for (final ControllerServiceDTO controllerService : controllerServices) {
+                final String procId = controllerService.getId();
+                final Map<NodeIdentifier, ControllerServiceDTO> mergeMap = controllerServiceMap.get(procId);
+
+                mergeControllerService(controllerService, mergeMap);
+            }
+
+            // create a new client response
+            clientResponse = new NodeResponse(clientResponse, responseEntity);
+        } else if (hasSuccessfulClientResponse && isControllerServiceReferenceEndpoint(uri, method)) {
+            final ControllerServiceReferencingComponentsEntity responseEntity = clientResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class);
+            final Set<ControllerServiceReferencingComponentDTO> referencingComponents = responseEntity.getControllerServiceReferencingComponents();
+            
+            final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> resultsMap = new HashMap<>();
+            for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+                if (problematicNodeResponses.contains(nodeResponse)) {
+                    continue;
+                }
+
+                final ControllerServiceReferencingComponentsEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class);
+                final Set<ControllerServiceReferencingComponentDTO> nodeReferencingComponents = nodeResponseEntity.getControllerServiceReferencingComponents();
+
+                resultsMap.put(nodeResponse.getNodeId(), nodeReferencingComponents);
+            }
+            mergeControllerServiceReferences(referencingComponents, resultsMap);
+            
+            clientResponse = new NodeResponse(clientResponse, responseEntity);
+        } else if (hasSuccessfulClientResponse && isReportingTaskEndpoint(uri, method)) {
+            final ReportingTaskEntity responseEntity = clientResponse.getClientResponse().getEntity(ReportingTaskEntity.class);
+            final ReportingTaskDTO reportingTask = responseEntity.getReportingTask();
+            
+            final Map<NodeIdentifier, ReportingTaskDTO> resultsMap = new HashMap<>();
+            for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+                if (problematicNodeResponses.contains(nodeResponse)) {
+                    continue;
+                }
+
+                final ReportingTaskEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ReportingTaskEntity.class);
+                final ReportingTaskDTO nodeReportingTask = nodeResponseEntity.getReportingTask();
+
+                resultsMap.put(nodeResponse.getNodeId(), nodeReportingTask);
+            }
+            mergeReportingTask(reportingTask, resultsMap);
+            
+            clientResponse = new NodeResponse(clientResponse, responseEntity);
+        } else if (hasSuccessfulClientResponse && isReportingTasksEndpoint(uri, method)) {
+            final ReportingTasksEntity responseEntity = clientResponse.getClientResponse().getEntity(ReportingTasksEntity.class);
+            final Set<ReportingTaskDTO> reportingTaskSet = responseEntity.getReportingTasks();
+            
+            final Map<String, Map<NodeIdentifier, ReportingTaskDTO>> reportingTaskMap = new HashMap<>();
+            for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+                if (problematicNodeResponses.contains(nodeResponse)) {
+                    continue;
+                }
+
+                final ReportingTasksEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ReportingTasksEntity.class);
+                final Set<ReportingTaskDTO> nodeReportingTasks = nodeResponseEntity.getReportingTasks();
+
+                for (final ReportingTaskDTO nodeReportingTask : nodeReportingTasks) {
+                    Map<NodeIdentifier, ReportingTaskDTO> innerMap = reportingTaskMap.get(nodeReportingTask.getId());
+                    if (innerMap == null) {
+                        innerMap = new HashMap<>();
+                        reportingTaskMap.put(nodeReportingTask.getId(), innerMap);
+                    }
+
+                    innerMap.put(nodeResponse.getNodeId(), nodeReportingTask);
+                }
+            }
+
+            for (final ReportingTaskDTO reportingTask : reportingTaskSet) {
+                final String procId = reportingTask.getId();
+                final Map<NodeIdentifier, ReportingTaskDTO> mergeMap = reportingTaskMap.get(procId);
+
+                mergeReportingTask(reportingTask, mergeMap);
+            }
+
+            // create a new client response
+            clientResponse = new NodeResponse(clientResponse, responseEntity);
         } else {
             if (!nodeResponsesToDrain.isEmpty()) {
                 drainResponses(nodeResponsesToDrain);
@@ -2616,36 +3255,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                     logger.warn("All nodes failed to process URI {}. As a result, no node will be disconnected from cluster", uri);
                 }
             }
-
-            // if at least one node satisfied the request, then audit the action 
-            if (hasClientResponse) {
-                try {
-                    // get the cluster context from the response header
-                    final String serializedClusterContext = clientResponse.getClientResponse().getHeaders().getFirst(CLUSTER_CONTEXT_HTTP_HEADER);
-                    if (StringUtils.isNotBlank(serializedClusterContext)) {
-                        // deserialize object
-                        final Serializable clusterContextObj = WebUtils.deserializeHexToObject(serializedClusterContext);
-
-                        // if we have a valid object, audit the actions
-                        if (clusterContextObj instanceof ClusterContext) {
-                            final ClusterContext clusterContext = (ClusterContext) clusterContextObj;
-                            if (auditService != null) {
-                                try {
-                                    auditService.addActions(clusterContext.getActions());
-                                } catch (Throwable t) {
-                                    logger.warn("Unable to record actions: " + t.getMessage());
-                                    if (logger.isDebugEnabled()) {
-                                        logger.warn(StringUtils.EMPTY, t);
-                                    }
-                                }
-                            }
-                            revision = clusterContext.getRevision();
-                        }
-                    }
-                } catch (final ClassNotFoundException cnfe) {
-                    logger.warn("Classpath issue detected because failed to deserialize cluster context from node response due to: " + cnfe, cnfe);
-                }
-            }
         }
 
         return clientResponse;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
index 7169730..d3cff3b 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.cluster.spring;
 
-import java.nio.file.Paths;
 import org.apache.nifi.admin.service.AuditService;
 import org.apache.nifi.cluster.event.EventManager;
 import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
@@ -26,11 +25,11 @@ import org.apache.nifi.cluster.manager.HttpResponseMapper;
 import org.apache.nifi.cluster.manager.impl.WebClusterManager;
 import org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderListener;
 import org.apache.nifi.cluster.protocol.impl.ClusterServicesBroadcaster;
-import org.apache.nifi.controller.service.ControllerServiceLoader;
 import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.io.socket.multicast.DiscoverableService;
 import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
 import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.OptimisticLockingManager;
 import org.springframework.beans.BeansException;
 import org.springframework.beans.factory.FactoryBean;
 import org.springframework.context.ApplicationContext;
@@ -50,6 +49,8 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon
     private NiFiProperties properties;
 
     private StringEncryptor encryptor;
+    
+    private OptimisticLockingManager optimisticLockingManager;
 
     @Override
     public Object getObject() throws Exception {
@@ -62,13 +63,6 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon
              */
             return null;
         } else if (clusterManager == null) {
-
-            // get the service configuration path (fail early)
-            final String serviceConfigurationFile = properties.getProperty(NiFiProperties.SERVICE_CONFIGURATION_FILE);
-            if (serviceConfigurationFile == null) {
-                throw new NullPointerException("The service configuration file has not been specified.");
-            }
-
             final HttpRequestReplicator requestReplicator = applicationContext.getBean("httpRequestReplicator", HttpRequestReplicator.class);
             final HttpResponseMapper responseMapper = applicationContext.getBean("httpResponseMapper", HttpResponseMapper.class);
             final DataFlowManagementService dataFlowService = applicationContext.getBean("dataFlowManagementService", DataFlowManagementService.class);
@@ -81,7 +75,8 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon
                     dataFlowService,
                     senderListener,
                     properties,
-                    encryptor
+                    encryptor,
+                    optimisticLockingManager
             );
 
             // set the service broadcaster
@@ -106,10 +101,6 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon
 
             // set the audit service
             clusterManager.setAuditService(applicationContext.getBean("auditService", AuditService.class));
-
-            // load the controller services
-            final ControllerServiceLoader serviceLoader = new ControllerServiceLoader(Paths.get(serviceConfigurationFile));
-            serviceLoader.loadControllerServices(clusterManager);
         }
         return clusterManager;
     }
@@ -136,4 +127,8 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon
     public void setEncryptor(final StringEncryptor encryptor) {
         this.encryptor = encryptor;
     }
+    
+    public void setOptimisticLockingManager(OptimisticLockingManager optimisticLockingManager) {
+        this.optimisticLockingManager = optimisticLockingManager;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
index 68c29bc..72c7bff 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
@@ -91,10 +91,14 @@
         <property name="properties" ref="nifiProperties"/>
     </bean>
 
+    <!-- cluster manager optimistic locking manager -->
+    <bean id="clusterManagerOptimisticLockingManager" class="org.apache.nifi.web.StandardOptimisticLockingManager"/>
+
     <!-- cluster manager -->
     <bean id="clusterManager" class="org.apache.nifi.cluster.spring.WebClusterManagerFactoryBean">
         <property name="properties" ref="nifiProperties"/>
         <property name="encryptor" ref="stringEncryptor"/>
+        <property name="optimisticLockingManager" ref="clusterManagerOptimisticLockingManager"/>
     </bean>
     
     <!-- discoverable services -->

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/.gitignore
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/.gitignore b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/.gitignore
index ea8c4bf..29546b5 100755
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/.gitignore
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/.gitignore
@@ -1 +1,2 @@
 /target
+/target/

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
index ef4b72a..c44161f 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
@@ -23,6 +23,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
@@ -149,6 +150,16 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
                 final PropertyDescriptor descriptor = component.getPropertyDescriptor(name);
                 String value = null;
                 if (!descriptor.isRequired() && (value = properties.remove(descriptor)) != null) {
+                	
+                	if ( descriptor.getControllerServiceDefinition() != null ) {
+                		if (value != null) {
+                            final ControllerServiceNode oldNode = serviceProvider.getControllerServiceNode(value);
+                            if (oldNode != null) {
+                                oldNode.removeReference(this);
+                            }
+                        }
+                	}
+                	
                     component.onPropertyModified(descriptor, value, null);
                     return true;
                 }
@@ -250,12 +261,17 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
         return true;
     }
 
+    
     @Override
     public Collection<ValidationResult> getValidationErrors() {
+        return getValidationErrors(Collections.<String>emptySet());
+    }
+    
+    public Collection<ValidationResult> getValidationErrors(final Set<String> serviceIdentifiersNotToValidate) {
         final List<ValidationResult> results = new ArrayList<>();
         lock.lock();
         try {
-            final ValidationContext validationContext = validationContextFactory.newValidationContext(getProperties(), getAnnotationData());
+            final ValidationContext validationContext = validationContextFactory.newValidationContext(serviceIdentifiersNotToValidate, getProperties(), getAnnotationData());
 
             final Collection<ValidationResult> validationResults;
             try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Availability.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Availability.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Availability.java
deleted file mode 100644
index 38df6f7..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Availability.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-public enum Availability {
-
-    CLUSTER_MANAGER_ONLY,
-    NODE_ONLY,
-    BOTH;
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
index 303f540..c3b6613 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
@@ -19,8 +19,7 @@ package org.apache.nifi.controller;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.Funnel;
 import org.apache.nifi.connectable.Port;
-import org.apache.nifi.processor.annotation.OnScheduled;
-import org.apache.nifi.processor.annotation.OnUnscheduled;
+import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 
 public interface ProcessScheduler {
@@ -143,4 +142,28 @@ public interface ProcessScheduler {
      * @param procNode
      */
     void yield(ProcessorNode procNode);
+    
+    /**
+     * Stops scheduling the given Reporting Task to run
+     * @param taskNode
+     */
+    void unschedule(ReportingTaskNode taskNode);
+    
+    /**
+     * Begins scheduling the given Reporting Task to run
+     * @param taskNode
+     */
+    void schedule(ReportingTaskNode taskNode);
+    
+    /**
+     * Enables the Controller Service so that it can be used by Reporting Tasks and Processors
+     * @param service
+     */
+    void enableControllerService(ControllerServiceNode service);
+    
+    /**
+     * Disables the Controller Service so that it can be updated
+     * @param service
+     */
+    void disableControllerService(ControllerServiceNode service);
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
index f6786fa..3189edd 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
@@ -21,6 +21,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.logging.LogLevel;
 import org.apache.nifi.processor.Processor;
@@ -77,4 +78,19 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen
 
     public abstract void setStyle(Map<String, String> style);
 
+    /**
+     * Returns the number of threads (concurrent tasks) currently being used by this Processor
+     * @return
+     */
+    public abstract int getActiveThreadCount();
+    
+    /**
+     * Verifies that this Processor can be started if the provided set of
+     * services are enabled. This is introduced because we need to verify that all components
+     * can be started before starting any of them. In order to do that, we need to know that this
+     * component can be started if the given services are enabled, as we will then enable the given 
+     * services before starting this component.
+     * @param ignoredReferences
+     */
+    public abstract void verifyCanStart(Set<ControllerServiceNode> ignoredReferences);
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
index fa48cb3..c932f30 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
@@ -16,18 +16,16 @@
  */
 package org.apache.nifi.controller;
 
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.reporting.ReportingContext;
 import org.apache.nifi.reporting.ReportingTask;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 
 public interface ReportingTaskNode extends ConfiguredComponent {
 
-    Availability getAvailability();
-
-    void setAvailability(Availability availability);
-
     void setSchedulingStrategy(SchedulingStrategy schedulingStrategy);
 
     SchedulingStrategy getSchedulingStrategy();
@@ -53,6 +51,12 @@ public interface ReportingTaskNode extends ConfiguredComponent {
     ConfigurationContext getConfigurationContext();
 
     boolean isRunning();
+
+    /**
+     * Returns the number of threads (concurrent tasks) currently being used by this ReportingTask
+     * @return
+     */
+    int getActiveThreadCount();
     
     /**
      * Indicates the {@link ScheduledState} of this <code>ReportingTask</code>. A
@@ -68,6 +72,20 @@ public interface ReportingTaskNode extends ConfiguredComponent {
     
     void setScheduledState(ScheduledState state);
     
+    String getComments();
+    
+    void setComments(String comment);
+    
+    /**
+     * Verifies that this Reporting Task can be enabled if the provided set of
+     * services are enabled. This is introduced because we need to verify that all components
+     * can be started before starting any of them. In order to do that, we need to know that this
+     * component can be started if the given services are enabled, as we will then enable the given 
+     * services before starting this component.
+     * @param ignoredReferences
+     */
+    void verifyCanStart(Set<ControllerServiceNode> ignoredReferences);
+    
     void verifyCanStart();
     void verifyCanStop();
     void verifyCanDisable();