You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ym...@apache.org on 2019/07/11 03:46:23 UTC

[nifi] branch master updated: NIFI-5417: Add missing component status and metrics to S2SStatusReportingTask and PrometheusReportingTask (#3554)

This is an automated email from the ASF dual-hosted git repository.

ymdavis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ccc346  NIFI-5417: Add missing component status and metrics to S2SStatusReportingTask and PrometheusReportingTask (#3554)
0ccc346 is described below

commit 0ccc346aec1828d0522631247b3ef22de6a8cee2
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Wed Jul 10 23:46:13 2019 -0400

    NIFI-5417: Add missing component status and metrics to S2SStatusReportingTask and PrometheusReportingTask (#3554)
    
    * NIFI-5417: Add missing component status and metrics to S2SStatusReportingTask and PrometheusReportingTask
    
    * NIFI-5417: Added executionNode to schema and doc
    
    This closes #3554
    
    Signed-off-by: Yolanda M. Davis <yo...@gmail.com>
---
 .../prometheus/api/PrometheusMetricsUtil.java      | 849 +++++++++++----------
 .../reporting/SiteToSiteStatusReportingTask.java   |  35 +
 .../additionalDetails.html                         |  15 +-
 .../src/main/resources/schema-status.avsc          |  15 +-
 .../TestSiteToSiteStatusReportingTask.java         | 104 ++-
 5 files changed, 597 insertions(+), 421 deletions(-)

diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java
index 6227260..b894bee 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java
@@ -1,415 +1,434 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.reporting.prometheus.api;
-
-import java.util.Map;
-
-import org.apache.nifi.components.AllowableValue;
-import org.apache.nifi.controller.status.ConnectionStatus;
-import org.apache.nifi.controller.status.PortStatus;
-import org.apache.nifi.controller.status.ProcessGroupStatus;
-import org.apache.nifi.controller.status.ProcessorStatus;
-
-import io.prometheus.client.CollectorRegistry;
-import io.prometheus.client.Gauge;
-import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
-import org.apache.nifi.controller.status.TransmissionStatus;
-import org.apache.nifi.metrics.jvm.JvmMetrics;
-import org.apache.nifi.processor.DataUnit;
-
-public class PrometheusMetricsUtil {
-
-    public static final AllowableValue METRICS_STRATEGY_ROOT = new AllowableValue("Root Process Group", "Root Process Group",
-            "Send rollup metrics for the entire root process group");
-    public static final AllowableValue METRICS_STRATEGY_PG = new AllowableValue("All Process Groups", "All Process Groups",
-            "Send metrics for each process group");
-    public static final AllowableValue METRICS_STRATEGY_COMPONENTS = new AllowableValue("All Components", "All Components",
-            "Send metrics for each component in the system, to include processors, connections, controller services, etc.");
-
-    private static final CollectorRegistry NIFI_REGISTRY = new CollectorRegistry();
-    private static final CollectorRegistry JVM_REGISTRY = new CollectorRegistry();
-
-    // Process Group metrics
-    private static final Gauge AMOUNT_FLOWFILES_SENT = Gauge.build()
-            .name("nifi_amount_flowfiles_sent")
-            .help("Total number of FlowFiles sent by the component")
-            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
-            .register(NIFI_REGISTRY);
-
-    private static final Gauge AMOUNT_FLOWFILES_TRANSFERRED = Gauge.build()
-            .name("nifi_amount_flowfiles_transferred")
-            .help("Total number of FlowFiles transferred by the component")
-            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
-            .register(NIFI_REGISTRY);
-
-    private static final Gauge AMOUNT_FLOWFILES_RECEIVED = Gauge.build()
-            .name("nifi_amount_flowfiles_received")
-            .help("Total number of FlowFiles received by the component")
-            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
-            .register(NIFI_REGISTRY);
-
-    private static final Gauge AMOUNT_BYTES_SENT = Gauge.build()
-            .name("nifi_amount_bytes_sent")
-            .help("Total number of bytes sent by the component")
-            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
-            .register(NIFI_REGISTRY);
-
-    private static final Gauge AMOUNT_BYTES_READ = Gauge.build()
-            .name("nifi_amount_bytes_read")
-            .help("Total number of bytes read by the component")
-            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
-            .register(NIFI_REGISTRY);
-
-    private static final Gauge AMOUNT_BYTES_WRITTEN = Gauge.build()
-            .name("nifi_amount_bytes_written")
-            .help("Total number of bytes written by the component")
-            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
-            .register(NIFI_REGISTRY);
-
-    private static final Gauge AMOUNT_BYTES_RECEIVED = Gauge.build()
-            .name("nifi_amount_bytes_received")
-            .help("Total number of bytes received by the component")
-            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
-            .register(NIFI_REGISTRY);
-
-    private static final Gauge AMOUNT_BYTES_TRANSFERRED = Gauge.build()
-            .name("nifi_amount_bytes_transferred")
-            .help("Total number of Bytes transferred by the component")
-            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
-            .register(NIFI_REGISTRY);
-
-    private static final Gauge AMOUNT_THREADS_TOTAL_ACTIVE = Gauge.build()
-            .name("nifi_amount_threads_active")
-            .help("Total number of threads active for the component")
-            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
-            .register(NIFI_REGISTRY);
-
-    private static final Gauge SIZE_CONTENT_OUTPUT_TOTAL = Gauge.build()
-            .name("nifi_size_content_output_total")
-            .help("Total size of content output by the component")
-            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
-                    "source_id", "source_name", "destination_id", "destination_name")
-            .register(NIFI_REGISTRY);
-
-    private static final Gauge SIZE_CONTENT_INPUT_TOTAL = Gauge.build()
-            .name("nifi_size_content_input_total")
-            .help("Total size of content input by the component")
-            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
-                    "source_id", "source_name", "destination_id", "destination_name")
-            .register(NIFI_REGISTRY);
-
-    private static final Gauge SIZE_CONTENT_QUEUED_TOTAL = Gauge.build()
-            .name("nifi_size_content_queued_total")
-            .help("Total size of content queued in the component")
-            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
-                    "source_id", "source_name", "destination_id", "destination_name")
-            .register(NIFI_REGISTRY);
-
-    private static final Gauge AMOUNT_ITEMS_OUTPUT = Gauge.build()
-            .name("nifi_amount_items_output")
-            .help("Total number of items output by the component")
-            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
-                    "source_id", "source_name", "destination_id", "destination_name")
-            .register(NIFI_REGISTRY);
-
-    private static final Gauge AMOUNT_ITEMS_INPUT = Gauge.build()
-            .name("nifi_amount_items_input")
-            .help("Total number of items input by the component")
-            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
-                    "source_id", "source_name", "destination_id", "destination_name")
-            .register(NIFI_REGISTRY);
-
-    private static final Gauge AMOUNT_ITEMS_QUEUED = Gauge.build()
-            .name("nifi_amount_items_queued")
-            .help("Total number of items queued by the component")
-            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
-                    "source_id", "source_name", "destination_id", "destination_name")
-            .register(NIFI_REGISTRY);
-
-    private static final Gauge PROCESSOR_COUNTERS = Gauge.build()
-            .name("nifi_processor_counters")
-            .help("Counters exposed by NiFi Processors")
-            .labelNames("processor_name", "counter_name", "processor_id", "instance")
-            .register(NIFI_REGISTRY);
-
-    // Connection metrics
-    private static final Gauge BACKPRESSURE_BYTES_THRESHOLD = Gauge.build()
-            .name("nifi_backpressure_bytes_threshold")
-            .help("The number of bytes that can be queued before backpressure is applied")
-            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
-                    "source_id", "source_name", "destination_id", "destination_name")
-            .register(NIFI_REGISTRY);
-
-    private static final Gauge BACKPRESSURE_OBJECT_THRESHOLD = Gauge.build()
-            .name("nifi_backpressure_object_threshold")
-            .help("The number of flow files that can be queued before backpressure is applied")
-            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
-                    "source_id", "source_name", "destination_id", "destination_name")
-            .register(NIFI_REGISTRY);
-
-    private static final Gauge IS_BACKPRESSURE_ENABLED = Gauge.build()
-            .name("nifi_backpressure_enabled")
-            .help("Whether backpressure has been applied for this component. Values are 0 or 1")
-            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
-                    "source_id", "source_name", "destination_id", "destination_name")
-            .register(NIFI_REGISTRY);
-
-    // Port metrics
-    private static final Gauge IS_TRANSMITTING = Gauge.build()
-            .name("nifi_transmitting")
-            .help("Whether this component is transmitting data. Values are 0 or 1")
-            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "run_status")
-            .register(NIFI_REGISTRY);
-
-    // Remote Process Group (RPG) metrics
-    private static final Gauge ACTIVE_REMOTE_PORT_COUNT = Gauge.build()
-            .name("nifi_active_remote_port_count")
-            .help("The number of active remote ports associated with this component")
-            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
-                    "source_id", "source_name", "destination_id", "destination_name")
-            .register(NIFI_REGISTRY);
-
-    private static final Gauge INACTIVE_REMOTE_PORT_COUNT = Gauge.build()
-            .name("nifi_inactive_remote_port_count")
-            .help("The number of inactive remote ports associated with this component")
-            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
-                    "source_id", "source_name", "destination_id", "destination_name")
-            .register(NIFI_REGISTRY);
-
-    private static final Gauge AVERAGE_LINEAGE_DURATION = Gauge.build()
-            .name("nifi_average_lineage_duration")
-            .help("The average lineage duration (in milliseconds) for all flow file processed by this component")
-            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
-                    "source_id", "source_name", "destination_id", "destination_name")
-            .register(NIFI_REGISTRY);
-
-    ///////////////////////////////////////////////////////////////
-    // JVM Metrics
-    ///////////////////////////////////////////////////////////////
-    private static final Gauge JVM_HEAP_USED = Gauge.build()
-            .name("nifi_jvm_heap_used")
-            .help("NiFi JVM heap used")
-            .labelNames("instance")
-            .register(JVM_REGISTRY);
-
-    private static final Gauge JVM_HEAP_USAGE = Gauge.build()
-            .name("nifi_jvm_heap_usage")
-            .help("NiFi JVM heap usage")
-            .labelNames("instance")
-            .register(JVM_REGISTRY);
-
-    private static final Gauge JVM_HEAP_NON_USAGE = Gauge.build()
-            .name("nifi_jvm_heap_non_usage")
-            .help("NiFi JVM heap non usage")
-            .labelNames("instance")
-            .register(JVM_REGISTRY);
-
-    private static final Gauge JVM_THREAD_COUNT = Gauge.build()
-            .name("nifi_jvm_thread_count")
-            .help("NiFi JVM thread count")
-            .labelNames("instance")
-            .register(JVM_REGISTRY);
-
-    private static final Gauge JVM_DAEMON_THREAD_COUNT = Gauge.build()
-            .name("nifi_jvm_daemon_thread_count")
-            .help("NiFi JVM daemon thread count")
-            .labelNames("instance")
-            .register(JVM_REGISTRY);
-
-    private static final Gauge JVM_UPTIME = Gauge.build()
-            .name("nifi_jvm_uptime")
-            .help("NiFi JVM uptime")
-            .labelNames("instance")
-            .register(JVM_REGISTRY);
-
-    private static final Gauge JVM_FILE_DESCRIPTOR_USAGE = Gauge.build()
-            .name("nifi_jvm_file_descriptor_usage")
-            .help("NiFi JVM file descriptor usage")
-            .labelNames("instance")
-            .register(JVM_REGISTRY);
-
-    public static CollectorRegistry createNifiMetrics(ProcessGroupStatus status, String instanceId, String parentPGId, String componentType, String metricsStrategy) {
-
-        final String componentId = status.getId();
-        final String componentName = status.getName();
-
-        AMOUNT_FLOWFILES_SENT.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getFlowFilesSent());
-        AMOUNT_FLOWFILES_TRANSFERRED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getFlowFilesTransferred());
-        AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getFlowFilesReceived());
-
-        AMOUNT_BYTES_SENT.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesSent());
-        AMOUNT_BYTES_READ.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesRead());
-        AMOUNT_BYTES_WRITTEN.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesWritten());
-        AMOUNT_BYTES_RECEIVED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesReceived());
-        AMOUNT_BYTES_TRANSFERRED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesTransferred());
-
-        SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "")
-                .set(status.getOutputContentSize());
-        SIZE_CONTENT_INPUT_TOTAL.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "")
-                .set(status.getInputContentSize());
-        SIZE_CONTENT_QUEUED_TOTAL.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "")
-                .set(status.getQueuedContentSize());
-
-        AMOUNT_ITEMS_OUTPUT.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "")
-                .set(status.getOutputCount());
-        AMOUNT_ITEMS_INPUT.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "")
-                .set(status.getInputCount());
-        AMOUNT_ITEMS_QUEUED.labels(instanceId, componentType, componentName, componentId, parentPGId,"", "", "", "")
-                .set(status.getQueuedCount());
-
-        AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getActiveThreadCount());
-
-        // Report metrics for child process groups if specified
-        if (METRICS_STRATEGY_PG.getValue().equals(metricsStrategy) || METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) {
-            status.getProcessGroupStatus().forEach((childGroupStatus) -> createNifiMetrics(childGroupStatus, instanceId, parentPGId, "ProcessGroup", metricsStrategy));
-        }
-
-        if (METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) {
-            // Report metrics for all components
-            for(ProcessorStatus processorStatus : status.getProcessorStatus()) {
-                Map<String, Long> counters = processorStatus.getCounters();
-
-                if(counters != null) {
-                    counters.entrySet().stream().forEach(entry -> PROCESSOR_COUNTERS
-                            .labels(processorStatus.getName(), entry.getKey(), processorStatus.getId(), instanceId).set(entry.getValue()));
-                }
-            }
-            for(ConnectionStatus connectionStatus : status.getConnectionStatus()) {
-                final String connComponentId = connectionStatus.getId();
-                final String connComponentName = connectionStatus.getName();
-                final String sourceId = connectionStatus.getSourceId();
-                final String sourceName = connectionStatus.getSourceName();
-                final String destinationId = connectionStatus.getDestinationId();
-                final String destinationName = connectionStatus.getDestinationName();
-                final String parentId = connectionStatus.getGroupId();
-                final String connComponentType = "Connection";
-                SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
-                        .set(connectionStatus.getOutputBytes());
-                SIZE_CONTENT_INPUT_TOTAL.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
-                        .set(connectionStatus.getInputBytes());
-                SIZE_CONTENT_QUEUED_TOTAL.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
-                        .set(connectionStatus.getQueuedBytes());
-
-                AMOUNT_ITEMS_OUTPUT.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
-                        .set(connectionStatus.getOutputCount());
-                AMOUNT_ITEMS_INPUT.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
-                        .set(connectionStatus.getInputCount());
-                AMOUNT_ITEMS_QUEUED.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
-                        .set(connectionStatus.getQueuedCount());
-
-                BACKPRESSURE_BYTES_THRESHOLD.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
-                        .set(connectionStatus.getBackPressureBytesThreshold());
-                BACKPRESSURE_OBJECT_THRESHOLD.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
-                        .set(connectionStatus.getBackPressureObjectThreshold());
-                boolean isBackpressureEnabled = (connectionStatus.getBackPressureObjectThreshold() > 0 && connectionStatus.getBackPressureObjectThreshold() <= connectionStatus.getQueuedCount())
-                        || (connectionStatus.getBackPressureBytesThreshold() > 0 && connectionStatus.getBackPressureBytesThreshold() <= connectionStatus.getMaxQueuedBytes());
-                IS_BACKPRESSURE_ENABLED.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
-                        .set(isBackpressureEnabled ? 1 : 0);
-            }
-            for(PortStatus portStatus : status.getInputPortStatus()) {
-                final String portComponentId = portStatus.getId();
-                final String portComponentName = portStatus.getName();
-                final String parentId = portStatus.getGroupId();
-                final String portComponentType = "InputPort";
-                AMOUNT_FLOWFILES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesSent());
-                AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesReceived());
-
-                AMOUNT_BYTES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesSent());
-                AMOUNT_BYTES_READ.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getInputBytes());
-                AMOUNT_BYTES_WRITTEN.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getOutputBytes());
-                AMOUNT_BYTES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesReceived());
-
-                AMOUNT_ITEMS_OUTPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "")
-                        .set(portStatus.getOutputCount());
-                AMOUNT_ITEMS_INPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "")
-                        .set(portStatus.getInputCount());
-
-                final Boolean isTransmitting = portStatus.isTransmitting();
-                IS_TRANSMITTING.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, portStatus.getRunStatus().name())
-                        .set(isTransmitting == null ? 0 : (isTransmitting ? 1 : 0));
-
-                AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getActiveThreadCount());
-            }
-            for(PortStatus portStatus : status.getOutputPortStatus()) {
-                final String portComponentId = portStatus.getId();
-                final String portComponentName = portStatus.getName();
-                final String parentId = portStatus.getGroupId();
-                final String portComponentType = "OutputPort";
-                AMOUNT_FLOWFILES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesSent());
-                AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId).set(portStatus.getFlowFilesReceived());
-
-                AMOUNT_BYTES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesSent());
-                AMOUNT_BYTES_READ.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getInputBytes());
-                AMOUNT_BYTES_WRITTEN.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getOutputBytes());
-                AMOUNT_BYTES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesReceived());
-
-                AMOUNT_ITEMS_OUTPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "")
-                        .set(portStatus.getOutputCount());
-                AMOUNT_ITEMS_INPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "")
-                        .set(portStatus.getInputCount());
-
-                final Boolean isTransmitting = portStatus.isTransmitting();
-                IS_TRANSMITTING.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, portStatus.getRunStatus().name())
-                        .set(isTransmitting == null ? 0 : (isTransmitting ? 1 : 0));
-
-                AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getActiveThreadCount());
-            }
-            for(RemoteProcessGroupStatus remoteProcessGroupStatus : status.getRemoteProcessGroupStatus()) {
-                final String rpgComponentId = remoteProcessGroupStatus.getId();
-                final String rpgComponentName = remoteProcessGroupStatus.getName();
-                final String parentId = remoteProcessGroupStatus.getGroupId();
-                final String rpgComponentType = "RemoteProcessGroup";
-
-                AMOUNT_BYTES_WRITTEN.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getSentContentSize());
-                AMOUNT_BYTES_RECEIVED.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getReceivedContentSize());
-
-                AMOUNT_ITEMS_OUTPUT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "")
-                        .set(remoteProcessGroupStatus.getSentCount());
-                AMOUNT_ITEMS_INPUT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "")
-                        .set(remoteProcessGroupStatus.getReceivedCount());
-
-                ACTIVE_REMOTE_PORT_COUNT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getActiveRemotePortCount());
-                INACTIVE_REMOTE_PORT_COUNT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getInactiveRemotePortCount());
-
-                AVERAGE_LINEAGE_DURATION.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getAverageLineageDuration());
-
-                IS_TRANSMITTING.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, remoteProcessGroupStatus.getTransmissionStatus().name())
-                        .set(TransmissionStatus.Transmitting.equals(remoteProcessGroupStatus.getTransmissionStatus()) ? 1 : 0);
-
-                AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getActiveThreadCount());
-            }
-        }
-
-        return NIFI_REGISTRY;
-    }
-
-    public static CollectorRegistry createJvmMetrics(JvmMetrics jvmMetrics, String instanceId) {
-        JVM_HEAP_USED.labels(instanceId).set(jvmMetrics.heapUsed(DataUnit.B));
-        JVM_HEAP_USAGE.labels(instanceId).set(jvmMetrics.heapUsage());
-        JVM_HEAP_NON_USAGE.labels(instanceId).set(jvmMetrics.nonHeapUsage());
-
-        JVM_THREAD_COUNT.labels(instanceId).set(jvmMetrics.threadCount());
-        JVM_DAEMON_THREAD_COUNT.labels(instanceId).set(jvmMetrics.daemonThreadCount());
-
-        JVM_UPTIME.labels(instanceId).set(jvmMetrics.uptime());
-        JVM_FILE_DESCRIPTOR_USAGE.labels(instanceId).set(jvmMetrics.fileDescriptorUsage());
-
-        return JVM_REGISTRY;
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.reporting.prometheus.api;
+
+import java.util.Map;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.PortStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Gauge;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.controller.status.TransmissionStatus;
+import org.apache.nifi.metrics.jvm.JvmMetrics;
+import org.apache.nifi.processor.DataUnit;
+
+public class PrometheusMetricsUtil {
+
+    public static final AllowableValue METRICS_STRATEGY_ROOT = new AllowableValue("Root Process Group", "Root Process Group",
+            "Send rollup metrics for the entire root process group");
+    public static final AllowableValue METRICS_STRATEGY_PG = new AllowableValue("All Process Groups", "All Process Groups",
+            "Send metrics for each process group");
+    public static final AllowableValue METRICS_STRATEGY_COMPONENTS = new AllowableValue("All Components", "All Components",
+            "Send metrics for each component in the system, to include processors, connections, controller services, etc.");
+
+    private static final CollectorRegistry NIFI_REGISTRY = new CollectorRegistry();
+    private static final CollectorRegistry JVM_REGISTRY = new CollectorRegistry();
+
+    // Process Group metrics
+    private static final Gauge AMOUNT_FLOWFILES_SENT = Gauge.build()
+            .name("nifi_amount_flowfiles_sent")
+            .help("Total number of FlowFiles sent by the component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
+            .register(NIFI_REGISTRY);
+
+    private static final Gauge AMOUNT_FLOWFILES_TRANSFERRED = Gauge.build()
+            .name("nifi_amount_flowfiles_transferred")
+            .help("Total number of FlowFiles transferred by the component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
+            .register(NIFI_REGISTRY);
+
+    private static final Gauge AMOUNT_FLOWFILES_RECEIVED = Gauge.build()
+            .name("nifi_amount_flowfiles_received")
+            .help("Total number of FlowFiles received by the component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
+            .register(NIFI_REGISTRY);
+
+    private static final Gauge AMOUNT_BYTES_SENT = Gauge.build()
+            .name("nifi_amount_bytes_sent")
+            .help("Total number of bytes sent by the component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
+            .register(NIFI_REGISTRY);
+
+    private static final Gauge AMOUNT_BYTES_READ = Gauge.build()
+            .name("nifi_amount_bytes_read")
+            .help("Total number of bytes read by the component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
+            .register(NIFI_REGISTRY);
+
+    private static final Gauge AMOUNT_BYTES_WRITTEN = Gauge.build()
+            .name("nifi_amount_bytes_written")
+            .help("Total number of bytes written by the component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
+            .register(NIFI_REGISTRY);
+
+    private static final Gauge AMOUNT_BYTES_RECEIVED = Gauge.build()
+            .name("nifi_amount_bytes_received")
+            .help("Total number of bytes received by the component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
+            .register(NIFI_REGISTRY);
+
+    private static final Gauge AMOUNT_BYTES_TRANSFERRED = Gauge.build()
+            .name("nifi_amount_bytes_transferred")
+            .help("Total number of Bytes transferred by the component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
+            .register(NIFI_REGISTRY);
+
+    private static final Gauge AMOUNT_THREADS_TOTAL_ACTIVE = Gauge.build()
+            .name("nifi_amount_threads_active")
+            .help("Total number of threads active for the component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
+            .register(NIFI_REGISTRY);
+
+    private static final Gauge AMOUNT_THREADS_TOTAL_TERMINATED = Gauge.build()
+            .name("nifi_amount_threads_terminated")
+            .help("Total number of threads terminated for the component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
+            .register(NIFI_REGISTRY);
+
+    private static final Gauge SIZE_CONTENT_OUTPUT_TOTAL = Gauge.build()
+            .name("nifi_size_content_output_total")
+            .help("Total size of content output by the component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+                    "source_id", "source_name", "destination_id", "destination_name")
+            .register(NIFI_REGISTRY);
+
+    private static final Gauge SIZE_CONTENT_INPUT_TOTAL = Gauge.build()
+            .name("nifi_size_content_input_total")
+            .help("Total size of content input by the component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+                    "source_id", "source_name", "destination_id", "destination_name")
+            .register(NIFI_REGISTRY);
+
+    private static final Gauge SIZE_CONTENT_QUEUED_TOTAL = Gauge.build()
+            .name("nifi_size_content_queued_total")
+            .help("Total size of content queued in the component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+                    "source_id", "source_name", "destination_id", "destination_name")
+            .register(NIFI_REGISTRY);
+
+    private static final Gauge AMOUNT_ITEMS_OUTPUT = Gauge.build()
+            .name("nifi_amount_items_output")
+            .help("Total number of items output by the component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+                    "source_id", "source_name", "destination_id", "destination_name")
+            .register(NIFI_REGISTRY);
+
+    private static final Gauge AMOUNT_ITEMS_INPUT = Gauge.build()
+            .name("nifi_amount_items_input")
+            .help("Total number of items input by the component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+                    "source_id", "source_name", "destination_id", "destination_name")
+            .register(NIFI_REGISTRY);
+
+    private static final Gauge AMOUNT_ITEMS_QUEUED = Gauge.build()
+            .name("nifi_amount_items_queued")
+            .help("Total number of items queued by the component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+                    "source_id", "source_name", "destination_id", "destination_name")
+            .register(NIFI_REGISTRY);
+
+    private static final Gauge PROCESSOR_COUNTERS = Gauge.build()
+            .name("nifi_processor_counters")
+            .help("Counters exposed by NiFi Processors")
+            .labelNames("processor_name", "counter_name", "processor_id", "instance")
+            .register(NIFI_REGISTRY);
+
+    // Connection metrics
+    private static final Gauge BACKPRESSURE_BYTES_THRESHOLD = Gauge.build()
+            .name("nifi_backpressure_bytes_threshold")
+            .help("The number of bytes that can be queued before backpressure is applied")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+                    "source_id", "source_name", "destination_id", "destination_name")
+            .register(NIFI_REGISTRY);
+
+    private static final Gauge BACKPRESSURE_OBJECT_THRESHOLD = Gauge.build()
+            .name("nifi_backpressure_object_threshold")
+            .help("The number of flow files that can be queued before backpressure is applied")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+                    "source_id", "source_name", "destination_id", "destination_name")
+            .register(NIFI_REGISTRY);
+
+    private static final Gauge IS_BACKPRESSURE_ENABLED = Gauge.build()
+            .name("nifi_backpressure_enabled")
+            .help("Whether backpressure has been applied for this component. Values are 0 or 1")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+                    "source_id", "source_name", "destination_id", "destination_name")
+            .register(NIFI_REGISTRY);
+
+    // Port metrics
+    private static final Gauge IS_TRANSMITTING = Gauge.build()
+            .name("nifi_transmitting")
+            .help("Whether this component is transmitting data. Values are 0 or 1")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "run_status")
+            .register(NIFI_REGISTRY);
+
+    // Remote Process Group (RPG) metrics
+    private static final Gauge ACTIVE_REMOTE_PORT_COUNT = Gauge.build()
+            .name("nifi_active_remote_port_count")
+            .help("The number of active remote ports associated with this component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+                    "source_id", "source_name", "destination_id", "destination_name")
+            .register(NIFI_REGISTRY);
+
+    private static final Gauge INACTIVE_REMOTE_PORT_COUNT = Gauge.build()
+            .name("nifi_inactive_remote_port_count")
+            .help("The number of inactive remote ports associated with this component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+                    "source_id", "source_name", "destination_id", "destination_name")
+            .register(NIFI_REGISTRY);
+
+    private static final Gauge AVERAGE_LINEAGE_DURATION = Gauge.build()
+            .name("nifi_average_lineage_duration")
+            .help("The average lineage duration (in milliseconds) for all flow file processed by this component")
+            .labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
+                    "source_id", "source_name", "destination_id", "destination_name")
+            .register(NIFI_REGISTRY);
+
+    ///////////////////////////////////////////////////////////////
+    // JVM Metrics
+    ///////////////////////////////////////////////////////////////
+    private static final Gauge JVM_HEAP_USED = Gauge.build()
+            .name("nifi_jvm_heap_used")
+            .help("NiFi JVM heap used")
+            .labelNames("instance")
+            .register(JVM_REGISTRY);
+
+    private static final Gauge JVM_HEAP_USAGE = Gauge.build()
+            .name("nifi_jvm_heap_usage")
+            .help("NiFi JVM heap usage")
+            .labelNames("instance")
+            .register(JVM_REGISTRY);
+
+    private static final Gauge JVM_HEAP_NON_USAGE = Gauge.build()
+            .name("nifi_jvm_heap_non_usage")
+            .help("NiFi JVM heap non usage")
+            .labelNames("instance")
+            .register(JVM_REGISTRY);
+
+    private static final Gauge JVM_THREAD_COUNT = Gauge.build()
+            .name("nifi_jvm_thread_count")
+            .help("NiFi JVM thread count")
+            .labelNames("instance")
+            .register(JVM_REGISTRY);
+
+    private static final Gauge JVM_DAEMON_THREAD_COUNT = Gauge.build()
+            .name("nifi_jvm_daemon_thread_count")
+            .help("NiFi JVM daemon thread count")
+            .labelNames("instance")
+            .register(JVM_REGISTRY);
+
+    private static final Gauge JVM_UPTIME = Gauge.build()
+            .name("nifi_jvm_uptime")
+            .help("NiFi JVM uptime")
+            .labelNames("instance")
+            .register(JVM_REGISTRY);
+
+    private static final Gauge JVM_FILE_DESCRIPTOR_USAGE = Gauge.build()
+            .name("nifi_jvm_file_descriptor_usage")
+            .help("NiFi JVM file descriptor usage")
+            .labelNames("instance")
+            .register(JVM_REGISTRY);
+
+    public static CollectorRegistry createNifiMetrics(ProcessGroupStatus status, String instanceId, String parentPGId, String componentType, String metricsStrategy) {
+
+        final String componentId = status.getId();
+        final String componentName = status.getName();
+
+        AMOUNT_FLOWFILES_SENT.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getFlowFilesSent());
+        AMOUNT_FLOWFILES_TRANSFERRED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getFlowFilesTransferred());
+        AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getFlowFilesReceived());
+
+        AMOUNT_BYTES_SENT.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesSent());
+        AMOUNT_BYTES_READ.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesRead());
+        AMOUNT_BYTES_WRITTEN.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesWritten());
+        AMOUNT_BYTES_RECEIVED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesReceived());
+        AMOUNT_BYTES_TRANSFERRED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesTransferred());
+
+        SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "")
+                .set(status.getOutputContentSize());
+        SIZE_CONTENT_INPUT_TOTAL.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "")
+                .set(status.getInputContentSize());
+        SIZE_CONTENT_QUEUED_TOTAL.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "")
+                .set(status.getQueuedContentSize());
+
+        AMOUNT_ITEMS_OUTPUT.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "")
+                .set(status.getOutputCount());
+        AMOUNT_ITEMS_INPUT.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "")
+                .set(status.getInputCount());
+        AMOUNT_ITEMS_QUEUED.labels(instanceId, componentType, componentName, componentId, parentPGId,"", "", "", "")
+                .set(status.getQueuedCount());
+
+        AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, componentType, componentName, componentId, parentPGId)
+                .set(status.getActiveThreadCount() == null ? 0 : status.getActiveThreadCount());
+        AMOUNT_THREADS_TOTAL_TERMINATED.labels(instanceId, componentType, componentName, componentId, parentPGId)
+                .set(status.getTerminatedThreadCount() == null ? 0 : status.getTerminatedThreadCount());
+
+        // Report metrics for child process groups if specified
+        if (METRICS_STRATEGY_PG.getValue().equals(metricsStrategy) || METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) {
+            status.getProcessGroupStatus().forEach((childGroupStatus) -> createNifiMetrics(childGroupStatus, instanceId, parentPGId, "ProcessGroup", metricsStrategy));
+        }
+
+        if (METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) {
+            // Report metrics for all components
+            for(ProcessorStatus processorStatus : status.getProcessorStatus()) {
+                Map<String, Long> counters = processorStatus.getCounters();
+
+                if(counters != null) {
+                    counters.entrySet().stream().forEach(entry -> PROCESSOR_COUNTERS
+                            .labels(processorStatus.getName(), entry.getKey(), processorStatus.getId(), instanceId).set(entry.getValue()));
+                }
+
+                final String procComponentType = "Processor";
+                final String procComponentId = processorStatus.getId();
+                final String procComponentName = processorStatus.getName();
+                final String parentId = processorStatus.getGroupId();
+                AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId)
+                        .set(status.getActiveThreadCount() == null ? 0 : status.getActiveThreadCount());
+                AMOUNT_THREADS_TOTAL_TERMINATED.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId)
+                        .set(status.getTerminatedThreadCount() == null ? 0 : status.getTerminatedThreadCount());
+
+            }
+            for(ConnectionStatus connectionStatus : status.getConnectionStatus()) {
+                final String connComponentId = connectionStatus.getId();
+                final String connComponentName = connectionStatus.getName();
+                final String sourceId = connectionStatus.getSourceId();
+                final String sourceName = connectionStatus.getSourceName();
+                final String destinationId = connectionStatus.getDestinationId();
+                final String destinationName = connectionStatus.getDestinationName();
+                final String parentId = connectionStatus.getGroupId();
+                final String connComponentType = "Connection";
+                SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
+                        .set(connectionStatus.getOutputBytes());
+                SIZE_CONTENT_INPUT_TOTAL.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
+                        .set(connectionStatus.getInputBytes());
+                SIZE_CONTENT_QUEUED_TOTAL.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
+                        .set(connectionStatus.getQueuedBytes());
+
+                AMOUNT_ITEMS_OUTPUT.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
+                        .set(connectionStatus.getOutputCount());
+                AMOUNT_ITEMS_INPUT.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
+                        .set(connectionStatus.getInputCount());
+                AMOUNT_ITEMS_QUEUED.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
+                        .set(connectionStatus.getQueuedCount());
+
+                BACKPRESSURE_BYTES_THRESHOLD.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
+                        .set(connectionStatus.getBackPressureBytesThreshold());
+                BACKPRESSURE_OBJECT_THRESHOLD.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
+                        .set(connectionStatus.getBackPressureObjectThreshold());
+                boolean isBackpressureEnabled = (connectionStatus.getBackPressureObjectThreshold() > 0 && connectionStatus.getBackPressureObjectThreshold() <= connectionStatus.getQueuedCount())
+                        || (connectionStatus.getBackPressureBytesThreshold() > 0 && connectionStatus.getBackPressureBytesThreshold() <= connectionStatus.getMaxQueuedBytes());
+                IS_BACKPRESSURE_ENABLED.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
+                        .set(isBackpressureEnabled ? 1 : 0);
+            }
+            for(PortStatus portStatus : status.getInputPortStatus()) {
+                final String portComponentId = portStatus.getId();
+                final String portComponentName = portStatus.getName();
+                final String parentId = portStatus.getGroupId();
+                final String portComponentType = "InputPort";
+                AMOUNT_FLOWFILES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesSent());
+                AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesReceived());
+
+                AMOUNT_BYTES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesSent());
+                AMOUNT_BYTES_READ.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getInputBytes());
+                AMOUNT_BYTES_WRITTEN.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getOutputBytes());
+                AMOUNT_BYTES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesReceived());
+
+                AMOUNT_ITEMS_OUTPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "")
+                        .set(portStatus.getOutputCount());
+                AMOUNT_ITEMS_INPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "")
+                        .set(portStatus.getInputCount());
+
+                final Boolean isTransmitting = portStatus.isTransmitting();
+                IS_TRANSMITTING.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, portStatus.getRunStatus().name())
+                        .set(isTransmitting == null ? 0 : (isTransmitting ? 1 : 0));
+
+                AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getActiveThreadCount());
+            }
+            for(PortStatus portStatus : status.getOutputPortStatus()) {
+                final String portComponentId = portStatus.getId();
+                final String portComponentName = portStatus.getName();
+                final String parentId = portStatus.getGroupId();
+                final String portComponentType = "OutputPort";
+                AMOUNT_FLOWFILES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesSent());
+                AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId).set(portStatus.getFlowFilesReceived());
+
+                AMOUNT_BYTES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesSent());
+                AMOUNT_BYTES_READ.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getInputBytes());
+                AMOUNT_BYTES_WRITTEN.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getOutputBytes());
+                AMOUNT_BYTES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesReceived());
+
+                AMOUNT_ITEMS_OUTPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "")
+                        .set(portStatus.getOutputCount());
+                AMOUNT_ITEMS_INPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "")
+                        .set(portStatus.getInputCount());
+
+                final Boolean isTransmitting = portStatus.isTransmitting();
+                IS_TRANSMITTING.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, portStatus.getRunStatus().name())
+                        .set(isTransmitting == null ? 0 : (isTransmitting ? 1 : 0));
+
+                AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getActiveThreadCount());
+            }
+            for(RemoteProcessGroupStatus remoteProcessGroupStatus : status.getRemoteProcessGroupStatus()) {
+                final String rpgComponentId = remoteProcessGroupStatus.getId();
+                final String rpgComponentName = remoteProcessGroupStatus.getName();
+                final String parentId = remoteProcessGroupStatus.getGroupId();
+                final String rpgComponentType = "RemoteProcessGroup";
+
+                AMOUNT_BYTES_WRITTEN.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getSentContentSize());
+                AMOUNT_BYTES_RECEIVED.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getReceivedContentSize());
+
+                AMOUNT_ITEMS_OUTPUT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "")
+                        .set(remoteProcessGroupStatus.getSentCount());
+                AMOUNT_ITEMS_INPUT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "")
+                        .set(remoteProcessGroupStatus.getReceivedCount());
+
+                ACTIVE_REMOTE_PORT_COUNT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getActiveRemotePortCount());
+                INACTIVE_REMOTE_PORT_COUNT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getInactiveRemotePortCount());
+
+                AVERAGE_LINEAGE_DURATION.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getAverageLineageDuration());
+
+                IS_TRANSMITTING.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, remoteProcessGroupStatus.getTransmissionStatus().name())
+                        .set(TransmissionStatus.Transmitting.equals(remoteProcessGroupStatus.getTransmissionStatus()) ? 1 : 0);
+
+                AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getActiveThreadCount());
+            }
+        }
+
+        return NIFI_REGISTRY;
+    }
+
+    public static CollectorRegistry createJvmMetrics(JvmMetrics jvmMetrics, String instanceId) {
+        JVM_HEAP_USED.labels(instanceId).set(jvmMetrics.heapUsed(DataUnit.B));
+        JVM_HEAP_USAGE.labels(instanceId).set(jvmMetrics.heapUsage());
+        JVM_HEAP_NON_USAGE.labels(instanceId).set(jvmMetrics.nonHeapUsage());
+
+        JVM_THREAD_COUNT.labels(instanceId).set(jvmMetrics.threadCount());
+        JVM_DAEMON_THREAD_COUNT.labels(instanceId).set(jvmMetrics.daemonThreadCount());
+
+        JVM_UPTIME.labels(instanceId).set(jvmMetrics.uptime());
+        JVM_FILE_DESCRIPTOR_USAGE.labels(instanceId).set(jvmMetrics.fileDescriptorUsage());
+
+        return JVM_REGISTRY;
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
index 618c40c..1c3f810 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
@@ -254,7 +254,9 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
             addField(builder, "outputCount", status.getOutputCount());
             addField(builder, "queuedContentSize", status.getQueuedContentSize());
             addField(builder, "activeThreadCount", status.getActiveThreadCount());
+            addField(builder, "terminatedThreadCount", status.getTerminatedThreadCount());
             addField(builder, "queuedCount", status.getQueuedCount());
+            addField(builder, "versionedFlowState", status.getVersionedFlowState() == null ? null : status.getVersionedFlowState().name());
 
             arrayBuilder.add(builder.build());
         }
