You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/04/10 15:43:57 UTC

[22/62] [abbrv] incubator-nifi git commit: Squashed commit of the following:

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReportingTaskTypesEntity.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReportingTaskTypesEntity.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReportingTaskTypesEntity.java
new file mode 100644
index 0000000..4b021ef
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReportingTaskTypesEntity.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.entity;
+
+import java.util.Set;
+import javax.xml.bind.annotation.XmlRootElement;
+import org.apache.nifi.web.api.dto.DocumentedTypeDTO;
+
+/**
+ * A serialized representation of this class can be placed in the entity body of
+ * a response to the API. This particular entity holds a reference to a list of
+ * reporting task types.
+ */
+@XmlRootElement(name = "reportingTaskTypesEntity")
+public class ReportingTaskTypesEntity extends Entity {
+
+    private Set<DocumentedTypeDTO> reportingTaskTypes;
+
+    /**
+     * The set of reporting task types that are being serialized.
+     *
+     * @return the reporting task types held by this entity
+     */
+    public Set<DocumentedTypeDTO> getReportingTaskTypes() {
+        return reportingTaskTypes;
+    }
+
+    public void setReportingTaskTypes(Set<DocumentedTypeDTO> reportingTaskTypes) {
+        this.reportingTaskTypes = reportingTaskTypes;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReportingTasksEntity.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReportingTasksEntity.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReportingTasksEntity.java
new file mode 100644
index 0000000..4699d5d
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReportingTasksEntity.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.entity;
+
+import java.util.Set;
+import javax.xml.bind.annotation.XmlRootElement;
+import org.apache.nifi.web.api.dto.ReportingTaskDTO;
+
+/**
+ * A serialized representation of this class can be placed in the entity body of
+ * a response to the API. This particular entity holds a reference to a list of
+ * reporting tasks.
+ */
+@XmlRootElement(name = "reportingTasksEntity")
+public class ReportingTasksEntity extends Entity {
+
+    private Set<ReportingTaskDTO> reportingTasks;
+
+    /**
+     * The set of reporting tasks that are being serialized.
+     *
+     * @return the reporting tasks held by this entity
+     */
+    public Set<ReportingTaskDTO> getReportingTasks() {
+        return reportingTasks;
+    }
+
+    public void setReportingTasks(Set<ReportingTaskDTO> reportingTasks) {
+        this.reportingTasks = reportingTasks;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java
index fcd3ea3..69ce8d9 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java
@@ -18,6 +18,7 @@ package org.apache.nifi.documentation.mock;
 
 import org.apache.nifi.controller.ControllerServiceInitializationContext;
 import org.apache.nifi.controller.ControllerServiceLookup;
+import org.apache.nifi.logging.ComponentLog;
 
 /**
  * A Mock ControllerServiceInitializationContext so that ControllerServices can
@@ -37,4 +38,9 @@ public class MockControllerServiceInitializationContext implements ControllerSer
         return new MockControllerServiceLookup();
     }
 
+    @Override
+    public ComponentLog getLogger() {
+        return null;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceLookup.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceLookup.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceLookup.java
index f11bc68..5c60881 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceLookup.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceLookup.java
@@ -52,4 +52,14 @@ public class MockControllerServiceLookup implements ControllerServiceLookup {
         return Collections.emptySet();
     }
 
+    @Override
+    public boolean isControllerServiceEnabling(String serviceIdentifier) {
+        return false;
+    }
+
+    @Override
+    public String getControllerServiceName(String serviceIdentifier) {
+        return serviceIdentifier;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockReportingInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockReportingInitializationContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockReportingInitializationContext.java
index 910ce5a..dc6e236 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockReportingInitializationContext.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockReportingInitializationContext.java
@@ -19,6 +19,7 @@ package org.apache.nifi.documentation.mock;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.controller.ControllerServiceLookup;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.reporting.ReportingInitializationContext;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 
@@ -26,8 +27,6 @@ import org.apache.nifi.scheduling.SchedulingStrategy;
  * A Mock ReportingInitializationContext that can be used to initialize a
  * ReportingTask for the purposes of documentation generation.
  *
- * @author Alligator
- *
  */
 public class MockReportingInitializationContext implements ReportingInitializationContext {
 
@@ -60,4 +59,9 @@ public class MockReportingInitializationContext implements ReportingInitializati
     public SchedulingStrategy getSchedulingStrategy() {
         return SchedulingStrategy.TIMER_DRIVEN;
     }
+
+    @Override
+    public ComponentLog getLogger() {
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/pom.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/pom.xml
index f9ee703..70dcc81 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/pom.xml
@@ -34,10 +34,6 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-web-optimistic-locking</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-administration</artifactId>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java
index 012e7c7..c8c7206 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java
@@ -32,12 +32,7 @@ public class ClusterContextThreadLocal {
     }
     
     public static ClusterContext getContext() {
-        ClusterContext ctx = contextHolder.get();
-        if(ctx == null) {
-            ctx = createEmptyContext();
-            contextHolder.set(ctx);
-        }
-        return ctx;
+        return contextHolder.get();
     }
     
     public static void setContext(final ClusterContext context) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/web/ClusterAwareOptimisticLockingManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/web/ClusterAwareOptimisticLockingManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/web/ClusterAwareOptimisticLockingManager.java
deleted file mode 100644
index 90b8a37..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/web/ClusterAwareOptimisticLockingManager.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.web;
-
-import org.apache.nifi.cluster.context.ClusterContext;
-import org.apache.nifi.cluster.context.ClusterContextThreadLocal;
-
-/**
- * An optimistic locking manager that provides for optimistic locking in a clustered
- * environment.
- * 
- * @author unattributed
- */
-public class ClusterAwareOptimisticLockingManager implements OptimisticLockingManager {
-
-    private final OptimisticLockingManager optimisticLockingManager;
-    
-    public ClusterAwareOptimisticLockingManager(final OptimisticLockingManager optimisticLockingManager) {
-        this.optimisticLockingManager = optimisticLockingManager;
-    }
-    
-    @Override
-    public Revision checkRevision(Revision revision) throws InvalidRevisionException {
-        final Revision currentRevision = getRevision();
-        if(currentRevision.equals(revision) == false) {
-            throw new InvalidRevisionException(String.format("Given revision %s does not match current revision %s.", revision, currentRevision));
-        } else {
-            return revision.increment(revision.getClientId());
-        }
-    }
-
-    @Override
-    public boolean isCurrent(Revision revision) {
-        return getRevision().equals(revision);
-    }
-
-    @Override
-    public Revision getRevision() {
-        final ClusterContext ctx = ClusterContextThreadLocal.getContext();
-        if(ctx == null || ctx.getRevision() == null) {
-            return optimisticLockingManager.getRevision();
-        } else {
-            return ctx.getRevision();
-        }
-    }
-
-    @Override
-    public void setRevision(final Revision revision) {
-        final ClusterContext ctx = ClusterContextThreadLocal.getContext();
-        if(ctx != null) {
-            ctx.setRevision(revision);
-        }
-        optimisticLockingManager.setRevision(revision);
-    }
-
-    @Override
-    public Revision incrementRevision() {
-        final Revision currentRevision = getRevision();
-        final Revision incRevision = currentRevision.increment();
-        setRevision(incRevision);
-        return incRevision;
-    }
-
-    @Override
-    public Revision incrementRevision(final String clientId) {
-        final Revision currentRevision = getRevision();
-        final Revision incRevision = currentRevision.increment(clientId);
-        setRevision(incRevision);
-        return incRevision;
-    }
-
-    @Override
-    public String getLastModifier() {
-        return optimisticLockingManager.getLastModifier();
-    }
-
-    @Override
-    public void setLastModifier(final String lastModifier) {
-        optimisticLockingManager.setLastModifier(lastModifier);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
index 7b6a418..bdff00f 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
@@ -47,6 +47,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-web-optimistic-locking</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-framework-core</artifactId>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java
index eedb88f..c17b429 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java
@@ -27,14 +27,25 @@ import org.apache.nifi.cluster.protocol.StandardDataFlow;
 public class ClusterDataFlow {
 
     private final StandardDataFlow dataFlow;
-
     private final NodeIdentifier primaryNodeId;
+    private final byte[] controllerServices;
+    private final byte[] reportingTasks;
 
-    public ClusterDataFlow(final StandardDataFlow dataFlow, final NodeIdentifier primaryNodeId) {
+    public ClusterDataFlow(final StandardDataFlow dataFlow, final NodeIdentifier primaryNodeId, final byte[] controllerServices, final byte[] reportingTasks) {
         this.dataFlow = dataFlow;
         this.primaryNodeId = primaryNodeId;
+        this.controllerServices = controllerServices;
+        this.reportingTasks = reportingTasks;
     }
 
+    public byte[] getControllerServices() {
+    	return controllerServices;
+    }
+    
+    public byte[] getReportingTasks() {
+    	return reportingTasks;
+    }
+    
     public NodeIdentifier getPrimaryNodeId() {
         return primaryNodeId;
     }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java
index 339d904..082d65e 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.cluster.flow;
 
 import java.util.Set;
+
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 
 /**
@@ -67,6 +68,22 @@ public interface DataFlowManagementService {
     void updatePrimaryNode(NodeIdentifier nodeId) throws DaoException;
 
     /**
+     * Updates the dataflow with the given serialized form of the Controller Services that are to exist on the NCM.
+     * 
+     * @param serializedControllerServices the serialized form of the Controller Services
+     * @throws DaoException
+     */
+    void updateControllerServices(byte[] serializedControllerServices) throws DaoException;
+    
+    /**
+     * Updates the dataflow with the given serialized form of Reporting Tasks that are to exist on the NCM.
+     * 
+     * @param serializedReportingTasks the serialized form of the Reporting Tasks
+     * @throws DaoException
+     */
+    void updateReportingTasks(byte[] serializedReportingTasks) throws DaoException;
+    
+    /**
      * Sets the state of the flow.
      *
      * @param flowState the state

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
index 72b594a..dd9d2a3 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
@@ -111,6 +111,8 @@ public class DataFlowDaoImpl implements DataFlowDao {
     public static final String FLOW_XML_FILENAME = "flow.xml";
     public static final String TEMPLATES_FILENAME = "templates.xml";
     public static final String SNIPPETS_FILENAME = "snippets.xml";
+    public static final String CONTROLLER_SERVICES_FILENAME = "controller-services.xml";
+    public static final String REPORTING_TASKS_FILENAME = "reporting-tasks.xml";
     public static final String CLUSTER_INFO_FILENAME = "cluster-info.xml";
 
     private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(DataFlowDaoImpl.class));
@@ -408,13 +410,7 @@ public class DataFlowDaoImpl implements DataFlowDao {
         final File stateFile = new File(dir, FLOW_PACKAGE);
         stateFile.createNewFile();
 
-        final byte[] flowBytes = getEmptyFlowBytes();
-        final byte[] templateBytes = new byte[0];
-        final byte[] snippetBytes = new byte[0];
-        final DataFlow dataFlow = new StandardDataFlow(flowBytes, templateBytes, snippetBytes);
-
-        final ClusterMetadata clusterMetadata = new ClusterMetadata();
-        writeDataFlow(stateFile, dataFlow, clusterMetadata);
+        writeDataFlow(stateFile, new ClusterDataFlow(null, null, new byte[0], new byte[0]), new ClusterMetadata());
 
         return stateFile;
     }
@@ -479,7 +475,9 @@ public class DataFlowDaoImpl implements DataFlowDao {
         byte[] templateBytes = new byte[0];
         byte[] snippetBytes = new byte[0];
         byte[] clusterInfoBytes = new byte[0];
-
+        byte[] controllerServiceBytes = new byte[0];
+        byte[] reportingTaskBytes = new byte[0];
+        
         try (final InputStream inStream = new FileInputStream(file);
                 final TarArchiveInputStream tarIn = new TarArchiveInputStream(new BufferedInputStream(inStream))) {
             TarArchiveEntry tarEntry;
@@ -501,6 +499,14 @@ public class DataFlowDaoImpl implements DataFlowDao {
                         clusterInfoBytes = new byte[(int) tarEntry.getSize()];
                         StreamUtils.fillBuffer(tarIn, clusterInfoBytes, true);
                         break;
+                    case CONTROLLER_SERVICES_FILENAME:
+                    	controllerServiceBytes = new byte[(int) tarEntry.getSize()];
+                    	StreamUtils.fillBuffer(tarIn, controllerServiceBytes, true);
+                    	break;
+                    case REPORTING_TASKS_FILENAME:
+                    	reportingTaskBytes = new byte[(int) tarEntry.getSize()];
+                    	StreamUtils.fillBuffer(tarIn, reportingTaskBytes, true);
+                    	break;
                     default:
                         throw new DaoException("Found Unexpected file in dataflow configuration: " + tarEntry.getName());
                 }
@@ -518,7 +524,7 @@ public class DataFlowDaoImpl implements DataFlowDao {
         final StandardDataFlow dataFlow = new StandardDataFlow(flowBytes, templateBytes, snippetBytes);
         dataFlow.setAutoStartProcessors(autoStart);
 
-        return new ClusterDataFlow(dataFlow, (clusterMetadata == null) ? null : clusterMetadata.getPrimaryNodeId());
+        return new ClusterDataFlow(dataFlow, (clusterMetadata == null) ? null : clusterMetadata.getPrimaryNodeId(), controllerServiceBytes, reportingTaskBytes);
     }
 
     private void writeDataFlow(final File file, final ClusterDataFlow clusterDataFlow) throws IOException, JAXBException {
@@ -536,7 +542,7 @@ public class DataFlowDaoImpl implements DataFlowDao {
         clusterMetadata.setPrimaryNodeId(clusterDataFlow.getPrimaryNodeId());
 
         // write to disk
-        writeDataFlow(file, dataFlow, clusterMetadata);
+        writeDataFlow(file, clusterDataFlow, clusterMetadata);
     }
 
     private void writeTarEntry(final TarArchiveOutputStream tarOut, final String filename, final byte[] bytes) throws IOException {
@@ -547,14 +553,23 @@ public class DataFlowDaoImpl implements DataFlowDao {
         tarOut.closeArchiveEntry();
     }
 
-    private void writeDataFlow(final File file, final DataFlow dataFlow, final ClusterMetadata clusterMetadata) throws IOException, JAXBException {
+    private void writeDataFlow(final File file, final ClusterDataFlow clusterDataFlow, final ClusterMetadata clusterMetadata) throws IOException, JAXBException {
 
         try (final OutputStream fos = new FileOutputStream(file);
                 final TarArchiveOutputStream tarOut = new TarArchiveOutputStream(new BufferedOutputStream(fos))) {
 
-            writeTarEntry(tarOut, FLOW_XML_FILENAME, dataFlow.getFlow());
-            writeTarEntry(tarOut, TEMPLATES_FILENAME, dataFlow.getTemplates());
-            writeTarEntry(tarOut, SNIPPETS_FILENAME, dataFlow.getSnippets());
+            final DataFlow dataFlow = clusterDataFlow.getDataFlow();
+            if ( dataFlow == null ) {
+                writeTarEntry(tarOut, FLOW_XML_FILENAME, getEmptyFlowBytes());
+                writeTarEntry(tarOut, TEMPLATES_FILENAME, new byte[0]);
+                writeTarEntry(tarOut, SNIPPETS_FILENAME, new byte[0]);
+            } else {
+                writeTarEntry(tarOut, FLOW_XML_FILENAME, dataFlow.getFlow());
+                writeTarEntry(tarOut, TEMPLATES_FILENAME, dataFlow.getTemplates());
+                writeTarEntry(tarOut, SNIPPETS_FILENAME, dataFlow.getSnippets());
+            }
+            writeTarEntry(tarOut, CONTROLLER_SERVICES_FILENAME, clusterDataFlow.getControllerServices());
+            writeTarEntry(tarOut, REPORTING_TASKS_FILENAME, clusterDataFlow.getReportingTasks());
 
             final ByteArrayOutputStream baos = new ByteArrayOutputStream(256);
             writeClusterMetadata(clusterMetadata, baos);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
index e135af3..1bb8ca3 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
@@ -41,7 +41,6 @@ import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
 import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
 import org.apache.nifi.logging.NiFiLog;
 import org.apache.nifi.util.FormatUtils;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -154,17 +153,74 @@ public class DataFlowManagementServiceImpl implements DataFlowManagementService
             final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow();
 
             final StandardDataFlow dataFlow;
+            final byte[] controllerServiceBytes;
+            final byte[] reportingTaskBytes;
             if (existingClusterDataFlow == null) {
                 dataFlow = null;
+                controllerServiceBytes = new byte[0];
+                reportingTaskBytes = new byte[0];
             } else {
                 dataFlow = existingClusterDataFlow.getDataFlow();
+                controllerServiceBytes = existingClusterDataFlow.getControllerServices();
+                reportingTaskBytes = existingClusterDataFlow.getReportingTasks();
             }
 
-            flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId));
+            flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId, controllerServiceBytes, reportingTaskBytes));
         } finally {
             resourceLock.unlock("updatePrimaryNode");
         }
     }
+    
+    
+    @Override
+    public void updateControllerServices(final byte[] controllerServiceBytes) throws DaoException {
+    	resourceLock.lock();
+    	try {
+    		final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow();
+
+            final StandardDataFlow dataFlow;
+            final byte[] reportingTaskBytes;
+            final NodeIdentifier nodeId;
+            if (existingClusterDataFlow == null) {
+                dataFlow = null;
+                nodeId = null;
+                reportingTaskBytes = new byte[0];
+            } else {
+                dataFlow = existingClusterDataFlow.getDataFlow();
+                nodeId = existingClusterDataFlow.getPrimaryNodeId();
+                reportingTaskBytes = existingClusterDataFlow.getReportingTasks();
+            }
+
+            flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId, controllerServiceBytes, reportingTaskBytes));
+    	} finally {
+    		resourceLock.unlock("updateControllerServices");
+    	}
+    }
+    
+    @Override
+    public void updateReportingTasks(final byte[] reportingTaskBytes) throws DaoException {
+        resourceLock.lock();
+        try {
+            final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow();
+            // Replace only the reporting tasks; the flow, primary node and controller services are carried over.
+            final StandardDataFlow dataFlow;
+            final byte[] controllerServiceBytes;
+            final NodeIdentifier nodeId;
+            if (existingClusterDataFlow == null) {
+                dataFlow = null;
+                nodeId = null;
+                controllerServiceBytes = new byte[0]; // empty rather than null so the saved flow never carries a null payload
+            } else {
+                dataFlow = existingClusterDataFlow.getDataFlow();
+                nodeId = existingClusterDataFlow.getPrimaryNodeId();
+                controllerServiceBytes = existingClusterDataFlow.getControllerServices();
+            }
+
+            flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId, controllerServiceBytes, reportingTaskBytes));
+        } finally {
+            resourceLock.unlock("updateReportingTasks");
+        }
+    }
 
     @Override
     public PersistedFlowState getPersistedFlowState() {
@@ -303,9 +359,10 @@ public class DataFlowManagementServiceImpl implements DataFlowManagementService
                             final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow();
                             final ClusterDataFlow currentClusterDataFlow;
                             if (existingClusterDataFlow == null) {
-                                currentClusterDataFlow = new ClusterDataFlow(dataFlow, null);
+                                currentClusterDataFlow = new ClusterDataFlow(dataFlow, null, new byte[0], new byte[0]);
                             } else {
-                                currentClusterDataFlow = new ClusterDataFlow(dataFlow, existingClusterDataFlow.getPrimaryNodeId());
+                                currentClusterDataFlow = new ClusterDataFlow(dataFlow, existingClusterDataFlow.getPrimaryNodeId(), 
+                                		existingClusterDataFlow.getControllerServices(), existingClusterDataFlow.getReportingTasks());
                             }
                             flowDao.saveDataFlow(currentClusterDataFlow);
                             flowDao.setPersistedFlowState(PersistedFlowState.CURRENT);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
index 3f966e5..8bc73ab 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
@@ -192,6 +192,20 @@ public class NodeResponse {
     }
 
     /**
+     * If this node response has been merged, returns the updated entity;
+     * otherwise returns null. Also returns null if hasThrowable() is true.
+     * The intent of this method is to support getting the response entity
+     * when it was already consumed during the merge operation. In this
+     * case the client response from getClientResponse() will not support
+     * a getEntity(...) or getEntityInputStream() call.
+     *
+     * @return the updated entity, or null in the cases described above
+     */
+    public Entity getUpdatedEntity() {
+        return updatedEntity;
+    }
+    
+    /**
      * Creates a Response by mapping the ClientResponse values to it. Since the
      * ClientResponse's input stream can only be read once, this method should
      * only be called once. Furthermore, the caller should not have already read