Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/01/25 20:47:39 UTC

[GitHub] [nifi] exceptionfactory commented on a change in pull request #5582: NIFI-9455 Add aggregated predictions to Prometheus

exceptionfactory commented on a change in pull request #5582:
URL: https://github.com/apache/nifi/pull/5582#discussion_r792103838



##########
File path: nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusMetricsUtil.java
##########
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.prometheus;
+
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.prometheus.util.AbstractMetricsRegistry;
+import org.apache.nifi.prometheus.util.ConnectionAnalyticsMetricsRegistry;
+import org.apache.nifi.prometheus.util.NiFiMetricsRegistry;
+import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
+import org.hamcrest.CoreMatchers;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.everyItem;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class TestPrometheusMetricsUtil {
+    private static final long DEFAULT_PREDICTION_VALUE = -1L;
+    private static final double EXPECTED_DEFAULT_PREDICTION_VALUE = -1.0;
+    private static final double EXPECTED_BACKPRESSURE_PREDICTION_VALUE = 0.0;
+    private static final double EXPECTED_FALSE_BACKPRESSURE = 0.0;
+    private static final double EXPECTED_TRUE_BACKPRESSURE = 1.0;
+    private static final double EXPECTED_DEFAULT_PERCENT_USED_VALUE = 0.0;
+    private static final double EXPECTED_BACKPRESSURE_PERCENT_USED_VALUE = 100.0;
+    private static final double EXPECTED_NESTED_BYTES_PERCENT_VALUE = 150.0 / 200.0 * 100.0;
+    private static final double EXPECTED_NESTED_COUNT_PERCENT_VALUE = 5.0 / 30.0 * 100.0;
+    private static final String NIFI_PERCENT_USED_BYTES = "nifi_percent_used_bytes";
+    private static final String NIFI_PERCENT_USED_COUNT = "nifi_percent_used_count";
+    private static final String BYTES_AT_BACKPRESSURE = "bytesAtBackpressure";
+    private static final String COUNT_AT_BACKPRESSURE = "countAtBackpressure";
+    private static final String NIFI_TIME_TO_BYTES_BACKPRESSURE_PREDICTION = "nifi_time_to_bytes_backpressure_prediction";
+    private static final String NIFI_TIME_TO_COUNT_BACKPRESSURE_PREDICTION = "nifi_time_to_count_backpressure_prediction";
+    private static final String CONNECTION_1 = "Connection1";
+    private static final String CONNECTION_2 = "Connection2";
+    private static final String CONNECTION_3 = "Connection3";
+    private static final String CONNECTION_4 = "Connection4";
+    private static final String TIME_TO_BYTES_BACKPRESSURE_MILLIS = "timeToBytesBackpressureMillis";
+    private static final String TIME_TO_COUNT_BACKPRESSURE_MILLIS = "timeToCountBackpressureMillis";
+
+    private static ProcessGroupStatus singleProcessGroupStatus;
+    private static ProcessGroupStatus nestedProcessGroupStatus;
+    private static ProcessGroupStatus singleProcessGroupStatusWithBytesBackpressure;
+    private static ProcessGroupStatus nestedProcessGroupStatusWithCountBackpressure;
+    private static Set<String> connections;
+    private static Map<String, Map<String, Long>> mixedValuedPredictions;
+    private static Map<String, Map<String, Long>> defaultValuedPredictions;
+
+    @BeforeAll
+    public static void setup() {
+        singleProcessGroupStatus = createSingleProcessGroupStatus(0, 1, 0, 1);
+        nestedProcessGroupStatus = createNestedProcessGroupStatus();
+        singleProcessGroupStatusWithBytesBackpressure = createSingleProcessGroupStatus(1, 1, 0, 1);
+        nestedProcessGroupStatusWithCountBackpressure = createNestedProcessGroupStatusWithCountBackpressure();
+        connections = createConnections();
+        mixedValuedPredictions = createPredictionsWithMixedValue();
+        defaultValuedPredictions = createPredictionsWithDefaultValuesOnly();
+    }
+
+    @Test
+    public void testAggregatePercentUsedWithSingleProcessGroup() {
+        final Map<String, Double> aggregatedMetrics = new HashMap<>();
+
+        PrometheusMetricsUtil.aggregatePercentUsed(singleProcessGroupStatus, aggregatedMetrics);
+
+        assertThat(aggregatedMetrics.size(), equalTo(4));
+        assertThat(EXPECTED_DEFAULT_PERCENT_USED_VALUE, equalTo(aggregatedMetrics.get(NIFI_PERCENT_USED_BYTES)));
+        assertThat(EXPECTED_DEFAULT_PERCENT_USED_VALUE, equalTo(aggregatedMetrics.get(NIFI_PERCENT_USED_COUNT)));
+        assertThat(EXPECTED_FALSE_BACKPRESSURE, equalTo(aggregatedMetrics.get(BYTES_AT_BACKPRESSURE)));
+        assertThat(EXPECTED_FALSE_BACKPRESSURE, equalTo(aggregatedMetrics.get(COUNT_AT_BACKPRESSURE)));
+    }
+
+    @Test
+    public void testAggregatePercentUsedWithSingleProcessGroupWithBytesBackpressure() {
+        final Map<String, Double> aggregatedMetrics = new HashMap<>();
+
+        PrometheusMetricsUtil.aggregatePercentUsed(singleProcessGroupStatusWithBytesBackpressure, aggregatedMetrics);
+
+        assertThat(aggregatedMetrics.size(), equalTo(4));
+        assertThat(EXPECTED_BACKPRESSURE_PERCENT_USED_VALUE, equalTo(aggregatedMetrics.get(NIFI_PERCENT_USED_BYTES)));
+        assertThat(EXPECTED_DEFAULT_PERCENT_USED_VALUE, equalTo(aggregatedMetrics.get(NIFI_PERCENT_USED_COUNT)));
+        assertThat(EXPECTED_TRUE_BACKPRESSURE, equalTo(aggregatedMetrics.get(BYTES_AT_BACKPRESSURE)));
+        assertThat(EXPECTED_FALSE_BACKPRESSURE, equalTo(aggregatedMetrics.get(COUNT_AT_BACKPRESSURE)));
+    }
+
+    @Test
+    public void testAggregatePercentUsedWithNestedProcessGroups() {
+        final Map<String, Double> aggregatedMetrics = new HashMap<>();
+
+        PrometheusMetricsUtil.aggregatePercentUsed(nestedProcessGroupStatus, aggregatedMetrics);
+
+        assertThat(aggregatedMetrics.size(), equalTo(4));
+        assertThat(EXPECTED_NESTED_BYTES_PERCENT_VALUE, equalTo(aggregatedMetrics.get(NIFI_PERCENT_USED_BYTES)));
+        assertThat(EXPECTED_NESTED_COUNT_PERCENT_VALUE, equalTo(aggregatedMetrics.get(NIFI_PERCENT_USED_COUNT)));
+        assertThat(EXPECTED_FALSE_BACKPRESSURE, equalTo(aggregatedMetrics.get(BYTES_AT_BACKPRESSURE)));
+        assertThat(EXPECTED_FALSE_BACKPRESSURE, equalTo(aggregatedMetrics.get(COUNT_AT_BACKPRESSURE)));
+    }
+
+    @Test
+    public void testAggregatePercentUsedWithNestedProcessGroupsWithCountBackpressure() {
+        final Map<String, Double> aggregatedMetrics = new HashMap<>();
+
+        PrometheusMetricsUtil.aggregatePercentUsed(nestedProcessGroupStatusWithCountBackpressure, aggregatedMetrics);
+
+        assertThat(aggregatedMetrics.size(), equalTo(4));
+        assertThat(EXPECTED_NESTED_BYTES_PERCENT_VALUE, equalTo(aggregatedMetrics.get(NIFI_PERCENT_USED_BYTES)));
+        assertThat(EXPECTED_BACKPRESSURE_PERCENT_USED_VALUE, equalTo(aggregatedMetrics.get(NIFI_PERCENT_USED_COUNT)));
+        assertThat(EXPECTED_FALSE_BACKPRESSURE, equalTo(aggregatedMetrics.get(BYTES_AT_BACKPRESSURE)));
+        assertThat(EXPECTED_TRUE_BACKPRESSURE, equalTo(aggregatedMetrics.get(COUNT_AT_BACKPRESSURE)));
+    }
+
+    @Test
+    public void testAggregateConnectionPredictionsWithMixedValues() {
+        Map<String, Double> aggregatedMetrics = new HashMap<>();
+        generateConnectionAnalyticMetricsAggregation(aggregatedMetrics, mixedValuedPredictions);
+
+        assertThat(aggregatedMetrics.size(), equalTo(2));
+        assertThat(1.0, equalTo(aggregatedMetrics.get(NIFI_TIME_TO_BYTES_BACKPRESSURE_PREDICTION)));
+        assertThat(2.0, equalTo(aggregatedMetrics.get(NIFI_TIME_TO_COUNT_BACKPRESSURE_PREDICTION)));
+    }
+
+    @Test
+    public void testAggregateConnectionPredictionsWithAllDefaultValues() {
+        Map<String, Double> aggregatedMetrics = new HashMap<>();
+        generateConnectionAnalyticMetricsAggregation(aggregatedMetrics, defaultValuedPredictions);
+
+        assertThat(aggregatedMetrics.size(), equalTo(2));
+        assertThat(EXPECTED_DEFAULT_PREDICTION_VALUE, equalTo(aggregatedMetrics.get(NIFI_TIME_TO_BYTES_BACKPRESSURE_PREDICTION)));
+        assertThat(EXPECTED_DEFAULT_PREDICTION_VALUE, equalTo(aggregatedMetrics.get(NIFI_TIME_TO_COUNT_BACKPRESSURE_PREDICTION)));
+    }
+
+    @Test
+    public void testAggregateConnectionPredictionsWithBackpressure() {
+        Map<String, Double> aggregatedMetrics = new HashMap<>();
+        aggregatedMetrics.put(BYTES_AT_BACKPRESSURE, 1.0);
+        aggregatedMetrics.put(COUNT_AT_BACKPRESSURE, 0.0);
+        generateConnectionAnalyticMetricsAggregation(aggregatedMetrics, mixedValuedPredictions);
+
+        assertThat(EXPECTED_BACKPRESSURE_PREDICTION_VALUE, equalTo(aggregatedMetrics.get(NIFI_TIME_TO_BYTES_BACKPRESSURE_PREDICTION)));
+        assertThat(2.0, equalTo(aggregatedMetrics.get(NIFI_TIME_TO_COUNT_BACKPRESSURE_PREDICTION)));
+    }
+
+    @Test
+    public void testAggregatedConnectionPredictionsDatapointCreationWithAnalyticsNotSet() {
+        ConnectionAnalyticsMetricsRegistry connectionAnalyticsMetricsRegistry = new ConnectionAnalyticsMetricsRegistry();
+        Map<String, Double> emptyAggregatedMetrics = new HashMap<>();
+
+        PrometheusMetricsUtil.createAggregatedConnectionStatusAnalyticsMetrics(connectionAnalyticsMetricsRegistry,
+                emptyAggregatedMetrics,
+                "",

Review comment:
       Recommend defining a static constant named `EMPTY`, or using a static import of `StringUtils.EMPTY`, as opposed to repeating the empty string literal across multiple methods.
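
       For example, either of these would avoid the repeated literal (only a sketch):

```java
// Option 1: a constant local to the test class
private static final String EMPTY = "";

// Option 2: reuse the constant that commons-lang3 already provides,
// via a static import at the top of the test class:
// import static org.apache.commons.lang3.StringUtils.EMPTY;
```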

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java
##########
@@ -362,4 +365,149 @@ public static CollectorRegistry createBulletinMetrics(BulletinMetricsRegistry bu
         bulletinMetricsRegistry.setDataPoint(1, "BULLETIN", instanceId, componentType, componentId, parentId, nodeAddress, category, sourceName, sourceId, level);
         return bulletinMetricsRegistry.getRegistry();
     }
+
+    public static void aggregatePercentUsed(final ProcessGroupStatus status, final Map<String, Double> aggregatedMetrics) {
+        status.getProcessGroupStatus().forEach((childGroupStatus) -> aggregatePercentUsed(childGroupStatus, aggregatedMetrics));
+
+        for (ConnectionStatus connectionStatus : status.getConnectionStatus()) {
+            final double percentUsedBytes = getUtilization(connectionStatus.getQueuedBytes(), connectionStatus.getBackPressureBytesThreshold());
+            final double percentUsedCount = getUtilization(connectionStatus.getQueuedCount(), connectionStatus.getBackPressureObjectThreshold());
+
+            determineMaxValueForPercentUsed(aggregatedMetrics,
+                    "nifi_percent_used_bytes",
+                    percentUsedBytes);
+
+            determineMaxValueForPercentUsed(aggregatedMetrics,
+                    "nifi_percent_used_count",
+                    percentUsedCount);
+
+            setBackpressure(aggregatedMetrics, percentUsedBytes, "bytesAtBackpressure");
+            setBackpressure(aggregatedMetrics, percentUsedCount, "countAtBackpressure");
+        }
+    }
+
+    public static void aggregateConnectionPredictionMetrics(final Map<String, Double> aggregatedMetrics, final Map<String, Long> predictions) {
+        determineMinValueForPredictions(aggregatedMetrics,
+                "nifi_time_to_bytes_backpressure_prediction",
+                predictions.get("timeToBytesBackpressureMillis"),
+                "bytesAtBackpressure");
+
+        determineMinValueForPredictions(aggregatedMetrics,
+                "nifi_time_to_count_backpressure_prediction",
+                predictions.get("timeToCountBackpressureMillis"),
+                "countAtBackpressure");
+    }
+
+    private static void setBackpressure(final Map<String, Double> aggregatedMetrics, final double percentUsed, final String atBackpressureKey) {
+        if (percentUsed >= 100) {
+            aggregatedMetrics.put(atBackpressureKey, 1.0);
+        } else if (!aggregatedMetrics.containsKey(atBackpressureKey)) {
+            aggregatedMetrics.put(atBackpressureKey, 0.0);
+        }

Review comment:
       What do you think about defining static constants for some of these values?  For example, `1.0` could be defined as `MAXIMUM_BACKPRESSURE`, and that value could be used elsewhere for evaluation.
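
       Roughly along these lines, for example (the constant names are only suggestions):

```java
private static final double MAXIMUM_BACKPRESSURE = 1.0;
private static final double NO_BACKPRESSURE = 0.0;
private static final double BACKPRESSURE_THRESHOLD_PERCENT = 100.0;

private static void setBackpressure(final Map<String, Double> aggregatedMetrics, final double percentUsed, final String atBackpressureKey) {
    if (percentUsed >= BACKPRESSURE_THRESHOLD_PERCENT) {
        aggregatedMetrics.put(atBackpressureKey, MAXIMUM_BACKPRESSURE);
    } else if (!aggregatedMetrics.containsKey(atBackpressureKey)) {
        aggregatedMetrics.put(atBackpressureKey, NO_BACKPRESSURE);
    }
}
```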

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
##########
@@ -5622,14 +5625,29 @@ public ProcessorDiagnosticsEntity getProcessorDiagnostics(final String id) {
         nifiMetricsRegistry.setDataPoint(aggregateEvent.getBytesReceived(), "TOTAL_BYTES_RECEIVED",
                 instanceId, "RootProcessGroup", rootPGName, rootPGId, "");
 
+        //Add total task duration for root to the NiFi metrics registry
+        final StatusHistoryEntity rootGPStatusHistory = getProcessGroupStatusHistory(rootPGId);
+        final List<StatusSnapshotDTO> aggregatedStatusHistory = rootGPStatusHistory.getStatusHistory().getAggregateSnapshots();
+        final long taskDuration = aggregatedStatusHistory.isEmpty() ? 0L :
+                aggregatedStatusHistory.get(aggregatedStatusHistory.size() - 1).getStatusMetrics()

Review comment:
       This line is a bit difficult to read because of the inline index calculation based on `size() - 1`.  Is there a reason for always reading the last element as opposed to the first?  Perhaps defining an index variable and including a comment would be helpful.
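
       For example, something like this (the variable name and comment placement are only suggestions, reusing the names from the diff above):

```java
final List<StatusSnapshotDTO> aggregatedStatusHistory = rootGPStatusHistory.getStatusHistory().getAggregateSnapshots();
// Explain here why the last aggregate snapshot is the one of interest
final int latestSnapshotIndex = aggregatedStatusHistory.size() - 1;
final long taskDuration = aggregatedStatusHistory.isEmpty() ? 0L :
        aggregatedStatusHistory.get(latestSnapshotIndex).getStatusMetrics()
                .get(ProcessGroupStatusDescriptor.TASK_MILLIS.getField());
```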

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java
##########
@@ -362,4 +365,149 @@ public static CollectorRegistry createBulletinMetrics(BulletinMetricsRegistry bu
         bulletinMetricsRegistry.setDataPoint(1, "BULLETIN", instanceId, componentType, componentId, parentId, nodeAddress, category, sourceName, sourceId, level);
         return bulletinMetricsRegistry.getRegistry();
     }
+
+    public static void aggregatePercentUsed(final ProcessGroupStatus status, final Map<String, Double> aggregatedMetrics) {
+        status.getProcessGroupStatus().forEach((childGroupStatus) -> aggregatePercentUsed(childGroupStatus, aggregatedMetrics));
+
+        for (ConnectionStatus connectionStatus : status.getConnectionStatus()) {
+            final double percentUsedBytes = getUtilization(connectionStatus.getQueuedBytes(), connectionStatus.getBackPressureBytesThreshold());
+            final double percentUsedCount = getUtilization(connectionStatus.getQueuedCount(), connectionStatus.getBackPressureObjectThreshold());
+
+            determineMaxValueForPercentUsed(aggregatedMetrics,
+                    "nifi_percent_used_bytes",
+                    percentUsedBytes);
+
+            determineMaxValueForPercentUsed(aggregatedMetrics,
+                    "nifi_percent_used_count",
+                    percentUsedCount);
+
+            setBackpressure(aggregatedMetrics, percentUsedBytes, "bytesAtBackpressure");
+            setBackpressure(aggregatedMetrics, percentUsedCount, "countAtBackpressure");
+        }
+    }
+
+    public static void aggregateConnectionPredictionMetrics(final Map<String, Double> aggregatedMetrics, final Map<String, Long> predictions) {
+        determineMinValueForPredictions(aggregatedMetrics,
+                "nifi_time_to_bytes_backpressure_prediction",
+                predictions.get("timeToBytesBackpressureMillis"),
+                "bytesAtBackpressure");
+
+        determineMinValueForPredictions(aggregatedMetrics,
+                "nifi_time_to_count_backpressure_prediction",
+                predictions.get("timeToCountBackpressureMillis"),
+                "countAtBackpressure");
+    }
+
+    private static void setBackpressure(final Map<String, Double> aggregatedMetrics, final double percentUsed, final String atBackpressureKey) {
+        if (percentUsed >= 100) {
+            aggregatedMetrics.put(atBackpressureKey, 1.0);
+        } else if (!aggregatedMetrics.containsKey(atBackpressureKey)) {
+            aggregatedMetrics.put(atBackpressureKey, 0.0);
+        }
+    }
+
+    private static void determineMinValueForPredictions(final Map<String, Double> aggregatedMetrics, final String metricFamilySamplesName,
+                                                        final double metricSampleValue, final String atBackpressureKey) {
+        final Double currentValue = aggregatedMetrics.get(metricFamilySamplesName);
+        if (aggregatedMetrics.get(atBackpressureKey) != null && aggregatedMetrics.get(atBackpressureKey) == 1.0) {
+            aggregatedMetrics.put(metricFamilySamplesName, 0.0);
+        } else if (currentValue == null) {
+            aggregatedMetrics.put(metricFamilySamplesName, metricSampleValue);
+        } else if (metricSampleValue > -1) {
+            if (currentValue == -1) {
+                aggregatedMetrics.put(metricFamilySamplesName, metricSampleValue);
+            } else {
+                aggregatedMetrics.put(metricFamilySamplesName, Math.min(metricSampleValue, currentValue));
+            }
+        }
+    }
+
+    private static void determineMaxValueForPercentUsed(final Map<String, Double> aggregatedMetrics, final String metricFamilySamplesName, final double metricSampleValue) {
+
+        Double currentValue = aggregatedMetrics.get(metricFamilySamplesName);
+        if (currentValue == null) {
+            currentValue = 0.0;
+        }
+        aggregatedMetrics.put(metricFamilySamplesName, Math.max(metricSampleValue, currentValue));
+    }
+
+    public static CollectorRegistry createAggregatedConnectionStatusAnalyticsMetrics(final ConnectionAnalyticsMetricsRegistry connectionAnalyticsMetricsRegistry,
+                                                                                     final Map<String, Double> aggregatedMetrics,
+                                                                                     final String instId, final String compType, final String compName, final String compId) {
+
+        final String instanceId = StringUtils.isEmpty(instId) ? DEFAULT_LABEL_STRING : instId;
+        final String componentType = StringUtils.isEmpty(compType) ? DEFAULT_LABEL_STRING : compType;
+        final String componentName = StringUtils.isEmpty(compName) ? DEFAULT_LABEL_STRING : compName;
+        final String componentId = StringUtils.isEmpty(compId) ? DEFAULT_LABEL_STRING : compId;
+        final Double bytesValue = aggregatedMetrics.get("nifi_time_to_bytes_backpressure_prediction");
+        final Double countsValue = aggregatedMetrics.get("nifi_time_to_count_backpressure_prediction");
+        final double bytesBackpressure = bytesValue == null ? -1.0 : bytesValue;
+        final double countsBackpressure = countsValue == null ? -1.0 : countsValue;

Review comment:
       Similar to the other comment, `-1.0` could be defined as `UNDEFINED_BACKPRESSURE`. 
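
       For example (the constant name is only a suggestion):

```java
private static final double UNDEFINED_BACKPRESSURE = -1.0;

// ... then, in createAggregatedConnectionStatusAnalyticsMetrics():
final double bytesBackpressure = bytesValue == null ? UNDEFINED_BACKPRESSURE : bytesValue;
final double countsBackpressure = countsValue == null ? UNDEFINED_BACKPRESSURE : countsValue;
```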

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
##########
@@ -5622,14 +5625,29 @@ public ProcessorDiagnosticsEntity getProcessorDiagnostics(final String id) {
         nifiMetricsRegistry.setDataPoint(aggregateEvent.getBytesReceived(), "TOTAL_BYTES_RECEIVED",
                 instanceId, "RootProcessGroup", rootPGName, rootPGId, "");
 
+        //Add total task duration for root to the NiFi metrics registry
+        final StatusHistoryEntity rootGPStatusHistory = getProcessGroupStatusHistory(rootPGId);
+        final List<StatusSnapshotDTO> aggregatedStatusHistory = rootGPStatusHistory.getStatusHistory().getAggregateSnapshots();
+        final long taskDuration = aggregatedStatusHistory.isEmpty() ? 0L :
+                aggregatedStatusHistory.get(aggregatedStatusHistory.size() - 1).getStatusMetrics()
+                .get(ProcessGroupStatusDescriptor.TASK_MILLIS.getField());
+        nifiMetricsRegistry.setDataPoint(taskDuration, "TOTAL_TASK_DURATION",
+                instanceId, "RootProcessGroup", rootPGName, rootPGId, "");

Review comment:
       It would be helpful to define `RootProcessGroup` as a static constant and reuse it where it is currently referenced.
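
       For example (only a sketch, reusing the call from the diff above):

```java
private static final String ROOT_PROCESS_GROUP = "RootProcessGroup";

// ...
nifiMetricsRegistry.setDataPoint(taskDuration, "TOTAL_TASK_DURATION",
        instanceId, ROOT_PROCESS_GROUP, rootPGName, rootPGId, "");
```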

##########
File path: nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusMetricsUtil.java
##########
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.prometheus;
+
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.prometheus.util.AbstractMetricsRegistry;
+import org.apache.nifi.prometheus.util.ConnectionAnalyticsMetricsRegistry;
+import org.apache.nifi.prometheus.util.NiFiMetricsRegistry;
+import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
+import org.hamcrest.CoreMatchers;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.everyItem;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class TestPrometheusMetricsUtil {
+    private static final long DEFAULT_PREDICTION_VALUE = -1L;
+    private static final double EXPECTED_DEFAULT_PREDICTION_VALUE = -1.0;
+    private static final double EXPECTED_BACKPRESSURE_PREDICTION_VALUE = 0.0;
+    private static final double EXPECTED_FALSE_BACKPRESSURE = 0.0;
+    private static final double EXPECTED_TRUE_BACKPRESSURE = 1.0;
+    private static final double EXPECTED_DEFAULT_PERCENT_USED_VALUE = 0.0;
+    private static final double EXPECTED_BACKPRESSURE_PERCENT_USED_VALUE = 100.0;
+    private static final double EXPECTED_NESTED_BYTES_PERCENT_VALUE = 150.0 / 200.0 * 100.0;
+    private static final double EXPECTED_NESTED_COUNT_PERCENT_VALUE = 5.0 / 30.0 * 100.0;
+    private static final String NIFI_PERCENT_USED_BYTES = "nifi_percent_used_bytes";
+    private static final String NIFI_PERCENT_USED_COUNT = "nifi_percent_used_count";
+    private static final String BYTES_AT_BACKPRESSURE = "bytesAtBackpressure";
+    private static final String COUNT_AT_BACKPRESSURE = "countAtBackpressure";
+    private static final String NIFI_TIME_TO_BYTES_BACKPRESSURE_PREDICTION = "nifi_time_to_bytes_backpressure_prediction";
+    private static final String NIFI_TIME_TO_COUNT_BACKPRESSURE_PREDICTION = "nifi_time_to_count_backpressure_prediction";
+    private static final String CONNECTION_1 = "Connection1";
+    private static final String CONNECTION_2 = "Connection2";
+    private static final String CONNECTION_3 = "Connection3";
+    private static final String CONNECTION_4 = "Connection4";
+    private static final String TIME_TO_BYTES_BACKPRESSURE_MILLIS = "timeToBytesBackpressureMillis";
+    private static final String TIME_TO_COUNT_BACKPRESSURE_MILLIS = "timeToCountBackpressureMillis";
+
+    private static ProcessGroupStatus singleProcessGroupStatus;
+    private static ProcessGroupStatus nestedProcessGroupStatus;
+    private static ProcessGroupStatus singleProcessGroupStatusWithBytesBackpressure;
+    private static ProcessGroupStatus nestedProcessGroupStatusWithCountBackpressure;
+    private static Set<String> connections;
+    private static Map<String, Map<String, Long>> mixedValuedPredictions;
+    private static Map<String, Map<String, Long>> defaultValuedPredictions;
+
+    @BeforeAll
+    public static void setup() {
+        singleProcessGroupStatus = createSingleProcessGroupStatus(0, 1, 0, 1);
+        nestedProcessGroupStatus = createNestedProcessGroupStatus();
+        singleProcessGroupStatusWithBytesBackpressure = createSingleProcessGroupStatus(1, 1, 0, 1);
+        nestedProcessGroupStatusWithCountBackpressure = createNestedProcessGroupStatusWithCountBackpressure();
+        connections = createConnections();
+        mixedValuedPredictions = createPredictionsWithMixedValue();
+        defaultValuedPredictions = createPredictionsWithDefaultValuesOnly();
+    }
+
+    @Test
+    public void testAggregatePercentUsedWithSingleProcessGroup() {
+        final Map<String, Double> aggregatedMetrics = new HashMap<>();
+
+        PrometheusMetricsUtil.aggregatePercentUsed(singleProcessGroupStatus, aggregatedMetrics);
+
+        assertThat(aggregatedMetrics.size(), equalTo(4));
+        assertThat(EXPECTED_DEFAULT_PERCENT_USED_VALUE, equalTo(aggregatedMetrics.get(NIFI_PERCENT_USED_BYTES)));
+        assertThat(EXPECTED_DEFAULT_PERCENT_USED_VALUE, equalTo(aggregatedMetrics.get(NIFI_PERCENT_USED_COUNT)));
+        assertThat(EXPECTED_FALSE_BACKPRESSURE, equalTo(aggregatedMetrics.get(BYTES_AT_BACKPRESSURE)));
+        assertThat(EXPECTED_FALSE_BACKPRESSURE, equalTo(aggregatedMetrics.get(COUNT_AT_BACKPRESSURE)));
+    }
+
+    @Test
+    public void testAggregatePercentUsedWithSingleProcessGroupWithBytesBackpressure() {
+        final Map<String, Double> aggregatedMetrics = new HashMap<>();
+
+        PrometheusMetricsUtil.aggregatePercentUsed(singleProcessGroupStatusWithBytesBackpressure, aggregatedMetrics);
+
+        assertThat(aggregatedMetrics.size(), equalTo(4));
+        assertThat(EXPECTED_BACKPRESSURE_PERCENT_USED_VALUE, equalTo(aggregatedMetrics.get(NIFI_PERCENT_USED_BYTES)));
+        assertThat(EXPECTED_DEFAULT_PERCENT_USED_VALUE, equalTo(aggregatedMetrics.get(NIFI_PERCENT_USED_COUNT)));
+        assertThat(EXPECTED_TRUE_BACKPRESSURE, equalTo(aggregatedMetrics.get(BYTES_AT_BACKPRESSURE)));
+        assertThat(EXPECTED_FALSE_BACKPRESSURE, equalTo(aggregatedMetrics.get(COUNT_AT_BACKPRESSURE)));
+    }
+
+    @Test
+    public void testAggregatePercentUsedWithNestedProcessGroups() {
+        final Map<String, Double> aggregatedMetrics = new HashMap<>();
+
+        PrometheusMetricsUtil.aggregatePercentUsed(nestedProcessGroupStatus, aggregatedMetrics);
+
+        assertThat(aggregatedMetrics.size(), equalTo(4));
+        assertThat(EXPECTED_NESTED_BYTES_PERCENT_VALUE, equalTo(aggregatedMetrics.get(NIFI_PERCENT_USED_BYTES)));
+        assertThat(EXPECTED_NESTED_COUNT_PERCENT_VALUE, equalTo(aggregatedMetrics.get(NIFI_PERCENT_USED_COUNT)));
+        assertThat(EXPECTED_FALSE_BACKPRESSURE, equalTo(aggregatedMetrics.get(BYTES_AT_BACKPRESSURE)));
+        assertThat(EXPECTED_FALSE_BACKPRESSURE, equalTo(aggregatedMetrics.get(COUNT_AT_BACKPRESSURE)));
+    }
+
+    @Test
+    public void testAggregatePercentUsedWithNestedProcessGroupsWithCountBackpressure() {
+        final Map<String, Double> aggregatedMetrics = new HashMap<>();
+
+        PrometheusMetricsUtil.aggregatePercentUsed(nestedProcessGroupStatusWithCountBackpressure, aggregatedMetrics);
+
+        assertThat(aggregatedMetrics.size(), equalTo(4));
+        assertThat(EXPECTED_NESTED_BYTES_PERCENT_VALUE, equalTo(aggregatedMetrics.get(NIFI_PERCENT_USED_BYTES)));
+        assertThat(EXPECTED_BACKPRESSURE_PERCENT_USED_VALUE, equalTo(aggregatedMetrics.get(NIFI_PERCENT_USED_COUNT)));
+        assertThat(EXPECTED_FALSE_BACKPRESSURE, equalTo(aggregatedMetrics.get(BYTES_AT_BACKPRESSURE)));
+        assertThat(EXPECTED_TRUE_BACKPRESSURE, equalTo(aggregatedMetrics.get(COUNT_AT_BACKPRESSURE)));
+    }
+
+    @Test
+    public void testAggregateConnectionPredictionsWithMixedValues() {
+        Map<String, Double> aggregatedMetrics = new HashMap<>();
+        generateConnectionAnalyticMetricsAggregation(aggregatedMetrics, mixedValuedPredictions);
+
+        assertThat(aggregatedMetrics.size(), equalTo(2));
+        assertThat(1.0, equalTo(aggregatedMetrics.get(NIFI_TIME_TO_BYTES_BACKPRESSURE_PREDICTION)));
+        assertThat(2.0, equalTo(aggregatedMetrics.get(NIFI_TIME_TO_COUNT_BACKPRESSURE_PREDICTION)));

Review comment:
       Is there a particular reason for using Hamcrest matchers as opposed to standard JUnit `assertEquals()` methods?  Recommend adjusting to use JUnit assertions where possible for simplicity.
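
       For example, the assertions above could be written roughly as follows (note the `(expected, actual)` argument order):

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

// ...
assertEquals(2, aggregatedMetrics.size());
assertEquals(1.0, aggregatedMetrics.get(NIFI_TIME_TO_BYTES_BACKPRESSURE_PREDICTION));
assertEquals(2.0, aggregatedMetrics.get(NIFI_TIME_TO_COUNT_BACKPRESSURE_PREDICTION));
```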




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org