@@ -305,6 +307,8 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
             addField(builder, "sentContentSize", status.getSentContentSize());
             addField(builder, "sentCount", status.getSentCount());
             addField(builder, "averageLineageDuration", status.getAverageLineageDuration());
+            addField(builder, "transmissionStatus", status.getTransmissionStatus() == null ? null : status.getTransmissionStatus().name());
+            addField(builder, "targetURI", status.getTargetUri());
 
             arrayBuilder.add(builder.build());
         }
@@ -329,6 +333,8 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
             addField(builder, "inputCount", status.getInputCount());
             addField(builder, "outputBytes", status.getOutputBytes());
             addField(builder, "outputCount", status.getOutputCount());
+            addField(builder, "runStatus", status.getRunStatus() == null ? null : status.getRunStatus().name());
+            addField(builder, "transmitting", status.isTransmitting());
 
             arrayBuilder.add(builder.build());
         }
@@ -359,6 +365,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
             addField(builder, "outputCount", status.getOutputCount());
             addField(builder, "backPressureBytesThreshold", status.getBackPressureBytesThreshold());
             addField(builder, "backPressureObjectThreshold", status.getBackPressureObjectThreshold());
+            addField(builder, "backPressureDataSizeThreshold", status.getBackPressureDataSizeThreshold());
             addField(builder, "isBackPressureEnabled", Boolean.toString((status.getBackPressureObjectThreshold() > 0 && status.getBackPressureObjectThreshold() <= status.getQueuedCount())
                     || (status.getBackPressureBytesThreshold() > 0 && status.getBackPressureBytesThreshold() <= status.getMaxQueuedBytes())));
 
