You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2016/04/04 18:28:43 UTC

[07/18] nifi git commit: NIFI-1563: - Federate requests and merge responses from nodes instead of storing bulletins and stats at NCM - Updating UI to support restructured status history DTO. - Return 'Insufficient History' message if aggregate stats don'

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java
index 857df56..ec4c69e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java
@@ -16,24 +16,14 @@
  */
 package org.apache.nifi.web.api;
 
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.FormParam;
-import javax.ws.rs.GET;
-import javax.ws.rs.HEAD;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
+import com.sun.jersey.api.core.ResourceContext;
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
+import com.wordnik.swagger.annotations.ApiParam;
+import com.wordnik.swagger.annotations.ApiResponse;
+import com.wordnik.swagger.annotations.ApiResponses;
+import com.wordnik.swagger.annotations.Authorization;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.cluster.manager.impl.WebClusterManager;
 import org.apache.nifi.cluster.node.Node;
 import org.apache.nifi.util.NiFiProperties;
@@ -47,36 +37,29 @@ import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
 import org.apache.nifi.web.api.dto.ProcessorDTO;
 import org.apache.nifi.web.api.dto.RevisionDTO;
 import org.apache.nifi.web.api.dto.search.NodeSearchResultDTO;
-import org.apache.nifi.web.api.dto.status.ClusterConnectionStatusDTO;
-import org.apache.nifi.web.api.dto.status.ClusterPortStatusDTO;
-import org.apache.nifi.web.api.dto.status.ClusterProcessorStatusDTO;
-import org.apache.nifi.web.api.dto.status.ClusterRemoteProcessGroupStatusDTO;
-import org.apache.nifi.web.api.dto.status.ClusterStatusDTO;
-import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO;
-import org.apache.nifi.web.api.entity.ClusterConnectionStatusEntity;
 import org.apache.nifi.web.api.entity.ClusterEntity;
-import org.apache.nifi.web.api.entity.ClusterPortStatusEntity;
-import org.apache.nifi.web.api.entity.ClusterProcessorStatusEntity;
-import org.apache.nifi.web.api.entity.ClusterRemoteProcessGroupStatusEntity;
 import org.apache.nifi.web.api.entity.ClusterSearchResultsEntity;
-import org.apache.nifi.web.api.entity.ClusterStatusEntity;
-import org.apache.nifi.web.api.entity.ClusterStatusHistoryEntity;
 import org.apache.nifi.web.api.entity.ProcessorEntity;
 import org.apache.nifi.web.api.request.ClientIdParameter;
 import org.apache.nifi.web.api.request.LongParameter;
-
-import org.apache.commons.lang3.StringUtils;
 import org.springframework.security.access.prepost.PreAuthorize;
 
