You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/04/10 15:43:55 UTC

[20/62] [abbrv] incubator-nifi git commit: Squashed commit of the following:

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java
index df3c251..09479d5 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.controller;
 
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
@@ -24,4 +25,7 @@ import org.apache.nifi.components.ValidationContext;
 public interface ValidationContextFactory {
 
     ValidationContext newValidationContext(Map<PropertyDescriptor, String> properties, String annotationData);
+    
+    ValidationContext newValidationContext(Set<String> serviceIdentifiersToNotValidate, Map<PropertyDescriptor, String> properties, String annotationData);
+    
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceInstantiationException.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceInstantiationException.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceInstantiationException.java
new file mode 100644
index 0000000..18cfcda
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceInstantiationException.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.exception;
+
+public class ControllerServiceInstantiationException extends RuntimeException {
+
+    private static final long serialVersionUID = -544424320587059277L;
+
+    /**
+     * Constructs a default exception
+     */
+    public ControllerServiceInstantiationException() {
+        super();
+    }
+
+    /**
+     * @param message
+     */
+    public ControllerServiceInstantiationException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param cause
+     */
+    public ControllerServiceInstantiationException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message
+     * @param cause
+     */
+    public ControllerServiceInstantiationException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceNotFoundException.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceNotFoundException.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceNotFoundException.java
deleted file mode 100644
index 4cdbe54..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceNotFoundException.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.exception;
-
-public class ControllerServiceNotFoundException extends RuntimeException {
-
-    private static final long serialVersionUID = -544424320587059277L;
-
-    /**
-     * Constructs a default exception
-     */
-    public ControllerServiceNotFoundException() {
-        super();
-    }
-
-    /**
-     * @param message
-     */
-    public ControllerServiceNotFoundException(String message) {
-        super(message);
-    }
-
-    /**
-     * @param cause
-     */
-    public ControllerServiceNotFoundException(Throwable cause) {
-        super(cause);
-    }
-
-    /**
-     * @param message
-     * @param cause
-     */
-    public ControllerServiceNotFoundException(String message, Throwable cause) {
-        super(message, cause);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java
new file mode 100644
index 0000000..bb6f3f7
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.reporting;
+
+import java.util.Set;
+import org.apache.nifi.controller.ReportingTaskNode;
+
+/**
+ * A ReportingTaskProvider is responsible for providing management of, and access to, Reporting Tasks
+ */
+public interface ReportingTaskProvider {
+
+    /**
+     * Creates a new instance of a reporting task
+     * 
+     * @param type the type (fully qualified class name) of the reporting task to instantiate
+     * @param id the identifier for the Reporting Task
+     * @param firstTimeAdded whether or not this is the first time that the reporting task is being added
+     * to the flow. I.e., this will be true only when the user adds the reporting task to the flow, not when
+     * the flow is being restored after a restart of the software
+     * 
+     * @return the ReportingTaskNode that is used to manage the reporting task
+     * 
+     * @throws ReportingTaskInstantiationException if unable to create the Reporting Task
+     */
+    ReportingTaskNode createReportingTask(String type, String id, boolean firstTimeAdded) throws ReportingTaskInstantiationException;
+    
+    /**
+     * Returns the reporting task that has the given identifier, or <code>null</code> if no reporting task
+     * exists with that ID.
+     * 
+     * @param identifier
+     * @return
+     */
+    ReportingTaskNode getReportingTaskNode(String identifier);
+    
+    /**
+     * Returns a Set of all Reporting Tasks that exist for this service provider.
+     * @return
+     */
+    Set<ReportingTaskNode> getAllReportingTasks();
+    
+    /**
+     * Removes the given reporting task from the flow
+     * 
+     * @param reportingTask
+     * 
+     * @throws IllegalStateException if the reporting task cannot be removed because it is not stopped, or
+     * if the reporting task is not known in the flow
+     */
+    void removeReportingTask(ReportingTaskNode reportingTask);
+    
+    /**
+     * Begins scheduling the reporting task to run and invokes appropriate lifecycle methods
+     * @param reportingTask
+     * 
+     * @throws IllegalStateException if the ReportingTask's state is not STOPPED, or if the Reporting Task has active
+     * threads, or if the ReportingTask is not valid
+     */
+    void startReportingTask(ReportingTaskNode reportingTask);
+    
+    /**
+     * Stops scheduling the reporting task to run and invokes appropriate lifecycle methods
+     * @param reportingTask
+     * 
+     * @throws IllegalStateException if the ReportingTask's state is not RUNNING
+     */
+    void stopReportingTask(ReportingTaskNode reportingTask);
+    
+    
+    /**
+     * Enables the reporting task to be scheduled to run
+     * @param reportingTask
+     * 
+     * @throws IllegalStateException if the ReportingTask's state is not DISABLED
+     */
+    void enableReportingTask(ReportingTaskNode reportingTask);
+    
+    
+    /**
+     * Disables the ability to schedul the reporting task to run
+     * 
+     * @param reportingTask
+     * 
+     * @throws IllegalStateException if the ReportingTask's state is not STOPPED, or if the Reporting Task has active
+     * threads
+     */
+    void disableReportingTask(ReportingTaskNode reportingTask);
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
index 66bad39..50bf469 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
@@ -16,7 +16,8 @@
  */
 package org.apache.nifi.controller.service;
 
-import org.apache.nifi.controller.Availability;
+import java.util.Set;
+
 import org.apache.nifi.controller.ConfiguredComponent;
 import org.apache.nifi.controller.ControllerService;
 
@@ -26,22 +27,42 @@ public interface ControllerServiceNode extends ConfiguredComponent {
     
     ControllerService getControllerServiceImplementation();
 
-    Availability getAvailability();
-
-    void setAvailability(Availability availability);
-
-    boolean isDisabled();
-
-    void setDisabled(boolean disabled);
-
+    ControllerServiceState getState();
+    void setState(ControllerServiceState state);
+    
     ControllerServiceReference getReferences();
 
     void addReference(ConfiguredComponent referringComponent);
 
     void removeReference(ConfiguredComponent referringComponent);
     
+    void setComments(String comment);
+    String getComments();
+    
     void verifyCanEnable();
     void verifyCanDisable();
+    
+    /**
+     * Verifies that this Controller Service can be disabled if the provided set of 
+     * services are also disabled. This is introduced because we can have an instance
+     * where A references B, which references C, which references A and we want
+     * to disable service C. In this case, the cycle needs to not cause us to fail, 
+     * so we want to verify that C can be disabled if A and B also are. 
+     * 
+     * @param ignoredReferences
+     */
+    void verifyCanDisable(Set<ControllerServiceNode> ignoredReferences);
+    
+    /**
+     * Verifies that this Controller Service can be enabled if the provided set of
+     * services are also enabled. This is introduced because we can have an instance where
+     * A reference B, which references C, which references A and we want to enable
+     * Service A. In this case, the cycle needs to not cause us to fail, so we want to verify
+     * that A can be enabled if A and B also are.
+     * @param ignoredReferences
+     */
+    void verifyCanEnable(Set<ControllerServiceNode> ignoredReferences);
+    
     void verifyCanDelete();
     void verifyCanUpdate();
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
index 5f2fc2e..1901fb6 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
@@ -16,6 +16,9 @@
  */
 package org.apache.nifi.controller.service;
 
+import java.util.Collection;
+import java.util.Set;
+
 import org.apache.nifi.annotation.lifecycle.OnAdded;
 import org.apache.nifi.controller.ControllerServiceLookup;
 
@@ -25,7 +28,7 @@ import org.apache.nifi.controller.ControllerServiceLookup;
 public interface ControllerServiceProvider extends ControllerServiceLookup {
 
     /**
-     * Creates a new Controller Service of the given type and assigns it the given id. If <code>firstTimeadded</code>
+     * Creates a new Controller Service of the specified type and assigns it the given id. If <code>firstTimeadded</code>
      * is true, calls any methods that are annotated with {@link OnAdded}
      *
      * @param type
@@ -61,9 +64,87 @@ public interface ControllerServiceProvider extends ControllerServiceLookup {
     void enableControllerService(ControllerServiceNode serviceNode);
     
     /**
+     * Enables the collection of services. If a service in this collection depends on another service,
+     * the service being depended on must either already be enabled or must be in the collection as well.
+     * @param serviceNodes
+     */
+    void enableControllerServices(Collection<ControllerServiceNode> serviceNodes);
+    
+    /**
      * Disables the given controller service so that it cannot be used by other components. This allows
      * configuration to be updated or allows service to be removed.
      * @param serviceNode
      */
     void disableControllerService(ControllerServiceNode serviceNode);
+    
+    /**
+     * Returns a Set of all Controller Services that exist for this service provider.
+     * @return
+     */
+    Set<ControllerServiceNode> getAllControllerServices();
+    
+    /**
+     * Verifies that all running Processors and Reporting Tasks referencing the Controller Service (or a service
+     * that depends on the provided service) can be stopped. 
+     * @param serviceNode
+     * 
+     * @throws IllegalStateException if any referencing component cannot be stopped
+     */
+    void verifyCanStopReferencingComponents(ControllerServiceNode serviceNode);
+    
+    /**
+     * Recursively unschedules all schedulable components (Processors and Reporting Tasks) that reference the given
+     * Controller Service. For any Controller services that reference this one, its schedulable referencing components will also
+     * be unscheduled.
+     * @param serviceNode
+     */
+    void unscheduleReferencingComponents(ControllerServiceNode serviceNode);
+    
+    /**
+     * Verifies that all Controller Services referencing the provided Controller Service can be disabled. 
+     * @param serviceNode
+     * 
+     * @throws IllegalStateException if any referencing service cannot be disabled
+     */
+    void verifyCanDisableReferencingServices(ControllerServiceNode serviceNode);
+    
+    /**
+     * Disables any Controller Service that references the provided Controller Service. This action is performed recursively
+     * so that if service A references B and B references C, disabling references for C will first disable A, then B.
+     * @param serviceNode
+     */
+    void disableReferencingServices(ControllerServiceNode serviceNode);
+    
+    /**
+     * Verifies that all Controller Services referencing the provided ControllerService can be enabled.
+     * @param serviceNode
+     * 
+     * @throws IllegalStateException if any referencing component cannot be enabled
+     */
+    void verifyCanEnableReferencingServices(ControllerServiceNode serviceNode);
+    
+    
+    /**
+     * Enables all Controller Services that are referencing the given service. If Service A references Service B and Service
+     * B references serviceNode, Service A and B will both be enabled.
+     * @param serviceNode
+     */
+    void enableReferencingServices(ControllerServiceNode serviceNode);
+    
+    /**
+     * Verifies that all enabled Processors referencing the ControllerService (or a service that depends on 
+     * the provided service) can be scheduled to run.
+     * @param serviceNode
+     * 
+     * @throws IllegalStateException if any referencing component cannot be scheduled
+     */
+    void verifyCanScheduleReferencingComponents(ControllerServiceNode serviceNode);
+    
+    /**
+     * Schedules any schedulable component (Processor, ReportingTask) that is referencing the given Controller Service
+     * to run. This is performed recursively, so if a Processor is referencing Service A, which is referencing serviceNode,
+     * then the Processor will also be started.
+     * @param serviceNode
+     */
+    void scheduleReferencingComponents(ControllerServiceNode serviceNode);
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java
index 5cb676f..67ffb6c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java
@@ -41,10 +41,11 @@ public interface ControllerServiceReference {
     Set<ConfiguredComponent> getReferencingComponents();
 
     /**
-     * Returns a {@link Set} of all Processors and Reporting Tasks that are
-     * referencing the Controller Service and are running, in addition to all
+     * Returns a {@link Set} of all Processors, Reporting Tasks, and Controller Services that are
+     * referencing the Controller Service and are running (in the case of Processors and Reporting Tasks)
+     * or enabled (in the case of Controller Services)
      *
      * @return
      */
-    Set<ConfiguredComponent> getRunningReferences();
+    Set<ConfiguredComponent> getActiveReferences();
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceState.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceState.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceState.java
new file mode 100644
index 0000000..2ed8fd9
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceState.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.service;
+
+
+/**
+ * Represents the valid states for a Controller Service.
+ */
+public enum ControllerServiceState {
+    /**
+     * Controller Service is disabled and cannot be used.
+     */
+    DISABLED,
+    
+    /**
+     * Controller Service has been disabled but has not yet finished its lifecycle
+     * methods.
+     */
+    DISABLING,
+    
+    /**
+     * Controller Service has been enabled but has not yet finished its lifecycle methods.
+     */
+    ENABLING,
+    
+    /**
+     * Controller Service has been enabled and has finished its lifecycle methods. The Controller SErvice
+     * is ready to be used.
+     */
+    ENABLED;
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 06ef203..f3fb67c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -52,6 +52,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.admin.service.UserService;
 import org.apache.nifi.annotation.lifecycle.OnAdded;
 import org.apache.nifi.annotation.lifecycle.OnRemoved;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
 import org.apache.nifi.cluster.BulletinsPayload;
 import org.apache.nifi.cluster.HeartbeatPayload;
 import org.apache.nifi.cluster.protocol.DataFlow;
@@ -62,6 +63,7 @@ import org.apache.nifi.cluster.protocol.NodeProtocolSender;
 import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
 import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
+import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.connectable.Connection;
@@ -77,6 +79,8 @@ import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
 import org.apache.nifi.controller.label.Label;
 import org.apache.nifi.controller.label.StandardLabel;
 import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
+import org.apache.nifi.controller.reporting.ReportingTaskProvider;
+import org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
 import org.apache.nifi.controller.reporting.StandardReportingTaskNode;
 import org.apache.nifi.controller.repository.ContentRepository;
 import org.apache.nifi.controller.repository.CounterRepository;
@@ -103,6 +107,7 @@ import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
 import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.service.StandardConfigurationContext;
 import org.apache.nifi.controller.service.StandardControllerServiceProvider;
 import org.apache.nifi.controller.status.ConnectionStatus;
 import org.apache.nifi.controller.status.PortStatus;
@@ -129,6 +134,7 @@ import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
 import org.apache.nifi.groups.StandardProcessGroup;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.logging.LogLevel;
 import org.apache.nifi.logging.LogRepository;
 import org.apache.nifi.logging.LogRepositoryFactory;
@@ -161,6 +167,8 @@ import org.apache.nifi.remote.protocol.socket.SocketFlowFileServerProtocol;
 import org.apache.nifi.reporting.Bulletin;
 import org.apache.nifi.reporting.BulletinRepository;
 import org.apache.nifi.reporting.EventAccess;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.reporting.ReportingInitializationContext;
 import org.apache.nifi.reporting.ReportingTask;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.scheduling.SchedulingStrategy;
@@ -170,6 +178,7 @@ import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.ReflectionUtils;
 import org.apache.nifi.web.api.dto.ConnectableDTO;
 import org.apache.nifi.web.api.dto.ConnectionDTO;
+import org.apache.nifi.web.api.dto.ControllerServiceDTO;
 import org.apache.nifi.web.api.dto.FlowSnippetDTO;
 import org.apache.nifi.web.api.dto.FunnelDTO;
 import org.apache.nifi.web.api.dto.LabelDTO;
@@ -189,7 +198,7 @@ import org.slf4j.LoggerFactory;
 
 import com.sun.jersey.api.client.ClientHandlerException;
 
-public class FlowController implements EventAccess, ControllerServiceProvider, Heartbeater, QueueProvider {
+public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, Heartbeater, QueueProvider {
 
     // default repository implementations
     public static final String DEFAULT_FLOWFILE_REPO_IMPLEMENTATION = "org.apache.nifi.controller.repository.WriteAheadFlowFileRepository";
@@ -374,7 +383,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
         this.properties = properties;
         sslContext = SslContextFactory.createSslContext(properties, false);
         extensionManager = new ExtensionManager();
-        controllerServiceProvider = new StandardControllerServiceProvider();
 
         timerDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxTimerDrivenThreads.get(), "Timer-Driven Process"));
         eventDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxEventDrivenThreads.get(), "Event-Driven Process"));
@@ -398,6 +406,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
 
         processScheduler = new StandardProcessScheduler(this, this, encryptor);
         eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler);
+        controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository);
 
         final ProcessContextFactory contextFactory = new ProcessContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceEventRepository);
         processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, new EventDrivenSchedulingAgent(
@@ -593,7 +602,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
                             startConnectable(connectable);
                         }
                     } catch (final Throwable t) {
-                        LOG.error("Unable to start {} due to {}", new Object[]{connectable, t});
+                        LOG.error("Unable to start {} due to {}", new Object[]{connectable, t.toString()});
+                        if ( LOG.isDebugEnabled() ) {
+                            LOG.error("", t);
+                        }
                     }
                 }
     
@@ -1063,7 +1075,23 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
 
             // Trigger any processors' methods marked with @OnShutdown to be called
             rootGroup.shutdown();
-
+            
+            // invoke any methods annotated with @OnShutdown on Controller Services
+            for ( final ControllerServiceNode serviceNode : getAllControllerServices() ) {
+                try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+                    final ConfigurationContext configContext = new StandardConfigurationContext(serviceNode, controllerServiceProvider);
+                    ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, serviceNode.getControllerServiceImplementation(), configContext);
+                }
+            }
+            
+            // invoke any methods annotated with @OnShutdown on Reporting Tasks
+            for ( final ReportingTaskNode taskNode : getAllReportingTasks() ) {
+                final ConfigurationContext configContext = taskNode.getConfigurationContext();
+                try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+                    ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, taskNode.getReportingTask(), configContext);
+                }
+            }
+            
             try {
                 this.timerDrivenEngineRef.get().awaitTermination(gracefulShutdownSeconds / 2, TimeUnit.SECONDS);
                 this.eventDrivenEngineRef.get().awaitTermination(gracefulShutdownSeconds / 2, TimeUnit.SECONDS);
@@ -1402,6 +1430,30 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
             validateSnippetContents(requireNonNull(group), dto);
 
             //
+            // Instantiate Controller Services
+            //
+            for ( final ControllerServiceDTO controllerServiceDTO : dto.getControllerServices() ) {
+                final ControllerServiceNode serviceNode = createControllerService(controllerServiceDTO.getType(), controllerServiceDTO.getId(), true);
+                
+                serviceNode.setAnnotationData(controllerServiceDTO.getAnnotationData());
+                serviceNode.setComments(controllerServiceDTO.getComments());
+                serviceNode.setName(controllerServiceDTO.getName());
+            }
+            
+            // configure controller services. We do this after creating all of them in case 1 service
+            // references another service.
+            for ( final ControllerServiceDTO controllerServiceDTO : dto.getControllerServices() ) {
+                final String serviceId = controllerServiceDTO.getId();
+                final ControllerServiceNode serviceNode = getControllerServiceNode(serviceId);
+                
+                for ( final Map.Entry<String, String> entry : controllerServiceDTO.getProperties().entrySet() ) {
+                    if ( entry.getValue() != null ) {
+                        serviceNode.setProperty(entry.getKey(), entry.getValue());
+                    }
+                }
+            }
+            
+            //
             // Instantiate the labels
             //
             for (final LabelDTO labelDTO : dto.getLabels()) {
@@ -1411,7 +1463,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
                     label.setSize(new Size(labelDTO.getWidth(), labelDTO.getHeight()));
                 }
 
-                // TODO: Update the label's "style"
+                label.setStyle(labelDTO.getStyle());
                 group.addLabel(label);
             }
 
