You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/30 19:01:04 UTC

[1/7] incubator-nifi git commit: NIFI-250: Do not try to load controller services and reporting tasks from old .xml files because they are now in flow.xml

Repository: incubator-nifi
Updated Branches:
  refs/heads/NIFI-250 e7beef8d2 -> 4ae2a10e6


NIFI-250: Do not try to load controller services and reporting tasks from old .xml files because they are now in flow.xml


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/9a6acab3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/9a6acab3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/9a6acab3

Branch: refs/heads/NIFI-250
Commit: 9a6acab3734fbbb670fb5db2388ac85ef9b6fc6d
Parents: 60ad998
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Jan 29 11:24:56 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Jan 29 11:24:56 2015 -0500

----------------------------------------------------------------------
 .../java/org/apache/nifi/controller/StandardFlowService.java     | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9a6acab3/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index d459b00..790485c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -608,7 +608,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
         if (firstControllerInitialization) {
             // load the controller services
             logger.debug("Loading controller services");
-            dao.loadControllerServices(controller);
+//            dao.loadControllerServices(controller);
         }
 
         // load the flow
@@ -625,7 +625,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
             logger.debug("First controller initialization. Loading reporting tasks and initializing controller.");
 
             // load the controller tasks
-            dao.loadReportingTasks(controller);
+//            dao.loadReportingTasks(controller);
 
             // initialize the flow
             controller.initializeFlow();


[6/7] incubator-nifi git commit: NIFI-250: Change Availability to just NODE or NCM; update NCM to store controller services and reporting tasks in separate entries in tar file instead of in flow.xml

Posted by ma...@apache.org.
NIFI-250: Change Availability to just NODE or NCM; update NCM to store controller services and reporting tasks in separate entries in tar file instead of in flow.xml


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/52149d85
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/52149d85
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/52149d85

Branch: refs/heads/NIFI-250
Commit: 52149d8510d8c517063bfd6359bde3950862db77
Parents: 3102e08
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jan 30 12:44:30 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jan 30 12:44:30 2015 -0500

----------------------------------------------------------------------
 .../nifi/cluster/flow/ClusterDataFlow.java      |  15 +-
 .../cluster/flow/DataFlowManagementService.java |  17 ++
 .../nifi/cluster/flow/impl/DataFlowDaoImpl.java |  16 +-
 .../impl/DataFlowManagementServiceImpl.java     |  65 ++++++-
 .../cluster/manager/impl/WebClusterManager.java | 175 +++++++++++++++++--
 .../reporting/ClusteredReportingTaskNode.java   |   3 +-
 .../apache/nifi/controller/Availability.java    |  12 +-
 .../nifi/controller/ReportingTaskNode.java      |   2 -
 .../service/ControllerServiceNode.java          |   2 -
 .../service/ControllerServiceProvider.java      |   7 +-
 .../apache/nifi/controller/FlowController.java  |  65 +++----
 .../nifi/controller/StandardFlowSerializer.java |  20 +--
 .../controller/StandardFlowSynchronizer.java    |   6 +-
 .../reporting/AbstractReportingTaskNode.java    |  12 +-
 .../reporting/StandardReportingTaskNode.java    |   3 +-
 .../service/ControllerServiceLoader.java        |   8 +-
 .../service/StandardControllerServiceNode.java  |  12 +-
 .../StandardControllerServiceProvider.java      |   9 +-
 .../nifi/web/controller/ControllerFacade.java   |   9 +-
 19 files changed, 337 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java
index eedb88f..c17b429 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java
@@ -27,14 +27,25 @@ import org.apache.nifi.cluster.protocol.StandardDataFlow;
 public class ClusterDataFlow {
 
     private final StandardDataFlow dataFlow;
-
     private final NodeIdentifier primaryNodeId;
+    private final byte[] controllerServices;
+    private final byte[] reportingTasks;
 
-    public ClusterDataFlow(final StandardDataFlow dataFlow, final NodeIdentifier primaryNodeId) {
+    public ClusterDataFlow(final StandardDataFlow dataFlow, final NodeIdentifier primaryNodeId, final byte[] controllerServices, final byte[] reportingTasks) {
         this.dataFlow = dataFlow;
         this.primaryNodeId = primaryNodeId;
+        this.controllerServices = controllerServices;
+        this.reportingTasks = reportingTasks;
     }
 
+    public byte[] getControllerServices() {
+    	return controllerServices;
+    }
+    
+    public byte[] getReportingTasks() {
+    	return reportingTasks;
+    }
+    
     public NodeIdentifier getPrimaryNodeId() {
         return primaryNodeId;
     }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java
index 339d904..082d65e 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.cluster.flow;
 
 import java.util.Set;
+
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 
 /**
@@ -67,6 +68,22 @@ public interface DataFlowManagementService {
     void updatePrimaryNode(NodeIdentifier nodeId) throws DaoException;
 
     /**
+     * Updates the dataflow with the given serialized form of the Controller Services that are to exist on the NCM.
+     * 
+     * @param serializedControllerServices the serialized form of the Controller Services to store with the dataflow
+     * @throws DaoException if the dataflow could not be updated
+     */
+    void updateControllerServices(byte[] serializedControllerServices) throws DaoException;
+    
+    /**
+     * Updates the dataflow with the given serialized form of Reporting Tasks that are to exist on the NCM.
+     * 
+     * @param serializedReportingTasks the serialized form of the Reporting Tasks to store with the dataflow
+     * @throws DaoException if the dataflow could not be updated
+     */
+    void updateReportingTasks(byte[] serializedReportingTasks) throws DaoException;
+    
+    /**
      * Sets the state of the flow.
      *
      * @param flowState the state

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
index 72b594a..0d7caf3 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
@@ -111,6 +111,8 @@ public class DataFlowDaoImpl implements DataFlowDao {
     public static final String FLOW_XML_FILENAME = "flow.xml";
     public static final String TEMPLATES_FILENAME = "templates.xml";
     public static final String SNIPPETS_FILENAME = "snippets.xml";
+    public static final String CONTROLLER_SERVICES_FILENAME = "controller-services.xml";
+    public static final String REPORTING_TASKS_FILENAME = "reporting-tasks.xml";
     public static final String CLUSTER_INFO_FILENAME = "cluster-info.xml";
 
     private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(DataFlowDaoImpl.class));
@@ -479,7 +481,9 @@ public class DataFlowDaoImpl implements DataFlowDao {
         byte[] templateBytes = new byte[0];
         byte[] snippetBytes = new byte[0];
         byte[] clusterInfoBytes = new byte[0];
-
+        byte[] controllerServiceBytes = new byte[0];
+        byte[] reportingTaskBytes = new byte[0];
+        
         try (final InputStream inStream = new FileInputStream(file);
                 final TarArchiveInputStream tarIn = new TarArchiveInputStream(new BufferedInputStream(inStream))) {
             TarArchiveEntry tarEntry;
@@ -501,6 +505,14 @@ public class DataFlowDaoImpl implements DataFlowDao {
                         clusterInfoBytes = new byte[(int) tarEntry.getSize()];
                         StreamUtils.fillBuffer(tarIn, clusterInfoBytes, true);
                         break;
+                    case CONTROLLER_SERVICES_FILENAME:
+                    	controllerServiceBytes = new byte[(int) tarEntry.getSize()];
+                    	StreamUtils.fillBuffer(tarIn, controllerServiceBytes, true);
+                    	break;
+                    case REPORTING_TASKS_FILENAME:
+                    	reportingTaskBytes = new byte[(int) tarEntry.getSize()];
+                    	StreamUtils.fillBuffer(tarIn, reportingTaskBytes, true);
+                    	break;
                     default:
                         throw new DaoException("Found Unexpected file in dataflow configuration: " + tarEntry.getName());
                 }
@@ -518,7 +530,7 @@ public class DataFlowDaoImpl implements DataFlowDao {
         final StandardDataFlow dataFlow = new StandardDataFlow(flowBytes, templateBytes, snippetBytes);
         dataFlow.setAutoStartProcessors(autoStart);
 
-        return new ClusterDataFlow(dataFlow, (clusterMetadata == null) ? null : clusterMetadata.getPrimaryNodeId());
+        return new ClusterDataFlow(dataFlow, (clusterMetadata == null) ? null : clusterMetadata.getPrimaryNodeId(), controllerServiceBytes, reportingTaskBytes);
     }
 
     private void writeDataFlow(final File file, final ClusterDataFlow clusterDataFlow) throws IOException, JAXBException {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
index e135af3..1bb8ca3 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
@@ -41,7 +41,6 @@ import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
 import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
 import org.apache.nifi.logging.NiFiLog;
 import org.apache.nifi.util.FormatUtils;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -154,17 +153,74 @@ public class DataFlowManagementServiceImpl implements DataFlowManagementService
             final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow();
 
             final StandardDataFlow dataFlow;
+            final byte[] controllerServiceBytes;
+            final byte[] reportingTaskBytes;
             if (existingClusterDataFlow == null) {
                 dataFlow = null;
+                controllerServiceBytes = new byte[0];
+                reportingTaskBytes = new byte[0];
             } else {
                 dataFlow = existingClusterDataFlow.getDataFlow();
+                controllerServiceBytes = existingClusterDataFlow.getControllerServices();
+                reportingTaskBytes = existingClusterDataFlow.getReportingTasks();
             }
 
-            flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId));
+            flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId, controllerServiceBytes, reportingTaskBytes));
         } finally {
             resourceLock.unlock("updatePrimaryNode");
         }
     }
+    
+    
+    /**
+     * Saves the given serialized Controller Services into the persisted cluster dataflow, carrying over the
+     * existing dataflow, primary node id and serialized Reporting Tasks. The load and save both happen while
+     * holding {@code resourceLock}.
+     *
+     * @param controllerServiceBytes the serialized Controller Services to persist
+     * @throws DaoException declared by the interface; presumably propagated from the flow DAO - confirm
+     */
+    @Override
+    public void updateControllerServices(final byte[] controllerServiceBytes) throws DaoException {
+    	resourceLock.lock();
+    	try {
+    		final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow();
+
+            final StandardDataFlow dataFlow;
+            final byte[] reportingTaskBytes;
+            final NodeIdentifier nodeId;
+            // nothing persisted yet: save with no dataflow, no primary node and no reporting tasks
+            if (existingClusterDataFlow == null) {
+                dataFlow = null;
+                nodeId = null;
+                reportingTaskBytes = new byte[0];
+            } else {
+                // keep everything except the controller services as it was
+                dataFlow = existingClusterDataFlow.getDataFlow();
+                nodeId = existingClusterDataFlow.getPrimaryNodeId();
+                reportingTaskBytes = existingClusterDataFlow.getReportingTasks();
+            }
+
+            flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId, controllerServiceBytes, reportingTaskBytes));
+    	} finally {
+    		resourceLock.unlock("updateControllerServices");
+    	}
+    }
+    
+    /**
+     * Saves the given serialized Reporting Tasks into the persisted cluster dataflow, carrying over the
+     * existing dataflow, primary node id and serialized Controller Services. The load and save both happen
+     * while holding {@code resourceLock}.
+     *
+     * @param reportingTaskBytes the serialized Reporting Tasks to persist
+     * @throws DaoException declared by the interface; presumably propagated from the flow DAO - confirm
+     */
+    @Override
+    public void updateReportingTasks(final byte[] reportingTaskBytes) throws DaoException {
+        resourceLock.lock();
+        try {
+            final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow();
+
+            final StandardDataFlow dataFlow;
+            final byte[] controllerServiceBytes;
+            final NodeIdentifier nodeId;
+            if (existingClusterDataFlow == null) {
+                dataFlow = null;
+                nodeId = null;
+                // empty array, not null: updatePrimaryNode and updateControllerServices use the same
+                // "nothing stored" value, so consumers of ClusterDataFlow never see a null payload
+                controllerServiceBytes = new byte[0];
+            } else {
+                // keep everything except the reporting tasks as it was
+                dataFlow = existingClusterDataFlow.getDataFlow();
+                nodeId = existingClusterDataFlow.getPrimaryNodeId();
+                controllerServiceBytes = existingClusterDataFlow.getControllerServices();
+            }
+
+            flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId, controllerServiceBytes, reportingTaskBytes));
+        } finally {
+            // the label names the releasing method; it previously said "updateControllerServices"
+            resourceLock.unlock("updateReportingTasks");
+        }
+    }
 
     @Override
     public PersistedFlowState getPersistedFlowState() {
@@ -303,9 +359,10 @@ public class DataFlowManagementServiceImpl implements DataFlowManagementService
                             final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow();
                             final ClusterDataFlow currentClusterDataFlow;
                             if (existingClusterDataFlow == null) {
-                                currentClusterDataFlow = new ClusterDataFlow(dataFlow, null);
+                                currentClusterDataFlow = new ClusterDataFlow(dataFlow, null, new byte[0], new byte[0]);
                             } else {
-                                currentClusterDataFlow = new ClusterDataFlow(dataFlow, existingClusterDataFlow.getPrimaryNodeId());
+                                currentClusterDataFlow = new ClusterDataFlow(dataFlow, existingClusterDataFlow.getPrimaryNodeId(), 
+                                		existingClusterDataFlow.getControllerServices(), existingClusterDataFlow.getReportingTasks());
                             }
                             flowDao.saveDataFlow(currentClusterDataFlow);
                             flowDao.setPersistedFlowState(PersistedFlowState.CURRENT);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 9649fee..50d7087 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.cluster.manager.impl;
 
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -58,13 +59,19 @@ import javax.xml.XMLConstants;
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
 import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
 import javax.xml.validation.Schema;
 import javax.xml.validation.SchemaFactory;
 import javax.xml.validation.Validator;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.admin.service.AuditService;
+import org.apache.nifi.annotation.lifecycle.OnRemoved;
 import org.apache.nifi.cluster.BulletinsPayload;
 import org.apache.nifi.cluster.ClusterNodeInformation;
 import org.apache.nifi.cluster.HeartbeatPayload;
@@ -125,6 +132,7 @@ import org.apache.nifi.controller.Availability;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.Heartbeater;
 import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.StandardFlowSerializer;
 import org.apache.nifi.controller.ValidationContextFactory;
 import org.apache.nifi.controller.reporting.ClusteredReportingTaskNode;
 import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
@@ -170,6 +178,7 @@ import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.util.DomUtils;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.ReflectionUtils;
 import org.apache.nifi.web.Revision;
 import org.apache.nifi.web.api.dto.FlowSnippetDTO;
 import org.apache.nifi.web.api.dto.NodeDTO;
@@ -305,7 +314,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     private final ClusterManagerLock writeLock = new ClusterManagerLock(resourceRWLock.writeLock(), "Write");
 
     private final Set<Node> nodes = new HashSet<>();
-    private final Set<ReportingTaskNode> reportingTasks = new HashSet<>();
+    private final Map<String, ReportingTaskNode> reportingTasks = new HashMap<>();
 
     // null means the dataflow should be read from disk
     private StandardDataFlow cachedDataFlow = null;
@@ -453,7 +462,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
 
                 // Load and start running Reporting Tasks
                 final File taskFile = new File(properties.getProperty(NiFiProperties.TASK_CONFIGURATION_FILE));
-                reportingTasks.addAll(loadReportingTasks(taskFile));
+                reportingTasks.putAll(loadReportingTasks(taskFile));
             } catch (final IOException ioe) {
                 logger.warn("Failed to initialize cluster services due to: " + ioe, ioe);
                 stop();
@@ -867,8 +876,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         reconnectionThread.start();
     }
 
