You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2016/04/04 18:28:42 UTC

[06/18] nifi git commit: NIFI-1563: - Federate requests and merge responses from nodes instead of storing bulletins and stats at NCM - Updating UI to support restructured status history DTO. - Return 'Insufficient History' message if aggregate stats don'

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index a67e74b..96beff5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -23,41 +23,23 @@ import com.wordnik.swagger.annotations.ApiParam;
 import com.wordnik.swagger.annotations.ApiResponse;
 import com.wordnik.swagger.annotations.ApiResponses;
 import com.wordnik.swagger.annotations.Authorization;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.FormParam;
-import javax.ws.rs.GET;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
 import org.apache.nifi.cluster.manager.impl.WebClusterManager;
+import org.apache.nifi.cluster.node.Node;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.ConfigurationSnapshot;
-import org.apache.nifi.web.IllegalClusterResourceRequestException;
 import org.apache.nifi.web.NiFiServiceFacade;
 import org.apache.nifi.web.Revision;
-import static org.apache.nifi.web.api.ApplicationResource.CLIENT_ID;
 import org.apache.nifi.web.api.dto.FlowSnippetDTO;
 import org.apache.nifi.web.api.dto.PositionDTO;
 import org.apache.nifi.web.api.dto.ProcessGroupDTO;
 import org.apache.nifi.web.api.dto.RevisionDTO;
+import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusSnapshotDTO;
 import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
+import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO;
 import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
 import org.apache.nifi.web.api.entity.FlowSnippetEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupEntity;
@@ -67,9 +49,33 @@ import org.apache.nifi.web.api.entity.StatusHistoryEntity;
 import org.apache.nifi.web.api.request.ClientIdParameter;
 import org.apache.nifi.web.api.request.DoubleParameter;
 import org.apache.nifi.web.api.request.LongParameter;
-import org.apache.commons.lang3.StringUtils;
 import org.springframework.security.access.prepost.PreAuthorize;
 
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.FormParam;
+import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
 /**
  * RESTful endpoint for managing a Group.
  */
@@ -979,7 +985,7 @@ public class ProcessGroupResource extends ApplicationResource {
             headersToOverride.put("content-type", MediaType.APPLICATION_JSON);
 
             // replicate put request
-            return (Response) clusterManager.applyRequest(HttpMethod.PUT, putUri, updateClientId(processGroupEntity), getHeaders(headersToOverride)).getResponse();
+            return clusterManager.applyRequest(HttpMethod.PUT, putUri, updateClientId(processGroupEntity), getHeaders(headersToOverride)).getResponse();
         }
 
         // handle expects request (usually from the cluster manager)
@@ -1251,7 +1257,7 @@ public class ProcessGroupResource extends ApplicationResource {
      * Retrieves the status report for this NiFi.
      *
      * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
-     * @param recursive Optional recursive flag that defaults to false. If set to true, all descendent groups and their content will be included if the verbose flag is also set to true.
+     * @param recursive Optional recursive flag that defaults to false. If set to true, all descendant groups and the status of their content will be included.
      * @return A processGroupStatusEntity.
      */
     @GET
@@ -1281,21 +1287,69 @@ public class ProcessGroupResource extends ApplicationResource {
             }
     )
     public Response getProcessGroupStatus(
+            @ApiParam(
+                value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
+                required = false
+            )
             @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
-            @QueryParam("recursive") @DefaultValue(RECURSIVE) Boolean recursive) {
+            @ApiParam(
+                value = "Whether all descendant groups and the status of their content will be included. Optional, defaults to false",
+                required = false
+            )
+            @QueryParam("recursive") @DefaultValue(RECURSIVE) Boolean recursive,
+            @ApiParam(
+                value = "Whether or not to include the breakdown per node. Optional, defaults to false",
+                required = false
+            )
+            @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
+            @ApiParam(
+                value = "The id of the node where to get the status.",
+                required = false
+            )
+            @QueryParam("clusterNodeId") String clusterNodeId) {
+
+        // ensure a valid request
+        if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
+            throw new IllegalArgumentException("Nodewise requests cannot be directed at a specific node.");
+        }
+
+        if (properties.isClusterManager()) {
+            // determine where this request should be sent
+            if (clusterNodeId == null) {
+                final NodeResponse nodeResponse = clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders());
+                final ProcessGroupStatusEntity entity = (ProcessGroupStatusEntity) nodeResponse.getUpdatedEntity();
+
+                // ensure there is an updated entity (result of merging) and prune the response as necessary
+                if (entity != null && !nodewise) {
+                    entity.getProcessGroupStatus().setNodeSnapshots(null);
+                }
+
+                return nodeResponse.getResponse();
+            } else {
+                // get the target node and ensure it exists
+                final Node targetNode = clusterManager.getNode(clusterNodeId);
+                if (targetNode == null) {
+                    throw new UnknownNodeException("The specified cluster node does not exist.");
+                }
+
+                final Set<NodeIdentifier> targetNodes = new HashSet<>();
+                targetNodes.add(targetNode.getNodeId());
+
+                // replicate the request to the specific node
+                return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders(), targetNodes).getResponse();
+            }
+        }
 
         // get the status
         final ProcessGroupStatusDTO statusReport = serviceFacade.getProcessGroupStatus(groupId);
 
         // prune the response as necessary
         if (!recursive) {
-            for (final ProcessGroupStatusDTO childProcessGroupStatus : statusReport.getProcessGroupStatus()) {
-                childProcessGroupStatus.setConnectionStatus(null);
-                childProcessGroupStatus.setProcessGroupStatus(null);
-                childProcessGroupStatus.setInputPortStatus(null);
-                childProcessGroupStatus.setOutputPortStatus(null);
-                childProcessGroupStatus.setProcessorStatus(null);
-                childProcessGroupStatus.setRemoteProcessGroupStatus(null);
+            pruneChildGroups(statusReport.getAggregateSnapshot());
+            if (statusReport.getNodeSnapshots() != null) {
+                for (final NodeProcessGroupStatusSnapshotDTO nodeSnapshot : statusReport.getNodeSnapshots()) {
+                    pruneChildGroups(nodeSnapshot.getStatusSnapshot());
+                }
             }
         }
 
@@ -1312,6 +1366,17 @@ public class ProcessGroupResource extends ApplicationResource {
         return clusterContext(generateOkResponse(entity)).build();
     }
 
+    private void pruneChildGroups(final ProcessGroupStatusSnapshotDTO snapshot) { // strips component-level detail from each direct child group of the given snapshot; called when the request is not recursive
+        for (final ProcessGroupStatusSnapshotDTO childProcessGroupStatus : snapshot.getProcessGroupStatusSnapshots()) { // NOTE(review): assumes snapshot and its child group collection are non-null - confirm against callers
+            childProcessGroupStatus.setConnectionStatusSnapshots(null);
+            childProcessGroupStatus.setProcessGroupStatusSnapshots(null); // drops the grandchild groups, so only one level of child groups remains
+            childProcessGroupStatus.setInputPortStatusSnapshots(null);
+            childProcessGroupStatus.setOutputPortStatusSnapshots(null);
+            childProcessGroupStatus.setProcessorStatusSnapshots(null);
+            childProcessGroupStatus.setRemoteProcessGroupStatusSnapshots(null);
+        } // the child snapshot objects themselves stay in the parent; only their nested collections are cleared
+    }
+
     /**
      * Retrieves the specified remote process groups status history.
      *
@@ -1345,7 +1410,7 @@ public class ProcessGroupResource extends ApplicationResource {
 
         // replicate if cluster manager
         if (properties.isClusterManager()) {
-            throw new IllegalClusterResourceRequestException("This request is only supported in standalone mode.");
+            return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
         }
 
         // get the specified processor status history

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
index 9fb90b5..adede7b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
@@ -22,80 +22,77 @@ import com.wordnik.swagger.annotations.ApiParam;
 import com.wordnik.swagger.annotations.ApiResponse;
 import com.wordnik.swagger.annotations.ApiResponses;
 import com.wordnik.swagger.annotations.Authorization;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-import javax.servlet.ServletContext;
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.FormParam;
-import javax.ws.rs.GET;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.core.Response;
-
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.cluster.manager.NodeResponse;
 import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
+import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
 import org.apache.nifi.cluster.manager.impl.WebClusterManager;
 import org.apache.nifi.cluster.node.Node;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.ui.extension.UiExtension;
+import org.apache.nifi.ui.extension.UiExtensionMapping;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.ConfigurationSnapshot;
-import org.apache.nifi.web.IllegalClusterResourceRequestException;
 import org.apache.nifi.web.NiFiServiceFacade;
 import org.apache.nifi.web.Revision;
-import static org.apache.nifi.web.api.ApplicationResource.CLIENT_ID;
-
+import org.apache.nifi.web.UiExtensionType;
 import org.apache.nifi.web.api.dto.ComponentStateDTO;
 import org.apache.nifi.web.api.dto.PositionDTO;
 import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
 import org.apache.nifi.web.api.dto.ProcessorDTO;
+import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
 import org.apache.nifi.web.api.dto.RevisionDTO;
+import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
 import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
 import org.apache.nifi.web.api.entity.ComponentStateEntity;
 import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.apache.nifi.web.api.entity.ProcessorStatusEntity;
 import org.apache.nifi.web.api.entity.ProcessorsEntity;
+import org.apache.nifi.web.api.entity.PropertyDescriptorEntity;
 import org.apache.nifi.web.api.entity.StatusHistoryEntity;
 import org.apache.nifi.web.api.request.ClientIdParameter;
 import org.apache.nifi.web.api.request.DoubleParameter;
 import org.apache.nifi.web.api.request.IntegerParameter;
 import org.apache.nifi.web.api.request.LongParameter;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.ui.extension.UiExtension;
-import org.apache.nifi.ui.extension.UiExtensionMapping;
-import org.apache.nifi.web.UiExtensionType;
-import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
-import org.apache.nifi.web.api.entity.PropertyDescriptorEntity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.security.access.prepost.PreAuthorize;
 
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.FormParam;
+import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.Response;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
 /**
  * RESTful endpoint for managing a Processor.
  */
 @Api(hidden = true)
 public class ProcessorResource extends ApplicationResource {
 
-    private static final Logger logger = LoggerFactory.getLogger(ProcessorResource.class);
-
     private static final List<Long> POSSIBLE_RUN_DURATIONS = Arrays.asList(0L, 25L, 50L, 100L, 250L, 500L, 1000L, 2000L);
 
     private NiFiServiceFacade serviceFacade;
@@ -328,7 +325,7 @@ public class ProcessorResource extends ApplicationResource {
             headersToOverride.put("content-type", MediaType.APPLICATION_JSON);
 
             // replicate put request
-            return (Response) clusterManager.applyRequest(HttpMethod.PUT, putUri, updateClientId(processorEntity), getHeaders(headersToOverride)).getResponse();
+            return clusterManager.applyRequest(HttpMethod.PUT, putUri, updateClientId(processorEntity), getHeaders(headersToOverride)).getResponse();
 
         }
 
@@ -424,6 +421,106 @@ public class ProcessorResource extends ApplicationResource {
     }
 
     /**
+     * Retrieves the specified processor status. On the cluster manager the request is replicated to all nodes by default (per-node snapshots are kept only when nodewise is true), or only to the node identified by clusterNodeId.
+     *
+     * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
+     * @param id The id of the processor whose status to retrieve.
+     * @return A processorStatusEntity.
+     */
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
+    @Path("/{id}/status")
+    @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
+    @ApiOperation(
+        value = "Gets status for a processor",
+        response = ProcessorStatusEntity.class,
+        authorizations = {
+            @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+            @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
+            @Authorization(value = "Administrator", type = "ROLE_ADMIN")
+        }
+    )
+    @ApiResponses(
+        value = {
+            @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+            @ApiResponse(code = 401, message = "Client could not be authenticated."),
+            @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+            @ApiResponse(code = 404, message = "The specified resource could not be found."),
+            @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+        }
+    )
+    public Response getProcessorStatus(
+        @ApiParam(
+            value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
+            required = false
+        )
+        @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+        @ApiParam(
+            value = "Whether or not to include the breakdown per node. Optional, defaults to false",
+            required = false
+        )
+        @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
+        @ApiParam(
+            value = "The id of the node where to get the status.",
+            required = false
+        )
+        @QueryParam("clusterNodeId") String clusterNodeId,
+        @ApiParam(
+            value = "The processor id.",
+            required = true
+        )
+        @PathParam("id") String id) {
+
+        // ensure a valid request: a per-node breakdown and a single target node are mutually exclusive
+        if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
+            throw new IllegalArgumentException("Nodewise requests cannot be directed at a specific node.");
+        }
+
+        if (properties.isClusterManager()) {
+            // determine where this request should be sent
+            if (clusterNodeId == null) {
+                final NodeResponse nodeResponse = clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders());
+                final ProcessorStatusEntity entity = (ProcessorStatusEntity) nodeResponse.getUpdatedEntity();
+
+                // ensure there is an updated entity (result of merging) and prune the response as necessary; null-safe like the guard above, so a null nodewise is treated as false
+                if (entity != null && !Boolean.TRUE.equals(nodewise)) {
+                    entity.getProcessorStatus().setNodeSnapshots(null);
+                }
+
+                return nodeResponse.getResponse();
+            } else {
+                // get the target node and ensure it exists
+                final Node targetNode = clusterManager.getNode(clusterNodeId);
+                if (targetNode == null) {
+                    throw new UnknownNodeException("The specified cluster node does not exist.");
+                }
+
+                final Set<NodeIdentifier> targetNodes = new HashSet<>();
+                targetNodes.add(targetNode.getNodeId());
+
+                // replicate the request to the specific node
+                return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders(), targetNodes).getResponse();
+            }
+        }
+
+        // not the cluster manager: get the specified processor status from the local service facade
+        final ProcessorStatusDTO processorStatus = serviceFacade.getProcessorStatus(groupId, id);
+
+        // create the revision
+        final RevisionDTO revision = new RevisionDTO();
+        revision.setClientId(clientId.getClientId());
+
+        // generate the response entity
+        final ProcessorStatusEntity entity = new ProcessorStatusEntity();
+        entity.setRevision(revision);
+        entity.setProcessorStatus(processorStatus);
+
+        // generate the response
+        return clusterContext(generateOkResponse(entity)).build();
+    }
+
+    /**
      * Retrieves the specified processor status history.
      *
      * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
@@ -467,7 +564,7 @@ public class ProcessorResource extends ApplicationResource {
 
         // replicate if cluster manager
         if (properties.isClusterManager()) {
-            throw new IllegalClusterResourceRequestException("This request is only supported in standalone mode.");
+            return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
         }
 
         // get the specified processor status history

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
index e466666..8fc6a2c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
@@ -22,64 +22,66 @@ import com.wordnik.swagger.annotations.ApiParam;
 import com.wordnik.swagger.annotations.ApiResponse;
 import com.wordnik.swagger.annotations.ApiResponses;
 import com.wordnik.swagger.annotations.Authorization;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.FormParam;
-import javax.ws.rs.GET;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
 import org.apache.nifi.cluster.manager.impl.WebClusterManager;
+import org.apache.nifi.cluster.node.Node;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.ConfigurationSnapshot;
-import org.apache.nifi.web.IllegalClusterResourceRequestException;
 import org.apache.nifi.web.NiFiServiceFacade;
 import org.apache.nifi.web.Revision;
-import static org.apache.nifi.web.api.ApplicationResource.CLIENT_ID;
-import static org.apache.nifi.web.api.ApplicationResource.VERSION;
 import org.apache.nifi.web.api.dto.PositionDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
 import org.apache.nifi.web.api.dto.RevisionDTO;
+import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
 import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
+import org.apache.nifi.web.api.entity.ConnectionsEntity;
+import org.apache.nifi.web.api.entity.ProcessorStatusEntity;
 import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
 import org.apache.nifi.web.api.entity.RemoteProcessGroupPortEntity;
+import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusEntity;
 import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity;
 import org.apache.nifi.web.api.entity.StatusHistoryEntity;
 import org.apache.nifi.web.api.request.ClientIdParameter;
 import org.apache.nifi.web.api.request.DoubleParameter;
 import org.apache.nifi.web.api.request.IntegerParameter;
 import org.apache.nifi.web.api.request.LongParameter;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.web.api.entity.ConnectionsEntity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.security.access.prepost.PreAuthorize;
 
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.FormParam;
+import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
 /**
  * RESTful endpoint for managing a Remote group.
  */
 @Api(hidden = true)
 public class RemoteProcessGroupResource extends ApplicationResource {
 
-    private static final Logger logger = LoggerFactory.getLogger(RemoteProcessGroupResource.class);
-
     private static final String VERBOSE_DEFAULT_VALUE = "false";
 
     private NiFiServiceFacade serviceFacade;
@@ -257,6 +259,106 @@ public class RemoteProcessGroupResource extends ApplicationResource {
     }
 
     /**
+     * Retrieves the specified remote process group status. On the cluster manager the request is replicated to all nodes by default (per-node snapshots are kept only when nodewise is true), or only to the node identified by clusterNodeId.
+     *
+     * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
+     * @param id The id of the remote process group whose status to retrieve.
+     * @return A remoteProcessGroupStatusEntity.
+     */
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
+    @Path("/{id}/status")
+    @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
+    @ApiOperation(
+        value = "Gets status for a remote process group",
+        response = RemoteProcessGroupStatusEntity.class,
+        authorizations = {
+            @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+            @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
+            @Authorization(value = "Administrator", type = "ROLE_ADMIN")
+        }
+    )
+    @ApiResponses(
+        value = {
+            @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+            @ApiResponse(code = 401, message = "Client could not be authenticated."),
+            @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+            @ApiResponse(code = 404, message = "The specified resource could not be found."),
+            @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+        }
+    )
+    public Response getRemoteProcessGroupStatus(
+        @ApiParam(
+            value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
+            required = false
+        )
+        @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+        @ApiParam(
+            value = "Whether or not to include the breakdown per node. Optional, defaults to false",
+            required = false
+        )
+        @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
+        @ApiParam(
+            value = "The id of the node where to get the status.",
+            required = false
+        )
+        @QueryParam("clusterNodeId") String clusterNodeId,
+        @ApiParam(
+            value = "The remote process group id.",
+            required = true
+        )
+        @PathParam("id") String id) {
+
+        // ensure a valid request: a per-node breakdown and a single target node are mutually exclusive
+        if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
+            throw new IllegalArgumentException("Nodewise requests cannot be directed at a specific node.");
+        }
+
+        if (properties.isClusterManager()) {
+            // determine where this request should be sent
+            if (clusterNodeId == null) {
+                final NodeResponse nodeResponse = clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders());
+                final RemoteProcessGroupStatusEntity entity = (RemoteProcessGroupStatusEntity) nodeResponse.getUpdatedEntity();
+
+                // ensure there is an updated entity (result of merging) and prune the response as necessary; null-safe like the guard above, so a null nodewise is treated as false
+                if (entity != null && !Boolean.TRUE.equals(nodewise)) {
+                    entity.getRemoteProcessGroupStatus().setNodeSnapshots(null);
+                }
+
+                return nodeResponse.getResponse();
+            } else {
+                // get the target node and ensure it exists
+                final Node targetNode = clusterManager.getNode(clusterNodeId);
+                if (targetNode == null) {
+                    throw new UnknownNodeException("The specified cluster node does not exist.");
+                }
+
+                final Set<NodeIdentifier> targetNodes = new HashSet<>();
+                targetNodes.add(targetNode.getNodeId());
+
+                // replicate the request to the specific node
+                return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders(), targetNodes).getResponse();
+            }
+        }
+
+        // not the cluster manager: get the specified remote process group status from the local service facade
+        final RemoteProcessGroupStatusDTO remoteProcessGroupStatus = serviceFacade.getRemoteProcessGroupStatus(groupId, id);
+
+        // create the revision
+        final RevisionDTO revision = new RevisionDTO();
+        revision.setClientId(clientId.getClientId());
+
+        // generate the response entity
+        final RemoteProcessGroupStatusEntity entity = new RemoteProcessGroupStatusEntity();
+        entity.setRevision(revision);
+        entity.setRemoteProcessGroupStatus(remoteProcessGroupStatus);
+
+        // generate the response
+        return clusterContext(generateOkResponse(entity)).build();
+    }
+
+    /**
      * Retrieves the specified remote process groups status history.
      *
      * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
@@ -300,7 +402,7 @@ public class RemoteProcessGroupResource extends ApplicationResource {
 
         // replicate if cluster manager
         if (properties.isClusterManager()) {
-            throw new IllegalClusterResourceRequestException("This request is only supported in standalone mode.");
+            return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
         }
 
         // get the specified processor status history
@@ -446,7 +548,7 @@ public class RemoteProcessGroupResource extends ApplicationResource {
             headersToOverride.put("content-type", MediaType.APPLICATION_JSON);
 
             // replicate put request
-            return (Response) clusterManager.applyRequest(HttpMethod.PUT, putUri, updateClientId(remoteProcessGroupEntity), getHeaders(headersToOverride)).getResponse();
+            return clusterManager.applyRequest(HttpMethod.PUT, putUri, updateClientId(remoteProcessGroupEntity), getHeaders(headersToOverride)).getResponse();
         }
 
         // handle expects request (usually from the cluster manager)

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java
index d7b77b2..802f46f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java
@@ -16,12 +16,6 @@
  */
 package org.apache.nifi.web.api;
 
-import com.wordnik.swagger.annotations.Api;
-import com.wordnik.swagger.annotations.ApiOperation;
-import com.wordnik.swagger.annotations.ApiParam;
-import com.wordnik.swagger.annotations.ApiResponse;
-import com.wordnik.swagger.annotations.ApiResponses;
-import com.wordnik.swagger.annotations.Authorization;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.HashMap;
@@ -30,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+
 import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.Consumes;
@@ -49,39 +44,42 @@ import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.core.Response;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.cluster.manager.impl.WebClusterManager;
+import org.apache.nifi.ui.extension.UiExtension;
+import org.apache.nifi.ui.extension.UiExtensionMapping;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.ConfigurationSnapshot;
 import org.apache.nifi.web.NiFiServiceFacade;
 import org.apache.nifi.web.Revision;
-import org.apache.nifi.web.api.dto.ComponentStateDTO;
-import org.apache.nifi.web.api.dto.RevisionDTO;
-import org.apache.nifi.web.api.entity.ComponentStateEntity;
-import org.apache.nifi.web.api.request.ClientIdParameter;
-import org.apache.nifi.web.api.request.LongParameter;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.ui.extension.UiExtension;
-import org.apache.nifi.ui.extension.UiExtensionMapping;
 import org.apache.nifi.web.UiExtensionType;
-import static org.apache.nifi.web.api.ApplicationResource.CLIENT_ID;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
 import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
 import org.apache.nifi.web.api.dto.ReportingTaskDTO;
+import org.apache.nifi.web.api.dto.RevisionDTO;
+import org.apache.nifi.web.api.entity.ComponentStateEntity;
 import org.apache.nifi.web.api.entity.PropertyDescriptorEntity;
 import org.apache.nifi.web.api.entity.ReportingTaskEntity;
 import org.apache.nifi.web.api.entity.ReportingTasksEntity;
+import org.apache.nifi.web.api.request.ClientIdParameter;
+import org.apache.nifi.web.api.request.LongParameter;
 import org.apache.nifi.web.util.Availability;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.security.access.prepost.PreAuthorize;
 
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
+import com.wordnik.swagger.annotations.ApiParam;
+import com.wordnik.swagger.annotations.ApiResponse;
+import com.wordnik.swagger.annotations.ApiResponses;
+import com.wordnik.swagger.annotations.Authorization;
+
 /**
  * RESTful endpoint for managing a Reporting Task.
  */
 @Api(hidden = true)
 public class ReportingTaskResource extends ApplicationResource {
 
-    private static final Logger logger = LoggerFactory.getLogger(ReportingTaskResource.class);
-
     private NiFiServiceFacade serviceFacade;
     private WebClusterManager clusterManager;
     private NiFiProperties properties;
@@ -344,7 +342,7 @@ public class ReportingTaskResource extends ApplicationResource {
             headersToOverride.put("content-type", MediaType.APPLICATION_JSON);
 
             // replicate put request
-            return (Response) clusterManager.applyRequest(HttpMethod.PUT, putUri, updateClientId(reportingTaskEntity), getHeaders(headersToOverride)).getResponse();
+            return clusterManager.applyRequest(HttpMethod.PUT, putUri, updateClientId(reportingTaskEntity), getHeaders(headersToOverride)).getResponse();
         }
 
         // handle expects request (usually from the cluster manager)

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java
index 5c3c03a..1bde7bf 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java
@@ -22,25 +22,31 @@ import com.wordnik.swagger.annotations.ApiParam;
 import com.wordnik.swagger.annotations.ApiResponse;
 import com.wordnik.swagger.annotations.ApiResponses;
 import com.wordnik.swagger.annotations.Authorization;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
+import org.apache.nifi.cluster.manager.impl.WebClusterManager;
+import org.apache.nifi.cluster.node.Node;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.NiFiServiceFacade;
+import org.apache.nifi.web.api.dto.RevisionDTO;
+import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO;
+import org.apache.nifi.web.api.entity.SystemDiagnosticsEntity;
+import org.apache.nifi.web.api.request.ClientIdParameter;
+import org.springframework.security.access.prepost.PreAuthorize;
+
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-
-import org.apache.nifi.web.NiFiServiceFacade;
-import org.apache.nifi.web.api.dto.RevisionDTO;
-import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO;
-import org.apache.nifi.web.api.entity.SystemDiagnosticsEntity;
-import org.apache.nifi.web.api.request.ClientIdParameter;
-
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.security.access.prepost.PreAuthorize;
+import java.util.HashSet;
+import java.util.Set;
 
 /**
  * RESTful endpoint for retrieving system diagnostics.
@@ -52,9 +58,10 @@ import org.springframework.security.access.prepost.PreAuthorize;
 )
 public class SystemDiagnosticsResource extends ApplicationResource {
 
-    private static final Logger logger = LoggerFactory.getLogger(SystemDiagnosticsResource.class);
-
     private NiFiServiceFacade serviceFacade;
+    private WebClusterManager clusterManager;
+    private NiFiProperties properties;
+
 
     /**
      * Gets the system diagnostics for this NiFi instance.
@@ -87,7 +94,49 @@ public class SystemDiagnosticsResource extends ApplicationResource {
                     value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
                     required = false
             )
-            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId) {
+            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+            @ApiParam(
+                value = "Whether or not to include the breakdown per node. Optional, defaults to false",
+                required = false
+            )
+            @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
+            @ApiParam(
+                value = "The id of the node where to get the status.",
+                required = false
+            )
+            @QueryParam("clusterNodeId") String clusterNodeId) {
+
+        // ensure a valid request
+        if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
+            throw new IllegalArgumentException("Nodewise requests cannot be directed at a specific node.");
+        }
+
+        if (properties.isClusterManager()) {
+            // determine where this request should be sent
+            if (clusterNodeId == null) {
+                final NodeResponse nodeResponse = clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders());
+                final SystemDiagnosticsEntity entity = (SystemDiagnosticsEntity) nodeResponse.getUpdatedEntity();
+
+                // ensure there is an updated entity (result of merging) and prune the response as necessary
+                if (entity != null && !nodewise) {
+                    entity.getSystemDiagnostics().setNodeSnapshots(null);
+                }
+
+                return nodeResponse.getResponse();
+            } else {
+                // get the target node and ensure it exists
+                final Node targetNode = clusterManager.getNode(clusterNodeId);
+                if (targetNode == null) {
+                    throw new UnknownNodeException("The specified cluster node does not exist.");
+                }
+
+                final Set<NodeIdentifier> targetNodes = new HashSet<>();
+                targetNodes.add(targetNode.getNodeId());
+
+                // replicate the request to the specific node
+                return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders(), targetNodes).getResponse();
+            }
+        }
 
         final SystemDiagnosticsDTO systemDiagnosticsDto = serviceFacade.getSystemDiagnostics();
 
@@ -108,4 +157,12 @@ public class SystemDiagnosticsResource extends ApplicationResource {
     public void setServiceFacade(NiFiServiceFacade serviceFacade) {
         this.serviceFacade = serviceFacade;
     }
+
+    public void setClusterManager(WebClusterManager clusterManager) { // stores the manager this resource calls applyRequest()/getNode() on when replicating to nodes
+        this.clusterManager = clusterManager;
+    }
+
+    public void setProperties(NiFiProperties properties) { // stores the properties whose isClusterManager() decides whether the request is replicated
+        this.properties = properties;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 6157285..5e7a902 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -39,7 +39,6 @@ import java.util.concurrent.TimeUnit;
 
 import javax.ws.rs.WebApplicationException;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.action.Action;
 import org.apache.nifi.action.component.details.ComponentDetails;
 import org.apache.nifi.action.component.details.ExtensionDetails;
@@ -61,6 +60,7 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.authorization.Authority;
 import org.apache.nifi.cluster.HeartbeatPayload;
 import org.apache.nifi.cluster.event.Event;
+import org.apache.nifi.cluster.manager.StatusMerger;
 import org.apache.nifi.cluster.node.Node;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.components.AllowableValue;
@@ -145,11 +145,15 @@ import org.apache.nifi.web.api.dto.provenance.lineage.LineageResultsDTO;
 import org.apache.nifi.web.api.dto.provenance.lineage.ProvenanceLinkDTO;
 import org.apache.nifi.web.api.dto.provenance.lineage.ProvenanceNodeDTO;
 import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
+import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
 import org.apache.nifi.web.api.dto.status.PortStatusDTO;
+import org.apache.nifi.web.api.dto.status.PortStatusSnapshotDTO;
 import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
+import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO;
 import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
+import org.apache.nifi.web.api.dto.status.ProcessorStatusSnapshotDTO;
 import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
-import org.apache.nifi.web.api.dto.status.StatusDTO;
+import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusSnapshotDTO;
 
 public final class DtoFactory {
 
@@ -161,8 +165,6 @@ public final class DtoFactory {
         }
     };
 
-    final static int MAX_BULLETINS_PER_COMPONENT = 5;
-
     private ControllerServiceLookup controllerServiceLookup;
 
     /**
@@ -323,7 +325,7 @@ public final class DtoFactory {
         final StateMapDTO dto = new StateMapDTO();
         dto.setScope(scope.toString());
 
-        final TreeMap<String, String> sortedState = new TreeMap(SortedStateUtils.getKeyComparator());
+        final TreeMap<String, String> sortedState = new TreeMap<>(SortedStateUtils.getKeyComparator());
         final Map<String, String> state = stateMap.toMap();
         sortedState.putAll(state);
 
@@ -349,8 +351,8 @@ public final class DtoFactory {
      * @param counterDtos dtos
      * @return dto
      */
-    public CountersDTO createCountersDto(final Collection<CounterDTO> counterDtos) {
-        final CountersDTO dto = new CountersDTO();
+    public CountersSnapshotDTO createCountersDto(final Collection<CounterDTO> counterDtos) {
+        final CountersSnapshotDTO dto = new CountersSnapshotDTO();
         dto.setCounters(counterDtos);
         dto.setGenerated(new Date());
         return dto;
@@ -709,13 +711,6 @@ public final class DtoFactory {
         return copy;
     }
 
-    private String formatCount(final Integer intStatus) {
-        return intStatus == null ? "-" : FormatUtils.formatCount(intStatus);
-    }
-
-    private String formatDataSize(final Long longStatus) {
-        return longStatus == null ? "-" : FormatUtils.formatDataSize(longStatus);
-    }
 
     public RemoteProcessGroupStatusDTO createRemoteProcessGroupStatusDto(final RemoteProcessGroupStatus remoteProcessGroupStatus) {
         final RemoteProcessGroupStatusDTO dto = new RemoteProcessGroupStatusDTO();
@@ -724,172 +719,120 @@ public final class DtoFactory {
         dto.setTargetUri(remoteProcessGroupStatus.getTargetUri());
         dto.setName(remoteProcessGroupStatus.getName());
         dto.setTransmissionStatus(remoteProcessGroupStatus.getTransmissionStatus().toString());
-        dto.setActiveThreadCount(remoteProcessGroupStatus.getActiveThreadCount());
-        dto.setSent(formatCount(remoteProcessGroupStatus.getSentCount()) + " / " + formatDataSize(remoteProcessGroupStatus.getSentContentSize()));
-        dto.setReceived(formatCount(remoteProcessGroupStatus.getReceivedCount()) + " / " + formatDataSize(remoteProcessGroupStatus.getReceivedContentSize()));
-        dto.setAuthorizationIssues(remoteProcessGroupStatus.getAuthorizationIssues());
+        dto.setStatsLastRefreshed(new Date());
+
+        final RemoteProcessGroupStatusSnapshotDTO snapshot = new RemoteProcessGroupStatusSnapshotDTO();
+        dto.setAggregateSnapshot(snapshot);
 
+        snapshot.setId(remoteProcessGroupStatus.getId());
+        snapshot.setGroupId(remoteProcessGroupStatus.getGroupId());
+        snapshot.setName(remoteProcessGroupStatus.getName());
+        snapshot.setTargetUri(remoteProcessGroupStatus.getTargetUri());
+        snapshot.setTransmissionStatus(remoteProcessGroupStatus.getTransmissionStatus().toString());
+
+        snapshot.setActiveThreadCount(remoteProcessGroupStatus.getActiveThreadCount());
+        snapshot.setFlowFilesSent(remoteProcessGroupStatus.getSentCount());
+        snapshot.setBytesSent(remoteProcessGroupStatus.getSentContentSize());
+        snapshot.setFlowFilesReceived(remoteProcessGroupStatus.getReceivedCount());
+        snapshot.setBytesReceived(remoteProcessGroupStatus.getReceivedContentSize());
+        snapshot.setAuthorizationIssues(remoteProcessGroupStatus.getAuthorizationIssues());
+
+        StatusMerger.updatePrettyPrintedFields(snapshot);
         return dto;
     }
 
     public ProcessGroupStatusDTO createProcessGroupStatusDto(final BulletinRepository bulletinRepository, final ProcessGroupStatus processGroupStatus) {
-
         final ProcessGroupStatusDTO processGroupStatusDto = new ProcessGroupStatusDTO();
         processGroupStatusDto.setId(processGroupStatus.getId());
         processGroupStatusDto.setName(processGroupStatus.getName());
-        processGroupStatusDto.setStatsLastRefreshed(new Date(processGroupStatus.getCreationTimestamp()));
-        processGroupStatusDto.setRead(formatDataSize(processGroupStatus.getBytesRead()));
-        processGroupStatusDto.setWritten(formatDataSize(processGroupStatus.getBytesWritten()));
-        processGroupStatusDto.setInput(formatCount(processGroupStatus.getInputCount()) + " / " + formatDataSize(processGroupStatus.getInputContentSize()));
-        processGroupStatusDto.setOutput(formatCount(processGroupStatus.getOutputCount()) + " / " + formatDataSize(processGroupStatus.getOutputContentSize()));
-        processGroupStatusDto.setTransferred(formatCount(processGroupStatus.getFlowFilesTransferred()) + " / " + formatDataSize(processGroupStatus.getBytesTransferred()));
-        processGroupStatusDto.setSent(formatCount(processGroupStatus.getFlowFilesSent()) + " / " + formatDataSize(processGroupStatus.getBytesSent()));
-        processGroupStatusDto.setReceived(formatCount(processGroupStatus.getFlowFilesReceived()) + " / " + formatDataSize(processGroupStatus.getBytesReceived()));
-        processGroupStatusDto.setActiveThreadCount(processGroupStatus.getActiveThreadCount());
-
-        final String queuedCount = FormatUtils.formatCount(processGroupStatus.getQueuedCount());
-        final String queuedSize = FormatUtils.formatDataSize(processGroupStatus.getQueuedContentSize());
-        processGroupStatusDto.setQueuedCount(queuedCount);
-        processGroupStatusDto.setQueuedSize(queuedSize);
-        processGroupStatusDto.setQueued(queuedCount + " / " + queuedSize);
-
-        final Map<String, StatusDTO> componentStatusDtoMap = new HashMap<>();
+        processGroupStatusDto.setStatsLastRefreshed(new Date());
+
+        final ProcessGroupStatusSnapshotDTO snapshot = new ProcessGroupStatusSnapshotDTO();
+        processGroupStatusDto.setAggregateSnapshot(snapshot);
+
+        snapshot.setId(processGroupStatus.getId());
+        snapshot.setName(processGroupStatus.getName());
+
+        snapshot.setFlowFilesQueued(processGroupStatus.getQueuedCount());
+        snapshot.setBytesQueued(processGroupStatus.getQueuedContentSize());
+        snapshot.setBytesRead(processGroupStatus.getBytesRead());
+        snapshot.setBytesWritten(processGroupStatus.getBytesWritten());
+        snapshot.setFlowFilesIn(processGroupStatus.getInputCount());
+        snapshot.setBytesIn(processGroupStatus.getInputContentSize());
+        snapshot.setFlowFilesOut(processGroupStatus.getOutputCount());
+        snapshot.setBytesOut(processGroupStatus.getOutputContentSize());
+        snapshot.setFlowFilesTransferred(processGroupStatus.getFlowFilesTransferred());
+        snapshot.setBytesTransferred(processGroupStatus.getBytesTransferred());
+        snapshot.setFlowFilesSent(processGroupStatus.getFlowFilesSent());
+        snapshot.setBytesSent(processGroupStatus.getBytesSent());
+        snapshot.setFlowFilesReceived(processGroupStatus.getFlowFilesReceived());
+        snapshot.setBytesReceived(processGroupStatus.getBytesReceived());
+        snapshot.setActiveThreadCount(processGroupStatus.getActiveThreadCount());
+        StatusMerger.updatePrettyPrintedFields(snapshot);
 
         // processor status
-        final Collection<ProcessorStatusDTO> processorStatDtoCollection = new ArrayList<>();
-        processGroupStatusDto.setProcessorStatus(processorStatDtoCollection);
+        final Collection<ProcessorStatusSnapshotDTO> processorStatDtoCollection = new ArrayList<>();
+        snapshot.setProcessorStatusSnapshots(processorStatDtoCollection);
         final Collection<ProcessorStatus> processorStatusCollection = processGroupStatus.getProcessorStatus();
         if (processorStatusCollection != null) {
             for (final ProcessorStatus processorStatus : processorStatusCollection) {
                 final ProcessorStatusDTO processorStatusDto = createProcessorStatusDto(processorStatus);
-                processorStatDtoCollection.add(processorStatusDto);
-                componentStatusDtoMap.put(processorStatusDto.getId(), processorStatusDto);
+                processorStatDtoCollection.add(processorStatusDto.getAggregateSnapshot());
             }
         }
 
         // connection status
-        final Collection<ConnectionStatusDTO> connectionStatusDtoCollection = new ArrayList<>();
-        processGroupStatusDto.setConnectionStatus(connectionStatusDtoCollection);
+        final Collection<ConnectionStatusSnapshotDTO> connectionStatusDtoCollection = new ArrayList<>();
+        snapshot.setConnectionStatusSnapshots(connectionStatusDtoCollection);
         final Collection<ConnectionStatus> connectionStatusCollection = processGroupStatus.getConnectionStatus();
         if (connectionStatusCollection != null) {
             for (final ConnectionStatus connectionStatus : connectionStatusCollection) {
                 final ConnectionStatusDTO connectionStatusDto = createConnectionStatusDto(connectionStatus);
-                connectionStatusDtoCollection.add(connectionStatusDto);
+                connectionStatusDtoCollection.add(connectionStatusDto.getAggregateSnapshot());
             }
         }
 
         // local child process groups
-        final Collection<ProcessGroupStatusDTO> childProcessGroupStatusDtoCollection = new ArrayList<>();
-        processGroupStatusDto.setProcessGroupStatus(childProcessGroupStatusDtoCollection);
+        final Collection<ProcessGroupStatusSnapshotDTO> childProcessGroupStatusDtoCollection = new ArrayList<>();
+        snapshot.setProcessGroupStatusSnapshots(childProcessGroupStatusDtoCollection);
         final Collection<ProcessGroupStatus> childProcessGroupStatusCollection = processGroupStatus.getProcessGroupStatus();
         if (childProcessGroupStatusCollection != null) {
             for (final ProcessGroupStatus childProcessGroupStatus : childProcessGroupStatusCollection) {
                 final ProcessGroupStatusDTO childProcessGroupStatusDto = createProcessGroupStatusDto(bulletinRepository, childProcessGroupStatus);
-                childProcessGroupStatusDtoCollection.add(childProcessGroupStatusDto);
+                childProcessGroupStatusDtoCollection.add(childProcessGroupStatusDto.getAggregateSnapshot());
             }
         }
 
         // remote child process groups
-        final Collection<RemoteProcessGroupStatusDTO> childRemoteProcessGroupStatusDtoCollection = new ArrayList<>();
-        processGroupStatusDto.setRemoteProcessGroupStatus(childRemoteProcessGroupStatusDtoCollection);
+        final Collection<RemoteProcessGroupStatusSnapshotDTO> childRemoteProcessGroupStatusDtoCollection = new ArrayList<>();
+        snapshot.setRemoteProcessGroupStatusSnapshots(childRemoteProcessGroupStatusDtoCollection);
         final Collection<RemoteProcessGroupStatus> childRemoteProcessGroupStatusCollection = processGroupStatus.getRemoteProcessGroupStatus();
         if (childRemoteProcessGroupStatusCollection != null) {
             for (final RemoteProcessGroupStatus childRemoteProcessGroupStatus : childRemoteProcessGroupStatusCollection) {
                 final RemoteProcessGroupStatusDTO childRemoteProcessGroupStatusDto = createRemoteProcessGroupStatusDto(childRemoteProcessGroupStatus);
-                childRemoteProcessGroupStatusDtoCollection.add(childRemoteProcessGroupStatusDto);
-                componentStatusDtoMap.put(childRemoteProcessGroupStatusDto.getId(), childRemoteProcessGroupStatusDto);
+                childRemoteProcessGroupStatusDtoCollection.add(childRemoteProcessGroupStatusDto.getAggregateSnapshot());
             }
         }
 
         // input ports
-        final Collection<PortStatusDTO> inputPortStatusDtoCollection = new ArrayList<>();
-        processGroupStatusDto.setInputPortStatus(inputPortStatusDtoCollection);
+        final Collection<PortStatusSnapshotDTO> inputPortStatusDtoCollection = new ArrayList<>();
+        snapshot.setInputPortStatusSnapshots(inputPortStatusDtoCollection);
         final Collection<PortStatus> inputPortStatusCollection = processGroupStatus.getInputPortStatus();
         if (inputPortStatusCollection != null) {
             for (final PortStatus portStatus : inputPortStatusCollection) {
                 final PortStatusDTO portStatusDto = createPortStatusDto(portStatus);
-                inputPortStatusDtoCollection.add(portStatusDto);
-                componentStatusDtoMap.put(portStatusDto.getId(), portStatusDto);
+                inputPortStatusDtoCollection.add(portStatusDto.getAggregateSnapshot());
             }
         }
 
         // output ports
-        final Collection<PortStatusDTO> outputPortStatusDtoCollection = new ArrayList<>();
-        processGroupStatusDto.setOutputPortStatus(outputPortStatusDtoCollection);
+        final Collection<PortStatusSnapshotDTO> outputPortStatusDtoCollection = new ArrayList<>();
+        snapshot.setOutputPortStatusSnapshots(outputPortStatusDtoCollection);
         final Collection<PortStatus> outputPortStatusCollection = processGroupStatus.getOutputPortStatus();
         if (outputPortStatusCollection != null) {
             for (final PortStatus portStatus : outputPortStatusCollection) {
                 final PortStatusDTO portStatusDto = createPortStatusDto(portStatus);
-                outputPortStatusDtoCollection.add(portStatusDto);
-                componentStatusDtoMap.put(portStatusDto.getId(), portStatusDto);
-            }
-        }
-
-        // get the bulletins for this group and associate with the specific child component
-        if (bulletinRepository != null) {
-            if (processGroupStatusDto.getBulletins() == null) {
-                processGroupStatusDto.setBulletins(new ArrayList<BulletinDTO>());
-            }
-
-            // locate bulletins for this process group
-            final List<Bulletin> results = bulletinRepository.findBulletinsForGroupBySource(processGroupStatus.getId(), MAX_BULLETINS_PER_COMPONENT);
-            for (final Bulletin bulletin : results) {
-                final StatusDTO status = componentStatusDtoMap.get(bulletin.getSourceId());
-
-                // ensure this connectable is still in the flow
-                if (status != null) {
-                    if (status.getBulletins() == null) {
-                        status.setBulletins(new ArrayList<BulletinDTO>());
-                    }
-
-                    // convert the result into a dto
-                    final BulletinDTO bulletinDto = createBulletinDto(bulletin);
-                    status.getBulletins().add(bulletinDto);
-
-                    // create a copy for the parent group
-                    final BulletinDTO copy = copy(bulletinDto);
-                    copy.setGroupId(StringUtils.EMPTY);
-                    copy.setSourceId(processGroupStatus.getId());
-                    copy.setSourceName(processGroupStatus.getName());
-                    processGroupStatusDto.getBulletins().add(copy);
-                }
-            }
-
-            // copy over descendant bulletins
-            for (final ProcessGroupStatusDTO childProcessGroupStatusDto : processGroupStatusDto.getProcessGroupStatus()) {
-                if (childProcessGroupStatusDto.getBulletins() != null) {
-                    for (final BulletinDTO descendantBulletinDto : childProcessGroupStatusDto.getBulletins()) {
-                        // create a copy for the parent group
-                        final BulletinDTO copy = copy(descendantBulletinDto);
-                        copy.setGroupId(StringUtils.EMPTY);
-                        copy.setSourceId(processGroupStatus.getId());
-                        copy.setSourceName(processGroupStatus.getName());
-                        processGroupStatusDto.getBulletins().add(copy);
-                    }
-                }
-            }
-
-            // sort the bulletins
-            Collections.sort(processGroupStatusDto.getBulletins(), new Comparator<BulletinDTO>() {
-                @Override
-                public int compare(BulletinDTO o1, BulletinDTO o2) {
-                    if (o1 == null && o2 == null) {
-                        return 0;
-                    }
-                    if (o1 == null) {
-                        return 1;
-                    }
-                    if (o2 == null) {
-                        return -1;
-                    }
-
-                    return -Long.compare(o1.getId(), o2.getId());
-                }
-            });
-
-            // prune the response to only include the max number of bulletins
-            if (processGroupStatusDto.getBulletins().size() > MAX_BULLETINS_PER_COMPONENT) {
-                processGroupStatusDto.setBulletins(processGroupStatusDto.getBulletins().subList(0, MAX_BULLETINS_PER_COMPONENT));
+                outputPortStatusDtoCollection.add(portStatusDto.getAggregateSnapshot());
             }
         }
 
@@ -897,7 +840,6 @@ public final class DtoFactory {
     }
 
     public ConnectionStatusDTO createConnectionStatusDto(final ConnectionStatus connectionStatus) {
-
         final ConnectionStatusDTO connectionStatusDto = new ConnectionStatusDTO();
         connectionStatusDto.setGroupId(connectionStatus.getGroupId());
         connectionStatusDto.setId(connectionStatus.getId());
@@ -906,54 +848,64 @@ public final class DtoFactory {
         connectionStatusDto.setSourceName(connectionStatus.getSourceName());
         connectionStatusDto.setDestinationId(connectionStatus.getDestinationId());
         connectionStatusDto.setDestinationName(connectionStatus.getDestinationName());
+        connectionStatusDto.setStatsLastRefreshed(new Date());
+
+        final ConnectionStatusSnapshotDTO snapshot = new ConnectionStatusSnapshotDTO();
+        connectionStatusDto.setAggregateSnapshot(snapshot);
+
+        snapshot.setId(connectionStatus.getId());
+        snapshot.setGroupId(connectionStatus.getGroupId());
+        snapshot.setName(connectionStatus.getName());
+        snapshot.setSourceName(connectionStatus.getSourceName());
+        snapshot.setDestinationName(connectionStatus.getDestinationName());
 
-        final String queuedCount = FormatUtils.formatCount(connectionStatus.getQueuedCount());
-        final String queuedSize = FormatUtils.formatDataSize(connectionStatus.getQueuedBytes());
-        connectionStatusDto.setQueuedCount(queuedCount);
-        connectionStatusDto.setQueuedSize(queuedSize);
-        connectionStatusDto.setQueued(queuedCount + " / " + queuedSize);
+        snapshot.setFlowFilesQueued(connectionStatus.getQueuedCount());
+        snapshot.setBytesQueued(connectionStatus.getQueuedBytes());
 
-        final int inputCount = connectionStatus.getInputCount();
-        final long inputBytes = connectionStatus.getInputBytes();
-        connectionStatusDto.setInput(FormatUtils.formatCount(inputCount) + " / " + FormatUtils.formatDataSize(inputBytes));
+        snapshot.setFlowFilesIn(connectionStatus.getInputCount());
+        snapshot.setBytesIn(connectionStatus.getInputBytes());
 
-        final int outputCount = connectionStatus.getOutputCount();
-        final long outputBytes = connectionStatus.getOutputBytes();
-        connectionStatusDto.setOutput(FormatUtils.formatCount(outputCount) + " / " + FormatUtils.formatDataSize(outputBytes));
+        snapshot.setFlowFilesOut(connectionStatus.getOutputCount());
+        snapshot.setBytesOut(connectionStatus.getOutputBytes());
+        StatusMerger.updatePrettyPrintedFields(snapshot);
 
         return connectionStatusDto;
     }
 
     public ProcessorStatusDTO createProcessorStatusDto(final ProcessorStatus procStatus) {
-
         final ProcessorStatusDTO dto = new ProcessorStatusDTO();
         dto.setId(procStatus.getId());
         dto.setGroupId(procStatus.getGroupId());
         dto.setName(procStatus.getName());
+        dto.setStatsLastRefreshed(new Date());
+
+        final ProcessorStatusSnapshotDTO snapshot = new ProcessorStatusSnapshotDTO();
+        dto.setAggregateSnapshot(snapshot);
 
-        final int processedCount = procStatus.getOutputCount();
-        final long numProcessedBytes = procStatus.getOutputBytes();
-        dto.setOutput(FormatUtils.formatCount(processedCount) + " / " + FormatUtils.formatDataSize(numProcessedBytes));
+        snapshot.setId(procStatus.getId());
+        snapshot.setGroupId(procStatus.getGroupId());
+        snapshot.setName(procStatus.getName());
 
-        final int inputCount = procStatus.getInputCount();
-        final long inputBytes = procStatus.getInputBytes();
-        dto.setInput(FormatUtils.formatCount(inputCount) + " / " + FormatUtils.formatDataSize(inputBytes));
+        snapshot.setFlowFilesOut(procStatus.getOutputCount());
+        snapshot.setBytesOut(procStatus.getOutputBytes());
 
-        final long readBytes = procStatus.getBytesRead();
-        dto.setRead(FormatUtils.formatDataSize(readBytes));
+        snapshot.setFlowFilesIn(procStatus.getInputCount());
+        snapshot.setBytesIn(procStatus.getInputBytes());
 
-        final long writtenBytes = procStatus.getBytesWritten();
-        dto.setWritten(FormatUtils.formatDataSize(writtenBytes));
+        snapshot.setBytesRead(procStatus.getBytesRead());
+        snapshot.setBytesWritten(procStatus.getBytesWritten());
 
-        dto.setTasksDuration(FormatUtils.formatHoursMinutesSeconds(procStatus.getProcessingNanos(), TimeUnit.NANOSECONDS));
-        dto.setTasks(FormatUtils.formatCount(procStatus.getInvocations()));
+        snapshot.setTaskCount(procStatus.getInvocations());
+        snapshot.setTasksDurationNanos(procStatus.getProcessingNanos());
+        snapshot.setTasksDuration(FormatUtils.formatHoursMinutesSeconds(procStatus.getProcessingNanos(), TimeUnit.NANOSECONDS));
 
         // determine the run status
-        dto.setRunStatus(procStatus.getRunStatus().toString());
+        snapshot.setRunStatus(procStatus.getRunStatus().toString());
 
-        dto.setActiveThreadCount(procStatus.getActiveThreadCount());
-        dto.setType(procStatus.getType());
+        snapshot.setActiveThreadCount(procStatus.getActiveThreadCount());
+        snapshot.setType(procStatus.getType());
 
+        StatusMerger.updatePrettyPrintedFields(snapshot);
         return dto;
     }
 
@@ -968,17 +920,25 @@ public final class DtoFactory {
         dto.setId(portStatus.getId());
         dto.setGroupId(portStatus.getGroupId());
         dto.setName(portStatus.getName());
-        dto.setActiveThreadCount(portStatus.getActiveThreadCount());
         dto.setRunStatus(portStatus.getRunStatus().toString());
         dto.setTransmitting(portStatus.isTransmitting());
+        dto.setStatsLastRefreshed(new Date());
 
-        final int processedCount = portStatus.getOutputCount();
-        final long numProcessedBytes = portStatus.getOutputBytes();
-        dto.setOutput(FormatUtils.formatCount(processedCount) + " / " + FormatUtils.formatDataSize(numProcessedBytes));
+        final PortStatusSnapshotDTO snapshot = new PortStatusSnapshotDTO();
+        dto.setAggregateSnapshot(snapshot);
 
-        final int inputCount = portStatus.getInputCount();
-        final long inputBytes = portStatus.getInputBytes();
-        dto.setInput(FormatUtils.formatCount(inputCount) + " / " + FormatUtils.formatDataSize(inputBytes));
+        snapshot.setId(portStatus.getId());
+        snapshot.setGroupId(portStatus.getGroupId());
+        snapshot.setName(portStatus.getName());
+        snapshot.setRunStatus(portStatus.getRunStatus().toString());
+
+        snapshot.setActiveThreadCount(portStatus.getActiveThreadCount());
+        snapshot.setFlowFilesOut(portStatus.getOutputCount());
+        snapshot.setBytesOut(portStatus.getOutputBytes());
+
+        snapshot.setFlowFilesIn(portStatus.getInputCount());
+        snapshot.setBytesIn(portStatus.getInputBytes());
+        StatusMerger.updatePrettyPrintedFields(snapshot);
 
         return dto;
     }
@@ -1766,6 +1726,7 @@ public final class DtoFactory {
      * @param node node
      * @return dto
      */
+    @SuppressWarnings("deprecation")
     public ProvenanceNodeDTO createProvenanceEventNodeDTO(final ProvenanceEventLineageNode node) {
         final ProvenanceNodeDTO dto = new ProvenanceNodeDTO();
         dto.setId(node.getIdentifier());
@@ -1786,6 +1747,7 @@ public final class DtoFactory {
      * @param node node
      * @return dto
      */
+    @SuppressWarnings("deprecation")
     public ProvenanceNodeDTO createFlowFileNodeDTO(final LineageNode node) {
         final ProvenanceNodeDTO dto = new ProvenanceNodeDTO();
         dto.setId(node.getIdentifier());
@@ -1906,48 +1868,59 @@ public final class DtoFactory {
     public SystemDiagnosticsDTO createSystemDiagnosticsDto(final SystemDiagnostics sysDiagnostics) {
 
         final SystemDiagnosticsDTO dto = new SystemDiagnosticsDTO();
-        dto.setStatsLastRefreshed(new Date(sysDiagnostics.getCreationTimestamp()));
+        final SystemDiagnosticsSnapshotDTO snapshot = new SystemDiagnosticsSnapshotDTO();
+        dto.setAggregateSnapshot(snapshot);
+
+        snapshot.setStatsLastRefreshed(new Date(sysDiagnostics.getCreationTimestamp()));
 
         // processors
-        dto.setAvailableProcessors(sysDiagnostics.getAvailableProcessors());
-        dto.setProcessorLoadAverage(sysDiagnostics.getProcessorLoadAverage());
+        snapshot.setAvailableProcessors(sysDiagnostics.getAvailableProcessors());
+        snapshot.setProcessorLoadAverage(sysDiagnostics.getProcessorLoadAverage());
 
         // threads
-        dto.setDaemonThreads(sysDiagnostics.getDaemonThreads());
-        dto.setTotalThreads(sysDiagnostics.getTotalThreads());
+        snapshot.setDaemonThreads(sysDiagnostics.getDaemonThreads());
+        snapshot.setTotalThreads(sysDiagnostics.getTotalThreads());
 
         // heap
-        dto.setMaxHeap(FormatUtils.formatDataSize(sysDiagnostics.getMaxHeap()));
-        dto.setTotalHeap(FormatUtils.formatDataSize(sysDiagnostics.getTotalHeap()));
-        dto.setUsedHeap(FormatUtils.formatDataSize(sysDiagnostics.getUsedHeap()));
-        dto.setFreeHeap(FormatUtils.formatDataSize(sysDiagnostics.getFreeHeap()));
+        snapshot.setMaxHeap(FormatUtils.formatDataSize(sysDiagnostics.getMaxHeap()));
+        snapshot.setMaxHeapBytes(sysDiagnostics.getMaxHeap());
+        snapshot.setTotalHeap(FormatUtils.formatDataSize(sysDiagnostics.getTotalHeap()));
+        snapshot.setTotalHeapBytes(sysDiagnostics.getTotalHeap());
+        snapshot.setUsedHeap(FormatUtils.formatDataSize(sysDiagnostics.getUsedHeap()));
+        snapshot.setUsedHeapBytes(sysDiagnostics.getUsedHeap());
+        snapshot.setFreeHeap(FormatUtils.formatDataSize(sysDiagnostics.getFreeHeap()));
+        snapshot.setFreeHeapBytes(sysDiagnostics.getFreeHeap());
         if (sysDiagnostics.getHeapUtilization() != -1) {
-            dto.setHeapUtilization(FormatUtils.formatUtilization(sysDiagnostics.getHeapUtilization()));
+            snapshot.setHeapUtilization(FormatUtils.formatUtilization(sysDiagnostics.getHeapUtilization()));
         }
 
         // non heap
-        dto.setMaxNonHeap(FormatUtils.formatDataSize(sysDiagnostics.getMaxNonHeap()));
-        dto.setTotalNonHeap(FormatUtils.formatDataSize(sysDiagnostics.getTotalNonHeap()));
-        dto.setUsedNonHeap(FormatUtils.formatDataSize(sysDiagnostics.getUsedNonHeap()));
-        dto.setFreeNonHeap(FormatUtils.formatDataSize(sysDiagnostics.getFreeNonHeap()));
+        snapshot.setMaxNonHeap(FormatUtils.formatDataSize(sysDiagnostics.getMaxNonHeap()));
+        snapshot.setMaxNonHeapBytes(sysDiagnostics.getMaxNonHeap());
+        snapshot.setTotalNonHeap(FormatUtils.formatDataSize(sysDiagnostics.getTotalNonHeap()));
+        snapshot.setTotalNonHeapBytes(sysDiagnostics.getTotalNonHeap());
+        snapshot.setUsedNonHeap(FormatUtils.formatDataSize(sysDiagnostics.getUsedNonHeap()));
+        snapshot.setUsedNonHeapBytes(sysDiagnostics.getUsedNonHeap());
+        snapshot.setFreeNonHeap(FormatUtils.formatDataSize(sysDiagnostics.getFreeNonHeap()));
+        snapshot.setFreeNonHeapBytes(sysDiagnostics.getFreeNonHeap());
         if (sysDiagnostics.getNonHeapUtilization() != -1) {
-            dto.setNonHeapUtilization(FormatUtils.formatUtilization(sysDiagnostics.getNonHeapUtilization()));
+            snapshot.setNonHeapUtilization(FormatUtils.formatUtilization(sysDiagnostics.getNonHeapUtilization()));
         }
 
         // flow file disk usage
-        final SystemDiagnosticsDTO.StorageUsageDTO flowFileRepositoryStorageUsageDto = createStorageUsageDTO(null, sysDiagnostics.getFlowFileRepositoryStorageUsage());
-        dto.setFlowFileRepositoryStorageUsage(flowFileRepositoryStorageUsageDto);
+        final SystemDiagnosticsSnapshotDTO.StorageUsageDTO flowFileRepositoryStorageUsageDto = createStorageUsageDTO(null, sysDiagnostics.getFlowFileRepositoryStorageUsage());
+        snapshot.setFlowFileRepositoryStorageUsage(flowFileRepositoryStorageUsageDto);
 
         // content disk usage
-        final Set<SystemDiagnosticsDTO.StorageUsageDTO> contentRepositoryStorageUsageDtos = new LinkedHashSet<>();
-        dto.setContentRepositoryStorageUsage(contentRepositoryStorageUsageDtos);
+        final Set<SystemDiagnosticsSnapshotDTO.StorageUsageDTO> contentRepositoryStorageUsageDtos = new LinkedHashSet<>();
+        snapshot.setContentRepositoryStorageUsage(contentRepositoryStorageUsageDtos);
         for (final Map.Entry<String, StorageUsage> entry : sysDiagnostics.getContentRepositoryStorageUsage().entrySet()) {
             contentRepositoryStorageUsageDtos.add(createStorageUsageDTO(entry.getKey(), entry.getValue()));
         }
 
         // garbage collection
-        final Set<SystemDiagnosticsDTO.GarbageCollectionDTO> garbageCollectionDtos = new LinkedHashSet<>();
-        dto.setGarbageCollection(garbageCollectionDtos);
+        final Set<SystemDiagnosticsSnapshotDTO.GarbageCollectionDTO> garbageCollectionDtos = new LinkedHashSet<>();
+        snapshot.setGarbageCollection(garbageCollectionDtos);
         for (final Map.Entry<String, GarbageCollection> entry : sysDiagnostics.getGarbageCollection().entrySet()) {
             garbageCollectionDtos.add(createGarbageCollectionDTO(entry.getKey(), entry.getValue()));
         }
@@ -1962,8 +1935,8 @@ public final class DtoFactory {
      * @param storageUsage usage
      * @return dto
      */
-    public SystemDiagnosticsDTO.StorageUsageDTO createStorageUsageDTO(final String identifier, final StorageUsage storageUsage) {
-        final SystemDiagnosticsDTO.StorageUsageDTO dto = new SystemDiagnosticsDTO.StorageUsageDTO();
+    public SystemDiagnosticsSnapshotDTO.StorageUsageDTO createStorageUsageDTO(final String identifier, final StorageUsage storageUsage) {
+        final SystemDiagnosticsSnapshotDTO.StorageUsageDTO dto = new SystemDiagnosticsSnapshotDTO.StorageUsageDTO();
         dto.setIdentifier(identifier);
         dto.setFreeSpace(FormatUtils.formatDataSize(storageUsage.getFreeSpace()));
         dto.setTotalSpace(FormatUtils.formatDataSize(storageUsage.getTotalSpace()));
@@ -1982,11 +1955,12 @@ public final class DtoFactory {
      * @param garbageCollection gc
      * @return dto
      */
-    public SystemDiagnosticsDTO.GarbageCollectionDTO createGarbageCollectionDTO(final String name, final GarbageCollection garbageCollection) {
-        final SystemDiagnosticsDTO.GarbageCollectionDTO dto = new SystemDiagnosticsDTO.GarbageCollectionDTO();
+    public SystemDiagnosticsSnapshotDTO.GarbageCollectionDTO createGarbageCollectionDTO(final String name, final GarbageCollection garbageCollection) {
+        final SystemDiagnosticsSnapshotDTO.GarbageCollectionDTO dto = new SystemDiagnosticsSnapshotDTO.GarbageCollectionDTO();
         dto.setName(name);
         dto.setCollectionCount(garbageCollection.getCollectionCount());
         dto.setCollectionTime(FormatUtils.formatHoursMinutesSeconds(garbageCollection.getCollectionTime(), TimeUnit.MILLISECONDS));
+        dto.setCollectionMillis(garbageCollection.getCollectionTime());
         return dto;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index a349f2a..68d0dbe 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -37,7 +37,11 @@ import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.repository.ContentNotFoundException;
 import org.apache.nifi.controller.repository.claim.ContentDirection;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.PortStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
 import org.apache.nifi.diagnostics.SystemDiagnostics;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -89,8 +93,12 @@ import org.apache.nifi.web.api.dto.provenance.lineage.LineageRequestDTO;
 import org.apache.nifi.web.api.dto.provenance.lineage.LineageRequestDTO.LineageRequestType;
 import org.apache.nifi.web.api.dto.search.ComponentSearchResultDTO;
 import org.apache.nifi.web.api.dto.search.SearchResultsDTO;
+import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
 import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
+import org.apache.nifi.web.api.dto.status.PortStatusDTO;
 import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
+import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
+import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
 import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
 import org.apache.nifi.web.security.ProxiedEntitiesUtils;
 import org.apache.nifi.web.security.user.NiFiUserUtils;
@@ -439,6 +447,8 @@ public class ControllerFacade {
         final ControllerStatusDTO controllerStatus = new ControllerStatusDTO();
         controllerStatus.setActiveThreadCount(flowController.getActiveThreadCount());
         controllerStatus.setQueued(FormatUtils.formatCount(controllerQueueSize.getObjectCount()) + " / " + FormatUtils.formatDataSize(controllerQueueSize.getByteCount()));
+        controllerStatus.setBytesQueued(controllerQueueSize.getByteCount());
+        controllerStatus.setFlowFilesQueued(controllerQueueSize.getObjectCount());
 
         final BulletinRepository bulletinRepository = getBulletinRepository();
         controllerStatus.setBulletins(dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForController()));
@@ -477,6 +487,116 @@ public class ControllerFacade {
     }
 
     /**
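+     * Gets the status for the specified processor, failing with a ResourceNotFoundException if the group or the processor cannot be located.
+     *
+     * @param groupId id of the process group whose status is searched
+     * @param processorId id of the processor to find within that group's status
+     * @return the status DTO for the specified processor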
+     */
+    public ProcessorStatusDTO getProcessorStatus(final String groupId, final String processorId) {
+        final ProcessGroupStatus processGroupStatus = flowController.getGroupStatus(groupId);
+        if (processGroupStatus == null) {
+            throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId));
+        }
+
+        for (final ProcessorStatus processorStatus : processGroupStatus.getProcessorStatus()) {
+            if (processorId.equals(processorStatus.getId())) {
+                return dtoFactory.createProcessorStatusDto(processorStatus);
+            }
+        }
+
+        throw new ResourceNotFoundException(String.format("Unable to locate processor with id '%s'.", processorId));
+    }
+
+    /**
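+     * Gets the status for the specified connection, failing with a ResourceNotFoundException if the group or the connection cannot be located.
+     *
+     * @param groupId id of the process group whose status is searched
+     * @param connectionId id of the connection to find within that group's status
+     * @return the status DTO for the specified connection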
+     */
+    public ConnectionStatusDTO getConnectionStatus(final String groupId, final String connectionId) {
+        final ProcessGroupStatus processGroupStatus = flowController.getGroupStatus(groupId);
+        if (processGroupStatus == null) {
+            throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId));
+        }
+
+        for (final ConnectionStatus connectionStatus : processGroupStatus.getConnectionStatus()) {
+            if (connectionId.equals(connectionStatus.getId())) {
+                return dtoFactory.createConnectionStatusDto(connectionStatus);
+            }
+        }
+
+        throw new ResourceNotFoundException(String.format("Unable to locate connection with id '%s'.", connectionId));
+    }
+
+    /**
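+     * Gets the status for the specified input port, failing with a ResourceNotFoundException if the group or the input port cannot be located.
+     *
+     * @param groupId id of the process group whose status is searched
+     * @param portId id of the input port to find within that group's status
+     * @return the status DTO for the specified input port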
+     */
+    public PortStatusDTO getInputPortStatus(final String groupId, final String portId) {
+        final ProcessGroupStatus processGroupStatus = flowController.getGroupStatus(groupId);
+        if (processGroupStatus == null) {
+            throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId));
+        }
+
+        for (final PortStatus portStatus : processGroupStatus.getInputPortStatus()) {
+            if (portId.equals(portStatus.getId())) {
+                return dtoFactory.createPortStatusDto(portStatus);
+            }
+        }
+
+        throw new ResourceNotFoundException(String.format("Unable to locate input port with id '%s'.", portId));
+    }
+
+    /**
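+     * Gets the status for the specified output port, failing with a ResourceNotFoundException if the group or the output port cannot be located.
+     *
+     * @param groupId id of the process group whose status is searched
+     * @param portId id of the output port to find within that group's status
+     * @return the status DTO for the specified output port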
+     */
+    public PortStatusDTO getOutputPortStatus(final String groupId, final String portId) {
+        final ProcessGroupStatus processGroupStatus = flowController.getGroupStatus(groupId);
+        if (processGroupStatus == null) {
+            throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId));
+        }
+
+        for (final PortStatus portStatus : processGroupStatus.getOutputPortStatus()) {
+            if (portId.equals(portStatus.getId())) {
+                return dtoFactory.createPortStatusDto(portStatus);
+            }
+        }
+
+        throw new ResourceNotFoundException(String.format("Unable to locate output port with id '%s'.", portId));
+    }
+
+    /**
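+     * Gets the status for the specified remote process group, failing with a ResourceNotFoundException if the group or the remote process group cannot be located.
+     *
+     * @param groupId id of the process group whose status is searched
+     * @param remoteProcessGroupId id of the remote process group to find within that group's status
+     * @return the status DTO for the specified remote process group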
+     */
+    public RemoteProcessGroupStatusDTO getRemoteProcessGroupStatus(final String groupId, final String remoteProcessGroupId) {
+        final ProcessGroupStatus processGroupStatus = flowController.getGroupStatus(groupId);
+        if (processGroupStatus == null) {
+            throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId));
+        }
+
+        for (final RemoteProcessGroupStatus remoteProcessGroupStatus : processGroupStatus.getRemoteProcessGroupStatus()) {
+            if (remoteProcessGroupId.equals(remoteProcessGroupStatus.getId())) {
+                return dtoFactory.createRemoteProcessGroupStatusDto(remoteProcessGroupStatus);
+            }
+        }
+
+        throw new ResourceNotFoundException(String.format("Unable to locate remote process group with id '%s'.", remoteProcessGroupId));
+    }
+
+    /**
      * Gets the BulletinRepository.
      *
      * @return the BulletinRepository