@@ -1737,14 +1789,18 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
         }
 
         // validate that all Processor Types and Prioritizer Types are valid
-        final List<String> processorClasses = new ArrayList<>();
+        final Set<String> processorClasses = new HashSet<>();
         for (final Class<?> c : ExtensionManager.getExtensions(Processor.class)) {
             processorClasses.add(c.getName());
         }
-        final List<String> prioritizerClasses = new ArrayList<>();
+        final Set<String> prioritizerClasses = new HashSet<>();
         for (final Class<?> c : ExtensionManager.getExtensions(FlowFilePrioritizer.class)) {
             prioritizerClasses.add(c.getName());
         }
+        final Set<String> controllerServiceClasses = new HashSet<>();
+        for (final Class<?> c : ExtensionManager.getExtensions(ControllerService.class)) {
+            controllerServiceClasses.add(c.getName());
+        }
 
         final Set<ProcessorDTO> allProcs = new HashSet<>();
         final Set<ConnectionDTO> allConns = new HashSet<>();
@@ -1760,6 +1816,15 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
                 throw new IllegalStateException("Invalid Processor Type: " + proc.getType());
             }
         }
+        
+        final Set<ControllerServiceDTO> controllerServices = templateContents.getControllerServices();
+        if (controllerServices != null) {
+            for (final ControllerServiceDTO service : controllerServices) {
+                if (!controllerServiceClasses.contains(service.getType())) {
+                    throw new IllegalStateException("Invalid Controller Service Type: " + service.getType());
+                }
+            }
+        }
 
         for (final ConnectionDTO conn : allConns) {
             final List<String> prioritizers = conn.getPrioritizers();
@@ -2480,17 +2545,20 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
         lookupGroup(groupId).stopProcessing();
     }
 