-import com.sun.jersey.api.core.ResourceContext;
-import com.wordnik.swagger.annotations.Api;
-import com.wordnik.swagger.annotations.ApiOperation;
-import com.wordnik.swagger.annotations.ApiParam;
-import com.wordnik.swagger.annotations.ApiResponse;
-import com.wordnik.swagger.annotations.ApiResponses;
-import com.wordnik.swagger.annotations.Authorization;
-import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO;
-import org.apache.nifi.web.api.entity.ClusterProcessGroupStatusEntity;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.FormParam;
+import javax.ws.rs.GET;
+import javax.ws.rs.HEAD;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * RESTful endpoint for managing a cluster.
@@ -108,62 +91,6 @@ public class ClusterResource extends ApplicationResource {
     }
 
     /**
-     * Gets the status of this NiFi cluster.
-     *
-     * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
-     * @return A clusterStatusEntity
-     */
-    @GET
-    @Consumes(MediaType.WILDCARD)
-    @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
-    @Path("/status")
-    @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
-    @ApiOperation(
-            value = "Gets the status of the cluster",
-            response = ClusterStatusEntity.class,
-            authorizations = {
-                @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
-                @Authorization(value = "DFM", type = "ROLE_DFM"),
-                @Authorization(value = "Admin", type = "ROLE_ADMIN")
-            }
-    )
-    @ApiResponses(
-            value = {
-                @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
-                @ApiResponse(code = 401, message = "Client could not be authenticated."),
-                @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
-                @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
-            }
-    )
-    public Response getClusterStatus(
-            @ApiParam(
-                    value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
-                    required = false
-            )
-            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId) {
-
-        if (properties.isClusterManager()) {
-
-            ClusterStatusDTO dto = serviceFacade.getClusterStatus();
-
-            // create the revision
-            RevisionDTO revision = new RevisionDTO();
-            revision.setClientId(clientId.getClientId());
-
-            // create entity
-            final ClusterStatusEntity entity = new ClusterStatusEntity();
-            entity.setClusterStatus(dto);
-            entity.setRevision(revision);
-
-            // generate the response
-            return generateOkResponse(entity).build();
-        }
-
-        throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
-
-    }
-
-    /**
      * Returns a 200 OK response to indicate this is a valid cluster endpoint.
      *
      * @return An OK response with an empty entity body.
@@ -519,622 +446,6 @@ public class ClusterResource extends ApplicationResource {
         throw new IllegalClusterResourceRequestException("Only a node can process the request.");
     }
 
-    /**
-     * Gets the processor status for every node.
-     *
-     * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
-     * @param id The id of the processor
-     * @return A clusterProcessorStatusEntity
-     */
-    @GET
-    @Consumes(MediaType.WILDCARD)
-    @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON})
-    @Path("/processors/{id}/status")
-    @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
-    @ApiOperation(
-            value = "Gets the processor status across the cluster",
-            response = ClusterProcessorStatusEntity.class,
-            authorizations = {
-                @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
-                @Authorization(value = "DFM", type = "ROLE_DFM"),
-                @Authorization(value = "Admin", type = "ROLE_ADMIN")
-            }
-    )
-    @ApiResponses(
-            value = {
-                @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
-                @ApiResponse(code = 401, message = "Client could not be authenticated."),
-                @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
-                @ApiResponse(code = 404, message = "The specified resource could not be found."),
-                @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
-            }
-    )
-    public Response getProcessorStatus(
-            @ApiParam(
-                    value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
-                    required = false
-            )
-            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
-            @ApiParam(
-                    value = "The processor id",
-                    required = true
-            )
-            @PathParam("id") String id) {
-
-        if (properties.isClusterManager()) {
-
-            final ClusterProcessorStatusDTO dto = serviceFacade.getClusterProcessorStatus(id);
-
-            // create the revision
-            RevisionDTO revision = new RevisionDTO();
-            revision.setClientId(clientId.getClientId());
-
-            // create entity
-            final ClusterProcessorStatusEntity entity = new ClusterProcessorStatusEntity();
-            entity.setClusterProcessorStatus(dto);
-            entity.setRevision(revision);
-
-            // generate the response
-            return generateOkResponse(entity).build();
-        }
-
-        throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
-    }
-
-    /**
-     * Gets the processor status history for every node.
-     *
-     * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
-     * @param id The id of the processor
-     * @return A clusterProcessorStatusHistoryEntity
-     */
-    @GET
-    @Consumes(MediaType.WILDCARD)
-    @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON})
-    @Path("/processors/{id}/status/history")
-    @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
-    @ApiOperation(
-            value = "Gets processor status history across the cluster",
-            response = ClusterStatusHistoryEntity.class,
-            authorizations = {
-                @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
-                @Authorization(value = "DFM", type = "ROLE_DFM"),
-                @Authorization(value = "Admin", type = "ROLE_ADMIN")
-            }
-    )
-    @ApiResponses(
-            value = {
-                @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
-                @ApiResponse(code = 401, message = "Client could not be authenticated."),
-                @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
-                @ApiResponse(code = 404, message = "The specified resource could not be found."),
-                @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
-            }
-    )
-    public Response getProcessorStatusHistory(
-            @ApiParam(
-                    value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
-                    required = false
-            )
-            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
-            @ApiParam(
-                    value = "The processor id",
-                    required = true
-            )
-            @PathParam("id") String id) {
-
-        if (properties.isClusterManager()) {
-            final ClusterStatusHistoryDTO dto = serviceFacade.getClusterProcessorStatusHistory(id);
-
-            // create the revision
-            RevisionDTO revision = new RevisionDTO();
-            revision.setClientId(clientId.getClientId());
-
-            // create entity
-            final ClusterStatusHistoryEntity entity = new ClusterStatusHistoryEntity();
-            entity.setClusterStatusHistory(dto);
-            entity.setRevision(revision);
-
-            // generate the response
-            return generateOkResponse(entity).build();
-        }
-
-        throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
-    }
-
-    /**
-     * Gets the connection status for every node.
-     *
-     * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
-     * @param id The id of the processor
-     * @return A clusterProcessorStatusEntity
-     */
-    @GET
-    @Consumes(MediaType.WILDCARD)
-    @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON})
-    @Path("/connections/{id}/status")
-    @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
-    @ApiOperation(
-            value = "Gets connection status across the cluster",
-            response = ClusterConnectionStatusEntity.class,
-            authorizations = {
-                @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
-                @Authorization(value = "DFM", type = "ROLE_DFM"),
-                @Authorization(value = "Admin", type = "ROLE_ADMIN")
-            }
-    )
-    @ApiResponses(
-            value = {
-                @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
-                @ApiResponse(code = 401, message = "Client could not be authenticated."),
-                @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
-                @ApiResponse(code = 404, message = "The specified resource could not be found."),
-                @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
-            }
-    )
-    public Response getConnectionStatus(
-            @ApiParam(
-                    value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
-                    required = false
-            )
-            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
-            @ApiParam(
-                    value = "The connection id",
-                    required = true
-            )
-            @PathParam("id") String id) {
-
-        if (properties.isClusterManager()) {
-
-            final ClusterConnectionStatusDTO dto = serviceFacade.getClusterConnectionStatus(id);
-
-            // create the revision
-            RevisionDTO revision = new RevisionDTO();
-            revision.setClientId(clientId.getClientId());
-
-            // create entity
-            final ClusterConnectionStatusEntity entity = new ClusterConnectionStatusEntity();
-            entity.setClusterConnectionStatus(dto);
-            entity.setRevision(revision);
-
-            // generate the response
-            return generateOkResponse(entity).build();
-        }
-
-        throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
-    }
-
-    /**
-     * Gets the connections status history for every node.
-     *
-     * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
-     * @param id The id of the processor
-     * @return A clusterProcessorStatusHistoryEntity
-     */
-    @GET
-    @Consumes(MediaType.WILDCARD)
-    @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON})
-    @Path("/connections/{id}/status/history")
-    @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
-    @ApiOperation(
-            value = "Gets connection status history across the cluster",
-            response = ClusterStatusHistoryEntity.class,
-            authorizations = {
-                @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
-                @Authorization(value = "DFM", type = "ROLE_DFM"),
-                @Authorization(value = "Admin", type = "ROLE_ADMIN")
-            }
-    )
-    @ApiResponses(
-            value = {
-                @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
-                @ApiResponse(code = 401, message = "Client could not be authenticated."),
-                @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
-                @ApiResponse(code = 404, message = "The specified resource could not be found."),
-                @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
-            }
-    )
-    public Response getConnectionStatusHistory(
-            @ApiParam(
-                    value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
-                    required = false
-            )
-            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
-            @ApiParam(
-                    value = "The connection id.",
-                    required = true
-            )
-            @PathParam("id") String id) {
-
-        if (properties.isClusterManager()) {
-            final ClusterStatusHistoryDTO dto = serviceFacade.getClusterConnectionStatusHistory(id);
-
-            // create the revision
-            RevisionDTO revision = new RevisionDTO();
-            revision.setClientId(clientId.getClientId());
-
-            // create entity
-            final ClusterStatusHistoryEntity entity = new ClusterStatusHistoryEntity();
-            entity.setClusterStatusHistory(dto);
-            entity.setRevision(revision);
-
-            // generate the response
-            return generateOkResponse(entity).build();
-        }
-
-        throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
-    }
-
-    /**
-     * Gets the process group status for every node.
-     *
-     * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
-     * @param id The id of the process group
-     * @return A clusterProcessGroupStatusEntity
-     */
-    @GET
-    @Consumes(MediaType.WILDCARD)
-    @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON})
-    @Path("/process-groups/{id}/status")
-    @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
-    @ApiOperation(
-            value = "Gets process group status across the cluster",
-            response = ClusterProcessGroupStatusEntity.class,
-            authorizations = {
-                @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
-                @Authorization(value = "DFM", type = "ROLE_DFM"),
-                @Authorization(value = "Admin", type = "ROLE_ADMIN")
-            }
-    )
-    @ApiResponses(
-            value = {
-                @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
-                @ApiResponse(code = 401, message = "Client could not be authenticated."),
-                @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
-                @ApiResponse(code = 404, message = "The specified resource could not be found."),
-                @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
-            }
-    )
-    public Response getProcessGroupStatus(
-            @ApiParam(
-                    value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
-                    required = false
-            )
-            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
-            @ApiParam(
-                    value = "The process group id.",
-                    required = true
-            )
-            @PathParam("id") String id) {
-
-        if (properties.isClusterManager()) {
-
-            final ClusterProcessGroupStatusDTO dto = serviceFacade.getClusterProcessGroupStatus(id);
-
-            // create the revision
-            RevisionDTO revision = new RevisionDTO();
-            revision.setClientId(clientId.getClientId());
-
-            // create entity
-            final ClusterProcessGroupStatusEntity entity = new ClusterProcessGroupStatusEntity();
-            entity.setClusterProcessGroupStatus(dto);
-            entity.setRevision(revision);
-
-            // generate the response
-            return generateOkResponse(entity).build();
-        }
-
-        throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
-    }
-
-    /**
-     * Gets the process group status history for every node.
-     *
-     * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
-     * @param id The id of the process group
-     * @return A clusterProcessGroupStatusHistoryEntity
-     */
-    @GET
-    @Consumes(MediaType.WILDCARD)
-    @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON})
-    @Path("/process-groups/{id}/status/history")
-    @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
-    @ApiOperation(
-            value = "Gets process group status history across the cluster",
-            response = ClusterStatusHistoryEntity.class,
-            authorizations = {
-                @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
-                @Authorization(value = "DFM", type = "ROLE_DFM"),
-                @Authorization(value = "Admin", type = "ROLE_ADMIN")
-            }
-    )
-    @ApiResponses(
-            value = {
-                @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
-                @ApiResponse(code = 401, message = "Client could not be authenticated."),
-                @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
-                @ApiResponse(code = 404, message = "The specified resource could not be found."),
-                @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
-            }
-    )
-    public Response getProcessGroupStatusHistory(
-            @ApiParam(
-                    value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
-                    required = false
-            )
-            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
-            @ApiParam(
-                    value = "The process group id.",
-                    required = true
-            )
-            @PathParam("id") String id) {
-
-        if (properties.isClusterManager()) {
-            final ClusterStatusHistoryDTO dto = serviceFacade.getClusterProcessGroupStatusHistory(id);
-
-            // create the revision
-            RevisionDTO revision = new RevisionDTO();
-            revision.setClientId(clientId.getClientId());
-
-            // create entity
-            final ClusterStatusHistoryEntity entity = new ClusterStatusHistoryEntity();
-            entity.setClusterStatusHistory(dto);
-            entity.setRevision(revision);
-
-            // generate the response
-            return generateOkResponse(entity).build();
-        }
-
-        throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
-    }
-
-    /**
-     * Gets the remote process group status for every node.
-     *
-     * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
-     * @param id The id of the remote process group
-     * @return A clusterRemoteProcessGroupStatusEntity
-     */
-    @GET
-    @Consumes(MediaType.WILDCARD)
-    @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON})
-    @Path("/remote-process-groups/{id}/status")
-    @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
-    @ApiOperation(
-            value = "Gets remote process group status across the cluster",
-            response = ClusterRemoteProcessGroupStatusEntity.class,
-            authorizations = {
-                @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
-                @Authorization(value = "DFM", type = "ROLE_DFM"),
-                @Authorization(value = "Admin", type = "ROLE_ADMIN")
-            }
-    )
-    @ApiResponses(
-            value = {
-                @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
-                @ApiResponse(code = 401, message = "Client could not be authenticated."),
-                @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
-                @ApiResponse(code = 404, message = "The specified resource could not be found."),
-                @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
-            }
-    )
-    public Response getRemoteProcessGroupStatus(
-            @ApiParam(
-                    value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
-                    required = false
-            )
-            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
-            @ApiParam(
-                    value = "The remote process group id.",
-                    required = true
-            )
-            @PathParam("id") String id) {
-
-        if (properties.isClusterManager()) {
-
-            final ClusterRemoteProcessGroupStatusDTO dto = serviceFacade.getClusterRemoteProcessGroupStatus(id);
-
-            // create the revision
-            RevisionDTO revision = new RevisionDTO();
-            revision.setClientId(clientId.getClientId());
-
-            // create entity
-            final ClusterRemoteProcessGroupStatusEntity entity = new ClusterRemoteProcessGroupStatusEntity();
-            entity.setClusterRemoteProcessGroupStatus(dto);
-            entity.setRevision(revision);
-
-            // generate the response
-            return generateOkResponse(entity).build();
-        }
-
-        throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
-    }
-
-    /**
-     * Gets the input port status for every node.
-     *
-     * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
-     * @param id The id of the input port
-     * @return A clusterPortStatusEntity
-     */
-    @GET
-    @Consumes(MediaType.WILDCARD)
-    @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON})
-    @Path("/input-ports/{id}/status")
-    @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
-    @ApiOperation(
-            value = "Gets input port status across the cluster",
-            response = ClusterPortStatusEntity.class,
-            authorizations = {
-                @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
-                @Authorization(value = "DFM", type = "ROLE_DFM"),
-                @Authorization(value = "Admin", type = "ROLE_ADMIN")
-            }
-    )
-    @ApiResponses(
-            value = {
-                @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
-                @ApiResponse(code = 401, message = "Client could not be authenticated."),
-                @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
-                @ApiResponse(code = 404, message = "The specified resource could not be found."),
-                @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
-            }
-    )
-    public Response getInputPortStatus(
-            @ApiParam(
-                    value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
-                    required = false
-            )
-            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
-            @ApiParam(
-                    value = "The input port id.",
-                    required = true
-            )
-            @PathParam("id") String id) {
-
-        if (properties.isClusterManager()) {
-
-            final ClusterPortStatusDTO dto = serviceFacade.getClusterInputPortStatus(id);
-
-            // create the revision
-            RevisionDTO revision = new RevisionDTO();
-            revision.setClientId(clientId.getClientId());
-
-            // create entity
-            final ClusterPortStatusEntity entity = new ClusterPortStatusEntity();
-            entity.setClusterPortStatus(dto);
-            entity.setRevision(revision);
-
-            // generate the response
-            return generateOkResponse(entity).build();
-        }
-
-        throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
-    }
-
-    /**
-     * Gets the output port status for every node.
-     *
-     * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
-     * @param id The id of the output port
-     * @return A clusterPortStatusEntity
-     */
-    @GET
-    @Consumes(MediaType.WILDCARD)
-    @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON})
-    @Path("/output-ports/{id}/status")
-    @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
-    @ApiOperation(
-            value = "Gets output port status across the cluster",
-            response = ClusterPortStatusEntity.class,
-            authorizations = {
-                @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
-                @Authorization(value = "DFM", type = "ROLE_DFM"),
-                @Authorization(value = "Admin", type = "ROLE_ADMIN")
-            }
-    )
-    @ApiResponses(
-            value = {
-                @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
-                @ApiResponse(code = 401, message = "Client could not be authenticated."),
-                @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
-                @ApiResponse(code = 404, message = "The specified resource could not be found."),
-                @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
-            }
-    )
-    public Response getOutputPortStatus(
-            @ApiParam(
-                    value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
-                    required = false
-            )
-            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
-            @ApiParam(
-                    value = "The output port id.",
-                    required = true
-            )
-            @PathParam("id") String id) {
-
-        if (properties.isClusterManager()) {
-
-            final ClusterPortStatusDTO dto = serviceFacade.getClusterOutputPortStatus(id);
-
-            // create the revision
-            RevisionDTO revision = new RevisionDTO();
-            revision.setClientId(clientId.getClientId());
-
-            // create entity
-            final ClusterPortStatusEntity entity = new ClusterPortStatusEntity();
-            entity.setClusterPortStatus(dto);
-            entity.setRevision(revision);
-
-            // generate the response
-            return generateOkResponse(entity).build();
-        }
-
-        throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
-    }
-
-    /**
-     * Gets the remote process group status history for every node.
-     *
-     * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
-     * @param id The id of the processor
-     * @return A clusterRemoteProcessGroupStatusHistoryEntity
-     */
-    @GET
-    @Consumes(MediaType.WILDCARD)
-    @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON})
-    @Path("/remote-process-groups/{id}/status/history")
-    @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
-    @ApiOperation(
-            value = "Gets the remote process group status history across the cluster",
-            response = ClusterStatusHistoryEntity.class,
-            authorizations = {
-                @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
-                @Authorization(value = "DFM", type = "ROLE_DFM"),
-                @Authorization(value = "Admin", type = "ROLE_ADMIN")
-            }
-    )
-    @ApiResponses(
-            value = {
-                @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
-                @ApiResponse(code = 401, message = "Client could not be authenticated."),
-                @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
-                @ApiResponse(code = 404, message = "The specified resource could not be found."),
-                @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
-            }
-    )
-    public Response getRemoteProcessGroupStatusHistory(
-            @ApiParam(
-                    value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
-                    required = false
-            )
-            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
-            @ApiParam(
-                    value = "The remote process group id.",
-                    required = true
-            )
-            @PathParam("id") String id) {
-
-        if (properties.isClusterManager()) {
-            final ClusterStatusHistoryDTO dto = serviceFacade.getClusterRemoteProcessGroupStatusHistory(id);
-
-            // create the revision
-            RevisionDTO revision = new RevisionDTO();
-            revision.setClientId(clientId.getClientId());
-
-            // create entity
-            final ClusterStatusHistoryEntity entity = new ClusterStatusHistoryEntity();
-            entity.setClusterStatusHistory(dto);
-            entity.setRevision(revision);
-
-            // generate the response
-            return generateOkResponse(entity).build();
-        }
-
-        throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
-    }
-
     // setters
     public void setServiceFacade(NiFiServiceFacade serviceFacade) {
         this.serviceFacade = serviceFacade;

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
index 738da04..712233f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
@@ -25,6 +25,7 @@ import com.wordnik.swagger.annotations.Authorization;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.cluster.context.ClusterContext;
 import org.apache.nifi.cluster.context.ClusterContextThreadLocal;
+import org.apache.nifi.cluster.manager.NodeResponse;
 import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
 import org.apache.nifi.cluster.manager.impl.WebClusterManager;
 import org.apache.nifi.cluster.node.Node;
@@ -33,7 +34,6 @@ import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.ConfigurationSnapshot;
 import org.apache.nifi.web.DownloadableContent;
-import org.apache.nifi.web.IllegalClusterResourceRequestException;
 import org.apache.nifi.web.NiFiServiceFacade;
 import org.apache.nifi.web.Revision;
 import org.apache.nifi.web.api.dto.ConnectableDTO;
@@ -44,8 +44,10 @@ import org.apache.nifi.web.api.dto.FlowFileSummaryDTO;
 import org.apache.nifi.web.api.dto.ListingRequestDTO;
 import org.apache.nifi.web.api.dto.PositionDTO;
 import org.apache.nifi.web.api.dto.RevisionDTO;
+import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
 import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
 import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
 import org.apache.nifi.web.api.entity.ConnectionsEntity;
 import org.apache.nifi.web.api.entity.DropRequestEntity;
 import org.apache.nifi.web.api.entity.FlowFileEntity;
@@ -55,8 +57,6 @@ import org.apache.nifi.web.api.request.ClientIdParameter;
 import org.apache.nifi.web.api.request.ConnectableTypeParameter;
 import org.apache.nifi.web.api.request.IntegerParameter;
 import org.apache.nifi.web.api.request.LongParameter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.security.access.prepost.PreAuthorize;
 
 import javax.servlet.http.HttpServletRequest;
@@ -99,8 +99,6 @@ import java.util.UUID;
 @Api(hidden = true)
 public class ConnectionResource extends ApplicationResource {
 
-    private static final Logger logger = LoggerFactory.getLogger(ConnectionResource.class);
-
     private NiFiServiceFacade serviceFacade;
     private WebClusterManager clusterManager;
     private NiFiProperties properties;
@@ -284,6 +282,106 @@ public class ConnectionResource extends ApplicationResource {
     }
 
     /**
+     * Retrieves the specified connection status.
+     *
+     * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
+     * @param id The id of the connection history to retrieve.
+     * @return A connectionStatusEntity.
+     */
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
+    @Path("/{id}/status")
+    @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
+    @ApiOperation(
+        value = "Gets status for a connection",
+        response = ConnectionStatusEntity.class,
+        authorizations = {
+            @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+            @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
+            @Authorization(value = "Administrator", type = "ROLE_ADMIN")
+        }
+    )
+    @ApiResponses(
+        value = {
+            @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+            @ApiResponse(code = 401, message = "Client could not be authenticated."),
+            @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+            @ApiResponse(code = 404, message = "The specified resource could not be found."),
+            @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+        }
+    )
+    public Response getConnectionStatus(
+        @ApiParam(
+            value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
+            required = false
+        )
+        @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+        @ApiParam(
+            value = "Whether or not to include the breakdown per node. Optional, defaults to false",
+            required = false
+        )
+        @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
+        @ApiParam(
+            value = "The id of the node where to get the status.",
+            required = false
+        )
+        @QueryParam("clusterNodeId") String clusterNodeId,
+        @ApiParam(
+            value = "The connection id.",
+            required = true
+        )
+        @PathParam("id") String id) {
+
+        // ensure a valid request
+        if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
+            throw new IllegalArgumentException("Nodewise requests cannot be directed at a specific node.");
+        }
+
+        if (properties.isClusterManager()) {
+            // determine where this request should be sent
+            if (clusterNodeId == null) {
+                final NodeResponse nodeResponse = clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders());
+                final ConnectionStatusEntity entity = (ConnectionStatusEntity) nodeResponse.getUpdatedEntity();
+
+                // ensure there is an updated entity (result of merging) and prune the response as necessary
+                if (entity != null && !nodewise) {
+                    entity.getConnectionStatus().setNodeSnapshots(null);
+                }
+
+                return nodeResponse.getResponse();
+            } else {
+                // get the target node and ensure it exists
+                final Node targetNode = clusterManager.getNode(clusterNodeId);
+                if (targetNode == null) {
+                    throw new UnknownNodeException("The specified cluster node does not exist.");
+                }
+
+                final Set<NodeIdentifier> targetNodes = new HashSet<>();
+                targetNodes.add(targetNode.getNodeId());
+
+                // replicate the request to the specific node
+                return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders(), targetNodes).getResponse();
+            }
+        }
+
+        // get the specified connection status
+        final ConnectionStatusDTO connectionStatus = serviceFacade.getConnectionStatus(groupId, id);
+
+        // create the revision
+        final RevisionDTO revision = new RevisionDTO();
+        revision.setClientId(clientId.getClientId());
+
+        // generate the response entity
+        final ConnectionStatusEntity entity = new ConnectionStatusEntity();
+        entity.setRevision(revision);
+        entity.setConnectionStatus(connectionStatus);
+
+        // generate the response
+        return clusterContext(generateOkResponse(entity)).build();
+    }
+
+    /**
      * Retrieves the specified connection status history.
      *
      * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
@@ -327,7 +425,7 @@ public class ConnectionResource extends ApplicationResource {
 
         // replicate if cluster manager
         if (properties.isClusterManager()) {
-            throw new IllegalClusterResourceRequestException("This request is only supported in standalone mode.");
+            return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
         }
 
         // get the specified processor status history
@@ -565,7 +663,7 @@ public class ConnectionResource extends ApplicationResource {
             headersToOverride.put("content-type", MediaType.APPLICATION_JSON);
 
             // replicate put request
-            return (Response) clusterManager.applyRequest(HttpMethod.PUT, putUri, updateClientId(connectionEntity), getHeaders(headersToOverride)).getResponse();
+            return clusterManager.applyRequest(HttpMethod.PUT, putUri, updateClientId(connectionEntity), getHeaders(headersToOverride)).getResponse();
         }
 
         // get the connection

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
index ef62a62..a3d0dc1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
@@ -23,34 +23,17 @@ import com.wordnik.swagger.annotations.ApiParam;
 import com.wordnik.swagger.annotations.ApiResponse;
 import com.wordnik.swagger.annotations.ApiResponses;
 import com.wordnik.swagger.annotations.Authorization;
-
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.FormParam;
-import javax.ws.rs.GET;
-import javax.ws.rs.HEAD;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
 import org.apache.nifi.cluster.manager.impl.WebClusterManager;
-import org.apache.nifi.web.security.user.NiFiUserUtils;
+import org.apache.nifi.cluster.node.Node;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.user.NiFiUser;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.ConfigurationSnapshot;
-import org.apache.nifi.web.NiFiServiceFacade;
 import org.apache.nifi.web.IllegalClusterResourceRequestException;
+import org.apache.nifi.web.NiFiServiceFacade;
 import org.apache.nifi.web.Revision;
 import org.apache.nifi.web.api.dto.AboutDTO;
 import org.apache.nifi.web.api.dto.BannerDTO;
@@ -66,23 +49,46 @@ import org.apache.nifi.web.api.entity.AuthorityEntity;
 import org.apache.nifi.web.api.entity.BannerEntity;
 import org.apache.nifi.web.api.entity.ControllerConfigurationEntity;
 import org.apache.nifi.web.api.entity.ControllerEntity;
+import org.apache.nifi.web.api.entity.ControllerServiceTypesEntity;
 import org.apache.nifi.web.api.entity.ControllerStatusEntity;
 import org.apache.nifi.web.api.entity.CounterEntity;
 import org.apache.nifi.web.api.entity.CountersEntity;
 import org.apache.nifi.web.api.entity.Entity;
+import org.apache.nifi.web.api.entity.IdentityEntity;
 import org.apache.nifi.web.api.entity.PrioritizerTypesEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupEntity;
 import org.apache.nifi.web.api.entity.ProcessorTypesEntity;
+import org.apache.nifi.web.api.entity.ReportingTaskTypesEntity;
 import org.apache.nifi.web.api.entity.SearchResultsEntity;
 import org.apache.nifi.web.api.request.ClientIdParameter;
 import org.apache.nifi.web.api.request.IntegerParameter;
 import org.apache.nifi.web.api.request.LongParameter;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.web.api.entity.ControllerServiceTypesEntity;
-import org.apache.nifi.web.api.entity.IdentityEntity;
-import org.apache.nifi.web.api.entity.ReportingTaskTypesEntity;
+import org.apache.nifi.web.security.user.NiFiUserUtils;
 import org.springframework.security.access.prepost.PreAuthorize;
 
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.FormParam;
+import javax.ws.rs.GET;
+import javax.ws.rs.HEAD;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
 /**
  * RESTful endpoint for managing a Flow Controller.
  */