@@ -390,8 +397,12 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
             addField(builder, "outputCount", status.getOutputCount());
             addField(builder, "outputBytes", status.getOutputBytes());
             addField(builder, "activeThreadCount", status.getActiveThreadCount());
+            addField(builder, "terminatedThreadCount", status.getTerminatedThreadCount());
             addField(builder, "invocations", status.getInvocations());
             addField(builder, "processingNanos", status.getProcessingNanos());
+            addField(builder, "runStatus", status.getRunStatus() == null ? null : status.getRunStatus().name());
+            addField(builder, "executionNode", status.getExecutionNode() == null ? null : status.getExecutionNode().name());
+            addField(builder, factory, "counters", status.getCounters());
 
             arrayBuilder.add(builder.build());
         }
@@ -411,4 +422,28 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
         addField(builder, "application", applicationName);
     }
 
+    /**
+     * Adds a boolean field to the JSON object being built; a {@code null} value is skipped so the key is
+     * omitted from the output entirely.
+     *
+     * @param builder the JSON object builder to add to
+     * @param key the field name
+     * @param value the field value, may be {@code null}
+     */
+    private void addField(final JsonObjectBuilder builder, final String key, final Boolean value) {
+        if (value != null) {
+            builder.add(key, value);
+        }
+    }
+
+    /**
+     * Adds a map of named long values as a nested JSON object under {@code key}. Nothing is added when the map
+     * itself is {@code null}; entries with a {@code null} key or value are left out of the nested object.
+     *
+     * @param builder the JSON object builder to add to
+     * @param factory factory used to create the nested object builder
+     * @param key the field name of the nested object
+     * @param values the entries to write, may be {@code null}
+     */
+    private static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Map<String, Long> values) {
+        if (values != null) {
+            final JsonObjectBuilder nested = factory.createObjectBuilder();
+            values.forEach((name, count) -> {
+                if (name != null && count != null) {
+                    nested.add(name, count);
+                }
+            });
+
+            builder.add(key, nested);
+        }
+    }
 }
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteStatusReportingTask/additionalDetails.html b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteStatusReportingTask/additionalDetails.html
index 2d0be38..ab32bad 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteStatusReportingTask/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteStatusReportingTask/additionalDetails.html
@@ -72,13 +72,18 @@
 	// PG + Processors
 	{ "name" : "bytesRead", "type" : ["long", "null"]},
 	{ "name" : "bytesWritten", "type" : ["long", "null"]},