-    private List<ReportingTaskNode> loadReportingTasks(final File taskConfigXml) {
-        final List<ReportingTaskNode> tasks = new ArrayList<>();
+    private Map<String, ReportingTaskNode> loadReportingTasks(final File taskConfigXml) {
+        final Map<String, ReportingTaskNode> tasks = new HashMap<>();
         if (taskConfigXml == null) {
             logger.info("No controller tasks to start");
             return tasks;
@@ -945,7 +954,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                 }
 
                 processScheduler.schedule(reportingTaskNode);
-                tasks.add(reportingTaskNode);
+                tasks.put(reportingTaskNode.getIdentifier(), reportingTaskNode);
             }
         } catch (final SAXException | ParserConfigurationException | IOException | DOMException | NumberFormatException | InitializationException t) {
             logger.error("Unable to load reporting tasks from {} due to {}", new Object[]{taskConfigXml, t});
@@ -1295,8 +1304,15 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     }
     
     @Override
-    public ControllerServiceNode createControllerService(final String type, final boolean firstTimeAdded) {
-    	return controllerServiceProvider.createControllerService(type, firstTimeAdded);
+    public ControllerServiceNode createControllerService(final String type, final Availability availability, final boolean firstTimeAdded) {
+    	if ( availability == null ) {
+    		throw new NullPointerException("availability is null");
+    	}
+    	if ( availability == Availability.NODE ) {
+    		throw new IllegalArgumentException("Cannot create Controller Service with Availability 'NODE' on the Cluster Manager");
+    	}
+    	
+    	return controllerServiceProvider.createControllerService(type, availability, firstTimeAdded);
     }
 
     /**
@@ -1308,8 +1324,15 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
      * @return
      */
     @Override
-    public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
-        return controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
+    public ControllerServiceNode createControllerService(final String type, final String id, final Availability availability, final boolean firstTimeAdded) {
+    	if ( availability == null ) {
+    		throw new NullPointerException("availability is null");
+    	}
+    	if ( availability == Availability.NODE ) {
+    		throw new IllegalArgumentException("Cannot create Controller Service with Availability 'NODE' on the Cluster Manager");
+    	}
+
+    	return controllerServiceProvider.createControllerService(type, id, availability, firstTimeAdded);
     }
 
     @Override
@@ -1345,21 +1368,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
 
     @Override
     public void enableControllerService(final ControllerServiceNode serviceNode) {
-    	if ( serviceNode.getAvailability() == Availability.NODE_ONLY ) {
-    		serviceNode.setDisabled(false);	// update disabled flag to stay in sync across cluster
-        	return;
-        }
-    	
         controllerServiceProvider.enableControllerService(serviceNode);
     }
     
     @Override
     public void disableControllerService(final ControllerServiceNode serviceNode) {
-    	if ( serviceNode.getAvailability() == Availability.NODE_ONLY ) {
-    		serviceNode.setDisabled(true);	// update disabled flag to stay in sync across cluster
-        	return;
-        }
-    	
         controllerServiceProvider.disableControllerService(serviceNode);
     }
     
@@ -1368,6 +1381,130 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     	return controllerServiceProvider.getAllControllerServices();
     }
     
