You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2016/04/15 22:04:02 UTC

[10/22] nifi git commit: NIFI-1551: - Removing the AuthorityProvider. - Refactoring REST API in preparation for introduction of the Authorizer. - Updating UI accordingly. - Removing unneeded properties from nifi.properties. - Addressing comments from PR.

http://git-wip-us.apache.org/repos/asf/nifi/blob/153f63ef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index 96beff5..1154a39 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -17,13 +17,14 @@
 package org.apache.nifi.web.api;
 
 import com.sun.jersey.api.core.ResourceContext;
-import com.wordnik.swagger.annotations.Api;
 import com.wordnik.swagger.annotations.ApiOperation;
 import com.wordnik.swagger.annotations.ApiParam;
 import com.wordnik.swagger.annotations.ApiResponse;
 import com.wordnik.swagger.annotations.ApiResponses;
 import com.wordnik.swagger.annotations.Authorization;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.cluster.context.ClusterContext;
+import org.apache.nifi.cluster.context.ClusterContextThreadLocal;
 import org.apache.nifi.cluster.manager.NodeResponse;
 import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
 import org.apache.nifi.cluster.manager.impl.WebClusterManager;
@@ -33,23 +34,41 @@ import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.ConfigurationSnapshot;
 import org.apache.nifi.web.NiFiServiceFacade;
 import org.apache.nifi.web.Revision;
+import org.apache.nifi.web.api.dto.ConnectionDTO;
 import org.apache.nifi.web.api.dto.FlowSnippetDTO;
-import org.apache.nifi.web.api.dto.PositionDTO;
+import org.apache.nifi.web.api.dto.FunnelDTO;
+import org.apache.nifi.web.api.dto.LabelDTO;
+import org.apache.nifi.web.api.dto.PortDTO;
 import org.apache.nifi.web.api.dto.ProcessGroupDTO;
+import org.apache.nifi.web.api.dto.ProcessorDTO;
+import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
 import org.apache.nifi.web.api.dto.RevisionDTO;
 import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusSnapshotDTO;
 import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
 import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO;
 import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
+import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ConnectionsEntity;
 import org.apache.nifi.web.api.entity.FlowSnippetEntity;
+import org.apache.nifi.web.api.entity.FunnelEntity;
+import org.apache.nifi.web.api.entity.FunnelsEntity;
+import org.apache.nifi.web.api.entity.InputPortEntity;
+import org.apache.nifi.web.api.entity.InputPortsEntity;
+import org.apache.nifi.web.api.entity.LabelEntity;
+import org.apache.nifi.web.api.entity.LabelsEntity;
+import org.apache.nifi.web.api.entity.OutputPortEntity;
+import org.apache.nifi.web.api.entity.OutputPortsEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupsEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.apache.nifi.web.api.entity.ProcessorsEntity;
+import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
+import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity;
 import org.apache.nifi.web.api.entity.StatusHistoryEntity;
 import org.apache.nifi.web.api.request.ClientIdParameter;
 import org.apache.nifi.web.api.request.DoubleParameter;
 import org.apache.nifi.web.api.request.LongParameter;
-import org.springframework.security.access.prepost.PreAuthorize;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.Consumes;
@@ -64,12 +83,11 @@ import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
-import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import java.net.URI;
-import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -79,7 +97,7 @@ import java.util.UUID;
 /**
  * RESTful endpoint for managing a Group.
  */
