You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/08/21 15:44:55 UTC

[2/2] nifi git commit: NIFI-5466: Keep a running total of stats for each component. Refactored FlowFileEvent and repository in order to provide more efficient storage of objects on Java heap by allowing the same 'EMPTY' object to be reused - Refactored

NIFI-5466: Keep a running total of stats for each component. Refactored FlowFileEvent and repository in order to provide more efficient storage of objects on Java heap by allowing the same 'EMPTY' object to be reused
 - Refactored VolatileComponentStatusRepository to avoid holding on to ProcessorStatus objects, etc, and only keep what they need
 - Updated VolatileComponentStatusRepository to ensure that we are efficiently storing metrics for processors, etc. that are not running

This closes #2939

Signed-off-by: Mark Payne <ma...@hotmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7bbb5a82
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7bbb5a82
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7bbb5a82

Branch: refs/heads/master
Commit: 7bbb5a823aa3d338cecf45bd683f9f34a1338d02
Parents: 410176e
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Aug 2 14:17:36 2018 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Aug 21 11:44:25 2018 -0400

----------------------------------------------------------------------
 .../status/history/MetricDescriptor.java        |   5 +-
 .../status/history/StatusSnapshot.java          |  12 +-
 .../endpoints/StatusHistoryEndpointMerger.java  |  11 +-
 .../controller/repository/FlowFileEvent.java    |   2 -
 .../repository/FlowFileEventRepository.java     |   7 +-
 .../repository/RepositoryStatusReport.java      |   2 +-
 .../apache/nifi/controller/FlowController.java  |  26 +-
 .../repository/StandardProcessSession.java      |  16 +-
 .../StandardRepositoryStatusReport.java         |   7 +-
 .../repository/metrics/EmptyFlowFileEvent.java  | 114 ++++++++
 .../repository/metrics/EventContainer.java      |   6 +-
 .../controller/repository/metrics/EventSum.java |  38 ++-
 .../repository/metrics/EventSumValue.java       |  63 +++-
 .../metrics/RingBufferEventRepository.java      |  16 +-
 .../metrics/SecondPrecisionEventContainer.java  |  96 +++++--
 .../metrics/StandardFlowFileEvent.java          |  14 +-
 .../scheduling/EventDrivenSchedulingAgent.java  |  18 +-
 .../status/history/ComponentDetails.java        | 123 ++++++++
 .../status/history/ComponentStatusHistory.java  |  53 ++++
 .../history/ConnectionStatusDescriptor.java     |  30 +-
 .../status/history/EmptyStatusSnapshot.java     |  66 +++++
 .../status/history/IndexableMetric.java         |  21 ++
 .../status/history/MetricRollingBuffer.java     | 196 +++++++++++++
 .../history/ProcessGroupStatusDescriptor.java   |  59 ++--
 .../history/ProcessorStatusDescriptor.java      |  96 ++++---
 .../RemoteProcessGroupStatusDescriptor.java     |  53 ++--
 .../history/StandardMetricDescriptor.java       |  18 +-
 .../status/history/StandardStatusHistory.java   |  34 +--
 .../status/history/StandardStatusSnapshot.java  |  58 +++-
 .../status/history/StatusHistoryUtil.java       |  18 +-
 .../VolatileComponentStatusRepository.java      | 285 +++++++------------
 .../nifi/controller/tasks/ConnectableTask.java  |  14 +-
 .../nifi/groups/StandardProcessGroup.java       |   2 +-
 .../org/apache/nifi/util/ComponentMetrics.java  | 188 ++++++++++++
 .../apache/nifi/util/ComponentStatusReport.java | 137 ---------
 .../TestRingBufferEventRepository.java          |  31 +-
 .../TestSecondPrecisionEventContainer.java      | 135 +++++++++
 .../status/history/TestMetricRollingBuffer.java | 126 ++++++++
 38 files changed, 1589 insertions(+), 607 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/MetricDescriptor.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/MetricDescriptor.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/MetricDescriptor.java
index 8fdce05..c0c52b6 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/MetricDescriptor.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/MetricDescriptor.java
@@ -23,13 +23,14 @@ package org.apache.nifi.controller.status.history;
  */
 public interface MetricDescriptor<T> {
 
-    public enum Formatter {
-
+    enum Formatter {
         COUNT,
         DURATION,
         DATA_SIZE
     };
 
+    int getMetricIdentifier();
+
     /**
      * Specifies how the values should be formatted
      *

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusSnapshot.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusSnapshot.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusSnapshot.java
index 551ceb2..da794ee 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusSnapshot.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusSnapshot.java
@@ -17,7 +17,7 @@
 package org.apache.nifi.controller.status.history;
 
 import java.util.Date;
-import java.util.Map;
+import java.util.Set;
 
 /**
  * A StatusSnapshot represents a Component's status report at some point in time
@@ -29,10 +29,16 @@ public interface StatusSnapshot {
      */
     Date getTimestamp();
 
+    Set<MetricDescriptor<?>> getMetricDescriptors();
+
+    Long getStatusMetric(MetricDescriptor<?> descriptor);
+
     /**
-     * @return a Map of MetricDescriptor to value
+     * Returns an instance of StatusSnapshot that has all the same information as {@code this} except for
+     * Counters. If {@code this} does not contain any counters, the object returned may (or may not) be {@code this}.
+     * @return a StatusSnapshot without counters
      */
-    Map<MetricDescriptor<?>, Long> getStatusMetrics();
+    StatusSnapshot withoutCounters();
 
     /**
      * @return a {@link ValueReducer} that is capable of merging multiple

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java
index 49e952b..8e4c26b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java
@@ -41,6 +41,7 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -156,7 +157,7 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
                         return counters.getOrDefault(descriptorDto.getField(), 0L);
                     };
 
-                    final MetricDescriptor<ProcessorStatus> metricDescriptor = new StandardMetricDescriptor<>(descriptorDto.getField(),
+                    final MetricDescriptor<ProcessorStatus> metricDescriptor = new StandardMetricDescriptor<>(() -> 0, descriptorDto.getField(),
                         descriptorDto.getLabel(), descriptorDto.getDescription(), Formatter.COUNT, valueMapper);
 
                     metricDescriptors.put(fieldName, metricDescriptor);
@@ -197,11 +198,7 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
                 final StatusSnapshot snapshot = createSnapshot(snapshotDto, metricDescriptors);
                 final Date normalizedDate = normalizeStatusSnapshotDate(snapshot.getTimestamp(), componentStatusSnapshotMillis);
 
-                Map<String, StatusSnapshot> nodeToSnapshotMap = dateToNodeSnapshots.get(normalizedDate);
-                if (nodeToSnapshotMap == null) {
-                    nodeToSnapshotMap = new HashMap<>();
-                    dateToNodeSnapshots.put(normalizedDate, nodeToSnapshotMap);
-                }
+                Map<String, StatusSnapshot> nodeToSnapshotMap = dateToNodeSnapshots.computeIfAbsent(normalizedDate, k -> new HashMap<>());
                 nodeToSnapshotMap.put(nodeStatusSnapshot.getNodeId(), snapshot);
             }
         }
@@ -220,7 +217,7 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
     }
 
     private StatusSnapshot createSnapshot(final StatusSnapshotDTO snapshotDto, final Map<String, MetricDescriptor<?>> metricDescriptors) {
-        final StandardStatusSnapshot snapshot = new StandardStatusSnapshot();
+        final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(new HashSet<>(metricDescriptors.values()));
         snapshot.setTimestamp(snapshotDto.getTimestamp());
 
         // Default all metrics to 0 so that if a counter has not yet been registered, it will have a value of 0 instead

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java
index 26cea50..7b131cc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java
@@ -20,8 +20,6 @@ import java.util.Map;
 
 public interface FlowFileEvent {
 
-    String getComponentIdentifier();
-
     int getFlowFilesIn();
 
     int getFlowFilesOut();

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java
index 1781d18..8fac237 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java
@@ -25,15 +25,16 @@ public interface FlowFileEventRepository extends Closeable {
      * Updates the repository to include a new FlowFile processing event
      *
      * @param event new event
+     * @param  componentIdentifier the ID of the component that the event belongs to
      * @throws java.io.IOException ioe
      */
-    void updateRepository(FlowFileEvent event) throws IOException;
+    void updateRepository(FlowFileEvent event, String componentIdentifier) throws IOException;
 
     /**
-     * @param sinceEpochMillis age of report
+     * @param now the current time
      * @return a report of processing activity since the given time
      */
-    RepositoryStatusReport reportTransferEvents(long sinceEpochMillis);
+    RepositoryStatusReport reportTransferEvents(long now);
 
     /**
      * Causes any flow file events of the given entry age in epoch milliseconds

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/RepositoryStatusReport.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/RepositoryStatusReport.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/RepositoryStatusReport.java
index e434905..5479e26 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/RepositoryStatusReport.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/RepositoryStatusReport.java
@@ -20,7 +20,7 @@ import java.util.Map;
 
 public interface RepositoryStatusReport {
 
-    void addReportEntry(FlowFileEvent entry);
+    void addReportEntry(FlowFileEvent entry, String componentId);
 
     Map<String, FlowFileEvent> getReportEntries();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index b83749c..21c61e9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -103,6 +103,7 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.controller.repository.claim.StandardContentClaim;
 import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
 import org.apache.nifi.controller.repository.io.LimitedInputStream;
+import org.apache.nifi.controller.repository.metrics.EmptyFlowFileEvent;
 import org.apache.nifi.controller.scheduling.EventDrivenSchedulingAgent;
 import org.apache.nifi.controller.scheduling.QuartzSchedulingAgent;
 import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
@@ -630,7 +631,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
             @Override
             public void run() {
-                componentStatusRepository.capture(getControllerStatus(), getGarbageCollectionStatus());
+                try {
+                    componentStatusRepository.capture(getControllerStatus(), getGarbageCollectionStatus());
+                } catch (final Exception e) {
+                    LOG.error("Failed to capture component stats for Stats History", e);
+                }
             }
         }, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS);
 
@@ -3333,18 +3338,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         status.setType(isProcessorAuthorized ? procNode.getComponentType() : "Processor");
 
         final FlowFileEvent entry = report.getReportEntries().get(procNode.getIdentifier());
-        if (entry == null) {
-            status.setInputBytes(0L);
-            status.setInputCount(0);
-            status.setOutputBytes(0L);
-            status.setOutputCount(0);
-            status.setBytesWritten(0L);
-            status.setBytesRead(0L);
-            status.setProcessingNanos(0);
-            status.setInvocations(0);
-            status.setAverageLineageDuration(0L);
-            status.setFlowFilesRemoved(0);
-        } else {
+        if (entry != null && entry != EmptyFlowFileEvent.INSTANCE) {
             final int processedCount = entry.getFlowFilesOut();
             final long numProcessedBytes = entry.getContentSizeOut();
             status.setOutputBytes(numProcessedBytes);
@@ -4117,13 +4111,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     }
 
     private RepositoryStatusReport getProcessorStats() {
-        // processed in last 5 minutes
-        return getProcessorStats(System.currentTimeMillis() - 300000);
+        return flowFileEventRepository.reportTransferEvents(System.currentTimeMillis());
     }
 
-    private RepositoryStatusReport getProcessorStats(final long since) {
-        return flowFileEventRepository.reportTransferEvents(since);
-    }
 
     //
     // Clustering methods

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 7255645..9741cff 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -528,7 +528,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         try {
             // update event repository
             final Connectable connectable = context.getConnectable();
-            final StandardFlowFileEvent flowFileEvent = new StandardFlowFileEvent(connectable.getIdentifier());
+            final StandardFlowFileEvent flowFileEvent = new StandardFlowFileEvent();
             flowFileEvent.setBytesRead(checkpoint.bytesRead);
             flowFileEvent.setBytesWritten(checkpoint.bytesWritten);
             flowFileEvent.setContentSizeIn(checkpoint.contentSizeIn);
@@ -553,10 +553,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             final Map<String, Long> counters = combineCounters(checkpoint.countersOnCommit, checkpoint.immediateCounters);
             flowFileEvent.setCounters(counters);
 
-            context.getFlowFileEventRepository().updateRepository(flowFileEvent);
+            context.getFlowFileEventRepository().updateRepository(flowFileEvent, connectable.getIdentifier());
 
-            for (final FlowFileEvent connectionEvent : checkpoint.connectionCounts.values()) {
-                context.getFlowFileEventRepository().updateRepository(connectionEvent);
+            for (final Map.Entry<String, StandardFlowFileEvent> entry : checkpoint.connectionCounts.entrySet()) {
+                context.getFlowFileEventRepository().updateRepository(entry.getValue(), entry.getKey());
             }
         } catch (final IOException ioe) {
             LOG.error("FlowFile Event Repository failed to update", ioe);
@@ -1052,14 +1052,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         }
 
         final Connectable connectable = context.getConnectable();
-        final StandardFlowFileEvent flowFileEvent = new StandardFlowFileEvent(connectable.getIdentifier());
+        final StandardFlowFileEvent flowFileEvent = new StandardFlowFileEvent();
         flowFileEvent.setBytesRead(bytesRead);
         flowFileEvent.setBytesWritten(bytesWritten);
         flowFileEvent.setCounters(immediateCounters);
 
         // update event repository
         try {
-            context.getFlowFileEventRepository().updateRepository(flowFileEvent);
+            context.getFlowFileEventRepository().updateRepository(flowFileEvent, connectable.getIdentifier());
         } catch (final Exception e) {
             LOG.error("Failed to update FlowFileEvent Repository due to " + e);
             if (LOG.isDebugEnabled()) {
@@ -1458,7 +1458,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     }
 
     private void incrementConnectionInputCounts(final String connectionId, final int flowFileCount, final long bytes) {
-        final StandardFlowFileEvent connectionEvent = connectionCounts.computeIfAbsent(connectionId, id -> new StandardFlowFileEvent(id));
+        final StandardFlowFileEvent connectionEvent = connectionCounts.computeIfAbsent(connectionId, id -> new StandardFlowFileEvent());
         connectionEvent.setContentSizeIn(connectionEvent.getContentSizeIn() + bytes);
         connectionEvent.setFlowFilesIn(connectionEvent.getFlowFilesIn() + flowFileCount);
     }
@@ -1468,7 +1468,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     }
 
     private void incrementConnectionOutputCounts(final String connectionId, final int flowFileCount, final long bytes) {
-        final StandardFlowFileEvent connectionEvent = connectionCounts.computeIfAbsent(connectionId, id -> new StandardFlowFileEvent(id));
+        final StandardFlowFileEvent connectionEvent = connectionCounts.computeIfAbsent(connectionId, id -> new StandardFlowFileEvent());
         connectionEvent.setContentSizeOut(connectionEvent.getContentSizeOut() + bytes);
         connectionEvent.setFlowFilesOut(connectionEvent.getFlowFilesOut() + flowFileCount);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryStatusReport.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryStatusReport.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryStatusReport.java
index 3e30059..a367168 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryStatusReport.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryStatusReport.java
@@ -54,13 +54,14 @@ public class StandardRepositoryStatusReport implements RepositoryStatusReport {
      * Adds an entry to the report.
      *
      * @param entry an entry
+     * @param componentId the id of the component that the entry belongs to
      */
     @Override
-    public void addReportEntry(FlowFileEvent entry) {
+    public void addReportEntry(FlowFileEvent entry, final String componentId) {
         if (entry == null) {
             throw new NullPointerException("report entry may not be null");
         }
-        this.entries.put(entry.getComponentIdentifier(), entry);
+        this.entries.put(componentId, entry);
     }
 
     @Override
@@ -69,7 +70,7 @@ public class StandardRepositoryStatusReport implements RepositoryStatusReport {
         for (final String key : this.entries.keySet()) {
             final FlowFileEvent entry = this.entries.get(key);
             strb.append("[")
-                    .append(entry.getComponentIdentifier()).append(", ")
+                    .append(key).append(", ")
                     .append(entry.getFlowFilesIn()).append(", ")
                     .append(entry.getContentSizeIn()).append(", ")
                     .append(entry.getFlowFilesOut()).append(", ")

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EmptyFlowFileEvent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EmptyFlowFileEvent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EmptyFlowFileEvent.java
new file mode 100644
index 0000000..3c14140
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EmptyFlowFileEvent.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository.metrics;
+
+import org.apache.nifi.controller.repository.FlowFileEvent;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class EmptyFlowFileEvent implements FlowFileEvent {
+    public static final EmptyFlowFileEvent INSTANCE = new EmptyFlowFileEvent();
+
+    private EmptyFlowFileEvent() {
+    }
+
+    @Override
+    public int getFlowFilesIn() {
+        return 0;
+    }
+
+    @Override
+    public int getFlowFilesOut() {
+        return 0;
+    }
+
+    @Override
+    public int getFlowFilesRemoved() {
+        return 0;
+    }
+
+    @Override
+    public long getContentSizeIn() {
+        return 0;
+    }
+
+    @Override
+    public long getContentSizeOut() {
+        return 0;
+    }
+
+    @Override
+    public long getContentSizeRemoved() {
+        return 0;
+    }
+
+    @Override
+    public long getBytesRead() {
+        return 0;
+    }
+
+    @Override
+    public long getBytesWritten() {
+        return 0;
+    }
+
+    @Override
+    public long getProcessingNanoseconds() {
+        return 0;
+    }
+
+    @Override
+    public long getAverageLineageMillis() {
+        return 0;
+    }
+
+    @Override
+    public long getAggregateLineageMillis() {
+        return 0;
+    }
+
+    @Override
+    public int getFlowFilesReceived() {
+        return 0;
+    }
+
+    @Override
+    public long getBytesReceived() {
+        return 0;
+    }
+
+    @Override
+    public int getFlowFilesSent() {
+        return 0;
+    }
+
+    @Override
+    public long getBytesSent() {
+        return 0;
+    }
+
+    @Override
+    public int getInvocations() {
+        return 0;
+    }
+
+    @Override
+    public Map<String, Long> getCounters() {
+        return Collections.emptyMap();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventContainer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventContainer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventContainer.java
index 9dd3c8e..d193b7d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventContainer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventContainer.java
@@ -20,9 +20,9 @@ package org.apache.nifi.controller.repository.metrics;
 import org.apache.nifi.controller.repository.FlowFileEvent;
 
 public interface EventContainer {
-    public void addEvent(FlowFileEvent event);
+    void addEvent(FlowFileEvent event);
 
-    public void purgeEvents(long cutoffEpochMillis);
+    void purgeEvents(long cutoffEpochMillis);
 
-    public FlowFileEvent generateReport(String componentId, long sinceEpochMillis);
+    FlowFileEvent generateReport(long sinceEpochMillis);
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSum.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSum.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSum.java
index b1c9120..5fef08b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSum.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSum.java
@@ -17,31 +17,30 @@
 
 package org.apache.nifi.controller.repository.metrics;
 
-import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.nifi.controller.repository.FlowFileEvent;
 
-public class EventSum {
+import java.util.concurrent.atomic.AtomicReference;
 
+public class EventSum {
     private final AtomicReference<EventSumValue> ref = new AtomicReference<>();
 
     public EventSumValue getValue() {
         final EventSumValue value = ref.get();
-        return value == null ? new EventSumValue() : value;
+        return value == null ? new EventSumValue(System.currentTimeMillis()) : value;
     }
 
-    public void addOrReset(final FlowFileEvent event) {
-        final long expectedMinute = System.currentTimeMillis() / 60000;
+    public EventSumValue addOrReset(final FlowFileEvent event, final long timestamp) {
+        final long expectedSecond = timestamp / 1000;
 
         EventSumValue curValue;
         while (true) {
             curValue = ref.get();
-            if (curValue == null || curValue.getMinuteTimestamp() != expectedMinute) {
-                final EventSumValue newValue = new EventSumValue();
+            if (curValue == null || (curValue.getTimestamp() / 1000) != expectedSecond) {
+                final EventSumValue newValue = new EventSumValue(timestamp);
                 final boolean replaced = ref.compareAndSet(curValue, newValue);
                 if (replaced) {
-                    curValue = newValue;
-                    break;
+                    newValue.add(event);
+                    return curValue;
                 }
             } else {
                 break;
@@ -49,5 +48,24 @@ public class EventSum {
         }
 
         curValue.add(event);
+        return null;
+    }
+
+
+    public EventSumValue reset(final long ifOlderThan) {
+        while (true) {
+            final EventSumValue curValue = ref.get();
+            if (curValue == null) {
+                return null;
+            }
+
+            if (curValue.getTimestamp() < ifOlderThan) {
+                if (ref.compareAndSet(curValue, null)) {
+                    return curValue;
+                }
+            } else {
+                return null;
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java
index 90990df..210f7ac 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java
@@ -17,13 +17,14 @@
 
 package org.apache.nifi.controller.repository.metrics;
 
+import org.apache.nifi.controller.repository.FlowFileEvent;
+
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.nifi.controller.repository.FlowFileEvent;
-
 public class EventSumValue {
+    private volatile boolean empty = true;
 
     private int flowFilesIn = 0;
     private int flowFilesOut = 0;
@@ -44,16 +45,16 @@ public class EventSumValue {
     private int invocations = 0;
     private Map<String, Long> counters;
 
-    private final long minuteTimestamp;
     private final long millisecondTimestamp;
 
 
-    public EventSumValue() {
-        this.millisecondTimestamp = System.currentTimeMillis();
-        this.minuteTimestamp = millisecondTimestamp / 60000;
+    public EventSumValue(final long timestamp) {
+        this.millisecondTimestamp = timestamp;
     }
 
     public synchronized void add(final FlowFileEvent flowFileEvent) {
+        empty = false;
+
         this.aggregateLineageMillis += flowFileEvent.getAggregateLineageMillis();
         this.bytesRead += flowFileEvent.getBytesRead();
         this.bytesReceived += flowFileEvent.getBytesReceived();
@@ -84,8 +85,12 @@ public class EventSumValue {
         }
     }
 
-    public synchronized FlowFileEvent toFlowFileEvent(final String componentId) {
-        final StandardFlowFileEvent event = new StandardFlowFileEvent(componentId);
+    public synchronized FlowFileEvent toFlowFileEvent() {
+        if (empty) {
+            return EmptyFlowFileEvent.INSTANCE;
+        }
+
+        final StandardFlowFileEvent event = new StandardFlowFileEvent();
         event.setAggregateLineageMillis(aggregateLineageMillis);
         event.setBytesRead(bytesRead);
         event.setBytesReceived(bytesReceived);
@@ -106,6 +111,10 @@ public class EventSumValue {
     }
 
     public synchronized void add(final EventSumValue other) {
+        if (other.empty) {
+            return;
+        }
+
         synchronized (other) {
             this.aggregateLineageMillis += other.aggregateLineageMillis;
             this.bytesRead += other.bytesRead;
@@ -139,8 +148,42 @@ public class EventSumValue {
         }
     }
 
-    public long getMinuteTimestamp() {
-        return minuteTimestamp;
+    public synchronized void subtract(final EventSumValue other) {
+        if (other.empty) {
+            return;
+        }
+
+        synchronized (other) {
+            this.aggregateLineageMillis -= other.aggregateLineageMillis;
+            this.bytesRead -= other.bytesRead;
+            this.bytesReceived -= other.bytesReceived;
+            this.bytesSent -= other.bytesSent;
+            this.bytesWritten -= other.bytesWritten;
+            this.contentSizeIn -= other.contentSizeIn;
+            this.contentSizeOut -= other.contentSizeOut;
+            this.contentSizeRemoved -= other.contentSizeRemoved;
+            this.flowFilesIn -= other.flowFilesIn;
+            this.flowFilesOut -= other.flowFilesOut;
+            this.flowFilesReceived -= other.flowFilesReceived;
+            this.flowFilesRemoved -= other.flowFilesRemoved;
+            this.flowFilesSent -= other.flowFilesSent;
+            this.invocations -= other.invocations;
+            this.processingNanos -= other.processingNanos;
+
+            final Map<String, Long> eventCounters = other.counters;
+            if (eventCounters != null) {
+                if (counters == null) {
+                    counters = new HashMap<>();
+                }
+
+                for (final Map.Entry<String, Long> entry : eventCounters.entrySet()) {
+                    final String counterName = entry.getKey();
+                    final Long counterValue = entry.getValue();
+
+                    counters.compute(counterName, (key, value) -> value == null ? counterValue : counterValue - value);
+                }
+            }
+        }
     }
 
     public long getTimestamp() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java
index c60f98d..bcd0344 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java
@@ -16,14 +16,14 @@
  */
 package org.apache.nifi.controller.repository.metrics;
 
-import java.io.IOException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
 import org.apache.nifi.controller.repository.FlowFileEvent;
 import org.apache.nifi.controller.repository.FlowFileEventRepository;
 import org.apache.nifi.controller.repository.StandardRepositoryStatusReport;
 
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
 public class RingBufferEventRepository implements FlowFileEventRepository {
 
     private final int numMinutes;
@@ -38,8 +38,7 @@ public class RingBufferEventRepository implements FlowFileEventRepository {
     }
 
     @Override
-    public void updateRepository(final FlowFileEvent event) {
-        final String componentId = event.getComponentIdentifier();
+    public void updateRepository(final FlowFileEvent event, final String componentId) {
         final EventContainer eventContainer = componentEventMap.computeIfAbsent(componentId, id -> new SecondPrecisionEventContainer(numMinutes));
         eventContainer.addEvent(event);
     }
@@ -48,10 +47,7 @@ public class RingBufferEventRepository implements FlowFileEventRepository {
     public StandardRepositoryStatusReport reportTransferEvents(final long sinceEpochMillis) {
         final StandardRepositoryStatusReport report = new StandardRepositoryStatusReport();
 
-        componentEventMap.entrySet().stream()
-            .map(entry -> entry.getValue().generateReport(entry.getKey(), sinceEpochMillis))
-            .forEach(event -> report.addReportEntry(event));
-
+        componentEventMap.forEach((componentId, container) -> report.addReportEntry(container.generateReport(sinceEpochMillis), componentId));
         return report;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/SecondPrecisionEventContainer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/SecondPrecisionEventContainer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/SecondPrecisionEventContainer.java
index 72a8cfc..2482177 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/SecondPrecisionEventContainer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/SecondPrecisionEventContainer.java
@@ -18,13 +18,24 @@ e * Licensed to the Apache Software Foundation (ASF) under one or more
 package org.apache.nifi.controller.repository.metrics;
 
 import org.apache.nifi.controller.repository.FlowFileEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicLong;
 
 public class SecondPrecisionEventContainer implements EventContainer {
+    private static final Logger logger = LoggerFactory.getLogger(SecondPrecisionEventContainer.class);
+
     private final int numBins;
     private final EventSum[] sums;
+    private final EventSumValue aggregateValue = new EventSumValue(0);
+    private final AtomicLong lastUpdateSecond = new AtomicLong(0);
 
     public SecondPrecisionEventContainer(final int numMinutes) {
-        numBins = 1 + numMinutes * 60;
+        // number of bins is number of seconds in 'numMinutes' plus 1. We add one because
+        // we want to have the 'current bin' that we are adding values to, in addition to the
+        // previous (X = numMinutes * 60) bins of values that have completed
+        numBins = numMinutes * 60 + 1;
         sums = new EventSum[numBins];
 
         for (int i = 0; i < numBins; i++) {
@@ -34,11 +45,62 @@ public class SecondPrecisionEventContainer implements EventContainer {
 
     @Override
     public void addEvent(final FlowFileEvent event) {
-        final int second = (int) (System.currentTimeMillis() / 1000);
-        final int binIdx = second % numBins;
+        addEvent(event, System.currentTimeMillis());
+    }
+
+    protected void addEvent(final FlowFileEvent event, final long timestamp) {
+        final long second = timestamp / 1000;
+        final int binIdx = (int) (second % numBins);
         final EventSum sum = sums[binIdx];
 
-        sum.addOrReset(event);
+        final EventSumValue replaced = sum.addOrReset(event, timestamp);
+
+        aggregateValue.add(event);
+
+        if (replaced == null) {
+            logger.debug("Updated bin {}. Did NOT replace.", binIdx);
+        } else {
+            logger.debug("Replaced bin {}", binIdx);
+            aggregateValue.subtract(replaced);
+        }
+
+        // If there are any buckets that have expired, we need to update our aggregate value to reflect that.
+        processExpiredBuckets(second);
+    }
+
+    private void processExpiredBuckets(final long currentSecond) {
+        final long lastUpdate = lastUpdateSecond.get();
+        if (currentSecond > lastUpdate) {
+            final boolean updated = lastUpdateSecond.compareAndSet(lastUpdate, currentSecond);
+            if (updated) {
+                if (lastUpdate == 0L) {
+                    // First update, so nothing to expire
+                    return;
+                }
+
+                final int secondsElapsed = (int) (currentSecond - lastUpdate);
+
+                int index = (int) (currentSecond % numBins);
+                final long expirationTimestamp = 1000 * (currentSecond - numBins);
+
+                int expired = 0;
+                for (int i=0; i < secondsElapsed; i++) {
+                    index--;
+                    if (index < 0) {
+                        index = sums.length - 1;
+                    }
+
+                    final EventSum expiredSum = sums[index];
+                    final EventSumValue expiredValue = expiredSum.reset(expirationTimestamp);
+                    if (expiredValue != null) {
+                        aggregateValue.subtract(expiredValue);
+                        expired++;
+                    }
+                }
+
+                logger.debug("Expired {} bins", expired);
+            }
+        }
     }
 
     @Override
@@ -47,23 +109,17 @@ public class SecondPrecisionEventContainer implements EventContainer {
     }
 
     @Override
-    public FlowFileEvent generateReport(final String componentId, final long sinceEpochMillis) {
-        final EventSumValue eventSumValue = new EventSumValue();
-        final long second = sinceEpochMillis / 1000;
-        final int startBinIdx = (int) (second % numBins);
-
-        for (int i = 0; i < numBins; i++) {
-            int binIdx = (startBinIdx + i) % numBins;
-            final EventSum sum = sums[binIdx];
-
-            final EventSumValue sumValue = sum.getValue();
-            if (sumValue.getTimestamp() >= sinceEpochMillis) {
-                eventSumValue.add(sumValue);
-            }
+    public FlowFileEvent generateReport(final long now) {
+        final long second = now / 1000 + 1;
+        final long lastUpdate = lastUpdateSecond.get();
+        final long secondsSinceUpdate = second - lastUpdate;
+        if (secondsSinceUpdate > numBins) {
+            logger.debug("EventContainer hasn't been updated in {} seconds so will generate report as Empty FlowFile Event", secondsSinceUpdate);
+            return EmptyFlowFileEvent.INSTANCE;
         }
 
-        final FlowFileEvent flowFileEvent = eventSumValue.toFlowFileEvent(componentId);
-        return flowFileEvent;
+        logger.debug("Will expire up to {} bins", secondsSinceUpdate);
+        processExpiredBuckets(second);
+        return aggregateValue.toFlowFileEvent();
     }
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java
index 40ec983..fc00675 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java
@@ -16,13 +16,11 @@
  */
 package org.apache.nifi.controller.repository.metrics;
 
-import java.util.Map;
-
 import org.apache.nifi.controller.repository.FlowFileEvent;
 
-public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable {
+import java.util.Map;
 
-    private final String componentId;
+public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable {
 
     private int flowFilesIn;
     private int flowFilesOut;
@@ -41,13 +39,7 @@ public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable {
     private int invocations;
     private Map<String, Long> counters;
 
-    public StandardFlowFileEvent(final String componentId) {
-        this.componentId = componentId;
-    }
-
-    @Override
-    public String getComponentIdentifier() {
-        return componentId;
+    public StandardFlowFileEvent() {
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
index 8b960fc..de97225 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
@@ -16,13 +16,6 @@
  */
 package org.apache.nifi.controller.scheduling;
 
-import java.io.IOException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.components.state.StateManagerProvider;
@@ -54,6 +47,13 @@ import org.apache.nifi.util.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
 public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
 
     private static final Logger logger = LoggerFactory.getLogger(EventDrivenSchedulingAgent.class);
@@ -255,10 +255,10 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
                             }
                             try {
                                 final long processingNanos = System.nanoTime() - startNanos;
-                                final StandardFlowFileEvent procEvent = new StandardFlowFileEvent(connectable.getIdentifier());
+                                final StandardFlowFileEvent procEvent = new StandardFlowFileEvent();
                                 procEvent.setProcessingNanos(processingNanos);
                                 procEvent.setInvocations(invocationCount);
-                                context.getFlowFileEventRepository().updateRepository(procEvent);
+                                context.getFlowFileEventRepository().updateRepository(procEvent, connectable.getIdentifier());
                             } catch (final IOException e) {
                                 logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", connectable, e.toString());
                                 logger.error("", e);

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ComponentDetails.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ComponentDetails.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ComponentDetails.java
new file mode 100644
index 0000000..68a79b9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ComponentDetails.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ComponentDetails {
+    private final String componentId;
+    private final String groupId;
+    private final String componentName;
+    private final String componentType;
+    private final String sourceName;
+    private final String destinationName;
+    private final String targetUri;
+
+
+    public ComponentDetails(final String id, final String groupId, final String componentName, final String componentType,
+                            final String sourceName, final String destinationName, final String remoteUri) {
+        this.componentId = id;
+        this.groupId = groupId;
+        this.componentName = componentName;
+        this.componentType = componentType;
+        this.sourceName = sourceName;
+        this.destinationName = destinationName;
+        this.targetUri = remoteUri;
+    }
+
+    public static ComponentDetails forProcessor(final ProcessorStatus status) {
+        return forProcessor(status.getId(), status.getGroupId(), status.getName(), status.getType());
+    }
+
+    public static ComponentDetails forProcessor(final String id, final String groupId, final String processorName, final String processorType) {
+        return new ComponentDetails(id, groupId, processorName, processorType, null, null, null);
+    }
+
+    public static ComponentDetails forConnection(final ConnectionStatus status) {
+        return forConnection(status.getId(), status.getGroupId(), status.getName(), status.getSourceName(), status.getDestinationName());
+    }
+
+    public static ComponentDetails forConnection(final String id, final String groupId, final String connectionName, final String sourceName, final String destinationName) {
+        return new ComponentDetails(id, groupId, connectionName, sourceName, destinationName, null, null);
+    }
+
+    public static ComponentDetails forProcessGroup(final ProcessGroupStatus status) {
+        return forProcessGroup(status.getId(), status.getName());
+    }
+
+    public static ComponentDetails forProcessGroup(final String id, final String groupName) {
+        return new ComponentDetails(id,null, groupName, null, null, null, null);
+    }
+
+    public static ComponentDetails forRemoteProcessGroup(final RemoteProcessGroupStatus status) {
+        return forRemoteProcessGroup(status.getId(), status.getGroupId(), status.getName(), status.getTargetUri());
+    }
+
+    public static ComponentDetails forRemoteProcessGroup(final String id, final String parentGroupId, final String rpgName, final String remoteUri) {
+        return new ComponentDetails(id, parentGroupId, rpgName, null, null, null, remoteUri);
+    }
+
+    public String getComponentId() {
+        return componentId;
+    }
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public String getComponentName() {
+        return componentName;
+    }
+
+    public String getComponentType() {
+        return componentType;
+    }
+
+    public String getSourceName() {
+        return sourceName;
+    }
+
+    public String getDestinationName() {
+        return destinationName;
+    }
+
+    public String getTargetUri() {
+        return targetUri;
+    }
+
+    /**
+     * Returns a {@Link Map} whose keys are those values defined by {@link ComponentStatusRepository#COMPONENT_DETAIL_GROUP_ID ComponentStatusRepository.COMPONENT_DETAIL_*}
+     * and values are the values that are populated for this ComponentDetails object.
+     */
+    public Map<String, String> toMap() {
+        final Map<String, String> map = new HashMap<>();
+        map.put(ComponentStatusRepository.COMPONENT_DETAIL_ID, componentId);
+        map.put(ComponentStatusRepository.COMPONENT_DETAIL_GROUP_ID, groupId);
+        map.put(ComponentStatusRepository.COMPONENT_DETAIL_NAME, componentName);
+        map.put(ComponentStatusRepository.COMPONENT_DETAIL_TYPE, componentType);
+        map.put(ComponentStatusRepository.COMPONENT_DETAIL_SOURCE_NAME, sourceName);
+        map.put(ComponentStatusRepository.COMPONENT_DETAIL_DESTINATION_NAME, destinationName);
+        map.put(ComponentStatusRepository.COMPONENT_DETAIL_URI, targetUri);
+        return map;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusHistory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusHistory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusHistory.java
new file mode 100644
index 0000000..cd4b76b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusHistory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class ComponentStatusHistory {
+
+    private final MetricRollingBuffer snapshots;
+    private ComponentDetails componentDetails;
+
+    public ComponentStatusHistory(final ComponentDetails details, final int maxCapacity) {
+        this.componentDetails = details;
+        snapshots = new MetricRollingBuffer(maxCapacity);
+    }
+
+    public void expireBefore(final Date timestamp) {
+        snapshots.expireBefore(timestamp);
+    }
+
+    public void update(final StatusSnapshot snapshot, final ComponentDetails details) {
+        if (snapshot == null) {
+            return;
+        }
+
+        snapshots.update(snapshot);
+        componentDetails = details;
+    }
+
+    public StatusHistory toStatusHistory(final List<Date> timestamps, final boolean includeCounters, final Set<MetricDescriptor<?>> defaultStatusMetrics) {
+        final Date dateGenerated = new Date();
+        final Map<String, String> componentDetailsMap = componentDetails.toMap();
+        final List<StatusSnapshot> snapshotList = snapshots.getSnapshots(timestamps, includeCounters, defaultStatusMetrics);
+        return new StandardStatusHistory(snapshotList, componentDetailsMap, dateGenerated);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java
index c298803..ac738b3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java
@@ -21,53 +21,55 @@ import org.apache.nifi.controller.status.ConnectionStatus;
 import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
 
 public enum ConnectionStatusDescriptor {
-    INPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>(
+    INPUT_BYTES(
         "inputBytes",
         "Bytes In (5 mins)",
         "The cumulative size of all FlowFiles that were transferred to this Connection in the past 5 minutes",
         Formatter.DATA_SIZE,
-        s -> s.getInputBytes())),
+        ConnectionStatus::getInputBytes),
 
-    INPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>(
+    INPUT_COUNT(
         "inputCount",
         "FlowFiles In (5 mins)",
         "The number of FlowFiles that were transferred to this Connection in the past 5 minutes",
         Formatter.COUNT,
-        s -> Long.valueOf(s.getInputCount()))),
+        s -> Long.valueOf(s.getInputCount())),
 
-    OUTPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>(
+    OUTPUT_BYTES(
         "outputBytes",
         "Bytes Out (5 mins)",
         "The cumulative size of all FlowFiles that were pulled from this Connection in the past 5 minutes",
         Formatter.DATA_SIZE,
-        s -> s.getOutputBytes())),
+        ConnectionStatus::getOutputBytes),
 
-    OUTPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>(
+    OUTPUT_COUNT(
         "outputCount",
         "FlowFiles Out (5 mins)",
         "The number of FlowFiles that were pulled from this Connection in the past 5 minutes",
         Formatter.COUNT,
-        s -> Long.valueOf(s.getOutputCount()))),
+        s -> Long.valueOf(s.getOutputCount())),
 
-    QUEUED_BYTES(new StandardMetricDescriptor<ConnectionStatus>(
+    QUEUED_BYTES(
         "queuedBytes",
         "Queued Bytes",
         "The number of Bytes queued in this Connection",
         Formatter.DATA_SIZE,
-        s -> s.getQueuedBytes())),
+        ConnectionStatus::getQueuedBytes),
 
-    QUEUED_COUNT(new StandardMetricDescriptor<ConnectionStatus>(
+    QUEUED_COUNT(
         "queuedCount",
         "Queued Count",
         "The number of FlowFiles queued in this Connection",
         Formatter.COUNT,
-        s -> Long.valueOf(s.getQueuedCount())));
+        s -> Long.valueOf(s.getQueuedCount()));
 
 
     private MetricDescriptor<ConnectionStatus> descriptor;
 
-    private ConnectionStatusDescriptor(final MetricDescriptor<ConnectionStatus> descriptor) {
-        this.descriptor = descriptor;
+    ConnectionStatusDescriptor(final String field, final String label, final String description,
+                              final MetricDescriptor.Formatter formatter, final ValueMapper<ConnectionStatus> valueFunction) {
+
+        this.descriptor = new StandardMetricDescriptor<>(this::ordinal, field, label, description, formatter, valueFunction);
     }
 
     public String getField() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmptyStatusSnapshot.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmptyStatusSnapshot.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmptyStatusSnapshot.java
new file mode 100644
index 0000000..68c04ca
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmptyStatusSnapshot.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Set;
+
+public class EmptyStatusSnapshot implements StatusSnapshot {
+    private static final ValueReducer<StatusSnapshot, StatusSnapshot> VALUE_REDUCER = new EmptyValueReducer();
+    private static final Long METRIC_VALUE = 0L;
+
+    private final Date timestamp;
+    private final Set<MetricDescriptor<?>> metricsDescriptors;
+
+    public EmptyStatusSnapshot(final Date timestamp, final Set<MetricDescriptor<?>> metricsDescriptors) {
+        this.timestamp = timestamp;
+        this.metricsDescriptors = metricsDescriptors;
+    }
+
+    @Override
+    public Date getTimestamp() {
+        return timestamp;
+    }
+
+    @Override
+    public Set<MetricDescriptor<?>> getMetricDescriptors() {
+        return metricsDescriptors;
+    }
+
+    @Override
+    public Long getStatusMetric(final MetricDescriptor<?> descriptor) {
+        return METRIC_VALUE;
+    }
+
+    @Override
+    public StatusSnapshot withoutCounters() {
+        return this;
+    }
+
+    @Override
+    public ValueReducer<StatusSnapshot, StatusSnapshot> getValueReducer() {
+        return VALUE_REDUCER;
+    }
+
+    private static class EmptyValueReducer implements ValueReducer<StatusSnapshot, StatusSnapshot> {
+        @Override
+        public StatusSnapshot reduce(final List<StatusSnapshot> values) {
+            return (values == null || values.isEmpty()) ? null : values.get(0);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/IndexableMetric.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/IndexableMetric.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/IndexableMetric.java
new file mode 100644
index 0000000..d5cbeae
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/IndexableMetric.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+public interface IndexableMetric {
+    int getIndex();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/MetricRollingBuffer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/MetricRollingBuffer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/MetricRollingBuffer.java
new file mode 100644
index 0000000..215e364
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/MetricRollingBuffer.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Set;
+
+public class MetricRollingBuffer {
+    private final int capacity;
+
+    private StatusSnapshot[] snapshots;
+    private int writeIndex = 0;
+    private int readIndex;
+    private boolean readExhausted;
+    private int count = 0;
+
+    public MetricRollingBuffer(final int maxCapacity) {
+        this.capacity = maxCapacity;
+    }
+
+    public void update(final StatusSnapshot snapshot) {
+        if (snapshot == null) {
+            return;
+        }
+
+        if (snapshots == null) {
+            snapshots = new StatusSnapshot[Math.min(capacity, 16)];
+        }
+
+        if (snapshots[writeIndex] == null) {
+            count++;
+        }
+
+        snapshots[writeIndex++] = snapshot;
+
+        if (writeIndex >= snapshots.length) {
+            if (snapshots.length < capacity) {
+                grow();
+            } else {
+                writeIndex = 0;
+            }
+        }
+    }
+
+    public int size() {
+        return count;
+    }
+
+    public void expireBefore(final Date date) {
+        if (snapshots == null) {
+            return;
+        }
+
+        int readIndex = writeIndex;
+        for (int i=0; i < snapshots.length; i++) {
+            final StatusSnapshot snapshot = snapshots[readIndex];
+            if (snapshot == null) {
+                readIndex++;
+                if (readIndex >= snapshots.length) {
+                    readIndex = 0;
+                }
+
+                continue;
+            }
+
+            final Date snapshotTimestamp = snapshot.getTimestamp();
+            if (snapshotTimestamp.after(date)) {
+                break;
+            }
+
+            snapshots[readIndex] = null;
+            count--;
+
+            readIndex++;
+            if (readIndex >= snapshots.length) {
+                readIndex = 0;
+            }
+        }
+
+        if (count < snapshots.length / 4 || snapshots.length - count > 128) {
+            // If we're using less than 1/4 of the array or we have at least 128 null entries, compact.
+            compact();
+        }
+    }
+
+    private void grow() {
+        final int initialSize = snapshots.length;
+        final int newSize = Math.min(capacity, snapshots.length + 64);
+        final StatusSnapshot[] newArray = new StatusSnapshot[newSize];
+        System.arraycopy(snapshots, 0, newArray, 0, snapshots.length);
+        snapshots = newArray;
+        writeIndex = initialSize;
+    }
+
+    private void compact() {
+        final StatusSnapshot[] newArray = new StatusSnapshot[count + 1];
+        int insertionIndex = 0;
+
+        int readIndex = writeIndex;
+        for (int i=0; i < snapshots.length; i++) {
+            final StatusSnapshot snapshot = snapshots[readIndex];
+            if (snapshot != null) {
+                newArray[insertionIndex++] = snapshot;
+            }
+
+            readIndex++;
+            if (readIndex >= snapshots.length) {
+                readIndex = 0;
+            }
+        }
+
+        snapshots = newArray;
+        writeIndex = count;
+        count = newArray.length - 1;
+    }
+
+    public List<StatusSnapshot> getSnapshots(final List<Date> timestamps, final boolean includeCounters, final Set<MetricDescriptor<?>> defaultStatusMetrics) {
+        if (snapshots == null) {
+            return Collections.emptyList();
+        }
+
+        final List<StatusSnapshot> list = new ArrayList<>(snapshots.length);
+
+        resetRead();
+
+        for (final Date timestamp : timestamps) {
+            final StatusSnapshot snapshot = getSnapshotForTimestamp(timestamp);
+            if (snapshot == null) {
+                list.add(new EmptyStatusSnapshot(timestamp, defaultStatusMetrics));
+            } else {
+                list.add(includeCounters ? snapshot : snapshot.withoutCounters());
+            }
+        }
+
+        return list;
+    }
+
+    private StatusSnapshot getSnapshotForTimestamp(final Date timestamp) {
+        while (!readExhausted) {
+            final StatusSnapshot snapshot = snapshots[readIndex];
+            if (snapshot == null) {
+                advanceRead();
+                continue;
+            }
+
+            final Date snapshotTimestamp = snapshot.getTimestamp();
+            if (snapshotTimestamp.before(timestamp)) {
+                advanceRead();
+                continue;
+            }
+
+            if (snapshotTimestamp.after(timestamp)) {
+                return null;
+            }
+
+            advanceRead();
+            return snapshot;
+        }
+
+        return null;
+    }
+
+    private void resetRead() {
+        readIndex = writeIndex;
+        readExhausted = false;
+    }
+
+    private void advanceRead() {
+        readIndex++;
+
+        if (readIndex >= snapshots.length) {
+            readIndex = 0;
+        }
+
+        if (readIndex == writeIndex) {
+            readExhausted = true;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java
index 25b9dfc..6bdb3c0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java
@@ -17,79 +17,90 @@
 
 package org.apache.nifi.controller.status.history;
 
-import java.util.concurrent.TimeUnit;
-
 import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.controller.status.ProcessorStatus;
 import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
 
+import java.util.concurrent.TimeUnit;
+
 public enum ProcessGroupStatusDescriptor {
 
-    BYTES_READ(new StandardMetricDescriptor<ProcessGroupStatus>(
+    BYTES_READ(
         "bytesRead",
         "Bytes Read (5 mins)",
         "The total number of bytes read from Content Repository by Processors in this Process Group in the past 5 minutes",
         Formatter.DATA_SIZE,
-        s -> s.getBytesRead())),
+        ProcessGroupStatus::getBytesRead),
 
-    BYTES_WRITTEN(new StandardMetricDescriptor<ProcessGroupStatus>("bytesWritten",
+    BYTES_WRITTEN(
+        "bytesWritten",
         "Bytes Written (5 mins)",
         "The total number of bytes written to Content Repository by Processors in this Process Group in the past 5 minutes",
         Formatter.DATA_SIZE,
-        s -> s.getBytesWritten())),
+        ProcessGroupStatus::getBytesWritten),
 
-    BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessGroupStatus>("bytesTransferred",
+    BYTES_TRANSFERRED(
+        "bytesTransferred",
         "Bytes Transferred (5 mins)",
         "The total number of bytes read from or written to Content Repository by Processors in this Process Group in the past 5 minutes",
         Formatter.DATA_SIZE,
-        s -> s.getBytesRead() + s.getBytesWritten())),
+        s -> s.getBytesRead() + s.getBytesWritten()),
 
-    INPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("inputBytes",
+    INPUT_BYTES("inputBytes",
         "Bytes In (5 mins)",
         "The cumulative size of all FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes",
         Formatter.DATA_SIZE,
-        s -> s.getInputContentSize())),
+        ProcessGroupStatus::getInputContentSize),
 
-    INPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("inputCount",
+    INPUT_COUNT(
+        "inputCount",
         "FlowFiles In (5 mins)",
         "The number of FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes",
         Formatter.COUNT,
-        s -> s.getInputCount().longValue())),
+        s -> s.getInputCount().longValue()),
 
-    OUTPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("outputBytes",
+    OUTPUT_BYTES(
+        "outputBytes",
         "Bytes Out (5 mins)",
         "The cumulative size of all FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes",
         Formatter.DATA_SIZE,
-        s -> s.getOutputContentSize())),
+        ProcessGroupStatus::getOutputContentSize),
 
-    OUTPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("outputCount",
+    OUTPUT_COUNT(
+        "outputCount",
         "FlowFiles Out (5 mins)",
         "The number of FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes",
         Formatter.COUNT,
-        s -> s.getOutputCount().longValue())),
+        s -> s.getOutputCount().longValue()),
 
-    QUEUED_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("queuedBytes",
+    QUEUED_BYTES(
+        "queuedBytes",
         "Queued Bytes",
         "The cumulative size of all FlowFiles queued in all Connections of this Process Group",
         Formatter.DATA_SIZE,
-        s -> s.getQueuedContentSize())),
+        ProcessGroupStatus::getQueuedContentSize),
 
-    QUEUED_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("queuedCount",
+    QUEUED_COUNT(
+        "queuedCount",
         "Queued Count",
         "The number of FlowFiles queued in all Connections of this Process Group",
         Formatter.COUNT,
-        s -> s.getQueuedCount().longValue())),
+        s -> s.getQueuedCount().longValue()),
 
-    TASK_MILLIS(new StandardMetricDescriptor<ProcessGroupStatus>("taskMillis",
+    TASK_MILLIS(
+        "taskMillis",
         "Total Task Duration (5 mins)",
         "The total number of thread-milliseconds that the Processors within this ProcessGroup have used to complete their tasks in the past 5 minutes",
         Formatter.DURATION,
-        s -> calculateTaskMillis(s)));
+        ProcessGroupStatusDescriptor::calculateTaskMillis);
+
 
     private MetricDescriptor<ProcessGroupStatus> descriptor;
 
-    private ProcessGroupStatusDescriptor(final MetricDescriptor<ProcessGroupStatus> descriptor) {
-        this.descriptor = descriptor;
+    ProcessGroupStatusDescriptor(final String field, final String label, final String description,
+                               final MetricDescriptor.Formatter formatter, final ValueMapper<ProcessGroupStatus> valueFunction) {
+
+        this.descriptor = new StandardMetricDescriptor<>(this::ordinal, field, label, description, formatter, valueFunction);
     }
 
     public String getField() {