-    public ReportingTaskNode createReportingTask(final String type, String id) throws ReportingTaskInstantiationException {
-        return createReportingTask(type, id, true);
+    public ReportingTaskNode createReportingTask(final String type) throws ReportingTaskInstantiationException {
+        return createReportingTask(type, true);
+    }
+    
+    public ReportingTaskNode createReportingTask(final String type, final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
+    	return createReportingTask(type, UUID.randomUUID().toString(), firstTimeAdded);
     }
     
-    public ReportingTaskNode createReportingTask(final String type, String id, final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
-        if (type == null) {
+    @Override
+    public ReportingTaskNode createReportingTask(final String type, final String id, final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
+        if (type == null || id == null) {
             throw new NullPointerException();
         }
-
-        id = requireNonNull(id).intern();
-
+        
         ReportingTask task = null;
         final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
         try {
@@ -2516,8 +2584,19 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
 
         final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider);
         final ReportingTaskNode taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory);
+        taskNode.setName(task.getClass().getSimpleName());
         
         if ( firstTimeAdded ) {
+            final ComponentLog componentLog = new SimpleProcessLogger(id, taskNode.getReportingTask());
+            final ReportingInitializationContext config = new StandardReportingInitializationContext(id, taskNode.getName(),
+                    SchedulingStrategy.TIMER_DRIVEN, "1 min", componentLog, this);
+
+            try {
+                task.initialize(config);
+            } catch (final InitializationException ie) {
+                throw new ReportingTaskInstantiationException("Failed to initialize reporting task of type " + type, ie);
+            }
+                    
             try (final NarCloseable x = NarCloseable.withNarLoader()) {
                 ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, task);
             } catch (final Exception e) {
@@ -2529,30 +2608,33 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
         return taskNode;
     }
 
+    @Override
     public ReportingTaskNode getReportingTaskNode(final String taskId) {
         return reportingTasks.get(taskId);
     }
 
+    @Override
     public void startReportingTask(final ReportingTaskNode reportingTaskNode) {
         if (isTerminated()) {
             throw new IllegalStateException("Cannot start reporting task " + reportingTaskNode + " because the controller is terminated");
         }
 
         reportingTaskNode.verifyCanStart();
-        
-        processScheduler.schedule(reportingTaskNode);
+       	processScheduler.schedule(reportingTaskNode);
     }
 
+    
+    @Override
     public void stopReportingTask(final ReportingTaskNode reportingTaskNode) {
         if (isTerminated()) {
             return;
         }
 
         reportingTaskNode.verifyCanStop();
-        
         processScheduler.unschedule(reportingTaskNode);
     }
 
+    @Override
     public void removeReportingTask(final ReportingTaskNode reportingTaskNode) {
         final ReportingTaskNode existing = reportingTasks.get(reportingTaskNode.getIdentifier());
         if ( existing == null || existing != reportingTaskNode ) {
@@ -2565,44 +2647,101 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
             ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, reportingTaskNode.getReportingTask(), reportingTaskNode.getConfigurationContext());
         }
         
+        for ( final Map.Entry<PropertyDescriptor, String> entry : reportingTaskNode.getProperties().entrySet() ) {
+            final PropertyDescriptor descriptor = entry.getKey();
+            if (descriptor.getControllerServiceDefinition() != null ) {
+                final String value = entry.getValue() == null ? descriptor.getDefaultValue() : entry.getValue();
+                if ( value != null ) {
+                    final ControllerServiceNode serviceNode = controllerServiceProvider.getControllerServiceNode(value);
+                    if ( serviceNode != null ) {
+                        serviceNode.removeReference(reportingTaskNode);
+                    }
+                }
+            }
+        }
+        
         reportingTasks.remove(reportingTaskNode.getIdentifier());
     }
     
-    Collection<ReportingTaskNode> getReportingTasks() {
-        return reportingTasks.values();
+    @Override
+    public Set<ReportingTaskNode> getAllReportingTasks() {
+        return new HashSet<>(reportingTasks.values());
     }
 
-
+    @Override
+    public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
+        return controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
+    }
+    
+    @Override
     public void enableReportingTask(final ReportingTaskNode reportingTaskNode) {
         reportingTaskNode.verifyCanEnable();
-        
         processScheduler.enableReportingTask(reportingTaskNode);
     }
     
+    @Override
     public void disableReportingTask(final ReportingTaskNode reportingTaskNode) {
         reportingTaskNode.verifyCanDisable();
-        
         processScheduler.disableReportingTask(reportingTaskNode);
     }
     
     @Override
+    public void disableReferencingServices(final ControllerServiceNode serviceNode) {
+        controllerServiceProvider.disableReferencingServices(serviceNode);
+    }
+    
+    @Override
+    public void enableReferencingServices(final ControllerServiceNode serviceNode) {
+        controllerServiceProvider.enableReferencingServices(serviceNode);
+    }
+    
+    @Override
+    public void scheduleReferencingComponents(final ControllerServiceNode serviceNode) {
+        controllerServiceProvider.scheduleReferencingComponents(serviceNode);
+    }
+    
+    @Override
+    public void unscheduleReferencingComponents(final ControllerServiceNode serviceNode) {
+        controllerServiceProvider.unscheduleReferencingComponents(serviceNode);
+    }
+    
+    @Override
     public void enableControllerService(final ControllerServiceNode serviceNode) {
-        serviceNode.verifyCanEnable();
         controllerServiceProvider.enableControllerService(serviceNode);
     }
     
     @Override
+    public void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes) {
+        controllerServiceProvider.enableControllerServices(serviceNodes);
+    }
+    
+    @Override
     public void disableControllerService(final ControllerServiceNode serviceNode) {
         serviceNode.verifyCanDisable();
         controllerServiceProvider.disableControllerService(serviceNode);
     }
-
+    
     @Override
-    public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
-        return controllerServiceProvider.createControllerService(type, id.intern(), firstTimeAdded);
+    public void verifyCanEnableReferencingServices(final ControllerServiceNode serviceNode) {
+        controllerServiceProvider.verifyCanEnableReferencingServices(serviceNode);
+    }
+    
+    @Override
+    public void verifyCanScheduleReferencingComponents(final ControllerServiceNode serviceNode) {
+        controllerServiceProvider.verifyCanScheduleReferencingComponents(serviceNode);
     }
 
     @Override
+    public void verifyCanDisableReferencingServices(final ControllerServiceNode serviceNode) {
+        controllerServiceProvider.verifyCanDisableReferencingServices(serviceNode);
+    }
+    
+    @Override
+    public void verifyCanStopReferencingComponents(final ControllerServiceNode serviceNode) {
+        controllerServiceProvider.verifyCanStopReferencingComponents(serviceNode);
+    }
+    
+    @Override
     public ControllerService getControllerService(final String serviceIdentifier) {
         return controllerServiceProvider.getControllerService(serviceIdentifier);
     }
@@ -2623,10 +2762,24 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
     }
 
     @Override