@@ -524,6 +530,10 @@ public class ControllerResource extends ApplicationResource {
             )
             @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId) {
 
+        if (properties.isClusterManager()) {
+            return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+        }
+
         final ControllerStatusDTO controllerStatus = serviceFacade.getControllerStatus();
 
         // create the revision
@@ -572,7 +582,50 @@ public class ControllerResource extends ApplicationResource {
                     value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
                     required = false
             )
-            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId) {
+            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+            @ApiParam(
+                value = "Whether or not to include the breakdown per node. Optional, defaults to false",
+                required = false
+            )
+            @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
+            @ApiParam(
+                value = "The id of the node where to get the status.",
+                required = false
+            )
+            @QueryParam("clusterNodeId") String clusterNodeId) {
+
+        // ensure a valid request
+        if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
+            throw new IllegalArgumentException("Nodewise requests cannot be directed at a specific node.");
+        }
+
+        // replicate if cluster manager
+        if (properties.isClusterManager()) {
+            // determine where this request should be sent
+            if (clusterNodeId == null) {
+                final NodeResponse nodeResponse = clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders());
+                final CountersEntity entity = (CountersEntity) nodeResponse.getUpdatedEntity();
+
+                // ensure there is an updated entity (result of merging) and prune the response as necessary
+                if (entity != null && !nodewise) {
+                    entity.getCounters().setNodeSnapshots(null);
+                }
+
+                return nodeResponse.getResponse();
+            } else {
+                // get the target node and ensure it exists
+                final Node targetNode = clusterManager.getNode(clusterNodeId);
+                if (targetNode == null) {
+                    throw new UnknownNodeException("The specified cluster node does not exist.");
+                }
+
+                final Set<NodeIdentifier> targetNodes = new HashSet<>();
+                targetNodes.add(targetNode.getNodeId());
+
+                // replicate the request to the specific node
+                return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders(), targetNodes).getResponse();
+            }
+        }
 
         final CountersDTO countersReport = serviceFacade.getCounters();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java