+    
+    /**
+     * Writes the given DOM document to an in-memory buffer as indented XML and returns the bytes.
+     *
+     * @param doc the document to serialize
+     * @return the serialized document
+     * @throws TransformerException if the transformer cannot be created or the transformation fails
+     */
+    private byte[] serialize(final Document doc) throws TransformerException {
+    	final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    	final DOMSource domSource = new DOMSource(doc);
+        final StreamResult streamResult = new StreamResult(baos);
+
+        // configure the transformer and convert the DOM
+        final TransformerFactory transformFactory = TransformerFactory.newInstance();
+        final Transformer transformer = transformFactory.newTransformer();
+        // indent nested elements by two spaces
+        transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2");
+        transformer.setOutputProperty(OutputKeys.INDENT, "yes");
+
+        // transform the document to byte stream
+        transformer.transform(domSource, streamResult);
+        return baos.toByteArray();
+    }
+    
+    /**
+     * Builds an XML document rooted at a {@code controllerServices} element, adds every Controller Service
+     * returned by {@link #getAllControllerServices()} to it, and returns the document's bytes.
+     *
+     * @return the serialized Controller Services
+     * @throws ParserConfigurationException if a {@link DocumentBuilder} cannot be created
+     * @throws TransformerException if the document cannot be written out
+     */
+    private byte[] serializeControllerServices() throws ParserConfigurationException, TransformerException {
+        final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
+        final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
+        final Document document = docBuilder.newDocument();
+        final Element rootElement = document.createElement("controllerServices");
+        // createElement() does not attach the element; without this the document stays empty
+        // and none of the services added below would be written out
+        document.appendChild(rootElement);
+
+        for (final ControllerServiceNode serviceNode : getAllControllerServices()) {
+            StandardFlowSerializer.addControllerService(rootElement, serviceNode, encryptor);
+        }
+
+        return serialize(document);
+    }
+    
+    /**
+     * Builds an XML document rooted at a {@code reportingTasks} element, adds every Reporting Task
+     * returned by {@link #getReportingTasks()} to it, and returns the document's bytes.
+     *
+     * @return the serialized Reporting Tasks
+     * @throws ParserConfigurationException if a {@link DocumentBuilder} cannot be created
+     * @throws TransformerException if the document cannot be written out
+     */
+    private byte[] serializeReportingTasks() throws ParserConfigurationException, TransformerException {
+        final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
+        final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
+        final Document document = docBuilder.newDocument();
+        // root is named for its contents; it previously reused "controllerServices" from the sibling method
+        final Element rootElement = document.createElement("reportingTasks");
+        // createElement() does not attach the element; without this the document stays empty
+        // and none of the tasks added below would be written out
+        document.appendChild(rootElement);
+
+        for (final ReportingTaskNode taskNode : getReportingTasks()) {
+            StandardFlowSerializer.addReportingTask(rootElement, taskNode, encryptor);
+        }
+
+        return serialize(document);
+    }
+    
+    
+    /**
+     * Serializes the current Controller Services and passes them to the data flow management service.
+     * Any exception is caught rather than rethrown: it is logged as an error and reported as an
+     * ERROR-severity bulletin.
+     */
+    public void saveControllerServices() {
+    	try {
+    		dataFlowManagementService.updateControllerServices(serializeControllerServices());
+    	} catch (final Exception e) {
+    		logger.error("Failed to save changes to NCM's Controller Services; changes may be lost on restart due to " + e);
+    		// the stack trace is only logged when debug logging is enabled
+    		if ( logger.isDebugEnabled() ) {
+    			logger.error("", e);
+    		}
+    		
+    		getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Controller Services", Severity.ERROR.name(), 
+    				"Failed to save changes to NCM's Controller Services; changes may be lost on restart. See logs for more details."));
+    	}
+    }
+    
+    /**
+     * Serializes the current Reporting Tasks and passes them to the data flow management service.
+     * Any exception is caught rather than rethrown: it is logged as an error and reported as an
+     * ERROR-severity bulletin.
+     */
+    public void saveReportingTasks() {
+    	try {
+    		dataFlowManagementService.updateReportingTasks(serializeReportingTasks());
+    	} catch (final Exception e) {
+    		logger.error("Failed to save changes to NCM's Reporting Tasks; changes may be lost on restart due to " + e);
+    		// the stack trace is only logged when debug logging is enabled
+    		if ( logger.isDebugEnabled() ) {
+    			logger.error("", e);
+    		}
+    		
+    		getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Reporting Tasks", Severity.ERROR.name(), 
+    				"Failed to save changes to NCM's Reporting Tasks; changes may be lost on restart. See logs for more details."));
+    	}
+    }
+
+    
+    /**
+     * Returns a snapshot of the registered Reporting Tasks, taken while holding the read lock.
+     *
+     * @return a new set holding the current Reporting Task nodes; changing it does not affect this manager
+     */
+    public Set<ReportingTaskNode> getReportingTasks() {
+    	readLock.lock();
+    	try {
+    		// copy so callers never see the internal map's live view
+    		return new HashSet<>(reportingTasks.values());
+    	} finally {
+    		readLock.unlock("getReportingTasks");
+    	}
+    }
+    
+    
+    /**
+     * Looks up a Reporting Task by its identifier while holding the read lock.
+     *
+     * @param taskId the identifier of the Reporting Task
+     * @return the matching node, or {@code null} if none is registered under that identifier
+     */
+    public ReportingTaskNode getReportingTaskNode(final String taskId) {
+    	readLock.lock();
+    	try {
+    		return reportingTasks.get(taskId);
+    	} finally {
+    		readLock.unlock("getReportingTaskNode");
+    	}
+    }
+
+    /**
+     * Schedules the given Reporting Task after calling its {@code verifyCanStart()} check.
+     *
+     * @param reportingTaskNode the Reporting Task to schedule
+     */
+    public void startReportingTask(final ReportingTaskNode reportingTaskNode) {
+        reportingTaskNode.verifyCanStart();
+       	processScheduler.schedule(reportingTaskNode);
+    }
+
+    
+    /**
+     * Unschedules the given Reporting Task after calling its {@code verifyCanStop()} check.
+     *
+     * @param reportingTaskNode the Reporting Task to unschedule
+     */
+    public void stopReportingTask(final ReportingTaskNode reportingTaskNode) {
+        reportingTaskNode.verifyCanStop();
+        processScheduler.unschedule(reportingTaskNode);
+    }
+
+    /**
+     * Removes the given Reporting Task from this manager while holding the write lock. Before the node
+     * is dropped, methods on the underlying task annotated with {@code OnRemoved} are invoked.
+     *
+     * @param reportingTaskNode the Reporting Task to remove
+     * @throws IllegalStateException if the node registered under the same identifier is missing or is a
+     *             different instance
+     */
+    public void removeReportingTask(final ReportingTaskNode reportingTaskNode) {
+    	writeLock.lock();
+    	try {
+	        final ReportingTaskNode existing = reportingTasks.get(reportingTaskNode.getIdentifier());
+	        // identity comparison: the exact registered instance must be passed in
+	        if ( existing == null || existing != reportingTaskNode ) {
+	            throw new IllegalStateException("Reporting Task " + reportingTaskNode + " does not exist in this Flow");
+	        }
+	        
+	        reportingTaskNode.verifyCanDelete();
+	        
+	        // run the task's OnRemoved callbacks inside the NarCloseable scope
+	        try (final NarCloseable x = NarCloseable.withNarLoader()) {
+	            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, reportingTaskNode.getReportingTask(), reportingTaskNode.getConfigurationContext());
+	        }
+	        
+	        reportingTasks.remove(reportingTaskNode.getIdentifier());
+    	} finally {
+    		writeLock.unlock("removeReportingTask");
+    	}
+    }
+    
+    
     /**
      * Handle a bulletins message.
      *

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java
index 1ed5b30..5477d8e 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.controller.reporting;
 
 import org.apache.nifi.cluster.manager.impl.ClusteredReportingContext;
+import org.apache.nifi.controller.Availability;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ValidationContextFactory;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
@@ -34,7 +35,7 @@ public class ClusteredReportingTaskNode extends AbstractReportingTaskNode {
     public ClusteredReportingTaskNode(final ReportingTask reportingTask, final String id, final ProcessScheduler scheduler,
             final EventAccess eventAccess, final BulletinRepository bulletinRepository, final ControllerServiceProvider serviceProvider,
             final ValidationContextFactory validationContextFactory) {
-        super(reportingTask, id, serviceProvider, scheduler, validationContextFactory);
+        super(reportingTask, id, Availability.NCM, serviceProvider, scheduler, validationContextFactory);
 
         this.eventAccess = eventAccess;
         this.bulletinRepository = bulletinRepository;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Availability.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Availability.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Availability.java
index 38df6f7..44987e2 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Availability.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Availability.java
@@ -18,7 +18,13 @@ package org.apache.nifi.controller;
 
 public enum Availability {
 
-    CLUSTER_MANAGER_ONLY,
-    NODE_ONLY,
-    BOTH;
+	/**
+	 * Service or reporting task will run only on the NiFi Cluster Manager (NCM)
+	 */
+    NCM,
+    
+    /**
+     * Service or reporting task will run only on NiFi Nodes (or standalone instance, if not clustered)
+     */
+    NODE;
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
index c4960a9..b26bf82 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
@@ -26,8 +26,6 @@ public interface ReportingTaskNode extends ConfiguredComponent {
 
     Availability getAvailability();
 
-    void setAvailability(Availability availability);
-
     void setSchedulingStrategy(SchedulingStrategy schedulingStrategy);
 
     SchedulingStrategy getSchedulingStrategy();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
index bf6b888..298a224 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
@@ -28,8 +28,6 @@ public interface ControllerServiceNode extends ConfiguredComponent {
 
     Availability getAvailability();
 
-    void setAvailability(Availability availability);
-
     boolean isDisabled();
 
     void setDisabled(boolean disabled);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
index 42d6609..a99452e 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
@@ -19,6 +19,7 @@ package org.apache.nifi.controller.service;
 import java.util.Set;
 
 import org.apache.nifi.annotation.lifecycle.OnAdded;
+import org.apache.nifi.controller.Availability;
 import org.apache.nifi.controller.ControllerServiceLookup;
 
 /**
@@ -31,10 +32,11 @@ public interface ControllerServiceProvider extends ControllerServiceLookup {
      * is true, calls any methods that are annotated with {@link OnAdded}
      *
      * @param type
+     * @param availabilty
      * @param firstTimeAdded
      * @return
      */
-	ControllerServiceNode createControllerService(String type, boolean firstTimeAdded);
+	ControllerServiceNode createControllerService(String type, Availability availabilty, boolean firstTimeAdded);
 	
     /**
      * Creates a new Controller Service of the specified type and assigns it the given id. If <code>firstTimeadded</code>
@@ -42,10 +44,11 @@ public interface ControllerServiceProvider extends ControllerServiceLookup {
      *
      * @param type
      * @param id
+     * @param availabilty
      * @param firstTimeAdded
      * @return
      */
-    ControllerServiceNode createControllerService(String type, String id, boolean firstTimeAdded);
+    ControllerServiceNode createControllerService(String type, String id, Availability availabilty, boolean firstTimeAdded);
 
     /**
      * Gets the controller service node for the specified identifier. Returns

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 347d6b9..c9c41f2 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -2465,14 +2465,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
     }
 
     public ReportingTaskNode createReportingTask(final String type) throws ReportingTaskInstantiationException {
-        return createReportingTask(type, true);
+        return createReportingTask(type, Availability.NODE, true);
     }
     
-    public ReportingTaskNode createReportingTask(final String type, final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
-    	return createReportingTask(type, UUID.randomUUID().toString(), firstTimeAdded);
+    public ReportingTaskNode createReportingTask(final String type, final Availability availability, final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
+    	return createReportingTask(type, UUID.randomUUID().toString(), availability, firstTimeAdded);
     }
     
-    public ReportingTaskNode createReportingTask(final String type, final String id, final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
+    public ReportingTaskNode createReportingTask(final String type, final String id, final Availability availability, final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
         if (type == null || id == null) {
             throw new NullPointerException();
         }
@@ -2520,36 +2520,21 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
     }
 
     public void startReportingTask(final ReportingTaskNode reportingTaskNode) {
-    	if ( reportingTaskNode.getAvailability() == Availability.CLUSTER_MANAGER_ONLY ) {
-    		reportingTaskNode.setScheduledState(ScheduledState.RUNNING); // updated scheduled state to keep state in sync across cluster
-        	return;
-        }
-    	
         if (isTerminated()) {
             throw new IllegalStateException("Cannot start reporting task " + reportingTaskNode + " because the controller is terminated");
         }
 
-        if ( reportingTaskNode.getAvailability() == Availability.CLUSTER_MANAGER_ONLY ) {
-        	return;
-        }
-
         reportingTaskNode.verifyCanStart();
        	processScheduler.schedule(reportingTaskNode);
     }
 
     
     public void stopReportingTask(final ReportingTaskNode reportingTaskNode) {
-    	if ( reportingTaskNode.getAvailability() == Availability.CLUSTER_MANAGER_ONLY ) {
-    		reportingTaskNode.setScheduledState(ScheduledState.STOPPED); // updated scheduled state to keep state in sync across cluster
-        	return;
-        }
-    	
         if (isTerminated()) {
             return;
         }
 
         reportingTaskNode.verifyCanStop();
-        
         processScheduler.unschedule(reportingTaskNode);
     }
 
@@ -2650,53 +2635,47 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
     	}
     }
     
-    public ControllerServiceNode createControllerService(final String type, final boolean firstTimeAdded) {
-    	return controllerServiceProvider.createControllerService(type, firstTimeAdded);
+    public ControllerServiceNode createControllerService(final String type, final Availability availability, final boolean firstTimeAdded) {
+    	if ( availability == null ) {
+    		throw new NullPointerException("availability is null");
+    	}
+    	if ( availability == Availability.NCM ) {
+    		throw new IllegalArgumentException("Cannot create Controller Service with Availability 'NCM' on a standalone instance or a Node");
+    	}
+
+    	return controllerServiceProvider.createControllerService(type, availability, firstTimeAdded);
     }
     
     @Override
-    public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
-        return controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
+    public ControllerServiceNode createControllerService(final String type, final String id, final Availability availability, final boolean firstTimeAdded) {
+    	if ( availability == null ) {
+    		throw new NullPointerException("availability is null");
+    	}
+    	if ( availability == Availability.NCM ) {
+    		throw new IllegalArgumentException("Cannot create Controller Service with Availability 'NCM' on a standalone instance or a Node");
+    	}
+
+        return controllerServiceProvider.createControllerService(type, id, availability, firstTimeAdded);
     }
     
     public void enableReportingTask(final ReportingTaskNode reportingTaskNode) {
-    	if ( reportingTaskNode.getAvailability() == Availability.CLUSTER_MANAGER_ONLY ) {
-    		reportingTaskNode.setScheduledState(ScheduledState.STOPPED); // updated scheduled state to keep state in sync across cluster
-        	return;
-        }
-    	
         reportingTaskNode.verifyCanEnable();
         processScheduler.enableReportingTask(reportingTaskNode);
     }
     
     public void disableReportingTask(final ReportingTaskNode reportingTaskNode) {
-    	if ( reportingTaskNode.getAvailability() == Availability.CLUSTER_MANAGER_ONLY ) {
-    		reportingTaskNode.setScheduledState(ScheduledState.DISABLED); // updated scheduled state to keep state in sync across cluster
-        	return;
-        }
-    	
         reportingTaskNode.verifyCanDisable();
         processScheduler.disableReportingTask(reportingTaskNode);
     }
     
     @Override
     public void enableControllerService(final ControllerServiceNode serviceNode) {
-        if ( serviceNode.getAvailability() == Availability.CLUSTER_MANAGER_ONLY ) {
-        	serviceNode.setDisabled(false);	// set the disabled flag so that we can keep in sync with cluster
-        	return;
-        }
-
         serviceNode.verifyCanEnable();
         controllerServiceProvider.enableControllerService(serviceNode);
     }
     
     @Override
     public void disableControllerService(final ControllerServiceNode serviceNode) {
-    	if ( serviceNode.getAvailability() == Availability.CLUSTER_MANAGER_ONLY ) {
-        	serviceNode.setDisabled(true);	// set the disabled flag so that we can keep in sync with cluster
-        	return;
-        }
-    	
         serviceNode.verifyCanDisable();
         controllerServiceProvider.disableControllerService(serviceNode);
     }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
index 1f483ed..18e2d8c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
@@ -83,13 +83,13 @@ public class StandardFlowSerializer implements FlowSerializer {
             final Element controllerServicesNode = doc.createElement("controllerServices");
             rootNode.appendChild(controllerServicesNode);
             for ( final ControllerServiceNode serviceNode : controller.getAllControllerServices() ) {
-            	addControllerService(controllerServicesNode, serviceNode);
+            	addControllerService(controllerServicesNode, serviceNode, encryptor);
             }
             
             final Element reportingTasksNode = doc.createElement("reportingTasks");
             rootNode.appendChild(reportingTasksNode);
             for ( final ReportingTaskNode taskNode : controller.getReportingTasks() ) {
-            	addReportingTask(reportingTasksNode, taskNode);
+            	addReportingTask(reportingTasksNode, taskNode, encryptor);
             }
 
             final DOMSource domSource = new DOMSource(doc);
@@ -312,14 +312,14 @@ public class StandardFlowSerializer implements FlowSerializer {
         addTextElement(element, "schedulingStrategy", processor.getSchedulingStrategy().name());
         addTextElement(element, "runDurationNanos", processor.getRunDuration(TimeUnit.NANOSECONDS));
 
-        addConfiguration(element, processor.getProperties(), processor.getAnnotationData());
+        addConfiguration(element, processor.getProperties(), processor.getAnnotationData(), encryptor);
         
         for (final Relationship rel : processor.getAutoTerminatedRelationships()) {
             addTextElement(element, "autoTerminatedRelationship", rel.getName());
         }
     }
     
-    private void addConfiguration(final Element element, final Map<PropertyDescriptor, String> properties, final String annotationData) {
+    private static void addConfiguration(final Element element, final Map<PropertyDescriptor, String> properties, final String annotationData, final StringEncryptor encryptor) {
     	final Document doc = element.getOwnerDocument();
     	for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
             final PropertyDescriptor descriptor = entry.getKey();
@@ -406,7 +406,7 @@ public class StandardFlowSerializer implements FlowSerializer {
     }
 
     
-    private void addControllerService(final Element element, final ControllerServiceNode serviceNode) {
+    public static void addControllerService(final Element element, final ControllerServiceNode serviceNode, final StringEncryptor encryptor) {
     	final Element serviceElement = element.getOwnerDocument().createElement("controllerService");
     	addTextElement(serviceElement, "id", serviceNode.getIdentifier());
     	addTextElement(serviceElement, "name", serviceNode.getName());
@@ -415,12 +415,12 @@ public class StandardFlowSerializer implements FlowSerializer {
         addTextElement(serviceElement, "enabled", String.valueOf(!serviceNode.isDisabled()));
         addTextElement(serviceElement, "availability", serviceNode.getAvailability().toString());
         
-        addConfiguration(serviceElement, serviceNode.getProperties(), serviceNode.getAnnotationData());
+        addConfiguration(serviceElement, serviceNode.getProperties(), serviceNode.getAnnotationData(), encryptor);
         
     	element.appendChild(serviceElement);
     }
     
-    private void addReportingTask(final Element element, final ReportingTaskNode taskNode) {
+    public static void addReportingTask(final Element element, final ReportingTaskNode taskNode, final StringEncryptor encryptor) {
     	final Element taskElement = element.getOwnerDocument().createElement("reportingTask");
     	addTextElement(taskElement, "id", taskNode.getIdentifier());
     	addTextElement(taskElement, "name", taskNode.getName());
@@ -431,16 +431,16 @@ public class StandardFlowSerializer implements FlowSerializer {
         addTextElement(taskElement, "schedulingStrategy", taskNode.getSchedulingStrategy().name());
     	addTextElement(taskElement, "availability", taskNode.getAvailability().toString());
     	
-    	addConfiguration(taskElement, taskNode.getProperties(), taskNode.getAnnotationData());
+    	addConfiguration(taskElement, taskNode.getProperties(), taskNode.getAnnotationData(), encryptor);
     	
     	element.appendChild(taskElement);
     }
     
-    private void addTextElement(final Element element, final String name, final long value) {
+    private static void addTextElement(final Element element, final String name, final long value) {
         addTextElement(element, name, String.valueOf(value));
     }
 
-    private void addTextElement(final Element element, final String name, final String value) {
+    private static void addTextElement(final Element element, final String name, final String value) {
         final Document doc = element.getOwnerDocument();
         final Element toAdd = doc.createElement(name);
         toAdd.setTextContent(value);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 0619793..f9279a9 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -347,9 +347,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
     private void addControllerService(final FlowController controller, final Element controllerServiceElement, final StringEncryptor encryptor) {
     	final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
     	
-    	final ControllerServiceNode node = controller.createControllerService(dto.getType(), dto.getId(), false);
+    	final ControllerServiceNode node = controller.createControllerService(dto.getType(), dto.getId(), Availability.valueOf(dto.getAvailability().toUpperCase()), false);
     	node.setName(dto.getName());
-    	node.setAvailability(Availability.valueOf(dto.getAvailability()));
     	node.setComments(dto.getComments());
     	node.setAnnotationData(dto.getAnnotationData());
     	
@@ -392,10 +391,9 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
     private void addReportingTask(final FlowController controller, final Element reportingTaskElement, final StringEncryptor encryptor) throws ReportingTaskInstantiationException {
     	final ReportingTaskDTO dto = FlowFromDOMFactory.getReportingTask(reportingTaskElement, encryptor);
     	
-    	final ReportingTaskNode reportingTask = controller.createReportingTask(dto.getType(), dto.getId(), false);
+    	final ReportingTaskNode reportingTask = controller.createReportingTask(dto.getType(), dto.getId(), Availability.valueOf(dto.getAvailability().toUpperCase()), false);
     	reportingTask.setName(dto.getName());
     	reportingTask.setComments(dto.getComment());
-    	reportingTask.setAvailability(Availability.valueOf(dto.getAvailability()));
     	reportingTask.setScheduldingPeriod(dto.getSchedulingPeriod());
     	reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(dto.getSchedulingStrategy()));
     	

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
index 8a131d0..ed72618 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
@@ -42,31 +42,27 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
     private final ReportingTask reportingTask;
     private final ProcessScheduler processScheduler;
     private final ControllerServiceLookup serviceLookup;
+    private final Availability availability;
 
     private final AtomicReference<SchedulingStrategy> schedulingStrategy = new AtomicReference<>(SchedulingStrategy.TIMER_DRIVEN);
     private final AtomicReference<String> schedulingPeriod = new AtomicReference<>("5 mins");
-    private final AtomicReference<Availability> availability = new AtomicReference<>(Availability.NODE_ONLY);
     
     private volatile String comment;
     private volatile ScheduledState scheduledState = ScheduledState.STOPPED;
     
-    public AbstractReportingTaskNode(final ReportingTask reportingTask, final String id,
+    public AbstractReportingTaskNode(final ReportingTask reportingTask, final String id, final Availability availability,
             final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler,
             final ValidationContextFactory validationContextFactory) {
         super(reportingTask, id, validationContextFactory, controllerServiceProvider);
         this.reportingTask = reportingTask;
         this.processScheduler = processScheduler;
         this.serviceLookup = controllerServiceProvider;
+        this.availability = availability;
     }
 
     @Override
     public Availability getAvailability() {
-        return availability.get();
-    }
-
-    @Override
-    public void setAvailability(final Availability availability) {
-        this.availability.set(availability);
+        return availability;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
index fe3af92..dd83c61 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.controller.reporting;
 
+import org.apache.nifi.controller.Availability;
 import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ReportingTaskNode;
@@ -29,7 +30,7 @@ public class StandardReportingTaskNode extends AbstractReportingTaskNode impleme
 
     public StandardReportingTaskNode(final ReportingTask reportingTask, final String id, final FlowController controller,
             final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory) {
-        super(reportingTask, id, controller, processScheduler, validationContextFactory);
+        super(reportingTask, id, Availability.NODE, controller, processScheduler, validationContextFactory);
         this.flowController = controller;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
index db44b5f..1ca6e54 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
@@ -36,6 +36,7 @@ import javax.xml.validation.SchemaFactory;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.nifi.controller.Availability;
 import org.apache.nifi.util.DomUtils;
 import org.apache.nifi.util.file.FileUtils;
 import org.w3c.dom.Document;
@@ -124,9 +125,12 @@ public class ControllerServiceLoader {
                     //and schedulingPeriod must be set
                     final String serviceId = DomUtils.getChild(serviceElement, "identifier").getTextContent().trim();
                     final String serviceClass = DomUtils.getChild(serviceElement, "class").getTextContent().trim();
-
+                    final String availabilityName = DomUtils.getChild(serviceElement, "availability").getTextContent().trim();
+                    
+                    final Availability availability = Availability.valueOf(availabilityName);
+                    
                     //set the class to be used for the configured controller task
-                    final ControllerServiceNode serviceNode = provider.createControllerService(serviceClass, serviceId, false);
+                    final ControllerServiceNode serviceNode = provider.createControllerService(serviceClass, serviceId, availability, false);
 
                     //optional task-specific properties
                     for (final Element optionalProperty : DomUtils.getChildElementsByTagName(serviceElement, "property")) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index dd06f40..be0e7d6 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -19,7 +19,6 @@ package org.apache.nifi.controller.service;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -40,8 +39,8 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
     private final ControllerService proxedControllerService;
     private final ControllerService implementation;
     private final ControllerServiceProvider serviceProvider;
+    private final Availability availability;
 
-    private final AtomicReference<Availability> availability = new AtomicReference<>(Availability.NODE_ONLY);
     private final AtomicBoolean disabled = new AtomicBoolean(true);
 
     private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
@@ -52,11 +51,12 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
     private String comment;
 
     public StandardControllerServiceNode(final ControllerService proxiedControllerService, final ControllerService implementation, final String id,
-            final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) {
+            final Availability availability, final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) {
         super(proxiedControllerService, id, validationContextFactory, serviceProvider);
         this.proxedControllerService = proxiedControllerService;
         this.implementation = implementation;
         this.serviceProvider = serviceProvider;
+        this.availability = availability;
     }
 
     @Override
@@ -83,13 +83,9 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
 
     @Override
     public Availability getAvailability() {
-        return availability.get();
+    	return availability;
     }
 
-    @Override
-    public void setAvailability(final Availability availability) {
-        this.availability.set(availability);
-    }
 
     @Override
     public ControllerService getProxiedControllerService() {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index 3ec4755..fd97da7 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -36,6 +36,7 @@ import org.apache.nifi.annotation.lifecycle.OnAdded;
 import org.apache.nifi.annotation.lifecycle.OnDisabled;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.annotation.lifecycle.OnRemoved;
+import org.apache.nifi.controller.Availability;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ValidationContextFactory;
@@ -98,12 +99,12 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
     }
     
     @Override
-    public ControllerServiceNode createControllerService(final String type, final boolean firstTimeAdded) {
-    	return createControllerService(type, UUID.randomUUID().toString(), firstTimeAdded);
+    public ControllerServiceNode createControllerService(final String type, final Availability availability, final boolean firstTimeAdded) {
+    	return createControllerService(type, UUID.randomUUID().toString(), availability, firstTimeAdded);
     }
 
     @Override
-    public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
+    public ControllerServiceNode createControllerService(final String type, final String id, final Availability availability, final boolean firstTimeAdded) {
         if (type == null || id == null) {
             throw new NullPointerException();
         }
@@ -147,7 +148,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
 
             final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(this);
 
-            final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, originalService, id, validationContextFactory, this);
+            final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, originalService, id, availability, validationContextFactory, this);
             serviceNodeHolder.set(serviceNode);
             serviceNode.setName(rawClass.getSimpleName());
             

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index 40d3b1b..f8852d9 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -43,6 +43,7 @@ import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.connectable.Funnel;
 import org.apache.nifi.connectable.Port;
+import org.apache.nifi.controller.Availability;
 import org.apache.nifi.controller.ContentAvailability;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.Counter;
@@ -404,13 +405,13 @@ public class ControllerFacade implements ControllerServiceProvider {
         return flowController.getControllerService(serviceIdentifier);
     }
 
-    public ControllerServiceNode createControllerService(final String type, final boolean firstTimeAdded) {
-    	return flowController.createControllerService(type, firstTimeAdded);
+    public ControllerServiceNode createControllerService(final String type, final Availability availability, final boolean firstTimeAdded) {
+    	return flowController.createControllerService(type, availability, firstTimeAdded);
     }
     
     @Override
-    public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
-        return flowController.createControllerService(type, id, firstTimeAdded);
+    public ControllerServiceNode createControllerService(final String type, final String id, final Availability availability, final boolean firstTimeAdded) {
+        return flowController.createControllerService(type, id, availability, firstTimeAdded);
     }
     
     public void removeControllerService(ControllerServiceNode serviceNode) {


[7/7] incubator-nifi git commit: Load Controller Services from flow.tar file instead of an external file

Posted by ma...@apache.org.
Load Controller Services from flow.tar file instead of an external file


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/4ae2a10e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/4ae2a10e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/4ae2a10e

Branch: refs/heads/NIFI-250
Commit: 4ae2a10e6d130353ea34aa403d3bcbad7a2ab4e8
Parents: 52149d8
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jan 30 13:01:00 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jan 30 13:01:00 2015 -0500

----------------------------------------------------------------------
 .../cluster/manager/impl/WebClusterManager.java | 42 ++++------
 .../spring/WebClusterManagerFactoryBean.java    |  6 --
 .../service/ControllerServiceLoader.java        | 88 ++++++--------------
 .../nifi/persistence/FlowConfigurationDAO.java  | 12 ---
 .../StandardXMLFlowConfigurationDAO.java        | 11 +--
 5 files changed, 41 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4ae2a10e/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 50d7087..d3cf6a1 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -16,13 +16,12 @@
  */
 package org.apache.nifi.cluster.manager.impl;
 
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.net.URI;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -55,7 +54,6 @@ import javax.net.ssl.SSLContext;
 import javax.ws.rs.HttpMethod;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.StreamingOutput;
-import javax.xml.XMLConstants;
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.parsers.ParserConfigurationException;
@@ -65,9 +63,6 @@ import javax.xml.transform.TransformerException;
 import javax.xml.transform.TransformerFactory;
 import javax.xml.transform.dom.DOMSource;
 import javax.xml.transform.stream.StreamResult;
-import javax.xml.validation.Schema;
-import javax.xml.validation.SchemaFactory;
-import javax.xml.validation.Validator;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.admin.service.AuditService;
@@ -140,6 +135,7 @@ import org.apache.nifi.controller.reporting.StandardReportingInitializationConte
 import org.apache.nifi.controller.scheduling.QuartzSchedulingAgent;
 import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
 import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent;
+import org.apache.nifi.controller.service.ControllerServiceLoader;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.controller.service.StandardControllerServiceProvider;
@@ -444,14 +440,20 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                 }
 
                 // load flow
+                final ClusterDataFlow clusterDataFlow;
                 if (dataFlowManagementService.isFlowCurrent()) {
-                    final ClusterDataFlow clusterDataFlow = dataFlowManagementService.loadDataFlow();
+                    clusterDataFlow = dataFlowManagementService.loadDataFlow();
                     cachedDataFlow = clusterDataFlow.getDataFlow();
                     primaryNodeId = clusterDataFlow.getPrimaryNodeId();
                 } else {
                     throw new IOException("Flow is not current.");
                 }
 
+                final byte[] serializedServices = clusterDataFlow.getControllerServices();
+                if ( serializedServices != null && serializedServices.length > 0 ) {
+                	ControllerServiceLoader.loadControllerServices(this, new ByteArrayInputStream(serializedServices));
+                }
+                
                 // start multicast broadcasting service, if configured
                 if (servicesBroadcaster != null) {
                     servicesBroadcaster.start();
@@ -461,8 +463,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                 executeSafeModeTask();
 
                 // Load and start running Reporting Tasks
-                final File taskFile = new File(properties.getProperty(NiFiProperties.TASK_CONFIGURATION_FILE));
-                reportingTasks.putAll(loadReportingTasks(taskFile));
+                reportingTasks.putAll(loadReportingTasks(clusterDataFlow.getReportingTasks()));
             } catch (final IOException ioe) {
                 logger.warn("Failed to initialize cluster services due to: " + ioe, ioe);
                 stop();
@@ -876,16 +877,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         reconnectionThread.start();
     }
 
-    private Map<String, ReportingTaskNode> loadReportingTasks(final File taskConfigXml) {
+    private Map<String, ReportingTaskNode> loadReportingTasks(final byte[] serialized) {
         final Map<String, ReportingTaskNode> tasks = new HashMap<>();
-        if (taskConfigXml == null) {
-            logger.info("No controller tasks to start");
-            return tasks;
-        }
 
         try {
-            final URL schemaUrl = getClass().getResource("/ReportingTaskConfiguration.xsd");
-            final Document document = parse(taskConfigXml, schemaUrl);
+            final Document document = parse(serialized);
 
             final NodeList tasksNodes = document.getElementsByTagName("tasks");
             final Element tasksElement = (Element) tasksNodes.item(0);
@@ -957,7 +953,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                 tasks.put(reportingTaskNode.getIdentifier(), reportingTaskNode);
             }
         } catch (final SAXException | ParserConfigurationException | IOException | DOMException | NumberFormatException | InitializationException t) {
-            logger.error("Unable to load reporting tasks from {} due to {}", new Object[]{taskConfigXml, t});
+            logger.error("Unable to load reporting tasks due to {}", new Object[]{t});
             if (logger.isDebugEnabled()) {
                 logger.error("", t);
             }
@@ -999,11 +995,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         return taskNode;
     }
 
-    private Document parse(final File xmlFile, final URL schemaUrl) throws SAXException, ParserConfigurationException, IOException {
-        final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
-        final Schema schema = schemaFactory.newSchema(schemaUrl);
+    private Document parse(final byte[] serialized) throws SAXException, ParserConfigurationException, IOException {
         final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
-        docFactory.setSchema(schema);
         final DocumentBuilder builder = docFactory.newDocumentBuilder();
 
         builder.setErrorHandler(new org.xml.sax.ErrorHandler() {
@@ -1036,12 +1029,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         });
 
         // build the docuemnt
-        final Document document = builder.parse(xmlFile);
-
-        // ensure schema compliance
-        final Validator validator = schema.newValidator();
-        validator.validate(new DOMSource(document));
-
+        final Document document = builder.parse(new ByteArrayInputStream(serialized));
         return document;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4ae2a10e/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
index 7169730..3881461 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.cluster.spring;
 
-import java.nio.file.Paths;
 import org.apache.nifi.admin.service.AuditService;
 import org.apache.nifi.cluster.event.EventManager;
 import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
@@ -26,7 +25,6 @@ import org.apache.nifi.cluster.manager.HttpResponseMapper;
 import org.apache.nifi.cluster.manager.impl.WebClusterManager;
 import org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderListener;
 import org.apache.nifi.cluster.protocol.impl.ClusterServicesBroadcaster;
-import org.apache.nifi.controller.service.ControllerServiceLoader;
 import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.io.socket.multicast.DiscoverableService;
 import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
@@ -106,10 +104,6 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon
 
             // set the audit service
             clusterManager.setAuditService(applicationContext.getBean("auditService", AuditService.class));
-
-            // load the controller services
-            final ControllerServiceLoader serviceLoader = new ControllerServiceLoader(Paths.get(serviceConfigurationFile));
-            serviceLoader.loadControllerServices(clusterManager);
         }
         return clusterManager;
     }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4ae2a10e/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
index 1ca6e54..01114e5 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
@@ -17,31 +17,21 @@
 package org.apache.nifi.controller.service;
 
 import java.io.BufferedInputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.List;
 
-import javax.xml.XMLConstants;
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.validation.Schema;
-import javax.xml.validation.SchemaFactory;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.nifi.controller.Availability;
 import org.apache.nifi.util.DomUtils;
-import org.apache.nifi.util.file.FileUtils;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
-import org.w3c.dom.NodeList;
 import org.xml.sax.SAXException;
 import org.xml.sax.SAXParseException;
 
@@ -52,33 +42,14 @@ public class ControllerServiceLoader {
 
     private static final Log logger = LogFactory.getLog(ControllerServiceLoader.class);
 
-    private final Path serviceConfigXmlPath;
 
-    public ControllerServiceLoader(final Path serviceConfigXmlPath) throws IOException {
-        final File serviceConfigXmlFile = serviceConfigXmlPath.toFile();
-        if (!serviceConfigXmlFile.exists() || !serviceConfigXmlFile.canRead()) {
-            throw new IOException(serviceConfigXmlPath + " does not appear to exist or cannot be read. Cannot load configuration.");
-        }
-
-        this.serviceConfigXmlPath = serviceConfigXmlPath;
-    }
-
-    public List<ControllerServiceNode> loadControllerServices(final ControllerServiceProvider provider) throws IOException {
-        final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
+    public static List<ControllerServiceNode> loadControllerServices(final ControllerServiceProvider provider, final InputStream serializedStream) throws IOException {
         final DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance();
-        InputStream fis = null;
-        BufferedInputStream bis = null;
         documentBuilderFactory.setNamespaceAware(true);
 
         final List<ControllerServiceNode> services = new ArrayList<>();
 
-        try {
-            final URL configurationResource = this.getClass().getResource("/ControllerServiceConfiguration.xsd");
-            if (configurationResource == null) {
-                throw new NullPointerException("Unable to load XML Schema for ControllerServiceConfiguration");
-            }
-            final Schema schema = schemaFactory.newSchema(configurationResource);
-            documentBuilderFactory.setSchema(schema);
+        try (final InputStream in = new BufferedInputStream(serializedStream)) {
             final DocumentBuilder builder = documentBuilderFactory.newDocumentBuilder();
 
             builder.setErrorHandler(new org.xml.sax.ErrorHandler() {
@@ -112,42 +83,33 @@ public class ControllerServiceLoader {
             });
 
             //if controllerService.xml does not exist, create an empty file...
-            fis = Files.newInputStream(this.serviceConfigXmlPath, StandardOpenOption.READ);
-            bis = new BufferedInputStream(fis);
-            if (Files.size(this.serviceConfigXmlPath) > 0) {
-                final Document document = builder.parse(bis);
-                final NodeList servicesNodes = document.getElementsByTagName("services");
-                final Element servicesElement = (Element) servicesNodes.item(0);
-
-                final List<Element> serviceNodes = DomUtils.getChildElementsByTagName(servicesElement, "service");
-                for (final Element serviceElement : serviceNodes) {
-                    //get properties for the specific controller task - id, name, class,
-                    //and schedulingPeriod must be set
-                    final String serviceId = DomUtils.getChild(serviceElement, "identifier").getTextContent().trim();
-                    final String serviceClass = DomUtils.getChild(serviceElement, "class").getTextContent().trim();
-                    final String availabilityName = DomUtils.getChild(serviceElement, "availability").getTextContent().trim();
-                    
-                    final Availability availability = Availability.valueOf(availabilityName);
-                    
-                    //set the class to be used for the configured controller task
-                    final ControllerServiceNode serviceNode = provider.createControllerService(serviceClass, serviceId, availability, false);
-
-                    //optional task-specific properties
-                    for (final Element optionalProperty : DomUtils.getChildElementsByTagName(serviceElement, "property")) {
-                        final String name = optionalProperty.getAttribute("name").trim();
-                        final String value = optionalProperty.getTextContent().trim();
-                        serviceNode.setProperty(name, value);
-                    }
-
-                    services.add(serviceNode);
-                    provider.enableControllerService(serviceNode);
+            final Document document = builder.parse(in);
+            final Element controllerServices = DomUtils.getChild(document.getDocumentElement(), "controllerServices");
+            final List<Element> serviceNodes = DomUtils.getChildElementsByTagName(controllerServices, "controllerService");
+            for (final Element serviceElement : serviceNodes) {
+                //get properties for the specific controller task - id, name, class,
+                //and schedulingPeriod must be set
+                final String serviceId = DomUtils.getChild(serviceElement, "id").getTextContent().trim();
+                final String serviceClass = DomUtils.getChild(serviceElement, "class").getTextContent().trim();
+                final String availabilityName = DomUtils.getChild(serviceElement, "availability").getTextContent().trim();
+                
+                final Availability availability = Availability.valueOf(availabilityName);
+                
+                //set the class to be used for the configured controller task
+                final ControllerServiceNode serviceNode = provider.createControllerService(serviceClass, serviceId, availability, false);
+
+                //optional task-specific properties
+                for (final Element optionalProperty : DomUtils.getChildElementsByTagName(serviceElement, "property")) {
+                    final String name = optionalProperty.getAttribute("name").trim();
+                    final String value = optionalProperty.getTextContent().trim();
+                    serviceNode.setProperty(name, value);
                 }
+
+                services.add(serviceNode);
+                provider.enableControllerService(serviceNode);
             }
         } catch (SAXException | ParserConfigurationException sxe) {
             throw new IOException(sxe);
-        } finally {
-            FileUtils.closeQuietly(fis);
-            FileUtils.closeQuietly(bis);
         }
 
         return services;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4ae2a10e/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java
index 4f3afaf..8cab916 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java
@@ -27,7 +27,6 @@ import org.apache.nifi.controller.FlowSerializationException;
 import org.apache.nifi.controller.FlowSynchronizationException;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.UninheritableFlowException;
-import org.apache.nifi.controller.service.ControllerServiceNode;
 
 /**
  * Interface to define service methods for FlowController configuration.
@@ -121,15 +120,4 @@ public interface FlowConfigurationDAO {
      */
     List<ReportingTaskNode> loadReportingTasks(FlowController controller) throws IOException;
 
-    /**
-     * Instantiates all controller services from the file used in the
-     * constructor
-     *
-     * @param controller
-     * @return 
-     * @throws java.io.IOException
-     * @returns all of the ReportingTasks that were instantiated & scheduled
-     */
-    List<ControllerServiceNode> loadControllerServices(FlowController controller) throws IOException;
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4ae2a10e/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
index 039b2c2..3000869 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
@@ -54,10 +54,7 @@ import org.apache.nifi.controller.StandardFlowSynchronizer;
 import org.apache.nifi.controller.UninheritableFlowException;
 import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
 import org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
-import org.apache.nifi.controller.service.ControllerServiceLoader;
-import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.encrypt.StringEncryptor;
-import org.apache.nifi.util.file.FileUtils;
 import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.reporting.ReportingInitializationContext;
@@ -65,7 +62,7 @@ import org.apache.nifi.reporting.ReportingTask;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.util.DomUtils;
 import org.apache.nifi.util.NiFiProperties;
-
+import org.apache.nifi.util.file.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.w3c.dom.DOMException;
@@ -81,7 +78,6 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD
 
     private final Path flowXmlPath;
     private final Path taskConfigXmlPath;
-    private final ControllerServiceLoader servicerLoader;
     private final StringEncryptor encryptor;
 
     private static final Logger LOG = LoggerFactory.getLogger(StandardXMLFlowConfigurationDAO.class);
@@ -103,7 +99,6 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD
 
         this.flowXmlPath = flowXml;
         this.taskConfigXmlPath = taskConfigXml;
-        this.servicerLoader = new ControllerServiceLoader(serviceConfigXml);
         this.encryptor = encryptor;
     }
 
@@ -292,10 +287,6 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD
         return tasks;
     }
 
-    @Override
-    public List<ControllerServiceNode> loadControllerServices(final FlowController controller) throws IOException {
-        return servicerLoader.loadControllerServices(controller);
-    }
 
     private Document parse(final File xmlFile, final URL schemaUrl) throws SAXException, ParserConfigurationException, IOException {
         final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);


[4/7] incubator-nifi git commit: NIFI-250: Updated controller services to use appropriate defaults and use .identifiesControllerService instead of using the old way of obtaining controller services; do not fail to startup if controller service is invalid

Posted by ma...@apache.org.
NIFI-250: Updated controller services to use appropriate defaults and use .identifiesControllerService instead of using the old way of obtaining controller services; do not fail to startup if controller service is invalid


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/3344cef3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/3344cef3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/3344cef3

Branch: refs/heads/NIFI-250
Commit: 3344cef3365dfc620f11aaf18fcfc8ba7d58e16a
Parents: 1682e47
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jan 30 09:05:58 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jan 30 09:05:58 2015 -0500

----------------------------------------------------------------------
 .../controller/StandardFlowSynchronizer.java    | 48 ++++++++++++++++++--
 .../nifi/processors/standard/PostHTTP.java      | 38 ++++++++++------
 .../DistributedMapCacheClientService.java       | 18 +++-----
 .../DistributedSetCacheClientService.java       | 11 ++---
 .../cache/server/DistributedCacheServer.java    |  9 ++--
 .../nifi/ssl/StandardSSLContextService.java     |  2 -
 6 files changed, 82 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3344cef3/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 33650e1..0619793 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -100,9 +100,11 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
     private static final Logger logger = LoggerFactory.getLogger(StandardFlowSynchronizer.class);
     public static final URL FLOW_XSD_RESOURCE = StandardFlowSynchronizer.class.getResource("/FlowConfiguration.xsd");
     private final StringEncryptor encryptor;
+    private final boolean autoResumeState;
 
     public StandardFlowSynchronizer(final StringEncryptor encryptor) {
         this.encryptor = encryptor;
+        autoResumeState = NiFiProperties.getInstance().getAutoResumeState();
     }
 
     public static boolean isEmpty(final DataFlow dataFlow, final StringEncryptor encryptor) {
@@ -349,7 +351,6 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
     	node.setName(dto.getName());
     	node.setAvailability(Availability.valueOf(dto.getAvailability()));
     	node.setComments(dto.getComments());
-    	node.setDisabled(dto.getEnabled() != Boolean.TRUE);
     	node.setAnnotationData(dto.getAnnotationData());
     	
         for (final Map.Entry<String, String> entry : dto.getProperties().entrySet()) {
@@ -360,9 +361,21 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
             }
         }
 
-    	if ( dto.getEnabled() == Boolean.TRUE ) {
-    		controller.enableControllerService(node);
-    	}
+        if ( autoResumeState ) {
+	    	if ( dto.getEnabled() == Boolean.TRUE ) {
+	    		try {
+	    			controller.enableControllerService(node);
+	    		} catch (final Exception e) {
+	    			logger.error("Failed to enable " + node + " due to " + e);
+	    			if ( logger.isDebugEnabled() ) {
+	    				logger.error("", e);
+	    			}
+	    			
+	    			controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin(
+	    					"Controller Service", Severity.ERROR.name(), "Could not start " + node + " due to " + e));
+	    		}
+	    	}
+        }
     }
     
     private void updateControllerService(final FlowController controller, final Element controllerServiceElement, final StringEncryptor encryptor) {
@@ -384,7 +397,6 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
     	reportingTask.setComments(dto.getComment());
     	reportingTask.setAvailability(Availability.valueOf(dto.getAvailability()));
     	reportingTask.setScheduldingPeriod(dto.getSchedulingPeriod());
-    	reportingTask.setScheduledState(ScheduledState.valueOf(dto.getScheduledState()));
     	reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(dto.getSchedulingStrategy()));
     	
     	reportingTask.setAnnotationData(dto.getAnnotationData());
@@ -396,6 +408,32 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
             	reportingTask.setProperty(entry.getKey(), entry.getValue());
             }
         }
+        
+        if ( autoResumeState ) {
+	        if ( ScheduledState.RUNNING.name().equals(dto.getScheduledState()) ) {
+	        	try {
+	        		controller.startReportingTask(reportingTask);
+	        	} catch (final Exception e) {
+	        		logger.error("Failed to start {} due to {}", reportingTask, e);
+	        		if ( logger.isDebugEnabled() ) {
+	        			logger.error("", e);
+	        		}
+	        		controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin(
+	        				"Reporting Tasks", Severity.ERROR.name(), "Failed to start " + reportingTask + " due to " + e));
+	        	}
+	        } else if ( ScheduledState.DISABLED.name().equals(dto.getScheduledState()) ) {
+	        	try {
+	        		controller.disableReportingTask(reportingTask);
+	        	} catch (final Exception e) {
+	        		logger.error("Failed to mark {} as disabled due to {}", reportingTask, e);
+	        		if ( logger.isDebugEnabled() ) {
+	        			logger.error("", e);
+	        		}
+	        		controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin(
+	        				"Reporting Tasks", Severity.ERROR.name(), "Failed to mark " + reportingTask + " as disabled due to " + e));
+	        	}
+	        }
+        }
     }
 
     private void updateReportingTask(final FlowController controller, final Element reportingTaskElement, final StringEncryptor encryptor) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3344cef3/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