-	
+	{ "name" : "terminatedThreadCount", "type" : ["long", "null"]},
+
+	// Processors + Ports
+	{ "name" : "runStatus", "type" : ["string", "null"]},
+
 	// fields for process group status
 	{ "name" : "bytesTransferred", "type" : ["long", "null"]},
 	{ "name" : "flowFilesTransferred", "type" : ["long", "null"]},
 	{ "name" : "inputContentSize", "type" : ["long", "null"]},
 	{ "name" : "outputContentSize", "type" : ["long", "null"]},
 	{ "name" : "queuedContentSize", "type" : ["long", "null"]},
+	{ "name" : "versionedFlowState", "type" : ["string", "null"]},
 	
 	// fields for remote process groups
 	{ "name" : "activeRemotePortCount", "type" : ["long", "null"]},
@@ -88,12 +93,15 @@
 	{ "name" : "sentContentSize", "type" : ["long", "null"]},
 	{ "name" : "sentCount", "type" : ["long", "null"]},
 	{ "name" : "averageLineageDuration", "type" : ["long", "null"]},
+	{ "name" : "transmissionStatus", "type" : ["string", "null"]},
+	{ "name" : "targetURI", "type" : ["string", "null"]},
 	
 	// fields for input/output ports + connections + PG
 	{ "name" : "inputBytes", "type" : ["long", "null"]},
 	{ "name" : "inputCount", "type" : ["long", "null"]},
 	{ "name" : "outputBytes", "type" : ["long", "null"]},
 	{ "name" : "outputCount", "type" : ["long", "null"]},
