You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2019/03/12 19:40:08 UTC

[nifi] 16/21: NIFI-6111: Fixed bugs in the Status History values. If metrics are not available yet from all nodes for the last point on the graph, leave the cluster aggregate value off for the last point to prevent it from dropping significantly at the end of the chart. Fixed bug where counter values were not properly summed together in cluster view. Addressed issue with Average Task Duration.

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.9.x
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit ca32e045109f97b3a7e2cd5a810a003ca1f0148b
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri Mar 8 16:29:03 2019 -0500

    NIFI-6111: Fixed bugs in the Status History values. If metrics are not available yet from all nodes for the last point on the graph, leave the cluster aggregate value off for the last point to prevent it from dropping significantly at the end of the chart. Fixed bug where counter values were not properly summed together in cluster view. Addressed issue with Average Task Duration.
    
    This closes #3361.
    
    Signed-off-by: Bryan Bende <bb...@apache.org>
---
 .../endpoints/StatusHistoryEndpointMerger.java     | 22 +++++++++++++++++-----
 .../status/history/StandardStatusSnapshot.java     |  7 ++++++-
 .../org/apache/nifi/util/ComponentMetrics.java     |  6 ++----
 3 files changed, 25 insertions(+), 10 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java
index 8e4c26b..90ef471 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java
@@ -21,12 +21,12 @@ import org.apache.nifi.cluster.manager.NodeResponse;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.controller.status.ProcessorStatus;
 import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
+import org.apache.nifi.controller.status.history.CounterMetricDescriptor;
 import org.apache.nifi.controller.status.history.MetricDescriptor;
 import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
 import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor;
 import org.apache.nifi.controller.status.history.ProcessorStatusDescriptor;
 import org.apache.nifi.controller.status.history.RemoteProcessGroupStatusDescriptor;
-import org.apache.nifi.controller.status.history.StandardMetricDescriptor;
 import org.apache.nifi.controller.status.history.StandardStatusSnapshot;
 import org.apache.nifi.controller.status.history.StatusHistoryUtil;
 import org.apache.nifi.controller.status.history.StatusSnapshot;
@@ -157,9 +157,8 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
                         return counters.getOrDefault(descriptorDto.getField(), 0L);
                     };
 
-                    final MetricDescriptor<ProcessorStatus> metricDescriptor = new StandardMetricDescriptor<>(() -> 0, descriptorDto.getField(),
-                        descriptorDto.getLabel(), descriptorDto.getDescription(), Formatter.COUNT, valueMapper);
-
+                    final MetricDescriptor<ProcessorStatus> metricDescriptor = new CounterMetricDescriptor<>(descriptorDto.getField(), descriptorDto.getLabel(),
+                        descriptorDto.getDescription(), Formatter.COUNT, valueMapper);
                     metricDescriptors.put(fieldName, metricDescriptor);
                 }
             }
@@ -247,11 +246,24 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
         return snapshot;
     }
 
-    private List<StatusSnapshotDTO> aggregate(Map<Date, List<StatusSnapshot>> snapshotsToAggregate) {
+    private List<StatusSnapshotDTO> aggregate(final Map<Date, List<StatusSnapshot>> snapshotsToAggregate) {
         // Aggregate the snapshots
         final List<StatusSnapshotDTO> aggregatedSnapshotDtos = new ArrayList<>();
+
+        int iteration = 0;
+        int previousSnapshotCount = 0;
         for (final Map.Entry<Date, List<StatusSnapshot>> entry : snapshotsToAggregate.entrySet()) {
             final List<StatusSnapshot> snapshots = entry.getValue();
+
+            // If this is the last snapshot, we don't want to include it unless we have stats from all nodes.
+            // Otherwise, when we look at the stats in a chart, the last point for the cluster stats often seems to
+            // drop off very steeply.
+            if (++iteration == snapshotsToAggregate.size() && snapshots.size() < previousSnapshotCount) {
+                continue;
+            }
+
+            previousSnapshotCount = snapshots.size();
+
             final StatusSnapshot reducedSnapshot = snapshots.get(0).getValueReducer().reduce(snapshots);
 
             final StatusSnapshotDTO dto = new StatusSnapshotDTO();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusSnapshot.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusSnapshot.java
index 8e78c5e..652ffd5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusSnapshot.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusSnapshot.java
@@ -78,10 +78,15 @@ public class StandardStatusSnapshot implements StatusSnapshot {
 
 
     public void addStatusMetric(final MetricDescriptor<?> metric, final Long value) {
+        if (metric.isCounter()) {
+            addCounterStatusMetric(metric, value);
+            return;
+        }
+
         values[metric.getMetricIdentifier()] = value;
     }
 
-    public void addCounterStatusMetric(final MetricDescriptor<?> metric, final Long value) {
+    private void addCounterStatusMetric(final MetricDescriptor<?> metric, final Long value) {
         if (counterValues == null) {
             counterValues = new HashMap<>();
         }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentMetrics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentMetrics.java
index 7fa2278..5ce1def 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentMetrics.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentMetrics.java
@@ -73,9 +73,7 @@ public class ComponentMetrics {
         snapshot.setTimestamp(timestamp);
 
         for (final ProcessorStatusDescriptor descriptor : ProcessorStatusDescriptor.values()) {
-            if (descriptor.isVisible()) {
-                snapshot.addStatusMetric(descriptor.getDescriptor(), descriptor.getDescriptor().getValueFunction().getValue(status));
-            }
+            snapshot.addStatusMetric(descriptor.getDescriptor(), descriptor.getDescriptor().getValueFunction().getValue(status));
         }
 
         final Map<String, Long> counters = status.getCounters();
@@ -87,7 +85,7 @@ public class ComponentMetrics {
                 final MetricDescriptor<ProcessorStatus> metricDescriptor = new CounterMetricDescriptor<>(entry.getKey(), label, label, MetricDescriptor.Formatter.COUNT,
                         s -> s.getCounters() == null ? null : s.getCounters().get(counterName));
 
-                snapshot.addCounterStatusMetric(metricDescriptor, entry.getValue());
+                snapshot.addStatusMetric(metricDescriptor, entry.getValue());
             }
         }