+    public boolean isControllerServiceEnabling(final String serviceIdentifier) {
+        return controllerServiceProvider.isControllerServiceEnabling(serviceIdentifier);
+    }
+    
+    @Override
+    public String getControllerServiceName(final String serviceIdentifier) {
+    	return controllerServiceProvider.getControllerServiceName(serviceIdentifier);
+    }
+
     public void removeControllerService(final ControllerServiceNode serviceNode) {
         controllerServiceProvider.removeControllerService(serviceNode);
     }
     
+    @Override
+    public Set<ControllerServiceNode> getAllControllerServices() {
+    	return controllerServiceProvider.getAllControllerServices();
+    }
+    
     //
     // Counters
     //

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java
index c67181a..85ad159 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java
@@ -26,6 +26,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.connectable.Size;
+import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
 import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor;
@@ -33,6 +34,7 @@ import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.util.DomUtils;
 import org.apache.nifi.web.api.dto.ConnectableDTO;
 import org.apache.nifi.web.api.dto.ConnectionDTO;
+import org.apache.nifi.web.api.dto.ControllerServiceDTO;
 import org.apache.nifi.web.api.dto.FlowSnippetDTO;
 import org.apache.nifi.web.api.dto.FunnelDTO;
 import org.apache.nifi.web.api.dto.LabelDTO;