+	{ "name" : "transmitting", "type" : ["boolean", "null"]},
 	
 	// fields for connections
 	{ "name" : "sourceId", "type" : ["string", "null"]},
@@ -105,6 +113,7 @@
 	{ "name" : "queuedBytes", "type" : ["long", "null"]},
 	{ "name" : "backPressureBytesThreshold", "type" : ["long", "null"]},
 	{ "name" : "backPressureObjectThreshold", "type" : ["long", "null"]},
+	{ "name" : "backPressureDataSizeThreshold", "type" : ["string", "null"]},
 	{ "name" : "isBackPressureEnabled", "type" : ["string", "null"]},
 	
     // fields for processors
@@ -112,7 +121,9 @@
 	{ "name" : "averageLineageDurationMS", "type" : ["long", "null"]},
 	{ "name" : "flowFilesRemoved", "type" : ["long", "null"]},
 	{ "name" : "invocations", "type" : ["long", "null"]},
-	{ "name" : "processingNanos", "type" : ["long", "null"]}
+	{ "name" : "processingNanos", "type" : ["long", "null"]},
+	{ "name" : "executionNode", "type" : ["string", "null"]},
+	{ "name" : "counters", "type": [{ "type": "map", "values": "long" }, "null"]}
   ]
 }
 			</code>
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-status.avsc b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-status.avsc
index 6f16d0e..4190015 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-status.avsc
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-status.avsc
@@ -33,13 +33,18 @@
 	// PG + Processors
 	{ "name" : "bytesRead", "type" : ["long", "null"]},
 	{ "name" : "bytesWritten", "type" : ["long", "null"]},
