You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by kd...@apache.org on 2019/06/07 20:31:17 UTC
[nifi] branch master updated: NIFI-6352: Add ability to report all
component metrics to Prometheus
This is an automated email from the ASF dual-hosted git repository.
kdoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 99d6ed2 NIFI-6352: Add ability to report all component metrics to Prometheus
99d6ed2 is described below
commit 99d6ed244cd8808646eaa9fbf446e4be36defafc
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Wed Jun 5 12:32:22 2019 -0400
NIFI-6352: Add ability to report all component metrics to Prometheus
This closes #3519.
Signed-off-by: Kevin Doran <kd...@apache.org>
---
.../prometheus/PrometheusReportingTask.java | 32 +-
.../reporting/prometheus/PrometheusServer.java | 11 +-
.../prometheus/api/PrometheusMetricsUtil.java | 330 ++++++++++++++++-----
.../prometheus/TestPrometheusReportingTask.java | 4 +-
4 files changed, 296 insertions(+), 81 deletions(-)
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java
index daaddd3..7dcfe73 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java
@@ -40,6 +40,10 @@ import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
import org.eclipse.jetty.server.Server;
+import static org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil.METRICS_STRATEGY_COMPONENTS;
+import static org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil.METRICS_STRATEGY_PG;
+import static org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil.METRICS_STRATEGY_ROOT;
+
@Tags({ "reporting", "prometheus", "metrics", "time series data" })
@CapabilityDescription("Reports metrics in Prometheus format by creating /metrics http endpoint which can be used for external monitoring of the application."
+ " The reporting task reports a set of metrics regarding the JVM (optional) and the NiFi instance")
@@ -59,7 +63,8 @@ public class PrometheusReportingTask extends AbstractReportingTask {
+ "specified in the SSL Context Service");
public static final PropertyDescriptor METRICS_ENDPOINT_PORT = new PropertyDescriptor.Builder()
- .name("Prometheus Metrics Endpoint Port")
+ .name("prometheus-reporting-task-metrics-endpoint-port")
+ .displayName("Prometheus Metrics Endpoint Port")
.description("The Port where prometheus metrics can be accessed")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
@@ -68,7 +73,8 @@ public class PrometheusReportingTask extends AbstractReportingTask {
.build();
public static final PropertyDescriptor INSTANCE_ID = new PropertyDescriptor.Builder()
- .name("Instance ID")
+ .name("prometheus-reporting-task-instance-id")
+ .displayName("Instance ID")
.description("Id of this NiFi instance to be included in the metrics sent to Prometheus")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
@@ -76,16 +82,27 @@ public class PrometheusReportingTask extends AbstractReportingTask {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
+ public static final PropertyDescriptor METRICS_STRATEGY = new PropertyDescriptor.Builder()
+ .name("prometheus-reporting-task-metrics-strategy")
+ .displayName("Metrics Reporting Strategy")
+ .description("The granularity on which to report metrics. Options include only the root process group, all process groups, or all components")
+ .allowableValues(METRICS_STRATEGY_ROOT, METRICS_STRATEGY_PG, METRICS_STRATEGY_COMPONENTS)
+ .defaultValue(METRICS_STRATEGY_COMPONENTS.getValue())
+ .required(true)
+ .build();
+
public static final PropertyDescriptor SEND_JVM_METRICS = new PropertyDescriptor.Builder()
- .name("Send JVM-metrics")
- .description("Send JVM-metrics in addition to the Nifi-metrics")
+ .name("prometheus-reporting-task-metrics-send-jvm")
+ .displayName("Send JVM metrics")
+ .description("Send JVM metrics in addition to the NiFi metrics")
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
public static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder()
- .name("SSL Context Service")
+ .name("prometheus-reporting-task-ssl-context")
+ .displayName("SSL Context Service")
.description("The SSL Context Service to use in order to secure the server. If specified, the server will"
+ "accept only HTTPS requests; otherwise, the server will accept only HTTP requests")
.required(false)
@@ -93,7 +110,8 @@ public class PrometheusReportingTask extends AbstractReportingTask {
.build();
public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
- .name("Client Authentication")
+ .name("prometheus-reporting-task-client-auth")
+ .displayName("Client Authentication")
.description("Specifies whether or not the Reporting Task should authenticate clients. This value is ignored if the <SSL Context Service> "
+ "Property is not specified or the SSL Context provided uses only a KeyStore and not a TrustStore.")
.required(true)
@@ -107,6 +125,7 @@ public class PrometheusReportingTask extends AbstractReportingTask {
List<PropertyDescriptor> props = new ArrayList<>();
props.add(METRICS_ENDPOINT_PORT);
props.add(INSTANCE_ID);
+ props.add(METRICS_STRATEGY);
props.add(SEND_JVM_METRICS);
props.add(SSL_CONTEXT);
props.add(CLIENT_AUTH);
@@ -165,5 +184,6 @@ public class PrometheusReportingTask extends AbstractReportingTask {
@Override
public void onTrigger(final ReportingContext context) {
this.prometheusServer.setReportingContext(context);
+ this.prometheusServer.setMetricsStrategy(context.getProperty(METRICS_STRATEGY).getValue());
}
}
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusServer.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusServer.java
index acaa447..76ef126 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusServer.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusServer.java
@@ -54,6 +54,7 @@ public class PrometheusServer {
private Server server;
private ServletContextHandler handler;
private ReportingContext context;
+ private String metricsStrategy;
private boolean sendJvmMetrics;
private String instanceId;
@@ -70,7 +71,8 @@ public class PrometheusServer {
rootGroupStatus = PrometheusServer.this.context.getEventAccess().getControllerStatus();
ServletOutputStream response = resp.getOutputStream();
OutputStreamWriter osw = new OutputStreamWriter(response);
- nifiRegistry = PrometheusMetricsUtil.createNifiMetrics(rootGroupStatus, PrometheusServer.this.instanceId);
+
+ nifiRegistry = PrometheusMetricsUtil.createNifiMetrics(rootGroupStatus, PrometheusServer.this.instanceId, "", "RootProcessGroup", metricsStrategy);
TextFormat.write004(osw, nifiRegistry.metricFamilySamples());
if (PrometheusServer.this.sendJvmMetrics == true) {
@@ -167,4 +169,11 @@ public class PrometheusServer {
this.instanceId = iid;
}
+ public String getMetricsStrategy() {
+ return metricsStrategy;
+ }
+
+ public void setMetricsStrategy(String metricsStrategy) {
+ this.metricsStrategy = metricsStrategy;
+ }
}
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java
index ebd0102..f4fa642 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java
@@ -17,9 +17,11 @@
package org.apache.nifi.reporting.prometheus.api;
-import java.util.Collection;
import java.util.Map;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
@@ -27,100 +29,116 @@ import com.yammer.metrics.core.VirtualMachineMetrics;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Gauge;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.controller.status.TransmissionStatus;
public class PrometheusMetricsUtil {
+
+ public static final AllowableValue METRICS_STRATEGY_ROOT = new AllowableValue("Root Process Group", "Root Process Group",
+ "Send rollup metrics for the entire root process group");
+ public static final AllowableValue METRICS_STRATEGY_PG = new AllowableValue("All Process Groups", "All Process Groups",
+ "Send metrics for each process group");
+ public static final AllowableValue METRICS_STRATEGY_COMPONENTS = new AllowableValue("All Components", "All Components",
+ "Send metrics for each component in the system, to include processors, connections, controller services, etc.");
+
private static final CollectorRegistry NIFI_REGISTRY = new CollectorRegistry();
private static final CollectorRegistry JVM_REGISTRY = new CollectorRegistry();
+ // Process Group metrics
private static final Gauge AMOUNT_FLOWFILES_SENT = Gauge.build()
- .name("nifi_process_group_amount_flowfiles_sent")
- .help("Total number of FlowFiles in ProcessGroup sent")
- .labelNames("instance", "process_group_name", "process_group_id")
+ .name("nifi_amount_flowfiles_sent")
+ .help("Total number of FlowFiles sent by the component")
+ .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_FLOWFILES_TRANSFERRED = Gauge.build()
- .name("nifi_process_group_amount_flowfiles_transferred")
- .help("Total number of FlowFiles in ProcessGroup transferred")
- .labelNames("instance", "process_group_name", "process_group_id")
+ .name("nifi_amount_flowfiles_transferred")
+ .help("Total number of FlowFiles transferred by the component")
+ .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_FLOWFILES_RECEIVED = Gauge.build()
- .name("nifi_process_group_amount_flowfiles_received")
- .help("Total number of FlowFiles in ProcessGroup received")
- .labelNames("instance", "process_group_name", "process_group_id")
+ .name("nifi_amount_flowfiles_received")
+ .help("Total number of FlowFiles received by the component")
+ .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_BYTES_SENT = Gauge.build()
- .name("nifi_process_group_amount_bytes_sent")
- .help("Total number of Bytes in ProcessGroup sent")
- .labelNames("instance", "process_group_name", "process_group_id")
+ .name("nifi_amount_bytes_sent")
+ .help("Total number of bytes sent by the component")
+ .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_BYTES_READ = Gauge.build()
- .name("nifi_process_group_amount_bytes_read")
- .help("Total number of Bytes in ProcessGroup read")
- .labelNames("instance", "process_group_name", "process_group_id")
+ .name("nifi_amount_bytes_read")
+ .help("Total number of bytes read by the component")
+ .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_BYTES_WRITTEN = Gauge.build()
- .name("nifi_process_group_amount_bytes_written")
- .help("Total number of Bytes in ProcessGroup written")
- .labelNames("instance", "process_group_name", "process_group_id")
+ .name("nifi_amount_bytes_written")
+ .help("Total number of bytes written by the component")
+ .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_BYTES_RECEIVED = Gauge.build()
- .name("nifi_process_group_amount_bytes_received")
- .help("Total number of Bytes in ProcessGroup received")
- .labelNames("instance", "process_group_name", "process_group_id")
+ .name("nifi_amount_bytes_received")
+ .help("Total number of bytes received by the component")
+ .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_BYTES_TRANSFERRED = Gauge.build()
- .name("nifi_process_group_amount_bytes_transferred")
- .help("Total number of Bytes in ProcessGroup transferred")
- .labelNames("instance", "process_group_name", "process_group_id")
+ .name("nifi_amount_bytes_transferred")
+ .help("Total number of Bytes transferred by the component")
+ .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
-
private static final Gauge AMOUNT_THREADS_TOTAL_ACTIVE = Gauge.build()
- .name("nifi_process_group_amount_threads_active")
- .help("Total number of threads in ProcessGroup active")
- .labelNames("instance", "process_group_name", "process_group_id")
+ .name("nifi_amount_threads_active")
+ .help("Total number of threads active for the component")
+ .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge SIZE_CONTENT_OUTPUT_TOTAL = Gauge.build()
- .name("nifi_process_group_size_content_output_total")
- .help("Total size of content output in ProcessGroup")
- .labelNames("instance", "process_group_name", "process_group_id")
+ .name("nifi_size_content_output_total")
+ .help("Total size of content output by the component")
+ .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+ "source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
private static final Gauge SIZE_CONTENT_INPUT_TOTAL = Gauge.build()
- .name("nifi_process_group_size_content_input_total")
- .help("Total size of content input in ProcessGroup")
- .labelNames("instance", "process_group_name", "process_group_id")
+ .name("nifi_size_content_input_total")
+ .help("Total size of content input by the component")
+ .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+ "source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
private static final Gauge SIZE_CONTENT_QUEUED_TOTAL = Gauge.build()
- .name("nifi_process_group_size_content_queued_total")
- .help("Total size of content queued in ProcessGroup")
- .labelNames("instance", "process_group_name", "process_group_id")
+ .name("nifi_size_content_queued_total")
+ .help("Total size of content queued in the component")
+ .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+ "source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_ITEMS_OUTPUT = Gauge.build()
- .name("nifi_process_group_amount_items_output")
- .help("Total amount of items in ProcessGroup output")
- .labelNames("instance", "process_group_name", "process_group_id")
+ .name("nifi_amount_items_output")
+ .help("Total number of items output by the component")
+ .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+ "source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_ITEMS_INPUT = Gauge.build()
- .name("nifi_process_group_amount_items_input")
- .help("Total amount of items in ProcessGroup input")
- .labelNames("instance", "process_group_name", "process_group_id")
+ .name("nifi_amount_items_input")
+ .help("Total number of items input by the component")
+ .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+ "source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_ITEMS_QUEUED = Gauge.build()
- .name("nifi_process_group_amount_items_queued")
- .help("Total amount of items in ProcessGroup queued")
- .labelNames("instance", "process_group_name", "process_group_id")
+ .name("nifi_amount_items_queued")
+ .help("Total number of items queued by the component")
+ .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+ "source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
private static final Gauge PROCESSOR_COUNTERS = Gauge.build()
@@ -129,6 +147,60 @@ public class PrometheusMetricsUtil {
.labelNames("processor_name", "counter_name", "processor_id", "instance")
.register(NIFI_REGISTRY);
+ // Connection metrics
+ private static final Gauge BACKPRESSURE_BYTES_THRESHOLD = Gauge.build()
+ .name("nifi_backpressure_bytes_threshold")
+ .help("The number of bytes that can be queued before backpressure is applied")
+ .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+ "source_id", "source_name", "destination_id", "destination_name")
+ .register(NIFI_REGISTRY);
+
+ private static final Gauge BACKPRESSURE_OBJECT_THRESHOLD = Gauge.build()
+ .name("nifi_backpressure_object_threshold")
+ .help("The number of flow files that can be queued before backpressure is applied")
+ .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+ "source_id", "source_name", "destination_id", "destination_name")
+ .register(NIFI_REGISTRY);
+
+ private static final Gauge IS_BACKPRESSURE_ENABLED = Gauge.build()
+ .name("nifi_backpressure_enabled")
+ .help("Whether backpressure has been applied for this component. Values are 0 or 1")
+ .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+ "source_id", "source_name", "destination_id", "destination_name")
+ .register(NIFI_REGISTRY);
+
+ // Port metrics
+ private static final Gauge IS_TRANSMITTING = Gauge.build()
+ .name("nifi_transmitting")
+ .help("Whether this component is transmitting data. Values are 0 or 1")
+ .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "run_status")
+ .register(NIFI_REGISTRY);
+
+ // Remote Process Group (RPG) metrics
+ private static final Gauge ACTIVE_REMOTE_PORT_COUNT = Gauge.build()
+ .name("nifi_active_remote_port_count")
+ .help("The number of active remote ports associated with this component")
+ .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+ "source_id", "source_name", "destination_id", "destination_name")
+ .register(NIFI_REGISTRY);
+
+ private static final Gauge INACTIVE_REMOTE_PORT_COUNT = Gauge.build()
+ .name("nifi_inactive_remote_port_count")
+ .help("The number of inactive remote ports associated with this component")
+ .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+ "source_id", "source_name", "destination_id", "destination_name")
+ .register(NIFI_REGISTRY);
+
+ private static final Gauge AVERAGE_LINEAGE_DURATION = Gauge.build()
+ .name("nifi_average_lineage_duration")
+ .help("The average lineage duration (in milliseconds) for all flow file processed by this component")
+ .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+ "source_id", "source_name", "destination_id", "destination_name")
+ .register(NIFI_REGISTRY);
+
+ ///////////////////////////////////////////////////////////////
+ // JVM Metrics
+ ///////////////////////////////////////////////////////////////
private static final Gauge JVM_HEAP_USED = Gauge.build()
.name("nifi_jvm_heap_used")
.help("NiFi JVM heap used")
@@ -171,45 +243,159 @@ public class PrometheusMetricsUtil {
.labelNames("instance")
.register(JVM_REGISTRY);
- public static CollectorRegistry createNifiMetrics(ProcessGroupStatus status, String instanceId) {
+ public static CollectorRegistry createNifiMetrics(ProcessGroupStatus status, String instanceId, String parentPGId, String componentType, String metricsStrategy) {
+
+ final String componentId = status.getId();
+ final String componentName = status.getName();
+
+ AMOUNT_FLOWFILES_SENT.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getFlowFilesSent());
+ AMOUNT_FLOWFILES_TRANSFERRED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getFlowFilesTransferred());
+ AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getFlowFilesReceived());
+
+ AMOUNT_BYTES_SENT.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesSent());
+ AMOUNT_BYTES_READ.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesRead());
+ AMOUNT_BYTES_WRITTEN.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesWritten());
+ AMOUNT_BYTES_RECEIVED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesReceived());
+ AMOUNT_BYTES_TRANSFERRED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesTransferred());
+
+ SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "")
+ .set(status.getOutputContentSize());
+ SIZE_CONTENT_INPUT_TOTAL.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "")
+ .set(status.getInputContentSize());
+ SIZE_CONTENT_QUEUED_TOTAL.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "")
+ .set(status.getQueuedContentSize());
- final String processGroupId = status.getId();
- final String processGroupName = status.getName();
- Collection<ProcessorStatus> processorStatus = status.getProcessorStatus();
+ AMOUNT_ITEMS_OUTPUT.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "")
+ .set(status.getOutputCount());
+ AMOUNT_ITEMS_INPUT.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "")
+ .set(status.getInputCount());
+ AMOUNT_ITEMS_QUEUED.labels(instanceId, componentType, componentName, componentId, parentPGId,"", "", "", "")
+ .set(status.getQueuedCount());
- AMOUNT_FLOWFILES_SENT.labels(instanceId, processGroupName, processGroupId).set(status.getFlowFilesSent());
- AMOUNT_FLOWFILES_TRANSFERRED.labels(instanceId, processGroupName, processGroupId).set(status.getFlowFilesTransferred());
- AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, processGroupName, processGroupId).set(status.getFlowFilesReceived());
+ AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getActiveThreadCount());
- AMOUNT_BYTES_SENT.labels(instanceId, processGroupName, processGroupId).set(status.getBytesSent());
- AMOUNT_BYTES_READ.labels(instanceId, processGroupName, processGroupId).set(status.getBytesRead());
- AMOUNT_BYTES_WRITTEN.labels(instanceId, processGroupName, processGroupId).set(status.getBytesWritten());
- AMOUNT_BYTES_RECEIVED.labels(instanceId, processGroupName, processGroupId).set(status.getBytesReceived());
- AMOUNT_BYTES_TRANSFERRED.labels(instanceId, processGroupName, processGroupId).set(status.getBytesTransferred());
+ // Report metrics for child process groups if specified
+ if (METRICS_STRATEGY_PG.getValue().equals(metricsStrategy) || METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) {
+ status.getProcessGroupStatus().forEach((childGroupStatus) -> createNifiMetrics(childGroupStatus, instanceId, parentPGId, "ProcessGroup", metricsStrategy));
+ }
- SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, processGroupName, processGroupId).set(status.getOutputContentSize());
- SIZE_CONTENT_INPUT_TOTAL.labels(instanceId, processGroupName, processGroupId).set(status.getInputContentSize());
- SIZE_CONTENT_QUEUED_TOTAL.labels(instanceId, processGroupName, processGroupId).set(status.getQueuedContentSize());
+ if (METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) {
+ // Report metrics for all components
+ for(ProcessorStatus processorStatus : status.getProcessorStatus()) {
+ Map<String, Long> counters = processorStatus.getCounters();
- AMOUNT_ITEMS_OUTPUT.labels(instanceId, processGroupName, processGroupId).set(status.getOutputCount());
- AMOUNT_ITEMS_INPUT.labels(instanceId, processGroupName, processGroupId).set(status.getInputCount());
- AMOUNT_ITEMS_QUEUED.labels(instanceId, processGroupName, processGroupId).set(status.getQueuedCount());
+ if(counters != null) {
+ counters.entrySet().stream().forEach(entry -> PROCESSOR_COUNTERS
+ .labels(processorStatus.getName(), entry.getKey(), processorStatus.getId(), instanceId).set(entry.getValue()));
+ }
+ }
+ for(ConnectionStatus connectionStatus : status.getConnectionStatus()) {
+ final String connComponentId = connectionStatus.getId();
+ final String connComponentName = connectionStatus.getName();
+ final String sourceId = connectionStatus.getSourceId();
+ final String sourceName = connectionStatus.getSourceName();
+ final String destinationId = connectionStatus.getDestinationId();
+ final String destinationName = connectionStatus.getDestinationName();
+ final String parentId = connectionStatus.getGroupId();
+ final String connComponentType = "Connection";
+ SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
+ .set(connectionStatus.getOutputBytes());
+ SIZE_CONTENT_INPUT_TOTAL.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
+ .set(connectionStatus.getInputBytes());
+ SIZE_CONTENT_QUEUED_TOTAL.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
+ .set(connectionStatus.getQueuedBytes());
+
+ AMOUNT_ITEMS_OUTPUT.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
+ .set(connectionStatus.getOutputCount());
+ AMOUNT_ITEMS_INPUT.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
+ .set(connectionStatus.getInputCount());
+ AMOUNT_ITEMS_QUEUED.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
+ .set(connectionStatus.getQueuedCount());
+
+ BACKPRESSURE_BYTES_THRESHOLD.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
+ .set(connectionStatus.getBackPressureBytesThreshold());
+ BACKPRESSURE_OBJECT_THRESHOLD.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
+ .set(connectionStatus.getBackPressureObjectThreshold());
+ boolean isBackpressureEnabled = (connectionStatus.getBackPressureObjectThreshold() > 0 && connectionStatus.getBackPressureObjectThreshold() <= connectionStatus.getQueuedCount())
+ || (connectionStatus.getBackPressureBytesThreshold() > 0 && connectionStatus.getBackPressureBytesThreshold() <= connectionStatus.getMaxQueuedBytes());
+ IS_BACKPRESSURE_ENABLED.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
+ .set(isBackpressureEnabled ? 1 : 0);
+ }
+ for(PortStatus portStatus : status.getInputPortStatus()) {
+ final String portComponentId = portStatus.getId();
+ final String portComponentName = portStatus.getName();
+ final String parentId = portStatus.getGroupId();
+ final String portComponentType = "InputPort";
+ AMOUNT_FLOWFILES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesSent());
+ AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesReceived());
+
+ AMOUNT_BYTES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesSent());
+ AMOUNT_BYTES_READ.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getInputBytes());
+ AMOUNT_BYTES_WRITTEN.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getOutputBytes());
+ AMOUNT_BYTES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesReceived());
+
+ AMOUNT_ITEMS_OUTPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "")
+ .set(portStatus.getOutputCount());
+ AMOUNT_ITEMS_INPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "")
+ .set(portStatus.getInputCount());
+
+ final Boolean isTransmitting = portStatus.isTransmitting();
+ IS_TRANSMITTING.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, portStatus.getRunStatus().name())
+ .set(isTransmitting == null ? 0 : (isTransmitting ? 1 : 0));
+
+ AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getActiveThreadCount());
+ }
+ for(PortStatus portStatus : status.getOutputPortStatus()) {
+ final String portComponentId = portStatus.getId();
+ final String portComponentName = portStatus.getName();
+ final String parentId = portStatus.getGroupId();
+ final String portComponentType = "OutputPort";
+ AMOUNT_FLOWFILES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesSent());
+ AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId).set(portStatus.getFlowFilesReceived());
+
+ AMOUNT_BYTES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesSent());
+ AMOUNT_BYTES_READ.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getInputBytes());
+ AMOUNT_BYTES_WRITTEN.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getOutputBytes());
+ AMOUNT_BYTES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesReceived());
+
+ AMOUNT_ITEMS_OUTPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "")
+ .set(portStatus.getOutputCount());
+ AMOUNT_ITEMS_INPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "")
+ .set(portStatus.getInputCount());
+
+ final Boolean isTransmitting = portStatus.isTransmitting();
+ IS_TRANSMITTING.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, portStatus.getRunStatus().name())
+ .set(isTransmitting == null ? 0 : (isTransmitting ? 1 : 0));
+
+ AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getActiveThreadCount());
+ }
+ for(RemoteProcessGroupStatus remoteProcessGroupStatus : status.getRemoteProcessGroupStatus()) {
+ final String rpgComponentId = remoteProcessGroupStatus.getId();
+ final String rpgComponentName = remoteProcessGroupStatus.getName();
+ final String parentId = remoteProcessGroupStatus.getGroupId();
+ final String rpgComponentType = "RemoteProcessGroup";
+
+ AMOUNT_BYTES_WRITTEN.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getSentContentSize());
+ AMOUNT_BYTES_RECEIVED.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getReceivedContentSize());
+ AMOUNT_ITEMS_OUTPUT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "")
+ .set(remoteProcessGroupStatus.getSentCount());
+ AMOUNT_ITEMS_INPUT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "")
+ .set(remoteProcessGroupStatus.getReceivedCount());
+ ACTIVE_REMOTE_PORT_COUNT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getActiveRemotePortCount());
+ INACTIVE_REMOTE_PORT_COUNT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getInactiveRemotePortCount());
- AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, processGroupName, processGroupId).set(status.getActiveThreadCount());
+ AVERAGE_LINEAGE_DURATION.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getAverageLineageDuration());
- for (ProcessorStatus pstatus : processorStatus) {
- Map<String, Long> counters = pstatus.getCounters();
+ IS_TRANSMITTING.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, remoteProcessGroupStatus.getTransmissionStatus().name())
+ .set(TransmissionStatus.Transmitting.equals(remoteProcessGroupStatus.getTransmissionStatus()) ? 1 : 0);
- if(counters != null) {
- counters.entrySet().stream().forEach(entry -> PROCESSOR_COUNTERS
- .labels(pstatus.getName(), entry.getKey(), pstatus.getId(), instanceId).set(entry.getValue()));
+ AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getActiveThreadCount());
}
}
return NIFI_REGISTRY;
-
}
public static CollectorRegistry createJvmMetrics(VirtualMachineMetrics jvmMetrics, String instanceId) {
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java
index bba7a8b..8cb18b2 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java
@@ -103,9 +103,9 @@ public class TestPrometheusReportingTask {
HttpEntity entity = response.getEntity();
String content = EntityUtils.toString(entity);
Assert.assertEquals(true, content.contains(
- "nifi_process_group_amount_flowfiles_received{instance=\"localhost\",process_group_name=\"root\",process_group_id=\"1234\",} 5.0"));
+ "nifi_amount_flowfiles_received{instance=\"localhost\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",} 5.0"));
Assert.assertEquals(true, content.contains(
- "nifi_process_group_amount_threads_active{instance=\"localhost\",process_group_name=\"root\",process_group_id=\"1234\",} 5.0"));
+ "nifi_amount_threads_active{instance=\"localhost\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",} 5.0"));
}
}