index d2be69d..2f7eed6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java
@@ -22,12 +22,29 @@ import com.wordnik.swagger.annotations.ApiParam;
 import com.wordnik.swagger.annotations.ApiResponse;
 import com.wordnik.swagger.annotations.ApiResponses;
 import com.wordnik.swagger.annotations.Authorization;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
+import org.apache.nifi.cluster.manager.impl.WebClusterManager;
+import org.apache.nifi.cluster.node.Node;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.ConfigurationSnapshot;
+import org.apache.nifi.web.NiFiServiceFacade;
+import org.apache.nifi.web.Revision;
+import org.apache.nifi.web.api.dto.PortDTO;
+import org.apache.nifi.web.api.dto.PositionDTO;
+import org.apache.nifi.web.api.dto.RevisionDTO;
+import org.apache.nifi.web.api.dto.status.PortStatusDTO;
+import org.apache.nifi.web.api.entity.InputPortEntity;
+import org.apache.nifi.web.api.entity.InputPortsEntity;
+import org.apache.nifi.web.api.entity.PortStatusEntity;
+import org.apache.nifi.web.api.request.ClientIdParameter;
+import org.apache.nifi.web.api.request.DoubleParameter;
+import org.apache.nifi.web.api.request.IntegerParameter;
+import org.apache.nifi.web.api.request.LongParameter;
+import org.springframework.security.access.prepost.PreAuthorize;
+
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
@@ -46,24 +63,13 @@ import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.core.Response;
-import org.apache.nifi.cluster.manager.impl.WebClusterManager;
-import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.web.ConfigurationSnapshot;
-import org.apache.nifi.web.NiFiServiceFacade;
-import org.apache.nifi.web.Revision;
-import org.apache.nifi.web.api.dto.PortDTO;
-import org.apache.nifi.web.api.dto.PositionDTO;
-import org.apache.nifi.web.api.dto.RevisionDTO;
-import org.apache.nifi.web.api.entity.InputPortEntity;
-import org.apache.nifi.web.api.entity.InputPortsEntity;
-import org.apache.nifi.web.api.request.ClientIdParameter;
-import org.apache.nifi.web.api.request.DoubleParameter;
-import org.apache.nifi.web.api.request.IntegerParameter;
-import org.apache.nifi.web.api.request.LongParameter;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.security.access.prepost.PreAuthorize;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
 
 /**
  * RESTful endpoint for managing an Input Port.
@@ -71,8 +77,6 @@ import org.springframework.security.access.prepost.PreAuthorize;
 @Api(hidden = true)
 public class InputPortResource extends ApplicationResource {
 
-    private static final Logger logger = LoggerFactory.getLogger(InputPortResource.class);
-
     private NiFiServiceFacade serviceFacade;
     private WebClusterManager clusterManager;
     private NiFiProperties properties;
@@ -276,7 +280,7 @@ public class InputPortResource extends ApplicationResource {
             headersToOverride.put("content-type", MediaType.APPLICATION_JSON);
 
             // replicate put request
-            return (Response) clusterManager.applyRequest(HttpMethod.PUT, putUri, updateClientId(portEntity), getHeaders(headersToOverride)).getResponse();
+            return clusterManager.applyRequest(HttpMethod.PUT, putUri, updateClientId(portEntity), getHeaders(headersToOverride)).getResponse();
 
         }
 
@@ -370,6 +374,106 @@ public class InputPortResource extends ApplicationResource {
     }
 
     /**
+     * Retrieves the specified input port status.
+     *
+     * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
+     * @param id The id of the processor history to retrieve.
+     * @return A portStatusEntity.
+     */
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
+    @Path("/{id}/status")
+    @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
+    @ApiOperation(
+        value = "Gets status for an input port",
+        response = PortStatusEntity.class,
+        authorizations = {
+            @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+            @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
+            @Authorization(value = "Administrator", type = "ROLE_ADMIN")
+        }
+    )
+    @ApiResponses(
+        value = {
+            @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+            @ApiResponse(code = 401, message = "Client could not be authenticated."),
+            @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+            @ApiResponse(code = 404, message = "The specified resource could not be found."),
+            @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+        }
+    )
+    public Response getInputPortStatus(
+        @ApiParam(
+            value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
+            required = false
+        )
+        @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+        @ApiParam(
+            value = "Whether or not to include the breakdown per node. Optional, defaults to false",
+            required = false
+        )
+        @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
+        @ApiParam(
+            value = "The id of the node where to get the status.",
+            required = false
+        )
+        @QueryParam("clusterNodeId") String clusterNodeId,
+        @ApiParam(
+            value = "The input port id.",
+            required = true
+        )
+        @PathParam("id") String id) {
+
+        // ensure a valid request
+        if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
+            throw new IllegalArgumentException("Nodewise requests cannot be directed at a specific node.");
+        }
+
+        if (properties.isClusterManager()) {
+            // determine where this request should be sent
+            if (clusterNodeId == null) {
+                final NodeResponse nodeResponse = clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders());
+                final PortStatusEntity entity = (PortStatusEntity) nodeResponse.getUpdatedEntity();
+
+                // ensure there is an updated entity (result of merging) and prune the response as necessary
+                if (entity != null && !nodewise) {
+                    entity.getPortStatus().setNodeSnapshots(null);
+                }
+
+                return nodeResponse.getResponse();
+            } else {
+                // get the target node and ensure it exists
+                final Node targetNode = clusterManager.getNode(clusterNodeId);
+                if (targetNode == null) {
+                    throw new UnknownNodeException("The specified cluster node does not exist.");
+                }
+
+                final Set<NodeIdentifier> targetNodes = new HashSet<>();
+                targetNodes.add(targetNode.getNodeId());
+
+                // replicate the request to the specific node
+                return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders(), targetNodes).getResponse();
+            }
+        }
+
+        // get the specified input port status
+        final PortStatusDTO portStatus = serviceFacade.getInputPortStatus(groupId, id);
+
+        // create the revision
+        final RevisionDTO revision = new RevisionDTO();
+        revision.setClientId(clientId.getClientId());
+
+        // generate the response entity
+        final PortStatusEntity entity = new PortStatusEntity();
+        entity.setRevision(revision);
+        entity.setPortStatus(portStatus);
+
+        // generate the response
+        return clusterContext(generateOkResponse(entity)).build();
+    }
+
+    /**
      * Updates the specified input port.
      *
      * @param httpServletRequest request

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/NodeResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/NodeResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/NodeResource.java
index c88cc68..d3eb77a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/NodeResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/NodeResource.java
@@ -22,6 +22,16 @@ import com.wordnik.swagger.annotations.ApiParam;
 import com.wordnik.swagger.annotations.ApiResponse;
 import com.wordnik.swagger.annotations.ApiResponses;
 import com.wordnik.swagger.annotations.Authorization;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.IllegalClusterResourceRequestException;
+import org.apache.nifi.web.NiFiServiceFacade;
+import org.apache.nifi.web.api.dto.NodeDTO;
+import org.apache.nifi.web.api.dto.RevisionDTO;
+import org.apache.nifi.web.api.entity.NodeEntity;
+import org.apache.nifi.web.api.request.ClientIdParameter;
+import org.springframework.security.access.prepost.PreAuthorize;
+
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
@@ -34,19 +44,6 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.web.api.dto.NodeDTO;
-import org.apache.nifi.web.api.entity.NodeEntity;
-import org.apache.nifi.web.api.request.ClientIdParameter;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.web.IllegalClusterResourceRequestException;
-import org.apache.nifi.web.NiFiServiceFacade;
-import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsDTO;
-import org.apache.nifi.web.api.dto.RevisionDTO;
-import org.apache.nifi.web.api.dto.status.NodeStatusDTO;
-import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
-import org.apache.nifi.web.api.entity.SystemDiagnosticsEntity;
-import org.springframework.security.access.prepost.PreAuthorize;
 
 /**
  * RESTful endpoint for managing a cluster connection.
@@ -121,129 +118,6 @@ public class NodeResource extends ApplicationResource {
         throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
     }
 
-    /**
-     * Gets the status for the specified node.
-     *
-     * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
-     * @param id The id of the node.
-     * @return A processGroupStatusEntity
-     */
-    @GET
-    @Consumes(MediaType.WILDCARD)
-    @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
-    @Path("/{id}/status")
-    @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
-    @ApiOperation(
-            value = "Gets process group status for a node in the cluster",
-            response = ProcessGroupStatusEntity.class,
-            authorizations = {
-                @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
-                @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
-                @Authorization(value = "Administrator", type = "ROLE_ADMIN")
-            }
-    )
-    @ApiResponses(
-            value = {
-                @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
-                @ApiResponse(code = 401, message = "Client could not be authenticated."),
-                @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
-                @ApiResponse(code = 404, message = "The specified resource could not be found."),
-                @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
-            }
-    )
-    public Response getNodeStatus(
-            @ApiParam(
-                    value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
-                    required = false
-            )
-            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
-            @ApiParam(
-                    value = "The node id.",
-                    required = true
-            )
-            @PathParam("id") String id) {
-
-        if (properties.isClusterManager()) {
-            // get the node statistics
-            final NodeStatusDTO nodeStatus = serviceFacade.getNodeStatus(id);
-
-            // create the revision
-            final RevisionDTO revision = new RevisionDTO();
-            revision.setClientId(clientId.getClientId());
-
-            // create the node statics entity
-            final ProcessGroupStatusEntity entity = new ProcessGroupStatusEntity();
-            entity.setRevision(revision);
-            entity.setProcessGroupStatus(nodeStatus.getControllerStatus());
-
-            // generate the response
-            return generateOkResponse(entity).build();
-        }
-
-        throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
-    }
-
-    /**
-     * Gets the system diagnositics for the specified node.
-     *
-     * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
-     * @param id The id of the node.
-     * @return A systemDiagnosticsEntity
-     */
-    @GET
-    @Consumes(MediaType.WILDCARD)
-    @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
-    @Path("/{id}/system-diagnostics")
-    @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
-    @ApiOperation(
-            value = "Gets system diagnostics for a node in the cluester",
-            response = SystemDiagnosticsEntity.class,
-            authorizations = {
-                @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
-                @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
-                @Authorization(value = "Administrator", type = "ROLE_ADMIN")
-            }
-    )
-    @ApiResponses(
-            value = {
-                @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
-                @ApiResponse(code = 401, message = "Client could not be authenticated."),
-                @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
-                @ApiResponse(code = 404, message = "The specified resource could not be found."),
-                @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
-            }
-    )
-    public Response getNodeSystemDiagnostics(
-            @ApiParam(
-                    value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
-                    required = false
-            )
-            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
-            @ApiParam(
-                    value = "The node id.",
-                    required = true
-            )
-            @PathParam("id") String id) {
-
-        if (properties.isClusterManager()) {
-            // get the node statistics
-            final NodeSystemDiagnosticsDTO nodeSystemDiagnostics = serviceFacade.getNodeSystemDiagnostics(id);
-
-            // create the revision
-            final RevisionDTO revision = new RevisionDTO();
-            revision.setClientId(clientId.getClientId());
-
-            // create the node statics entity
-            final SystemDiagnosticsEntity entity = new SystemDiagnosticsEntity();
-            entity.setRevision(revision);
-            entity.setSystemDiagnostics(nodeSystemDiagnostics.getSystemDiagnostics());
-
-            // generate the response
-            return generateOkResponse(entity).build();
-        }
-
-        throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
-    }
 
     /**
      * Updates the contents of the specified node in this NiFi cluster.

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java
index ccd08db..e76fcf0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java
@@ -25,6 +25,7 @@ import com.wordnik.swagger.annotations.Authorization;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -46,7 +47,12 @@ import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.core.Response;
+
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
 import org.apache.nifi.cluster.manager.impl.WebClusterManager;
+import org.apache.nifi.cluster.node.Node;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.ConfigurationSnapshot;
 import org.apache.nifi.web.NiFiServiceFacade;
@@ -54,8 +60,10 @@ import org.apache.nifi.web.Revision;
 import org.apache.nifi.web.api.dto.PortDTO;
 import org.apache.nifi.web.api.dto.PositionDTO;
 import org.apache.nifi.web.api.dto.RevisionDTO;
+import org.apache.nifi.web.api.dto.status.PortStatusDTO;
 import org.apache.nifi.web.api.entity.OutputPortEntity;
 import org.apache.nifi.web.api.entity.OutputPortsEntity;
+import org.apache.nifi.web.api.entity.PortStatusEntity;
 import org.apache.nifi.web.api.request.ClientIdParameter;
 import org.apache.nifi.web.api.request.DoubleParameter;
 import org.apache.nifi.web.api.request.IntegerParameter;
@@ -370,6 +378,106 @@ public class OutputPortResource extends ApplicationResource {
     }
 
     /**
+     * Retrieves the specified output port status.
+     *
+     * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
+     * @param id The id of the processor history to retrieve.
+     * @return A portStatusEntity.
+     */
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
+    @Path("/{id}/status")
+    @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
+    @ApiOperation(
+        value = "Gets status for an output port",
+        response = PortStatusEntity.class,
+        authorizations = {
+            @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+            @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
+            @Authorization(value = "Administrator", type = "ROLE_ADMIN")
+        }
+    )
+    @ApiResponses(
+        value = {
+            @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+            @ApiResponse(code = 401, message = "Client could not be authenticated."),
+            @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+            @ApiResponse(code = 404, message = "The specified resource could not be found."),
+            @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+        }
+    )
+    public Response getOutputPortStatus(
+        @ApiParam(
+            value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
+            required = false
+        )
+        @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+        @ApiParam(
+            value = "Whether or not to include the breakdown per node. Optional, defaults to false",
+            required = false
+        )
+        @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
+        @ApiParam(
+            value = "The id of the node where to get the status.",
+            required = false
+        )
+        @QueryParam("clusterNodeId") String clusterNodeId,
+        @ApiParam(
+            value = "The output port id.",
+            required = true
+        )
+        @PathParam("id") String id) {
+
+        // ensure a valid request
+        if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
+            throw new IllegalArgumentException("Nodewise requests cannot be directed at a specific node.");
+        }
+
+        if (properties.isClusterManager()) {
+            // determine where this request should be sent
+            if (clusterNodeId == null) {
+                final NodeResponse nodeResponse = clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders());
+                final PortStatusEntity entity = (PortStatusEntity) nodeResponse.getUpdatedEntity();
+
+                // ensure there is an updated entity (result of merging) and prune the response as necessary
+                if (entity != null && !nodewise) {
+                    entity.getPortStatus().setNodeSnapshots(null);
+                }
+
+                return nodeResponse.getResponse();
+            } else {
+                // get the target node and ensure it exists
+                final Node targetNode = clusterManager.getNode(clusterNodeId);
+                if (targetNode == null) {
+                    throw new UnknownNodeException("The specified cluster node does not exist.");
+                }
+
+                final Set<NodeIdentifier> targetNodes = new HashSet<>();
+                targetNodes.add(targetNode.getNodeId());
+
+                // replicate the request to the specific node
+                return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders(), targetNodes).getResponse();
+            }
+        }
+
+        // get the specified output port status
+        final PortStatusDTO portStatus = serviceFacade.getOutputPortStatus(groupId, id);
+
+        // create the revision
+        final RevisionDTO revision = new RevisionDTO();
+        revision.setClientId(clientId.getClientId());
+
+        // generate the response entity
+        final PortStatusEntity entity = new PortStatusEntity();
+        entity.setRevision(revision);
+        entity.setPortStatus(portStatus);
+
+        // generate the response
+        return clusterContext(generateOkResponse(entity)).build();
+    }
+
+    /**
      * Updates the specified output port.
      *
      * @param httpServletRequest request