You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by kd...@apache.org on 2019/06/07 20:31:17 UTC

[nifi] branch master updated: NIFI-6352: Add ability to report all component metrics to Prometheus

This is an automated email from the ASF dual-hosted git repository.

kdoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 99d6ed2  NIFI-6352: Add ability to report all component metrics to Prometheus
99d6ed2 is described below

commit 99d6ed244cd8808646eaa9fbf446e4be36defafc
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Wed Jun 5 12:32:22 2019 -0400

    NIFI-6352: Add ability to report all component metrics to Prometheus
    
    This closes #3519.
    
    Signed-off-by: Kevin Doran <kd...@apache.org>
---
 .../prometheus/PrometheusReportingTask.java        |  32 +-
 .../reporting/prometheus/PrometheusServer.java     |  11 +-
 .../prometheus/api/PrometheusMetricsUtil.java      | 330 ++++++++++++++++-----
 .../prometheus/TestPrometheusReportingTask.java    |   4 +-
 4 files changed, 296 insertions(+), 81 deletions(-)

diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java
index daaddd3..7dcfe73 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java
@@ -40,6 +40,10 @@ import org.apache.nifi.ssl.RestrictedSSLContextService;
 import org.apache.nifi.ssl.SSLContextService;
 import org.eclipse.jetty.server.Server;
 
+import static org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil.METRICS_STRATEGY_COMPONENTS;
+import static org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil.METRICS_STRATEGY_PG;
+import static org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil.METRICS_STRATEGY_ROOT;
+
 @Tags({ "reporting", "prometheus", "metrics", "time series data" })
 @CapabilityDescription("Reports metrics in Prometheus format by creating /metrics http endpoint which can be used for external monitoring of the application."
         + " The reporting task reports a set of metrics regarding the JVM (optional) and the NiFi instance")
@@ -59,7 +63,8 @@ public class PrometheusReportingTask extends AbstractReportingTask {
             + "specified in the SSL Context Service");
 
     public static final PropertyDescriptor METRICS_ENDPOINT_PORT = new PropertyDescriptor.Builder()