-@Api(hidden = true)
+@Path("process-groups")
 public class ProcessGroupResource extends ApplicationResource {
 
     private static final String VERBOSE = "false";
@@ -91,119 +109,14 @@ public class ProcessGroupResource extends ApplicationResource {
     private NiFiServiceFacade serviceFacade;
     private WebClusterManager clusterManager;
     private NiFiProperties properties;
-    private String groupId;
 
-    /**
-     * Get the processor resource within the specified group.
-     *
-     * @return the processor resource within the specified group
-     */
-    @Path("processors")
-    @ApiOperation(
-            value = "Gets the processor resource",
-            response = ProcessorResource.class
-    )
-    public ProcessorResource getProcessorResource() {
-        ProcessorResource processorResource = resourceContext.getResource(ProcessorResource.class);
-        processorResource.setGroupId(groupId);
-        return processorResource;
-    }
-
-    /**
-     * Get the connection sub-resource within the specified group.
-     *
-     * @return the connection sub-resource within the specified group
-     */
-    @Path("connections")
-    @ApiOperation(
-            value = "Gets the connection resource",
-            response = ConnectionResource.class
-    )
-    public ConnectionResource getConnectionResource() {
-        ConnectionResource connectionResource = resourceContext.getResource(ConnectionResource.class);
-        connectionResource.setGroupId(groupId);
-        return connectionResource;
-    }
-
-    /**
-     * Get the input ports sub-resource within the specified group.
-     *
-     * @return the input ports sub-resource within the specified group
-     */
-    @Path("input-ports")
-    @ApiOperation(
-            value = "Gets the input port resource",
-            response = InputPortResource.class
-    )
-    public InputPortResource getInputPortResource() {
-        InputPortResource inputPortResource = resourceContext.getResource(InputPortResource.class);
-        inputPortResource.setGroupId(groupId);
-        return inputPortResource;
-    }
-
-    /**
-     * Get the output ports sub-resource within the specified group.
-     *
-     * @return the output ports sub-resource within the specified group
-     */
-    @Path("output-ports")
-    @ApiOperation(
-            value = "Gets the output port resource",
-            response = OutputPortResource.class
-    )
-    public OutputPortResource getOutputPortResource() {
-        OutputPortResource outputPortResource = resourceContext.getResource(OutputPortResource.class);
-        outputPortResource.setGroupId(groupId);
-        return outputPortResource;
-    }
-
-    /**
-     * Locates the label sub-resource within the specified group.
-     *
-     * @return the label sub-resource within the specified group
-     */
-    @Path("labels")
-    @ApiOperation(
-            value = "Gets the label resource",
-            response = LabelResource.class
-    )
-    public LabelResource getLabelResource() {
-        LabelResource labelResource = resourceContext.getResource(LabelResource.class);
-        labelResource.setGroupId(groupId);
-        return labelResource;
-    }
-
-    /**
-     * Locates the funnel sub-resource within the specified group.
-     *
-     * @return the funnel sub-resource within the specified group
-     */
-    @Path("funnels")
-    @ApiOperation(
-            value = "Gets the funnel resource",
-            response = FunnelResource.class
-    )
-    public FunnelResource getFunnelResource() {
-        FunnelResource funnelResource = resourceContext.getResource(FunnelResource.class);
-        funnelResource.setGroupId(groupId);
-        return funnelResource;
-    }
-
-    /**
-     * Locates the remote process group sub-resource within the specified group.
-     *
-     * @return the remote process group sub-resource within the specified group
-     */
-    @Path("remote-process-groups")
-    @ApiOperation(
-            value = "Gets the remote process group resource",
-            response = RemoteProcessGroupResource.class
-    )
-    public RemoteProcessGroupResource getRemoteProcessGroupResource() {
-        RemoteProcessGroupResource remoteProcessGroupResource = resourceContext.getResource(RemoteProcessGroupResource.class);
-        remoteProcessGroupResource.setGroupId(groupId);
-        return remoteProcessGroupResource;
-    }
+    private ProcessorResource processorResource;
+    private InputPortResource inputPortResource;
+    private OutputPortResource outputPortResource;
+    private FunnelResource funnelResource;
+    private LabelResource labelResource;
+    private RemoteProcessGroupResource remoteProcessGroupResource;
+    private ConnectionResource connectionResource;
 
     /**
      * Populates the remaining fields in the specified process groups.
@@ -213,7 +126,7 @@ public class ProcessGroupResource extends ApplicationResource {
      */
     public Set<ProcessGroupDTO> populateRemainingProcessGroupsContent(Set<ProcessGroupDTO> processGroups) {
         for (ProcessGroupDTO processGroup : processGroups) {
-            populateRemainingProcessGroupContent(processGroup, getProcessGroupReferenceUri(processGroup));
+            populateRemainingProcessGroupContent(processGroup);
         }
         return processGroups;
     }
@@ -222,10 +135,9 @@ public class ProcessGroupResource extends ApplicationResource {
      * Populates the remaining fields in the specified process group.
      *
      * @param processGroup group
-     * @param processGroupUri processGroupUri
      * @return group dto
      */
-    private ProcessGroupDTO populateRemainingProcessGroupContent(ProcessGroupDTO processGroup, String processGroupUri) {
+    private ProcessGroupDTO populateRemainingProcessGroupContent(ProcessGroupDTO processGroup) {
         FlowSnippetDTO flowSnippet = processGroup.getContents();
 
         // populate the remaining fields for the processors, connections, process group refs, remote process groups, and labels if appropriate
@@ -234,7 +146,7 @@ public class ProcessGroupResource extends ApplicationResource {
         }
 
         // set the process group uri
-        processGroup.setUri(processGroupUri);
+        processGroup.setUri(generateResourceUri("process-groups",  processGroup.getId()));
 
         return processGroup;
     }
@@ -243,13 +155,13 @@ public class ProcessGroupResource extends ApplicationResource {
      * Populates the remaining content of the specified snippet.
      */
     private FlowSnippetDTO populateRemainingSnippetContent(FlowSnippetDTO snippet) {
-        getProcessorResource().populateRemainingProcessorsContent(snippet.getProcessors());
-        getConnectionResource().populateRemainingConnectionsContent(snippet.getConnections());
-        getInputPortResource().populateRemainingInputPortsContent(snippet.getInputPorts());
-        getOutputPortResource().populateRemainingOutputPortsContent(snippet.getOutputPorts());
-        getRemoteProcessGroupResource().populateRemainingRemoteProcessGroupsContent(snippet.getRemoteProcessGroups());
-        getFunnelResource().populateRemainingFunnelsContent(snippet.getFunnels());
-        getLabelResource().populateRemainingLabelsContent(snippet.getLabels());
+        processorResource.populateRemainingProcessorsContent(snippet.getProcessors());
+        connectionResource.populateRemainingConnectionsContent(snippet.getConnections());
+        inputPortResource.populateRemainingInputPortsContent(snippet.getInputPorts());
+        outputPortResource.populateRemainingOutputPortsContent(snippet.getOutputPorts());
+        remoteProcessGroupResource.populateRemainingRemoteProcessGroupsContent(snippet.getRemoteProcessGroups());
+        funnelResource.populateRemainingFunnelsContent(snippet.getFunnels());
+        labelResource.populateRemainingLabelsContent(snippet.getLabels());
 
         // go through each process group child and populate its uri
         if (snippet.getProcessGroups() != null) {
@@ -260,108 +172,13 @@ public class ProcessGroupResource extends ApplicationResource {
     }
 
     /**
-     * Generates a URI for a process group.
-     */
-    private String getProcessGroupUri(String processGroupId) {
-        return generateResourceUri("controller", "process-groups", processGroupId);
-    }
-
-    /**
-     * Generates a URI for a process group reference.
-     */
-    private String getProcessGroupReferenceUri(ProcessGroupDTO processGroup) {
-        return generateResourceUri("controller", "process-groups", processGroup.getParentGroupId(), "process-group-references", processGroup.getId());
-    }
-
-    /**
-     * Retrieves the content of the specified group. This includes all processors, the connections, the process group references, the remote process group references, and the labels.
-     *
-     * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
-     * @param recursive Optional recursive flag that defaults to false. If set to true, all descendent groups and their content will be included if the verbose flag is also set to true.
-     * @param verbose Optional verbose flag that defaults to false. If the verbose flag is set to true processor configuration and property details will be included in the response.
-     * @return A processGroupEntity.
-     */
-    @GET
-    @Consumes(MediaType.WILDCARD)
-    @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
-    @Path("") // necessary due to bug in swagger
-    @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
-    @ApiOperation(
-            value = "Gets a process group",
-            notes = "Gets a process group and includes all components contained in this group. The verbose and recursive flags can be used to adjust "
-            + "the default behavior. This endpoint is starting point for obtaining the current flow and consequently includes the current "
-            + "flow revision.",
-            response = ProcessGroupEntity.class,
-            authorizations = {
-                @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
-                @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
-                @Authorization(value = "Administrator", type = "ROLE_ADMIN")
-            }
-    )
-    @ApiResponses(
-            value = {
-                @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
-                @ApiResponse(code = 401, message = "Client could not be authenticated."),
-                @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
-                @ApiResponse(code = 404, message = "The specified resource could not be found."),
-                @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
-            }
-    )
-    public Response getProcessGroup(
-            @ApiParam(
-                    value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response",
-                    required = false
-            )
-            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
-            @ApiParam(
-                    value = "Whether the response should contain all encapsulated components or just the immediate children.",
-                    required = false,
-                    allowableValues = "true, false"
-            )
-            @QueryParam("recursive") @DefaultValue(RECURSIVE) Boolean recursive,
-            @ApiParam(
-                    value = "Whether to include any encapulated components or just details about the process group.",
-                    required = false
-            )
-            @QueryParam("verbose") @DefaultValue(VERBOSE) Boolean verbose) {
-
-        // replicate if cluster manager
-        if (properties.isClusterManager()) {
-            return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
-        }
-
-        // only recurse if the request is verbose and recursive
-        final boolean recurse = verbose && recursive;
-
-        // get this process group contents
-        final ConfigurationSnapshot<ProcessGroupDTO> controllerResponse = serviceFacade.getProcessGroup(groupId, recurse);
-        final ProcessGroupDTO processGroup = controllerResponse.getConfiguration();
-
-        // prune response if necessary
-        if (!verbose) {
-            processGroup.setContents(null);
-        }
-
-        // get the updated revision
-        final RevisionDTO revision = new RevisionDTO();
-        revision.setClientId(clientId.getClientId());
-        revision.setVersion(controllerResponse.getVersion());
-
-        // create the response entity
-        final ProcessGroupEntity processGroupEntity = new ProcessGroupEntity();
-        processGroupEntity.setRevision(revision);
-        processGroupEntity.setProcessGroup(populateRemainingProcessGroupContent(processGroup, getProcessGroupUri(processGroup.getId())));
-
-        return clusterContext(generateOkResponse(processGroupEntity)).build();
-    }
-
-    /**
      * Copies the specified snippet within this ProcessGroup. The snippet instance that is instantiated cannot be referenced at a later time, therefore there is no
      * corresponding URI. Instead the request URI is returned.
      *
      * Alternatively, we could have performed a PUT request. However, PUT requests are supposed to be idempotent and this endpoint is certainly not.
      *
      * @param httpServletRequest request
+     * @param groupId The group id
      * @param version The revision is used to verify the client is working with the latest version of the flow.
      * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
      * @param snippetId The id of the snippet to copy.
@@ -371,9 +188,9 @@ public class ProcessGroupResource extends ApplicationResource {
      */
     @POST
     @Consumes(MediaType.APPLICATION_FORM_URLENCODED)
-    @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
-    @Path("/snippet-instance")
-    @PreAuthorize("hasRole('ROLE_DFM')")
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/snippet-instance")
+    // TODO - @PreAuthorize("hasRole('ROLE_DFM')")
     @ApiOperation(
             value = "Copies a snippet",
             response = FlowSnippetEntity.class,
@@ -393,6 +210,11 @@ public class ProcessGroupResource extends ApplicationResource {
     public Response copySnippet(
             @Context HttpServletRequest httpServletRequest,
             @ApiParam(
+                value = "The process group id.",
+                required = true
+            )
+            @PathParam("id") String groupId,
+            @ApiParam(
                     value = "The revision is used to verify the client is working with the latest version of the flow.",
                     required = false
             )
@@ -476,6 +298,7 @@ public class ProcessGroupResource extends ApplicationResource {
      * Alternatively, we could have performed a PUT request. However, PUT requests are supposed to be idempotent and this endpoint is certainly not.
      *
      * @param httpServletRequest request
+     * @param groupId The group id
      * @param version The revision is used to verify the client is working with the latest version of the flow.
      * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
      * @param templateId The id of the template to instantiate.
@@ -485,9 +308,9 @@ public class ProcessGroupResource extends ApplicationResource {
      */
     @POST
     @Consumes(MediaType.APPLICATION_FORM_URLENCODED)
-    @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
-    @Path("/template-instance")
-    @PreAuthorize("hasRole('ROLE_DFM')")
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/template-instance")
+    // TODO - @PreAuthorize("hasRole('ROLE_DFM')")
     @ApiOperation(
             value = "Instantiates a template",
             response = FlowSnippetEntity.class,
@@ -507,6 +330,11 @@ public class ProcessGroupResource extends ApplicationResource {
     public Response instantiateTemplate(
             @Context HttpServletRequest httpServletRequest,
             @ApiParam(
+                value = "The process group id.",
+                required = true
+            )
+            @PathParam("id") String groupId,
+            @ApiParam(
                     value = "The revision is used to verify the client is working with the latest version of the flow.",
                     required = false
             )
@@ -581,66 +409,107 @@ public class ProcessGroupResource extends ApplicationResource {
     }
 
     /**
-     * Updates the state of all processors in the process group. Supports modifying whether the processors and process groups are running/stopped and instantiating templates.
+     * Retrieves the contents of the specified group.
      *
-     * @param httpServletRequest request
-     * @param version The revision is used to verify the client is working with the latest version of the flow.
      * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
-     * @param running Optional flag that indicates whether all processors in this group should be started/stopped.
+     * @param recursive Optional recursive flag that defaults to false. If set to true, all descendent groups and their content will be included if the verbose flag is also set to true.
+     * @param groupId The id of the process group.
+     * @param verbose Optional verbose flag that defaults to false. If the verbose flag is set to true processor configuration and property details will be included in the response.
      * @return A processGroupEntity.
      */
-    @PUT
-    @Consumes(MediaType.APPLICATION_FORM_URLENCODED)
-    @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
-    @Path("") // necessary due to bug in swagger
-    @PreAuthorize("hasRole('ROLE_DFM')")
-    public Response updateProcessGroup(
-            @Context HttpServletRequest httpServletRequest,
-            @FormParam(VERSION) LongParameter version,
-            @FormParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
-            @FormParam("running") Boolean running) {
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}")
+    // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
+    @ApiOperation(
+            value = "Gets a process group",
+            response = ProcessGroupEntity.class,
+            authorizations = {
+                @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+                @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
+                @Authorization(value = "Administrator", type = "ROLE_ADMIN")
+            }
+    )
+    @ApiResponses(
+            value = {
+                @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+                @ApiResponse(code = 401, message = "Client could not be authenticated."),
+                @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+                @ApiResponse(code = 404, message = "The specified resource could not be found."),
+                @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+            }
+    )
+    public Response getProcessGroup(
+            @ApiParam(
+                    value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
+                    required = false
+            )
+            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+            @ApiParam(
+                    value = "The process group id.",
+                    required = false
+            )
+            @PathParam("id") String groupId,
+            @ApiParam(
+                    value = "Whether the response should contain all encapsulated components or just the immediate children.",
+                    required = false
+            )
+            @QueryParam("recursive") @DefaultValue(RECURSIVE) Boolean recursive,
+            @ApiParam(
+                    value = "Whether to include any encapulated components or just details about the process group.",
+                    required = false
+            )
+            @QueryParam("verbose") @DefaultValue(VERBOSE) Boolean verbose) {
+
+        // replicate if cluster manager
+        if (properties.isClusterManager()) {
+            return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+        }
+
+        // only recurse if the request is verbose and recursive
+        final boolean recurse = verbose && recursive;
 
-        // create the process group dto
-        final ProcessGroupDTO processGroup = new ProcessGroupDTO();
-        processGroup.setId(groupId);
-        processGroup.setRunning(running);
+        // get this process group contents
+        final ConfigurationSnapshot<ProcessGroupDTO> controllerResponse = serviceFacade.getProcessGroup(groupId, recurse);
+        final ProcessGroupDTO processGroup = controllerResponse.getConfiguration();
+
+        // prune the response if necessary
+        if (!verbose) {
+            processGroup.setContents(null);
+        }
 
         // create the revision
         final RevisionDTO revision = new RevisionDTO();
         revision.setClientId(clientId.getClientId());
+        revision.setVersion(controllerResponse.getVersion());
 
-        if (version != null) {
-            revision.setVersion(version.getLong());
-        }
-
-        // create the entity for the request
-        final ProcessGroupEntity entity = new ProcessGroupEntity();
-        entity.setRevision(revision);
-        entity.setProcessGroup(processGroup);
+        // create the response entity
+        final ProcessGroupEntity processGroupEntity = new ProcessGroupEntity();
+        processGroupEntity.setRevision(revision);
+        processGroupEntity.setProcessGroup(populateRemainingProcessGroupContent(processGroup));
 
-        // update the process group
-        return updateProcessGroup(httpServletRequest, entity);
+        return clusterContext(generateOkResponse(processGroupEntity)).build();
     }
 
     /**
-     * Updates the state of all processors in the process group. Supports modifying whether the processors and process groups are running/stopped and instantiating templates.
+     * Updates the specified process group.
      *
      * @param httpServletRequest request
-     * @param processGroupEntity A processGroupEntity
-     * @return A processGroupEntity
+     * @param id The id of the process group.
+     * @param processGroupEntity A processGroupEntity.
+     * @return A processGroupEntity.
      */
     @PUT
-    @Consumes({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
-    @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
-    @Path("") // necessary due to bug in swagger
-    @PreAuthorize("hasRole('ROLE_DFM')")
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}")
+    // TODO - @PreAuthorize("hasRole('ROLE_DFM')")
     @ApiOperation(
             value = "Updates a process group",
             response = ProcessGroupEntity.class,
             authorizations = {
-                @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
-                @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
-                @Authorization(value = "Administrator", type = "ROLE_ADMIN")
+                @Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
             }
     )
     @ApiResponses(
@@ -652,11 +521,15 @@ public class ProcessGroupResource extends ApplicationResource {
                 @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
             }
     )
-    public Response updateProcessGroup(
+    public Response updateProcessGroupReference(
             @Context HttpServletRequest httpServletRequest,
             @ApiParam(
-                    value = "The process group to update. The only action that is supported at this endpoint is to set the running flag in order "
-                    + "to start or stop all descendent schedulable components. This defines the schema of the expected input.",
+                    value = "The process group id.",
+                    required = true
+            )
+            @PathParam("id") String id,
+            @ApiParam(
+                    value = "The process group configuration details.",
                     required = true
             )
             ProcessGroupEntity processGroupEntity) {
@@ -670,20 +543,14 @@ public class ProcessGroupResource extends ApplicationResource {
         }
 
         // ensure the same id is being used
-        ProcessGroupDTO requestProcessGroupDTO = processGroupEntity.getProcessGroup();
-        if (!groupId.equals(requestProcessGroupDTO.getId())) {
+        final ProcessGroupDTO requestProcessGroupDTO = processGroupEntity.getProcessGroup();
+        if (!id.equals(requestProcessGroupDTO.getId())) {
             throw new IllegalArgumentException(String.format("The process group id (%s) in the request body does "
-                    + "not equal the process group id of the requested resource (%s).", requestProcessGroupDTO.getId(), groupId));
+                    + "not equal the process group id of the requested resource (%s).", requestProcessGroupDTO.getId(), id));
         }
 
-        // replicate if cluster manager
         if (properties.isClusterManager()) {
-            // change content type to JSON for serializing entity
-            final Map<String, String> headersToOverride = new HashMap<>();
-            headersToOverride.put("content-type", MediaType.APPLICATION_JSON);
-
-            // replicate the request
-            return clusterManager.applyRequest(HttpMethod.PUT, getAbsolutePath(), updateClientId(processGroupEntity), getHeaders(headersToOverride)).getResponse();
+            return clusterManager.applyRequest(HttpMethod.PUT, getAbsolutePath(), updateClientId(processGroupEntity), getHeaders()).getResponse();
         }
 
         // handle expects request (usually from the cluster manager)
@@ -696,10 +563,10 @@ public class ProcessGroupResource extends ApplicationResource {
         // update the process group
         final RevisionDTO revision = processGroupEntity.getRevision();
         final ConfigurationSnapshot<ProcessGroupDTO> response = serviceFacade.updateProcessGroup(
-                new Revision(revision.getVersion(), revision.getClientId()), null, requestProcessGroupDTO);
+                new Revision(revision.getVersion(), revision.getClientId()), requestProcessGroupDTO);
         final ProcessGroupDTO processGroup = response.getConfiguration();
 
-        // get the updated revision
+        // create the revision
         final RevisionDTO updatedRevision = new RevisionDTO();
         updatedRevision.setClientId(revision.getClientId());
         updatedRevision.setVersion(response.getVersion());
@@ -707,33 +574,34 @@ public class ProcessGroupResource extends ApplicationResource {
         // create the response entity
         final ProcessGroupEntity entity = new ProcessGroupEntity();
         entity.setRevision(updatedRevision);
-        entity.setProcessGroup(populateRemainingProcessGroupContent(processGroup, getProcessGroupUri(processGroup.getId())));
+        entity.setProcessGroup(populateRemainingProcessGroupContent(processGroup));
 
-        // generate the response
-        return clusterContext(generateOkResponse(entity)).build();
+        if (response.isNew()) {
+            return clusterContext(generateCreatedResponse(URI.create(processGroup.getUri()), entity)).build();
+        } else {
+            return clusterContext(generateOkResponse(entity)).build();
+        }
     }
 
     /**
-     * Retrieves the contents of the specified group.
+     * Removes the specified process group reference.
      *
+     * @param httpServletRequest request
+     * @param version The revision is used to verify the client is working with the latest version of the flow.
      * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
-     * @param recursive Optional recursive flag that defaults to false. If set to true, all descendent groups and their content will be included if the verbose flag is also set to true.
-     * @param processGroupReferenceId The id of the process group.
-     * @param verbose Optional verbose flag that defaults to false. If the verbose flag is set to true processor configuration and property details will be included in the response.
+     * @param id The id of the process group to be removed.
      * @return A processGroupEntity.
      */
-    @GET
+    @DELETE
     @Consumes(MediaType.WILDCARD)
-    @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
-    @Path("/process-group-references/{id}")
-    @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}")
+    // TODO - @PreAuthorize("hasRole('ROLE_DFM')")
     @ApiOperation(
-            value = "Gets a process group",
+            value = "Deletes a process group",
             response = ProcessGroupEntity.class,
             authorizations = {
-                @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
-                @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
-                @Authorization(value = "Administrator", type = "ROLE_ADMIN")
+                @Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
             }
     )
     @ApiResponses(
@@ -745,77 +613,81 @@ public class ProcessGroupResource extends ApplicationResource {
                 @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
             }
     )
-    public Response getProcessGroup(
-            @ApiParam(
-                    value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
-                    required = false
-            )
-            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+    public Response removeProcessGroupReference(
+            @Context HttpServletRequest httpServletRequest,
             @ApiParam(
-                    value = "The process group id.",
+                    value = "The revision is used to verify the client is working with the latest version of the flow.",
                     required = false
             )
-            @PathParam("id") String processGroupReferenceId,
+            @QueryParam(VERSION) LongParameter version,
             @ApiParam(
-                    value = "Whether the response should contain all encapsulated components or just the immediate children.",
+                    value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
                     required = false
             )
-            @QueryParam("recursive") @DefaultValue(RECURSIVE) Boolean recursive,
+            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
             @ApiParam(
-                    value = "Whether to include any encapulated components or just details about the process group.",
-                    required = false
+                    value = "The process group id.",
+                    required = true
             )
-            @QueryParam("verbose") @DefaultValue(VERBOSE) Boolean verbose) {
+            @PathParam("id") String id) {
 
         // replicate if cluster manager
         if (properties.isClusterManager()) {
-            return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+            return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
         }
 
-        // only recurse if the request is verbose and recursive
-        final boolean recurse = verbose && recursive;
-
-        // get this process group contents
-        final ConfigurationSnapshot<ProcessGroupDTO> controllerResponse = serviceFacade.getProcessGroup(processGroupReferenceId, recurse);
-        final ProcessGroupDTO processGroup = controllerResponse.getConfiguration();
+        // handle expects request (usually from the cluster manager)
+        final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
+        if (expects != null) {
+            serviceFacade.verifyDeleteProcessGroup(id);
+            return generateContinueResponse().build();
+        }
 
-        // prune the response if necessary
-        if (!verbose) {
-            processGroup.setContents(null);
+        // determine the specified version
+        Long clientVersion = null;
+        if (version != null) {
+            clientVersion = version.getLong();
         }
 
-        // create the revision
+        // delete the process group
+        final ConfigurationSnapshot<Void> controllerResponse = serviceFacade.deleteProcessGroup(new Revision(clientVersion, clientId.getClientId()), id);
+
+        // get the updated revision
         final RevisionDTO revision = new RevisionDTO();
         revision.setClientId(clientId.getClientId());
         revision.setVersion(controllerResponse.getVersion());
 
         // create the response entity
-        final ProcessGroupEntity processGroupEntity = new ProcessGroupEntity();
-        processGroupEntity.setRevision(revision);
-        processGroupEntity.setProcessGroup(populateRemainingProcessGroupContent(processGroup, getProcessGroupReferenceUri(processGroup)));
+        final ProcessGroupEntity entity = new ProcessGroupEntity();
+        entity.setRevision(revision);
 
-        return clusterContext(generateOkResponse(processGroupEntity)).build();
+        // create the response
+        return clusterContext(generateOkResponse(entity)).build();
     }
 
     /**
-     * Retrieves the content of the specified group reference.
+     * Retrieves the status report for the specified process group.
      *
      * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
-     * @param verbose Optional verbose flag that defaults to false. If the verbose flag is set to true processor configuration and property details will be included in the response.
-     * @return A controllerEntity.
+     * @param recursive Optional recursive flag that defaults to false. If set to true, all descendant groups and the status of their content will be included.
+     * @param groupId The group id
+     * @return A processGroupStatusEntity.
      */
     @GET
     @Consumes(MediaType.WILDCARD)
-    @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
-    @Path("/process-group-references")
-    @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/status")
+    // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN', 'ROLE_NIFI')")
     @ApiOperation(
-            value = "Gets all process groups",
-            response = ProcessGroupsEntity.class,
+            value = "Gets the status for a process group",
+            notes = "The status for a process group includes status for all descendent components. When invoked on the root group with "
+            + "recursive set to true, it will return the current status of every component in the flow.",
+            response = ProcessGroupStatusEntity.class,
             authorizations = {
                 @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
                 @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
-                @Authorization(value = "Administrator", type = "ROLE_ADMIN")
+                @Authorization(value = "Administrator", type = "ROLE_ADMIN"),
+                @Authorization(value = "NiFi", type = "ROLE_NIFI")
             }
     )
     @ApiResponses(
@@ -827,30 +699,75 @@ public class ProcessGroupResource extends ApplicationResource {
                 @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
             }
     )
-    public Response getProcessGroupReferences(
+    public Response getProcessGroupStatus(
             @ApiParam(
-                    value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
-                    required = false
+                value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
+                required = false
             )
             @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
             @ApiParam(
-                    value = "Whether to include any encapulated components or just details about the process group.",
-                    required = false
+                value = "Whether all descendant groups and the status of their content will be included. Optional, defaults to false",
+                required = false
             )
-            @QueryParam("verbose") @DefaultValue(VERBOSE) Boolean verbose) {
+            @QueryParam("recursive") @DefaultValue(RECURSIVE) Boolean recursive,
+            @ApiParam(
+                value = "Whether or not to include the breakdown per node. Optional, defaults to false",
+                required = false
+            )
+            @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
+            @ApiParam(
+                value = "The id of the node where to get the status.",
+                required = false
+            )
+            @QueryParam("clusterNodeId") String clusterNodeId,
+            @ApiParam(
+                value = "The process group id.",
+                required = true
+            )
+            @PathParam("id") String groupId) {
+
+        // ensure a valid request
+        if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
+            throw new IllegalArgumentException("Nodewise requests cannot be directed at a specific node.");
+        }
 
-        // replicate if cluster manager
         if (properties.isClusterManager()) {
-            return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+            // determine where this request should be sent
+            if (clusterNodeId == null) {
+                final NodeResponse nodeResponse = clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders());
+                final ProcessGroupStatusEntity entity = (ProcessGroupStatusEntity) nodeResponse.getUpdatedEntity();
+
+                // ensure there is an updated entity (result of merging) and prune the response as necessary
+                if (entity != null && !nodewise) {
+                    entity.getProcessGroupStatus().setNodeSnapshots(null);
+                }
+
+                return nodeResponse.getResponse();
+            } else {
+                // get the target node and ensure it exists
+                final Node targetNode = clusterManager.getNode(clusterNodeId);
+                if (targetNode == null) {
+                    throw new UnknownNodeException("The specified cluster node does not exist.");
+                }
+
+                final Set<NodeIdentifier> targetNodes = new HashSet<>();
+                targetNodes.add(targetNode.getNodeId());
+
+                // replicate the request to the specific node
+                return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders(), targetNodes).getResponse();
+            }
         }
 
-        // get this process group contents
-        final Set<ProcessGroupDTO> processGroups = serviceFacade.getProcessGroups(groupId);
+        // get the status
+        final ProcessGroupStatusDTO statusReport = serviceFacade.getProcessGroupStatus(groupId);
 
-        // prune the response if necessary
-        if (!verbose) {
-            for (ProcessGroupDTO processGroup : processGroups) {
-                processGroup.setContents(null);
+        // prune the response as necessary
+        if (!recursive) {
+            pruneChildGroups(statusReport.getAggregateSnapshot());
+            if (statusReport.getNodeSnapshots() != null) {
+                for (final NodeProcessGroupStatusSnapshotDTO nodeSnapshot : statusReport.getNodeSnapshots()) {
+                    pruneChildGroups(nodeSnapshot.getStatusSnapshot());
+                }
             }
         }
 
@@ -859,96 +776,124 @@ public class ProcessGroupResource extends ApplicationResource {
         revision.setClientId(clientId.getClientId());
 
         // create the response entity
-        final ProcessGroupsEntity processGroupsEntity = new ProcessGroupsEntity();
-        processGroupsEntity.setRevision(revision);
-        processGroupsEntity.setProcessGroups(populateRemainingProcessGroupsContent(processGroups));
+        final ProcessGroupStatusEntity entity = new ProcessGroupStatusEntity();
+        entity.setRevision(revision);
+        entity.setProcessGroupStatus(statusReport);
+
+        // generate the response
+        return clusterContext(generateOkResponse(entity)).build();
+    }
 
-        return clusterContext(generateOkResponse(processGroupsEntity)).build();
+    private void pruneChildGroups(final ProcessGroupStatusSnapshotDTO snapshot) {
+        for (final ProcessGroupStatusSnapshotDTO childProcessGroupStatus : snapshot.getProcessGroupStatusSnapshots()) {
+            childProcessGroupStatus.setConnectionStatusSnapshots(null);
+            childProcessGroupStatus.setProcessGroupStatusSnapshots(null);
+            childProcessGroupStatus.setInputPortStatusSnapshots(null);
+            childProcessGroupStatus.setOutputPortStatusSnapshots(null);
+            childProcessGroupStatus.setProcessorStatusSnapshots(null);
+            childProcessGroupStatus.setRemoteProcessGroupStatusSnapshots(null);
+        }
     }
 
     /**
-     * Adds the specified process group.
+     * Retrieves the status history for the specified process group.
      *
-     * @param httpServletRequest request
-     * @param version The revision is used to verify the client is working with the latest version of the flow.
      * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
-     * @param name The name of the process group
-     * @param x The x coordinate for this funnels position.
-     * @param y The y coordinate for this funnels position.
-     * @return A processGroupEntity
+     * @param groupId The group id
+     * @return A statusHistoryEntity.
      */
-    @POST
-    @Consumes(MediaType.APPLICATION_FORM_URLENCODED)
-    @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
-    @Path("/process-group-references")
-    @PreAuthorize("hasRole('ROLE_DFM')")
-    public Response createProcessGroupReference(
-            @Context HttpServletRequest httpServletRequest,
-            @FormParam(VERSION) LongParameter version,
-            @FormParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
-            @FormParam("name") String name, @FormParam("x") DoubleParameter x, @FormParam("y") DoubleParameter y) {
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/status/history")
+    // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
+    @ApiOperation(
+            value = "Gets status history for a remote process group",
+            response = StatusHistoryEntity.class,
+            authorizations = {
+                @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+                @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
+                @Authorization(value = "Administrator", type = "ROLE_ADMIN")
+            }
+    )
+    @ApiResponses(
+            value = {
+                @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+                @ApiResponse(code = 401, message = "Client could not be authenticated."),
+                @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+                @ApiResponse(code = 404, message = "The specified resource could not be found."),
+                @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+            }
+    )
+    public Response getProcessGroupStatusHistory(
+        @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+        @ApiParam(
+            value = "The process group id.",
+            required = true
+        )
+        @PathParam("id") String groupId) {
 
-        // ensure the position has been specified
-        if (x == null || y == null) {
-            throw new IllegalArgumentException("The position (x, y) must be specified");
+        // replicate if cluster manager
+        if (properties.isClusterManager()) {
+            return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
         }
 
-        // create the process group dto
-        final ProcessGroupDTO processGroup = new ProcessGroupDTO();
-        processGroup.setName(name);
-        processGroup.setPosition(new PositionDTO(x.getDouble(), y.getDouble()));
+        // get the specified process group status history
+        final StatusHistoryDTO processGroupStatusHistory = serviceFacade.getProcessGroupStatusHistory(groupId);
 
         // create the revision
         final RevisionDTO revision = new RevisionDTO();
         revision.setClientId(clientId.getClientId());
 
-        if (version != null) {
-            revision.setVersion(version.getLong());
-        }
-
-        // create the entity for the request
-        final ProcessGroupEntity entity = new ProcessGroupEntity();
+        // generate the response entity
+        final StatusHistoryEntity entity = new StatusHistoryEntity();
         entity.setRevision(revision);
-        entity.setProcessGroup(processGroup);
+        entity.setStatusHistory(processGroupStatusHistory);
 
-        // create the process group
-        return createProcessGroupReference(httpServletRequest, entity);
+        // generate the response
+        return clusterContext(generateOkResponse(entity)).build();
     }
 
     /**
      * Adds the specified process group.
      *
      * @param httpServletRequest request
+     * @param groupId The group id
      * @param processGroupEntity A processGroupEntity
      * @return A processGroupEntity
      */
     @POST
-    @Consumes({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
-    @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
-    @Path("/process-group-references")
-    @PreAuthorize("hasRole('ROLE_DFM')")
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/process-groups")
+    // TODO - @PreAuthorize("hasRole('ROLE_DFM')")
     @ApiOperation(
-            value = "Creates a process group",
-            response = ProcessGroupEntity.class,
-            authorizations = {
-                @Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
-            }
+        value = "Creates a process group",
+        response = ProcessGroupEntity.class,
+        authorizations = {
+            @Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
+        }
     )
     @ApiResponses(
-            value = {
-                @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
-                @ApiResponse(code = 401, message = "Client could not be authenticated."),
-                @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
-                @ApiResponse(code = 404, message = "The specified resource could not be found."),
-                @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
-            }
+        value = {
+            @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+            @ApiResponse(code = 401, message = "Client could not be authenticated."),
+            @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+            @ApiResponse(code = 404, message = "The specified resource could not be found."),
+            @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+        }
     )
-    public Response createProcessGroupReference(
-            @Context HttpServletRequest httpServletRequest,
-            @ApiParam(
-                    value = "The process group configuration details.",
-                    required = true
-            )
+    public Response createProcessGroup(
+        @Context HttpServletRequest httpServletRequest,
+        @ApiParam(
+            value = "The process group id.",
+            required = false
+        )
+        @PathParam("id") String groupId,
+        @ApiParam(
+            value = "The process group configuration details.",
+            required = true
+        )
             ProcessGroupEntity processGroupEntity) {
 
         if (processGroupEntity == null || processGroupEntity.getProcessGroup() == null) {
@@ -963,29 +908,8 @@ public class ProcessGroupResource extends ApplicationResource {
             throw new IllegalArgumentException("Process group ID cannot be specified.");
         }
 
-        // if cluster manager, convert POST to PUT (to maintain same ID across nodes) and replicate
         if (properties.isClusterManager()) {
-
-            // create ID for resource
-            final String id = UUID.randomUUID().toString();
-
-            // set ID for resource
-            processGroupEntity.getProcessGroup().setId(id);
-
-            // convert POST request to PUT request to force entity ID to be the same across nodes
-            URI putUri = null;
-            try {
-                putUri = new URI(getAbsolutePath().toString() + "/" + id);
-            } catch (final URISyntaxException e) {
-                throw new WebApplicationException(e);
-            }
-
-            // change content type to JSON for serializing entity
-            final Map<String, String> headersToOverride = new HashMap<>();
-            headersToOverride.put("content-type", MediaType.APPLICATION_JSON);
-
-            // replicate put request
-            return clusterManager.applyRequest(HttpMethod.PUT, putUri, updateClientId(processGroupEntity), getHeaders(headersToOverride)).getResponse();
+            return clusterManager.applyRequest(HttpMethod.POST, getAbsolutePath(), updateClientId(processGroupEntity), getHeaders()).getResponse();
         }
 
         // handle expects request (usually from the cluster manager)
@@ -994,10 +918,18 @@ public class ProcessGroupResource extends ApplicationResource {
             return generateContinueResponse().build();
         }
 
+        // set the process group id as appropriate
+        final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
+        if (clusterContext != null) {
+            processGroupEntity.getProcessGroup().setId(UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString());
+        } else {
+            processGroupEntity.getProcessGroup().setId(UUID.randomUUID().toString());
+        }
+
         // create the process group contents
         final RevisionDTO revision = processGroupEntity.getRevision();
         final ConfigurationSnapshot<ProcessGroupDTO> controllerResponse = serviceFacade.createProcessGroup(groupId,
-                new Revision(revision.getVersion(), revision.getClientId()), processGroupEntity.getProcessGroup());
+            new Revision(revision.getVersion(), revision.getClientId()), processGroupEntity.getProcessGroup());
         final ProcessGroupDTO processGroup = controllerResponse.getConfiguration();
 
         // get the updated revision
@@ -1008,7 +940,7 @@ public class ProcessGroupResource extends ApplicationResource {
         // create the response entity
         final ProcessGroupEntity entity = new ProcessGroupEntity();
         entity.setRevision(updatedRevision);
-        entity.setProcessGroup(populateRemainingProcessGroupContent(processGroup, getProcessGroupReferenceUri(processGroup)));
+        entity.setProcessGroup(populateRemainingProcessGroupContent(processGroup));
 
         // generate a 201 created response
         String uri = processGroup.getUri();
@@ -1016,414 +948,1210 @@ public class ProcessGroupResource extends ApplicationResource {
     }
 
     /**
-     * Updates the specified process group.
+     * Retrieves all the process groups within the specified process group.
      *
-     * @param httpServletRequest request
-     * @param version The revision is used to verify the client is working with the latest version of the flow.
      * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
-     * @param id The id of the process group
-     * @param name The name of the process group.
-     * @param comments The comments for the process group.
-     * @param running Optional flag that indicates whether all processors should be started/stopped.
-     * @param x The x coordinate for this funnels position.
-     * @param y The y coordinate for this funnels position.
-     * @return A processGroupEntity.
+     * @return A processGroupsEntity.
      */
-    @PUT
-    @Consumes(MediaType.APPLICATION_FORM_URLENCODED)
-    @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
-    @Path("/process-group-references/{id}")
-    @PreAuthorize("hasRole('ROLE_DFM')")
-    public Response updateProcessGroupReference(
-            @Context HttpServletRequest httpServletRequest,
-            @FormParam(VERSION) LongParameter version,
-            @FormParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
-            @PathParam("id") String id,
-            @FormParam("name") String name,
-            @FormParam("comments") String comments,
-            @FormParam("running") Boolean running,
-            @FormParam("x") DoubleParameter x,
-            @FormParam("y") DoubleParameter y) {
-
-        // create the process group dto
-        final ProcessGroupDTO processGroup = new ProcessGroupDTO();
-        processGroup.setId(id);
-        processGroup.setName(name);
-        processGroup.setComments(comments);
-        processGroup.setRunning(running);
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/process-groups")
+    // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
+    @ApiOperation(
+        value = "Gets all process groups",
+        response = ProcessGroupsEntity.class,
+        authorizations = {
+            @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+            @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
+            @Authorization(value = "Administrator", type = "ROLE_ADMIN")
+        }
+    )
+    @ApiResponses(
+        value = {
+            @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+            @ApiResponse(code = 401, message = "Client could not be authenticated."),
+            @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+            @ApiResponse(code = 404, message = "The specified resource could not be found."),
+            @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+        }
+    )
+    public Response getProcessGroups(
+        @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+        @ApiParam(
+            value = "The process group id.",
+            required = true
+        )
+        @PathParam("id") String groupId) {
 
-        // require both coordinates to be specified
-        if (x != null && y != null) {
-            processGroup.setPosition(new PositionDTO(x.getDouble(), y.getDouble()));
+        // replicate if cluster manager
+        if (properties.isClusterManager()) {
+            return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
         }
 
+        // get the process groups
+        final Set<ProcessGroupDTO> processGroupDTOs = serviceFacade.getProcessGroups(groupId);
+
         // create the revision
         final RevisionDTO revision = new RevisionDTO();
         revision.setClientId(clientId.getClientId());
 
-        if (version != null) {
-            revision.setVersion(version.getLong());
-        }
-
-        // create the entity for the request
-        final ProcessGroupEntity entity = new ProcessGroupEntity();
+        // create the response entity
+        final ProcessGroupsEntity entity = new ProcessGroupsEntity();
         entity.setRevision(revision);
-        entity.setProcessGroup(processGroup);
+        entity.setProcessGroups(populateRemainingProcessGroupsContent(processGroupDTOs));
 
-        // update the process group
-        return updateProcessGroupReference(httpServletRequest, id, entity);
+        // generate the response
+        return clusterContext(generateOkResponse(entity)).build();
     }
 
     /**
-     * Updates the specified process group.
+     * Creates a new processor.
      *
      * @param httpServletRequest request
-     * @param id The id of the process group.
-     * @param processGroupEntity A processGroupEntity.
-     * @return A processGroupEntity.
+     * @param groupId The group id
+     * @param processorEntity A processorEntity.
+     * @return A processorEntity.
      */
-    @PUT
-    @Consumes({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
-    @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
-    @Path("/process-group-references/{id}")
-    @PreAuthorize("hasRole('ROLE_DFM')")
+    @POST
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/processors")
+    // TODO - @PreAuthorize("hasRole('ROLE_DFM')")
     @ApiOperation(
-            value = "Updates a process group",
-            response = ProcessGroupEntity.class,
-            authorizations = {
-                @Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
-            }
+        value = "Creates a new processor",
+        response = ProcessorEntity.class,
+        authorizations = {
+            @Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
+        }
     )
     @ApiResponses(
-            value = {
-                @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
-                @ApiResponse(code = 401, message = "Client could not be authenticated."),
-                @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
-                @ApiResponse(code = 404, message = "The specified resource could not be found."),
-                @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
-            }
+        value = {
+            @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+            @ApiResponse(code = 401, message = "Client could not be authenticated."),
+            @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+            @ApiResponse(code = 404, message = "The specified resource could not be found."),
+            @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+        }
     )
-    public Response updateProcessGroupReference(
+    public Response createProcessor(
             @Context HttpServletRequest httpServletRequest,
             @ApiParam(
-                    value = "The process group id.",
-                    required = true
+                value = "The process group id.",
+                required = true
             )
-            @PathParam("id") String id,
+            @PathParam("id") String groupId,
             @ApiParam(
-                    value = "The process group configuration details.",
-                    required = true
+                value = "The processor configuration details.",
+                required = true
             )
-            ProcessGroupEntity processGroupEntity) {
+            ProcessorEntity processorEntity) {
 
-        if (processGroupEntity == null || processGroupEntity.getProcessGroup() == null) {
-            throw new IllegalArgumentException("Process group details must be specified.");
+        if (processorEntity == null || processorEntity.getProcessor() == null) {
+            throw new IllegalArgumentException("Processor details must be specified.");
         }
 
-        if (processGroupEntity.getRevision() == null) {
+        if (processorEntity.getRevision() == null) {
             throw new IllegalArgumentException("Revision must be specified.");
         }
 
-        // ensure the same id is being used
-        final ProcessGroupDTO requestProcessGroupDTO = processGroupEntity.getProcessGroup();
-        if (!id.equals(requestProcessGroupDTO.getId())) {
-            throw new IllegalArgumentException(String.format("The process group id (%s) in the request body does "
-                    + "not equal the process group id of the requested resource (%s).", requestProcessGroupDTO.getId(), id));
+        if (processorEntity.getProcessor().getId() != null) {
+            throw new IllegalArgumentException("Processor ID cannot be specified.");
+        }
+
+        if (StringUtils.isBlank(processorEntity.getProcessor().getType())) {
+            throw new IllegalArgumentException("The type of processor to create must be specified.");
+        }
+
+        if (properties.isClusterManager()) {
+            return clusterManager.applyRequest(HttpMethod.POST, getAbsolutePath(), updateClientId(processorEntity), getHeaders()).getResponse();
+        }
+
+        // handle expects request (usually from the cluster manager)
+        final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
+        if (expects != null) {
+            return generateContinueResponse().build();
+        }
+
+        // set the processor id as appropriate
+        final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
+        if (clusterContext != null) {
+            processorEntity.getProcessor().setId(UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString());
+        } else {
+            processorEntity.getProcessor().setId(UUID.randomUUID().toString());
+        }
+
+        // create the new processor
+        final RevisionDTO revision = processorEntity.getRevision();
+        final ConfigurationSnapshot<ProcessorDTO> controllerResponse = serviceFacade.createProcessor(
+            new Revision(revision.getVersion(), revision.getClientId()), groupId, processorEntity.getProcessor());
+        final ProcessorDTO processor = controllerResponse.getConfiguration();
+        processorResource.populateRemainingProcessorContent(processor);
+
+        // get the updated revision
+        final RevisionDTO updatedRevision = new RevisionDTO();
+        updatedRevision.setClientId(revision.getClientId());
+        updatedRevision.setVersion(controllerResponse.getVersion());
+
+        // generate the response entity
+        final ProcessorEntity entity = new ProcessorEntity();
+        entity.setRevision(updatedRevision);
+        entity.setProcessor(processor);
+
+        // generate a 201 created response
+        String uri = processor.getUri();
+        return clusterContext(generateCreatedResponse(URI.create(uri), entity)).build();
+    }
+
+    /**
+     * Retrieves all the processors in the specified process group.
+     *
+     * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
+     * @return A processorsEntity.
+     */
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/processors")
+    // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
+    @ApiOperation(
+        value = "Gets all processors",
+        response = ProcessorsEntity.class,
+        authorizations = {
+            @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+            @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
+            @Authorization(value = "Administrator", type = "ROLE_ADMIN")
+        }
+    )
+    @ApiResponses(
+        value = {
+            @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+            @ApiResponse(code = 401, message = "Client could not be authenticated."),
+            @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+            @ApiResponse(code = 404, message = "The specified resource could not be found."),
+            @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+        }
+    )
+    public Response getProcessors(
+        @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+        @ApiParam(
+            value = "The process group id.",
+            required = true
+        )
+        @PathParam("id") String groupId) {
+
+        // replicate if cluster manager
+        if (properties.isClusterManager()) {
+            return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+        }
+
+        // get the processors
+        final Set<ProcessorDTO> processorDTOs = serviceFacade.getProcessors(groupId);
+
+        // create the revision
+        final RevisionDTO revision = new RevisionDTO();
+        revision.setClientId(clientId.getClientId());
+
+        // create the response entity
+        final ProcessorsEntity entity = new ProcessorsEntity();
+        entity.setRevision(revision);
+        entity.setProcessors(processorResource.populateRemainingProcessorsContent(processorDTOs));
+
+        // generate the response
+        return clusterContext(generateOkResponse(entity)).build();
+    }
+
+    /**
+     * Creates a new input port.
+     *
+     * @param httpServletRequest request
+     * @param groupId The group id
+     * @param portEntity An inputPortEntity.
+     * @return An inputPortEntity.
+     */
+    @POST
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/input-ports")
+    // TODO - @PreAuthorize("hasRole('ROLE_DFM')")
+    @ApiOperation(
+        value = "Creates an input port",
+        response = InputPortEntity.class,
+        authorizations = {
+            @Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
+        }
+    )
+    @ApiResponses(
+        value = {
+            @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+            @ApiResponse(code = 401, message = "Client could not be authenticated."),
+            @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+            @ApiResponse(code = 404, message = "The specified resource could not be found."),
+            @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+        }
+    )
+    public Response createInputPort(
+        @Context HttpServletRequest httpServletRequest,
+        @ApiParam(
+            value = "The process group id.",
+            required = true
+        )
+        @PathParam("id") String groupId,
+        @ApiParam(
+            value = "The input port configuration details.",
+            required = true
+        ) InputPortEntity portEntity) {
+
+        if (portEntity == null || portEntity.getInputPort() == null) {
+            throw new IllegalArgumentException("Port details must be specified.");
+        }
+
+        if (portEntity.getRevision() == null) {
+            throw new IllegalArgumentException("Revision must be specified.");
+        }
+
+        if (portEntity.getInputPort().getId() != null) {
+            throw new IllegalArgumentException("Input port ID cannot be specified.");
+        }
+
+        if (properties.isClusterManager()) {
+            return clusterManager.applyRequest(HttpMethod.POST, getAbsolutePath(), updateClientId(portEntity), getHeaders()).getResponse();
+        }
+
+        // handle expects request (usually from the cluster manager)
+        final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
+        if (expects != null) {
+            return generateContinueResponse().build();
+        }
+
+        // set the input port id as appropriate
+        final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
+        if (clusterContext != null) {
+            portEntity.getInputPort().setId(UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString());
+        } else {
+            portEntity.getInputPort().setId(UUID.randomUUID().toString());
+        }
+
+        // create the input port and generate the json
+        final RevisionDTO revision = portEntity.getRevision();
+        final ConfigurationSnapshot<PortDTO> controllerResponse = serviceFacade.createInputPort(
+            new Revision(revision.getVersion(), revision.getClientId()), groupId, portEntity.getInputPort());
+        final PortDTO port = controllerResponse.getConfiguration();
+        inputPortResource.populateRemainingInputPortContent(port);
+
+        // get the updated revision
+        final RevisionDTO updatedRevision = new RevisionDTO();
+        updatedRevision.setClientId(revision.getClientId());
+        updatedRevision.setVersion(controllerResponse.getVersion());
+
+        // build the response entity
+        final InputPortEntity entity = new InputPortEntity();
+        entity.setRevision(updatedRevision);
+        entity.setInputPort(port);
+
+        // build the response
+        return clusterContext(generateCreatedResponse(URI.create(port.getUri()), entity)).build();
+    }
+
+    /**
+     * Retrieves all of the input ports in the specified process group.
+     *
+     * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
+     * @return An inputPortsEntity.
+     */
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/input-ports")
+    // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
+    @ApiOperation(
+        value = "Gets all input ports",
+        response = InputPortsEntity.class,
+        authorizations = {
+            @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+            @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
+            @Authorization(value = "Administrator", type = "ROLE_ADMIN")
+        }
+    )
+    @ApiResponses(
+        value = {
+            @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+            @ApiResponse(code = 401, message = "Client could not be authenticated."),
+            @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+            @ApiResponse(code = 404, message = "The specified resource could not be found."),
+            @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+        }
+    )
+    public Response getInputPorts(
+        @ApiParam(
+            value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
+            required = false
+        )
+        @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+        @ApiParam(
+            value = "The process group id.",
+            required = true
+        )
+        @PathParam("id") String groupId) {
+
+        // replicate if cluster manager
+        if (properties.isClusterManager()) {
+            return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+        }
+
+        // get all the input ports
+        final Set<PortDTO> inputPorts = inputPortResource.populateRemainingInputPortsContent(serviceFacade.getInputPorts(groupId));
+
+        // create the revision
+        final RevisionDTO revision = new RevisionDTO();
+        revision.setClientId(clientId.getClientId());
+
+        // create the response entity
+        final InputPortsEntity entity = new InputPortsEntity();
+        entity.setRevision(revision);
+        entity.setInputPorts(inputPorts);
+
+        // generate the response
+        return clusterContext(generateOkResponse(entity)).build();
+    }
+
+    /**
+     * Creates a new output port.
+     *
+     * @param httpServletRequest request
+     * @param groupId The group id
+     * @param portEntity An outputPortEntity.
+     * @return An outputPortEntity.
+     */
+    @POST
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/output-ports")
+    // TODO - @PreAuthorize("hasRole('ROLE_DFM')")
+    @ApiOperation(
+        value = "Creates an output port",
+        response = OutputPortEntity.class,
+        authorizations = {
+            @Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
+        }
+    )
+    @ApiResponses(
+        value = {
+            @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+            @ApiResponse(code = 401, message = "Client could not be authenticated."),
+            @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+            @ApiResponse(code = 404, message = "The specified resource could not be found."),
+            @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+        }
+    )
+    public Response createOutputPort(
+        @Context HttpServletRequest httpServletRequest,
+        @ApiParam(
+            value = "The process group id.",
+            required = true
+        )
+        @PathParam("id") String groupId,
+        @ApiParam(
+            value = "The output port configuration.",
+            required = true
+        ) OutputPortEntity portEntity) {
+
+        if (portEntity == null || portEntity.getOutputPort() == null) {
+            throw new IllegalArgumentException("Port details must be specified.");
+        }
+
+        if (portEntity.getRevision() == null) {
+            throw new IllegalArgumentException("Revision must be specified.");
+        }
+
+        if (portEntity.getOutputPort().getId() != null) {
+            throw new IllegalArgumentException("Output port ID cannot be specified.");
+        }
+
+        if (properties.isClusterManager()) {
+            return clusterManager.applyRequest(HttpMethod.POST, getAbsolutePath(), updateClientId(portEntity), getHeaders()).getResponse();
+        }
+
+        // handle expects request (usually from the cluster manager)
+        final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
+        if (expects != null) {
+            return generateContinueResponse().build();
+        }
+
+        // set the output port id as appropriate
+        final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
+        if (clusterContext != null) {
+            portEntity.getOutputPort().setId(UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString());
+        } else {
+            portEntity.getOutputPort().setId(UUID.randomUUID().toString());
+        }
+
+        // create the output port and generate the json
+        final RevisionDTO revision = portEntity.getRevision();
+        final ConfigurationSnapshot<PortDTO> controllerResponse = serviceFacade.createOutputPort(
+            new Revision(revision.getVersion(), revision.getClientId()), groupId, portEntity.getOutputPort());
+        final PortDTO port = controllerResponse.getConfiguration();
+        outputPortResource.populateRemainingOutputPortContent(port);
+
+        // get the updated revision
+        final RevisionDTO updatedRevision = new RevisionDTO();
+        updatedRevision.setClientId(revision.getClientId());
+        updatedRevision.setVersion(controllerResponse.getVersion());
+
+        // build the response entity
+        final OutputPortEntity entity = new OutputPortEntity();
+        entity.setRevision(updatedRevision);
+        entity.setOutputPort(port);
+
+        // build the response
+        return clusterContext(generateCreatedResponse(URI.create(port.getUri()), entity)).build();
+    }
+
+    /**
+     * Retrieves all of the output ports in the specified process group.
+     *
+     * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
+     * @return An outputPortsEntity.
+     */
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/output-ports")
+    // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
+    @ApiOperation(
+        value = "Gets all output ports",
+        response = OutputPortsEntity.class,
+        authorizations = {
+            @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+            @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
+            @Authorization(value = "Administrator", type = "ROLE_ADMIN")
+        }
+    )
+    @ApiResponses(
+        value = {
+            @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+            @ApiResponse(code = 401, message = "Client could not be authenticated."),
+            @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+            @ApiResponse(code = 404, message = "The specified resource could not be found."),
+            @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+        }
+    )
+    public Response getOutputPorts(
+        @ApiParam(
+            value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
+            required = false
+        )
+        @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+        @ApiParam(
+            value = "The process group id.",
+            required = true
+        )
+        @PathParam("id") String groupId) {
+
+        // replicate if cluster manager
+        if (properties.isClusterManager()) {
+            return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+        }
+
+        // get all the output ports
+        final Set<PortDTO> outputPorts = outputPortResource.populateRemainingOutputPortsContent(serviceFacade.getOutputPorts(groupId));
+
+        // create the revision
+        final RevisionDTO revision = new RevisionDTO();
+        revision.setClientId(clientId.getClientId());
+
+        // create the response entity
+        final OutputPortsEntity entity = new OutputPortsEntity();
+        entity.setRevision(revision);
+        entity.setOutputPorts(outputPorts);
+
+        // generate the response
+        return clusterContext(generateOkResponse(entity)).build();
+    }
+
+    /**
+     * Creates a new Funnel.
+     *
+     * @param httpServletRequest request
+     * @param groupId The group id
+     * @param funnelEntity A funnelEntity.
+     * @return A funnelEntity.
+     */
+    @POST
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/funnels")
+    // TODO - @PreAuthorize("hasRole('ROLE_DFM')")
+    @ApiOperation(
+        value = "Creates a funnel",
+        response = FunnelEntity.class,
+        authorizations = {
+            @Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
+        }
+    )
+    @ApiResponses(
+        value = {
+            @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+            @ApiResponse(code = 401, message = "Client could not be authenticated."),
+            @ApiResponse(code = 403, message = 

<TRUNCATED>