You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2019/10/03 17:18:52 UTC

[nifi] branch master updated: NIFI-6723: Enrich processor-related and JVM GC metrics in Prometheus Reporting Task

This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new d7ca37d  NIFI-6723: Enrich processor-related and JVM GC metrics in Prometheus Reporting Task
d7ca37d is described below

commit d7ca37d065677e3016477f74ecdfe233ccd725c7
Author: Kotaro Terada <ko...@yahoo-corp.jp>
AuthorDate: Fri Sep 27 09:53:39 2019 +0900

    NIFI-6723: Enrich processor-related and JVM GC metrics in Prometheus Reporting Task
    
    Signed-off-by: Matthew Burgess <ma...@apache.org>
    
    This closes #3771
---
 .../prometheus/api/PrometheusMetricsUtil.java      | 63 +++++++++++++++++++---
 1 file changed, 56 insertions(+), 7 deletions(-)

diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java
index 74b0ccc..fec3338 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java
@@ -20,6 +20,7 @@ package org.apache.nifi.reporting.prometheus.api;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import io.prometheus.client.SimpleCollector;
 import org.apache.nifi.components.AllowableValue;
@@ -47,7 +48,7 @@ public class PrometheusMetricsUtil {
     private static final CollectorRegistry NIFI_REGISTRY = new CollectorRegistry();
     private static final CollectorRegistry JVM_REGISTRY = new CollectorRegistry();
 
-    // Process Group metrics
+    // Processor / Process Group metrics
     private static final Gauge AMOUNT_FLOWFILES_SENT = Gauge.build()
             .name("nifi_amount_flowfiles_sent")
             .help("Total number of FlowFiles sent by the component")
@@ -66,6 +67,12 @@ public class PrometheusMetricsUtil {
             .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
             .register(NIFI_REGISTRY);
 
+    private static final Gauge AMOUNT_FLOWFILES_REMOVED = Gauge.build()
+            .name("nifi_amount_flowfiles_removed")
+            .help("Total number of FlowFiles removed by the component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
+            .register(NIFI_REGISTRY);
+
     private static final Gauge AMOUNT_BYTES_SENT = Gauge.build()
             .name("nifi_amount_bytes_sent")
             .help("Total number of bytes sent by the component")
@@ -150,6 +157,7 @@ public class PrometheusMetricsUtil {
                     "source_id", "source_name", "destination_id", "destination_name")
             .register(NIFI_REGISTRY);
 
+    // Processor metrics
     private static final Gauge PROCESSOR_COUNTERS = Gauge.build()
             .name("nifi_processor_counters")
             .help("Counters exposed by NiFi Processors")
@@ -252,6 +260,18 @@ public class PrometheusMetricsUtil {
             .labelNames("instance")
             .register(JVM_REGISTRY);
 
+    private static final Gauge JVM_GC_RUNS = Gauge.build()
+            .name("nifi_jvm_gc_runs")
+            .help("NiFi JVM GC number of runs")
+            .labelNames("instance", "gc_name")
+            .register(JVM_REGISTRY);
+
+    private static final Gauge JVM_GC_TIME = Gauge.build()
+            .name("nifi_jvm_gc_time")
+            .help("NiFi JVM GC time in milliseconds")
+            .labelNames("instance", "gc_name")
+            .register(JVM_REGISTRY);
+
     public static CollectorRegistry createNifiMetrics(ProcessGroupStatus status, String instanceId, String parentPGId, String componentType, String metricsStrategy) {
 
         final String componentId = status.getId();
@@ -305,10 +325,10 @@ public class PrometheusMetricsUtil {
 
         if (METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) {
             // Report metrics for all components
-            for(ProcessorStatus processorStatus : status.getProcessorStatus()) {
+            for (ProcessorStatus processorStatus : status.getProcessorStatus()) {
                 Map<String, Long> counters = processorStatus.getCounters();
 
-                if(counters != null) {
+                if (counters != null) {
                     counters.entrySet().stream().forEach(entry -> PROCESSOR_COUNTERS
                             .labels(processorStatus.getName(), entry.getKey(), processorStatus.getId(), instanceId).set(entry.getValue()));
                 }
@@ -317,13 +337,36 @@ public class PrometheusMetricsUtil {
                 final String procComponentId = processorStatus.getId();
                 final String procComponentName = processorStatus.getName();
                 final String parentId = processorStatus.getGroupId();
+
+                AMOUNT_FLOWFILES_SENT.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getFlowFilesSent());
+                AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getFlowFilesReceived());
+                AMOUNT_FLOWFILES_REMOVED.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getFlowFilesRemoved());
+
+                AMOUNT_BYTES_SENT.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getBytesSent());
+                AMOUNT_BYTES_READ.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getBytesRead());
+                AMOUNT_BYTES_WRITTEN.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getBytesWritten());
+                AMOUNT_BYTES_RECEIVED.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getBytesReceived());
+
+                SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId, "", "", "", "")
+                        .set(processorStatus.getOutputBytes());
+                SIZE_CONTENT_INPUT_TOTAL.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId, "", "", "", "")
+                        .set(processorStatus.getInputBytes());
+
+                AMOUNT_ITEMS_OUTPUT.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId, "", "", "", "")
+                        .set(processorStatus.getOutputCount());
+                AMOUNT_ITEMS_INPUT.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId, "", "", "", "")
+                        .set(processorStatus.getInputCount());
+
+                AVERAGE_LINEAGE_DURATION.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId, "", "", "", "")
+                        .set(processorStatus.getAverageLineageDuration());
+
                 AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId)
                         .set(status.getActiveThreadCount() == null ? 0 : status.getActiveThreadCount());
                 AMOUNT_THREADS_TOTAL_TERMINATED.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId)
                         .set(status.getTerminatedThreadCount() == null ? 0 : status.getTerminatedThreadCount());
 
             }