-	
+	{ "name" : "terminatedThreadCount", "type" : ["long", "null"]},
+
+    // Processors + Ports
+	{ "name" : "runStatus", "type" : ["string", "null"]},
+
 	// fields for process group status
 	{ "name" : "bytesTransferred", "type" : ["long", "null"]},
 	{ "name" : "flowFilesTransferred", "type" : ["long", "null"]},
 	{ "name" : "inputContentSize", "type" : ["long", "null"]},
 	{ "name" : "outputContentSize", "type" : ["long", "null"]},
 	{ "name" : "queuedContentSize", "type" : ["long", "null"]},
+	{ "name" : "versionedFlowState", "type" : ["string", "null"]},
 	
 	// fields for remote process groups
 	{ "name" : "activeRemotePortCount", "type" : ["long", "null"]},
@@ -49,12 +54,15 @@
 	{ "name" : "sentContentSize", "type" : ["long", "null"]},
 	{ "name" : "sentCount", "type" : ["long", "null"]},
 	{ "name" : "averageLineageDuration", "type" : ["long", "null"]},
+	{ "name" : "transmissionStatus", "type" : ["string", "null"]},
+	{ "name" : "targetURI", "type" : ["string", "null"]},
 	
 	// fields for input/output ports + connections + PG
 	{ "name" : "inputBytes", "type" : ["long", "null"]},
 	{ "name" : "inputCount", "type" : ["long", "null"]},
 	{ "name" : "outputBytes", "type" : ["long", "null"]},
 	{ "name" : "outputCount", "type" : ["long", "null"]},
