Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/12/08 07:59:21 UTC

[GitHub] [nifi] timeabarna opened a new pull request #5582: NIFI-9455 Add json output for Prometheus metrics with basic filtering…

timeabarna opened a new pull request #5582:
URL: https://github.com/apache/nifi/pull/5582


   https://issues.apache.org/jira/browse/NIFI-9455
   
   #### Description of PR
   
   Enables exposing Prometheus metrics in JSON format
   Enables basic filtering for sample names and sample label values
   Adds root-level aggregated prediction metrics
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
        in the commit message?
   
   - [ ] Does your PR title start with **NIFI-XXXX** where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically `main`)?
   
   - [ ] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._
   
   ### For code changes:
   - [ ] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
   - [ ] Have you written or updated unit tests to verify your changes?
   - [ ] Have you verified that the full build is successful on JDK 8?
   - [ ] Have you verified that the full build is successful on JDK 11?
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?
   - [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?
   - [ ] If adding new Properties, have you added `.displayName` in addition to .name (programmatic access) for each of the new properties?
   
   ### For documentation related changes:
   - [ ] Have you ensured that the format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] timeabarna commented on pull request #5582: NIFI-9455 Add json output for Prometheus metrics with basic filtering…

Posted by GitBox <gi...@apache.org>.
timeabarna commented on pull request #5582:
URL: https://github.com/apache/nifi/pull/5582#issuecomment-1016166803


   This PR is pending on https://github.com/apache/nifi/pull/5673





[GitHub] [nifi] timeabarna commented on pull request #5582: NIFI-9455 Add aggregated predictions to Prometheus

Posted by GitBox <gi...@apache.org>.
timeabarna commented on pull request #5582:
URL: https://github.com/apache/nifi/pull/5582#issuecomment-1019764197


   Thanks @exceptionfactory for your help. This PR should be the final one of the consolidated PRs; could you please help review it?





[GitHub] [nifi] exceptionfactory commented on a change in pull request #5582: NIFI-9455 Add aggregated predictions to Prometheus

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on a change in pull request #5582:
URL: https://github.com/apache/nifi/pull/5582#discussion_r792103838



##########
File path: nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusMetricsUtil.java
##########
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.prometheus;
+
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.prometheus.util.AbstractMetricsRegistry;
+import org.apache.nifi.prometheus.util.ConnectionAnalyticsMetricsRegistry;
+import org.apache.nifi.prometheus.util.NiFiMetricsRegistry;
+import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
+import org.hamcrest.CoreMatchers;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.everyItem;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class TestPrometheusMetricsUtil {
+    private static final long DEFAULT_PREDICTION_VALUE = -1L;
+    private static final double EXPECTED_DEFAULT_PREDICTION_VALUE = -1.0;
+    private static final double EXPECTED_BACKPRESSURE_PREDICTION_VALUE = 0.0;
+    private static final double EXPECTED_FALSE_BACKPRESSURE = 0.0;
+    private static final double EXPECTED_TRUE_BACKPRESSURE = 1.0;
+    private static final double EXPECTED_DEFAULT_PERCENT_USED_VALUE = 0.0;
+    private static final double EXPECTED_BACKPRESSURE_PERCENT_USED_VALUE = 100.0;
+    private static final double EXPECTED_NESTED_BYTES_PERCENT_VALUE = 150.0 / 200.0 * 100.0;
+    private static final double EXPECTED_NESTED_COUNT_PERCENT_VALUE = 5.0 / 30.0 * 100.0;
+    private static final String NIFI_PERCENT_USED_BYTES = "nifi_percent_used_bytes";
+    private static final String NIFI_PERCENT_USED_COUNT = "nifi_percent_used_count";
+    private static final String BYTES_AT_BACKPRESSURE = "bytesAtBackpressure";
+    private static final String COUNT_AT_BACKPRESSURE = "countAtBackpressure";
+    private static final String NIFI_TIME_TO_BYTES_BACKPRESSURE_PREDICTION = "nifi_time_to_bytes_backpressure_prediction";
+    private static final String NIFI_TIME_TO_COUNT_BACKPRESSURE_PREDICTION = "nifi_time_to_count_backpressure_prediction";
+    private static final String CONNECTION_1 = "Connection1";
+    private static final String CONNECTION_2 = "Connection2";
+    private static final String CONNECTION_3 = "Connection3";
+    private static final String CONNECTION_4 = "Connection4";
+    private static final String TIME_TO_BYTES_BACKPRESSURE_MILLIS = "timeToBytesBackpressureMillis";
+    private static final String TIME_TO_COUNT_BACKPRESSURE_MILLIS = "timeToCountBackpressureMillis";
+
+    private static ProcessGroupStatus singleProcessGroupStatus;
+    private static ProcessGroupStatus nestedProcessGroupStatus;
+    private static ProcessGroupStatus singleProcessGroupStatusWithBytesBackpressure;
+    private static ProcessGroupStatus nestedProcessGroupStatusWithCountBackpressure;
+    private static Set<String> connections;
+    private static Map<String, Map<String, Long>> mixedValuedPredictions;
+    private static Map<String, Map<String, Long>> defaultValuedPredictions;
+
+    @BeforeAll
+    public static void setup() {
+        singleProcessGroupStatus = createSingleProcessGroupStatus(0, 1, 0, 1);
+        nestedProcessGroupStatus = createNestedProcessGroupStatus();
+        singleProcessGroupStatusWithBytesBackpressure = createSingleProcessGroupStatus(1, 1, 0, 1);
+        nestedProcessGroupStatusWithCountBackpressure = createNestedProcessGroupStatusWithCountBackpressure();
+        connections = createConnections();
+        mixedValuedPredictions = createPredictionsWithMixedValue();
+        defaultValuedPredictions = createPredictionsWithDefaultValuesOnly();
+    }
+
+    @Test
+    public void testAggregatePercentUsedWithSingleProcessGroup() {
+        final Map<String, Double> aggregatedMetrics = new HashMap<>();
+
+        PrometheusMetricsUtil.aggregatePercentUsed(singleProcessGroupStatus, aggregatedMetrics);
+
+        assertThat(aggregatedMetrics.size(), equalTo(4));
+        assertThat(EXPECTED_DEFAULT_PERCENT_USED_VALUE, equalTo(aggregatedMetrics.get(NIFI_PERCENT_USED_BYTES)));
+        assertThat(EXPECTED_DEFAULT_PERCENT_USED_VALUE, equalTo(aggregatedMetrics.get(NIFI_PERCENT_USED_COUNT)));
+        assertThat(EXPECTED_FALSE_BACKPRESSURE, equalTo(aggregatedMetrics.get(BYTES_AT_BACKPRESSURE)));
+        assertThat(EXPECTED_FALSE_BACKPRESSURE, equalTo(aggregatedMetrics.get(COUNT_AT_BACKPRESSURE)));
+    }
+
+    @Test
+    public void testAggregatePercentUsedWithSingleProcessGroupWithBytesBackpressure() {
+        final Map<String, Double> aggregatedMetrics = new HashMap<>();
+
+        PrometheusMetricsUtil.aggregatePercentUsed(singleProcessGroupStatusWithBytesBackpressure, aggregatedMetrics);
+
+        assertThat(aggregatedMetrics.size(), equalTo(4));
+        assertThat(EXPECTED_BACKPRESSURE_PERCENT_USED_VALUE, equalTo(aggregatedMetrics.get(NIFI_PERCENT_USED_BYTES)));
+        assertThat(EXPECTED_DEFAULT_PERCENT_USED_VALUE, equalTo(aggregatedMetrics.get(NIFI_PERCENT_USED_COUNT)));
+        assertThat(EXPECTED_TRUE_BACKPRESSURE, equalTo(aggregatedMetrics.get(BYTES_AT_BACKPRESSURE)));
+        assertThat(EXPECTED_FALSE_BACKPRESSURE, equalTo(aggregatedMetrics.get(COUNT_AT_BACKPRESSURE)));
+    }
+
+    @Test
+    public void testAggregatePercentUsedWithNestedProcessGroups() {
+        final Map<String, Double> aggregatedMetrics = new HashMap<>();
+
+        PrometheusMetricsUtil.aggregatePercentUsed(nestedProcessGroupStatus, aggregatedMetrics);
+
+        assertThat(aggregatedMetrics.size(), equalTo(4));
+        assertThat(EXPECTED_NESTED_BYTES_PERCENT_VALUE, equalTo(aggregatedMetrics.get(NIFI_PERCENT_USED_BYTES)));
+        assertThat(EXPECTED_NESTED_COUNT_PERCENT_VALUE, equalTo(aggregatedMetrics.get(NIFI_PERCENT_USED_COUNT)));
+        assertThat(EXPECTED_FALSE_BACKPRESSURE, equalTo(aggregatedMetrics.get(BYTES_AT_BACKPRESSURE)));
+        assertThat(EXPECTED_FALSE_BACKPRESSURE, equalTo(aggregatedMetrics.get(COUNT_AT_BACKPRESSURE)));
+    }
+
+    @Test
+    public void testAggregatePercentUsedWithNestedProcessGroupsWithCountBackpressure() {
+        final Map<String, Double> aggregatedMetrics = new HashMap<>();
+
+        PrometheusMetricsUtil.aggregatePercentUsed(nestedProcessGroupStatusWithCountBackpressure, aggregatedMetrics);
+
+        assertThat(aggregatedMetrics.size(), equalTo(4));
+        assertThat(EXPECTED_NESTED_BYTES_PERCENT_VALUE, equalTo(aggregatedMetrics.get(NIFI_PERCENT_USED_BYTES)));
+        assertThat(EXPECTED_BACKPRESSURE_PERCENT_USED_VALUE, equalTo(aggregatedMetrics.get(NIFI_PERCENT_USED_COUNT)));
+        assertThat(EXPECTED_FALSE_BACKPRESSURE, equalTo(aggregatedMetrics.get(BYTES_AT_BACKPRESSURE)));
+        assertThat(EXPECTED_TRUE_BACKPRESSURE, equalTo(aggregatedMetrics.get(COUNT_AT_BACKPRESSURE)));
+    }
+
+    @Test
+    public void testAggregateConnectionPredictionsWithMixedValues() {
+        Map<String, Double> aggregatedMetrics = new HashMap<>();
+        generateConnectionAnalyticMetricsAggregation(aggregatedMetrics, mixedValuedPredictions);
+
+        assertThat(aggregatedMetrics.size(), equalTo(2));
+        assertThat(1.0, equalTo(aggregatedMetrics.get(NIFI_TIME_TO_BYTES_BACKPRESSURE_PREDICTION)));
+        assertThat(2.0, equalTo(aggregatedMetrics.get(NIFI_TIME_TO_COUNT_BACKPRESSURE_PREDICTION)));
+    }
+
+    @Test
+    public void testAggregateConnectionPredictionsWithAllDefaultValues() {
+        Map<String, Double> aggregatedMetrics = new HashMap<>();
+        generateConnectionAnalyticMetricsAggregation(aggregatedMetrics, defaultValuedPredictions);
+
+        assertThat(aggregatedMetrics.size(), equalTo(2));
+        assertThat(EXPECTED_DEFAULT_PREDICTION_VALUE, equalTo(aggregatedMetrics.get(NIFI_TIME_TO_BYTES_BACKPRESSURE_PREDICTION)));
+        assertThat(EXPECTED_DEFAULT_PREDICTION_VALUE, equalTo(aggregatedMetrics.get(NIFI_TIME_TO_COUNT_BACKPRESSURE_PREDICTION)));
+    }
+
+    @Test
+    public void testAggregateConnectionPredictionsWithBackpressure() {
+        Map<String, Double> aggregatedMetrics = new HashMap<>();
+        aggregatedMetrics.put(BYTES_AT_BACKPRESSURE, 1.0);
+        aggregatedMetrics.put(COUNT_AT_BACKPRESSURE, 0.0);
+        generateConnectionAnalyticMetricsAggregation(aggregatedMetrics, mixedValuedPredictions);
+
+        assertThat(EXPECTED_BACKPRESSURE_PREDICTION_VALUE, equalTo(aggregatedMetrics.get(NIFI_TIME_TO_BYTES_BACKPRESSURE_PREDICTION)));
+        assertThat(2.0, equalTo(aggregatedMetrics.get(NIFI_TIME_TO_COUNT_BACKPRESSURE_PREDICTION)));
+    }
+
+    @Test
+    public void testAggregatedConnectionPredictionsDatapointCreationWithAnalyticsNotSet() {
+        ConnectionAnalyticsMetricsRegistry connectionAnalyticsMetricsRegistry = new ConnectionAnalyticsMetricsRegistry();
+        Map<String, Double> emptyAggregatedMetrics = new HashMap<>();
+
+        PrometheusMetricsUtil.createAggregatedConnectionStatusAnalyticsMetrics(connectionAnalyticsMetricsRegistry,
+                emptyAggregatedMetrics,
+                "",

Review comment:
       Recommend defining a static constant named `EMPTY`, or using a static import of `StringUtils.EMPTY` as opposed to having the empty string quotes repeated through multiple methods.
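
       A minimal sketch of that suggestion (the constant is illustrative; `StringUtils.EMPTY` assumes Apache Commons Lang is available on the test classpath):

           private static final String EMPTY = "";
           // alternatively: import static org.apache.commons.lang3.StringUtils.EMPTY;

           PrometheusMetricsUtil.createAggregatedConnectionStatusAnalyticsMetrics(connectionAnalyticsMetricsRegistry,
                   emptyAggregatedMetrics,
                   EMPTY,
                   EMPTY,
                   EMPTY,
                   EMPTY);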

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java
##########
@@ -362,4 +365,149 @@ public static CollectorRegistry createBulletinMetrics(BulletinMetricsRegistry bu
         bulletinMetricsRegistry.setDataPoint(1, "BULLETIN", instanceId, componentType, componentId, parentId, nodeAddress, category, sourceName, sourceId, level);
         return bulletinMetricsRegistry.getRegistry();
     }
+
+    public static void aggregatePercentUsed(final ProcessGroupStatus status, final Map<String, Double> aggregatedMetrics) {
+        status.getProcessGroupStatus().forEach((childGroupStatus) -> aggregatePercentUsed(childGroupStatus, aggregatedMetrics));
+
+        for (ConnectionStatus connectionStatus : status.getConnectionStatus()) {
+            final double percentUsedBytes = getUtilization(connectionStatus.getQueuedBytes(), connectionStatus.getBackPressureBytesThreshold());
+            final double percentUsedCount = getUtilization(connectionStatus.getQueuedCount(), connectionStatus.getBackPressureObjectThreshold());
+
+            determineMaxValueForPercentUsed(aggregatedMetrics,
+                    "nifi_percent_used_bytes",
+                    percentUsedBytes);
+
+            determineMaxValueForPercentUsed(aggregatedMetrics,
+                    "nifi_percent_used_count",
+                    percentUsedCount);
+
+            setBackpressure(aggregatedMetrics, percentUsedBytes, "bytesAtBackpressure");
+            setBackpressure(aggregatedMetrics, percentUsedCount, "countAtBackpressure");
+        }
+    }
+
+    public static void aggregateConnectionPredictionMetrics(final Map<String, Double> aggregatedMetrics, final Map<String, Long> predictions) {
+        determineMinValueForPredictions(aggregatedMetrics,
+                "nifi_time_to_bytes_backpressure_prediction",
+                predictions.get("timeToBytesBackpressureMillis"),
+                "bytesAtBackpressure");
+
+        determineMinValueForPredictions(aggregatedMetrics,
+                "nifi_time_to_count_backpressure_prediction",
+                predictions.get("timeToCountBackpressureMillis"),
+                "countAtBackpressure");
+    }
+
+    private static void setBackpressure(final Map<String, Double> aggregatedMetrics, final double percentUsed, final String atBackpressureKey) {
+        if (percentUsed >= 100) {
+            aggregatedMetrics.put(atBackpressureKey, 1.0);
+        } else if (!aggregatedMetrics.containsKey(atBackpressureKey)) {
+            aggregatedMetrics.put(atBackpressureKey, 0.0);
+        }

Review comment:
       What do you think about defining static constants for some of these values?  For example, `1.0` could be defined as `MAXIMUM_BACKPRESSURE`, and that value could be used elsewhere for evaluation.
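
       For illustration, the method above could then read (constant names are only suggestions):

           private static final double MAXIMUM_BACKPRESSURE = 1.0;
           private static final double NO_BACKPRESSURE = 0.0;
           private static final double BACKPRESSURE_THRESHOLD_PERCENT = 100.0;

           private static void setBackpressure(final Map<String, Double> aggregatedMetrics, final double percentUsed, final String atBackpressureKey) {
               if (percentUsed >= BACKPRESSURE_THRESHOLD_PERCENT) {
                   aggregatedMetrics.put(atBackpressureKey, MAXIMUM_BACKPRESSURE);
               } else if (!aggregatedMetrics.containsKey(atBackpressureKey)) {
                   aggregatedMetrics.put(atBackpressureKey, NO_BACKPRESSURE);
               }
           }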

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
##########
@@ -5622,14 +5625,29 @@ public ProcessorDiagnosticsEntity getProcessorDiagnostics(final String id) {
         nifiMetricsRegistry.setDataPoint(aggregateEvent.getBytesReceived(), "TOTAL_BYTES_RECEIVED",
                 instanceId, "RootProcessGroup", rootPGName, rootPGId, "");
 
+        //Add total task duration for root to the NiFi metrics registry
+        final StatusHistoryEntity rootGPStatusHistory = getProcessGroupStatusHistory(rootPGId);
+        final List<StatusSnapshotDTO> aggregatedStatusHistory = rootGPStatusHistory.getStatusHistory().getAggregateSnapshots();
+        final long taskDuration = aggregatedStatusHistory.isEmpty() ? 0L :
+                aggregatedStatusHistory.get(aggregatedStatusHistory.size() - 1).getStatusMetrics()

Review comment:
       This line is a bit difficult to read because of the inline index calculation based on `size() - 1`.  Is there a reason for always reading the last element as opposed to the first?  Perhaps defining an index variable and including a comment would be helpful.
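
       One possible shape for this (a sketch only, assuming the aggregate snapshots are ordered oldest to newest so the last element is the most recent):

           final long taskDuration;
           if (aggregatedStatusHistory.isEmpty()) {
               taskDuration = 0L;
           } else {
               // Use the most recent aggregate snapshot for the root group's total task duration
               final int latestSnapshotIndex = aggregatedStatusHistory.size() - 1;
               taskDuration = aggregatedStatusHistory.get(latestSnapshotIndex)
                       .getStatusMetrics()
                       .get(ProcessGroupStatusDescriptor.TASK_MILLIS.getField());
           }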

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java
##########
@@ -362,4 +365,149 @@ public static CollectorRegistry createBulletinMetrics(BulletinMetricsRegistry bu
         bulletinMetricsRegistry.setDataPoint(1, "BULLETIN", instanceId, componentType, componentId, parentId, nodeAddress, category, sourceName, sourceId, level);
         return bulletinMetricsRegistry.getRegistry();
     }
+
+    public static void aggregatePercentUsed(final ProcessGroupStatus status, final Map<String, Double> aggregatedMetrics) {
+        status.getProcessGroupStatus().forEach((childGroupStatus) -> aggregatePercentUsed(childGroupStatus, aggregatedMetrics));
+
+        for (ConnectionStatus connectionStatus : status.getConnectionStatus()) {
+            final double percentUsedBytes = getUtilization(connectionStatus.getQueuedBytes(), connectionStatus.getBackPressureBytesThreshold());
+            final double percentUsedCount = getUtilization(connectionStatus.getQueuedCount(), connectionStatus.getBackPressureObjectThreshold());
+
+            determineMaxValueForPercentUsed(aggregatedMetrics,
+                    "nifi_percent_used_bytes",
+                    percentUsedBytes);
+
+            determineMaxValueForPercentUsed(aggregatedMetrics,
+                    "nifi_percent_used_count",
+                    percentUsedCount);
+
+            setBackpressure(aggregatedMetrics, percentUsedBytes, "bytesAtBackpressure");
+            setBackpressure(aggregatedMetrics, percentUsedCount, "countAtBackpressure");
+        }
+    }
+
+    public static void aggregateConnectionPredictionMetrics(final Map<String, Double> aggregatedMetrics, final Map<String, Long> predictions) {
+        determineMinValueForPredictions(aggregatedMetrics,
+                "nifi_time_to_bytes_backpressure_prediction",
+                predictions.get("timeToBytesBackpressureMillis"),
+                "bytesAtBackpressure");
+
+        determineMinValueForPredictions(aggregatedMetrics,
+                "nifi_time_to_count_backpressure_prediction",
+                predictions.get("timeToCountBackpressureMillis"),
+                "countAtBackpressure");
+    }
+
+    private static void setBackpressure(final Map<String, Double> aggregatedMetrics, final double percentUsed, final String atBackpressureKey) {
+        if (percentUsed >= 100) {
+            aggregatedMetrics.put(atBackpressureKey, 1.0);
+        } else if (!aggregatedMetrics.containsKey(atBackpressureKey)) {
+            aggregatedMetrics.put(atBackpressureKey, 0.0);
+        }
+    }
+
+    private static void determineMinValueForPredictions(final Map<String, Double> aggregatedMetrics, final String metricFamilySamplesName,
+                                                        final double metricSampleValue, final String atBackpressureKey) {
+        final Double currentValue = aggregatedMetrics.get(metricFamilySamplesName);
+        if (aggregatedMetrics.get(atBackpressureKey) != null && aggregatedMetrics.get(atBackpressureKey) == 1.0) {
+            aggregatedMetrics.put(metricFamilySamplesName, 0.0);
+        } else if (currentValue == null) {
+            aggregatedMetrics.put(metricFamilySamplesName, metricSampleValue);
+        } else if (metricSampleValue > -1) {
+            if (currentValue == -1) {
+                aggregatedMetrics.put(metricFamilySamplesName, metricSampleValue);
+            } else {
+                aggregatedMetrics.put(metricFamilySamplesName, Math.min(metricSampleValue, currentValue));
+            }
+        }
+    }
+
+    private static void determineMaxValueForPercentUsed(final Map<String, Double> aggregatedMetrics, final String metricFamilySamplesName, final double metricSampleValue) {
+
+        Double currentValue = aggregatedMetrics.get(metricFamilySamplesName);
+        if (currentValue == null) {
+            currentValue = 0.0;
+        }
+        aggregatedMetrics.put(metricFamilySamplesName, Math.max(metricSampleValue, currentValue));
+    }
+
+    public static CollectorRegistry createAggregatedConnectionStatusAnalyticsMetrics(final ConnectionAnalyticsMetricsRegistry connectionAnalyticsMetricsRegistry,
+                                                                                     final Map<String, Double> aggregatedMetrics,
+                                                                                     final String instId, final String compType, final String compName, final String compId) {
+
+        final String instanceId = StringUtils.isEmpty(instId) ? DEFAULT_LABEL_STRING : instId;
+        final String componentType = StringUtils.isEmpty(compType) ? DEFAULT_LABEL_STRING : compType;
+        final String componentName = StringUtils.isEmpty(compName) ? DEFAULT_LABEL_STRING : compName;
+        final String componentId = StringUtils.isEmpty(compId) ? DEFAULT_LABEL_STRING : compId;
+        final Double bytesValue = aggregatedMetrics.get("nifi_time_to_bytes_backpressure_prediction");
+        final Double countsValue = aggregatedMetrics.get("nifi_time_to_count_backpressure_prediction");
+        final double bytesBackpressure = bytesValue == null ? -1.0 : bytesValue;
+        final double countsBackpressure = countsValue == null ? -1.0 : countsValue;

Review comment:
       Similar to the other comment, `-1.0` could be defined as `UNDEFINED_BACKPRESSURE`. 
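
       A sketch of that idea (constant placement is illustrative):

           private static final double UNDEFINED_BACKPRESSURE = -1.0;

           final double bytesBackpressure = bytesValue == null ? UNDEFINED_BACKPRESSURE : bytesValue;
           final double countsBackpressure = countsValue == null ? UNDEFINED_BACKPRESSURE : countsValue;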

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
##########
@@ -5622,14 +5625,29 @@ public ProcessorDiagnosticsEntity getProcessorDiagnostics(final String id) {
         nifiMetricsRegistry.setDataPoint(aggregateEvent.getBytesReceived(), "TOTAL_BYTES_RECEIVED",
                 instanceId, "RootProcessGroup", rootPGName, rootPGId, "");
 
+        //Add total task duration for root to the NiFi metrics registry
+        final StatusHistoryEntity rootGPStatusHistory = getProcessGroupStatusHistory(rootPGId);
+        final List<StatusSnapshotDTO> aggregatedStatusHistory = rootGPStatusHistory.getStatusHistory().getAggregateSnapshots();
+        final long taskDuration = aggregatedStatusHistory.isEmpty() ? 0L :
+                aggregatedStatusHistory.get(aggregatedStatusHistory.size() - 1).getStatusMetrics()
+                .get(ProcessGroupStatusDescriptor.TASK_MILLIS.getField());
+        nifiMetricsRegistry.setDataPoint(taskDuration, "TOTAL_TASK_DURATION",
+                instanceId, "RootProcessGroup", rootPGName, rootPGId, "");

Review comment:
       It would be helpful to define `RootProcessGroup` as a static constant and reuse where it is currently referenced.
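
       For example (the constant name is a suggestion, not from the PR):

           private static final String ROOT_PROCESS_GROUP = "RootProcessGroup";

           nifiMetricsRegistry.setDataPoint(taskDuration, "TOTAL_TASK_DURATION",
                   instanceId, ROOT_PROCESS_GROUP, rootPGName, rootPGId, "");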

##########
File path: nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusMetricsUtil.java
##########
@@ -0,0 +1,404 @@
+    @Test
+    public void testAggregateConnectionPredictionsWithMixedValues() {
+        Map<String, Double> aggregatedMetrics = new HashMap<>();
+        generateConnectionAnalyticMetricsAggregation(aggregatedMetrics, mixedValuedPredictions);
+
+        assertThat(aggregatedMetrics.size(), equalTo(2));
+        assertThat(1.0, equalTo(aggregatedMetrics.get(NIFI_TIME_TO_BYTES_BACKPRESSURE_PREDICTION)));
+        assertThat(2.0, equalTo(aggregatedMetrics.get(NIFI_TIME_TO_COUNT_BACKPRESSURE_PREDICTION)));

Review comment:
       Is there a particular reason for using Hamcrest matchers as opposed to standard JUnit `assertEquals()` methods?  Recommend adjusting to use JUnit assertions where possible for simplicity.
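
       For illustration, the assertions above could be written with plain JUnit 5 methods, with the expected value as the first argument (a sketch):

           import static org.junit.jupiter.api.Assertions.assertEquals;

           assertEquals(2, aggregatedMetrics.size());
           assertEquals(1.0, aggregatedMetrics.get(NIFI_TIME_TO_BYTES_BACKPRESSURE_PREDICTION));
           assertEquals(2.0, aggregatedMetrics.get(NIFI_TIME_TO_COUNT_BACKPRESSURE_PREDICTION));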







[GitHub] [nifi] exceptionfactory commented on a change in pull request #5582: NIFI-9455 Add json output for Prometheus metrics with basic filtering…

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on a change in pull request #5582:
URL: https://github.com/apache/nifi/pull/5582#discussion_r765091931



##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
##########
@@ -420,7 +422,10 @@ public Response getFlowMetrics(
                     value = "The producer for flow file metrics. Each producer may have its own output format.",
                     required = true
             )
-            @PathParam("producer") final String producer) throws InterruptedException {
+            @PathParam("producer") final String producer,
+            @QueryParam("sampleName") final String sampleName,
+            @QueryParam("sampleLabelValues") final String sampleLabelValues,
+            @QueryParam("firstFieldName") @DefaultValue("samples") final String firstFieldName) throws InterruptedException {

Review comment:
       These new parameters are helpful, but they appear to be applied only to the JSON format.  Instead, they should apply to all output formats, which might require a bit more refactoring to reduce potential duplication.
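
       Purely as an illustration, one option is a shared helper that filters the collected samples before any producer-specific rendering is applied. The method name and the regex-based matching below are assumptions for the sketch, not code from this PR; it presumes the Prometheus simpleclient `Collector`/`CollectorRegistry` types and the usual `java.util` imports are already available here:

           private static List<Collector.MetricFamilySamples.Sample> filterSamples(final Collection<CollectorRegistry> registries,
                                                                                   final String sampleName, final String sampleLabelValues) {
               final List<Collector.MetricFamilySamples.Sample> filtered = new ArrayList<>();
               for (final CollectorRegistry registry : registries) {
                   final Enumeration<Collector.MetricFamilySamples> families = registry.metricFamilySamples();
                   while (families.hasMoreElements()) {
                       for (final Collector.MetricFamilySamples.Sample sample : families.nextElement().samples) {
                           // A null filter matches everything
                           final boolean nameMatches = sampleName == null || sample.name.matches(sampleName);
                           final boolean labelMatches = sampleLabelValues == null
                                   || sample.labelValues.stream().anyMatch(labelValue -> labelValue.matches(sampleLabelValues));
                           if (nameMatches && labelMatches) {
                               filtered.add(sample);
                           }
                       }
                   }
               }
               return filtered;
           }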

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
##########
@@ -442,6 +447,26 @@ public Response getFlowMetrics(
             return generateOkResponse(response)
                     .type(MediaType.TEXT_PLAIN_TYPE)
                     .build();
+        } else if ("json".equalsIgnoreCase(producer)) {
+            final Collection<CollectorRegistry> allRegistries = serviceFacade.generateFlowMetrics();
+            final Map<String, List<Collector.MetricFamilySamples.Sample>> response = new HashMap<>();

Review comment:
       Did you consider writing out the response using `StreamingOutput` analogous to the current Prometheus approach?
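
       A rough sketch of that idea for the JSON case, assuming Jackson's `ObjectMapper` is acceptable for serializing the sample map (the exact document shape is up to the PR):

           final StreamingOutput streamingOutput = outputStream -> {
               // Write the JSON document directly to the response stream instead of buffering it in memory
               final ObjectMapper mapper = new ObjectMapper();
               mapper.writeValue(outputStream, response);
           };
           return generateOkResponse(streamingOutput)
                   .type(MediaType.APPLICATION_JSON_TYPE)
                   .build();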







[GitHub] [nifi] timeabarna commented on pull request #5582: NIFI-9455 Add aggregated predictions to Prometheus

Posted by GitBox <gi...@apache.org>.
timeabarna commented on pull request #5582:
URL: https://github.com/apache/nifi/pull/5582#issuecomment-1022902488


   Thanks @exceptionfactory for your review. I've modified the code based on your recommendations.





[GitHub] [nifi] exceptionfactory closed pull request #5582: NIFI-9455 Add aggregated predictions to Prometheus

Posted by GitBox <gi...@apache.org>.
exceptionfactory closed pull request #5582:
URL: https://github.com/apache/nifi/pull/5582


   