@@ -42,7 +44,7 @@ import org.apache.nifi.web.api.dto.ProcessGroupDTO;
 import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
 import org.apache.nifi.web.api.dto.ProcessorDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
-
+import org.apache.nifi.web.api.dto.ReportingTaskDTO;
 import org.w3c.dom.Element;
 import org.w3c.dom.NodeList;
 
@@ -77,6 +79,40 @@ public class FlowFromDOMFactory {
 
         return styles;
     }
+    
+    public static ControllerServiceDTO getControllerService(final Element element, final StringEncryptor encryptor) {
+    	final ControllerServiceDTO dto = new ControllerServiceDTO();
+    	
+    	dto.setId(getString(element, "id"));
+    	dto.setName(getString(element, "name"));
+    	dto.setComments(getString(element, "comment"));
+    	dto.setType(getString(element, "class"));
+
+    	final boolean enabled = getBoolean(element, "enabled");
+    	dto.setState(enabled ? ControllerServiceState.ENABLED.name() : ControllerServiceState.DISABLED.name());
+    	
+        dto.setProperties(getProperties(element, encryptor));
+        dto.setAnnotationData(getString(element, "annotationData"));
+
+        return dto;
+    }
+    
+    public static ReportingTaskDTO getReportingTask(final Element element, final StringEncryptor encryptor) {
+    	final ReportingTaskDTO dto = new ReportingTaskDTO();
+    	
+    	dto.setId(getString(element, "id"));
+    	dto.setName(getString(element, "name"));
+    	dto.setComments(getString(element, "comment"));
+    	dto.setType(getString(element, "class"));
+    	dto.setSchedulingPeriod(getString(element, "schedulingPeriod"));
+    	dto.setState(getString(element, "scheduledState"));
+    	dto.setSchedulingStrategy(getString(element, "schedulingStrategy"));
+    	
+    	dto.setProperties(getProperties(element, encryptor));
+    	dto.setAnnotationData(getString(element, "annotationData"));
+
+    	return dto;
+    }
 
     public static ProcessGroupDTO getProcessGroup(final String parentId, final Element element, final StringEncryptor encryptor) {
         final ProcessGroupDTO dto = new ProcessGroupDTO();
@@ -310,7 +346,6 @@ public class FlowFromDOMFactory {
         final ProcessorConfigDTO configDto = new ProcessorConfigDTO();
         dto.setConfig(configDto);
         configDto.setComments(getString(element, "comment"));
-        configDto.setAnnotationData(getString(element, "annotationData"));
         configDto.setConcurrentlySchedulableTaskCount(getInt(element, "maxConcurrentTasks"));
         final String schedulingPeriod = getString(element, "schedulingPeriod");
         configDto.setSchedulingPeriod(schedulingPeriod);
@@ -334,14 +369,8 @@ public class FlowFromDOMFactory {
             configDto.setRunDurationMillis(TimeUnit.NANOSECONDS.toMillis(runDurationNanos));
         }
 
-        final LinkedHashMap<String, String> properties = new LinkedHashMap<>();
-        final List<Element> propertyNodeList = getChildrenByTagName(element, "property");
-        for (final Element propertyElement : propertyNodeList) {
-            final String name = getString(propertyElement, "name");
-            final String value = decrypt(getString(propertyElement, "value"), encryptor);
-            properties.put(name, value);
-        }
-        configDto.setProperties(properties);
+        configDto.setProperties(getProperties(element, encryptor));
+        configDto.setAnnotationData(getString(element, "annotationData"));
 
         final Set<String> autoTerminatedRelationships = new HashSet<>();
         final List<Element> autoTerminateList = getChildrenByTagName(element, "autoTerminatedRelationship");
@@ -353,6 +382,17 @@ public class FlowFromDOMFactory {
         return dto;
     }
 
+    private static LinkedHashMap<String, String> getProperties(final Element element, final StringEncryptor encryptor) {
+    	final LinkedHashMap<String, String> properties = new LinkedHashMap<>();
+        final List<Element> propertyNodeList = getChildrenByTagName(element, "property");
+        for (final Element propertyElement : propertyNodeList) {
+            final String name = getString(propertyElement, "name");
+            final String value = decrypt(getString(propertyElement, "value"), encryptor);
+            properties.put(name, value);
+        }
+        return properties;
+    }
+    
     private static String getString(final Element element, final String childElementName) {
         final List<Element> nodeList = getChildrenByTagName(element, childElementName);
         if (nodeList == null || nodeList.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
index e08a94d..7cd9d3b 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
@@ -40,6 +40,8 @@ import org.apache.nifi.connectable.Port;
 import org.apache.nifi.connectable.Position;
 import org.apache.nifi.connectable.Size;
 import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
 import org.apache.nifi.groups.ProcessGroup;
@@ -47,7 +49,6 @@ import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.remote.RemoteGroupPort;
 import org.apache.nifi.remote.RootGroupPort;
-
 import org.w3c.dom.DOMException;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
@@ -79,6 +80,18 @@ public class StandardFlowSerializer implements FlowSerializer {
             addTextElement(rootNode, "maxTimerDrivenThreadCount", controller.getMaxTimerDrivenThreadCount());
             addTextElement(rootNode, "maxEventDrivenThreadCount", controller.getMaxEventDrivenThreadCount());
             addProcessGroup(rootNode, controller.getGroup(controller.getRootGroupId()), "rootGroup");
+            
+            final Element controllerServicesNode = doc.createElement("controllerServices");
+            rootNode.appendChild(controllerServicesNode);
+            for ( final ControllerServiceNode serviceNode : controller.getAllControllerServices() ) {
+            	addControllerService(controllerServicesNode, serviceNode, encryptor);
+            }
+            
+            final Element reportingTasksNode = doc.createElement("reportingTasks");
+            rootNode.appendChild(reportingTasksNode);
+            for ( final ReportingTaskNode taskNode : controller.getAllReportingTasks() ) {
+            	addReportingTask(reportingTasksNode, taskNode, encryptor);
+            }
 
             final DOMSource domSource = new DOMSource(doc);
             final StreamResult streamResult = new StreamResult(new BufferedOutputStream(os));
@@ -300,8 +313,16 @@ public class StandardFlowSerializer implements FlowSerializer {
         addTextElement(element, "schedulingStrategy", processor.getSchedulingStrategy().name());
         addTextElement(element, "runDurationNanos", processor.getRunDuration(TimeUnit.NANOSECONDS));
 
-        // properties.
-        for (final Map.Entry<PropertyDescriptor, String> entry : processor.getProperties().entrySet()) {
+        addConfiguration(element, processor.getProperties(), processor.getAnnotationData(), encryptor);
+        
+        for (final Relationship rel : processor.getAutoTerminatedRelationships()) {
+            addTextElement(element, "autoTerminatedRelationship", rel.getName());
+        }
+    }
+    
+    private static void addConfiguration(final Element element, final Map<PropertyDescriptor, String> properties, final String annotationData, final StringEncryptor encryptor) {
+    	final Document doc = element.getOwnerDocument();
+    	for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
             final PropertyDescriptor descriptor = entry.getKey();
             String value = entry.getValue();
 
@@ -322,14 +343,9 @@ public class StandardFlowSerializer implements FlowSerializer {
             element.appendChild(propElement);
         }
 
-        final String annotationData = processor.getAnnotationData();
         if (annotationData != null) {
             addTextElement(element, "annotationData", annotationData);
         }
-
-        for (final Relationship rel : processor.getAutoTerminatedRelationships()) {
-            addTextElement(element, "autoTerminatedRelationship", rel.getName());
-        }
     }
 
     private void addConnection(final Element parentElement, final Connection connection) {
@@ -390,11 +406,43 @@ public class StandardFlowSerializer implements FlowSerializer {
         parentElement.appendChild(element);
     }
 
-    private void addTextElement(final Element element, final String name, final long value) {
+    
+    public static void addControllerService(final Element element, final ControllerServiceNode serviceNode, final StringEncryptor encryptor) {
+    	final Element serviceElement = element.getOwnerDocument().createElement("controllerService");
+    	addTextElement(serviceElement, "id", serviceNode.getIdentifier());
+    	addTextElement(serviceElement, "name", serviceNode.getName());
+    	addTextElement(serviceElement, "comment", serviceNode.getComments());
+    	addTextElement(serviceElement, "class", serviceNode.getControllerServiceImplementation().getClass().getCanonicalName());
+    	
+    	final ControllerServiceState state = serviceNode.getState();
+    	final boolean enabled = (state == ControllerServiceState.ENABLED || state == ControllerServiceState.ENABLING);
+        addTextElement(serviceElement, "enabled", String.valueOf(enabled));
+        
+        addConfiguration(serviceElement, serviceNode.getProperties(), serviceNode.getAnnotationData(), encryptor);
+        
+    	element.appendChild(serviceElement);
+    }
+    
+    public static void addReportingTask(final Element element, final ReportingTaskNode taskNode, final StringEncryptor encryptor) {
+    	final Element taskElement = element.getOwnerDocument().createElement("reportingTask");
+    	addTextElement(taskElement, "id", taskNode.getIdentifier());
+    	addTextElement(taskElement, "name", taskNode.getName());
+    	addTextElement(taskElement, "comment", taskNode.getComments());
+    	addTextElement(taskElement, "class", taskNode.getReportingTask().getClass().getCanonicalName());
+        addTextElement(taskElement, "schedulingPeriod", taskNode.getSchedulingPeriod());
+        addTextElement(taskElement, "scheduledState", taskNode.getScheduledState().name());
+        addTextElement(taskElement, "schedulingStrategy", taskNode.getSchedulingStrategy().name());
+    	
+    	addConfiguration(taskElement, taskNode.getProperties(), taskNode.getAnnotationData(), encryptor);
+    	
+    	element.appendChild(taskElement);
+    }
+    
+    private static void addTextElement(final Element element, final String name, final long value) {
         addTextElement(element, name, String.valueOf(value));
     }
 
-    private void addTextElement(final Element element, final String name, final String value) {
+    private static void addTextElement(final Element element, final String name, final String value) {
         final Document doc = element.getOwnerDocument();
         final Element toAdd = doc.createElement(name);
         toAdd.setTextContent(value);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index 64ce5c4..fcfee83 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -81,8 +81,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
 
     private final FlowController controller;
     private final Path flowXml;
-    private final Path taskConfigXml;
-    private final Path serviceConfigXml;
     private final FlowConfigurationDAO dao;
     private final int gracefulShutdownSeconds;
     private final boolean autoResumeState;
@@ -154,14 +152,12 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
         this.controller = controller;
         this.encryptor = encryptor;
         flowXml = Paths.get(properties.getProperty(NiFiProperties.FLOW_CONFIGURATION_FILE));
-        taskConfigXml = Paths.get(properties.getProperty(NiFiProperties.TASK_CONFIGURATION_FILE));
-        serviceConfigXml = Paths.get(properties.getProperty(NiFiProperties.SERVICE_CONFIGURATION_FILE));
 
         gracefulShutdownSeconds = (int) FormatUtils.getTimeDuration(properties.getProperty(NiFiProperties.FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD), TimeUnit.SECONDS);
         autoResumeState = properties.getAutoResumeState();
         connectionRetryMillis = (int) FormatUtils.getTimeDuration(properties.getClusterManagerFlowRetrievalDelay(), TimeUnit.MILLISECONDS);
 
-        dao = new StandardXMLFlowConfigurationDAO(flowXml, taskConfigXml, serviceConfigXml, encryptor);
+        dao = new StandardXMLFlowConfigurationDAO(flowXml, encryptor);
 
         if (configuredForClustering) {
 
@@ -605,7 +601,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
         if (firstControllerInitialization) {
             // load the controller services
             logger.debug("Loading controller services");
-            dao.loadControllerServices(controller);
         }
 
         // load the flow
@@ -622,7 +617,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
             logger.debug("First controller initialization. Loading reporting tasks and initializing controller.");
 
             // load the controller tasks
-            dao.loadReportingTasks(controller);
+//            dao.loadReportingTasks(controller);
 
             // initialize the flow
             controller.initializeFlow();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 05a8f01..201482c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -26,6 +26,7 @@ import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -40,6 +41,7 @@ import javax.xml.parsers.ParserConfigurationException;
 import javax.xml.validation.Schema;
 import javax.xml.validation.SchemaFactory;
 
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.nifi.cluster.protocol.DataFlow;
 import org.apache.nifi.cluster.protocol.StandardDataFlow;
 import org.apache.nifi.connectable.Connectable;
@@ -51,23 +53,35 @@ import org.apache.nifi.connectable.Position;
 import org.apache.nifi.connectable.Size;
 import org.apache.nifi.controller.exception.ProcessorInstantiationException;
 import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
+import org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
+import org.apache.nifi.controller.service.ControllerServiceLoader;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.events.BulletinFactory;
-import org.apache.nifi.util.file.FileUtils;
 import org.apache.nifi.fingerprint.FingerprintException;
 import org.apache.nifi.fingerprint.FingerprintFactory;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.logging.LogLevel;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.SimpleProcessLogger;
 import org.apache.nifi.remote.RemoteGroupPort;
 import org.apache.nifi.remote.RootGroupPort;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.reporting.ReportingInitializationContext;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.util.DomUtils;
 import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.file.FileUtils;
 import org.apache.nifi.web.api.dto.ConnectableDTO;
 import org.apache.nifi.web.api.dto.ConnectionDTO;
+import org.apache.nifi.web.api.dto.ControllerServiceDTO;
 import org.apache.nifi.web.api.dto.FlowSnippetDTO;
 import org.apache.nifi.web.api.dto.FunnelDTO;
 import org.apache.nifi.web.api.dto.LabelDTO;
@@ -77,9 +91,7 @@ import org.apache.nifi.web.api.dto.ProcessGroupDTO;
 import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
 import org.apache.nifi.web.api.dto.ProcessorDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
-
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.nifi.encrypt.StringEncryptor;
+import org.apache.nifi.web.api.dto.ReportingTaskDTO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.w3c.dom.Document;
@@ -96,9 +108,11 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
     private static final Logger logger = LoggerFactory.getLogger(StandardFlowSynchronizer.class);
     public static final URL FLOW_XSD_RESOURCE = StandardFlowSynchronizer.class.getResource("/FlowConfiguration.xsd");
     private final StringEncryptor encryptor;
+    private final boolean autoResumeState;
 
     public StandardFlowSynchronizer(final StringEncryptor encryptor) {
         this.encryptor = encryptor;
+        autoResumeState = NiFiProperties.getInstance().getAutoResumeState();
     }
 
     public static boolean isEmpty(final DataFlow dataFlow, final StringEncryptor encryptor) {
@@ -157,10 +171,26 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
                         controller.setMaxEventDrivenThreadCount(maxThreadCount / 3);
                     }
 
+                    final Element reportingTasksElement = (Element) DomUtils.getChild(rootElement, "reportingTasks");
+                    final List<Element> taskElements;
+                    if ( reportingTasksElement == null ) {
+                        taskElements = Collections.emptyList();
+                    } else {
+                        taskElements = DomUtils.getChildElementsByTagName(reportingTasksElement, "reportingTask");
+                    }
+                    
+                    final Element controllerServicesElement = (Element) DomUtils.getChild(rootElement, "controllerServices");
+                    final List<Element> controllerServiceElements;
+                    if ( controllerServicesElement == null ) {
+                        controllerServiceElements = Collections.emptyList();
+                    } else {
+                        controllerServiceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService");
+                    }
+                    
                     logger.trace("Parsing process group from DOM");
                     final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0);
                     final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, encryptor);
-                    existingFlowEmpty = isEmpty(rootGroupDto);
+                    existingFlowEmpty = taskElements.isEmpty() && controllerServiceElements.isEmpty() && isEmpty(rootGroupDto);
                     logger.debug("Existing Flow Empty = {}", existingFlowEmpty);
                 }
             }
@@ -200,37 +230,64 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
         // create document by parsing proposed flow bytes
         logger.trace("Parsing proposed flow bytes as DOM document");
         final Document configuration = parseFlowBytes(proposedFlow.getFlow());
-
+        
         // attempt to sync controller with proposed flow
         try {
             if (configuration != null) {
-                // get the root element
-                final Element rootElement = (Element) configuration.getElementsByTagName("flowController").item(0);
-
-                // set controller config
-                logger.trace("Updating flow config");
-                final Integer maxThreadCount = getInteger(rootElement, "maxThreadCount");
-                if (maxThreadCount == null) {
-                    controller.setMaxTimerDrivenThreadCount(getInt(rootElement, "maxTimerDrivenThreadCount"));
-                    controller.setMaxEventDrivenThreadCount(getInt(rootElement, "maxEventDrivenThreadCount"));
-                } else {
-                    controller.setMaxTimerDrivenThreadCount(maxThreadCount * 2 / 3);
-                    controller.setMaxEventDrivenThreadCount(maxThreadCount / 3);
-                }
-
-                // get the root group XML element
-                final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0);
-
-                // if this controller isn't initialized or its emtpy, add the root group, otherwise update
-                if (!initialized || existingFlowEmpty) {
-                    logger.trace("Adding root process group");
-                    addProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor);
-                } else {
-                    logger.trace("Updating root process group");
-                    updateProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor);
+                synchronized (configuration) {
+                    // get the root element
+                    final Element rootElement = (Element) configuration.getElementsByTagName("flowController").item(0);
+    
+                    // set controller config
+                    logger.trace("Updating flow config");
+                    final Integer maxThreadCount = getInteger(rootElement, "maxThreadCount");
+                    if (maxThreadCount == null) {
+                        controller.setMaxTimerDrivenThreadCount(getInt(rootElement, "maxTimerDrivenThreadCount"));
+                        controller.setMaxEventDrivenThreadCount(getInt(rootElement, "maxEventDrivenThreadCount"));
+                    } else {
+                        controller.setMaxTimerDrivenThreadCount(maxThreadCount * 2 / 3);
+                        controller.setMaxEventDrivenThreadCount(maxThreadCount / 3);
+                    }
+    
+                    // get the root group XML element
+                    final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0);
+    
+                    final Element controllerServicesElement = (Element) DomUtils.getChild(rootElement, "controllerServices");
+    	            if ( controllerServicesElement != null ) {
+    	                final List<Element> serviceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService");
+    	                
+    	                if ( !initialized || existingFlowEmpty ) {
+    	                    ControllerServiceLoader.loadControllerServices(serviceElements, controller, encryptor, controller.getBulletinRepository(), autoResumeState);
+    	                } else {
+    	                    for ( final Element serviceElement : serviceElements ) {
+    	                        updateControllerService(controller, serviceElement, encryptor);
+    	                    }
+    	                }
+                    }
+    
+                    // if this controller isn't initialized or its emtpy, add the root group, otherwise update
+                    if (!initialized || existingFlowEmpty) {
+                        logger.trace("Adding root process group");
+                        addProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor);
+                    } else {
+                        logger.trace("Updating root process group");
+                        updateProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor);
+                    }
+    
+                    final Element reportingTasksElement = (Element) DomUtils.getChild(rootElement, "reportingTasks");
+                    if ( reportingTasksElement != null ) {
+                    	final List<Element> taskElements = DomUtils.getChildElementsByTagName(reportingTasksElement, "reportingTask");
+                    	for ( final Element taskElement : taskElements ) {
+                    		if ( !initialized || existingFlowEmpty ) {
+                    			addReportingTask(controller, taskElement, encryptor);
+                    		} else {
+                    			updateReportingTask(controller, taskElement, encryptor);
+                    		}
+                    	}
+                    }
                 }
             }
-
+    
             logger.trace("Synching templates");
             if ((existingTemplates == null || existingTemplates.length == 0) && proposedFlow.getTemplates() != null && proposedFlow.getTemplates().length > 0) {
                 // need to load templates
@@ -313,7 +370,124 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
 
         return baos.toByteArray();
     }
+    
+    
+    private void updateControllerService(final FlowController controller, final Element controllerServiceElement, final StringEncryptor encryptor) {
+    	final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
+    	
+    	final ControllerServiceState dtoState = ControllerServiceState.valueOf(dto.getState());
+        final boolean dtoEnabled = (dtoState == ControllerServiceState.ENABLED || dtoState == ControllerServiceState.ENABLING);
+        
+        final ControllerServiceNode serviceNode = controller.getControllerServiceNode(dto.getId());
+        final ControllerServiceState serviceState = serviceNode.getState();
+        final boolean serviceEnabled = (serviceState == ControllerServiceState.ENABLED || serviceState == ControllerServiceState.ENABLING);
+        
+    	if (dtoEnabled && !serviceEnabled) {
+    		controller.enableControllerService(controller.getControllerServiceNode(dto.getId()));
+    	} else if (!dtoEnabled && serviceEnabled) {
+    		controller.disableControllerService(controller.getControllerServiceNode(dto.getId()));
+    	}
+    }
+    
+    private void addReportingTask(final FlowController controller, final Element reportingTaskElement, final StringEncryptor encryptor) throws ReportingTaskInstantiationException {
+    	final ReportingTaskDTO dto = FlowFromDOMFactory.getReportingTask(reportingTaskElement, encryptor);
+    	
+    	final ReportingTaskNode reportingTask = controller.createReportingTask(dto.getType(), dto.getId(), false);
+    	reportingTask.setName(dto.getName());
+    	reportingTask.setComments(dto.getComments());
+    	reportingTask.setScheduldingPeriod(dto.getSchedulingPeriod());
+    	reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(dto.getSchedulingStrategy()));
+    	
+    	reportingTask.setAnnotationData(dto.getAnnotationData());
+    	
+        for (final Map.Entry<String, String> entry : dto.getProperties().entrySet()) {
+            if (entry.getValue() == null) {
+            	reportingTask.removeProperty(entry.getKey());
+            } else {
+            	reportingTask.setProperty(entry.getKey(), entry.getValue());
+            }
+        }
+        
+        final ComponentLog componentLog = new SimpleProcessLogger(dto.getId(), reportingTask.getReportingTask());
+        final ReportingInitializationContext config = new StandardReportingInitializationContext(dto.getId(), dto.getName(),
+                SchedulingStrategy.valueOf(dto.getSchedulingStrategy()), dto.getSchedulingPeriod(), componentLog, controller);
+        
+        try {
+            reportingTask.getReportingTask().initialize(config);
+        } catch (final InitializationException ie) {
+            throw new ReportingTaskInstantiationException("Failed to initialize reporting task of type " + dto.getType(), ie);
+        }
+        
+        if ( autoResumeState ) {
+	        if ( ScheduledState.RUNNING.name().equals(dto.getState()) ) {
+	        	try {
+	        		controller.startReportingTask(reportingTask);
+	        	} catch (final Exception e) {
+	        		logger.error("Failed to start {} due to {}", reportingTask, e);
+	        		if ( logger.isDebugEnabled() ) {
+	        			logger.error("", e);
+	        		}
+	        		controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin(
+	        				"Reporting Tasks", Severity.ERROR.name(), "Failed to start " + reportingTask + " due to " + e));
+	        	}
+	        } else if ( ScheduledState.DISABLED.name().equals(dto.getState()) ) {
+	        	try {
+	        		controller.disableReportingTask(reportingTask);
+	        	} catch (final Exception e) {
+	        		logger.error("Failed to mark {} as disabled due to {}", reportingTask, e);
+	        		if ( logger.isDebugEnabled() ) {
+	        			logger.error("", e);
+	        		}
+	        		controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin(
+	        				"Reporting Tasks", Severity.ERROR.name(), "Failed to mark " + reportingTask + " as disabled due to " + e));
+	        	}
+	        }
+        }
+    }
 