+	{ "name" : "transmitting", "type" : ["boolean", "null"]},
 	
 	// fields for connections
 	{ "name" : "sourceId", "type" : ["string", "null"]},
@@ -66,6 +74,7 @@
 	{ "name" : "queuedBytes", "type" : ["long", "null"]},
 	{ "name" : "backPressureBytesThreshold", "type" : ["long", "null"]},
 	{ "name" : "backPressureObjectThreshold", "type" : ["long", "null"]},
+	{ "name" : "backPressureDataSizeThreshold", "type" : ["string", "null"]},
 	{ "name" : "isBackPressureEnabled", "type" : ["string", "null"]},
 	
     // fields for processors
@@ -73,6 +82,8 @@
 	{ "name" : "averageLineageDurationMS", "type" : ["long", "null"]},
 	{ "name" : "flowFilesRemoved", "type" : ["long", "null"]},
 	{ "name" : "invocations", "type" : ["long", "null"]},
-	{ "name" : "processingNanos", "type" : ["long", "null"]}
+	{ "name" : "processingNanos", "type" : ["long", "null"]},
+	{ "name" : "executionNode", "type" : ["string", "null"]},
+	{ "name" : "counters", "type": [{ "type": "map", "values": "long" }, "null"]}
   ]
 }
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java
index 0d537c2..46be776 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java
@@ -19,6 +19,8 @@ package org.apache.nifi.reporting;
 
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -31,6 +33,7 @@ import java.util.Map;
 import java.util.UUID;
 
 import javax.json.Json;
+import javax.json.JsonNumber;
 import javax.json.JsonObject;
 import javax.json.JsonReader;
 import javax.json.JsonString;