index fd486b0..ff7475b 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
@@ -68,6 +68,7 @@ import org.apache.http.conn.HttpClientConnectionManager;
 import org.apache.http.conn.ManagedHttpClientConnection;
 import org.apache.http.conn.socket.ConnectionSocketFactory;
 import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.conn.ssl.SSLContextBuilder;
 import org.apache.http.conn.ssl.SSLContexts;
 import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
 import org.apache.http.entity.ContentProducer;
@@ -352,22 +353,29 @@ public class PostHTTP extends AbstractProcessor {
     private SSLContext createSSLContext(final SSLContextService service) throws KeyStoreException, IOException, NoSuchAlgorithmException, 
         CertificateException, KeyManagementException, UnrecoverableKeyException 
     {
-        final KeyStore truststore  = KeyStore.getInstance(service.getTrustStoreType());
-        try (final InputStream in = new FileInputStream(new File(service.getTrustStoreFile()))) {
-            truststore.load(in, service.getTrustStorePassword().toCharArray());
-        }
-        
-        final KeyStore keystore  = KeyStore.getInstance(service.getKeyStoreType());
-        try (final InputStream in = new FileInputStream(new File(service.getKeyStoreFile()))) {
-            keystore.load(in, service.getKeyStorePassword().toCharArray());
-        }
-        
-        SSLContext sslContext = SSLContexts.custom()
-                .loadTrustMaterial(truststore, new TrustSelfSignedStrategy())
-                .loadKeyMaterial(keystore, service.getKeyStorePassword().toCharArray())
-                .build();
+    	final SSLContextBuilder builder = SSLContexts.custom();
+    	
+    	final String trustStoreFilename = service.getTrustStoreFile();
+    	if ( trustStoreFilename != null ) {
+	        final KeyStore truststore  = KeyStore.getInstance(service.getTrustStoreType());
+	        try (final InputStream in = new FileInputStream(new File(trustStoreFilename))) {
+	            truststore.load(in, service.getTrustStorePassword().toCharArray());
+	        }
+	        
+	        builder.loadTrustMaterial(truststore, new TrustSelfSignedStrategy());
+    	}
         
-        return sslContext;
+    	final String keyStoreFilename = service.getKeyStoreFile();
+    	if ( keyStoreFilename != null ) {
+	        final KeyStore keystore  = KeyStore.getInstance(service.getKeyStoreType());
+	        try (final InputStream in = new FileInputStream(new File(service.getKeyStoreFile()))) {
+	            keystore.load(in, service.getKeyStorePassword().toCharArray());
+	        }
+	        
+	        builder.loadKeyMaterial(keystore, service.getKeyStorePassword().toCharArray());
+    	}
+    	
+        return builder.build();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3344cef3/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
index 6dad80b..956ed16 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
@@ -59,16 +59,15 @@ public class DistributedMapCacheClientService extends AbstractControllerService
             .build();
     public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
             .name("SSL Context Service")
-            .description(
-                    "If specified, indicates the SSL Context Service that is used to communicate with the remote server. If not specified, communications will not be encrypted")
+            .description("If specified, indicates the SSL Context Service that is used to communicate with the "
+            		+ "remote server. If not specified, communications will not be encrypted")
             .required(false)
-            .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class))
-            .defaultValue(null)
+            .identifiesControllerService(SSLContextService.class)
             .build();
     public static final PropertyDescriptor COMMUNICATIONS_TIMEOUT = new PropertyDescriptor.Builder()
             .name("Communications Timeout")