-            .name("Prometheus Metrics Endpoint Port")
+            .name("prometheus-reporting-task-metrics-endpoint-port")
+            .displayName("Prometheus Metrics Endpoint Port")
             .description("The Port where prometheus metrics can be accessed")
             .required(true)
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
@@ -68,7 +73,8 @@ public class PrometheusReportingTask extends AbstractReportingTask {
             .build();
 
     public static final PropertyDescriptor INSTANCE_ID = new PropertyDescriptor.Builder()
-            .name("Instance ID")
+            .name("prometheus-reporting-task-instance-id")
+            .displayName("Instance ID")
             .description("Id of this NiFi instance to be included in the metrics sent to Prometheus")
             .required(true)
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
@@ -76,16 +82,27 @@ public class PrometheusReportingTask extends AbstractReportingTask {
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor METRICS_STRATEGY = new PropertyDescriptor.Builder()
+            .name("prometheus-reporting-task-metrics-strategy")
+            .displayName("Metrics Reporting Strategy")
+            .description("The granularity on which to report metrics. Options include only the root process group, all process groups, or all components")
+            .allowableValues(METRICS_STRATEGY_ROOT, METRICS_STRATEGY_PG, METRICS_STRATEGY_COMPONENTS)
+            .defaultValue(METRICS_STRATEGY_COMPONENTS.getValue())
+            .required(true)
+            .build();
+
     public static final PropertyDescriptor SEND_JVM_METRICS = new PropertyDescriptor.Builder()
-            .name("Send JVM-metrics")
-            .description("Send JVM-metrics in addition to the Nifi-metrics")
+            .name("prometheus-reporting-task-metrics-send-jvm")
+            .displayName("Send JVM metrics")
+            .description("Send JVM metrics in addition to the NiFi metrics")
             .allowableValues("true", "false")
             .defaultValue("false")
             .required(true)
             .build();
 
     public static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder()
-            .name("SSL Context Service")
+            .name("prometheus-reporting-task-ssl-context")
+            .displayName("SSL Context Service")
             .description("The SSL Context Service to use in order to secure the server. If specified, the server will"
                     + "accept only HTTPS requests; otherwise, the server will accept only HTTP requests")
             .required(false)
@@ -93,7 +110,8 @@ public class PrometheusReportingTask extends AbstractReportingTask {
             .build();
 
     public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
-            .name("Client Authentication")
+            .name("prometheus-reporting-task-client-auth")
+            .displayName("Client Authentication")
             .description("Specifies whether or not the Reporting Task should authenticate clients. This value is ignored if the <SSL Context Service> "
                     + "Property is not specified or the SSL Context provided uses only a KeyStore and not a TrustStore.")
             .required(true)
@@ -107,6 +125,7 @@ public class PrometheusReportingTask extends AbstractReportingTask {
         List<PropertyDescriptor> props = new ArrayList<>();
         props.add(METRICS_ENDPOINT_PORT);
         props.add(INSTANCE_ID);
+        props.add(METRICS_STRATEGY);
         props.add(SEND_JVM_METRICS);
         props.add(SSL_CONTEXT);
         props.add(CLIENT_AUTH);
@@ -165,5 +184,6 @@ public class PrometheusReportingTask extends AbstractReportingTask {
     @Override
     public void onTrigger(final ReportingContext context) {
         this.prometheusServer.setReportingContext(context);
+        this.prometheusServer.setMetricsStrategy(context.getProperty(METRICS_STRATEGY).getValue());
     }
 }
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusServer.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusServer.java
index acaa447..76ef126 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusServer.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusServer.java
@@ -54,6 +54,7 @@ public class PrometheusServer {
     private Server server;
     private ServletContextHandler handler;
     private ReportingContext context;
+    private String metricsStrategy;
     private boolean sendJvmMetrics;
     private String instanceId;
 
@@ -70,7 +71,8 @@ public class PrometheusServer {
             rootGroupStatus = PrometheusServer.this.context.getEventAccess().getControllerStatus();
             ServletOutputStream response = resp.getOutputStream();
             OutputStreamWriter osw = new OutputStreamWriter(response);
-            nifiRegistry = PrometheusMetricsUtil.createNifiMetrics(rootGroupStatus, PrometheusServer.this.instanceId);
+
+            nifiRegistry = PrometheusMetricsUtil.createNifiMetrics(rootGroupStatus, PrometheusServer.this.instanceId, "", "RootProcessGroup", metricsStrategy);
             TextFormat.write004(osw, nifiRegistry.metricFamilySamples());
 
             if (PrometheusServer.this.sendJvmMetrics == true) {
@@ -167,4 +169,11 @@ public class PrometheusServer {
         this.instanceId = iid;
     }
 
+    public String getMetricsStrategy() {
+        return metricsStrategy;
+    }
+
+    public void setMetricsStrategy(String metricsStrategy) {
+        this.metricsStrategy = metricsStrategy;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java
index ebd0102..f4fa642 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java
@@ -17,9 +17,11 @@
 
 package org.apache.nifi.reporting.prometheus.api;
 
-import java.util.Collection;
 import java.util.Map;
 
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.PortStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.controller.status.ProcessorStatus;
 
@@ -27,100 +29,116 @@ import com.yammer.metrics.core.VirtualMachineMetrics;
 
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Gauge;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.controller.status.TransmissionStatus;
 
 public class PrometheusMetricsUtil {
+
+    public static final AllowableValue METRICS_STRATEGY_ROOT = new AllowableValue("Root Process Group", "Root Process Group",
+            "Send rollup metrics for the entire root process group");
+    public static final AllowableValue METRICS_STRATEGY_PG = new AllowableValue("All Process Groups", "All Process Groups",
+            "Send metrics for each process group");
+    public static final AllowableValue METRICS_STRATEGY_COMPONENTS = new AllowableValue("All Components", "All Components",
+            "Send metrics for each component in the system, to include processors, connections, controller services, etc.");
+
     private static final CollectorRegistry NIFI_REGISTRY = new CollectorRegistry();
     private static final CollectorRegistry JVM_REGISTRY = new CollectorRegistry();
 
+    // Process Group metrics
     private static final Gauge AMOUNT_FLOWFILES_SENT = Gauge.build()
-            .name("nifi_process_group_amount_flowfiles_sent")
-            .help("Total number of FlowFiles in ProcessGroup sent")
-            .labelNames("instance", "process_group_name", "process_group_id")
+            .name("nifi_amount_flowfiles_sent")
+            .help("Total number of FlowFiles sent by the component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
             .register(NIFI_REGISTRY);
 
     private static final Gauge AMOUNT_FLOWFILES_TRANSFERRED = Gauge.build()
-            .name("nifi_process_group_amount_flowfiles_transferred")
-            .help("Total number of FlowFiles in ProcessGroup transferred")
-            .labelNames("instance", "process_group_name", "process_group_id")
+            .name("nifi_amount_flowfiles_transferred")
+            .help("Total number of FlowFiles transferred by the component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
             .register(NIFI_REGISTRY);
 
     private static final Gauge AMOUNT_FLOWFILES_RECEIVED = Gauge.build()
-            .name("nifi_process_group_amount_flowfiles_received")
-            .help("Total number of FlowFiles in ProcessGroup received")
-            .labelNames("instance", "process_group_name", "process_group_id")
+            .name("nifi_amount_flowfiles_received")
+            .help("Total number of FlowFiles received by the component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
             .register(NIFI_REGISTRY);
 
     private static final Gauge AMOUNT_BYTES_SENT = Gauge.build()
-            .name("nifi_process_group_amount_bytes_sent")
-            .help("Total number of Bytes in ProcessGroup sent")
-            .labelNames("instance", "process_group_name", "process_group_id")
+            .name("nifi_amount_bytes_sent")
+            .help("Total number of bytes sent by the component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
             .register(NIFI_REGISTRY);
 
     private static final Gauge AMOUNT_BYTES_READ = Gauge.build()
-            .name("nifi_process_group_amount_bytes_read")
-            .help("Total number of Bytes in ProcessGroup read")
-            .labelNames("instance", "process_group_name", "process_group_id")
+            .name("nifi_amount_bytes_read")
+            .help("Total number of bytes read by the component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
             .register(NIFI_REGISTRY);
 
     private static final Gauge AMOUNT_BYTES_WRITTEN = Gauge.build()
-            .name("nifi_process_group_amount_bytes_written")
-            .help("Total number of Bytes in ProcessGroup written")
-            .labelNames("instance", "process_group_name", "process_group_id")
+            .name("nifi_amount_bytes_written")
+            .help("Total number of bytes written by the component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
             .register(NIFI_REGISTRY);
 
     private static final Gauge AMOUNT_BYTES_RECEIVED = Gauge.build()
-            .name("nifi_process_group_amount_bytes_received")
-            .help("Total number of Bytes in ProcessGroup received")
-            .labelNames("instance", "process_group_name", "process_group_id")
+            .name("nifi_amount_bytes_received")
+            .help("Total number of bytes received by the component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
             .register(NIFI_REGISTRY);
 
     private static final Gauge AMOUNT_BYTES_TRANSFERRED = Gauge.build()
-            .name("nifi_process_group_amount_bytes_transferred")
-            .help("Total number of Bytes in ProcessGroup transferred")
-            .labelNames("instance", "process_group_name", "process_group_id")
+            .name("nifi_amount_bytes_transferred")
+            .help("Total number of Bytes transferred by the component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
             .register(NIFI_REGISTRY);
 
-
     private static final Gauge AMOUNT_THREADS_TOTAL_ACTIVE = Gauge.build()
-            .name("nifi_process_group_amount_threads_active")
-            .help("Total number of threads in ProcessGroup active")
-            .labelNames("instance", "process_group_name", "process_group_id")
+            .name("nifi_amount_threads_active")
+            .help("Total number of threads active for the component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
             .register(NIFI_REGISTRY);
 
     private static final Gauge SIZE_CONTENT_OUTPUT_TOTAL = Gauge.build()
-            .name("nifi_process_group_size_content_output_total")
-            .help("Total size of content output in ProcessGroup")
-            .labelNames("instance", "process_group_name", "process_group_id")
+            .name("nifi_size_content_output_total")
+            .help("Total size of content output by the component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+                    "source_id", "source_name", "destination_id", "destination_name")
             .register(NIFI_REGISTRY);
 
     private static final Gauge SIZE_CONTENT_INPUT_TOTAL = Gauge.build()
-            .name("nifi_process_group_size_content_input_total")
-            .help("Total size of content input in ProcessGroup")
-            .labelNames("instance", "process_group_name", "process_group_id")
+            .name("nifi_size_content_input_total")
+            .help("Total size of content input by the component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+                    "source_id", "source_name", "destination_id", "destination_name")
             .register(NIFI_REGISTRY);
 
     private static final Gauge SIZE_CONTENT_QUEUED_TOTAL = Gauge.build()
-            .name("nifi_process_group_size_content_queued_total")
-            .help("Total size of content queued in ProcessGroup")
-            .labelNames("instance", "process_group_name", "process_group_id")
+            .name("nifi_size_content_queued_total")
+            .help("Total size of content queued in the component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+                    "source_id", "source_name", "destination_id", "destination_name")
             .register(NIFI_REGISTRY);
 
     private static final Gauge AMOUNT_ITEMS_OUTPUT = Gauge.build()
-            .name("nifi_process_group_amount_items_output")
-            .help("Total amount of items in ProcessGroup output")
-            .labelNames("instance", "process_group_name", "process_group_id")
+            .name("nifi_amount_items_output")
+            .help("Total number of items output by the component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+                    "source_id", "source_name", "destination_id", "destination_name")
             .register(NIFI_REGISTRY);
 
     private static final Gauge AMOUNT_ITEMS_INPUT = Gauge.build()
-            .name("nifi_process_group_amount_items_input")
-            .help("Total amount of items in ProcessGroup input")
-            .labelNames("instance", "process_group_name", "process_group_id")
+            .name("nifi_amount_items_input")
+            .help("Total number of items input by the component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+                    "source_id", "source_name", "destination_id", "destination_name")
             .register(NIFI_REGISTRY);
 
     private static final Gauge AMOUNT_ITEMS_QUEUED = Gauge.build()
-            .name("nifi_process_group_amount_items_queued")
-            .help("Total amount of items in ProcessGroup queued")
-            .labelNames("instance", "process_group_name", "process_group_id")
+            .name("nifi_amount_items_queued")
+            .help("Total number of items queued by the component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+                    "source_id", "source_name", "destination_id", "destination_name")
             .register(NIFI_REGISTRY);
 
     private static final Gauge PROCESSOR_COUNTERS = Gauge.build()
@@ -129,6 +147,60 @@ public class PrometheusMetricsUtil {
             .labelNames("processor_name", "counter_name", "processor_id", "instance")
             .register(NIFI_REGISTRY);
 
+    // Connection metrics
+    private static final Gauge BACKPRESSURE_BYTES_THRESHOLD = Gauge.build()
+            .name("nifi_backpressure_bytes_threshold")
+            .help("The number of bytes that can be queued before backpressure is applied")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+                    "source_id", "source_name", "destination_id", "destination_name")
+            .register(NIFI_REGISTRY);
+
+    private static final Gauge BACKPRESSURE_OBJECT_THRESHOLD = Gauge.build()
+            .name("nifi_backpressure_object_threshold")
+            .help("The number of flow files that can be queued before backpressure is applied")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+                    "source_id", "source_name", "destination_id", "destination_name")
+            .register(NIFI_REGISTRY);
+
+    private static final Gauge IS_BACKPRESSURE_ENABLED = Gauge.build()
+            .name("nifi_backpressure_enabled")
+            .help("Whether backpressure has been applied for this component. Values are 0 or 1")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+                    "source_id", "source_name", "destination_id", "destination_name")
+            .register(NIFI_REGISTRY);
+
+    // Port metrics
+    private static final Gauge IS_TRANSMITTING = Gauge.build()
+            .name("nifi_transmitting")
+            .help("Whether this component is transmitting data. Values are 0 or 1")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "run_status")
+            .register(NIFI_REGISTRY);
+
+    // Remote Process Group (RPG) metrics
+    private static final Gauge ACTIVE_REMOTE_PORT_COUNT = Gauge.build()
+            .name("nifi_active_remote_port_count")
+            .help("The number of active remote ports associated with this component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+                    "source_id", "source_name", "destination_id", "destination_name")
+            .register(NIFI_REGISTRY);
+
+    private static final Gauge INACTIVE_REMOTE_PORT_COUNT = Gauge.build()
+            .name("nifi_inactive_remote_port_count")
+            .help("The number of inactive remote ports associated with this component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+                    "source_id", "source_name", "destination_id", "destination_name")
+            .register(NIFI_REGISTRY);
+
+    private static final Gauge AVERAGE_LINEAGE_DURATION = Gauge.build()
+            .name("nifi_average_lineage_duration")
+            .help("The average lineage duration (in milliseconds) for all flow file processed by this component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+                    "source_id", "source_name", "destination_id", "destination_name")
+            .register(NIFI_REGISTRY);
+
+    ///////////////////////////////////////////////////////////////
+    // JVM Metrics
+    ///////////////////////////////////////////////////////////////
     private static final Gauge JVM_HEAP_USED = Gauge.build()
             .name("nifi_jvm_heap_used")
             .help("NiFi JVM heap used")
@@ -171,45 +243,159 @@ public class PrometheusMetricsUtil {
             .labelNames("instance")
             .register(JVM_REGISTRY);
 
-    public static CollectorRegistry createNifiMetrics(ProcessGroupStatus status, String instanceId) {
+    public static CollectorRegistry createNifiMetrics(ProcessGroupStatus status, String instanceId, String parentPGId, String componentType, String metricsStrategy) {
+
+        final String componentId = status.getId();
+        final String componentName = status.getName();
+
+        AMOUNT_FLOWFILES_SENT.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getFlowFilesSent());
+        AMOUNT_FLOWFILES_TRANSFERRED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getFlowFilesTransferred());
+        AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getFlowFilesReceived());
+
+        AMOUNT_BYTES_SENT.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesSent());
+        AMOUNT_BYTES_READ.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesRead());
+        AMOUNT_BYTES_WRITTEN.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesWritten());
+        AMOUNT_BYTES_RECEIVED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesReceived());
+        AMOUNT_BYTES_TRANSFERRED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesTransferred());
+
+        SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "")
+                .set(status.getOutputContentSize());
+        SIZE_CONTENT_INPUT_TOTAL.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "")
+                .set(status.getInputContentSize());
+        SIZE_CONTENT_QUEUED_TOTAL.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "")
+                .set(status.getQueuedContentSize());
 
-        final String processGroupId = status.getId();
-        final String processGroupName = status.getName();
-        Collection<ProcessorStatus> processorStatus = status.getProcessorStatus();
+        AMOUNT_ITEMS_OUTPUT.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "")
+                .set(status.getOutputCount());
+        AMOUNT_ITEMS_INPUT.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "")
+                .set(status.getInputCount());
+        AMOUNT_ITEMS_QUEUED.labels(instanceId, componentType, componentName, componentId, parentPGId,"", "", "", "")
+                .set(status.getQueuedCount());
 
-        AMOUNT_FLOWFILES_SENT.labels(instanceId, processGroupName, processGroupId).set(status.getFlowFilesSent());
-        AMOUNT_FLOWFILES_TRANSFERRED.labels(instanceId, processGroupName, processGroupId).set(status.getFlowFilesTransferred());
-        AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, processGroupName, processGroupId).set(status.getFlowFilesReceived());
+        AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getActiveThreadCount());
 
-        AMOUNT_BYTES_SENT.labels(instanceId, processGroupName, processGroupId).set(status.getBytesSent());
-        AMOUNT_BYTES_READ.labels(instanceId, processGroupName, processGroupId).set(status.getBytesRead());
-        AMOUNT_BYTES_WRITTEN.labels(instanceId, processGroupName, processGroupId).set(status.getBytesWritten());
-        AMOUNT_BYTES_RECEIVED.labels(instanceId, processGroupName, processGroupId).set(status.getBytesReceived());
-        AMOUNT_BYTES_TRANSFERRED.labels(instanceId, processGroupName, processGroupId).set(status.getBytesTransferred());
+        // Report metrics for child process groups if specified
+        if (METRICS_STRATEGY_PG.getValue().equals(metricsStrategy) || METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) {
+            status.getProcessGroupStatus().forEach((childGroupStatus) -> createNifiMetrics(childGroupStatus, instanceId, parentPGId, "ProcessGroup", metricsStrategy));
+        }
 
-        SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, processGroupName, processGroupId).set(status.getOutputContentSize());
-        SIZE_CONTENT_INPUT_TOTAL.labels(instanceId, processGroupName, processGroupId).set(status.getInputContentSize());
-        SIZE_CONTENT_QUEUED_TOTAL.labels(instanceId, processGroupName, processGroupId).set(status.getQueuedContentSize());
+        if (METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) {
+            // Report metrics for all components
+            for(ProcessorStatus processorStatus : status.getProcessorStatus()) {
+                Map<String, Long> counters = processorStatus.getCounters();
 
-        AMOUNT_ITEMS_OUTPUT.labels(instanceId, processGroupName, processGroupId).set(status.getOutputCount());
-        AMOUNT_ITEMS_INPUT.labels(instanceId, processGroupName, processGroupId).set(status.getInputCount());
-        AMOUNT_ITEMS_QUEUED.labels(instanceId, processGroupName, processGroupId).set(status.getQueuedCount());
+                if(counters != null) {
+                    counters.entrySet().stream().forEach(entry -> PROCESSOR_COUNTERS
+                            .labels(processorStatus.getName(), entry.getKey(), processorStatus.getId(), instanceId).set(entry.getValue()));
+                }
+            }
+            for(ConnectionStatus connectionStatus : status.getConnectionStatus()) {
+                final String connComponentId = connectionStatus.getId();
+                final String connComponentName = connectionStatus.getName();
+                final String sourceId = connectionStatus.getSourceId();
+                final String sourceName = connectionStatus.getSourceName();
+                final String destinationId = connectionStatus.getDestinationId();
+                final String destinationName = connectionStatus.getDestinationName();
+                final String parentId = connectionStatus.getGroupId();
+                final String connComponentType = "Connection";
+                SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
+                        .set(connectionStatus.getOutputBytes());
+                SIZE_CONTENT_INPUT_TOTAL.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
+                        .set(connectionStatus.getInputBytes());
+                SIZE_CONTENT_QUEUED_TOTAL.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
+                        .set(connectionStatus.getQueuedBytes());
+
+                AMOUNT_ITEMS_OUTPUT.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
+                        .set(connectionStatus.getOutputCount());
+                AMOUNT_ITEMS_INPUT.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
+                        .set(connectionStatus.getInputCount());
+                AMOUNT_ITEMS_QUEUED.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
+                        .set(connectionStatus.getQueuedCount());
+
+                BACKPRESSURE_BYTES_THRESHOLD.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
+                        .set(connectionStatus.getBackPressureBytesThreshold());
+                BACKPRESSURE_OBJECT_THRESHOLD.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
+                        .set(connectionStatus.getBackPressureObjectThreshold());
+                boolean isBackpressureEnabled = (connectionStatus.getBackPressureObjectThreshold() > 0 && connectionStatus.getBackPressureObjectThreshold() <= connectionStatus.getQueuedCount())
+                        || (connectionStatus.getBackPressureBytesThreshold() > 0 && connectionStatus.getBackPressureBytesThreshold() <= connectionStatus.getMaxQueuedBytes());
+                IS_BACKPRESSURE_ENABLED.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
+                        .set(isBackpressureEnabled ? 1 : 0);
+            }
+            for(PortStatus portStatus : status.getInputPortStatus()) {
+                final String portComponentId = portStatus.getId();
+                final String portComponentName = portStatus.getName();
+                final String parentId = portStatus.getGroupId();
+                final String portComponentType = "InputPort";
+                AMOUNT_FLOWFILES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesSent());
+                AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesReceived());
+
+                AMOUNT_BYTES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesSent());
+                AMOUNT_BYTES_READ.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getInputBytes());
+                AMOUNT_BYTES_WRITTEN.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getOutputBytes());
+                AMOUNT_BYTES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesReceived());
+
+                AMOUNT_ITEMS_OUTPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "")
+                        .set(portStatus.getOutputCount());
+                AMOUNT_ITEMS_INPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "")
+                        .set(portStatus.getInputCount());
+
+                final Boolean isTransmitting = portStatus.isTransmitting();
+                IS_TRANSMITTING.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, portStatus.getRunStatus().name())
+                        .set(isTransmitting == null ? 0 : (isTransmitting ? 1 : 0));
+
+                AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getActiveThreadCount());
+            }
+            for(PortStatus portStatus : status.getOutputPortStatus()) {
+                final String portComponentId = portStatus.getId();
+                final String portComponentName = portStatus.getName();
+                final String parentId = portStatus.getGroupId();
+                final String portComponentType = "OutputPort";
+                AMOUNT_FLOWFILES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesSent());
+                AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId).set(portStatus.getFlowFilesReceived());
+
+                AMOUNT_BYTES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesSent());
+                AMOUNT_BYTES_READ.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getInputBytes());
+                AMOUNT_BYTES_WRITTEN.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getOutputBytes());
+                AMOUNT_BYTES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesReceived());
+
+                AMOUNT_ITEMS_OUTPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "")
+                        .set(portStatus.getOutputCount());
+                AMOUNT_ITEMS_INPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "")
+                        .set(portStatus.getInputCount());
+
+                final Boolean isTransmitting = portStatus.isTransmitting();
+                IS_TRANSMITTING.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, portStatus.getRunStatus().name())
+                        .set(isTransmitting == null ? 0 : (isTransmitting ? 1 : 0));
+
+                AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getActiveThreadCount());
+            }
+            for(RemoteProcessGroupStatus remoteProcessGroupStatus : status.getRemoteProcessGroupStatus()) {
+                final String rpgComponentId = remoteProcessGroupStatus.getId();
+                final String rpgComponentName = remoteProcessGroupStatus.getName();
+                final String parentId = remoteProcessGroupStatus.getGroupId();
+                final String rpgComponentType = "RemoteProcessGroup";
+
+                AMOUNT_BYTES_WRITTEN.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getSentContentSize());
+                AMOUNT_BYTES_RECEIVED.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getReceivedContentSize());
 