@@ -42,8 +45,11 @@ import org.apache.nifi.controller.status.PortStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.controller.status.ProcessorStatus;
 import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.controller.status.RunStatus;
+import org.apache.nifi.controller.status.TransmissionStatus;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.registry.flow.VersionedFlowState;
 import org.apache.nifi.remote.Transaction;
 import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.client.SiteToSiteClient;
@@ -108,8 +114,13 @@ public class TestSiteToSiteStatusReportingTask {
         assertEquals(16, task.dataSent.size());
         final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
         JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
-        JsonString componentId = jsonReader.readArray().getJsonObject(0).getJsonString("componentId");
+        JsonObject firstElement = jsonReader.readArray().getJsonObject(0);
+        JsonString componentId = firstElement.getJsonString("componentId");
         assertEquals(pgStatus.getId(), componentId.getString());
+        JsonNumber terminatedThreads = firstElement.getJsonNumber("terminatedThreadCount");
+        assertEquals(1, terminatedThreads.longValue());
+        JsonString versionedFlowState = firstElement.getJsonString("versionedFlowState");
+        assertEquals("UP_TO_DATE", versionedFlowState.getString());
     }
 
     @Test
@@ -150,6 +161,10 @@ public class TestSiteToSiteStatusReportingTask {
         JsonString source = object.getJsonString("sourceName");
         assertEquals("true", backpressure.getString());
         assertEquals("source", source.getString());
+        JsonString dataSizeThreshold = object.getJsonString("backPressureDataSizeThreshold");
+        JsonNumber bytesThreshold = object.getJsonNumber("backPressureBytesThreshold");
+        assertEquals("1 KB", dataSizeThreshold.getString());
+        assertEquals(1024, bytesThreshold.intValue());
     }
 
     @Test
@@ -190,6 +205,80 @@ public class TestSiteToSiteStatusReportingTask {
         assertEquals("root.1.1.processor.1", componentId.getString());
     }
 
+    /** Checks the run status, transmitting flag and input bytes reported for an input port. */
+    @Test
+    public void testPortStatus() throws IOException, InitializationException {
+        final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);
+
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4");
+        properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*");
+        properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(InputPort)");
+
+        MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus);
+        task.onTrigger(context);
+
+        // Re-encode with the charset used for decoding rather than the platform default
+        final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
+        JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes(StandardCharsets.UTF_8)));
+        JsonObject object = jsonReader.readArray().getJsonObject(0);
+        JsonString runStatus = object.getJsonString("runStatus");
+        assertEquals(RunStatus.Stopped.name(), runStatus.getString());
+        assertFalse(object.getBoolean("transmitting"));
+        assertEquals(5, object.getJsonNumber("inputBytes").intValue());
+    }
+
+    /** Checks the active thread count and transmission status reported for a remote process group. */
+    @Test
+    public void testRemoteProcessGroupStatus() throws IOException, InitializationException {
+        final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);
+
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4");
+        properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*");
+        properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(RemoteProcessGroup)");
+
+        MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus);
+        task.onTrigger(context);
+
+        assertEquals(3, task.dataSent.size());
+        // Re-encode with the charset used for decoding rather than the platform default
+        final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
+        JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes(StandardCharsets.UTF_8)));
+        JsonObject firstElement = jsonReader.readArray().getJsonObject(0);
+        assertEquals(1L, firstElement.getJsonNumber("activeThreadCount").longValue());
+        assertEquals("Transmitting", firstElement.getJsonString("transmissionStatus").getString());
+    }
+
+    /** Checks the run status, input bytes and counters map reported for a processor. */
+    @Test
+    public void testProcessorStatus() throws IOException, InitializationException {
+        final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);
+
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4");
+        properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*");
+        properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(Processor)");
+
+        MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus);
+        task.onTrigger(context);
+
+        // Re-encode with the charset used for decoding rather than the platform default
+        final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
+        JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes(StandardCharsets.UTF_8)));
+        JsonObject object = jsonReader.readArray().getJsonObject(0);
+        assertEquals(RunStatus.Running.name(), object.getJsonString("runStatus").getString());
+        assertEquals(9, object.getJsonNumber("inputBytes").intValue());
+        JsonObject counterMap = object.getJsonObject("counters");
+        assertNotNull(counterMap);
+        assertEquals(10, counterMap.getInt("counter1"));
+        assertEquals(5, counterMap.getInt("counter2"));
+    }
+
+    /***********************************
+     * Test component generator methods
+     ***********************************/
+
     public static ProcessGroupStatus generateProcessGroupStatus(String id, String namePrefix,
             int maxRecursion, int currentDepth) {
         Collection<ConnectionStatus> cStatus = new ArrayList<>();
@@ -229,6 +318,7 @@ public class TestSiteToSiteStatusReportingTask {
         pgStatus.setProcessGroupStatus(childPgStatus);
         pgStatus.setRemoteProcessGroupStatus(rpgStatus);
         pgStatus.setProcessorStatus(pStatus);
+        pgStatus.setVersionedFlowState(VersionedFlowState.UP_TO_DATE);
 
         pgStatus.setActiveThreadCount(1);
         pgStatus.setBytesRead(2L);
@@ -246,6 +336,7 @@ public class TestSiteToSiteStatusReportingTask {
         pgStatus.setOutputCount(13);
         pgStatus.setQueuedContentSize(14l);
         pgStatus.setQueuedCount(15);
+        pgStatus.setTerminatedThreadCount(1);
 
         return pgStatus;
     }
@@ -263,6 +354,8 @@ public class TestSiteToSiteStatusReportingTask {
         pStatus.setInputCount(6);
         pStatus.setOutputBytes(7l);
         pStatus.setOutputCount(8);
+        pStatus.setRunStatus(RunStatus.Stopped);
+        pStatus.setTransmitting(false);
 
         return pStatus;
     }
@@ -287,6 +380,12 @@ public class TestSiteToSiteStatusReportingTask {
         pStatus.setOutputCount(13);
         pStatus.setProcessingNanos(14l);
         pStatus.setType("type");
+        pStatus.setTerminatedThreadCount(1);
+        pStatus.setRunStatus(RunStatus.Running);
+        pStatus.setCounters(new HashMap<String, Long>() {{
+            put("counter1", 10L);
+            put("counter2", 5L);
+        }});
 
         return pStatus;
     }
@@ -304,6 +403,7 @@ public class TestSiteToSiteStatusReportingTask {
         rpgStatus.setSentContentSize(6l);
         rpgStatus.setSentCount(7);
         rpgStatus.setTargetUri("uri");
+        rpgStatus.setTransmissionStatus(TransmissionStatus.Transmitting);
 
         return rpgStatus;
     }
@@ -312,7 +412,7 @@ public class TestSiteToSiteStatusReportingTask {
         ConnectionStatus cStatus = new ConnectionStatus();
         cStatus.setId(id);
         cStatus.setName(namePrefix + "-" + UUID.randomUUID().toString());
-        cStatus.setBackPressureBytesThreshold(0l);
+        cStatus.setBackPressureDataSizeThreshold("1 KB"); // sets backPressureBytesThreshold too
         cStatus.setBackPressureObjectThreshold(1l);
         cStatus.setInputBytes(2l);
         cStatus.setInputCount(3);