+    private void updateReportingTask(final FlowController controller, final Element reportingTaskElement, final StringEncryptor encryptor) {
+    	final ReportingTaskDTO dto = FlowFromDOMFactory.getReportingTask(reportingTaskElement, encryptor);
+    	final ReportingTaskNode taskNode = controller.getReportingTaskNode(dto.getId());
+    	
+        if (!taskNode.getScheduledState().name().equals(dto.getState())) {
+            try {
+                switch (ScheduledState.valueOf(dto.getState())) {
+                    case DISABLED:
+                    	if ( taskNode.isRunning() ) {
+                    		controller.stopReportingTask(taskNode);
+                    	}
+                    	controller.disableReportingTask(taskNode);
+                        break;
+                    case RUNNING:
+                    	if ( taskNode.getScheduledState() == ScheduledState.DISABLED ) {
+                    		controller.enableReportingTask(taskNode);
+                    	}
+                    	controller.startReportingTask(taskNode);
+                        break;
+                    case STOPPED:
+                        if (taskNode.getScheduledState() == ScheduledState.DISABLED) {
+                        	controller.enableReportingTask(taskNode);
+                        } else if (taskNode.getScheduledState() == ScheduledState.RUNNING) {
+                        	controller.stopReportingTask(taskNode);
+                        }
+                        break;
+                }
+            } catch (final IllegalStateException ise) {
+                logger.error("Failed to change Scheduled State of {} from {} to {} due to {}", taskNode, taskNode.getScheduledState().name(), dto.getState(), ise.toString());
+                logger.error("", ise);
+
+                // create bulletin for the Processor Node
+                controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Node Reconnection", Severity.ERROR.name(),
+                        "Failed to change Scheduled State of " + taskNode + " from " + taskNode.getScheduledState().name() + " to " + dto.getState() + " due to " + ise.toString()));
+
+                // create bulletin at Controller level.
+                controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Node Reconnection", Severity.ERROR.name(),
+                        "Failed to change Scheduled State of " + taskNode + " from " + taskNode.getScheduledState().name() + " to " + dto.getState() + " due to " + ise.toString()));
+            }
+        }
+    }
+    
+    
     private ProcessGroup updateProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement, final StringEncryptor encryptor) throws ProcessorInstantiationException {
 
         // get the parent group ID

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index fe72ae4..355e303 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -52,6 +52,7 @@ import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.connectable.Position;
+import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.logging.LogLevel;
@@ -120,7 +121,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
     private SchedulingStrategy schedulingStrategy;  // guarded by read/write lock
 
     @SuppressWarnings("deprecation")
-    StandardProcessorNode(final Processor processor, final String uuid, final ValidationContextFactory validationContextFactory,
+    public StandardProcessorNode(final Processor processor, final String uuid, final ValidationContextFactory validationContextFactory,
             final ProcessScheduler scheduler, final ControllerServiceProvider controllerServiceProvider) {
         super(processor, uuid, validationContextFactory, controllerServiceProvider);
 
@@ -985,6 +986,16 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
             readLock.unlock();
         }
     }
+    
+    @Override
+    public int getActiveThreadCount() {
+        readLock.lock();
+        try {
+            return processScheduler.getActiveThreadCount(this);
+        } finally {
+            readLock.unlock();
+        }
+    }
 
     @Override
     public boolean isValid() {
@@ -1182,8 +1193,13 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
     public void verifyCanStart() {
         readLock.lock();
         try {
-            if (scheduledState.get() != ScheduledState.STOPPED) {
-                throw new IllegalStateException(this + " is not stopped");
+            switch (getScheduledState()) {
+                case DISABLED:
+                    throw new IllegalStateException(this + " cannot be started because it is disabled");
+                case RUNNING:
+                    throw new IllegalStateException(this + " cannot be started because it is already running");
+                case STOPPED:
+                    break;
             }
             verifyNoActiveThreads();
 
@@ -1194,6 +1210,31 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
             readLock.unlock();
         }
     }
+    
+    @Override
+    public void verifyCanStart(final Set<ControllerServiceNode> ignoredReferences) {
+        switch (getScheduledState()) {
+            case DISABLED:
+                throw new IllegalStateException(this + " cannot be started because it is disabled");
+            case RUNNING:
+                throw new IllegalStateException(this + " cannot be started because it is already running");
+            case STOPPED:
+                break;
+        }
+        verifyNoActiveThreads();
+        
+        final Set<String> ids = new HashSet<>();
+        for ( final ControllerServiceNode node : ignoredReferences ) {
+            ids.add(node.getIdentifier());
+        }
+        
+        final Collection<ValidationResult> validationResults = getValidationErrors(ids);
+        for ( final ValidationResult result : validationResults ) {
+            if ( !result.isValid() ) {
+                throw new IllegalStateException(this + " cannot be started because it is not valid: " + result);
+            }
+        }
+    }
 
     @Override
     public void verifyCanStop() {