You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2017/07/21 18:18:57 UTC

[1/2] nifi git commit: NIFI-106: - Expose processors' counters in Stats History - Only include counters in Processors' Status History if user has read access to corresponding Processor - Addressed review feedback. Found and addressed bug where a counter

Repository: nifi
Updated Branches:
  refs/heads/master c54b2ad81 -> 695e8aa98


http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/RingBufferEventRepositoryBean.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/RingBufferEventRepositoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/RingBufferEventRepositoryBean.java
index f861c7a..c253a2c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/RingBufferEventRepositoryBean.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/RingBufferEventRepositoryBean.java
@@ -16,8 +16,7 @@
  */
 package org.apache.nifi.spring;
 
-import org.apache.nifi.controller.repository.RingBufferEventRepository;
-
+import org.apache.nifi.controller.repository.metrics.RingBufferEventRepository;
 import org.springframework.beans.factory.FactoryBean;
 
 public class RingBufferEventRepositoryBean implements FactoryBean<RingBufferEventRepository> {

http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java
index a160b9e..cb5c306 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java
@@ -16,10 +16,12 @@
  */
 package org.apache.nifi.controller.repository;
 
-import org.apache.nifi.controller.repository.RingBufferEventRepository;
 import org.apache.nifi.controller.repository.StandardRepositoryStatusReport;
+import org.apache.nifi.controller.repository.metrics.RingBufferEventRepository;
 import org.apache.nifi.controller.repository.FlowFileEvent;
 import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.junit.Test;
@@ -133,6 +135,11 @@ public class TestRingBufferEventRepository {
             public long getBytesSent() {
                 return 0;
             }
+
+            @Override
+            public Map<String, Long> getCounters() {
+                return Collections.emptyMap();
+            }
         };
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index 420022f..66a5073 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -289,10 +289,12 @@ public class ControllerFacade implements Authorizable {
             throw new ResourceNotFoundException(String.format("Unable to locate processor with id '%s'.", processorId));
         }
 
-        final StatusHistoryDTO statusHistory = flowController.getProcessorStatusHistory(processorId);
+        final boolean authorized = processor.isAuthorized(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
+
+        final StatusHistoryDTO statusHistory = flowController.getProcessorStatusHistory(processorId, authorized);
 
         // if not authorized
-        if (!processor.isAuthorized(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser())) {
+        if (!authorized) {
             statusHistory.getComponentDetails().put(ComponentStatusRepository.COMPONENT_DETAIL_NAME, processorId);
             statusHistory.getComponentDetails().put(ComponentStatusRepository.COMPONENT_DETAIL_TYPE, "Processor");
         }


[2/2] nifi git commit: NIFI-106: - Expose processors' counters in Stats History - Only include counters in Processors' Status History if user has read access to corresponding Processor - Addressed review feedback. Found and addressed bug where a counter

Posted by mc...@apache.org.
NIFI-106:
- Expose processors' counters in Stats History
- Only include counters in Processors' Status History if user has read access to corresponding Processor
- Addressed review feedback. Found and addressed bug where a counter is not present in all of the aggregate snapshot values for status history, resulting in the UI not rendering the chart properly
- This closes #1872


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/695e8aa9
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/695e8aa9
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/695e8aa9

Branch: refs/heads/master
Commit: 695e8aa98f1d9cce5a9b3025193ac57f9acd598e
Parents: c54b2ad
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue May 30 14:40:30 2017 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Fri Jul 21 14:18:11 2017 -0400

----------------------------------------------------------------------
 .../nifi/controller/status/ProcessorStatus.java |  14 +
 .../history/ComponentStatusRepository.java      |   4 +-
 .../status/history/StatusHistory.java           |   1 +
 .../web/api/dto/status/StatusDescriptorDTO.java |  22 ++
 .../endpoints/StatusHistoryEndpointMerger.java  |  87 +++++-
 .../controller/repository/FlowFileEvent.java    |   5 +
 .../apache/nifi/controller/FlowController.java  |  12 +-
 .../repository/RingBufferEventRepository.java   | 312 -------------------
 .../repository/StandardFlowFileEvent.java       | 237 --------------
 .../repository/StandardProcessSession.java      |  74 ++++-
 .../repository/metrics/EventContainer.java      |  28 ++
 .../controller/repository/metrics/EventSum.java |  53 ++++
 .../repository/metrics/EventSumValue.java       | 207 ++++++++++++
 .../metrics/RingBufferEventRepository.java      |  67 ++++
 .../metrics/SecondPrecisionEventContainer.java  |  69 ++++
 .../metrics/StandardFlowFileEvent.java          | 205 ++++++++++++
 .../scheduling/EventDrivenSchedulingAgent.java  |   2 +-
 .../status/history/StatusHistoryUtil.java       |  17 +-
 .../VolatileComponentStatusRepository.java      |  19 +-
 .../tasks/ContinuallyRunProcessorTask.java      |   2 +-
 .../spring/RingBufferEventRepositoryBean.java   |   3 +-
 .../TestRingBufferEventRepository.java          |   9 +-
 .../nifi/web/controller/ControllerFacade.java   |   6 +-
 23 files changed, 870 insertions(+), 585 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessorStatus.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessorStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessorStatus.java
index 54be7ba..5a4f64a 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessorStatus.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessorStatus.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.controller.status;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -42,6 +44,7 @@ public class ProcessorStatus implements Cloneable {
     private long bytesReceived;
     private int flowFilesSent;
     private long bytesSent;
+    private Map<String, Long> counters;
 
     public String getId() {
         return id;
@@ -211,6 +214,14 @@ public class ProcessorStatus implements Cloneable {
         this.bytesSent = bytesSent;
     }
 
+    public Map<String, Long> getCounters() {
+        return counters;
+    }
+
+    public void setCounters(final Map<String, Long> counters) {
+        this.counters = counters;
+    }
+
     @Override
     public ProcessorStatus clone() {
         final ProcessorStatus clonedObj = new ProcessorStatus();
@@ -234,6 +245,7 @@ public class ProcessorStatus implements Cloneable {
         clonedObj.flowFilesRemoved = flowFilesRemoved;
         clonedObj.runStatus = runStatus;
         clonedObj.type = type;
+        clonedObj.counters = counters == null ? null : new HashMap<>(counters);
         return clonedObj;
     }
 
@@ -268,6 +280,8 @@ public class ProcessorStatus implements Cloneable {
         builder.append(processingNanos);
         builder.append(", activeThreadCount=");
         builder.append(activeThreadCount);
+        builder.append(", counters=");
+        builder.append(counters);
         builder.append("]");
         return builder.toString();
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusRepository.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusRepository.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusRepository.java
index 1042c3f..553903f 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusRepository.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusRepository.java
@@ -103,10 +103,12 @@ public interface ComponentStatusRepository {
      * If the date range is large, the total number of data points could be far
      * too many to process. Therefore, this parameter allows the requestor to
      * indicate how many samples to return.
+     * @param includeCounters specifies whether or not metrics from Processor counters
+     * should be included in the StatusHistory.
      * @return a {@link StatusHistory} that provides the status information
      * about the Processor with the given ID during the given time period
      */
-    StatusHistory getProcessorStatusHistory(String processorId, Date start, Date end, int preferredDataPoints);
+    StatusHistory getProcessorStatusHistory(String processorId, Date start, Date end, int preferredDataPoints, boolean includeCounters);
 
     /**
      * @param remoteGroupId to get history of

http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistory.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistory.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistory.java
index f1bb946..75c609d 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistory.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistory.java
@@ -41,4 +41,5 @@ public interface StatusHistory {
      * @return List of snapshots for a given component
      */
     List<StatusSnapshot> getStatusSnapshots();
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusDescriptorDTO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusDescriptorDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusDescriptorDTO.java
index 0a3c418..34ee315 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusDescriptorDTO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusDescriptorDTO.java
@@ -17,6 +17,9 @@
 package org.apache.nifi.web.api.dto.status;
 
 import com.wordnik.swagger.annotations.ApiModelProperty;
+
+import java.util.Objects;
+
 import javax.xml.bind.annotation.XmlType;
 
 /**
@@ -102,4 +105,23 @@ public class StatusDescriptorDTO {
         this.formatter = formatter;
     }
 
+    @Override
+    public int hashCode() {
+        return 31 + 41 * (field == null ? 0 : field.hashCode());
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (!(obj instanceof StatusDescriptorDTO)) {
+            return false;
+        }
+        final StatusDescriptorDTO other = (StatusDescriptorDTO) obj;
+        return Objects.equals(field, other.field);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java
index 60b40f7..eb96adb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java
@@ -16,32 +16,39 @@
  */
 package org.apache.nifi.cluster.coordination.http.endpoints;
 
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.regex.Pattern;
 import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
 import org.apache.nifi.cluster.manager.NodeResponse;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.controller.status.ProcessorStatus;
 import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
 import org.apache.nifi.controller.status.history.MetricDescriptor;
+import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
 import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor;
 import org.apache.nifi.controller.status.history.ProcessorStatusDescriptor;
 import org.apache.nifi.controller.status.history.RemoteProcessGroupStatusDescriptor;
+import org.apache.nifi.controller.status.history.StandardMetricDescriptor;
 import org.apache.nifi.controller.status.history.StandardStatusSnapshot;
 import org.apache.nifi.controller.status.history.StatusHistoryUtil;
 import org.apache.nifi.controller.status.history.StatusSnapshot;
+import org.apache.nifi.controller.status.history.ValueMapper;
 import org.apache.nifi.web.api.dto.status.NodeStatusSnapshotsDTO;
+import org.apache.nifi.web.api.dto.status.StatusDescriptorDTO;
 import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
 import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
 import org.apache.nifi.web.api.entity.StatusHistoryEntity;
 
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.regex.Pattern;
+
 public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
 
     public static final Pattern PROCESSOR_STATUS_HISTORY_URI_PATTERN = Pattern.compile("/nifi-api/flow/processors/[a-f0-9\\-]{36}/status/history");
@@ -55,7 +62,7 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
         this.componentStatusSnapshotMillis = componentStatusSnapshotMillis;
     }
 
-    private Map<String, MetricDescriptor<?>> getMetricDescriptors(final URI uri) {
+    private Map<String, MetricDescriptor<?>> getStandardMetricDescriptors(final URI uri) {
         final String path = uri.getPath();
 
         final Map<String, MetricDescriptor<?>> metricDescriptors = new HashMap<>();
@@ -87,16 +94,19 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
             return false;
         }
 
-        final Map<String, MetricDescriptor<?>> descriptors = getMetricDescriptors(uri);
+        final Map<String, MetricDescriptor<?>> descriptors = getStandardMetricDescriptors(uri);
         return descriptors != null && !descriptors.isEmpty();
     }
 
     @Override
     public NodeResponse merge(URI uri, String method, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses, NodeResponse clientResponse) {
-        final Map<String, MetricDescriptor<?>> metricDescriptors = getMetricDescriptors(uri);
+        final Map<String, MetricDescriptor<?>> metricDescriptors = getStandardMetricDescriptors(uri);
 
         final StatusHistoryEntity responseEntity = clientResponse.getClientResponse().getEntity(StatusHistoryEntity.class);
 
+        final Set<StatusDescriptorDTO> fieldDescriptors = new LinkedHashSet<>();
+
+        boolean includeCounters = true;
         StatusHistoryDTO lastStatusHistory = null;
         final List<NodeStatusSnapshotsDTO> nodeStatusSnapshots = new ArrayList<>(successfulResponses.size());
         LinkedHashMap<String, String> noReadPermissionsComponentDetails = null;
@@ -109,6 +119,10 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
                 noReadPermissionsComponentDetails = nodeStatus.getComponentDetails();
             }
 
+            if (!Boolean.TRUE.equals(nodeResponseEntity.getCanRead())) {
+                includeCounters = false;
+            }
+
             final NodeIdentifier nodeId = nodeResponse.getNodeId();
             final NodeStatusSnapshotsDTO nodeStatusSnapshot = new NodeStatusSnapshotsDTO();
             nodeStatusSnapshot.setNodeId(nodeId.getId());
@@ -116,6 +130,38 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
             nodeStatusSnapshot.setApiPort(nodeId.getApiPort());
             nodeStatusSnapshot.setStatusSnapshots(nodeStatus.getAggregateSnapshots());
             nodeStatusSnapshots.add(nodeStatusSnapshot);
+
+            final List<StatusDescriptorDTO> descriptors = nodeStatus.getFieldDescriptors();
+            if (descriptors != null) {
+                fieldDescriptors.addAll(descriptors);
+            }
+        }
+
+        // If there's a status descriptor that is in the fieldDescriptors, but is not in the standard metric descriptors that we find,
+        // then it is a counter metric and should be included only if all StatusHistoryDTO's include counter metrics. This is done because
+        // we include counters in the status history only if the user is authorized to read the Processor. Since it's possible for the nodes
+        // to disagree about who is authorized (if, for example, the authorizer is asynchronously updated), then if any node indicates that
+        // the user is not authorized, we want to assume that the user is, in fact, not authorized.
+        if (includeCounters) {
+            for (final StatusDescriptorDTO descriptorDto : fieldDescriptors) {
+                final String fieldName = descriptorDto.getField();
+
+                if (!metricDescriptors.containsKey(fieldName)) {
+                    final ValueMapper<ProcessorStatus> valueMapper = s -> {
+                        final Map<String, Long> counters = s.getCounters();
+                        if (counters == null) {
+                            return 0L;
+                        }
+
+                        return counters.getOrDefault(descriptorDto.getField(), 0L);
+                    };
+
+                    final MetricDescriptor<ProcessorStatus> metricDescriptor = new StandardMetricDescriptor<>(descriptorDto.getField(),
+                        descriptorDto.getLabel(), descriptorDto.getDescription(), Formatter.COUNT, valueMapper);
+
+                    metricDescriptors.put(fieldName, metricDescriptor);
+                }
+            }
         }
 
         final StatusHistoryDTO clusterStatusHistory = new StatusHistoryDTO();
@@ -124,8 +170,8 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
         clusterStatusHistory.setNodeSnapshots(nodeStatusSnapshots);
         if (lastStatusHistory != null) {
             clusterStatusHistory.setComponentDetails(noReadPermissionsComponentDetails == null ? lastStatusHistory.getComponentDetails() : noReadPermissionsComponentDetails);
-            clusterStatusHistory.setFieldDescriptors(lastStatusHistory.getFieldDescriptors());
         }
+        clusterStatusHistory.setFieldDescriptors(new ArrayList<>(fieldDescriptors));
 
         final StatusHistoryEntity clusterEntity = new StatusHistoryEntity();
         clusterEntity.setStatusHistory(clusterStatusHistory);
@@ -177,6 +223,19 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
         final StandardStatusSnapshot snapshot = new StandardStatusSnapshot();
         snapshot.setTimestamp(snapshotDto.getTimestamp());
 
+        // Default all metrics to 0 so that if a counter has not yet been registered, it will have a value of 0 instead
+        // of missing all together.
+        for (final MetricDescriptor<?> descriptor : metricDescriptors.values()) {
+            snapshot.addStatusMetric(descriptor, 0L);
+
+            // If the DTO doesn't have an entry for the metric, add with a value of 0.
+            final Map<String, Long> dtoMetrics = snapshotDto.getStatusMetrics();
+            final String field = descriptor.getField();
+            if (!dtoMetrics.containsKey(field)) {
+                dtoMetrics.put(field, 0L);
+            }
+        }
+
         final Map<String, Long> metrics = snapshotDto.getStatusMetrics();
         for (final Map.Entry<String, Long> entry : metrics.entrySet()) {
             final String metricId = entry.getKey();

http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java
index f07a530..26cea50 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.controller.repository;
 
+import java.util.Map;
+
 public interface FlowFileEvent {
 
     String getComponentIdentifier();
@@ -51,4 +53,7 @@ public interface FlowFileEvent {
     long getBytesSent();
 
     int getInvocations();
+
+    Map<String, Long> getCounters();
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 9438946..8da894f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -2943,6 +2943,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             status.setFlowFilesSent(entry.getFlowFilesSent());
             status.setBytesSent(entry.getBytesSent());
             status.setFlowFilesRemoved(entry.getFlowFilesRemoved());
+
+            if (isProcessorAuthorized) {
+                status.setCounters(entry.getCounters());
+            }
         }
 
         // Determine the run status and get any validation error... only validating while STOPPED
@@ -4482,12 +4486,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         return StatusHistoryUtil.createStatusHistoryDTO(componentStatusRepository.getConnectionStatusHistory(connectionId, startTime, endTime, preferredDataPoints));
     }
 
-    public StatusHistoryDTO getProcessorStatusHistory(final String processorId) {
-        return getProcessorStatusHistory(processorId, null, null, Integer.MAX_VALUE);
+    public StatusHistoryDTO getProcessorStatusHistory(final String processorId, final boolean includeCounters) {
+        return getProcessorStatusHistory(processorId, null, null, Integer.MAX_VALUE, includeCounters);
     }
 
-    public StatusHistoryDTO getProcessorStatusHistory(final String processorId, final Date startTime, final Date endTime, final int preferredDataPoints) {
-        return StatusHistoryUtil.createStatusHistoryDTO(componentStatusRepository.getProcessorStatusHistory(processorId, startTime, endTime, preferredDataPoints));
+    public StatusHistoryDTO getProcessorStatusHistory(final String processorId, final Date startTime, final Date endTime, final int preferredDataPoints, final boolean includeCounters) {
+        return StatusHistoryUtil.createStatusHistoryDTO(componentStatusRepository.getProcessorStatusHistory(processorId, startTime, endTime, preferredDataPoints, includeCounters));
     }
 
     public StatusHistoryDTO getProcessGroupStatusHistory(final String processGroupId) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RingBufferEventRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RingBufferEventRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RingBufferEventRepository.java
deleted file mode 100644
index fb3cdd2..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RingBufferEventRepository.java
+++ /dev/null
@@ -1,312 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.repository;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicReference;
-
-public class RingBufferEventRepository implements FlowFileEventRepository {
-
-    private final int numMinutes;
-    private final ConcurrentMap<String, EventContainer> componentEventMap = new ConcurrentHashMap<>();
-
-    public RingBufferEventRepository(final int numMinutes) {
-        this.numMinutes = numMinutes;
-    }
-
-    @Override
-    public void close() throws IOException {
-    }
-
-    @Override
-    public void updateRepository(final FlowFileEvent event) {
-        final String componentId = event.getComponentIdentifier();
-        EventContainer eventContainer = componentEventMap.get(componentId);
-        if (eventContainer == null) {
-            eventContainer = new SecondPrecisionEventContainer(numMinutes);
-            final EventContainer oldEventContainer = componentEventMap.putIfAbsent(componentId, eventContainer);
-            if (oldEventContainer != null) {
-                eventContainer = oldEventContainer;
-            }
-        }
-
-        eventContainer.addEvent(event);
-    }
-
-    @Override
-    public StandardRepositoryStatusReport reportTransferEvents(final long sinceEpochMillis) {
-        final StandardRepositoryStatusReport report = new StandardRepositoryStatusReport();
-
-        for (final Map.Entry<String, EventContainer> entry : componentEventMap.entrySet()) {
-            final String consumerId = entry.getKey();
-            final EventContainer container = entry.getValue();
-
-            final FlowFileEvent reportEntry = container.generateReport(consumerId, sinceEpochMillis);
-            report.addReportEntry(reportEntry);
-        }
-
-        return report;
-    }
-
-    @Override
-    public void purgeTransferEvents(final long cutoffEpochMilliseconds) {
-        // This is done so that if a processor is removed from the graph, its events
-        // will be removed rather than being kept in memory
-        for (final EventContainer container : componentEventMap.values()) {
-            container.purgeEvents(cutoffEpochMilliseconds);
-        }
-    }
-
-    private static interface EventContainer {
-
-        public void addEvent(FlowFileEvent event);
-
-        public void purgeEvents(long cutoffEpochMillis);
-
-        public FlowFileEvent generateReport(String consumerId, long sinceEpochMillis);
-    }
-
-    private class EventSum {
-
-        private final AtomicReference<EventSumValue> ref = new AtomicReference<>(new EventSumValue());
-
-        private void add(final FlowFileEvent event) {
-            EventSumValue newValue;
-            EventSumValue value;
-            do {
-                value = ref.get();
-                newValue = new EventSumValue(value,
-                        event.getFlowFilesIn(), event.getFlowFilesOut(), event.getFlowFilesRemoved(),
-                        event.getContentSizeIn(), event.getContentSizeOut(), event.getContentSizeRemoved(),
-                        event.getBytesRead(), event.getBytesWritten(),
-                        event.getFlowFilesReceived(), event.getBytesReceived(),
-                        event.getFlowFilesSent(), event.getBytesSent(),
-                        event.getProcessingNanoseconds(), event.getInvocations(), event.getAggregateLineageMillis());
-            } while (!ref.compareAndSet(value, newValue));
-        }
-
-        public EventSumValue getValue() {
-            return ref.get();
-        }
-
-        public void addOrReset(final FlowFileEvent event) {
-            final long expectedMinute = System.currentTimeMillis() / 60000;
-
-            final EventSumValue curValue = ref.get();
-            if (curValue.getMinuteTimestamp() != expectedMinute) {
-                ref.compareAndSet(curValue, new EventSumValue());
-            }
-            add(event);
-        }
-    }
-
-    private static class EventSumValue {
-
-        private final int flowFilesIn, flowFilesOut, flowFilesRemoved;
-        private final long contentSizeIn, contentSizeOut, contentSizeRemoved;
-        private final long bytesRead, bytesWritten;
-        private final int flowFilesReceived, flowFilesSent;
-        private final long bytesReceived, bytesSent;
-        private final long processingNanos;
-        private final long aggregateLineageMillis;
-        private final int invocations;
-
-        private final long minuteTimestamp;
-        private final long millisecondTimestamp;
-
-        public EventSumValue() {
-            flowFilesIn = flowFilesOut = flowFilesRemoved = 0;
-            contentSizeIn = contentSizeOut = contentSizeRemoved = 0;
-            bytesRead = bytesWritten = 0;
-            flowFilesReceived = flowFilesSent = 0;
-            bytesReceived = bytesSent = 0L;
-            processingNanos = invocations = 0;
-            aggregateLineageMillis = 0L;
-            this.millisecondTimestamp = System.currentTimeMillis();
-            this.minuteTimestamp = millisecondTimestamp / 60000;
-        }
-
-        public EventSumValue(final EventSumValue base, final int flowFilesIn, final int flowFilesOut, final int flowFilesRemoved,
-                final long contentSizeIn, final long contentSizeOut, final long contentSizeRemoved,
-                final long bytesRead, final long bytesWritten,
-                final int flowFilesReceived, final long bytesReceived,
-                final int flowFilesSent, final long bytesSent,
-                final long processingNanos, final int invocations, final long aggregateLineageMillis) {
-            this.flowFilesIn = base.flowFilesIn + flowFilesIn;
-            this.flowFilesOut = base.flowFilesOut + flowFilesOut;
-            this.flowFilesRemoved = base.flowFilesRemoved + flowFilesRemoved;
-            this.contentSizeIn = base.contentSizeIn + contentSizeIn;
-            this.contentSizeOut = base.contentSizeOut + contentSizeOut;
-            this.contentSizeRemoved = base.contentSizeRemoved + contentSizeRemoved;
-            this.bytesRead = base.bytesRead + bytesRead;
-            this.bytesWritten = base.bytesWritten + bytesWritten;
-            this.flowFilesReceived = base.flowFilesReceived + flowFilesReceived;
-            this.bytesReceived = base.bytesReceived + bytesReceived;
-            this.flowFilesSent = base.flowFilesSent + flowFilesSent;
-            this.bytesSent = base.bytesSent + bytesSent;
-            this.processingNanos = base.processingNanos + processingNanos;
-            this.invocations = base.invocations + invocations;
-            this.aggregateLineageMillis = base.aggregateLineageMillis + aggregateLineageMillis;
-            this.millisecondTimestamp = System.currentTimeMillis();
-            this.minuteTimestamp = millisecondTimestamp / 60000;
-        }
-
-        public long getTimestamp() {
-            return millisecondTimestamp;
-        }
-
-        public long getMinuteTimestamp() {
-            return minuteTimestamp;
-        }
-
-        public long getBytesRead() {
-            return bytesRead;
-        }
-
-        public long getBytesWritten() {
-            return bytesWritten;
-        }
-
-        public int getFlowFilesIn() {
-            return flowFilesIn;
-        }
-
-        public int getFlowFilesOut() {
-            return flowFilesOut;
-        }
-
-        public long getContentSizeIn() {
-            return contentSizeIn;
-        }
-
-        public long getContentSizeOut() {
-            return contentSizeOut;
-        }
-
-        public int getFlowFilesRemoved() {
-            return flowFilesRemoved;
-        }
-
-        public long getContentSizeRemoved() {
-            return contentSizeRemoved;
-        }
-
-        public long getProcessingNanoseconds() {
-            return processingNanos;
-        }
-
-        public int getInvocations() {
-            return invocations;
-        }
-
-        public long getAggregateLineageMillis() {
-            return aggregateLineageMillis;
-        }
-
-        public int getFlowFilesReceived() {
-            return flowFilesReceived;
-        }
-
-        public int getFlowFilesSent() {
-            return flowFilesSent;
-        }
-
-        public long getBytesReceived() {
-            return bytesReceived;
-        }
-
-        public long getBytesSent() {
-            return bytesSent;
-        }
-    }
-
-    private class SecondPrecisionEventContainer implements EventContainer {
-
-        private final int numBins;
-        private final EventSum[] sums;
-
-        public SecondPrecisionEventContainer(final int numMinutes) {
-            numBins = 1 + numMinutes * 60;
-            sums = new EventSum[numBins];
-
-            for (int i = 0; i < numBins; i++) {
-                sums[i] = new EventSum();
-            }
-        }
-
-        @Override
-        public void addEvent(final FlowFileEvent event) {
-            final int second = (int) (System.currentTimeMillis() / 1000);
-            final int binIdx = (int) (second % numBins);
-            final EventSum sum = sums[binIdx];
-
-            sum.addOrReset(event);
-        }
-
-        @Override
-        public void purgeEvents(final long cutoffEpochMilliseconds) {
-            // no need to do anything
-        }
-
-        @Override
-        public FlowFileEvent generateReport(final String consumerId, final long sinceEpochMillis) {
-            int flowFilesIn = 0, flowFilesOut = 0, flowFilesRemoved = 0;
-            long contentSizeIn = 0L, contentSizeOut = 0L, contentSizeRemoved = 0L;
-            long bytesRead = 0L, bytesWritten = 0L;
-            int invocations = 0;
-            long processingNanos = 0L;
-            long aggregateLineageMillis = 0L;
-            int flowFilesReceived = 0, flowFilesSent = 0;
-            long bytesReceived = 0L, bytesSent = 0L;
-
-            final long second = sinceEpochMillis / 1000;
-            final int startBinIdx = (int) (second % numBins);
-
-            for (int i = 0; i < numBins; i++) {
-                int binIdx = (startBinIdx + i) % numBins;
-                final EventSum sum = sums[binIdx];
-
-                final EventSumValue sumValue = sum.getValue();
-                if (sumValue.getTimestamp() >= sinceEpochMillis) {
-                    flowFilesIn += sumValue.getFlowFilesIn();
-                    flowFilesOut += sumValue.getFlowFilesOut();
-                    flowFilesRemoved += sumValue.getFlowFilesRemoved();
-                    contentSizeIn += sumValue.getContentSizeIn();
-                    contentSizeOut += sumValue.getContentSizeOut();
-                    contentSizeRemoved += sumValue.getContentSizeRemoved();
-                    bytesRead += sumValue.getBytesRead();
-                    bytesWritten += sumValue.getBytesWritten();
-                    flowFilesReceived += sumValue.getFlowFilesReceived();
-                    bytesReceived += sumValue.getBytesReceived();
-                    flowFilesSent += sumValue.getFlowFilesSent();
-                    bytesSent += sumValue.getBytesSent();
-                    invocations += sumValue.getInvocations();
-                    processingNanos += sumValue.getProcessingNanoseconds();
-                    aggregateLineageMillis += sumValue.getAggregateLineageMillis();
-                }
-            }
-
-            return new StandardFlowFileEvent(consumerId, flowFilesIn, contentSizeIn,
-                    flowFilesOut, contentSizeOut, flowFilesRemoved, contentSizeRemoved,
-                    bytesRead, bytesWritten, flowFilesReceived, bytesReceived, flowFilesSent, bytesSent,
-                    invocations, aggregateLineageMillis, processingNanos);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileEvent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileEvent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileEvent.java
deleted file mode 100644
index d584735..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileEvent.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.repository;
-
-public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable {
-
-    private final String componentId;
-
-    private int flowFilesIn;
-    private int flowFilesOut;
-    private int flowFilesRemoved;
-    private long contentSizeIn;
-    private long contentSizeOut;
-    private long contentSizeRemoved;
-    private long bytesRead;
-    private long bytesWritten;
-    private long processingNanos;
-    private long aggregateLineageMillis;
-    private int flowFilesReceived;
-    private long bytesReceived;
-    private int flowFilesSent;
-    private long bytesSent;
-    private int invocations;
-
-    public StandardFlowFileEvent(final String componentId) {
-        this.componentId = componentId;
-    }
-
-    public StandardFlowFileEvent(final String componentId,
-            final int flowFilesIn, final long contentSizeIn,
-            final int flowFilesOut, final long contentSizeOut,
-            final int flowFilesRemoved, final long contentSizeRemoved,
-            final long bytesRead, final long bytesWritten,
-            final int flowFilesReceived, final long bytesReceived,
-            final int flowFilesSent, final long bytesSent,
-            final int invocations, final long averageLineageMillis, final long processingNanos) {
-        this.componentId = componentId;
-        this.flowFilesIn = flowFilesIn;
-        this.contentSizeIn = contentSizeIn;
-        this.flowFilesOut = flowFilesOut;
-        this.contentSizeOut = contentSizeOut;
-        this.flowFilesRemoved = flowFilesRemoved;
-        this.contentSizeRemoved = contentSizeRemoved;
-        this.bytesRead = bytesRead;
-        this.bytesWritten = bytesWritten;
-        this.invocations = invocations;
-        this.flowFilesReceived = flowFilesReceived;
-        this.bytesReceived = bytesReceived;
-        this.flowFilesSent = flowFilesSent;
-        this.bytesSent = bytesSent;
-        this.aggregateLineageMillis = averageLineageMillis;
-        this.processingNanos = processingNanos;
-    }
-
-    public StandardFlowFileEvent(final FlowFileEvent other) {
-        this.componentId = other.getComponentIdentifier();
-        this.flowFilesIn = other.getFlowFilesIn();
-        this.contentSizeIn = other.getContentSizeIn();
-        this.flowFilesOut = other.getFlowFilesOut();
-        this.contentSizeOut = other.getContentSizeOut();
-        this.flowFilesRemoved = other.getFlowFilesRemoved();
-        this.contentSizeRemoved = other.getContentSizeRemoved();
-        this.bytesRead = other.getBytesRead();
-        this.bytesWritten = other.getBytesWritten();
-        this.invocations = other.getInvocations();
-        this.flowFilesReceived = other.getFlowFilesReceived();
-        this.bytesReceived = other.getBytesReceived();
-        this.flowFilesSent = other.getFlowFilesSent();
-        this.bytesSent = other.getBytesSent();
-        this.aggregateLineageMillis = other.getAggregateLineageMillis();
-        this.processingNanos = other.getProcessingNanoseconds();
-    }
-
-    @Override
-    public String getComponentIdentifier() {
-        return componentId;
-    }
-
-    @Override
-    public int getFlowFilesIn() {
-        return flowFilesIn;
-    }
-
-    public void setFlowFilesIn(int flowFilesIn) {
-        this.flowFilesIn = flowFilesIn;
-    }
-
-    @Override
-    public int getFlowFilesOut() {
-        return flowFilesOut;
-    }
-
-    public void setFlowFilesOut(int flowFilesOut) {
-        this.flowFilesOut = flowFilesOut;
-    }
-
-    @Override
-    public long getContentSizeIn() {
-        return contentSizeIn;
-    }
-
-    public void setContentSizeIn(long contentSizeIn) {
-        this.contentSizeIn = contentSizeIn;
-    }
-
-    @Override
-    public long getContentSizeOut() {
-        return contentSizeOut;
-    }
-
-    public void setContentSizeOut(long contentSizeOut) {
-        this.contentSizeOut = contentSizeOut;
-    }
-
-    @Override
-    public long getContentSizeRemoved() {
-        return contentSizeRemoved;
-    }
-
-    public void setContentSizeRemoved(final long contentSizeRemoved) {
-        this.contentSizeRemoved = contentSizeRemoved;
-    }
-
-    @Override
-    public int getFlowFilesRemoved() {
-        return flowFilesRemoved;
-    }
-
-    public void setFlowFilesRemoved(final int flowFilesRemoved) {
-        this.flowFilesRemoved = flowFilesRemoved;
-    }
-
-    @Override
-    public long getBytesRead() {
-        return bytesRead;
-    }
-
-    public void setBytesRead(long bytesRead) {
-        this.bytesRead = bytesRead;
-    }
-
-    @Override
-    public long getBytesWritten() {
-        return bytesWritten;
-    }
-
-    public void setBytesWritten(long bytesWritten) {
-        this.bytesWritten = bytesWritten;
-    }
-
-    @Override
-    public long getProcessingNanoseconds() {
-        return processingNanos;
-    }
-
-    public void setProcessingNanos(final long processingNanos) {
-        this.processingNanos = processingNanos;
-    }
-
-    @Override
-    public int getInvocations() {
-        return invocations;
-    }
-
-    public void setInvocations(final int invocations) {
-        this.invocations = invocations;
-    }
-
-    @Override
-    public int getFlowFilesReceived() {
-        return flowFilesReceived;
-    }
-
-    public void setFlowFilesReceived(int flowFilesReceived) {
-        this.flowFilesReceived = flowFilesReceived;
-    }
-
-    @Override
-    public long getBytesReceived() {
-        return bytesReceived;
-    }
-
-    public void setBytesReceived(long bytesReceived) {
-        this.bytesReceived = bytesReceived;
-    }
-
-    @Override
-    public int getFlowFilesSent() {
-        return flowFilesSent;
-    }
-
-    public void setFlowFilesSent(int flowFilesSent) {
-        this.flowFilesSent = flowFilesSent;
-    }
-
-    @Override
-    public long getBytesSent() {
-        return bytesSent;
-    }
-
-    public void setBytesSent(long bytesSent) {
-        this.bytesSent = bytesSent;
-    }
-
-    @Override
-    public long getAverageLineageMillis() {
-        if (flowFilesOut == 0 && flowFilesRemoved == 0) {
-            return 0L;
-        }
-
-        return aggregateLineageMillis / (flowFilesOut + flowFilesRemoved);
-    }
-
-    public void setAggregateLineageMillis(long lineageMilliseconds) {
-        this.aggregateLineageMillis = lineageMilliseconds;
-    }
-
-    @Override
-    public long getAggregateLineageMillis() {
-        return aggregateLineageMillis;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index d2a6af6..6393014 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -57,6 +57,7 @@ import org.apache.nifi.controller.repository.io.DisableOnCloseOutputStream;
 import org.apache.nifi.controller.repository.io.FlowFileAccessInputStream;
 import org.apache.nifi.controller.repository.io.FlowFileAccessOutputStream;
 import org.apache.nifi.controller.repository.io.LimitedInputStream;
+import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.FlowFileFilter;
@@ -108,7 +109,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     private final Map<FlowFileRecord, StandardRepositoryRecord> records = new HashMap<>();
     private final Map<String, StandardFlowFileEvent> connectionCounts = new HashMap<>();
     private final Map<FlowFileQueue, Set<FlowFileRecord>> unacknowledgedFlowFiles = new HashMap<>();
-    private final Map<String, Long> counters = new HashMap<>();
     private final Map<ContentClaim, ByteCountingOutputStream> appendableStreams = new HashMap<>();
     private final ProcessContext context;
     private final Map<FlowFile, Integer> readRecursionSet = new HashMap<>();// set used to track what is currently being operated on to prevent logic failures if recursive calls occurring
@@ -117,6 +117,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     private final long sessionId;
     private final String connectableDescription;
 
+    private Map<String, Long> countersOnCommit;
+    private Map<String, Long> immediateCounters;
+
     private final Set<String> removedFlowFiles = new HashSet<>();
     private final Set<String> createdFlowFiles = new HashSet<>();
 
@@ -438,8 +441,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                 }
             }
 
-            for (final Map.Entry<String, Long> entry : checkpoint.counters.entrySet()) {
-                adjustCounter(entry.getKey(), entry.getValue(), true);
+            for (final Map.Entry<String, Long> entry : checkpoint.countersOnCommit.entrySet()) {
+                context.adjustCounter(entry.getKey(), entry.getValue());
             }
 
             acknowledgeRecords();
@@ -533,6 +536,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             }
             flowFileEvent.setAggregateLineageMillis(lineageMillis);
 
+            final Map<String, Long> counters = combineCounters(checkpoint.countersOnCommit, checkpoint.immediateCounters);
+            flowFileEvent.setCounters(counters);
+
             context.getFlowFileEventRepository().updateRepository(flowFileEvent);
 
             for (final FlowFileEvent connectionEvent : checkpoint.connectionCounts.values()) {
@@ -543,6 +549,23 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         }
     }
 
+    // Merges the two counter maps, summing the deltas of any counter present in both; either argument may be null.
+    private Map<String, Long> combineCounters(final Map<String, Long> first, final Map<String, Long> second) {
+        if (first == null) {
+            return second; // also yields null when both maps are null
+        }
+        if (second == null) {
+            return first;
+        }
+        final Map<String, Long> combined = new HashMap<>(first);
+        for (final Map.Entry<String, Long> entry : second.entrySet()) {
+            // putAll would overwrite: a counter adjusted both immediately and on commit must report both deltas
+            final Long existing = combined.get(entry.getKey());
+            combined.put(entry.getKey(), existing == null ? entry.getValue() : Long.valueOf(existing + entry.getValue()));
+        }
+        return combined;
+    }
+
     private void addEventType(final Map<String, Set<ProvenanceEventType>> map, final String id, final ProvenanceEventType eventType) {
         Set<ProvenanceEventType> eventTypes = map.get(id);
         if (eventTypes == null) {
@@ -1013,6 +1036,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         final StandardFlowFileEvent flowFileEvent = new StandardFlowFileEvent(connectable.getIdentifier());
         flowFileEvent.setBytesRead(bytesRead);
         flowFileEvent.setBytesWritten(bytesWritten);
+        flowFileEvent.setCounters(immediateCounters);
 
         // update event repository
         try {
@@ -1106,7 +1130,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         connectionCounts.clear();
         createdFlowFiles.clear();
         removedFlowFiles.clear();
-        counters.clear();
+        if (countersOnCommit != null) {
+            countersOnCommit.clear();
+        }
+        if (immediateCounters != null) {
+            immediateCounters.clear();
+        }
 
         generatedProvenanceEvents.clear();
         forkEventBuilders.clear();
@@ -1441,12 +1470,24 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
     @Override
     public void adjustCounter(final String name, final long delta, final boolean immediate) {
+        final Map<String, Long> counters;
         if (immediate) {
-            context.adjustCounter(name, delta);
-            return;
+            if (immediateCounters == null) {
+                immediateCounters = new HashMap<>();
+            }
+            counters = immediateCounters;
+        } else {
+            if (countersOnCommit == null) {
+                countersOnCommit = new HashMap<>();
+            }
+            counters = countersOnCommit;
         }
 
         adjustCounter(name, delta, counters);
+
+        if (immediate) {
+            context.adjustCounter(name, delta);
+        }
     }
 
     private void adjustCounter(final String name, final long delta, final Map<String, Long> map) {
@@ -3216,7 +3257,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         private final Map<FlowFileRecord, StandardRepositoryRecord> records = new HashMap<>();
         private final Map<String, StandardFlowFileEvent> connectionCounts = new HashMap<>();
         private final Map<FlowFileQueue, Set<FlowFileRecord>> unacknowledgedFlowFiles = new HashMap<>();
-        private final Map<String, Long> counters = new HashMap<>();
+
+        private Map<String, Long> countersOnCommit = new HashMap<>();
+        private Map<String, Long> immediateCounters = new HashMap<>();
 
         private final Map<FlowFile, Path> deleteOnCommit = new HashMap<>();
         private final Set<String> removedFlowFiles = new HashSet<>();
@@ -3242,7 +3285,22 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             this.records.putAll(session.records);
             this.connectionCounts.putAll(session.connectionCounts);
             this.unacknowledgedFlowFiles.putAll(session.unacknowledgedFlowFiles);
-            this.counters.putAll(session.counters);
+
+            if (session.countersOnCommit != null) {
+                if (this.countersOnCommit == null) {
+                    this.countersOnCommit = new HashMap<>();
+                }
+
+                this.countersOnCommit.putAll(session.countersOnCommit);
+            }
+
+            if (session.immediateCounters != null) {
+                if (this.immediateCounters == null) {
+                    this.immediateCounters = new HashMap<>();
+                }
+
+                this.immediateCounters.putAll(session.immediateCounters);
+            }
 
             this.deleteOnCommit.putAll(session.deleteOnCommit);
             this.removedFlowFiles.addAll(session.removedFlowFiles);

http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventContainer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventContainer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventContainer.java
new file mode 100644
index 0000000..9dd3c8e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventContainer.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.metrics;
+
+import org.apache.nifi.controller.repository.FlowFileEvent;
+
+public interface EventContainer { // Receives FlowFileEvents and produces a FlowFileEvent report for a component on request.
+    public void addEvent(FlowFileEvent event);
+
+    public void purgeEvents(long cutoffEpochMillis); // NOTE(review): presumably discards events older than the cutoff; the removed SecondPrecisionEventContainer implemented this as a no-op - confirm per implementation
+
+    public FlowFileEvent generateReport(String componentId, long sinceEpochMillis); // NOTE(review): presumably covers events recorded at or after sinceEpochMillis, as the removed implementation did - confirm
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSum.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSum.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSum.java
new file mode 100644
index 0000000..b1c9120
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSum.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.metrics;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.nifi.controller.repository.FlowFileEvent;
+
+public class EventSum {
+
+    private final AtomicReference<EventSumValue> ref = new AtomicReference<>();
+
+    public EventSumValue getValue() {
+        final EventSumValue value = ref.get();
+        return value == null ? new EventSumValue() : value;
+    }
+
+    public void addOrReset(final FlowFileEvent event) {
+        final long expectedMinute = System.currentTimeMillis() / 60000;
+
+        EventSumValue curValue;
+        while (true) {
+            curValue = ref.get();
+            if (curValue == null || curValue.getMinuteTimestamp() != expectedMinute) {
+                final EventSumValue newValue = new EventSumValue();
+                final boolean replaced = ref.compareAndSet(curValue, newValue);
+                if (replaced) {
+                    curValue = newValue;
+                    break;
+                }
+            } else {
+                break;
+            }
+        }
+
+        curValue.add(event);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java
new file mode 100644
index 0000000..3306e2b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.metrics;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.controller.repository.FlowFileEvent;
+
+public class EventSumValue {
+
+    private final AtomicInteger flowFilesIn = new AtomicInteger(0);
+    private final AtomicInteger flowFilesOut = new AtomicInteger(0);
+    private final AtomicInteger flowFilesRemoved = new AtomicInteger(0);
+    private final AtomicInteger flowFilesReceived = new AtomicInteger(0);
+    private final AtomicInteger flowFilesSent = new AtomicInteger(0);
+
+    private final AtomicLong contentSizeIn = new AtomicLong(0L);
+    private final AtomicLong contentSizeOut = new AtomicLong(0L);
+    private final AtomicLong contentSizeRemoved = new AtomicLong(0L);
+    private final AtomicLong bytesRead = new AtomicLong(0L);
+    private final AtomicLong bytesWritten = new AtomicLong(0L);
+
+    private final AtomicLong bytesReceived = new AtomicLong(0L);
+    private final AtomicLong bytesSent = new AtomicLong(0L);
+    private final AtomicLong processingNanos = new AtomicLong(0L);
+    private final AtomicLong aggregateLineageMillis = new AtomicLong(0L);
+    private final AtomicInteger invocations = new AtomicInteger(0);
+    private final ConcurrentMap<String, Long> counters = new ConcurrentHashMap<>();
+
+    private final long minuteTimestamp;
+    private final long millisecondTimestamp;
+
+
+    public EventSumValue() {
+        this.millisecondTimestamp = System.currentTimeMillis();
+        this.minuteTimestamp = millisecondTimestamp / 60000;
+    }
+
+    public void add(final FlowFileEvent flowFileEvent) {
+        this.aggregateLineageMillis.addAndGet(flowFileEvent.getAggregateLineageMillis());
+        this.bytesRead.addAndGet(flowFileEvent.getBytesRead());
+        this.bytesReceived.addAndGet(flowFileEvent.getBytesReceived());
+        this.bytesSent.addAndGet(flowFileEvent.getBytesSent());
+        this.bytesWritten.addAndGet(flowFileEvent.getBytesWritten());
+        this.contentSizeIn.addAndGet(flowFileEvent.getContentSizeIn());
+        this.contentSizeOut.addAndGet(flowFileEvent.getContentSizeOut());
+        this.contentSizeRemoved.addAndGet(flowFileEvent.getContentSizeRemoved());
+        this.flowFilesIn.addAndGet(flowFileEvent.getFlowFilesIn());
+        this.flowFilesOut.addAndGet(flowFileEvent.getFlowFilesOut());
+        this.flowFilesReceived.addAndGet(flowFileEvent.getFlowFilesReceived());
+        this.flowFilesRemoved.addAndGet(flowFileEvent.getFlowFilesRemoved());
+        this.flowFilesSent.addAndGet(flowFileEvent.getFlowFilesSent());
+        this.invocations.addAndGet(flowFileEvent.getInvocations());
+        this.processingNanos.addAndGet(flowFileEvent.getProcessingNanoseconds());
+
+        final Map<String, Long> eventCounters = flowFileEvent.getCounters();
+        if (eventCounters != null) {
+            for (final Map.Entry<String, Long> entry : eventCounters.entrySet()) {
+                final String counterName = entry.getKey();
+                final Long counterValue = entry.getValue();
+
+                counters.compute(counterName, (key, value) -> value == null ? counterValue : value + counterValue);
+            }
+        }
+    }
+
+    public FlowFileEvent toFlowFileEvent(final String componentId) {
+        final StandardFlowFileEvent event = new StandardFlowFileEvent(componentId);
+        event.setAggregateLineageMillis(getAggregateLineageMillis());
+        event.setBytesRead(getBytesRead());
+        event.setBytesReceived(getBytesReceived());
+        event.setBytesSent(getBytesSent());
+        event.setBytesWritten(getBytesWritten());
+        event.setContentSizeIn(getContentSizeIn());
+        event.setContentSizeOut(getContentSizeOut());
+        event.setContentSizeRemoved(getContentSizeRemoved());
+        event.setFlowFilesIn(getFlowFilesIn());
+        event.setFlowFilesOut(getFlowFilesOut());
+        event.setFlowFilesReceived(getFlowFilesReceived());
+        event.setFlowFilesRemoved(getFlowFilesRemoved());
+        event.setFlowFilesSent(getFlowFilesSent());
+        event.setInvocations(getInvocations());
+        event.setProcessingNanos(getProcessingNanoseconds());
+        event.setCounters(Collections.unmodifiableMap(this.counters));
+        return event;
+    }
+
+    public void add(final EventSumValue other) {
+        this.aggregateLineageMillis.addAndGet(other.getAggregateLineageMillis());
+        this.bytesRead.addAndGet(other.getBytesRead());
+        this.bytesReceived.addAndGet(other.getBytesReceived());
+        this.bytesSent.addAndGet(other.getBytesSent());
+        this.bytesWritten.addAndGet(other.getBytesWritten());
+        this.contentSizeIn.addAndGet(other.getContentSizeIn());
+        this.contentSizeOut.addAndGet(other.getContentSizeOut());
+        this.contentSizeRemoved.addAndGet(other.getContentSizeRemoved());
+        this.flowFilesIn.addAndGet(other.getFlowFilesIn());
+        this.flowFilesOut.addAndGet(other.getFlowFilesOut());
+        this.flowFilesReceived.addAndGet(other.getFlowFilesReceived());
+        this.flowFilesRemoved.addAndGet(other.getFlowFilesRemoved());
+        this.flowFilesSent.addAndGet(other.getFlowFilesSent());
+        this.invocations.addAndGet(other.getInvocations());
+        this.processingNanos.addAndGet(other.getProcessingNanoseconds());
+
+        final Map<String, Long> eventCounters = other.getCounters();
+        if (eventCounters != null) {
+            for (final Map.Entry<String, Long> entry : eventCounters.entrySet()) {
+                final String counterName = entry.getKey();
+                final Long counterValue = entry.getValue();
+
+                counters.compute(counterName, (key, value) -> value == null ? counterValue : value + counterValue);
+            }
+        }
+    }
+
+    public long getTimestamp() {
+        return millisecondTimestamp;
+    }
+
+    public long getMinuteTimestamp() {
+        return minuteTimestamp;
+    }
+
+    public long getBytesRead() {
+        return bytesRead.get();
+    }
+
+    public long getBytesWritten() {
+        return bytesWritten.get();
+    }
+
+    public int getFlowFilesIn() {
+        return flowFilesIn.get();
+    }
+
+    public int getFlowFilesOut() {
+        return flowFilesOut.get();
+    }
+
+    public long getContentSizeIn() {
+        return contentSizeIn.get();
+    }
+
+    public long getContentSizeOut() {
+        return contentSizeOut.get();
+    }
+
+    public int getFlowFilesRemoved() {
+        return flowFilesRemoved.get();
+    }
+
+    public long getContentSizeRemoved() {
+        return contentSizeRemoved.get();
+    }
+
+    public long getProcessingNanoseconds() {
+        return processingNanos.get();
+    }
+
+    public int getInvocations() {
+        return invocations.get();
+    }
+
+    public long getAggregateLineageMillis() {
+        return aggregateLineageMillis.get();
+    }
+
+    public int getFlowFilesReceived() {
+        return flowFilesReceived.get();
+    }
+
+    public int getFlowFilesSent() {
+        return flowFilesSent.get();
+    }
+
+    public long getBytesReceived() {
+        return bytesReceived.get();
+    }
+
+    public long getBytesSent() {
+        return bytesSent.get();
+    }
+
+    public Map<String, Long> getCounters() {
+        return counters;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java
new file mode 100644
index 0000000..b9a82ed
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository.metrics;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.nifi.controller.repository.FlowFileEvent;
+import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.controller.repository.StandardRepositoryStatusReport;
+
+public class RingBufferEventRepository implements FlowFileEventRepository {
+
+    private final int numMinutes;
+    private final ConcurrentMap<String, EventContainer> componentEventMap = new ConcurrentHashMap<>();
+
+    public RingBufferEventRepository(final int numMinutes) {
+        this.numMinutes = numMinutes;
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public void updateRepository(final FlowFileEvent event) {
+        final String componentId = event.getComponentIdentifier();
+        final EventContainer eventContainer = componentEventMap.computeIfAbsent(componentId, id -> new SecondPrecisionEventContainer(numMinutes));
+        eventContainer.addEvent(event);
+    }
+
+    @Override
+    public StandardRepositoryStatusReport reportTransferEvents(final long sinceEpochMillis) {
+        final StandardRepositoryStatusReport report = new StandardRepositoryStatusReport();
+
+        componentEventMap.entrySet().stream()
+            .map(entry -> entry.getValue().generateReport(entry.getKey(), sinceEpochMillis))
+            .forEach(event -> report.addReportEntry(event));
+
+        return report;
+    }
+
+    @Override
+    public void purgeTransferEvents(final long cutoffEpochMilliseconds) {
+        // This is done so that if a processor is removed from the graph, its events
+        // will be removed rather than being kept in memory
+        for (final EventContainer container : componentEventMap.values()) {
+            container.purgeEvents(cutoffEpochMilliseconds);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/SecondPrecisionEventContainer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/SecondPrecisionEventContainer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/SecondPrecisionEventContainer.java
new file mode 100644
index 0000000..72a8cfc
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/SecondPrecisionEventContainer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.metrics;
+
+import org.apache.nifi.controller.repository.FlowFileEvent;
+
+public class SecondPrecisionEventContainer implements EventContainer {
+    private final int numBins;
+    private final EventSum[] sums;
+
+    public SecondPrecisionEventContainer(final int numMinutes) {
+        numBins = 1 + numMinutes * 60;
+        sums = new EventSum[numBins];
+
+        for (int i = 0; i < numBins; i++) {
+            sums[i] = new EventSum();
+        }
+    }
+
+    @Override
+    public void addEvent(final FlowFileEvent event) {
+        // Keep the epoch second as a long: narrowing to int before the modulus goes negative after 2038.
+        final long second = System.currentTimeMillis() / 1000;
+        final int binIdx = (int) (second % numBins);
+
+        sums[binIdx].addOrReset(event);
+    }
+
+    @Override
+    public void purgeEvents(final long cutoffEpochMilliseconds) {
+        // no need to do anything
+    }
+
+    @Override
+    public FlowFileEvent generateReport(final String componentId, final long sinceEpochMillis) {
+        final EventSumValue eventSumValue = new EventSumValue();
+        final long second = sinceEpochMillis / 1000;
+        final int startBinIdx = (int) (second % numBins);
+
+        for (int i = 0; i < numBins; i++) {
+            int binIdx = (startBinIdx + i) % numBins;
+            final EventSum sum = sums[binIdx];
+
+            final EventSumValue sumValue = sum.getValue();
+            if (sumValue.getTimestamp() >= sinceEpochMillis) {
+                eventSumValue.add(sumValue);
+            }
+        }
+
+        final FlowFileEvent flowFileEvent = eventSumValue.toFlowFileEvent(componentId);
+        return flowFileEvent;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java
new file mode 100644
index 0000000..40ec983
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository.metrics;
+
+import java.util.Map;
+
+import org.apache.nifi.controller.repository.FlowFileEvent;
+
+public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable {
+
+    private final String componentId;
+
+    private int flowFilesIn;
+    private int flowFilesOut;
+    private int flowFilesRemoved;
+    private long contentSizeIn;
+    private long contentSizeOut;
+    private long contentSizeRemoved;
+    private long bytesRead;
+    private long bytesWritten;
+    private long processingNanos;
+    private long aggregateLineageMillis;
+    private int flowFilesReceived;
+    private long bytesReceived;
+    private int flowFilesSent;
+    private long bytesSent;
+    private int invocations;
+    private Map<String, Long> counters;
+
+    public StandardFlowFileEvent(final String componentId) {
+        this.componentId = componentId;
+    }
+
+    @Override
+    public String getComponentIdentifier() {
+        return componentId;
+    }
+
+    @Override
+    public int getFlowFilesIn() {
+        return flowFilesIn;
+    }
+
+    public void setFlowFilesIn(int flowFilesIn) {
+        this.flowFilesIn = flowFilesIn;
+    }
+
+    @Override
+    public int getFlowFilesOut() {
+        return flowFilesOut;
+    }
+
+    public void setFlowFilesOut(int flowFilesOut) {
+        this.flowFilesOut = flowFilesOut;
+    }
+
+    @Override
+    public long getContentSizeIn() {
+        return contentSizeIn;
+    }
+
+    public void setContentSizeIn(long contentSizeIn) {
+        this.contentSizeIn = contentSizeIn;
+    }
+
+    @Override
+    public long getContentSizeOut() {
+        return contentSizeOut;
+    }
+
+    public void setContentSizeOut(long contentSizeOut) {
+        this.contentSizeOut = contentSizeOut;
+    }
+
+    @Override
+    public long getContentSizeRemoved() {
+        return contentSizeRemoved;
+    }
+
+    public void setContentSizeRemoved(final long contentSizeRemoved) {
+        this.contentSizeRemoved = contentSizeRemoved;
+    }
+
+    @Override
+    public int getFlowFilesRemoved() {
+        return flowFilesRemoved;
+    }
+
+    public void setFlowFilesRemoved(final int flowFilesRemoved) {
+        this.flowFilesRemoved = flowFilesRemoved;
+    }
+
+    @Override
+    public long getBytesRead() {
+        return bytesRead;
+    }
+
+    public void setBytesRead(long bytesRead) {
+        this.bytesRead = bytesRead;
+    }
+
+    @Override
+    public long getBytesWritten() {
+        return bytesWritten;
+    }
+
+    public void setBytesWritten(long bytesWritten) {
+        this.bytesWritten = bytesWritten;
+    }
+
+    @Override
+    public long getProcessingNanoseconds() {
+        return processingNanos;
+    }
+
+    public void setProcessingNanos(final long processingNanos) {
+        this.processingNanos = processingNanos;
+    }
+
+    @Override
+    public int getInvocations() {
+        return invocations;
+    }
+
+    public void setInvocations(final int invocations) {
+        this.invocations = invocations;
+    }
+
+    @Override
+    public int getFlowFilesReceived() {
+        return flowFilesReceived;
+    }
+
+    public void setFlowFilesReceived(int flowFilesReceived) {
+        this.flowFilesReceived = flowFilesReceived;
+    }
+
+    @Override
+    public long getBytesReceived() {
+        return bytesReceived;
+    }
+
+    public void setBytesReceived(long bytesReceived) {
+        this.bytesReceived = bytesReceived;
+    }
+
+    @Override
+    public int getFlowFilesSent() {
+        return flowFilesSent;
+    }
+
+    public void setFlowFilesSent(int flowFilesSent) {
+        this.flowFilesSent = flowFilesSent;
+    }
+
+    @Override
+    public long getBytesSent() {
+        return bytesSent;
+    }
+
+    public void setBytesSent(long bytesSent) {
+        this.bytesSent = bytesSent;
+    }
+
+    @Override
+    public long getAverageLineageMillis() {
+        if (flowFilesOut == 0 && flowFilesRemoved == 0) {
+            return 0L;
+        }
+
+        return aggregateLineageMillis / (flowFilesOut + flowFilesRemoved);
+    }
+
+    public void setAggregateLineageMillis(long lineageMilliseconds) {
+        this.aggregateLineageMillis = lineageMilliseconds;
+    }
+
+    @Override
+    public long getAggregateLineageMillis() {
+        return aggregateLineageMillis;
+    }
+
+    @Override
+    public Map<String, Long> getCounters() {
+        return counters;
+    }
+
+    public void setCounters(final Map<String, Long> counters) {
+        this.counters = counters;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
index 22684a6..0af8657 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
@@ -33,9 +33,9 @@ import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.repository.BatchingSessionFactory;
 import org.apache.nifi.controller.repository.ProcessContext;
-import org.apache.nifi.controller.repository.StandardFlowFileEvent;
 import org.apache.nifi.controller.repository.StandardProcessSession;
 import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
+import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.engine.FlowEngine;

http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java
index 756e576..a126589 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java
@@ -20,6 +20,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -37,11 +38,25 @@ public class StatusHistoryUtil {
         final Set<MetricDescriptor<?>> metricDescriptors = new LinkedHashSet<>();
         final LinkedHashMap<String, String> componentDetails = new LinkedHashMap<>(statusHistory.getComponentDetails());
 
+        final Set<String> metricNames = new HashSet<>();
         for (final StatusSnapshot snapshot : statusHistory.getStatusSnapshots()) {
-            snapshotDtos.add(StatusHistoryUtil.createStatusSnapshotDto(snapshot));
+            final StatusSnapshotDTO snapshotDto = StatusHistoryUtil.createStatusSnapshotDto(snapshot);
+            snapshotDtos.add(snapshotDto);
+            metricNames.addAll(snapshotDto.getStatusMetrics().keySet());
             metricDescriptors.addAll(snapshot.getStatusMetrics().keySet());
         }
 
+        // We need to ensure that the 'aggregate snapshot' has an entry for every metric, including counters.
+        // So for any metric that is not in the aggregate snapshot, add it with a value of 0
+        for (final StatusSnapshotDTO snapshotDto : snapshotDtos) {
+            final Map<String, Long> metrics = snapshotDto.getStatusMetrics();
+            for (final String metricName : metricNames) {
+                if (!metrics.containsKey(metricName)) {
+                    metrics.put(metricName, 0L);
+                }
+            }
+        }
+
         final StatusHistoryDTO dto = new StatusHistoryDTO();
         dto.setGenerated(new Date());
         dto.setComponentDetails(componentDetails);

http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
index 3203972..50a5123 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
@@ -20,6 +20,7 @@ import org.apache.nifi.controller.status.ConnectionStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.controller.status.ProcessorStatus;
 import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
 import org.apache.nifi.util.ComponentStatusReport;
 import org.apache.nifi.util.ComponentStatusReport.ComponentType;
 import org.apache.nifi.util.NiFiProperties;
@@ -29,6 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Date;
+import java.util.Map;
 
 public class VolatileComponentStatusRepository implements ComponentStatusRepository {
 
@@ -72,7 +74,7 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit
     }
 
     @Override
-    public StatusHistory getProcessorStatusHistory(final String processorId, final Date start, final Date end, final int preferredDataPoints) {
+    public StatusHistory getProcessorStatusHistory(final String processorId, final Date start, final Date end, final int preferredDataPoints, final boolean includeCounters) {
         final StandardStatusHistory history = new StandardStatusHistory();
         history.setComponentDetail(COMPONENT_DETAIL_ID, processorId);
 
@@ -98,6 +100,21 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit
                     }
                 }
 
+                if (includeCounters) {
+                    final Map<String, Long> counters = status.getCounters();
+                    if (counters != null) {
+                        for (final Map.Entry<String, Long> entry : counters.entrySet()) {
+                            final String counterName = entry.getKey();
+
+                            final String label = entry.getKey() + " (5 mins)";
+                            final MetricDescriptor<ProcessorStatus> metricDescriptor = new StandardMetricDescriptor<>(entry.getKey(), label, label, Formatter.COUNT,
+                                s -> s.getCounters() == null ? null : s.getCounters().get(counterName));
+
+                            snapshot.addStatusMetric(metricDescriptor, entry.getValue());
+                        }
+                    }
+                }
+
                 history.addStatusSnapshot(snapshot);
                 return true;
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
index f2a7eee..8c27078 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
@@ -25,9 +25,9 @@ import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.repository.BatchingSessionFactory;
 import org.apache.nifi.controller.repository.ProcessContext;
-import org.apache.nifi.controller.repository.StandardFlowFileEvent;
 import org.apache.nifi.controller.repository.StandardProcessSession;
 import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
+import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
 import org.apache.nifi.controller.scheduling.ProcessContextFactory;
 import org.apache.nifi.controller.scheduling.ScheduleState;
 import org.apache.nifi.controller.scheduling.SchedulingAgent;