-            for(ConnectionStatus connectionStatus : status.getConnectionStatus()) {
+            for (ConnectionStatus connectionStatus : status.getConnectionStatus()) {
                 final String connComponentId = connectionStatus.getId();
                 final String connComponentName = connectionStatus.getName();
                 final String sourceId = connectionStatus.getSourceId();
@@ -355,7 +398,7 @@ public class PrometheusMetricsUtil {
                 IS_BACKPRESSURE_ENABLED.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
                         .set(isBackpressureEnabled ? 1 : 0);
             }
-            for(PortStatus portStatus : status.getInputPortStatus()) {
+            for (PortStatus portStatus : status.getInputPortStatus()) {
                 final String portComponentId = portStatus.getId();
                 final String portComponentName = portStatus.getName();
                 final String parentId = portStatus.getGroupId();
@@ -379,7 +422,7 @@ public class PrometheusMetricsUtil {
 
                 AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getActiveThreadCount());
             }
-            for(PortStatus portStatus : status.getOutputPortStatus()) {
+            for (PortStatus portStatus : status.getOutputPortStatus()) {
                 final String portComponentId = portStatus.getId();
                 final String portComponentName = portStatus.getName();
                 final String parentId = portStatus.getGroupId();
@@ -403,7 +446,7 @@ public class PrometheusMetricsUtil {
 
                 AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getActiveThreadCount());
             }
-            for(RemoteProcessGroupStatus remoteProcessGroupStatus : status.getRemoteProcessGroupStatus()) {
+            for (RemoteProcessGroupStatus remoteProcessGroupStatus : status.getRemoteProcessGroupStatus()) {
                 final String rpgComponentId = remoteProcessGroupStatus.getId();
                 final String rpgComponentName = remoteProcessGroupStatus.getName();
                 final String parentId = remoteProcessGroupStatus.getGroupId();
@@ -446,6 +489,12 @@ public class PrometheusMetricsUtil {
         JVM_UPTIME.labels(instanceId).set(jvmMetrics.uptime());
         JVM_FILE_DESCRIPTOR_USAGE.labels(instanceId).set(jvmMetrics.fileDescriptorUsage());
 
+        jvmMetrics.garbageCollectors()
+                .forEach((name, stat) -> {
+                    JVM_GC_RUNS.labels(instanceId, name).set(stat.getRuns());
+                    JVM_GC_TIME.labels(instanceId, name).set(stat.getTime(TimeUnit.MILLISECONDS));
+                });
+
         return JVM_REGISTRY;
     }