+                AMOUNT_ITEMS_OUTPUT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "")
+                        .set(remoteProcessGroupStatus.getSentCount());
+                AMOUNT_ITEMS_INPUT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "")
+                        .set(remoteProcessGroupStatus.getReceivedCount());
 
+                ACTIVE_REMOTE_PORT_COUNT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getActiveRemotePortCount());
+                INACTIVE_REMOTE_PORT_COUNT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getInactiveRemotePortCount());
 
-        AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, processGroupName, processGroupId).set(status.getActiveThreadCount());
+                AVERAGE_LINEAGE_DURATION.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getAverageLineageDuration());
 
-        for (ProcessorStatus pstatus : processorStatus) {
-            Map<String, Long> counters = pstatus.getCounters();
+                IS_TRANSMITTING.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, remoteProcessGroupStatus.getTransmissionStatus().name())
+                        .set(TransmissionStatus.Transmitting.equals(remoteProcessGroupStatus.getTransmissionStatus()) ? 1 : 0);
 
-            if(counters != null) {
-                counters.entrySet().stream().forEach(entry -> PROCESSOR_COUNTERS
-                        .labels(pstatus.getName(), entry.getKey(), pstatus.getId(), instanceId).set(entry.getValue()));
+                AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getActiveThreadCount());
             }
         }
 
         return NIFI_REGISTRY;
-
     }
 
     public static CollectorRegistry createJvmMetrics(VirtualMachineMetrics jvmMetrics, String instanceId) {
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java
index bba7a8b..8cb18b2 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java
@@ -103,9 +103,9 @@ public class TestPrometheusReportingTask {
         HttpEntity entity = response.getEntity();
         String content = EntityUtils.toString(entity);
         Assert.assertEquals(true, content.contains(
-                "nifi_process_group_amount_flowfiles_received{instance=\"localhost\",process_group_name=\"root\",process_group_id=\"1234\",} 5.0"));
+                "nifi_amount_flowfiles_received{instance=\"localhost\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",} 5.0"));
         Assert.assertEquals(true, content.contains(
-                "nifi_process_group_amount_threads_active{instance=\"localhost\",process_group_name=\"root\",process_group_id=\"1234\",} 5.0"));
+                "nifi_amount_threads_active{instance=\"localhost\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",} 5.0"));
     }
 
 }