You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2019/10/03 17:18:52 UTC
[nifi] branch master updated: NIFI-6723: Enrich processor-related and JVM GC metrics in Prometheus Reporting Task
This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new d7ca37d NIFI-6723: Enrich processor-related and JVM GC metrics in Prometheus Reporting Task
d7ca37d is described below
commit d7ca37d065677e3016477f74ecdfe233ccd725c7
Author: Kotaro Terada <ko...@yahoo-corp.jp>
AuthorDate: Fri Sep 27 09:53:39 2019 +0900
NIFI-6723: Enrich processor-related and JVM GC metrics in Prometheus Reporting Task
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #3771
---
.../prometheus/api/PrometheusMetricsUtil.java | 63 +++++++++++++++++++---
1 file changed, 56 insertions(+), 7 deletions(-)
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java
index 74b0ccc..fec3338 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java
@@ -20,6 +20,7 @@ package org.apache.nifi.reporting.prometheus.api;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import io.prometheus.client.SimpleCollector;
import org.apache.nifi.components.AllowableValue;
@@ -47,7 +48,7 @@ public class PrometheusMetricsUtil {
private static final CollectorRegistry NIFI_REGISTRY = new CollectorRegistry();
private static final CollectorRegistry JVM_REGISTRY = new CollectorRegistry();
- // Process Group metrics
+ // Processor / Process Group metrics
private static final Gauge AMOUNT_FLOWFILES_SENT = Gauge.build()
.name("nifi_amount_flowfiles_sent")
.help("Total number of FlowFiles sent by the component")
@@ -66,6 +67,12 @@ public class PrometheusMetricsUtil {
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
+ private static final Gauge AMOUNT_FLOWFILES_REMOVED = Gauge.build()
+ .name("nifi_amount_flowfiles_removed")
+ .help("Total number of FlowFiles removed by the component")
+ .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
+ .register(NIFI_REGISTRY);
+
private static final Gauge AMOUNT_BYTES_SENT = Gauge.build()
.name("nifi_amount_bytes_sent")
.help("Total number of bytes sent by the component")
@@ -150,6 +157,7 @@ public class PrometheusMetricsUtil {
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
+ // Processor metrics
private static final Gauge PROCESSOR_COUNTERS = Gauge.build()
.name("nifi_processor_counters")
.help("Counters exposed by NiFi Processors")
@@ -252,6 +260,18 @@ public class PrometheusMetricsUtil {
.labelNames("instance")
.register(JVM_REGISTRY);
+ private static final Gauge JVM_GC_RUNS = Gauge.build()
+ .name("nifi_jvm_gc_runs")
+ .help("NiFi JVM GC number of runs")
+ .labelNames("instance", "gc_name")
+ .register(JVM_REGISTRY);
+
+ private static final Gauge JVM_GC_TIME = Gauge.build()
+ .name("nifi_jvm_gc_time")
+ .help("NiFi JVM GC time in milliseconds")
+ .labelNames("instance", "gc_name")
+ .register(JVM_REGISTRY);
+
public static CollectorRegistry createNifiMetrics(ProcessGroupStatus status, String instanceId, String parentPGId, String componentType, String metricsStrategy) {
final String componentId = status.getId();
@@ -305,10 +325,10 @@ public class PrometheusMetricsUtil {
if (METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) {
// Report metrics for all components
- for(ProcessorStatus processorStatus : status.getProcessorStatus()) {
+ for (ProcessorStatus processorStatus : status.getProcessorStatus()) {
Map<String, Long> counters = processorStatus.getCounters();
- if(counters != null) {
+ if (counters != null) {
counters.entrySet().stream().forEach(entry -> PROCESSOR_COUNTERS
.labels(processorStatus.getName(), entry.getKey(), processorStatus.getId(), instanceId).set(entry.getValue()));
}
@@ -317,13 +337,36 @@ public class PrometheusMetricsUtil {
final String procComponentId = processorStatus.getId();
final String procComponentName = processorStatus.getName();
final String parentId = processorStatus.getGroupId();
+
+ AMOUNT_FLOWFILES_SENT.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getFlowFilesSent());
+ AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getFlowFilesReceived());
+ AMOUNT_FLOWFILES_REMOVED.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getFlowFilesRemoved());
+
+ AMOUNT_BYTES_SENT.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getBytesSent());
+ AMOUNT_BYTES_READ.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getBytesRead());
+ AMOUNT_BYTES_WRITTEN.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getBytesWritten());
+ AMOUNT_BYTES_RECEIVED.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getBytesReceived());
+
+ SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId, "", "", "", "")
+ .set(processorStatus.getOutputBytes());
+ SIZE_CONTENT_INPUT_TOTAL.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId, "", "", "", "")
+ .set(processorStatus.getInputBytes());
+
+ AMOUNT_ITEMS_OUTPUT.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId, "", "", "", "")
+ .set(processorStatus.getOutputCount());
+ AMOUNT_ITEMS_INPUT.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId, "", "", "", "")
+ .set(processorStatus.getInputCount());
+
+ AVERAGE_LINEAGE_DURATION.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId, "", "", "", "")
+ .set(processorStatus.getAverageLineageDuration());
+
AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId)
.set(status.getActiveThreadCount() == null ? 0 : status.getActiveThreadCount());
AMOUNT_THREADS_TOTAL_TERMINATED.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId)
.set(status.getTerminatedThreadCount() == null ? 0 : status.getTerminatedThreadCount());
}
- for(ConnectionStatus connectionStatus : status.getConnectionStatus()) {
+ for (ConnectionStatus connectionStatus : status.getConnectionStatus()) {
final String connComponentId = connectionStatus.getId();
final String connComponentName = connectionStatus.getName();
final String sourceId = connectionStatus.getSourceId();
@@ -355,7 +398,7 @@ public class PrometheusMetricsUtil {
IS_BACKPRESSURE_ENABLED.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
.set(isBackpressureEnabled ? 1 : 0);
}
- for(PortStatus portStatus : status.getInputPortStatus()) {
+ for (PortStatus portStatus : status.getInputPortStatus()) {
final String portComponentId = portStatus.getId();
final String portComponentName = portStatus.getName();
final String parentId = portStatus.getGroupId();
@@ -379,7 +422,7 @@ public class PrometheusMetricsUtil {
AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getActiveThreadCount());
}
- for(PortStatus portStatus : status.getOutputPortStatus()) {
+ for (PortStatus portStatus : status.getOutputPortStatus()) {
final String portComponentId = portStatus.getId();
final String portComponentName = portStatus.getName();
final String parentId = portStatus.getGroupId();
@@ -403,7 +446,7 @@ public class PrometheusMetricsUtil {
AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getActiveThreadCount());
}
- for(RemoteProcessGroupStatus remoteProcessGroupStatus : status.getRemoteProcessGroupStatus()) {
+ for (RemoteProcessGroupStatus remoteProcessGroupStatus : status.getRemoteProcessGroupStatus()) {
final String rpgComponentId = remoteProcessGroupStatus.getId();
final String rpgComponentName = remoteProcessGroupStatus.getName();
final String parentId = remoteProcessGroupStatus.getGroupId();
@@ -446,6 +489,12 @@ public class PrometheusMetricsUtil {
JVM_UPTIME.labels(instanceId).set(jvmMetrics.uptime());
JVM_FILE_DESCRIPTOR_USAGE.labels(instanceId).set(jvmMetrics.fileDescriptorUsage());
+ jvmMetrics.garbageCollectors()
+ .forEach((name, stat) -> {
+ JVM_GC_RUNS.labels(instanceId, name).set(stat.getRuns());
+ JVM_GC_TIME.labels(instanceId, name).set(stat.getTime(TimeUnit.MILLISECONDS));
+ });
+
return JVM_REGISTRY;
}