-            .description(
-                    "Specifies how long to wait when communicating with the remote server before determining that there is a communications failure if data cannot be sent or received")
+            .description("Specifies how long to wait when communicating with the remote server before determining that "
+            		+ "there is a communications failure if data cannot be sent or received")
             .required(true)
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
             .defaultValue("30 secs")
@@ -94,8 +93,7 @@ public class DistributedMapCacheClientService extends AbstractControllerService
     }
 
     @Override
-    public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer)
-            throws IOException {
+    public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
         return withCommsSession(new CommsAction<Boolean>() {
             @Override
             public Boolean execute(final CommsSession session) throws IOException {
@@ -131,8 +129,7 @@ public class DistributedMapCacheClientService extends AbstractControllerService
     }
 
     @Override
-    public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
-            final Deserializer<V> valueDeserializer) throws IOException {
+    public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final Deserializer<V> valueDeserializer) throws IOException {
         return withCommsSession(new CommsAction<V>() {
             @Override
             public V execute(final CommsSession session) throws IOException {
@@ -297,7 +294,6 @@ public class DistributedMapCacheClientService extends AbstractControllerService
     }
 
     private static interface CommsAction<T> {
-
         T execute(CommsSession commsSession) throws IOException;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3344cef3/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
index dcc5558..67a2d43 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
@@ -59,16 +59,15 @@ public class DistributedSetCacheClientService extends AbstractControllerService
             .build();
     public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
             .name("SSL Context Service")
-            .description(
-                    "If specified, indicates the SSL Context Service that is used to communicate with the remote server. If not specified, communications will not be encrypted")
+            .description("If specified, indicates the SSL Context Service that is used to communicate with the "
+            		+ "remote server. If not specified, communications will not be encrypted")
             .required(false)
-            .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class))
-            .defaultValue(null)
+            .identifiesControllerService(SSLContextService.class)
             .build();
     public static final PropertyDescriptor COMMUNICATIONS_TIMEOUT = new PropertyDescriptor.Builder()
             .name("Communications Timeout")
-            .description(
-                    "Specifices how long to wait when communicating with the remote server before determining that there is a communications failure if data cannot be sent or received")
+            .description("Specifices how long to wait when communicating with the remote server before determining "
+            		+ "that there is a communications failure if data cannot be sent or received")
             .required(true)
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
             .defaultValue("30 secs")

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3344cef3/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
index 0817479..66b73e2 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
@@ -42,10 +42,10 @@ public abstract class DistributedCacheServer extends AbstractControllerService {
             .build();
     public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
             .name("SSL Context Service")
-            .description(
-                    "If specified, this service will be used to create an SSL Context that will be used to secure communications; if not specified, communications will not be secure")
+            .description("If specified, this service will be used to create an SSL Context that will be used "
+            		+ "to secure communications; if not specified, communications will not be secure")
             .required(false)
-            .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class))
+            .identifiesControllerService(SSLContextService.class)
             .build();
     public static final PropertyDescriptor MAX_CACHE_ENTRIES = new PropertyDescriptor.Builder()
             .name("Maximum Cache Entries")
@@ -77,8 +77,7 @@ public abstract class DistributedCacheServer extends AbstractControllerService {
         properties.add(MAX_CACHE_ENTRIES);
         properties.add(EVICTION_POLICY);
         properties.add(PERSISTENCE_PATH);
-        properties.add(new PropertyDescriptor.Builder().fromPropertyDescriptor(SSL_CONTEXT_SERVICE).allowableValues(
-                getControllerServiceLookup().getControllerServiceIdentifiers(SSLContextService.class)).build());
+        properties.add(SSL_CONTEXT_SERVICE);
         return properties;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3344cef3/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java
index ae2e19c..39bb5fb 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java
@@ -62,7 +62,6 @@ public class StandardSSLContextService extends AbstractControllerService impleme
             .description("The Type of the Truststore. Either JKS or PKCS12")
             .allowableValues(STORE_TYPE_JKS, STORE_TYPE_PKCS12)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .defaultValue(STORE_TYPE_JKS)
             .sensitive(false)
             .build();
     public static final PropertyDescriptor TRUSTSTORE_PASSWORD = new PropertyDescriptor.Builder()
@@ -84,7 +83,6 @@ public class StandardSSLContextService extends AbstractControllerService impleme
             .description("The Type of the Keystore")
             .allowableValues(STORE_TYPE_JKS, STORE_TYPE_PKCS12)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .defaultValue(STORE_TYPE_JKS)
             .sensitive(false)
             .build();
     public static final PropertyDescriptor KEYSTORE_PASSWORD = new PropertyDescriptor.Builder()


[3/7] incubator-nifi git commit: Merge branch 'NIFI-250' of http://git-wip-us.apache.org/repos/asf/incubator-nifi into NIFI-250

Posted by ma...@apache.org.
Merge branch 'NIFI-250' of http://git-wip-us.apache.org/repos/asf/incubator-nifi into NIFI-250


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/1682e47a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/1682e47a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/1682e47a

Branch: refs/heads/NIFI-250
Commit: 1682e47aa3820bca6ec80e6009a16d0572fb1a9b
Parents: fbc14e0 e7beef8
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jan 30 08:36:55 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jan 30 08:36:55 2015 -0500

----------------------------------------------------------------------
 .../src/main/asciidoc/developer-guide.adoc      | 2116 +++++++++++++++++-
 1 file changed, 2098 insertions(+), 18 deletions(-)
----------------------------------------------------------------------



[2/7] incubator-nifi git commit: Merge branch 'NIFI-250' of http://git-wip-us.apache.org/repos/asf/incubator-nifi into NIFI-250

Posted by ma...@apache.org.
Merge branch 'NIFI-250' of http://git-wip-us.apache.org/repos/asf/incubator-nifi into NIFI-250


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/fbc14e0a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/fbc14e0a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/fbc14e0a

Branch: refs/heads/NIFI-250
Commit: fbc14e0a3a46c0511d6135e86797cfc14c81d7d5
Parents: 9a6acab 1ebaf1d
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Jan 29 16:26:08 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Jan 29 16:26:08 2015 -0500

----------------------------------------------------------------------
 .../css/controller-service-configuration.css    | 32 ++++++++
 .../src/main/webapp/css/settings.css            |  2 +-
 .../nf-controller-service-configuration.js      | 79 ++++++++++++++++----
 3 files changed, 96 insertions(+), 17 deletions(-)
----------------------------------------------------------------------



[5/7] incubator-nifi git commit: NIFI-250: Updated ControllerServiceProvider to have a 'createControllerService' that takes no identifier and randomly generates one

Posted by ma...@apache.org.
NIFI-250: Updated ControllerServiceProvider to have a 'createControllerService' that takes no identifier and randomly generates one


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/3102e083
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/3102e083
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/3102e083

Branch: refs/heads/NIFI-250
Commit: 3102e08378b34cb869c708faa05c427de6b68139
Parents: 3344cef
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jan 30 10:18:54 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jan 30 10:18:54 2015 -0500

----------------------------------------------------------------------
 .../nifi/cluster/manager/impl/WebClusterManager.java     |  3 ++-
 .../controller/service/ControllerServiceProvider.java    | 11 +++++++++++
 .../java/org/apache/nifi/controller/FlowController.java  |  2 +-
 .../service/StandardControllerServiceProvider.java       |  6 ++++++
 4 files changed, 20 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3102e083/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 27620b5..9649fee 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -1294,8 +1294,9 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         }
     }
     
+    @Override
     public ControllerServiceNode createControllerService(final String type, final boolean firstTimeAdded) {
-    	return createControllerService(type, UUID.randomUUID().toString(), firstTimeAdded);
+    	return controllerServiceProvider.createControllerService(type, firstTimeAdded);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3102e083/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
index 1bc3964..42d6609 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
@@ -27,10 +27,21 @@ import org.apache.nifi.controller.ControllerServiceLookup;
 public interface ControllerServiceProvider extends ControllerServiceLookup {
 
     /**
+     * Creates a new Controller Service of the specified type and assigns it a randomly generated ID. If <code>firstTimeAdded</code>
+     * is true, calls any methods that are annotated with {@link OnAdded}
+     *
+     * @param type
+     * @param firstTimeAdded
+     * @return
+     */
+	ControllerServiceNode createControllerService(String type, boolean firstTimeAdded);
+	
+    /**
      * Creates a new Controller Service of the specified type and assigns it the given id. If <code>firstTimeadded</code>
      * is true, calls any methods that are annotated with {@link OnAdded}
      *
      * @param type
+     * @param id
      * @param firstTimeAdded
      * @return
      */

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3102e083/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 209a4d6..347d6b9 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -2651,7 +2651,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
     }
     
     public ControllerServiceNode createControllerService(final String type, final boolean firstTimeAdded) {
-    	return createControllerService(type, UUID.randomUUID().toString(), firstTimeAdded);
+    	return controllerServiceProvider.createControllerService(type, firstTimeAdded);
     }
     
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3102e083/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index 0263ee0..3ec4755 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -28,6 +28,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -95,6 +96,11 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
             populateInterfaces(superClass, interfacesDefinedThusFar);
         }
     }
+    
+    @Override
+    public ControllerServiceNode createControllerService(final String type, final boolean firstTimeAdded) {
+    	return createControllerService(type, UUID.randomUUID().toString(), firstTimeAdded);
+    }
 
     @Override
     public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {