You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/08/21 15:44:54 UTC

[1/2] nifi git commit: NIFI-5466: Keep a running total of stats for each component. Refactored FlowFileEvent and repository in order to provide more efficient storage of objects on Java heap by allowing the same 'EMPTY' object to be reused - Refactored

Repository: nifi
Updated Branches:
  refs/heads/master 410176ed2 -> 7bbb5a823


http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java
index f939273..c8f5126 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java
@@ -17,91 +17,92 @@
 
 package org.apache.nifi.controller.status.history;
 
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.nifi.controller.status.ProcessorStatus;
 import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
 
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
 public enum ProcessorStatusDescriptor {
-    BYTES_READ(new StandardMetricDescriptor<ProcessorStatus>(
+    BYTES_READ(
         "bytesRead",
         "Bytes Read (5 mins)",
         "The total number of bytes read from the Content Repository by this Processor in the past 5 minutes",
         Formatter.DATA_SIZE,
-        s -> s.getBytesRead())),
+        ProcessorStatus::getBytesRead),
 
-    BYTES_WRITTEN(new StandardMetricDescriptor<ProcessorStatus>(
+    BYTES_WRITTEN(
         "bytesWritten",
         "Bytes Written (5 mins)",
         "The total number of bytes written to the Content Repository by this Processor in the past 5 minutes",
         Formatter.DATA_SIZE,
-        s -> s.getBytesWritten())),
+        ProcessorStatus::getBytesWritten),
 
-    BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessorStatus>(
+    BYTES_TRANSFERRED(
         "bytesTransferred",
         "Bytes Transferred (5 mins)",
         "The total number of bytes read from or written to the Content Repository by this Processor in the past 5 minutes",
         Formatter.DATA_SIZE,
-        s -> s.getBytesRead() + s.getBytesWritten())),
+        s -> s.getBytesRead() + s.getBytesWritten()),
 
-    INPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>(
+    INPUT_BYTES(
         "inputBytes",
         "Bytes In (5 mins)",
         "The cumulative size of all FlowFiles that this Processor has pulled from its queues in the past 5 minutes",
         Formatter.DATA_SIZE,
-        s -> s.getInputBytes())),
+        ProcessorStatus::getInputBytes),
 
-    INPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>(
+    INPUT_COUNT(
         "inputCount",
         "FlowFiles In (5 mins)",
         "The number of FlowFiles that this Processor has pulled from its queues in the past 5 minutes",
         Formatter.COUNT,
-        s -> Long.valueOf(s.getInputCount()))),
+        s -> Long.valueOf(s.getInputCount())),
 
-    OUTPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>(
+    OUTPUT_BYTES(
         "outputBytes",
         "Bytes Out (5 mins)",
         "The cumulative size of all FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes",
         Formatter.DATA_SIZE,
-        s -> s.getOutputBytes())),
+        ProcessorStatus::getOutputBytes),
 
-    OUTPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>(
+    OUTPUT_COUNT(
         "outputCount",
         "FlowFiles Out (5 mins)",
         "The number of FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes",
         Formatter.COUNT,
-        s -> Long.valueOf(s.getOutputCount()))),
+        s -> Long.valueOf(s.getOutputCount())),
 
-    TASK_COUNT(new StandardMetricDescriptor<ProcessorStatus>(
+    TASK_COUNT(
         "taskCount",
         "Tasks (5 mins)",
         "The number of tasks that this Processor has completed in the past 5 minutes",
         Formatter.COUNT,
-        s -> Long.valueOf(s.getInvocations()))),
+        s -> Long.valueOf(s.getInvocations())),
 
-    TASK_MILLIS(new StandardMetricDescriptor<ProcessorStatus>(
+    TASK_MILLIS(
         "taskMillis",
         "Total Task Duration (5 mins)",
         "The total number of thread-milliseconds that the Processor has used to complete its tasks in the past 5 minutes",
         Formatter.DURATION,
-        s -> TimeUnit.MILLISECONDS.convert(s.getProcessingNanos(), TimeUnit.NANOSECONDS))),
+        s -> TimeUnit.MILLISECONDS.convert(s.getProcessingNanos(), TimeUnit.NANOSECONDS)),
 
-    TASK_NANOS(new StandardMetricDescriptor<ProcessorStatus>(
+    TASK_NANOS(
         "taskNanos",
         "Total Task Time (nanos)",
         "The total number of thread-nanoseconds that the Processor has used to complete its tasks in the past 5 minutes",
         Formatter.COUNT,
-        ProcessorStatus::getProcessingNanos), false),
+        ProcessorStatus::getProcessingNanos,
+        false),
 
-    FLOWFILES_REMOVED(new StandardMetricDescriptor<ProcessorStatus>(
+    FLOWFILES_REMOVED(
         "flowFilesRemoved",
         "FlowFiles Removed (5 mins)",
         "The total number of FlowFiles removed by this Processor in the last 5 minutes",
         Formatter.COUNT,
-        s -> Long.valueOf(s.getFlowFilesRemoved()))),
+        s -> Long.valueOf(s.getFlowFilesRemoved())),
 
-    AVERAGE_LINEAGE_DURATION(new StandardMetricDescriptor<ProcessorStatus>(
+    AVERAGE_LINEAGE_DURATION(
         "averageLineageDuration",
         "Average Lineage Duration (5 mins)",
         "The average amount of time that a FlowFile took to process (from receipt until this Processor finished processing it) in the past 5 minutes.",
@@ -114,23 +115,24 @@ public enum ProcessorStatusDescriptor {
                 int count = 0;
 
                 for (final StatusSnapshot snapshot : values) {
-                    final long removed = snapshot.getStatusMetrics().get(FLOWFILES_REMOVED.getDescriptor()).longValue();
-                    final long outputCount = snapshot.getStatusMetrics().get(OUTPUT_COUNT.getDescriptor()).longValue();
+                    final long removed = snapshot.getStatusMetric(FLOWFILES_REMOVED.getDescriptor()).longValue();
+                    final long outputCount = snapshot.getStatusMetric(OUTPUT_COUNT.getDescriptor()).longValue();
                     final long processed = removed + outputCount;
 
                     count += processed;
 
-                    final long avgMillis = snapshot.getStatusMetrics().get(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue();
+                    final long avgMillis = snapshot.getStatusMetric(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue();
                     final long totalMillis = avgMillis * processed;
                     millis += totalMillis;
                 }
 
                 return count == 0 ? 0 : millis / count;
             }
-        }
-    )),
+        },
+        true
+    ),
 
-    AVERAGE_TASK_NANOS(new StandardMetricDescriptor<ProcessorStatus>(
+    AVERAGE_TASK_NANOS(
         "averageTaskNanos",
         "Average Task Duration (nanoseconds)",
         "The average number of nanoseconds it took this Processor to complete a task, over the past 5 minutes",
@@ -143,12 +145,12 @@ public enum ProcessorStatusDescriptor {
                 int invocations = 0;
 
                 for (final StatusSnapshot snapshot : values) {
-                    final Long taskNanos = snapshot.getStatusMetrics().get(TASK_NANOS.getDescriptor());
+                    final Long taskNanos = snapshot.getStatusMetric(TASK_NANOS.getDescriptor());
                     if (taskNanos != null) {
                         procNanos += taskNanos.longValue();
                     }
 
-                    final Long taskInvocations = snapshot.getStatusMetrics().get(TASK_COUNT.getDescriptor());
+                    final Long taskInvocations = snapshot.getStatusMetric(TASK_COUNT.getDescriptor());
                     if (taskInvocations != null) {
                         invocations += taskInvocations.intValue();
                     }
@@ -160,22 +162,38 @@ public enum ProcessorStatusDescriptor {
 
                 return procNanos / invocations;
             }
-        }));
+        },
+        true
+    );
 
 
 
     private final MetricDescriptor<ProcessorStatus> descriptor;
     private final boolean visible;
 
-    private ProcessorStatusDescriptor(final MetricDescriptor<ProcessorStatus> descriptor) {
-        this(descriptor, true);
+    ProcessorStatusDescriptor(final String field, final String label, final String description,
+                              final MetricDescriptor.Formatter formatter, final ValueMapper<ProcessorStatus> valueFunction) {
+
+        this(field, label, description, formatter, valueFunction, true);
     }
 
-    private ProcessorStatusDescriptor(final MetricDescriptor<ProcessorStatus> descriptor, final boolean visible) {
-        this.descriptor = descriptor;
+    ProcessorStatusDescriptor(final String field, final String label, final String description,
+                              final MetricDescriptor.Formatter formatter, final ValueMapper<ProcessorStatus> valueFunction, final boolean visible) {
+
+        this.descriptor = new StandardMetricDescriptor<>(this::ordinal, field, label, description, formatter, valueFunction);
         this.visible = visible;
     }
 
+    ProcessorStatusDescriptor(final String field, final String label, final String description,
+                              final MetricDescriptor.Formatter formatter, final ValueMapper<ProcessorStatus> valueFunction,
+                              final ValueReducer<StatusSnapshot, Long> reducer, final boolean visible) {
+
+        this.descriptor = new StandardMetricDescriptor<>(this::ordinal, field, label, description, formatter, valueFunction, reducer);
+        this.visible = visible;
+    }
+
+
+
     public String getField() {
         return descriptor.getField();
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java
index 5875249..6b131d2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java
@@ -24,43 +24,49 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 public enum RemoteProcessGroupStatusDescriptor {
-    SENT_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytes",
+    SENT_BYTES(
+        "sentBytes",
         "Bytes Sent (5 mins)",
         "The cumulative size of all FlowFiles that have been successfully sent to the remote system in the past 5 minutes",
         Formatter.DATA_SIZE,
-        s -> s.getSentContentSize())),
+        RemoteProcessGroupStatus::getSentContentSize),
 
-    SENT_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentCount",
+    SENT_COUNT(
+        "sentCount",
         "FlowFiles Sent (5 mins)",
         "The number of FlowFiles that have been successfully sent to the remote system in the past 5 minutes",
         Formatter.COUNT,
-        s -> s.getSentCount().longValue())),
+        s -> s.getSentCount().longValue()),
 
-    RECEIVED_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytes",
+    RECEIVED_BYTES(
+        "receivedBytes",
         "Bytes Received (5 mins)",
         "The cumulative size of all FlowFiles that have been received from the remote system in the past 5 minutes",
         Formatter.DATA_SIZE,
-        s -> s.getReceivedContentSize())),
+        RemoteProcessGroupStatus::getReceivedContentSize),
 
-    RECEIVED_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedCount",
+    RECEIVED_COUNT(
+        "receivedCount",
         "FlowFiles Received (5 mins)",
         "The number of FlowFiles that have been received from the remote system in the past 5 minutes",
         Formatter.COUNT,
-        s -> s.getReceivedCount().longValue())),
+        s -> s.getReceivedCount().longValue()),
 
-    RECEIVED_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytesPerSecond",
+    RECEIVED_BYTES_PER_SECOND(
+        "receivedBytesPerSecond",
         "Received Bytes Per Second",
         "The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second",
         Formatter.DATA_SIZE,
-        s -> s.getReceivedContentSize().longValue() / 300L)),
+        s -> s.getReceivedContentSize().longValue() / 300L),
 
-    SENT_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytesPerSecond",
+    SENT_BYTES_PER_SECOND(
+        "sentBytesPerSecond",
         "Sent Bytes Per Second",
         "The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second",
         Formatter.DATA_SIZE,
-        s -> s.getSentContentSize().longValue() / 300L)),
+        s -> s.getSentContentSize().longValue() / 300L),
 
-    TOTAL_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("totalBytesPerSecond",
+    TOTAL_BYTES_PER_SECOND("totalBytesPerSecond",
         "Total Bytes Per Second",
         "The sum of the send and receive data rate from the remote system in the past 5 minutes in terms of Bytes Per Second",
         Formatter.DATA_SIZE,
@@ -69,9 +75,9 @@ public enum RemoteProcessGroupStatusDescriptor {
             public Long getValue(final RemoteProcessGroupStatus status) {
                 return Long.valueOf((status.getReceivedContentSize().longValue() + status.getSentContentSize().longValue()) / 300L);
             }
-        })),
+        }),
 
-    AVERAGE_LINEAGE_DURATION(new StandardMetricDescriptor<RemoteProcessGroupStatus>(
+    AVERAGE_LINEAGE_DURATION(
         "averageLineageDuration",
         "Average Lineage Duration (5 mins)",
         "The average amount of time that a FlowFile took to process from receipt to drop in the past 5 minutes. For Processors that do not terminate FlowFiles, this value will be 0.",
@@ -84,22 +90,29 @@ public enum RemoteProcessGroupStatusDescriptor {
                 int count = 0;
 
                 for (final StatusSnapshot snapshot : values) {
-                    final long sent = snapshot.getStatusMetrics().get(SENT_COUNT.getDescriptor()).longValue();
+                    final long sent = snapshot.getStatusMetric(SENT_COUNT.getDescriptor()).longValue();
                     count += sent;
 
-                    final long avgMillis = snapshot.getStatusMetrics().get(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue();
+                    final long avgMillis = snapshot.getStatusMetric(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue();
                     final long totalMillis = avgMillis * sent;
                     millis += totalMillis;
                 }
 
                 return count == 0 ? 0 : millis / count;
             }
-        }));
+        });
+
 
     private final MetricDescriptor<RemoteProcessGroupStatus> descriptor;
 
-    private RemoteProcessGroupStatusDescriptor(final MetricDescriptor<RemoteProcessGroupStatus> descriptor) {
-        this.descriptor = descriptor;
+    RemoteProcessGroupStatusDescriptor(final String field, final String label, final String description,
+                               final MetricDescriptor.Formatter formatter, final ValueMapper<RemoteProcessGroupStatus> valueFunction) {
+        this.descriptor = new StandardMetricDescriptor<>(this::ordinal, field, label, description, formatter, valueFunction);
+    }
+
+    RemoteProcessGroupStatusDescriptor(final String field, final String label, final String description,
+                                       final MetricDescriptor.Formatter formatter, final ValueMapper<RemoteProcessGroupStatus> valueFunction, final ValueReducer<StatusSnapshot, Long> reducer) {
+        this.descriptor = new StandardMetricDescriptor<>(this::ordinal, field, label, description, formatter, valueFunction, reducer);
     }
 
     public String getField() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardMetricDescriptor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardMetricDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardMetricDescriptor.java
index 6970fce..de5e0b5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardMetricDescriptor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardMetricDescriptor.java
@@ -20,6 +20,7 @@ import java.util.List;
 
 public class StandardMetricDescriptor<T> implements MetricDescriptor<T> {
 
+    private final IndexableMetric indexableMetric;
     private final String field;
     private final String label;
     private final String description;
@@ -27,12 +28,14 @@ public class StandardMetricDescriptor<T> implements MetricDescriptor<T> {
     private final ValueMapper<T> valueMapper;
     private final ValueReducer<StatusSnapshot, Long> reducer;
 
-    public StandardMetricDescriptor(final String field, final String label, final String description, final MetricDescriptor.Formatter formatter, final ValueMapper<T> valueFunction) {
-        this(field, label, description, formatter, valueFunction, null);
+    public StandardMetricDescriptor(final IndexableMetric indexableMetric, final String field, final String label, final String description,
+                                    final MetricDescriptor.Formatter formatter, final ValueMapper<T> valueFunction) {
+        this(indexableMetric, field, label, description, formatter, valueFunction, null);
     }
 
-    public StandardMetricDescriptor(final String field, final String label, final String description,
-            final MetricDescriptor.Formatter formatter, final ValueMapper<T> valueFunction, final ValueReducer<StatusSnapshot, Long> reducer) {
+    public StandardMetricDescriptor(final IndexableMetric indexableMetric, final String field, final String label, final String description,
+                                    final MetricDescriptor.Formatter formatter, final ValueMapper<T> valueFunction, final ValueReducer<StatusSnapshot, Long> reducer) {
+        this.indexableMetric = indexableMetric;
         this.field = field;
         this.label = label;
         this.description = description;
@@ -42,6 +45,11 @@ public class StandardMetricDescriptor<T> implements MetricDescriptor<T> {
     }
 
     @Override
+    public int getMetricIdentifier() {
+        return indexableMetric.getIndex();
+    }
+
+    @Override
     public String getField() {
         return field;
     }
@@ -100,7 +108,7 @@ public class StandardMetricDescriptor<T> implements MetricDescriptor<T> {
         public Long reduce(final List<StatusSnapshot> values) {
             long sum = 0;
             for (final StatusSnapshot snapshot : values) {
-                sum += snapshot.getStatusMetrics().get(StandardMetricDescriptor.this);
+                sum += snapshot.getStatusMetric(StandardMetricDescriptor.this);
             }
 
             return sum;

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusHistory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusHistory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusHistory.java
index 9299b0d..96b3759 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusHistory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusHistory.java
@@ -16,18 +16,25 @@
  */
 package org.apache.nifi.controller.status.history;
 
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Date;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
 public class StandardStatusHistory implements StatusHistory {
+    private final List<StatusSnapshot> snapshots;
+    private final Date generated;
+    private final Map<String, String> componentDetails;
+
+    public StandardStatusHistory(final List<StatusSnapshot> snapshots, final Map<String, String> componentDetails, final Date generated) {
+        this.snapshots = snapshots;
+        this.generated = generated;
+        this.componentDetails = componentDetails;
+    }
 
-    private final List<StatusSnapshot> snapshots = new ArrayList<>();
-    private final Date generated = new Date();
-    private final Map<String, String> componentDetails = new LinkedHashMap<>();
+    @Override
+    public List<StatusSnapshot> getStatusSnapshots() {
+        return snapshots;
+    }
 
     @Override
     public Date getDateGenerated() {
@@ -36,19 +43,6 @@ public class StandardStatusHistory implements StatusHistory {
 
     @Override
     public Map<String, String> getComponentDetails() {
-        return Collections.unmodifiableMap(componentDetails);
-    }
-
-    public void setComponentDetail(final String detailName, final String detailValue) {
-        componentDetails.put(detailName, detailValue);
-    }
-
-    @Override
-    public List<StatusSnapshot> getStatusSnapshots() {
-        return Collections.unmodifiableList(snapshots);
-    }
-
-    public void addStatusSnapshot(final StatusSnapshot snapshot) {
-        snapshots.add(snapshot);
+        return componentDetails;
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusSnapshot.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusSnapshot.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusSnapshot.java
index abaf899..4662753 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusSnapshot.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusSnapshot.java
@@ -17,7 +17,7 @@
 package org.apache.nifi.controller.status.history;
 
 import java.util.Date;
-import java.util.LinkedHashMap;
+import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -25,25 +25,63 @@ import java.util.Set;
 
 public class StandardStatusSnapshot implements StatusSnapshot {
 
-    private final Map<MetricDescriptor<?>, Long> metricValues = new LinkedHashMap<>();
+    private final Set<MetricDescriptor<?>> metricDescriptors;
+    private final long[] values;
+
+    private Map<MetricDescriptor<?>, Long> counterValues = null;
     private Date timestamp = new Date();
 
+
+    public StandardStatusSnapshot(final Set<MetricDescriptor<?>> metricDescriptors) {
+        this.metricDescriptors = metricDescriptors;
+        values = new long[metricDescriptors.size()];
+    }
+
+    private StandardStatusSnapshot(final Set<MetricDescriptor<?>> metricDescriptors, final long[] values) {
+        this.metricDescriptors = metricDescriptors;
+        this.values = values;
+    }
+
     @Override
     public Date getTimestamp() {
         return timestamp;
     }
 
-    public void setTimestamp(final Date timestamp) {
-        this.timestamp = timestamp;
+    @Override
+    public Set<MetricDescriptor<?>> getMetricDescriptors() {
+        return metricDescriptors;
     }
 
     @Override
-    public Map<MetricDescriptor<?>, Long> getStatusMetrics() {
-        return metricValues;
+    public Long getStatusMetric(final MetricDescriptor<?> descriptor) {
+        return values[descriptor.getMetricIdentifier()];
+    }
+
+    public void setTimestamp(final Date timestamp) {
+        this.timestamp = timestamp;
     }
 
+
     public void addStatusMetric(final MetricDescriptor<?> metric, final Long value) {
-        metricValues.put(metric, value);
+        values[metric.getMetricIdentifier()] = value;
+    }
+
+    public void addCounterStatusMetric(final MetricDescriptor<?> metric, final Long value) {
+        if (counterValues == null) {
+            counterValues = new HashMap<>();
+        }
+
+        counterValues.put(metric, value);
+    }
+
+    public StandardStatusSnapshot withoutCounters() {
+        if (counterValues == null) {
+            return this;
+        }
+
+        final StandardStatusSnapshot without = new StandardStatusSnapshot(metricDescriptors, values);
+        without.setTimestamp(timestamp);
+        return without;
     }
 
     @Override
@@ -52,16 +90,16 @@ public class StandardStatusSnapshot implements StatusSnapshot {
             @Override
             public StatusSnapshot reduce(final List<StatusSnapshot> values) {
                 Date reducedTimestamp = null;
-                final Set<MetricDescriptor<?>> allDescriptors = new LinkedHashSet<>(metricValues.keySet());
+                final Set<MetricDescriptor<?>> allDescriptors = new LinkedHashSet<>(metricDescriptors);
 
                 for (final StatusSnapshot statusSnapshot : values) {
                     if (reducedTimestamp == null) {
                         reducedTimestamp = statusSnapshot.getTimestamp();
                     }
-                    allDescriptors.addAll(statusSnapshot.getStatusMetrics().keySet());
+                    allDescriptors.addAll(statusSnapshot.getMetricDescriptors());
                 }
 
-                final StandardStatusSnapshot reduced = new StandardStatusSnapshot();
+                final StandardStatusSnapshot reduced = new StandardStatusSnapshot(allDescriptors);
                 if (reducedTimestamp != null) {
                     reduced.setTimestamp(reducedTimestamp);
                 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java
index a126589..9ac3909 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java
@@ -16,6 +16,10 @@
  */
 package org.apache.nifi.controller.status.history;
 
+import org.apache.nifi.web.api.dto.status.StatusDescriptorDTO;
+import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
+import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
@@ -27,10 +31,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.nifi.web.api.dto.status.StatusDescriptorDTO;
-import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
-import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
-
 public class StatusHistoryUtil {
 
     public static StatusHistoryDTO createStatusHistoryDTO(final StatusHistory statusHistory) {
@@ -43,7 +43,7 @@ public class StatusHistoryUtil {
             final StatusSnapshotDTO snapshotDto = StatusHistoryUtil.createStatusSnapshotDto(snapshot);
             snapshotDtos.add(snapshotDto);
             metricNames.addAll(snapshotDto.getStatusMetrics().keySet());
-            metricDescriptors.addAll(snapshot.getStatusMetrics().keySet());
+            metricDescriptors.addAll(snapshot.getMetricDescriptors());
         }
 
         // We need to ensure that the 'aggregate snapshot' has an entry for every metric, including counters.
@@ -94,9 +94,7 @@ public class StatusHistoryUtil {
 
         final Set<MetricDescriptor<?>> allDescriptors = new LinkedHashSet<>();
         for (final StatusSnapshot statusSnapshot : statusHistory.getStatusSnapshots()) {
-            for (final MetricDescriptor<?> metricDescriptor : statusSnapshot.getStatusMetrics().keySet()) {
-                allDescriptors.add(metricDescriptor);
-            }
+            allDescriptors.addAll(statusSnapshot.getMetricDescriptors());
         }
 
         for (final MetricDescriptor<?> metricDescriptor : allDescriptors) {
@@ -111,8 +109,8 @@ public class StatusHistoryUtil {
 
         dto.setTimestamp(statusSnapshot.getTimestamp());
         final Map<String, Long> statusMetrics = new HashMap<>();
-        for (final Map.Entry<MetricDescriptor<?>, Long> entry : statusSnapshot.getStatusMetrics().entrySet()) {
-            statusMetrics.put(entry.getKey().getField(), entry.getValue());
+        for (final MetricDescriptor<?> descriptor : statusSnapshot.getMetricDescriptors()) {
+            statusMetrics.put(descriptor.getField(), statusSnapshot.getStatusMetric(descriptor));
         }
         dto.setStatusMetrics(statusMetrics);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
index 6b90030..5336d17 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
@@ -16,43 +16,64 @@
  */
 package org.apache.nifi.controller.status.history;
 
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.nifi.controller.status.ConnectionStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.controller.status.ProcessorStatus;
 import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
-import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
-import org.apache.nifi.util.ComponentStatusReport;
-import org.apache.nifi.util.ComponentStatusReport.ComponentType;
+import org.apache.nifi.util.ComponentMetrics;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.RingBuffer;
-import org.apache.nifi.util.RingBuffer.ForEachEvaluator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 public class VolatileComponentStatusRepository implements ComponentStatusRepository {
+    private static final Logger logger = LoggerFactory.getLogger(VolatileComponentStatusRepository.class);
+
+    private static final Set<MetricDescriptor<?>> DEFAULT_PROCESSOR_METRICS = Arrays.stream(ProcessorStatusDescriptor.values())
+        .map(ProcessorStatusDescriptor::getDescriptor)
+        .collect(Collectors.toSet());
+    private static final Set<MetricDescriptor<?>> DEFAULT_CONNECTION_METRICS = Arrays.stream(ConnectionStatusDescriptor.values())
+        .map(ConnectionStatusDescriptor::getDescriptor)
+        .collect(Collectors.toSet());
+    private static final Set<MetricDescriptor<?>> DEFAULT_GROUP_METRICS = Arrays.stream(ProcessGroupStatusDescriptor.values())
+        .map(ProcessGroupStatusDescriptor::getDescriptor)
+        .collect(Collectors.toSet());
+    private static final Set<MetricDescriptor<?>> DEFAULT_RPG_METRICS = Arrays.stream(RemoteProcessGroupStatusDescriptor.values())
+        .map(RemoteProcessGroupStatusDescriptor::getDescriptor)
+        .collect(Collectors.toSet());
+
 
     public static final String NUM_DATA_POINTS_PROPERTY = "nifi.components.status.repository.buffer.size";
     public static final int DEFAULT_NUM_DATA_POINTS = 288;   // 1 day worth of 5-minute snapshots
 
-    private final RingBuffer<Capture> captures;
-    private final Logger logger = LoggerFactory.getLogger(VolatileComponentStatusRepository.class);
+    private final Map<String, ComponentStatusHistory> componentStatusHistories = new HashMap<>();
 
+    private final RingBuffer<Date> timestamps;
+    private final RingBuffer<List<GarbageCollectionStatus>> gcStatuses;
+    private final int numDataPoints;
     private volatile long lastCaptureTime = 0L;
 
     /**
      * Default no args constructor for service loading only
      */
-    public VolatileComponentStatusRepository(){
-        captures = null;
+    public VolatileComponentStatusRepository() {
+        numDataPoints = DEFAULT_NUM_DATA_POINTS;
+        gcStatuses = null;
+        timestamps = null;
     }
 
     public VolatileComponentStatusRepository(final NiFiProperties nifiProperties) {
-        final int numDataPoints = nifiProperties.getIntegerProperty(NUM_DATA_POINTS_PROPERTY, DEFAULT_NUM_DATA_POINTS);
-        captures = new RingBuffer<>(numDataPoints);
+        numDataPoints = nifiProperties.getIntegerProperty(NUM_DATA_POINTS_PROPERTY, DEFAULT_NUM_DATA_POINTS);
+        gcStatuses = new RingBuffer<>(numDataPoints);
+        timestamps = new RingBuffer<>(numDataPoints);
     }
 
     @Override
@@ -62,214 +83,116 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit
 
     @Override
     public synchronized void capture(final ProcessGroupStatus rootGroupStatus, final List<GarbageCollectionStatus> gcStatus, final Date timestamp) {
-        final ComponentStatusReport statusReport = ComponentStatusReport.fromProcessGroupStatus(rootGroupStatus, ComponentType.PROCESSOR,
-            ComponentType.CONNECTION, ComponentType.PROCESS_GROUP, ComponentType.REMOTE_PROCESS_GROUP);
+        final Date evicted = timestamps.add(timestamp);
+        if (evicted != null) {
+            componentStatusHistories.values().forEach(history -> history.expireBefore(evicted));
+        }
+
+        capture(rootGroupStatus, timestamp);
+        gcStatuses.add(gcStatus);
 
-        captures.add(new Capture(timestamp, statusReport, gcStatus));
         logger.debug("Captured metrics for {}", this);
         lastCaptureTime = Math.max(lastCaptureTime, timestamp.getTime());
     }
 
-    @Override
-    public Date getLastCaptureDate() {
-        return new Date(lastCaptureTime);
-    }
 
-    @Override
-    public StatusHistory getProcessorStatusHistory(final String processorId, final Date start, final Date end, final int preferredDataPoints, final boolean includeCounters) {
-        final StandardStatusHistory history = new StandardStatusHistory();
-        history.setComponentDetail(COMPONENT_DETAIL_ID, processorId);
-
-        captures.forEach(new ForEachEvaluator<Capture>() {
-            @Override
-            public boolean evaluate(final Capture capture) {
-                final ComponentStatusReport statusReport = capture.getStatusReport();
-                final ProcessorStatus status = statusReport.getProcessorStatus(processorId);
-                if (status == null) {
-                    return true;
-                }
+    private void capture(final ProcessGroupStatus groupStatus, final Date timestamp) {
+        // Capture status for the ProcessGroup
+        final ComponentDetails groupDetails = ComponentDetails.forProcessGroup(groupStatus);
+        final StatusSnapshot groupSnapshot = ComponentMetrics.createSnapshot(groupStatus, timestamp);
+        updateStatusHistory(groupSnapshot, groupDetails, timestamp);
 
-                history.setComponentDetail(COMPONENT_DETAIL_GROUP_ID, status.getGroupId());
-                history.setComponentDetail(COMPONENT_DETAIL_NAME, status.getName());
-                history.setComponentDetail(COMPONENT_DETAIL_TYPE, status.getType());
+        // Capture statuses for the Processors
+        for (final ProcessorStatus processorStatus : groupStatus.getProcessorStatus()) {
+            final ComponentDetails componentDetails = ComponentDetails.forProcessor(processorStatus);
+            final StatusSnapshot snapshot = ComponentMetrics.createSnapshot(processorStatus, timestamp);
+            updateStatusHistory(snapshot, componentDetails, timestamp);
+        }
 
-                final StandardStatusSnapshot snapshot = new StandardStatusSnapshot();
-                snapshot.setTimestamp(capture.getCaptureDate());
+        // Capture statuses for the Connections
+        for (final ConnectionStatus connectionStatus : groupStatus.getConnectionStatus()) {
+            final ComponentDetails componentDetails = ComponentDetails.forConnection(connectionStatus);
+            final StatusSnapshot snapshot = ComponentMetrics.createSnapshot(connectionStatus, timestamp);
+            updateStatusHistory(snapshot, componentDetails, timestamp);
+        }
 
-                for (final ProcessorStatusDescriptor descriptor : ProcessorStatusDescriptor.values()) {
-                    if (descriptor.isVisible()) {
-                        snapshot.addStatusMetric(descriptor.getDescriptor(), descriptor.getDescriptor().getValueFunction().getValue(status));
-                    }
-                }
+        // Capture statuses for the RPG's
+        for (final RemoteProcessGroupStatus rpgStatus : groupStatus.getRemoteProcessGroupStatus()) {
+            final ComponentDetails componentDetails = ComponentDetails.forRemoteProcessGroup(rpgStatus);
+            final StatusSnapshot snapshot = ComponentMetrics.createSnapshot(rpgStatus, timestamp);
+            updateStatusHistory(snapshot, componentDetails, timestamp);
+        }
 
-                if (includeCounters) {
-                    final Map<String, Long> counters = status.getCounters();
-                    if (counters != null) {
-                        for (final Map.Entry<String, Long> entry : counters.entrySet()) {
-                            final String counterName = entry.getKey();
+        // Capture statuses for the child groups
+        for (final ProcessGroupStatus childStatus : groupStatus.getProcessGroupStatus()) {
+            capture(childStatus, timestamp);
+        }
+    }
 
-                            final String label = entry.getKey() + " (5 mins)";
-                            final MetricDescriptor<ProcessorStatus> metricDescriptor = new StandardMetricDescriptor<>(entry.getKey(), label, label, Formatter.COUNT,
-                                s -> s.getCounters() == null ? null : s.getCounters().get(counterName));
 
-                            snapshot.addStatusMetric(metricDescriptor, entry.getValue());
-                        }
-                    }
-                }
+    private void updateStatusHistory(final StatusSnapshot statusSnapshot, final ComponentDetails componentDetails, final Date timestamp) {
+        final String componentId = componentDetails.getComponentId();
+        final ComponentStatusHistory procHistory = componentStatusHistories.computeIfAbsent(componentId, id -> new ComponentStatusHistory(componentDetails, numDataPoints));
+        procHistory.update(statusSnapshot, componentDetails);
+    }
 
-                history.addStatusSnapshot(snapshot);
-                return true;
-            }
-        });
 
-        return history;
+    @Override
+    public Date getLastCaptureDate() {
+        return new Date(lastCaptureTime);
     }
 
     @Override
-    public StatusHistory getConnectionStatusHistory(final String connectionId, final Date start, final Date end, final int preferredDataPoints) {
-        final StandardStatusHistory history = new StandardStatusHistory();
-        history.setComponentDetail(COMPONENT_DETAIL_ID, connectionId);
-
-        captures.forEach(new ForEachEvaluator<Capture>() {
-            @Override
-            public boolean evaluate(final Capture capture) {
-                final ComponentStatusReport statusReport = capture.getStatusReport();
-                final ConnectionStatus status = statusReport.getConnectionStatus(connectionId);
-                if (status == null) {
-                    return true;
-                }
-
-                history.setComponentDetail(COMPONENT_DETAIL_GROUP_ID, status.getGroupId());
-                history.setComponentDetail(COMPONENT_DETAIL_NAME, status.getName());
-                history.setComponentDetail(COMPONENT_DETAIL_SOURCE_NAME, status.getSourceName());
-                history.setComponentDetail(COMPONENT_DETAIL_DESTINATION_NAME, status.getDestinationName());
-
-                final StandardStatusSnapshot snapshot = new StandardStatusSnapshot();
-                snapshot.setTimestamp(capture.getCaptureDate());
-
-                for (final ConnectionStatusDescriptor descriptor : ConnectionStatusDescriptor.values()) {
-                    snapshot.addStatusMetric(descriptor.getDescriptor(), descriptor.getDescriptor().getValueFunction().getValue(status));
-                }
-
-                history.addStatusSnapshot(snapshot);
-                return true;
-            }
-        });
+    public StatusHistory getProcessorStatusHistory(final String processorId, final Date start, final Date end, final int preferredDataPoints, final boolean includeCounters) {
+        return getStatusHistory(processorId, includeCounters, DEFAULT_PROCESSOR_METRICS);
+    }
 
-        return history;
+    @Override
+    public StatusHistory getConnectionStatusHistory(final String connectionId, final Date start, final Date end, final int preferredDataPoints) {
+        return getStatusHistory(connectionId, true, DEFAULT_CONNECTION_METRICS);
     }
 
     @Override
     public StatusHistory getProcessGroupStatusHistory(final String processGroupId, final Date start, final Date end, final int preferredDataPoints) {
-        final StandardStatusHistory history = new StandardStatusHistory();
-        history.setComponentDetail(COMPONENT_DETAIL_ID, processGroupId);
-
-        captures.forEach(new ForEachEvaluator<Capture>() {
-            @Override
-            public boolean evaluate(final Capture capture) {
-                final ComponentStatusReport statusReport = capture.getStatusReport();
-                final ProcessGroupStatus status = statusReport.getProcessGroupStatus(processGroupId);
-                if (status == null) {
-                    return true;
-                }
-
-                history.setComponentDetail(COMPONENT_DETAIL_NAME, status.getName());
-
-                final StandardStatusSnapshot snapshot = new StandardStatusSnapshot();
-                snapshot.setTimestamp(capture.getCaptureDate());
-
-                for (final ProcessGroupStatusDescriptor descriptor : ProcessGroupStatusDescriptor.values()) {
-                    snapshot.addStatusMetric(descriptor.getDescriptor(), descriptor.getDescriptor().getValueFunction().getValue(status));
-                }
-
-                history.addStatusSnapshot(snapshot);
-                return true;
-            }
-        });
-
-        return history;
+        return getStatusHistory(processGroupId, true, DEFAULT_GROUP_METRICS);
     }
 
     @Override
     public StatusHistory getRemoteProcessGroupStatusHistory(final String remoteGroupId, final Date start, final Date end, final int preferredDataPoints) {
-        final StandardStatusHistory history = new StandardStatusHistory();
-        history.setComponentDetail(COMPONENT_DETAIL_ID, remoteGroupId);
-
-        captures.forEach(new ForEachEvaluator<Capture>() {
-            @Override
-            public boolean evaluate(final Capture capture) {
-                final ComponentStatusReport statusReport = capture.getStatusReport();
-                final RemoteProcessGroupStatus status = statusReport.getRemoteProcessGroupStatus(remoteGroupId);
-                if (status == null) {
-                    return true;
-                }
-
-                history.setComponentDetail(COMPONENT_DETAIL_GROUP_ID, status.getGroupId());
-                history.setComponentDetail(COMPONENT_DETAIL_NAME, status.getName());
-                history.setComponentDetail(COMPONENT_DETAIL_URI, status.getTargetUri());
+        return getStatusHistory(remoteGroupId, true, DEFAULT_RPG_METRICS);
+    }
 
-                final StandardStatusSnapshot snapshot = new StandardStatusSnapshot();
-                snapshot.setTimestamp(capture.getCaptureDate());
 
-                for (final RemoteProcessGroupStatusDescriptor descriptor : RemoteProcessGroupStatusDescriptor.values()) {
-                    snapshot.addStatusMetric(descriptor.getDescriptor(), descriptor.getDescriptor().getValueFunction().getValue(status));
-                }
-
-                history.addStatusSnapshot(snapshot);
-                return true;
-            }
-        });
+    private synchronized StatusHistory getStatusHistory(final String componentId, final boolean includeCounters, final Set<MetricDescriptor<?>> defaultMetricDescriptors) {
+        final ComponentStatusHistory history = componentStatusHistories.get(componentId);
+        if (history == null) {
+            return null;
+        }
 
-        return history;
+        final List<Date> dates = timestamps.asList();
+        return history.toStatusHistory(dates, includeCounters, defaultMetricDescriptors);
     }
 
+
     @Override
     public GarbageCollectionHistory getGarbageCollectionHistory(final Date start, final Date end) {
         final StandardGarbageCollectionHistory history = new StandardGarbageCollectionHistory();
 
-        captures.forEach(new ForEachEvaluator<Capture>() {
-            @Override
-            public boolean evaluate(final Capture capture) {
-                if (capture.getCaptureDate().before(start)) {
-                    return true;
-                }
-                if (capture.getCaptureDate().after(end)) {
-                    return false;
+        gcStatuses.forEach(statusSet -> {
+            for (final GarbageCollectionStatus gcStatus : statusSet) {
+                if (gcStatus.getTimestamp().before(start)) {
+                    continue;
                 }
-
-                final List<GarbageCollectionStatus> statuses = capture.getGarbageCollectionStatus();
-                if (statuses != null) {
-                    statuses.stream().forEach(history::addGarbageCollectionStatus);
+                if (gcStatus.getTimestamp().after(end)) {
+                    continue;
                 }
 
-                return true;
+                history.addGarbageCollectionStatus(gcStatus);
             }
+
+            return true;
         });
 
         return history;
     }
-
-    private static class Capture {
-        private final Date captureDate;
-        private final ComponentStatusReport statusReport;
-        private final List<GarbageCollectionStatus> gcStatus;
-
-        public Capture(final Date date, final ComponentStatusReport statusReport, final List<GarbageCollectionStatus> gcStatus) {
-            this.captureDate = date;
-            this.statusReport = statusReport;
-            this.gcStatus = gcStatus;
-        }
-
-        public Date getCaptureDate() {
-            return captureDate;
-        }
-
-        public ComponentStatusReport getStatusReport() {
-            return statusReport;
-        }
-
-        public List<GarbageCollectionStatus> getGarbageCollectionStatus() {
-            return gcStatus;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
index 1a98ee3..e68aba8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
@@ -16,10 +16,6 @@
  */
 package org.apache.nifi.controller.tasks;
 
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.ConnectableType;
@@ -35,8 +31,8 @@ import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
 import org.apache.nifi.controller.repository.WeakHashMapProcessSessionFactory;
 import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
 import org.apache.nifi.controller.scheduling.ConnectableProcessContext;
-import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
 import org.apache.nifi.controller.scheduling.LifecycleState;
+import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
 import org.apache.nifi.controller.scheduling.SchedulingAgent;
 import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.logging.ComponentLog;
@@ -51,6 +47,10 @@ import org.apache.nifi.util.Connectables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * Continually runs a <code>{@link Connectable}</code> component as long as the component has work to do.
  * {@link #invoke()} ()} will return <code>{@link InvocationResult}</code> telling if the component should be yielded.
@@ -267,10 +267,10 @@ public class ConnectableTask {
                 final long processingNanos = System.nanoTime() - startNanos;
 
                 try {
-                    final StandardFlowFileEvent procEvent = new StandardFlowFileEvent(connectable.getIdentifier());
+                    final StandardFlowFileEvent procEvent = new StandardFlowFileEvent();
                     procEvent.setProcessingNanos(processingNanos);
                     procEvent.setInvocations(invocationCount);
-                    repositoryContext.getFlowFileEventRepository().updateRepository(procEvent);
+                    repositoryContext.getFlowFileEventRepository().updateRepository(procEvent, connectable.getIdentifier());
                 } catch (final IOException e) {
                     logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", connectable.getRunnableComponent(), e.toString());
                     logger.error("", e);

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index edfa355..e173cc6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -957,7 +957,7 @@ public final class StandardProcessGroup implements ProcessGroup {
     public Collection<ProcessorNode> getProcessors() {
         readLock.lock();
         try {
-            return processors.values();
+            return new HashSet<>(processors.values());
         } finally {
             readLock.unlock();
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentMetrics.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentMetrics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentMetrics.java
new file mode 100644
index 0000000..9c1505c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentMetrics.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
+import org.apache.nifi.controller.status.history.MetricDescriptor;
+import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor;
+import org.apache.nifi.controller.status.history.ProcessorStatusDescriptor;
+import org.apache.nifi.controller.status.history.RemoteProcessGroupStatusDescriptor;
+import org.apache.nifi.controller.status.history.StandardMetricDescriptor;
+import org.apache.nifi.controller.status.history.StandardStatusSnapshot;
+import org.apache.nifi.controller.status.history.StatusSnapshot;
+
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class ComponentMetrics {
+    private static final Set<MetricDescriptor<?>> PROCESSOR_METRICS;
+    private static final Set<MetricDescriptor<?>> CONNECTION_METRICS;
+    private static final Set<MetricDescriptor<?>> PROCESS_GROUP_METRICS;
+    private static final Set<MetricDescriptor<?>> RPG_METRICS;
+
+    static {
+        PROCESSOR_METRICS = new HashSet<>();
+        CONNECTION_METRICS = new HashSet<>();
+        PROCESS_GROUP_METRICS = new HashSet<>();
+        RPG_METRICS = new HashSet<>();
+
+        for (final ProcessorStatusDescriptor descriptor : ProcessorStatusDescriptor.values()) {
+            PROCESSOR_METRICS.add(descriptor.getDescriptor());
+        }
+
+        for (final ConnectionStatusDescriptor descriptor : ConnectionStatusDescriptor.values()) {
+            CONNECTION_METRICS.add(descriptor.getDescriptor());
+        }
+
+        for (final ProcessGroupStatusDescriptor descriptor : ProcessGroupStatusDescriptor.values()) {
+            PROCESS_GROUP_METRICS.add(descriptor.getDescriptor());
+        }
+
+        for (final RemoteProcessGroupStatusDescriptor descriptor : RemoteProcessGroupStatusDescriptor.values()) {
+            RPG_METRICS.add(descriptor.getDescriptor());
+        }
+    }
+
+
+    public static StatusSnapshot createSnapshot(final ProcessorStatus status, final Date timestamp) {
+        if (isEmpty(status)) {
+            return null;
+        }
+
+        final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(PROCESSOR_METRICS);
+        snapshot.setTimestamp(timestamp);
+
+        for (final ProcessorStatusDescriptor descriptor : ProcessorStatusDescriptor.values()) {
+            if (descriptor.isVisible()) {
+                snapshot.addStatusMetric(descriptor.getDescriptor(), descriptor.getDescriptor().getValueFunction().getValue(status));
+            }
+        }
+
+        final Map<String, Long> counters = status.getCounters();
+        if (counters != null) {
+            for (final Map.Entry<String, Long> entry : counters.entrySet()) {
+                final String counterName = entry.getKey();
+
+                final String label = entry.getKey() + " (5 mins)";
+                final MetricDescriptor<ProcessorStatus> metricDescriptor = new StandardMetricDescriptor<>(() -> 0, entry.getKey(), label, label, MetricDescriptor.Formatter.COUNT,
+                        s -> s.getCounters() == null ? null : s.getCounters().get(counterName));
+
+                snapshot.addCounterStatusMetric(metricDescriptor, entry.getValue());
+            }
+        }
+
+        return snapshot;
+    }
+
+    public static boolean isEmpty(final ProcessorStatus status) {
+        for (final ProcessorStatusDescriptor descriptor : ProcessorStatusDescriptor.values()) {
+            if (descriptor.isVisible()) {
+                final Long value = descriptor.getDescriptor().getValueFunction().getValue(status);
+                if (value != null && value > 0) {
+                    return false;
+                }
+            }
+        }
+
+        return true;
+    }
+
+
+    public static StatusSnapshot createSnapshot(final ConnectionStatus status,  final Date timestamp) {
+        if (isEmpty(status)) {
+            return null;
+        }
+
+        final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(CONNECTION_METRICS);
+        snapshot.setTimestamp(timestamp);
+
+        for (final ConnectionStatusDescriptor descriptor : ConnectionStatusDescriptor.values()) {
+            snapshot.addStatusMetric(descriptor.getDescriptor(), descriptor.getDescriptor().getValueFunction().getValue(status));
+        }
+
+        return snapshot;
+    }
+
+    public static boolean isEmpty(final ConnectionStatus status) {
+        for (final ConnectionStatusDescriptor descriptor : ConnectionStatusDescriptor.values()) {
+            final Long value = descriptor.getDescriptor().getValueFunction().getValue(status);
+            if (value != null && value > 0) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    public static StatusSnapshot createSnapshot(final ProcessGroupStatus status,  final Date timestamp) {
+        if (isEmpty(status)) {
+            return null;
+        }
+
+        final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(PROCESS_GROUP_METRICS);
+        snapshot.setTimestamp(timestamp);
+
+        for (final ProcessGroupStatusDescriptor descriptor : ProcessGroupStatusDescriptor.values()) {
+            snapshot.addStatusMetric(descriptor.getDescriptor(), descriptor.getDescriptor().getValueFunction().getValue(status));
+        }
+
+        return snapshot;
+    }
+
+    private static boolean isEmpty(final ProcessGroupStatus status) {
+        for (final ProcessGroupStatusDescriptor descriptor : ProcessGroupStatusDescriptor.values()) {
+            final Long value = descriptor.getDescriptor().getValueFunction().getValue(status);
+            if (value != null && value > 0) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    public static StatusSnapshot createSnapshot(final RemoteProcessGroupStatus status, final Date timestamp) {
+        if (isEmpty(status)) {
+            return null;
+        }
+
+        final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(RPG_METRICS);
+        snapshot.setTimestamp(timestamp);
+
+        for (final RemoteProcessGroupStatusDescriptor descriptor : RemoteProcessGroupStatusDescriptor.values()) {
+            snapshot.addStatusMetric(descriptor.getDescriptor(), descriptor.getDescriptor().getValueFunction().getValue(status));
+        }
+
+        return snapshot;
+    }
+
+    private static boolean isEmpty(final RemoteProcessGroupStatus status) {
+        for (final RemoteProcessGroupStatusDescriptor descriptor : RemoteProcessGroupStatusDescriptor.values()) {
+            final Long value = descriptor.getDescriptor().getValueFunction().getValue(status);
+            if (value != null && value > 0) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentStatusReport.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentStatusReport.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentStatusReport.java
deleted file mode 100644
index ca31467..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentStatusReport.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.nifi.controller.status.ConnectionStatus;
-import org.apache.nifi.controller.status.PortStatus;
-import org.apache.nifi.controller.status.ProcessGroupStatus;
-import org.apache.nifi.controller.status.ProcessorStatus;
-import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
-
-/**
- * ComponentStatusReport is a util class that can be used to "flatten" a ProcessGroupStatus into a collection of Map's so that retrieval of a Status for a particular component is very efficient
- */
-public class ComponentStatusReport {
-
-    private final Map<String, ProcessGroupStatus> groupMap = new HashMap<>();
-    private final Map<String, ProcessorStatus> processorMap = new HashMap<>();
-    private final Map<String, ConnectionStatus> connectionMap = new HashMap<>();
-    private final Map<String, RemoteProcessGroupStatus> remoteGroupMap = new HashMap<>();
-    private final Map<String, PortStatus> inputPortMap = new HashMap<>();
-    private final Map<String, PortStatus> outputPortMap = new HashMap<>();
-
-    private ComponentStatusReport() {
-    }
-
-    public static ComponentStatusReport createEmpty() {
-        return new ComponentStatusReport();
-    }
-
-    public static ComponentStatusReport fromProcessGroupStatus(final ProcessGroupStatus status) {
-        return fromProcessGroupStatus(status, ComponentType.values());
-    }
-
-    public static ComponentStatusReport fromProcessGroupStatus(final ProcessGroupStatus status, final ComponentType... componentTypes) {
-        final Set<ComponentType> componentTypeSet = new HashSet<>();
-        for (final ComponentType type : componentTypes) {
-            componentTypeSet.add(type);
-        }
-
-        final ComponentStatusReport report = new ComponentStatusReport();
-        report.populate(status, componentTypeSet);
-        return report;
-    }
-
-    private void populate(final ProcessGroupStatus status, final Set<ComponentType> componentTypes) {
-        if (componentTypes.contains(ComponentType.PROCESS_GROUP)) {
-            groupMap.put(status.getId(), status);
-        }
-
-        if (componentTypes.contains(ComponentType.PROCESSOR)) {
-            for (final ProcessorStatus procStatus : status.getProcessorStatus()) {
-                processorMap.put(procStatus.getId(), procStatus);
-            }
-        }
-
-        if (componentTypes.contains(ComponentType.CONNECTION)) {
-            for (final ConnectionStatus connStatus : status.getConnectionStatus()) {
-                connectionMap.put(connStatus.getId(), connStatus);
-            }
-        }
-
-        if (componentTypes.contains(ComponentType.REMOTE_PROCESS_GROUP)) {
-            for (final RemoteProcessGroupStatus rpgStatus : status.getRemoteProcessGroupStatus()) {
-                remoteGroupMap.put(rpgStatus.getId(), rpgStatus);
-            }
-        }
-
-        if (componentTypes.contains(ComponentType.INPUT_PORT)) {
-            for (final PortStatus portStatus : status.getInputPortStatus()) {
-                inputPortMap.put(portStatus.getId(), portStatus);
-            }
-        }
-
-        if (componentTypes.contains(ComponentType.OUTPUT_PORT)) {
-            for (final PortStatus portStatus : status.getOutputPortStatus()) {
-                outputPortMap.put(portStatus.getId(), portStatus);
-            }
-        }
-
-        for (final ProcessGroupStatus childStatus : status.getProcessGroupStatus()) {
-            populate(childStatus, componentTypes);
-        }
-    }
-
-    public ProcessGroupStatus getProcessGroupStatus(final String groupId) {
-        return groupMap.get(groupId);
-    }
-
-    public ProcessorStatus getProcessorStatus(final String processorId) {
-        return processorMap.get(processorId);
-    }
-
-    public ConnectionStatus getConnectionStatus(final String connectionId) {
-        return connectionMap.get(connectionId);
-    }
-
-    public RemoteProcessGroupStatus getRemoteProcessGroupStatus(final String remoteGroupId) {
-        return remoteGroupMap.get(remoteGroupId);
-    }
-
-    public PortStatus getInputPortStatus(final String portId) {
-        return inputPortMap.get(portId);
-    }
-
-    public PortStatus getOutputPortStatus(final String portId) {
-        return outputPortMap.get(portId);
-    }
-
-    public static enum ComponentType {
-
-        PROCESSOR,
-        INPUT_PORT,
-        OUTPUT_PORT,
-        PROCESS_GROUP,
-        CONNECTION,
-        REMOTE_PROCESS_GROUP;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java
index 2bc158f..778efbe 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java
@@ -16,17 +16,15 @@
  */
 package org.apache.nifi.controller.repository;
 
-import org.apache.nifi.controller.repository.StandardRepositoryStatusReport;
 import org.apache.nifi.controller.repository.metrics.RingBufferEventRepository;
-import org.apache.nifi.controller.repository.FlowFileEvent;
+import org.junit.Test;
+import org.testng.Assert;
+
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-import org.junit.Test;
-import org.testng.Assert;
-
 public class TestRingBufferEventRepository {
 
     @Test
@@ -34,15 +32,15 @@ public class TestRingBufferEventRepository {
         final RingBufferEventRepository repo = new RingBufferEventRepository(5);
         long insertNanos = 0L;
         for (int i = 0; i < 1000000; i++) {
-            final FlowFileEvent event = generateEvent("ABC");
+            final FlowFileEvent event = generateEvent();
 
             final long insertStart = System.nanoTime();
-            repo.updateRepository(event);
+            repo.updateRepository(event, "ABC");
             insertNanos += System.nanoTime() - insertStart;
         }
 
         final long queryStart = System.nanoTime();
-        final StandardRepositoryStatusReport report = repo.reportTransferEvents(System.currentTimeMillis() - 2 * 60000);
+        final StandardRepositoryStatusReport report = repo.reportTransferEvents(System.currentTimeMillis());
         final long queryNanos = System.nanoTime() - queryStart;
         System.out.println(report);
         System.out.println("Insert: " + TimeUnit.MILLISECONDS.convert(insertNanos, TimeUnit.NANOSECONDS));
@@ -55,37 +53,32 @@ public class TestRingBufferEventRepository {
         final FlowFileEventRepository repo = new RingBufferEventRepository(5);
         String id1 = "component1";
         String id2 = "component2";
-        repo.updateRepository(generateEvent(id1));
-        repo.updateRepository(generateEvent(id2));
-        RepositoryStatusReport report = repo.reportTransferEvents(System.currentTimeMillis() - 2 * 60000);
+        repo.updateRepository(generateEvent(), id1);
+        repo.updateRepository(generateEvent(), id2);
+        RepositoryStatusReport report = repo.reportTransferEvents(System.currentTimeMillis());
         FlowFileEvent entry = report.getReportEntry(id1);
         Assert.assertNotNull(entry);
         entry = report.getReportEntry(id2);
         Assert.assertNotNull(entry);
 
         repo.purgeTransferEvents(id1);
-        report = repo.reportTransferEvents(System.currentTimeMillis() - 2 * 60000);
+        report = repo.reportTransferEvents(System.currentTimeMillis());
         entry = report.getReportEntry(id1);
         Assert.assertNull(entry);
         entry = report.getReportEntry(id2);
         Assert.assertNotNull(entry);
 
         repo.purgeTransferEvents(id2);
-        report = repo.reportTransferEvents(System.currentTimeMillis() - 2 * 60000);
+        report = repo.reportTransferEvents(System.currentTimeMillis());
         entry = report.getReportEntry(id2);
         Assert.assertNull(entry);
 
         repo.close();
     }
 
-    private FlowFileEvent generateEvent(final String id) {
+    private FlowFileEvent generateEvent() {
         return new FlowFileEvent() {
             @Override
-            public String getComponentIdentifier() {
-                return id;
-            }
-
-            @Override
             public int getFlowFilesIn() {
                 return 1;
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/metrics/TestSecondPrecisionEventContainer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/metrics/TestSecondPrecisionEventContainer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/metrics/TestSecondPrecisionEventContainer.java
new file mode 100644
index 0000000..f8b4559
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/metrics/TestSecondPrecisionEventContainer.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository.metrics;
+
+import org.apache.nifi.controller.repository.FlowFileEvent;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestSecondPrecisionEventContainer {
+
+    @Test
+    public void testUpdateOncePerSecond() {
+        final SecondPrecisionEventContainer container = new SecondPrecisionEventContainer(5);
+        final long startTime = System.currentTimeMillis();
+
+        final StandardFlowFileEvent event = new StandardFlowFileEvent();
+        event.setBytesRead(100L);
+        event.setBytesWritten(100L);
+
+        for (int i=0; i < 5; i++) {
+            for (int j=0; j < 300; j++) {
+                container.addEvent(event, startTime + (i * 300_000) + (j * 1000));
+            }
+
+            final long timestamp = startTime + 300_000 * i + 300_000;
+            final FlowFileEvent result = container.generateReport(timestamp);
+            assertEquals("Failure at i=" + i, 300 * 100, result.getBytesRead());
+            assertEquals("Failure at i=" + i, 300 * 100, result.getBytesWritten());
+        }
+    }
+
+    @Test
+    public void testExpiresOnReportGeneration() {
+        final SecondPrecisionEventContainer container = new SecondPrecisionEventContainer(5);
+        final long startTime = System.currentTimeMillis();
+
+        final StandardFlowFileEvent event = new StandardFlowFileEvent();
+        event.setBytesRead(100L);
+        event.setBytesWritten(100L);
+
+        for (int j=0; j < 100; j++) {
+            container.addEvent(event, startTime + (j * 1000));
+        }
+
+        final FlowFileEvent resultAt5Mins = container.generateReport(startTime + 300_000);
+        assertEquals(100 * 100, resultAt5Mins.getBytesRead());
+        assertEquals(100 * 100, resultAt5Mins.getBytesWritten());
+
+        final FlowFileEvent resultAt5MinsPlus50Seconds = container.generateReport(startTime + 350_000);
+        assertEquals(50 * 100, resultAt5MinsPlus50Seconds.getBytesRead());
+        assertEquals(50 * 100, resultAt5MinsPlus50Seconds.getBytesWritten());
+
+        final FlowFileEvent resultAt5MinsPlus99Seconds = container.generateReport(startTime + 399_000);
+        assertEquals(100, resultAt5MinsPlus99Seconds.getBytesRead());
+        assertEquals(100, resultAt5MinsPlus99Seconds.getBytesWritten());
+
+        final FlowFileEvent resultAt5MinsPlus100Seconds = container.generateReport(startTime + 400_000);
+        assertEquals(0, resultAt5MinsPlus100Seconds.getBytesRead());
+        assertEquals(0, resultAt5MinsPlus100Seconds.getBytesWritten());
+
+        final FlowFileEvent resultAt5MinsPlus101Seconds = container.generateReport(startTime + 401_000);
+        assertEquals(0, resultAt5MinsPlus101Seconds.getBytesRead());
+        assertEquals(0, resultAt5MinsPlus101Seconds.getBytesWritten());
+
+        final FlowFileEvent resultsAt5MinsPlus300seconds = container.generateReport(startTime + 600_000);
+        assertEquals(0, resultsAt5MinsPlus300seconds.getBytesRead());
+        assertEquals(0, resultsAt5MinsPlus300seconds.getBytesWritten());
+
+        final FlowFileEvent resultsAt5MinsPlus600seconds = container.generateReport(startTime + 900_000);
+        assertEquals(0, resultsAt5MinsPlus600seconds.getBytesRead());
+        assertEquals(0, resultsAt5MinsPlus600seconds.getBytesWritten());
+    }
+
+    @Test
+    public void testExpiresOnReportGenerationWithSkipsBetweenUpdates() {
+        final SecondPrecisionEventContainer container = new SecondPrecisionEventContainer(5);
+        final long startTime = System.currentTimeMillis();
+
+        final StandardFlowFileEvent event = new StandardFlowFileEvent();
+        event.setBytesRead(100L);
+        event.setBytesWritten(100L);
+
+        for (int j=0; j < 20; j++) {
+            container.addEvent(event, startTime + (j * 5000));
+        }
+
+        final FlowFileEvent resultAt5Mins = container.generateReport(startTime + 300_000);
+        assertEquals(20 * 100, resultAt5Mins.getBytesRead());
+        assertEquals(20 * 100, resultAt5Mins.getBytesWritten());
+
+        final FlowFileEvent resultAt5MinsPlus50Seconds = container.generateReport(startTime + 350_000);
+        assertEquals(10 * 100, resultAt5MinsPlus50Seconds.getBytesRead());
+        assertEquals(10 * 100, resultAt5MinsPlus50Seconds.getBytesWritten());
+
+        final FlowFileEvent resultAt5MinsPlus94Seconds = container.generateReport(startTime + 394_000);
+        assertEquals(100, resultAt5MinsPlus94Seconds.getBytesRead());
+        assertEquals(100, resultAt5MinsPlus94Seconds.getBytesWritten());
+
+        final FlowFileEvent resultAt5MinsPlus95Seconds = container.generateReport(startTime + 395_000);
+        assertEquals(100, resultAt5MinsPlus95Seconds.getBytesRead());
+        assertEquals(100, resultAt5MinsPlus95Seconds.getBytesWritten());
+
+        final FlowFileEvent resultAt5MinsPlus100Seconds = container.generateReport(startTime + 400_000);
+        assertEquals(0, resultAt5MinsPlus100Seconds.getBytesRead());
+        assertEquals(0, resultAt5MinsPlus100Seconds.getBytesWritten());
+
+        final FlowFileEvent resultAt5MinsPlus101Seconds = container.generateReport(startTime + 401_000);
+        assertEquals(0, resultAt5MinsPlus101Seconds.getBytesRead());
+        assertEquals(0, resultAt5MinsPlus101Seconds.getBytesWritten());
+
+        final FlowFileEvent resultsAt5MinsPlus300seconds = container.generateReport(startTime + 600_000);
+        assertEquals(0, resultsAt5MinsPlus300seconds.getBytesRead());
+        assertEquals(0, resultsAt5MinsPlus300seconds.getBytesWritten());
+
+        final FlowFileEvent resultsAt5MinsPlus600seconds = container.generateReport(startTime + 900_000);
+        assertEquals(0, resultsAt5MinsPlus600seconds.getBytesRead());
+        assertEquals(0, resultsAt5MinsPlus600seconds.getBytesWritten());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/TestMetricRollingBuffer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/TestMetricRollingBuffer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/TestMetricRollingBuffer.java
new file mode 100644
index 0000000..338fc62
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/TestMetricRollingBuffer.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestMetricRollingBuffer {
+    private static final Set<MetricDescriptor<?>> PROCESSOR_METRICS = Arrays.stream(ProcessorStatusDescriptor.values())
+        .map(ProcessorStatusDescriptor::getDescriptor)
+        .collect(Collectors.toSet());
+
+    @Test
+    public void testBufferGrows() {
+        final int bufferCapacity = 1000;
+        final MetricRollingBuffer buffer = new MetricRollingBuffer(bufferCapacity);
+
+        final long startTime = System.currentTimeMillis();
+        final List<Date> timestamps = new ArrayList<>();
+
+        int iterations = 1440;
+        for (int i=0; i < iterations; i++) {
+            final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(PROCESSOR_METRICS);
+            snapshot.setTimestamp(new Date(startTime + i * 1000));
+            timestamps.add(snapshot.getTimestamp());
+
+            snapshot.addStatusMetric(ProcessorStatusDescriptor.BYTES_WRITTEN.getDescriptor(), Long.valueOf(i));
+
+            buffer.update(snapshot);
+        }
+
+        assertEquals(bufferCapacity, buffer.size());
+
+        final List<StatusSnapshot> snapshots = buffer.getSnapshots(timestamps, true, PROCESSOR_METRICS);
+        assertEquals(iterations, snapshots.size());
+
+        final int expectedEmptyCount = iterations - bufferCapacity;
+        final long emptyCount = snapshots.stream().filter(snapshot -> snapshot instanceof EmptyStatusSnapshot).count();
+        assertEquals(expectedEmptyCount, emptyCount);
+
+        for (int i=0; i < iterations; i++) {
+            final StatusSnapshot snapshot = snapshots.get(i);
+            if (i < expectedEmptyCount) {
+                assertTrue("Snapshot at i=" + i + " is not an EmptyStatusSnapshot", snapshot instanceof EmptyStatusSnapshot);
+            } else {
+                assertEquals(Long.valueOf(i), snapshot.getStatusMetric(ProcessorStatusDescriptor.BYTES_WRITTEN.getDescriptor()));
+                assertFalse(snapshot instanceof EmptyStatusSnapshot);
+            }
+        }
+    }
+
+    @Test
+    public void testBufferShrinks() {
+        // Cause buffer to grow
+        final int bufferCapacity = 1000;
+        final MetricRollingBuffer buffer = new MetricRollingBuffer(bufferCapacity);
+
+        final long startTime = System.currentTimeMillis();
+
+        int iterations = 1440;
+        for (int i=0; i < iterations; i++) {
+            final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(PROCESSOR_METRICS);
+            snapshot.setTimestamp(new Date(startTime + i * 1000));
+
+            snapshot.addStatusMetric(ProcessorStatusDescriptor.BYTES_WRITTEN.getDescriptor(), Long.valueOf(i));
+            buffer.update(snapshot);
+        }
+
+        assertEquals(bufferCapacity, buffer.size());
+
+        // Expire data ensure that the buffer shrinks
+        final long lastTimestamp = startTime + 1440 * 1000;
+        buffer.expireBefore(new Date(lastTimestamp - 144_001L));
+        assertEquals(144, buffer.size());
+
+        buffer.expireBefore(new Date(lastTimestamp - 16_001L));
+        assertEquals(16, buffer.size());
+
+        buffer.expireBefore(new Date(lastTimestamp));
+        assertEquals(0, buffer.size());
+
+        // Ensure that we can now properly add data again
+        long insertStart = lastTimestamp + 10_000L;
+        final List<Date> timestamps = new ArrayList<>();
+        for (int i=0; i < 4; i++) {
+            final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(PROCESSOR_METRICS);
+            snapshot.setTimestamp(new Date(insertStart + i * 1000));
+            timestamps.add(snapshot.getTimestamp());
+
+            snapshot.addStatusMetric(ProcessorStatusDescriptor.BYTES_WRITTEN.getDescriptor(), Long.valueOf(i));
+            buffer.update(snapshot);
+        }
+
+        assertEquals(4, buffer.size());
+        final List<StatusSnapshot> snapshots = buffer.getSnapshots(timestamps, true, PROCESSOR_METRICS);
+        assertEquals(4, snapshots.size());
+        for (int i=0; i < 4; i++) {
+            final StatusSnapshot snapshot = snapshots.get(i);
+            assertEquals(Long.valueOf(i), snapshot.getStatusMetric(ProcessorStatusDescriptor.BYTES_WRITTEN.getDescriptor()));
+        }
+    }
+}


[2/2] nifi git commit: NIFI-5466: Keep a running total of stats for each component. Refactored FlowFileEvent and repository in order to provide more efficient storage of objects on Java heap by allowing the same 'EMPTY' object to be reused - Refactored

Posted by ma...@apache.org.
NIFI-5466: Keep a running total of stats for each component. Refactored FlowFileEvent and repository in order to provide more efficient storage of objects on Java heap by allowing the same 'EMPTY' object to be reused
 - Refactored VolatileComponentStatusRepository to avoid holding on to ProcessorStatus objects, etc, and only keep what they need
 - Updated VolatileComponentStatusRepository to ensure that we are efficiently storing metrics for processors, etc. that are not running

This closes #2939

Signed-off-by: Mark Payne <ma...@hotmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7bbb5a82
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7bbb5a82
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7bbb5a82

Branch: refs/heads/master
Commit: 7bbb5a823aa3d338cecf45bd683f9f34a1338d02
Parents: 410176e
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Aug 2 14:17:36 2018 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Aug 21 11:44:25 2018 -0400

----------------------------------------------------------------------
 .../status/history/MetricDescriptor.java        |   5 +-
 .../status/history/StatusSnapshot.java          |  12 +-
 .../endpoints/StatusHistoryEndpointMerger.java  |  11 +-
 .../controller/repository/FlowFileEvent.java    |   2 -
 .../repository/FlowFileEventRepository.java     |   7 +-
 .../repository/RepositoryStatusReport.java      |   2 +-
 .../apache/nifi/controller/FlowController.java  |  26 +-
 .../repository/StandardProcessSession.java      |  16 +-
 .../StandardRepositoryStatusReport.java         |   7 +-
 .../repository/metrics/EmptyFlowFileEvent.java  | 114 ++++++++
 .../repository/metrics/EventContainer.java      |   6 +-
 .../controller/repository/metrics/EventSum.java |  38 ++-
 .../repository/metrics/EventSumValue.java       |  63 +++-
 .../metrics/RingBufferEventRepository.java      |  16 +-
 .../metrics/SecondPrecisionEventContainer.java  |  96 +++++--
 .../metrics/StandardFlowFileEvent.java          |  14 +-
 .../scheduling/EventDrivenSchedulingAgent.java  |  18 +-
 .../status/history/ComponentDetails.java        | 123 ++++++++
 .../status/history/ComponentStatusHistory.java  |  53 ++++
 .../history/ConnectionStatusDescriptor.java     |  30 +-
 .../status/history/EmptyStatusSnapshot.java     |  66 +++++
 .../status/history/IndexableMetric.java         |  21 ++
 .../status/history/MetricRollingBuffer.java     | 196 +++++++++++++
 .../history/ProcessGroupStatusDescriptor.java   |  59 ++--
 .../history/ProcessorStatusDescriptor.java      |  96 ++++---
 .../RemoteProcessGroupStatusDescriptor.java     |  53 ++--
 .../history/StandardMetricDescriptor.java       |  18 +-
 .../status/history/StandardStatusHistory.java   |  34 +--
 .../status/history/StandardStatusSnapshot.java  |  58 +++-
 .../status/history/StatusHistoryUtil.java       |  18 +-
 .../VolatileComponentStatusRepository.java      | 285 +++++++------------
 .../nifi/controller/tasks/ConnectableTask.java  |  14 +-
 .../nifi/groups/StandardProcessGroup.java       |   2 +-
 .../org/apache/nifi/util/ComponentMetrics.java  | 188 ++++++++++++
 .../apache/nifi/util/ComponentStatusReport.java | 137 ---------
 .../TestRingBufferEventRepository.java          |  31 +-
 .../TestSecondPrecisionEventContainer.java      | 135 +++++++++
 .../status/history/TestMetricRollingBuffer.java | 126 ++++++++
 38 files changed, 1589 insertions(+), 607 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/MetricDescriptor.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/MetricDescriptor.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/MetricDescriptor.java
index 8fdce05..c0c52b6 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/MetricDescriptor.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/MetricDescriptor.java
@@ -23,13 +23,14 @@ package org.apache.nifi.controller.status.history;
  */
 public interface MetricDescriptor<T> {
 
-    public enum Formatter {
-
+    enum Formatter {
         COUNT,
         DURATION,
         DATA_SIZE
     };
 
+    int getMetricIdentifier();
+
     /**
      * Specifies how the values should be formatted
      *

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusSnapshot.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusSnapshot.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusSnapshot.java
index 551ceb2..da794ee 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusSnapshot.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusSnapshot.java
@@ -17,7 +17,7 @@
 package org.apache.nifi.controller.status.history;
 
 import java.util.Date;
-import java.util.Map;
+import java.util.Set;
 
 /**
  * A StatusSnapshot represents a Component's status report at some point in time
@@ -29,10 +29,16 @@ public interface StatusSnapshot {
      */
     Date getTimestamp();
 
+    Set<MetricDescriptor<?>> getMetricDescriptors();
+
+    Long getStatusMetric(MetricDescriptor<?> descriptor);
+
     /**
-     * @return a Map of MetricDescriptor to value
+     * Returns an instance of StatusSnapshot that has all the same information as {@code this} except for
+     * Counters. If {@code this} does not contain any counters, the object returned may (or may not) be {@code this}.
+     * @return a StatusSnapshot without counters
      */
-    Map<MetricDescriptor<?>, Long> getStatusMetrics();
+    StatusSnapshot withoutCounters();
 
     /**
      * @return a {@link ValueReducer} that is capable of merging multiple

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java
index 49e952b..8e4c26b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java
@@ -41,6 +41,7 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -156,7 +157,7 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
                         return counters.getOrDefault(descriptorDto.getField(), 0L);
                     };
 
-                    final MetricDescriptor<ProcessorStatus> metricDescriptor = new StandardMetricDescriptor<>(descriptorDto.getField(),
+                    final MetricDescriptor<ProcessorStatus> metricDescriptor = new StandardMetricDescriptor<>(() -> 0, descriptorDto.getField(),
                         descriptorDto.getLabel(), descriptorDto.getDescription(), Formatter.COUNT, valueMapper);
 
                     metricDescriptors.put(fieldName, metricDescriptor);
@@ -197,11 +198,7 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
                 final StatusSnapshot snapshot = createSnapshot(snapshotDto, metricDescriptors);
                 final Date normalizedDate = normalizeStatusSnapshotDate(snapshot.getTimestamp(), componentStatusSnapshotMillis);
 
-                Map<String, StatusSnapshot> nodeToSnapshotMap = dateToNodeSnapshots.get(normalizedDate);
-                if (nodeToSnapshotMap == null) {
-                    nodeToSnapshotMap = new HashMap<>();
-                    dateToNodeSnapshots.put(normalizedDate, nodeToSnapshotMap);
-                }
+                Map<String, StatusSnapshot> nodeToSnapshotMap = dateToNodeSnapshots.computeIfAbsent(normalizedDate, k -> new HashMap<>());
                 nodeToSnapshotMap.put(nodeStatusSnapshot.getNodeId(), snapshot);
             }
         }
@@ -220,7 +217,7 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
     }
 
     private StatusSnapshot createSnapshot(final StatusSnapshotDTO snapshotDto, final Map<String, MetricDescriptor<?>> metricDescriptors) {
-        final StandardStatusSnapshot snapshot = new StandardStatusSnapshot();
+        final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(new HashSet<>(metricDescriptors.values()));
         snapshot.setTimestamp(snapshotDto.getTimestamp());
 
         // Default all metrics to 0 so that if a counter has not yet been registered, it will have a value of 0 instead

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java
index 26cea50..7b131cc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java
@@ -20,8 +20,6 @@ import java.util.Map;
 
 public interface FlowFileEvent {
 
-    String getComponentIdentifier();
-
     int getFlowFilesIn();
 
     int getFlowFilesOut();

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java
index 1781d18..8fac237 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java
@@ -25,15 +25,16 @@ public interface FlowFileEventRepository extends Closeable {
      * Updates the repository to include a new FlowFile processing event
      *
      * @param event new event
+     * @param  componentIdentifier the ID of the component that the event belongs to
      * @throws java.io.IOException ioe
      */
-    void updateRepository(FlowFileEvent event) throws IOException;
+    void updateRepository(FlowFileEvent event, String componentIdentifier) throws IOException;
 
     /**
-     * @param sinceEpochMillis age of report
+     * @param now the current time
      * @return a report of processing activity since the given time
      */
-    RepositoryStatusReport reportTransferEvents(long sinceEpochMillis);
+    RepositoryStatusReport reportTransferEvents(long now);
 
     /**
      * Causes any flow file events of the given entry age in epoch milliseconds

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/RepositoryStatusReport.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/RepositoryStatusReport.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/RepositoryStatusReport.java
index e434905..5479e26 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/RepositoryStatusReport.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/RepositoryStatusReport.java
@@ -20,7 +20,7 @@ import java.util.Map;
 
 public interface RepositoryStatusReport {
 
-    void addReportEntry(FlowFileEvent entry);
+    void addReportEntry(FlowFileEvent entry, String componentId);
 
     Map<String, FlowFileEvent> getReportEntries();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index b83749c..21c61e9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -103,6 +103,7 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.controller.repository.claim.StandardContentClaim;
 import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
 import org.apache.nifi.controller.repository.io.LimitedInputStream;
+import org.apache.nifi.controller.repository.metrics.EmptyFlowFileEvent;
 import org.apache.nifi.controller.scheduling.EventDrivenSchedulingAgent;
 import org.apache.nifi.controller.scheduling.QuartzSchedulingAgent;
 import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
@@ -630,7 +631,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
             @Override
             public void run() {
-                componentStatusRepository.capture(getControllerStatus(), getGarbageCollectionStatus());
+                try {
+                    componentStatusRepository.capture(getControllerStatus(), getGarbageCollectionStatus());
+                } catch (final Exception e) {
+                    LOG.error("Failed to capture component stats for Stats History", e);
+                }
             }
         }, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS);
 
@@ -3333,18 +3338,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         status.setType(isProcessorAuthorized ? procNode.getComponentType() : "Processor");
 
         final FlowFileEvent entry = report.getReportEntries().get(procNode.getIdentifier());
-        if (entry == null) {
-            status.setInputBytes(0L);
-            status.setInputCount(0);
-            status.setOutputBytes(0L);
-            status.setOutputCount(0);
-            status.setBytesWritten(0L);
-            status.setBytesRead(0L);
-            status.setProcessingNanos(0);
-            status.setInvocations(0);
-            status.setAverageLineageDuration(0L);
-            status.setFlowFilesRemoved(0);
-        } else {
+        if (entry != null && entry != EmptyFlowFileEvent.INSTANCE) {
             final int processedCount = entry.getFlowFilesOut();
             final long numProcessedBytes = entry.getContentSizeOut();
             status.setOutputBytes(numProcessedBytes);
@@ -4117,13 +4111,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     }
 
     private RepositoryStatusReport getProcessorStats() {
-        // processed in last 5 minutes
-        return getProcessorStats(System.currentTimeMillis() - 300000);
+        return flowFileEventRepository.reportTransferEvents(System.currentTimeMillis());
     }
 
-    private RepositoryStatusReport getProcessorStats(final long since) {
-        return flowFileEventRepository.reportTransferEvents(since);
-    }
 
     //
     // Clustering methods

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 7255645..9741cff 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -528,7 +528,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         try {
             // update event repository
             final Connectable connectable = context.getConnectable();
-            final StandardFlowFileEvent flowFileEvent = new StandardFlowFileEvent(connectable.getIdentifier());
+            final StandardFlowFileEvent flowFileEvent = new StandardFlowFileEvent();
             flowFileEvent.setBytesRead(checkpoint.bytesRead);
             flowFileEvent.setBytesWritten(checkpoint.bytesWritten);
             flowFileEvent.setContentSizeIn(checkpoint.contentSizeIn);
@@ -553,10 +553,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             final Map<String, Long> counters = combineCounters(checkpoint.countersOnCommit, checkpoint.immediateCounters);
             flowFileEvent.setCounters(counters);
 
-            context.getFlowFileEventRepository().updateRepository(flowFileEvent);
+            context.getFlowFileEventRepository().updateRepository(flowFileEvent, connectable.getIdentifier());
 
-            for (final FlowFileEvent connectionEvent : checkpoint.connectionCounts.values()) {
-                context.getFlowFileEventRepository().updateRepository(connectionEvent);
+            for (final Map.Entry<String, StandardFlowFileEvent> entry : checkpoint.connectionCounts.entrySet()) {
+                context.getFlowFileEventRepository().updateRepository(entry.getValue(), entry.getKey());
             }
         } catch (final IOException ioe) {
             LOG.error("FlowFile Event Repository failed to update", ioe);
@@ -1052,14 +1052,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         }
 
         final Connectable connectable = context.getConnectable();
-        final StandardFlowFileEvent flowFileEvent = new StandardFlowFileEvent(connectable.getIdentifier());
+        final StandardFlowFileEvent flowFileEvent = new StandardFlowFileEvent();
         flowFileEvent.setBytesRead(bytesRead);
         flowFileEvent.setBytesWritten(bytesWritten);
         flowFileEvent.setCounters(immediateCounters);
 
         // update event repository
         try {
-            context.getFlowFileEventRepository().updateRepository(flowFileEvent);
+            context.getFlowFileEventRepository().updateRepository(flowFileEvent, connectable.getIdentifier());
         } catch (final Exception e) {
             LOG.error("Failed to update FlowFileEvent Repository due to " + e);
             if (LOG.isDebugEnabled()) {
@@ -1458,7 +1458,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     }
 
     private void incrementConnectionInputCounts(final String connectionId, final int flowFileCount, final long bytes) {
-        final StandardFlowFileEvent connectionEvent = connectionCounts.computeIfAbsent(connectionId, id -> new StandardFlowFileEvent(id));
+        final StandardFlowFileEvent connectionEvent = connectionCounts.computeIfAbsent(connectionId, id -> new StandardFlowFileEvent());
         connectionEvent.setContentSizeIn(connectionEvent.getContentSizeIn() + bytes);
         connectionEvent.setFlowFilesIn(connectionEvent.getFlowFilesIn() + flowFileCount);
     }
@@ -1468,7 +1468,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     }
 
     private void incrementConnectionOutputCounts(final String connectionId, final int flowFileCount, final long bytes) {
-        final StandardFlowFileEvent connectionEvent = connectionCounts.computeIfAbsent(connectionId, id -> new StandardFlowFileEvent(id));
+        final StandardFlowFileEvent connectionEvent = connectionCounts.computeIfAbsent(connectionId, id -> new StandardFlowFileEvent());
         connectionEvent.setContentSizeOut(connectionEvent.getContentSizeOut() + bytes);
         connectionEvent.setFlowFilesOut(connectionEvent.getFlowFilesOut() + flowFileCount);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryStatusReport.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryStatusReport.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryStatusReport.java
index 3e30059..a367168 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryStatusReport.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryStatusReport.java
@@ -54,13 +54,14 @@ public class StandardRepositoryStatusReport implements RepositoryStatusReport {
      * Adds an entry to the report.
      *
      * @param entry an entry
+     * @param componentId the id of the component that the entry belongs to
      */
     @Override
-    public void addReportEntry(FlowFileEvent entry) {
+    public void addReportEntry(FlowFileEvent entry, final String componentId) {
         if (entry == null) {
             throw new NullPointerException("report entry may not be null");
         }
-        this.entries.put(entry.getComponentIdentifier(), entry);
+        this.entries.put(componentId, entry);
     }
 
     @Override
@@ -69,7 +70,7 @@ public class StandardRepositoryStatusReport implements RepositoryStatusReport {
         for (final String key : this.entries.keySet()) {
             final FlowFileEvent entry = this.entries.get(key);
             strb.append("[")
-                    .append(entry.getComponentIdentifier()).append(", ")
+                    .append(key).append(", ")
                     .append(entry.getFlowFilesIn()).append(", ")
                     .append(entry.getContentSizeIn()).append(", ")
                     .append(entry.getFlowFilesOut()).append(", ")

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EmptyFlowFileEvent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EmptyFlowFileEvent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EmptyFlowFileEvent.java
new file mode 100644
index 0000000..3c14140
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EmptyFlowFileEvent.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository.metrics;
+
+import org.apache.nifi.controller.repository.FlowFileEvent;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class EmptyFlowFileEvent implements FlowFileEvent {
+    public static final EmptyFlowFileEvent INSTANCE = new EmptyFlowFileEvent();
+
+    private EmptyFlowFileEvent() {
+    }
+
+    @Override
+    public int getFlowFilesIn() {
+        return 0;
+    }
+
+    @Override
+    public int getFlowFilesOut() {
+        return 0;
+    }
+
+    @Override
+    public int getFlowFilesRemoved() {
+        return 0;
+    }
+
+    @Override
+    public long getContentSizeIn() {
+        return 0;
+    }
+
+    @Override
+    public long getContentSizeOut() {
+        return 0;
+    }
+
+    @Override
+    public long getContentSizeRemoved() {
+        return 0;
+    }
+
+    @Override
+    public long getBytesRead() {
+        return 0;
+    }
+
+    @Override
+    public long getBytesWritten() {
+        return 0;
+    }
+
+    @Override
+    public long getProcessingNanoseconds() {
+        return 0;
+    }
+
+    @Override
+    public long getAverageLineageMillis() {
+        return 0;
+    }
+
+    @Override
+    public long getAggregateLineageMillis() {
+        return 0;
+    }
+
+    @Override
+    public int getFlowFilesReceived() {
+        return 0;
+    }
+
+    @Override
+    public long getBytesReceived() {
+        return 0;
+    }
+
+    @Override
+    public int getFlowFilesSent() {
+        return 0;
+    }
+
+    @Override
+    public long getBytesSent() {
+        return 0;
+    }
+
+    @Override
+    public int getInvocations() {
+        return 0;
+    }
+
+    @Override
+    public Map<String, Long> getCounters() {
+        return Collections.emptyMap();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventContainer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventContainer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventContainer.java
index 9dd3c8e..d193b7d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventContainer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventContainer.java
@@ -20,9 +20,9 @@ package org.apache.nifi.controller.repository.metrics;
 import org.apache.nifi.controller.repository.FlowFileEvent;
 
 public interface EventContainer {
-    public void addEvent(FlowFileEvent event);
+    void addEvent(FlowFileEvent event);
 
-    public void purgeEvents(long cutoffEpochMillis);
+    void purgeEvents(long cutoffEpochMillis);
 
-    public FlowFileEvent generateReport(String componentId, long sinceEpochMillis);
+    FlowFileEvent generateReport(long sinceEpochMillis);
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSum.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSum.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSum.java
index b1c9120..5fef08b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSum.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSum.java
@@ -17,31 +17,30 @@
 
 package org.apache.nifi.controller.repository.metrics;
 
-import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.nifi.controller.repository.FlowFileEvent;
 
-public class EventSum {
+import java.util.concurrent.atomic.AtomicReference;
 
+public class EventSum {
     private final AtomicReference<EventSumValue> ref = new AtomicReference<>();
 
     public EventSumValue getValue() {
         final EventSumValue value = ref.get();
-        return value == null ? new EventSumValue() : value;
+        return value == null ? new EventSumValue(System.currentTimeMillis()) : value;
     }
 
-    public void addOrReset(final FlowFileEvent event) {
-        final long expectedMinute = System.currentTimeMillis() / 60000;
+    public EventSumValue addOrReset(final FlowFileEvent event, final long timestamp) {
+        final long expectedSecond = timestamp / 1000;
 
         EventSumValue curValue;
         while (true) {
             curValue = ref.get();
-            if (curValue == null || curValue.getMinuteTimestamp() != expectedMinute) {
-                final EventSumValue newValue = new EventSumValue();
+            if (curValue == null || (curValue.getTimestamp() / 1000) != expectedSecond) {
+                final EventSumValue newValue = new EventSumValue(timestamp);
                 final boolean replaced = ref.compareAndSet(curValue, newValue);
                 if (replaced) {
-                    curValue = newValue;
-                    break;
+                    newValue.add(event);
+                    return curValue;
                 }
             } else {
                 break;
@@ -49,5 +48,24 @@ public class EventSum {
         }
 
         curValue.add(event);
+        return null;
+    }
+
+
+    public EventSumValue reset(final long ifOlderThan) {
+        while (true) {
+            final EventSumValue curValue = ref.get();
+            if (curValue == null) {
+                return null;
+            }
+
+            if (curValue.getTimestamp() < ifOlderThan) {
+                if (ref.compareAndSet(curValue, null)) {
+                    return curValue;
+                }
+            } else {
+                return null;
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java
index 90990df..210f7ac 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java
@@ -17,13 +17,14 @@
 
 package org.apache.nifi.controller.repository.metrics;
 
+import org.apache.nifi.controller.repository.FlowFileEvent;
+
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.nifi.controller.repository.FlowFileEvent;
-
 public class EventSumValue {
+    private volatile boolean empty = true;
 
     private int flowFilesIn = 0;
     private int flowFilesOut = 0;
@@ -44,16 +45,16 @@ public class EventSumValue {
     private int invocations = 0;
     private Map<String, Long> counters;
 
-    private final long minuteTimestamp;
     private final long millisecondTimestamp;
 
 
-    public EventSumValue() {
-        this.millisecondTimestamp = System.currentTimeMillis();
-        this.minuteTimestamp = millisecondTimestamp / 60000;
+    public EventSumValue(final long timestamp) {
+        this.millisecondTimestamp = timestamp;
     }
 
     public synchronized void add(final FlowFileEvent flowFileEvent) {
+        empty = false;
+
         this.aggregateLineageMillis += flowFileEvent.getAggregateLineageMillis();
         this.bytesRead += flowFileEvent.getBytesRead();
         this.bytesReceived += flowFileEvent.getBytesReceived();
@@ -84,8 +85,12 @@ public class EventSumValue {
         }
     }
 
-    public synchronized FlowFileEvent toFlowFileEvent(final String componentId) {
-        final StandardFlowFileEvent event = new StandardFlowFileEvent(componentId);
+    public synchronized FlowFileEvent toFlowFileEvent() {
+        if (empty) {
+            return EmptyFlowFileEvent.INSTANCE;
+        }
+
+        final StandardFlowFileEvent event = new StandardFlowFileEvent();
         event.setAggregateLineageMillis(aggregateLineageMillis);
         event.setBytesRead(bytesRead);
         event.setBytesReceived(bytesReceived);
@@ -106,6 +111,10 @@ public class EventSumValue {
     }
 
     public synchronized void add(final EventSumValue other) {
+        if (other.empty) {
+            return;
+        }
+
         synchronized (other) {
             this.aggregateLineageMillis += other.aggregateLineageMillis;
             this.bytesRead += other.bytesRead;
@@ -139,8 +148,42 @@ public class EventSumValue {
         }
     }
 
-    public long getMinuteTimestamp() {
-        return minuteTimestamp;
+    public synchronized void subtract(final EventSumValue other) {
+        if (other.empty) {
+            return;
+        }
+
+        synchronized (other) {
+            this.aggregateLineageMillis -= other.aggregateLineageMillis;
+            this.bytesRead -= other.bytesRead;
+            this.bytesReceived -= other.bytesReceived;
+            this.bytesSent -= other.bytesSent;
+            this.bytesWritten -= other.bytesWritten;
+            this.contentSizeIn -= other.contentSizeIn;
+            this.contentSizeOut -= other.contentSizeOut;
+            this.contentSizeRemoved -= other.contentSizeRemoved;
+            this.flowFilesIn -= other.flowFilesIn;
+            this.flowFilesOut -= other.flowFilesOut;
+            this.flowFilesReceived -= other.flowFilesReceived;
+            this.flowFilesRemoved -= other.flowFilesRemoved;
+            this.flowFilesSent -= other.flowFilesSent;
+            this.invocations -= other.invocations;
+            this.processingNanos -= other.processingNanos;
+
+            final Map<String, Long> eventCounters = other.counters;
+            if (eventCounters != null) {
+                if (counters == null) {
+                    counters = new HashMap<>();
+                }
+
+                for (final Map.Entry<String, Long> entry : eventCounters.entrySet()) {
+                    final String counterName = entry.getKey();
+                    final Long counterValue = entry.getValue();
+
+                    counters.compute(counterName, (key, value) -> value == null ? counterValue : counterValue - value);
+                }
+            }
+        }
     }
 
     public long getTimestamp() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java
index c60f98d..bcd0344 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java
@@ -16,14 +16,14 @@
  */
 package org.apache.nifi.controller.repository.metrics;
 
-import java.io.IOException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
 import org.apache.nifi.controller.repository.FlowFileEvent;
 import org.apache.nifi.controller.repository.FlowFileEventRepository;
 import org.apache.nifi.controller.repository.StandardRepositoryStatusReport;
 
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
 public class RingBufferEventRepository implements FlowFileEventRepository {
 
     private final int numMinutes;
@@ -38,8 +38,7 @@ public class RingBufferEventRepository implements FlowFileEventRepository {
     }
 
     @Override
-    public void updateRepository(final FlowFileEvent event) {
-        final String componentId = event.getComponentIdentifier();
+    public void updateRepository(final FlowFileEvent event, final String componentId) {
         final EventContainer eventContainer = componentEventMap.computeIfAbsent(componentId, id -> new SecondPrecisionEventContainer(numMinutes));
         eventContainer.addEvent(event);
     }
@@ -48,10 +47,7 @@ public class RingBufferEventRepository implements FlowFileEventRepository {
     public StandardRepositoryStatusReport reportTransferEvents(final long sinceEpochMillis) {
         final StandardRepositoryStatusReport report = new StandardRepositoryStatusReport();
 
-        componentEventMap.entrySet().stream()
-            .map(entry -> entry.getValue().generateReport(entry.getKey(), sinceEpochMillis))
-            .forEach(event -> report.addReportEntry(event));
-
+        componentEventMap.forEach((componentId, container) -> report.addReportEntry(container.generateReport(sinceEpochMillis), componentId));
         return report;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/SecondPrecisionEventContainer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/SecondPrecisionEventContainer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/SecondPrecisionEventContainer.java
index 72a8cfc..2482177 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/SecondPrecisionEventContainer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/SecondPrecisionEventContainer.java
@@ -18,13 +18,24 @@ e * Licensed to the Apache Software Foundation (ASF) under one or more
 package org.apache.nifi.controller.repository.metrics;
 
 import org.apache.nifi.controller.repository.FlowFileEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicLong;
 
 public class SecondPrecisionEventContainer implements EventContainer {
+    private static final Logger logger = LoggerFactory.getLogger(SecondPrecisionEventContainer.class);
+
     private final int numBins;
     private final EventSum[] sums;
+    private final EventSumValue aggregateValue = new EventSumValue(0);
+    private final AtomicLong lastUpdateSecond = new AtomicLong(0);
 
     public SecondPrecisionEventContainer(final int numMinutes) {
-        numBins = 1 + numMinutes * 60;
+        // number of bins is number of seconds in 'numMinutes' plus 1. We add one because
+        // we want to have the 'current bin' that we are adding values to, in addition to the
+        // previous (X = numMinutes * 60) bins of values that have completed
+        numBins = numMinutes * 60 + 1;
         sums = new EventSum[numBins];
 
         for (int i = 0; i < numBins; i++) {
@@ -34,11 +45,62 @@ public class SecondPrecisionEventContainer implements EventContainer {
 
     @Override
     public void addEvent(final FlowFileEvent event) {
-        final int second = (int) (System.currentTimeMillis() / 1000);
-        final int binIdx = second % numBins;
+        addEvent(event, System.currentTimeMillis());
+    }
+
+    protected void addEvent(final FlowFileEvent event, final long timestamp) {
+        final long second = timestamp / 1000;
+        final int binIdx = (int) (second % numBins);
         final EventSum sum = sums[binIdx];
 
-        sum.addOrReset(event);
+        final EventSumValue replaced = sum.addOrReset(event, timestamp);
+
+        aggregateValue.add(event);
+
+        if (replaced == null) {
+            logger.debug("Updated bin {}. Did NOT replace.", binIdx);
+        } else {
+            logger.debug("Replaced bin {}", binIdx);
+            aggregateValue.subtract(replaced);
+        }
+
+        // If there are any buckets that have expired, we need to update our aggregate value to reflect that.
+        processExpiredBuckets(second);
+    }
+
+    private void processExpiredBuckets(final long currentSecond) {
+        final long lastUpdate = lastUpdateSecond.get();
+        if (currentSecond > lastUpdate) {
+            final boolean updated = lastUpdateSecond.compareAndSet(lastUpdate, currentSecond);
+            if (updated) {
+                if (lastUpdate == 0L) {
+                    // First update, so nothing to expire
+                    return;
+                }
+
+                final int secondsElapsed = (int) (currentSecond - lastUpdate);
+
+                int index = (int) (currentSecond % numBins);
+                final long expirationTimestamp = 1000 * (currentSecond - numBins);
+
+                int expired = 0;
+                for (int i=0; i < secondsElapsed; i++) {
+                    index--;
+                    if (index < 0) {
+                        index = sums.length - 1;
+                    }
+
+                    final EventSum expiredSum = sums[index];
+                    final EventSumValue expiredValue = expiredSum.reset(expirationTimestamp);
+                    if (expiredValue != null) {
+                        aggregateValue.subtract(expiredValue);
+                        expired++;
+                    }
+                }
+
+                logger.debug("Expired {} bins", expired);
+            }
+        }
     }
 
     @Override
@@ -47,23 +109,17 @@ public class SecondPrecisionEventContainer implements EventContainer {
     }
 
     @Override
-    public FlowFileEvent generateReport(final String componentId, final long sinceEpochMillis) {
-        final EventSumValue eventSumValue = new EventSumValue();
-        final long second = sinceEpochMillis / 1000;
-        final int startBinIdx = (int) (second % numBins);
-
-        for (int i = 0; i < numBins; i++) {
-            int binIdx = (startBinIdx + i) % numBins;
-            final EventSum sum = sums[binIdx];
-
-            final EventSumValue sumValue = sum.getValue();
-            if (sumValue.getTimestamp() >= sinceEpochMillis) {
-                eventSumValue.add(sumValue);
-            }
+    public FlowFileEvent generateReport(final long now) {
+        final long second = now / 1000 + 1;
+        final long lastUpdate = lastUpdateSecond.get();
+        final long secondsSinceUpdate = second - lastUpdate;
+        if (secondsSinceUpdate > numBins) {
+            logger.debug("EventContainer hasn't been updated in {} seconds so will generate report as Empty FlowFile Event", secondsSinceUpdate);
+            return EmptyFlowFileEvent.INSTANCE;
         }
 
-        final FlowFileEvent flowFileEvent = eventSumValue.toFlowFileEvent(componentId);
-        return flowFileEvent;
+        logger.debug("Will expire up to {} bins", secondsSinceUpdate);
+        processExpiredBuckets(second);
+        return aggregateValue.toFlowFileEvent();
     }
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java
index 40ec983..fc00675 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java
@@ -16,13 +16,11 @@
  */
 package org.apache.nifi.controller.repository.metrics;
 
-import java.util.Map;
-
 import org.apache.nifi.controller.repository.FlowFileEvent;
 
-public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable {
+import java.util.Map;
 
-    private final String componentId;
+public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable {
 
     private int flowFilesIn;
     private int flowFilesOut;
@@ -41,13 +39,7 @@ public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable {
     private int invocations;
     private Map<String, Long> counters;
 
-    public StandardFlowFileEvent(final String componentId) {
-        this.componentId = componentId;
-    }
-
-    @Override
-    public String getComponentIdentifier() {
-        return componentId;
+    public StandardFlowFileEvent() {
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
index 8b960fc..de97225 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
@@ -16,13 +16,6 @@
  */
 package org.apache.nifi.controller.scheduling;
 
-import java.io.IOException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.components.state.StateManagerProvider;
@@ -54,6 +47,13 @@ import org.apache.nifi.util.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
 public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
 
     private static final Logger logger = LoggerFactory.getLogger(EventDrivenSchedulingAgent.class);
@@ -255,10 +255,10 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
                             }
                             try {
                                 final long processingNanos = System.nanoTime() - startNanos;
-                                final StandardFlowFileEvent procEvent = new StandardFlowFileEvent(connectable.getIdentifier());
+                                final StandardFlowFileEvent procEvent = new StandardFlowFileEvent();
                                 procEvent.setProcessingNanos(processingNanos);
                                 procEvent.setInvocations(invocationCount);
-                                context.getFlowFileEventRepository().updateRepository(procEvent);
+                                context.getFlowFileEventRepository().updateRepository(procEvent, connectable.getIdentifier());
                             } catch (final IOException e) {
                                 logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", connectable, e.toString());
                                 logger.error("", e);

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ComponentDetails.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ComponentDetails.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ComponentDetails.java
new file mode 100644
index 0000000..68a79b9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ComponentDetails.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ComponentDetails {
+    private final String componentId;
+    private final String groupId;
+    private final String componentName;
+    private final String componentType;
+    private final String sourceName;
+    private final String destinationName;
+    private final String targetUri;
+
+
+    public ComponentDetails(final String id, final String groupId, final String componentName, final String componentType,
+                            final String sourceName, final String destinationName, final String remoteUri) {
+        this.componentId = id;
+        this.groupId = groupId;
+        this.componentName = componentName;
+        this.componentType = componentType;
+        this.sourceName = sourceName;
+        this.destinationName = destinationName;
+        this.targetUri = remoteUri;
+    }
+
+    public static ComponentDetails forProcessor(final ProcessorStatus status) {
+        return forProcessor(status.getId(), status.getGroupId(), status.getName(), status.getType());
+    }
+
+    public static ComponentDetails forProcessor(final String id, final String groupId, final String processorName, final String processorType) {
+        return new ComponentDetails(id, groupId, processorName, processorType, null, null, null);
+    }
+
+    public static ComponentDetails forConnection(final ConnectionStatus status) {
+        return forConnection(status.getId(), status.getGroupId(), status.getName(), status.getSourceName(), status.getDestinationName());
+    }
+
+    public static ComponentDetails forConnection(final String id, final String groupId, final String connectionName, final String sourceName, final String destinationName) {
+        return new ComponentDetails(id, groupId, connectionName, sourceName, destinationName, null, null);
+    }
+
+    public static ComponentDetails forProcessGroup(final ProcessGroupStatus status) {
+        return forProcessGroup(status.getId(), status.getName());
+    }
+
+    public static ComponentDetails forProcessGroup(final String id, final String groupName) {
+        return new ComponentDetails(id,null, groupName, null, null, null, null);
+    }
+
+    public static ComponentDetails forRemoteProcessGroup(final RemoteProcessGroupStatus status) {
+        return forRemoteProcessGroup(status.getId(), status.getGroupId(), status.getName(), status.getTargetUri());
+    }
+
+    public static ComponentDetails forRemoteProcessGroup(final String id, final String parentGroupId, final String rpgName, final String remoteUri) {
+        return new ComponentDetails(id, parentGroupId, rpgName, null, null, null, remoteUri);
+    }
+
+    public String getComponentId() {
+        return componentId;
+    }
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public String getComponentName() {
+        return componentName;
+    }
+
+    public String getComponentType() {
+        return componentType;
+    }
+
+    public String getSourceName() {
+        return sourceName;
+    }
+
+    public String getDestinationName() {
+        return destinationName;
+    }
+
+    public String getTargetUri() {
+        return targetUri;
+    }
+
+    /**
+     * Returns a {@Link Map} whose keys are those values defined by {@link ComponentStatusRepository#COMPONENT_DETAIL_GROUP_ID ComponentStatusRepository.COMPONENT_DETAIL_*}
+     * and values are the values that are populated for this ComponentDetails object.
+     */
+    public Map<String, String> toMap() {
+        final Map<String, String> map = new HashMap<>();
+        map.put(ComponentStatusRepository.COMPONENT_DETAIL_ID, componentId);
+        map.put(ComponentStatusRepository.COMPONENT_DETAIL_GROUP_ID, groupId);
+        map.put(ComponentStatusRepository.COMPONENT_DETAIL_NAME, componentName);
+        map.put(ComponentStatusRepository.COMPONENT_DETAIL_TYPE, componentType);
+        map.put(ComponentStatusRepository.COMPONENT_DETAIL_SOURCE_NAME, sourceName);
+        map.put(ComponentStatusRepository.COMPONENT_DETAIL_DESTINATION_NAME, destinationName);
+        map.put(ComponentStatusRepository.COMPONENT_DETAIL_URI, targetUri);
+        return map;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusHistory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusHistory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusHistory.java
new file mode 100644
index 0000000..cd4b76b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusHistory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class ComponentStatusHistory {
+
+    private final MetricRollingBuffer snapshots;
+    private ComponentDetails componentDetails;
+
+    public ComponentStatusHistory(final ComponentDetails details, final int maxCapacity) {
+        this.componentDetails = details;
+        snapshots = new MetricRollingBuffer(maxCapacity);
+    }
+
+    public void expireBefore(final Date timestamp) {
+        snapshots.expireBefore(timestamp);
+    }
+
+    public void update(final StatusSnapshot snapshot, final ComponentDetails details) {
+        if (snapshot == null) {
+            return;
+        }
+
+        snapshots.update(snapshot);
+        componentDetails = details;
+    }
+
+    public StatusHistory toStatusHistory(final List<Date> timestamps, final boolean includeCounters, final Set<MetricDescriptor<?>> defaultStatusMetrics) {
+        final Date dateGenerated = new Date();
+        final Map<String, String> componentDetailsMap = componentDetails.toMap();
+        final List<StatusSnapshot> snapshotList = snapshots.getSnapshots(timestamps, includeCounters, defaultStatusMetrics);
+        return new StandardStatusHistory(snapshotList, componentDetailsMap, dateGenerated);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java
index c298803..ac738b3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java
@@ -21,53 +21,55 @@ import org.apache.nifi.controller.status.ConnectionStatus;
 import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
 
 public enum ConnectionStatusDescriptor {
-    INPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>(
+    INPUT_BYTES(
         "inputBytes",
         "Bytes In (5 mins)",
         "The cumulative size of all FlowFiles that were transferred to this Connection in the past 5 minutes",
         Formatter.DATA_SIZE,
-        s -> s.getInputBytes())),
+        ConnectionStatus::getInputBytes),
 
-    INPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>(
+    INPUT_COUNT(
         "inputCount",
         "FlowFiles In (5 mins)",
         "The number of FlowFiles that were transferred to this Connection in the past 5 minutes",
         Formatter.COUNT,
-        s -> Long.valueOf(s.getInputCount()))),
+        s -> Long.valueOf(s.getInputCount())),
 
-    OUTPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>(
+    OUTPUT_BYTES(
         "outputBytes",
         "Bytes Out (5 mins)",
         "The cumulative size of all FlowFiles that were pulled from this Connection in the past 5 minutes",
         Formatter.DATA_SIZE,
-        s -> s.getOutputBytes())),
+        ConnectionStatus::getOutputBytes),
 
-    OUTPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>(
+    OUTPUT_COUNT(
         "outputCount",
         "FlowFiles Out (5 mins)",
         "The number of FlowFiles that were pulled from this Connection in the past 5 minutes",
         Formatter.COUNT,
-        s -> Long.valueOf(s.getOutputCount()))),
+        s -> Long.valueOf(s.getOutputCount())),
 
-    QUEUED_BYTES(new StandardMetricDescriptor<ConnectionStatus>(
+    QUEUED_BYTES(
         "queuedBytes",
         "Queued Bytes",
         "The number of Bytes queued in this Connection",
         Formatter.DATA_SIZE,
-        s -> s.getQueuedBytes())),
+        ConnectionStatus::getQueuedBytes),
 
-    QUEUED_COUNT(new StandardMetricDescriptor<ConnectionStatus>(
+    QUEUED_COUNT(
         "queuedCount",
         "Queued Count",
         "The number of FlowFiles queued in this Connection",
         Formatter.COUNT,
-        s -> Long.valueOf(s.getQueuedCount())));
+        s -> Long.valueOf(s.getQueuedCount()));
 
 
     private MetricDescriptor<ConnectionStatus> descriptor;
 
-    private ConnectionStatusDescriptor(final MetricDescriptor<ConnectionStatus> descriptor) {
-        this.descriptor = descriptor;
+    ConnectionStatusDescriptor(final String field, final String label, final String description,
+                              final MetricDescriptor.Formatter formatter, final ValueMapper<ConnectionStatus> valueFunction) {
+
+        this.descriptor = new StandardMetricDescriptor<>(this::ordinal, field, label, description, formatter, valueFunction);
     }
 
     public String getField() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmptyStatusSnapshot.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmptyStatusSnapshot.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmptyStatusSnapshot.java
new file mode 100644
index 0000000..68c04ca
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmptyStatusSnapshot.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Set;
+
+public class EmptyStatusSnapshot implements StatusSnapshot {
+    private static final ValueReducer<StatusSnapshot, StatusSnapshot> VALUE_REDUCER = new EmptyValueReducer();
+    private static final Long METRIC_VALUE = 0L;
+
+    private final Date timestamp;
+    private final Set<MetricDescriptor<?>> metricsDescriptors;
+
+    public EmptyStatusSnapshot(final Date timestamp, final Set<MetricDescriptor<?>> metricsDescriptors) {
+        this.timestamp = timestamp;
+        this.metricsDescriptors = metricsDescriptors;
+    }
+
+    @Override
+    public Date getTimestamp() {
+        return timestamp;
+    }
+
+    @Override
+    public Set<MetricDescriptor<?>> getMetricDescriptors() {
+        return metricsDescriptors;
+    }
+
+    @Override
+    public Long getStatusMetric(final MetricDescriptor<?> descriptor) {
+        return METRIC_VALUE;
+    }
+
+    @Override
+    public StatusSnapshot withoutCounters() {
+        return this;
+    }
+
+    @Override
+    public ValueReducer<StatusSnapshot, StatusSnapshot> getValueReducer() {
+        return VALUE_REDUCER;
+    }
+
+    private static class EmptyValueReducer implements ValueReducer<StatusSnapshot, StatusSnapshot> {
+        @Override
+        public StatusSnapshot reduce(final List<StatusSnapshot> values) {
+            return (values == null || values.isEmpty()) ? null : values.get(0);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/IndexableMetric.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/IndexableMetric.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/IndexableMetric.java
new file mode 100644
index 0000000..d5cbeae
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/IndexableMetric.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+public interface IndexableMetric {
+    int getIndex();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/MetricRollingBuffer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/MetricRollingBuffer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/MetricRollingBuffer.java
new file mode 100644
index 0000000..215e364
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/MetricRollingBuffer.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Set;
+
+public class MetricRollingBuffer {
+    private final int capacity;
+
+    private StatusSnapshot[] snapshots;
+    private int writeIndex = 0;
+    private int readIndex;
+    private boolean readExhausted;
+    private int count = 0;
+
+    public MetricRollingBuffer(final int maxCapacity) {
+        this.capacity = maxCapacity;
+    }
+
+    public void update(final StatusSnapshot snapshot) {
+        if (snapshot == null) {
+            return;
+        }
+
+        if (snapshots == null) {
+            snapshots = new StatusSnapshot[Math.min(capacity, 16)];
+        }
+
+        if (snapshots[writeIndex] == null) {
+            count++;
+        }
+
+        snapshots[writeIndex++] = snapshot;
+
+        if (writeIndex >= snapshots.length) {
+            if (snapshots.length < capacity) {
+                grow();
+            } else {
+                writeIndex = 0;
+            }
+        }
+    }
+
+    public int size() {
+        return count;
+    }
+
+    public void expireBefore(final Date date) {
+        if (snapshots == null) {
+            return;
+        }
+
+        int readIndex = writeIndex;
+        for (int i=0; i < snapshots.length; i++) {
+            final StatusSnapshot snapshot = snapshots[readIndex];
+            if (snapshot == null) {
+                readIndex++;
+                if (readIndex >= snapshots.length) {
+                    readIndex = 0;
+                }
+
+                continue;
+            }
+
+            final Date snapshotTimestamp = snapshot.getTimestamp();
+            if (snapshotTimestamp.after(date)) {
+                break;
+            }
+
+            snapshots[readIndex] = null;
+            count--;
+
+            readIndex++;
+            if (readIndex >= snapshots.length) {
+                readIndex = 0;
+            }
+        }
+
+        if (count < snapshots.length / 4 || snapshots.length - count > 128) {
+            // If we're using less than 1/4 of the array or we have at least 128 null entries, compact.
+            compact();
+        }
+    }
+
+    private void grow() {
+        final int initialSize = snapshots.length;
+        final int newSize = Math.min(capacity, snapshots.length + 64);
+        final StatusSnapshot[] newArray = new StatusSnapshot[newSize];
+        System.arraycopy(snapshots, 0, newArray, 0, snapshots.length);
+        snapshots = newArray;
+        writeIndex = initialSize;
+    }
+
+    private void compact() {
+        final StatusSnapshot[] newArray = new StatusSnapshot[count + 1];
+        int insertionIndex = 0;
+
+        int readIndex = writeIndex;
+        for (int i=0; i < snapshots.length; i++) {
+            final StatusSnapshot snapshot = snapshots[readIndex];
+            if (snapshot != null) {
+                newArray[insertionIndex++] = snapshot;
+            }
+
+            readIndex++;
+            if (readIndex >= snapshots.length) {
+                readIndex = 0;
+            }
+        }
+
+        snapshots = newArray;
+        writeIndex = count;
+        count = newArray.length - 1;
+    }
+
+    public List<StatusSnapshot> getSnapshots(final List<Date> timestamps, final boolean includeCounters, final Set<MetricDescriptor<?>> defaultStatusMetrics) {
+        if (snapshots == null) {
+            return Collections.emptyList();
+        }
+
+        final List<StatusSnapshot> list = new ArrayList<>(snapshots.length);
+
+        resetRead();
+
+        for (final Date timestamp : timestamps) {
+            final StatusSnapshot snapshot = getSnapshotForTimestamp(timestamp);
+            if (snapshot == null) {
+                list.add(new EmptyStatusSnapshot(timestamp, defaultStatusMetrics));
+            } else {
+                list.add(includeCounters ? snapshot : snapshot.withoutCounters());
+            }
+        }
+
+        return list;
+    }
+
+    private StatusSnapshot getSnapshotForTimestamp(final Date timestamp) {
+        while (!readExhausted) {
+            final StatusSnapshot snapshot = snapshots[readIndex];
+            if (snapshot == null) {
+                advanceRead();
+                continue;
+            }
+
+            final Date snapshotTimestamp = snapshot.getTimestamp();
+            if (snapshotTimestamp.before(timestamp)) {
+                advanceRead();
+                continue;
+            }
+
+            if (snapshotTimestamp.after(timestamp)) {
+                return null;
+            }
+
+            advanceRead();
+            return snapshot;
+        }
+
+        return null;
+    }
+
+    private void resetRead() {
+        readIndex = writeIndex;
+        readExhausted = false;
+    }
+
+    private void advanceRead() {
+        readIndex++;
+
+        if (readIndex >= snapshots.length) {
+            readIndex = 0;
+        }
+
+        if (readIndex == writeIndex) {
+            readExhausted = true;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bbb5a82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java
index 25b9dfc..6bdb3c0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java
@@ -17,79 +17,90 @@
 
 package org.apache.nifi.controller.status.history;
 
-import java.util.concurrent.TimeUnit;
-
 import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.controller.status.ProcessorStatus;
 import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
 
+import java.util.concurrent.TimeUnit;
+
 public enum ProcessGroupStatusDescriptor {
 
-    BYTES_READ(new StandardMetricDescriptor<ProcessGroupStatus>(
+    BYTES_READ(
         "bytesRead",
         "Bytes Read (5 mins)",
         "The total number of bytes read from Content Repository by Processors in this Process Group in the past 5 minutes",
         Formatter.DATA_SIZE,
-        s -> s.getBytesRead())),
+        ProcessGroupStatus::getBytesRead),
 
-    BYTES_WRITTEN(new StandardMetricDescriptor<ProcessGroupStatus>("bytesWritten",
+    BYTES_WRITTEN(
+        "bytesWritten",
         "Bytes Written (5 mins)",
         "The total number of bytes written to Content Repository by Processors in this Process Group in the past 5 minutes",
         Formatter.DATA_SIZE,
-        s -> s.getBytesWritten())),
+        ProcessGroupStatus::getBytesWritten),
 
-    BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessGroupStatus>("bytesTransferred",
+    BYTES_TRANSFERRED(
+        "bytesTransferred",
         "Bytes Transferred (5 mins)",
         "The total number of bytes read from or written to Content Repository by Processors in this Process Group in the past 5 minutes",
         Formatter.DATA_SIZE,
-        s -> s.getBytesRead() + s.getBytesWritten())),
+        s -> s.getBytesRead() + s.getBytesWritten()),
 
-    INPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("inputBytes",
+    INPUT_BYTES("inputBytes",
         "Bytes In (5 mins)",
         "The cumulative size of all FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes",
         Formatter.DATA_SIZE,
-        s -> s.getInputContentSize())),
+        ProcessGroupStatus::getInputContentSize),
 
-    INPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("inputCount",
+    INPUT_COUNT(
+        "inputCount",
         "FlowFiles In (5 mins)",
         "The number of FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes",
         Formatter.COUNT,
-        s -> s.getInputCount().longValue())),
+        s -> s.getInputCount().longValue()),
 
-    OUTPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("outputBytes",
+    OUTPUT_BYTES(
+        "outputBytes",
         "Bytes Out (5 mins)",
         "The cumulative size of all FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes",
         Formatter.DATA_SIZE,
-        s -> s.getOutputContentSize())),
+        ProcessGroupStatus::getOutputContentSize),
 
-    OUTPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("outputCount",
+    OUTPUT_COUNT(
+        "outputCount",
         "FlowFiles Out (5 mins)",
         "The number of FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes",
         Formatter.COUNT,
-        s -> s.getOutputCount().longValue())),
+        s -> s.getOutputCount().longValue()),
 
-    QUEUED_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("queuedBytes",
+    QUEUED_BYTES(
+        "queuedBytes",
         "Queued Bytes",
         "The cumulative size of all FlowFiles queued in all Connections of this Process Group",
         Formatter.DATA_SIZE,
-        s -> s.getQueuedContentSize())),
+        ProcessGroupStatus::getQueuedContentSize),
 
-    QUEUED_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("queuedCount",
+    QUEUED_COUNT(
+        "queuedCount",
         "Queued Count",
         "The number of FlowFiles queued in all Connections of this Process Group",
         Formatter.COUNT,
-        s -> s.getQueuedCount().longValue())),
+        s -> s.getQueuedCount().longValue()),
 
-    TASK_MILLIS(new StandardMetricDescriptor<ProcessGroupStatus>("taskMillis",
+    TASK_MILLIS(
+        "taskMillis",
         "Total Task Duration (5 mins)",
         "The total number of thread-milliseconds that the Processors within this ProcessGroup have used to complete their tasks in the past 5 minutes",
         Formatter.DURATION,
-        s -> calculateTaskMillis(s)));
+        ProcessGroupStatusDescriptor::calculateTaskMillis);
+
 
     private MetricDescriptor<ProcessGroupStatus> descriptor;
 
-    private ProcessGroupStatusDescriptor(final MetricDescriptor<ProcessGroupStatus> descriptor) {
-        this.descriptor = descriptor;
+    ProcessGroupStatusDescriptor(final String field, final String label, final String description,
+                               final MetricDescriptor.Formatter formatter, final ValueMapper<ProcessGroupStatus> valueFunction) {
+
+        this.descriptor = new StandardMetricDescriptor<>(this::ordinal, field, label, description, formatter, valueFunction);
     }
 
     public String getField() {