Posted to commits@nifi.apache.org by ad...@apache.org on 2022/06/28 13:37:21 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1829 Integrate Prometheus client lib MINIFICPP-1849 Fix C2 JSON representation of empty metrics

This is an automated email from the ASF dual-hosted git repository.

adebreceni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new e12c464f8 MINIFICPP-1829 Integrate Prometheus client lib MINIFICPP-1849 Fix C2 JSON representation of empty metrics
e12c464f8 is described below

commit e12c464f8292ee37ac8182bc5a0690a72a4f02d5
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Tue Jun 28 15:34:00 2022 +0200

    MINIFICPP-1829 Integrate Prometheus client lib
    MINIFICPP-1849 Fix C2 JSON representation of empty metrics
    
    Signed-off-by: Adam Debreceni <ad...@apache.org>
    
    This closes #1340
---
 .github/workflows/ci.yml                           |   8 +-
 C2.md                                              | 103 +++++++---
 CMakeLists.txt                                     |   7 +
 LICENSE                                            |  30 +++
 METRICS.md                                         | 155 ++++++++++++++
 NOTICE                                             |   1 +
 README.md                                          |   3 +
 Windows.md                                         |   1 +
 bootstrap.sh                                       |   2 +
 bstrp_functions.sh                                 |   6 +-
 cmake/BundledCivetWeb.cmake                        |   4 +-
 cmake/DockerConfig.cmake                           |   5 +
 cmake/Prometheus.cmake                             |  36 ++++
 conf/minifi.properties                             |  15 +-
 docker/Dockerfile                                  |   3 +-
 docker/bionic/Dockerfile                           |   3 +-
 docker/centos/Dockerfile                           |   3 +-
 docker/fedora/Dockerfile                           |   3 +-
 docker/focal/Dockerfile                            |   3 +-
 docker/requirements.txt                            |   1 +
 .../integration/MiNiFi_integration_test_driver.py  |   6 +
 .../test/integration/features/prometheus.feature   |  17 ++
 .../integration/minifi/core/DockerTestCluster.py   |  81 +++++++-
 docker/test/integration/minifi/core/ImageStore.py  |   8 +
 .../integration/minifi/core/PrometheusContainer.py |  38 ++++
 .../minifi/core/SingleNodeDockerCluster.py         |   3 +
 .../integration/resources/prometheus/Dockerfile    |   2 +
 .../resources/prometheus/conf/prometheus.yml       |   7 +
 docker/test/integration/steps/steps.py             |  17 ++
 encrypt-config/tests/ConfigFileEncryptorTests.cpp  |  12 +-
 encrypt-config/tests/ConfigFileTests.cpp           |   8 +-
 encrypt-config/tests/resources/minifi.properties   |  15 +-
 ...th-additional-sensitive-props.minifi.properties |  15 +-
 extensions/civetweb/CMakeLists.txt                 |   4 -
 extensions/http-curl/tests/C2EmptyMetricTest.cpp   | 115 +++++++++++
 extensions/http-curl/tests/C2MetricsTest.cpp       | 212 +++++++++++++++++++
 extensions/http-curl/tests/CMakeLists.txt          |   8 +-
 extensions/{civetweb => prometheus}/CMakeLists.txt |  23 +--
 .../prometheus/MetricsExposer.h                    |  30 ++-
 .../prometheus/PrometheusExposerWrapper.cpp        |  34 ++--
 .../prometheus/PrometheusExposerWrapper.h          |  37 ++--
 .../prometheus/PrometheusMetricsPublisher.cpp      |  87 ++++++++
 extensions/prometheus/PrometheusMetricsPublisher.h |  56 +++++
 .../prometheus/PublishedMetricGaugeCollection.cpp  |  49 +++++
 .../prometheus/PublishedMetricGaugeCollection.h    |  35 ++--
 extensions/prometheus/tests/CMakeLists.txt         |  34 ++++
 .../tests/PrometheusMetricsPublisherTest.cpp       | 111 ++++++++++
 .../standard-processors/processors/GetFile.h       |  53 +++--
 extensions/standard-processors/processors/GetTCP.h |  48 ++---
 libminifi/include/FlowController.h                 |  12 +-
 libminifi/include/c2/C2Agent.h                     |   5 -
 libminifi/include/c2/C2Client.h                    |  17 +-
 libminifi/include/core/state/ConnectionStore.h     |  60 ++++++
 .../core/state/MetricsPublisher.h}                 |  31 ++-
 .../core/state/PublishedMetricProvider.h}          |  36 ++--
 .../include/core/state/nodes/AgentInformation.h    |   2 +-
 .../include/core/state/nodes/BuildInformation.h    |   2 +-
 .../core/state/nodes/ConfigurationChecksums.h      |   2 +-
 .../include/core/state/nodes/DeviceInformation.h   |  18 +-
 .../include/core/state/nodes/FlowInformation.h     |  28 +--
 libminifi/include/core/state/nodes/MetricsBase.h   |  45 ++--
 libminifi/include/core/state/nodes/QueueMetrics.h  |  44 ++--
 .../include/core/state/nodes/RepositoryMetrics.h   |  42 ++--
 .../include/core/state/nodes/ResponseNodeLoader.h  |  70 +++++++
 .../include/core/state/nodes/SchedulingNodes.h     |   4 +-
 .../include/core/state/nodes/SupportedOperations.h |   2 +-
 libminifi/include/properties/Configuration.h       |   7 +-
 libminifi/include/utils/Id.h                       |  26 +++
 libminifi/src/Configuration.cpp                    |   7 +-
 libminifi/src/FlowController.cpp                   | 121 +++++++----
 libminifi/src/c2/C2Agent.cpp                       |  12 +-
 libminifi/src/c2/C2Client.cpp                      | 226 ++++++---------------
 .../src/core/state/nodes/AgentInformation.cpp      |  14 +-
 .../{AgentInformation.cpp => QueueMetrics.cpp}     |  21 +-
 ...{AgentInformation.cpp => RepositoryMetrics.cpp} |  21 +-
 .../src/core/state/nodes/ResponseNodeLoader.cpp    | 193 ++++++++++++++++++
 libminifi/test/resources/TestC2Metrics.yml         |  69 +++++++
 libminifi/test/resources/TestC2MetricsUpdate.yml   |  66 ++++++
 .../test/resources/encrypted.minifi.properties     |  15 +-
 .../unit/{C2MetricsTests.cpp => MetricsTests.cpp}  |   2 +-
 .../prometheus-cpp/remove-find_package.patch       |  13 ++
 win_build_vs.bat                                   |   4 +-
 82 files changed, 2160 insertions(+), 634 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 3d3d9a5b5..c2bcab3ab 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -81,7 +81,7 @@ jobs:
         run: |
           PATH %PATH%;C:\Program Files (x86)\Windows Kits\10\bin\10.0.19041.0\x64
           PATH %PATH%;C:\Program Files (x86)\Microsoft Visual Studio\2019\Enterprise\MSBuild\Current\Bin\Roslyn
-          win_build_vs.bat ..\b /64 /CI /S /A /PDH /SPLUNK /GCP /K /L /R /Z /N /RO
+          win_build_vs.bat ..\b /64 /CI /S /A /PDH /SPLUNK /GCP /K /L /R /Z /N /RO /PR
         shell: cmd
       - name: test
         run: cd ..\b && ctest --timeout 300 --parallel %NUMBER_OF_PROCESSORS% -C Release --output-on-failure
@@ -119,7 +119,7 @@ jobs:
           cmake -DUSE_SHARED_LIBS=ON -DCMAKE_BUILD_TYPE=Release -DSTRICT_GSL_CHECKS=AUDIT -DFAIL_ON_WARNINGS=ON -DENABLE_AWS=ON -DENABLE_AZURE=ON -DENABLE_BUSTACHE=ON -DENABLE_COAP=ON \
               -DENABLE_ENCRYPT_CONFIG=ON -DENABLE_GPS=ON -DENABLE_JNI=ON -DENABLE_LIBRDKAFKA=ON -DENABLE_LINTER=ON -DENABLE_MQTT=ON -DENABLE_NANOFI=ON -DENABLE_OPC=ON -DENABLE_OPENCV=ON \
               -DENABLE_OPENWSMAN=ON -DENABLE_OPS=ON -DENABLE_PCAP=ON -DENABLE_PYTHON=ON -DENABLE_SENSORS=ON -DENABLE_SFTP=ON -DENABLE_SQL=ON -DENABLE_SYSTEMD=ON -DENABLE_TENSORFLOW=OFF \
-              -DENABLE_USB_CAMERA=ON -DENABLE_SCRIPTING=ON -DENABLE_LUA_SCRIPTING=ON -DENABLE_KUBERNETES=ON -DENABLE_GCP=ON -DENABLE_PROCFS=ON ..
+              -DENABLE_USB_CAMERA=ON -DENABLE_SCRIPTING=ON -DENABLE_LUA_SCRIPTING=ON -DENABLE_KUBERNETES=ON -DENABLE_GCP=ON -DENABLE_PROCFS=ON -DENABLE_PROMETHEUS=ON ..
           make -j$(nproc) VERBOSE=1
       - name: test
         run: cd build && make test ARGS="--timeout 300 -j2 --output-on-failure"
@@ -166,7 +166,7 @@ jobs:
           cmake -DUSE_SHARED_LIBS=ON -DCMAKE_BUILD_TYPE=Release -DSTRICT_GSL_CHECKS=AUDIT -DFAIL_ON_WARNINGS=ON -DENABLE_AWS=ON -DENABLE_AZURE=ON -DENABLE_BUSTACHE=ON -DENABLE_COAP=ON \
               -DENABLE_ENCRYPT_CONFIG=ON -DENABLE_GPS=ON -DENABLE_JNI=ON -DENABLE_LIBRDKAFKA=ON -DENABLE_LINTER=ON -DENABLE_MQTT=ON -DENABLE_NANOFI=ON -DENABLE_OPC=ON -DENABLE_OPENCV=ON \
               -DENABLE_OPENWSMAN=ON -DENABLE_OPS=ON -DENABLE_PCAP=ON -DENABLE_PYTHON=ON -DENABLE_SENSORS=ON -DENABLE_SFTP=ON -DENABLE_SQL=ON -DENABLE_SYSTEMD=ON -DENABLE_TENSORFLOW=OFF \
-              -DENABLE_USB_CAMERA=ON -DENABLE_SCRIPTING=ON -DENABLE_LUA_SCRIPTING=ON -DENABLE_KUBERNETES=ON -DENABLE_GCP=ON -DENABLE_PROCFS=ON -DCMAKE_EXPORT_COMPILE_COMMANDS=ON ..
+              -DENABLE_USB_CAMERA=ON -DENABLE_SCRIPTING=ON -DENABLE_LUA_SCRIPTING=ON -DENABLE_KUBERNETES=ON -DENABLE_GCP=ON -DENABLE_PROCFS=ON -DENABLE_PROMETHEUS=ON -DCMAKE_EXPORT_COMPILE_COMMANDS=ON ..
           cmake --build . --parallel $(nproc)
       - name: test
         run: cd build && make test ARGS="--timeout 300 -j8 --output-on-failure"
@@ -221,7 +221,7 @@ jobs:
           if [ -d ~/.ccache ]; then mv ~/.ccache .; fi
           mkdir build
           cd build
-          cmake -DUSE_SHARED_LIBS= -DSTRICT_GSL_CHECKS=AUDIT -DENABLE_JNI=OFF -DDISABLE_JEMALLOC=ON -DENABLE_AWS=ON -DENABLE_LIBRDKAFKA=ON -DENABLE_MQTT=ON -DENABLE_AZURE=ON -DENABLE_SQL=ON -DENABLE_SPLUNK=ON -DENABLE_GCP=ON -DENABLE_OPC=ON -DENABLE_SCRIPTING=ON -DENABLE_LUA_SCRIPTING=ON -DENABLE_KUBERNETES=ON -DENABLE_TEST_PROCESSORS=ON -DDOCKER_BUILD_ONLY=ON -DDOCKER_CCACHE_DUMP_LOCATION=$HOME/.ccache ..
+          cmake -DUSE_SHARED_LIBS= -DSTRICT_GSL_CHECKS=AUDIT -DENABLE_JNI=OFF -DDISABLE_JEMALLOC=ON -DENABLE_AWS=ON -DENABLE_LIBRDKAFKA=ON -DENABLE_MQTT=ON -DENABLE_AZURE=ON -DENABLE_SQL=ON -DENABLE_SPLUNK=ON -DENABLE_GCP=ON -DENABLE_OPC=ON -DENABLE_SCRIPTING=ON -DENABLE_LUA_SCRIPTING=ON -DENABLE_KUBERNETES=ON -DENABLE_TEST_PROCESSORS=ON -DENABLE_PROMETHEUS=ON -DDOCKER_BUILD_ONLY=ON -DDOCKER_CCACHE_DUMP_LOCATION=$HOME/.ccache ..
           make docker
       - id: install_deps
         run: |
diff --git a/C2.md b/C2.md
index 4dd2a465a..4e2a815ae 100644
--- a/C2.md
+++ b/C2.md
@@ -110,10 +110,12 @@ a configuration of an agent
 
 	nifi.c2.root.class.definitions=metrics
 	nifi.c2.root.class.definitions.metrics.name=metrics
-	nifi.c2.root.class.definitions.metrics.metrics=typedmetrics,processorMetrics
-	nifi.c2.root.class.definitions.metrics.metrics.typedmetrics.name=RuntimeMetrics
-	nifi.c2.root.class.definitions.metrics.metrics.typedmetrics.classes=ProcessMetrics,SystemInformation
-	nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.name=ProcessorMetrics
+	nifi.c2.root.class.definitions.metrics.metrics=runtimemetrics,loadmetrics,processorMetrics
+	nifi.c2.root.class.definitions.metrics.metrics.runtimemetrics.name=RuntimeMetrics
+	nifi.c2.root.class.definitions.metrics.metrics.runtimemetrics.classes=DeviceInfoNode,FlowInformation
+	nifi.c2.root.class.definitions.metrics.metrics.loadmetrics.name=LoadMetrics
+	nifi.c2.root.class.definitions.metrics.metrics.loadmetrics.classes=QueueMetrics,RepositoryMetrics
+	nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.name=ProcessorMetric
 	nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.classes=GetFileMetrics
 
 This example shows a metrics sub tree defined by the option 'nifi.c2.root.class.definitions'.
@@ -124,34 +126,85 @@ The options below metrics define the sub-trees within metrics: typedmetrics and
 The classes sub option will define the metrics classes that are placed within this sub-tree. For the RESTProtocol, the above
 configuration produces the following JSON:
 
-	"metrics": {
-        "ProcessorMetrics": {
-            "GetFileMetrics": {
-                "AcceptedFiles": 22,
-                "InputBytes": 61755,
-                "OnTriggerInvocations": 1
-            }
+  "metrics": {
+    "RuntimeMetrics": {
+        "deviceInfo": {
+            "systemInfo": {
+                "cpuUtilization": -1,
+                "machinearch": "x86_64",
+                "memoryUsage": 13103550464,
+                "operatingSystem": "Linux",
+                "physicalMem": 67024097280,
+                "vCores": 12
+            },
+            "networkInfo": {
+                "hostname": "ggyimesi-5540-ubuntu",
+                "ipAddress": "192.168.50.181"
+            },
+            "identifier": "13396751919892753964"
         },
-        "RuntimeMetrics": {
-            "ProcessMetrics": {
-                "CpuMetrics": {
-                    "involcs": 1
-                },
-                "MemoryMetrics": {
-                    "maxrss": 145804
+        "flowInfo": {
+            "versionedFlowSnapshotURI": {
+                "bucketId": "default",
+                "flowId": "8db40550-db5d-11ec-95d7-0433c2c9832b"
+            },
+            "queues": {
+                "2438e3c8-015a-1000-79ca-83af40ec1997": {
+                    "dataSize": 0,
+                    "dataSizeMax": 1048576,
+                    "name": "GetFile/success/LogAttribute",
+                    "size": 0,
+                    "sizeMax": 0,
+                    "uuid": "2438e3c8-015a-1000-79ca-83af40ec1997"
                 }
             },
-            "systeminfo": {
-                "systemInfo": {
-                    "machinearch": "x86_64",
-                    "physicalMem": 67361411072,
-                    "vCores": 12
+            "components": {
+                "FlowController": {
+                    "running": true,
+                    "uuid": "2438e3c8-015a-1000-79ca-83af40ec1990"
+                },
+                "GetFile": {
+                    "running": false,
+                    "uuid": "2438e3c8-015a-1000-79ca-83af40ec1991"
                 },
-                "identifier": "identifier"
+                "LogAttribute": {
+                    "running": true,
+                    "uuid": "2438e3c8-015a-1000-79ca-83af40ec1992"
+                }
+            },
+            "flowId": "8db40550-db5d-11ec-95d7-0433c2c9832b"
+        }
+    },
+    "LoadMetrics": {
+        "QueueMetrics": {
+            "GetFile/success/LogAttribute": {
+                "datasize": "0",
+                "datasizemax": "1048576",
+                "queued": "0",
+                "queuedmax": "0"
             }
+        },
+        "RepositoryMetrics": {
+            "ff": {
+                "full": false,
+                "running": false,
+                "size": "0"
+            },
+            "repo_name": {
+                "full": false,
+                "running": true,
+                "size": "0"
+            }
+        }
+    },
+    "ProcessorMetrics": {
+        "GetFileMetrics": {
+            "AcceptedFiles": 0,
+            "InputBytes": 0,
+            "OnTriggerInvocations": 0
         }
     }
-
+  }
 
 ### Protocols
 
diff --git a/CMakeLists.txt b/CMakeLists.txt
index ee6ca4575..ce5bc75fd 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -120,6 +120,7 @@ option(ENABLE_GCP "Enable Google Cloud support" ON)
 option(DOCKER_BUILD_ONLY "Disables all targets except docker build scripts. Ideal for systems without an up-to-date compiler." OFF)
 option(ENABLE_KUBERNETES "Enables the Kubernetes extensions." OFF)
 option(ENABLE_TEST_PROCESSORS "Enables test processors" OFF)
+option(ENABLE_PROMETHEUS "Enables Prometheus support." OFF)
 
 if(CMAKE_SYSTEM_NAME STREQUAL "Linux")
 	option(ENABLE_PROCFS "Enables the procfs extension." ON)
@@ -410,6 +411,12 @@ if ((ENABLE_OPENWSMAN AND NOT DISABLE_CIVET AND NOT DISABLE_CURL) OR ENABLE_ALL
 	list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/libxml2/dummy")
 endif()
 
+if (ENABLE_ALL OR ENABLE_PROMETHEUS OR NOT DISABLE_CIVET)
+	include(BundledCivetWeb)
+	use_bundled_civetweb(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR})
+	list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/civetweb/dummy")
+endif()
+
 ## Add extensions
 file(GLOB extension-directories "extensions/*")
 foreach(extension-dir ${extension-directories})
diff --git a/LICENSE b/LICENSE
index 4dba5c977..966d7e6dd 100644
--- a/LICENSE
+++ b/LICENSE
@@ -3611,3 +3611,33 @@ This product bundles 'kubernetes-client/c', which is available under the Apache
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
+
+--------------------------------------------------------------------------
+
+This product bundles 'prometheus-cpp' which is available under an MIT license.
+
+  MIT License
+
+  Copyright (c) 2016-2021 Jupp Mueller
+  Copyright (c) 2017-2022 Gregor Jasny
+
+  And many contributors, see
+  https://github.com/jupp0r/prometheus-cpp/graphs/contributors
+
+  Permission is hereby granted, free of charge, to any person obtaining a copy
+  of this software and associated documentation files (the "Software"), to deal
+  in the Software without restriction, including without limitation the rights
+  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+  copies of the Software, and to permit persons to whom the Software is
+  furnished to do so, subject to the following conditions:
+
+  The above copyright notice and this permission notice shall be included in all
+  copies or substantial portions of the Software.
+
+  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+  OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+  SOFTWARE.
diff --git a/METRICS.md b/METRICS.md
new file mode 100644
index 000000000..b53465a28
--- /dev/null
+++ b/METRICS.md
@@ -0,0 +1,155 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+# Apache NiFi - MiNiFi - C++ Metrics Readme.
+
+
+This readme defines the metrics published by Apache NiFi MiNiFi C++. All options defined are located in minifi.properties.
+
+## Table of Contents
+
+- [Description](#description)
+- [Configuration](#configuration)
+- [Metrics](#metrics)
+
+## Description
+
+Apache NiFi MiNiFi C++ can communicate metrics about the agent's status; these can be system level or component level metrics.
+These metrics are exposed through metric publishers implemented in the agent, which can be configured in minifi.properties.
+Aside from the publisher-exposed metrics, metrics are also sent through the C2 protocol; more information is available in the
+[C2 documentation](C2.md#metrics).
+
+## Configuration
+
+To configure a metrics publisher, first set which publisher class should be used:
+
+	# in minifi.properties
+
+	nifi.metrics.publisher.class=PrometheusMetricsPublisher
+
+Currently PrometheusMetricsPublisher is the only available publisher in MiNiFi C++; it publishes metrics to a Prometheus server.
+To use the publisher, a port must also be configured on which the metrics will be available for scraping:
+
+	# in minifi.properties
+
+	nifi.metrics.publisher.PrometheusMetricsPublisher.port=9936
+
+The last option defines which metric classes should be exposed through the metrics publisher, configured as a comma separated list:
+
+	# in minifi.properties
+
+	nifi.metrics.publisher.metrics=QueueMetrics,RepositoryMetrics,GetFileMetrics,DeviceInfoNode,FlowInformation
+
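+With the publisher listening on the configured port (9936 above), a Prometheus server can scrape the agent. A minimal scrape configuration for this setup might look like the following sketch (the target hostname is illustrative and depends on where the agent runs):
+
+	# prometheus.yml (sketch)
+	scrape_configs:
+	  - job_name: "minifi"
+	    scrape_interval: 2s
+	    static_configs:
+	      - targets: ["<agent-host>:9936"]
+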
+## Metrics
+
+The following section defines the currently available metrics to be published by the MiNiFi C++ agent.
+
+NOTE: In Prometheus all metrics are extended with a `minifi_` prefix to mark the domain of the metric. For example the `queue_size` metric is published as `minifi_queue_size` in Prometheus.
+
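+As an illustration, a scraped sample in the Prometheus exposition format might look like this (the value, UUID and connection name are illustrative):
+
+	minifi_queue_size{metric_class="QueueMetrics",connection_uuid="2438e3c8-015a-1000-79ca-83af40ec1997",connection_name="GetFile/success/LogAttribute"} 0
+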
+### QueueMetrics
+
+QueueMetrics is a system level metric that reports queue metrics for every connection in the flow.
+
+| Metric name          | Labels                                         | Description                                |
+|----------------------|------------------------------------------------|--------------------------------------------|
+| queue_data_size      | metric_class, connection_uuid, connection_name | Current queue data size                    |
+| queue_data_size_max  | metric_class, connection_uuid, connection_name | Max queue data size to apply back pressure |
+| queue_size           | metric_class, connection_uuid, connection_name | Current queue size                         |
+| queue_size_max       | metric_class, connection_uuid, connection_name | Max queue size to apply back pressure      |
+
+| Label                    | Description                                                |
+|--------------------------|------------------------------------------------------------|
+| metric_class             | Class name to filter for this metric, set to QueueMetrics  |
+| connection_uuid          | UUID of the connection defined in the flow configuration   |
+| connection_name          | Name of the connection defined in the flow configuration   |
+
+### RepositoryMetrics
+
+RepositoryMetrics is a system level metric that reports metrics for the registered repositories (by default flowfile and provenance repository)
+
+| Metric name          | Labels                        | Description                           |
+|----------------------|-------------------------------|---------------------------------------|
+| is_running           | metric_class, repository_name | Is the repository running (1 or 0)    |
+| is_full              | metric_class, repository_name | Is the repository full (1 or 0)       |
+| repository_size      | metric_class, repository_name | Current size of the repository        |
+
+| Label                    | Description                                                     |
+|--------------------------|-----------------------------------------------------------------|
+| metric_class             | Class name to filter for this metric, set to RepositoryMetrics  |
+| repository_name          | Name of the reported repository                                 |
+
+### GetFileMetrics
+
+Processor level metric that reports metrics for the GetFile processor if defined in the flow configuration
+
+| Metric name           | Labels                                       | Description                                    |
+|-----------------------|----------------------------------------------|------------------------------------------------|
+| onTrigger_invocations | metric_class, processor_name, processor_uuid | Number of times the processor was triggered    |
+| accepted_files        | metric_class, processor_name, processor_uuid | Number of files that matched the set criteria  |
+| input_bytes           | metric_class, processor_name, processor_uuid | Sum of file sizes processed                    |
+
+| Label          | Description                                                    |
+|----------------|----------------------------------------------------------------|
+| metric_class   | Class name to filter for this metric, set to GetFileMetrics    |
+| processor_name | Name of the processor                                          |
+| processor_uuid | UUID of the processor                                          |
+
+### GetTCPMetrics
+
+Processor level metric that reports metrics for the GetTCP processor if defined in the flow configuration
+
+| Metric name           | Labels                                       | Description                                    |
+|-----------------------|----------------------------------------------|------------------------------------------------|
+| onTrigger_invocations | metric_class, processor_name, processor_uuid | Number of times the processor was triggered    |
+
+| Label          | Description                                                    |
+|----------------|----------------------------------------------------------------|
+| metric_class   | Class name to filter for this metric, set to GetTCPMetrics     |
+| processor_name | Name of the processor                                          |
+| processor_uuid | UUID of the processor                                          |
+
+### DeviceInfoNode
+
+DeviceInfoNode is a system level metric that reports metrics about the system resources used and available
+
+| Metric name     | Labels       | Description               |
+|-----------------|--------------|---------------------------|
+| physical_mem    | metric_class | Physical memory available |
+| memory_usage    | metric_class | Memory used by the agent  |
+| cpu_utilization | metric_class | CPU utilized by the agent |
+
+| Label          | Description                                                 |
+|----------------|-------------------------------------------------------------|
+| metric_class   | Class name to filter for this metric, set to DeviceInfoNode |
+
+### FlowInformation
+
+FlowInformation is a system level metric that reports queue metrics of the connections and the running state of the components defined in the flow configuration
+
+| Metric name          | Labels                                         | Description                                |
+|----------------------|------------------------------------------------|--------------------------------------------|
+| queue_data_size      | metric_class, connection_uuid, connection_name | Current queue data size                    |
+| queue_data_size_max  | metric_class, connection_uuid, connection_name | Max queue data size to apply back pressure |
+| queue_size           | metric_class, connection_uuid, connection_name | Current queue size                         |
+| queue_size_max       | metric_class, connection_uuid, connection_name | Max queue size to apply back pressure      |
+| is_running           | metric_class, component_uuid, component_name   | Check if the component is running (1 or 0) |
+
+| Label           | Description                                                  |
+|-----------------|--------------------------------------------------------------|
+| metric_class    | Class name to filter for this metric, set to FlowInformation |
+| connection_uuid | UUID of the connection defined in the flow configuration     |
+| connection_name | Name of the connection defined in the flow configuration     |
+| component_uuid  | UUID of the component                                        |
+| component_name  | Name of the component                                        |
diff --git a/NOTICE b/NOTICE
index 3f81cc3e5..7502e6c9e 100644
--- a/NOTICE
+++ b/NOTICE
@@ -67,6 +67,7 @@ This software includes third party software subject to the following copyrights:
 - nlohmann json - Copyright (c) 2013-2022 Niels Lohmann
 - abseil-cpp - Google Inc.
 - crc32c - Google Inc., Fangming Fang, Vadim Skipin, Rodrigo Tobar, Harry Mallon
+- prometheus-cpp - Copyright (c) 2016-2021 Jupp Mueller, Copyright (c) 2017-2022 Gregor Jasny
 
 The licenses for these third party components are included in LICENSE.txt
 
diff --git a/README.md b/README.md
index d0a98b99f..a19b26144 100644
--- a/README.md
+++ b/README.md
@@ -554,6 +554,9 @@ Antivirus software can take a long time to scan directories and the files within
 ## Operations
 See our [operations documentation for additional information on how to manage instances](OPS.md)
 
+## Monitoring
+See our [metrics documentation for information about self published metrics](METRICS.md)
+
 ## Issue Tracking
 See https://issues.apache.org/jira/projects/MINIFICPP/issues for the issue tracker.
 
diff --git a/Windows.md b/Windows.md
index 73f452b48..8a7bfb1cc 100644
--- a/Windows.md
+++ b/Windows.md
@@ -67,6 +67,7 @@ After the build directory it will take optional parameters modifying the CMake c
 | /L | Enables Linter |
 | /O | Enables OpenCV |
 | /PDH | Enables Performance Monitor |
+| /PR | Enables Prometheus |
 | /RO | Use real ODBC driver in tests instead of mock SQL driver |
 | /M | Creates installer with merge modules |
 | /64 | Creates 64-bit build instead of a 32-bit one |
diff --git a/bootstrap.sh b/bootstrap.sh
index 70a1534e3..4baf98f56 100755
--- a/bootstrap.sh
+++ b/bootstrap.sh
@@ -339,6 +339,8 @@ add_option GCP_ENABLED ${TRUE} "ENABLE_GCP"
 
 add_option PROCFS_ENABLED ${TRUE} "ENABLE_PROCFS"
 
+add_disabled_option PROMETHEUS_ENABLED ${FALSE} "ENABLE_PROMETHEUS"
+
 USE_SHARED_LIBS=${TRUE}
 ASAN_ENABLED=${FALSE}
 FAIL_ON_WARNINGS=${FALSE}
diff --git a/bstrp_functions.sh b/bstrp_functions.sh
index cb30ff132..3af419d98 100755
--- a/bstrp_functions.sh
+++ b/bstrp_functions.sh
@@ -386,6 +386,7 @@ show_supported_features() {
   echo "AB. Kubernetes Support .........$(print_feature_status KUBERNETES_ENABLED)"
   echo "AC. Google Cloud Support .......$(print_feature_status GCP_ENABLED)"
   echo "AD. ProcFs Support .............$(print_feature_status PROCFS_ENABLED)"
+  echo "AE. Prometheus Support .........$(print_feature_status PROMETHEUS_ENABLED)"
   echo "****************************************"
   echo "            Build Options."
   echo "****************************************"
@@ -408,7 +409,7 @@ show_supported_features() {
 
 read_feature_options(){
   local choice
-  echo -n "Enter choice [A-Z or AA-AD or 1-7] "
+  echo -n "Enter choice [A-Z or AA-AE or 1-7] "
   read -r choice
   choice=$(echo "${choice}" | tr '[:upper:]' '[:lower:]')
   case $choice in
@@ -444,6 +445,7 @@ read_feature_options(){
     ab) ToggleFeature KUBERNETES_ENABLED ;;
     ac) ToggleFeature GCP_ENABLED ;;
     ad) ToggleFeature PROCFS_ENABLED ;;
+    ae) ToggleFeature PROMETHEUS_ENABLED ;;
     1) ToggleFeature TESTS_ENABLED ;;
     2) EnableAllFeatures ;;
     3) ToggleFeature JNI_ENABLED;;
@@ -462,7 +464,7 @@ read_feature_options(){
       fi
       ;;
     q) exit 0;;
-    *) echo -e "${RED}Please enter an option A-Z or AA-AD or 1-7...${NO_COLOR}" && sleep 2
+    *) echo -e "${RED}Please enter an option A-Z or AA-AE or 1-7...${NO_COLOR}" && sleep 2
   esac
 }
 
diff --git a/cmake/BundledCivetWeb.cmake b/cmake/BundledCivetWeb.cmake
index 540ff4cad..88e5b4d15 100644
--- a/cmake/BundledCivetWeb.cmake
+++ b/cmake/BundledCivetWeb.cmake
@@ -46,7 +46,8 @@ function(use_bundled_civetweb SOURCE_DIR BINARY_DIR)
 
     # Set build options
     set(CIVETWEB_CMAKE_ARGS ${PASSTHROUGH_CMAKE_ARGS}
-            "-DCMAKE_INSTALL_PREFIX=${CIVETWEB_BIN_DIR}"
+            -DCMAKE_INSTALL_PREFIX=${CIVETWEB_BIN_DIR}
+            -DCMAKE_PREFIX_PATH=${CIVETWEB_BIN_DIR}
             -DCIVETWEB_ENABLE_SSL_DYNAMIC_LOADING=OFF
             -DCIVETWEB_BUILD_TESTING=OFF
             -DCIVETWEB_ENABLE_DUKTAPE=OFF
@@ -102,4 +103,5 @@ function(use_bundled_civetweb SOURCE_DIR BINARY_DIR)
     set_target_properties(CIVETWEB::civetweb-cpp PROPERTIES IMPORTED_LOCATION "${CIVETWEB_BIN_DIR}/${LIBDIR}/${PREFIX}civetweb-cpp.${SUFFIX}")
     target_link_libraries(CIVETWEB::civetweb-cpp INTERFACE CIVETWEB::c-library)
     add_dependencies(CIVETWEB::civetweb-cpp civetweb-external)
+    add_library(civetweb::civetweb-cpp ALIAS CIVETWEB::civetweb-cpp)
 endfunction(use_bundled_civetweb)
diff --git a/cmake/DockerConfig.cmake b/cmake/DockerConfig.cmake
index 8e6387270..f8f7e7d8d 100644
--- a/cmake/DockerConfig.cmake
+++ b/cmake/DockerConfig.cmake
@@ -51,6 +51,7 @@ add_custom_target(
         -c ENABLE_KUBERNETES=${ENABLE_KUBERNETES}
         -c ENABLE_PROCFS=${ENABLE_PROCFS}
         -c ENABLE_TEST_PROCESSORS=${ENABLE_TEST_PROCESSORS}
+        -c ENABLE_PROMETHEUS=${ENABLE_PROMETHEUS}
         -c DISABLE_CURL=${DISABLE_CURL}
         -c DISABLE_JEMALLOC=${DISABLE_JEMALLOC}
         -c DISABLE_CIVET=${DISABLE_CIVET}
@@ -122,6 +123,7 @@ add_custom_target(
         -c ENABLE_LUA_SCRIPTING=${ENABLE_LUA_SCRIPTING}
         -c ENABLE_KUBERNETES=${ENABLE_KUBERNETES}
         -c ENABLE_PROCFS=${ENABLE_PROCFS}
+        -c ENABLE_PROMETHEUS=${ENABLE_PROMETHEUS}
         -c ENABLE_SYSTEMD=${ENABLE_SYSTEMD}
         -c DISABLE_CURL=${DISABLE_CURL}
         -c DISABLE_JEMALLOC=${DISABLE_JEMALLOC}
@@ -173,6 +175,7 @@ add_custom_target(
         -c ENABLE_LUA_SCRIPTING=${ENABLE_LUA_SCRIPTING}
         -c ENABLE_KUBERNETES=${ENABLE_KUBERNETES}
         -c ENABLE_PROCFS=${ENABLE_PROCFS}
+        -c ENABLE_PROMETHEUS=${ENABLE_PROMETHEUS}
         -c ENABLE_SYSTEMD=${ENABLE_SYSTEMD}
         -c DISABLE_CURL=${DISABLE_CURL}
         -c DISABLE_JEMALLOC=${DISABLE_JEMALLOC}
@@ -224,6 +227,7 @@ add_custom_target(
         -c ENABLE_LUA_SCRIPTING=${ENABLE_LUA_SCRIPTING}
         -c ENABLE_KUBERNETES=${ENABLE_KUBERNETES}
         -c ENABLE_PROCFS=${ENABLE_PROCFS}
+        -c ENABLE_PROMETHEUS=${ENABLE_PROMETHEUS}
         -c ENABLE_SYSTEMD=${ENABLE_SYSTEMD}
         -c DISABLE_CURL=${DISABLE_CURL}
         -c DISABLE_JEMALLOC=${DISABLE_JEMALLOC}
@@ -275,6 +279,7 @@ add_custom_target(
         -c ENABLE_LUA_SCRIPTING=${ENABLE_LUA_SCRIPTING}
         -c ENABLE_KUBERNETES=${ENABLE_KUBERNETES}
         -c ENABLE_PROCFS=${ENABLE_PROCFS}
+        -c ENABLE_PROMETHEUS=${ENABLE_PROMETHEUS}
         -c ENABLE_TEST_PROCESSORS=${ENABLE_TEST_PROCESSORS}
         -c ENABLE_SYSTEMD=${ENABLE_SYSTEMD}
         -c DISABLE_CURL=${DISABLE_CURL}
diff --git a/cmake/Prometheus.cmake b/cmake/Prometheus.cmake
new file mode 100644
index 000000000..2a1560bd5
--- /dev/null
+++ b/cmake/Prometheus.cmake
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+include(FetchContent)
+
+set(ENABLE_PUSH OFF CACHE BOOL "" FORCE)
+set(ENABLE_TESTING OFF CACHE BOOL "" FORCE)
+set(USE_THIRDPARTY_LIBRARIES OFF CACHE BOOL "" FORCE)
+set(ENABLE_COMPRESSION OFF CACHE BOOL "" FORCE)
+
+set(PATCH_FILE "${CMAKE_SOURCE_DIR}/thirdparty/prometheus-cpp/remove-find_package.patch")
+set(PC ${Bash_EXECUTABLE}  -c "set -x &&\
+        (\\\"${Patch_EXECUTABLE}\\\" -p1 -R -s -f --dry-run -i \\\"${PATCH_FILE}\\\" || \\\"${Patch_EXECUTABLE}\\\" -p1 -N -i \\\"${PATCH_FILE}\\\")")
+
+FetchContent_Declare(
+    prometheus-cpp
+    URL "https://github.com/jupp0r/prometheus-cpp/archive/refs/tags/v1.0.1.tar.gz"
+    URL_HASH "SHA256=593e028d401d3298eada804d252bc38d8cab3ea1c9e88bcd72095281f85e6d16"
+    PATCH_COMMAND "${PC}"
+)
+
+FetchContent_MakeAvailable(prometheus-cpp)
diff --git a/conf/minifi.properties b/conf/minifi.properties
index 0d51905db..0dbf918a9 100644
--- a/conf/minifi.properties
+++ b/conf/minifi.properties
@@ -90,11 +90,11 @@ nifi.c2.full.heartbeat=false
 ## define metrics reported
 nifi.c2.root.class.definitions=metrics
 nifi.c2.root.class.definitions.metrics.name=metrics
-nifi.c2.root.class.definitions.metrics.metrics=typedmetrics
-nifi.c2.root.class.definitions.metrics.metrics.typedmetrics.name=RuntimeMetrics
-nifi.c2.root.class.definitions.metrics.metrics.queuemetrics.name=QueueMetrics
-nifi.c2.root.class.definitions.metrics.metrics.queuemetrics.classes=QueueMetrics
-nifi.c2.root.class.definitions.metrics.metrics.typedmetrics.classes=ProcessMetrics,SystemInformation
+nifi.c2.root.class.definitions.metrics.metrics=runtimemetrics,loadmetrics,processorMetrics
+nifi.c2.root.class.definitions.metrics.metrics.runtimemetrics.name=RuntimeMetrics
+nifi.c2.root.class.definitions.metrics.metrics.runtimemetrics.classes=DeviceInfoNode,FlowInformation
+nifi.c2.root.class.definitions.metrics.metrics.loadmetrics.name=LoadMetrics
+nifi.c2.root.class.definitions.metrics.metrics.loadmetrics.classes=QueueMetrics,RepositoryMetrics
 nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.name=ProcessorMetric
 nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.classes=GetFileMetrics
 
@@ -121,3 +121,8 @@ nifi.nar.docs.directory=${MINIFI_HOME}/minifi-jni/nardocs
 # must be comma separated
 nifi.jvm.options=-Xmx1G
 nifi.python.processor.dir=${MINIFI_HOME}/minifi-python/
+
+# Publish metrics to external consumers
+# nifi.metrics.publisher.class=PrometheusMetricsPublisher
+# nifi.metrics.publisher.PrometheusMetricsPublisher.port=9936
+# nifi.metrics.publisher.metrics=QueueMetrics,RepositoryMetrics,GetFileMetrics,DeviceInfoNode,FlowInformation
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 565a51357..632a782b3 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -66,6 +66,7 @@ ARG DISABLE_PYTHON_SCRIPTING=
 ARG ENABLE_LUA_SCRIPTING=
 ARG ENABLE_KUBERNETES=OFF
 ARG ENABLE_PROCFS=OFF
+ARG ENABLE_PROMETHEUS=OFF
 ARG DISABLE_CONTROLLER=OFF
 ARG CMAKE_BUILD_TYPE=Release
 
@@ -125,7 +126,7 @@ RUN cmake -DSTATIC_BUILD= -DSKIP_TESTS=true -DENABLE_ALL="${ENABLE_ALL}" -DENABL
     -DENABLE_TEST_PROCESSORS="${ENABLE_TEST_PROCESSORS}" -DDISABLE_EXPRESSION_LANGUAGE="${DISABLE_EXPRESSION_LANGUAGE}" -DDISABLE_ROCKSDB="${DISABLE_ROCKSDB}" \
     -DDISABLE_LIBARCHIVE="${DISABLE_LIBARCHIVE}" -DDISABLE_LZMA="${DISABLE_LZMA}" -DDISABLE_BZIP2="${DISABLE_BZIP2}" \
     -DENABLE_SCRIPTING="${ENABLE_SCRIPTING}" -DDISABLE_PYTHON_SCRIPTING="${DISABLE_PYTHON_SCRIPTING}" -DENABLE_LUA_SCRIPTING="${ENABLE_LUA_SCRIPTING}" \
-    -DENABLE_KUBERNETES="${ENABLE_KUBERNETES}" -DENABLE_GCP="${ENABLE_GCP}" -DENABLE_PROCFS="${ENABLE_PROCFS}" \
+    -DENABLE_KUBERNETES="${ENABLE_KUBERNETES}" -DENABLE_GCP="${ENABLE_GCP}" -DENABLE_PROCFS="${ENABLE_PROCFS}" -DENABLE_PROMETHEUS="${ENABLE_PROMETHEUS}" \
     -DDISABLE_CONTROLLER="${DISABLE_CONTROLLER}" -DENABLE_ENCRYPT_CONFIG="${ENABLE_ENCRYPT_CONFIG}" -DAWS_ENABLE_UNITY_BUILD=OFF -DEXCLUDE_BOOST=ON -DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" .. && \
     make -j "$(nproc)" package && \
     tar -xzvf "${MINIFI_BASE_DIR}/build/nifi-minifi-cpp-${MINIFI_VERSION}.tar.gz" -C "${MINIFI_BASE_DIR}"
diff --git a/docker/bionic/Dockerfile b/docker/bionic/Dockerfile
index 5226a42c0..b2030e130 100644
--- a/docker/bionic/Dockerfile
+++ b/docker/bionic/Dockerfile
@@ -75,6 +75,7 @@ ARG DISABLE_PYTHON_SCRIPTING=ON
 ARG ENABLE_LUA_SCRIPTING=ON
 ARG ENABLE_KUBERNETES=OFF
 ARG ENABLE_PROCFS=OFF
+ARG ENABLE_PROMETHEUS=OFF
 ARG DISABLE_CONTROLLER=OFF
 ARG CMAKE_BUILD_TYPE=Release
 
@@ -95,6 +96,6 @@ RUN cd $MINIFI_BASE_DIR \
     -DENABLE_TEST_PROCESSORS=OFF -DDISABLE_EXPRESSION_LANGUAGE="${DISABLE_EXPRESSION_LANGUAGE}" -DDISABLE_ROCKSDB="${DISABLE_ROCKSDB}" \
     -DDISABLE_LIBARCHIVE="${DISABLE_LIBARCHIVE}" -DDISABLE_LZMA="${DISABLE_LZMA}" -DDISABLE_BZIP2="${DISABLE_BZIP2}" \
     -DENABLE_SCRIPTING="${ENABLE_SCRIPTING}" -DDISABLE_PYTHON_SCRIPTING="${DISABLE_PYTHON_SCRIPTING}" -DENABLE_LUA_SCRIPTING="${ENABLE_LUA_SCRIPTING}" \
-    -DENABLE_KUBERNETES="${ENABLE_KUBERNETES}" -DENABLE_GCP="${ENABLE_GCP}" -DENABLE_PROCFS="${ENABLE_PROCFS}" \
+    -DENABLE_KUBERNETES="${ENABLE_KUBERNETES}" -DENABLE_GCP="${ENABLE_GCP}" -DENABLE_PROCFS="${ENABLE_PROCFS}" -DENABLE_PROMETHEUS="${ENABLE_PROMETHEUS}" \
     -DDISABLE_CONTROLLER="${DISABLE_CONTROLLER}" -DENABLE_ENCRYPT_CONFIG="${ENABLE_ENCRYPT_CONFIG}" -DAWS_ENABLE_UNITY_BUILD=OFF -DEXCLUDE_BOOST=ON -DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" .. \
     && make -j "$(nproc)" package
diff --git a/docker/centos/Dockerfile b/docker/centos/Dockerfile
index 352d66d42..da6ad170f 100644
--- a/docker/centos/Dockerfile
+++ b/docker/centos/Dockerfile
@@ -73,6 +73,7 @@ ARG DISABLE_PYTHON_SCRIPTING=ON
 ARG ENABLE_LUA_SCRIPTING=ON
 ARG ENABLE_KUBERNETES=OFF
 ARG ENABLE_PROCFS=OFF
+ARG ENABLE_PROMETHEUS=OFF
 ARG DISABLE_CONTROLLER=OFF
 ARG CMAKE_BUILD_TYPE=Release
 # Perform the build
@@ -89,7 +90,7 @@ RUN cd $MINIFI_BASE_DIR \
     -DENABLE_TEST_PROCESSORS=OFF -DDISABLE_EXPRESSION_LANGUAGE="${DISABLE_EXPRESSION_LANGUAGE}" -DDISABLE_ROCKSDB="${DISABLE_ROCKSDB}" \
     -DDISABLE_LIBARCHIVE="${DISABLE_LIBARCHIVE}" -DDISABLE_LZMA="${DISABLE_LZMA}" -DDISABLE_BZIP2="${DISABLE_BZIP2}" \
     -DENABLE_SCRIPTING="${ENABLE_SCRIPTING}" -DDISABLE_PYTHON_SCRIPTING="${DISABLE_PYTHON_SCRIPTING}" -DENABLE_LUA_SCRIPTING="${ENABLE_LUA_SCRIPTING}" \
-    -DENABLE_KUBERNETES="${ENABLE_KUBERNETES}" -DENABLE_GCP="${ENABLE_GCP}" -DENABLE_PROCFS="${ENABLE_PROCFS}" \
+    -DENABLE_KUBERNETES="${ENABLE_KUBERNETES}" -DENABLE_GCP="${ENABLE_GCP}" -DENABLE_PROCFS="${ENABLE_PROCFS}" -DENABLE_PROMETHEUS="${ENABLE_PROMETHEUS}" \
     -DDISABLE_CONTROLLER="${DISABLE_CONTROLLER}" -DENABLE_ENCRYPT_CONFIG="${ENABLE_ENCRYPT_CONFIG}" -DAWS_ENABLE_UNITY_BUILD=OFF -DEXCLUDE_BOOST=ON -DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" .. \
     && scl enable devtoolset-10 -- make -j "$(nproc)" package
 
diff --git a/docker/fedora/Dockerfile b/docker/fedora/Dockerfile
index c5618c3b2..0989b03f4 100644
--- a/docker/fedora/Dockerfile
+++ b/docker/fedora/Dockerfile
@@ -77,6 +77,7 @@ ARG DISABLE_PYTHON_SCRIPTING=ON
 ARG ENABLE_LUA_SCRIPTING=ON
 ARG ENABLE_KUBERNETES=OFF
 ARG ENABLE_PROCFS=OFF
+ARG ENABLE_PROMETHEUS=OFF
 ARG DISABLE_CONTROLLER=OFF
 ARG CMAKE_BUILD_TYPE=Release
 # Perform the build
@@ -95,7 +96,7 @@ RUN cd $MINIFI_BASE_DIR \
     -DENABLE_TEST_PROCESSORS=OFF -DDISABLE_EXPRESSION_LANGUAGE="${DISABLE_EXPRESSION_LANGUAGE}" -DDISABLE_ROCKSDB="${DISABLE_ROCKSDB}" \
     -DDISABLE_LIBARCHIVE="${DISABLE_LIBARCHIVE}" -DDISABLE_LZMA="${DISABLE_LZMA}" -DDISABLE_BZIP2="${DISABLE_BZIP2}" \
     -DENABLE_SCRIPTING="${ENABLE_SCRIPTING}" -DDISABLE_PYTHON_SCRIPTING="${DISABLE_PYTHON_SCRIPTING}" -DENABLE_LUA_SCRIPTING="${ENABLE_LUA_SCRIPTING}" \
-    -DENABLE_KUBERNETES="${ENABLE_KUBERNETES}" -DENABLE_GCP="${ENABLE_GCP}" -DENABLE_PROCFS="${ENABLE_PROCFS}" \
+    -DENABLE_KUBERNETES="${ENABLE_KUBERNETES}" -DENABLE_GCP="${ENABLE_GCP}" -DENABLE_PROCFS="${ENABLE_PROCFS}" -DENABLE_PROMETHEUS="${ENABLE_PROMETHEUS}" \
     -DDISABLE_CONTROLLER="${DISABLE_CONTROLLER}" -DENABLE_ENCRYPT_CONFIG="${ENABLE_ENCRYPT_CONFIG}" -DAWS_ENABLE_UNITY_BUILD=OFF -DEXCLUDE_BOOST=ON -DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" .. \
     && make -j "$(nproc)" package
 
diff --git a/docker/focal/Dockerfile b/docker/focal/Dockerfile
index 96f62cf5e..d2c8e743a 100644
--- a/docker/focal/Dockerfile
+++ b/docker/focal/Dockerfile
@@ -76,6 +76,7 @@ ARG DISABLE_PYTHON_SCRIPTING=ON
 ARG ENABLE_LUA_SCRIPTING=ON
 ARG ENABLE_KUBERNETES=OFF
 ARG ENABLE_PROCFS=OFF
+ARG ENABLE_PROMETHEUS=OFF
 ARG DISABLE_CONTROLLER=OFF
 ARG CMAKE_BUILD_TYPE=Release
 ENV CC gcc-11
@@ -95,6 +96,6 @@ RUN cd $MINIFI_BASE_DIR \
     -DENABLE_TEST_PROCESSORS=OFF -DDISABLE_EXPRESSION_LANGUAGE="${DISABLE_EXPRESSION_LANGUAGE}" -DDISABLE_ROCKSDB="${DISABLE_ROCKSDB}" \
     -DDISABLE_LIBARCHIVE="${DISABLE_LIBARCHIVE}" -DDISABLE_LZMA="${DISABLE_LZMA}" -DDISABLE_BZIP2="${DISABLE_BZIP2}" \
     -DENABLE_SCRIPTING="${ENABLE_SCRIPTING}" -DDISABLE_PYTHON_SCRIPTING="${DISABLE_PYTHON_SCRIPTING}" -DENABLE_LUA_SCRIPTING="${ENABLE_LUA_SCRIPTING}" \
-    -DENABLE_KUBERNETES="${ENABLE_KUBERNETES}" -DENABLE_GCP="${ENABLE_GCP}" -DENABLE_PROCFS="${ENABLE_PROCFS}" \
+    -DENABLE_KUBERNETES="${ENABLE_KUBERNETES}" -DENABLE_GCP="${ENABLE_GCP}" -DENABLE_PROCFS="${ENABLE_PROCFS}" -DENABLE_PROMETHEUS="${ENABLE_PROMETHEUS}" \
     -DDISABLE_CONTROLLER="${DISABLE_CONTROLLER}" -DENABLE_ENCRYPT_CONFIG="${ENABLE_ENCRYPT_CONFIG}" -DAWS_ENABLE_UNITY_BUILD=OFF -DEXCLUDE_BOOST=ON -DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" .. \
     && make -j "$(nproc)" package
diff --git a/docker/requirements.txt b/docker/requirements.txt
index d8914ec05..19b50056e 100644
--- a/docker/requirements.txt
+++ b/docker/requirements.txt
@@ -8,3 +8,4 @@ m2crypto==0.37.1
 watchdog==2.1.2
 pyopenssl==21.0.0
 azure-storage-blob==12.9.0
+prometheus-api-client==0.5.0
diff --git a/docker/test/integration/MiNiFi_integration_test_driver.py b/docker/test/integration/MiNiFi_integration_test_driver.py
index f450bf751..d668beb08 100644
--- a/docker/test/integration/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/MiNiFi_integration_test_driver.py
@@ -261,3 +261,9 @@ class MiNiFi_integration_test:
 
     def check_azure_blob_and_snapshot_count(self, blob_and_snapshot_count, timeout_seconds):
         assert self.cluster.check_azure_blob_and_snapshot_count(blob_and_snapshot_count, timeout_seconds)
+
+    def check_metric_class_on_prometheus(self, metric_class, timeout_seconds):
+        assert self.cluster.wait_for_metric_class_on_prometheus(metric_class, timeout_seconds)
+
+    def check_processor_metric_on_prometheus(self, metric_class, timeout_seconds, processor_name):
+        assert self.cluster.wait_for_processor_metric_on_prometheus(metric_class, timeout_seconds, processor_name)
diff --git a/docker/test/integration/features/prometheus.feature b/docker/test/integration/features/prometheus.feature
new file mode 100644
index 000000000..7f1f1aa29
--- /dev/null
+++ b/docker/test/integration/features/prometheus.feature
@@ -0,0 +1,17 @@
+Feature: MiNiFi can publish metrics to Prometheus server
+
+  Background:
+    Given the content of "/tmp/output" is monitored
+
+  Scenario: Published metrics are scraped by Prometheus server
+    Given a GetFile processor with the name "GetFile1" and the "Input Directory" property set to "/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile1 processor is connected to the PutFile
+    And a Prometheus server is set up
+    When all instances start up
+    Then "RepositoryMetrics" is published to the Prometheus server in less than 60 seconds
+    And "QueueMetrics" is published to the Prometheus server in less than 60 seconds
+    And "GetFileMetrics" processor metric is published to the Prometheus server in less than 60 seconds for "GetFile1" processor
+    And "FlowInformation" is published to the Prometheus server in less than 60 seconds
+    And "DeviceInfoNode" is published to the Prometheus server in less than 60 seconds
diff --git a/docker/test/integration/minifi/core/DockerTestCluster.py b/docker/test/integration/minifi/core/DockerTestCluster.py
index 09cc8c1ea..1cab875ea 100644
--- a/docker/test/integration/minifi/core/DockerTestCluster.py
+++ b/docker/test/integration/minifi/core/DockerTestCluster.py
@@ -12,8 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-
 import json
 import logging
 import sys
@@ -29,6 +27,7 @@ from .SingleNodeDockerCluster import SingleNodeDockerCluster
 from .utils import retry_check
 from azure.storage.blob import BlobServiceClient
 from azure.core.exceptions import ResourceExistsError
+from prometheus_api_client import PrometheusConnect
 
 
 class DockerTestCluster(SingleNodeDockerCluster):
@@ -249,7 +248,7 @@ class DockerTestCluster(SingleNodeDockerCluster):
 
     @retry_check()
     def check_google_cloud_storage(self, gcs_container_name, content):
-        (code, output) = self.client.containers.get(gcs_container_name).exec_run(["grep", "-r", content, "/storage"])
+        (code, _) = self.client.containers.get(gcs_container_name).exec_run(["grep", "-r", content, "/storage"])
         return code == 0
 
     @retry_check()
@@ -286,3 +285,79 @@ class DockerTestCluster(SingleNodeDockerCluster):
                 tar.addfile(info, io.BytesIO(content.encode('utf-8')))
             with open(os.path.join(td, 'content.tar'), 'rb') as data:
                 return container.put_archive(os.path.dirname(dst_path), data.read())
+
+    def wait_for_metric_class_on_prometheus(self, metric_class, timeout_seconds):
+        start_time = time.perf_counter()
+        while (time.perf_counter() - start_time) < timeout_seconds:
+            if self.verify_metric_class(metric_class):
+                return True
+            time.sleep(1)
+        return False
+
+    def wait_for_processor_metric_on_prometheus(self, metric_class, timeout_seconds, processor_name):
+        start_time = time.perf_counter()
+        while (time.perf_counter() - start_time) < timeout_seconds:
+            if self.verify_processor_metric(metric_class, processor_name):
+                return True
+            time.sleep(1)
+        return False
+
+    def verify_processor_metric(self, metric_class, processor_name):
+        if metric_class == "GetFileMetrics":
+            return self.verify_getfile_metrics(processor_name)
+        else:
+            raise Exception("Metric class '%s' verification is not implemented" % metric_class)
+
+    def verify_metric_class(self, metric_class):
+        if metric_class == "RepositoryMetrics":
+            return self.verify_repository_metrics()
+        elif metric_class == "QueueMetrics":
+            return self.verify_queue_metrics()
+        elif metric_class == "FlowInformation":
+            return self.verify_flow_information_metrics()
+        elif metric_class == "DeviceInfoNode":
+            return self.verify_device_info_node_metrics()
+        else:
+            raise Exception("Metric class '%s' verification is not implemented" % metric_class)
+
+    def verify_repository_metrics(self):
+        prom = PrometheusConnect(url="http://localhost:9090", disable_ssl=True)
+        label_list = [{'repository_name': 'provenance'}, {'repository_name': 'flowfile'}]
+        for labels in label_list:
+            if not (self.verify_metric_exists(prom, 'minifi_is_running', 'RepositoryMetrics', labels)
+                    and self.verify_metric_exists(prom, 'minifi_is_full', 'RepositoryMetrics', labels)
+                    and self.verify_metric_exists(prom, 'minifi_repository_size', 'RepositoryMetrics', labels)):
+                return False
+        return True
+
+    def verify_queue_metrics(self):
+        prom = PrometheusConnect(url="http://localhost:9090", disable_ssl=True)
+        return self.verify_metric_exists(prom, 'minifi_queue_data_size', 'QueueMetrics') and \
+            self.verify_metric_exists(prom, 'minifi_queue_data_size_max', 'QueueMetrics') and \
+            self.verify_metric_exists(prom, 'minifi_queue_size', 'QueueMetrics') and \
+            self.verify_metric_exists(prom, 'minifi_queue_size_max', 'QueueMetrics')
+
+    def verify_getfile_metrics(self, processor_name):
+        prom = PrometheusConnect(url="http://localhost:9090", disable_ssl=True)
+        labels = {'processor_name': processor_name}
+        return self.verify_metric_exists(prom, 'minifi_onTrigger_invocations', 'GetFileMetrics', labels) and \
+            self.verify_metric_exists(prom, 'minifi_accepted_files', 'GetFileMetrics', labels) and \
+            self.verify_metric_exists(prom, 'minifi_input_bytes', 'GetFileMetrics', labels)
+
+    def verify_flow_information_metrics(self):
+        prom = PrometheusConnect(url="http://localhost:9090", disable_ssl=True)
+        return self.verify_metric_exists(prom, 'minifi_queue_data_size', 'FlowInformation') and \
+            self.verify_metric_exists(prom, 'minifi_queue_data_size_max', 'FlowInformation') and \
+            self.verify_metric_exists(prom, 'minifi_queue_size', 'FlowInformation') and \
+            self.verify_metric_exists(prom, 'minifi_queue_size_max', 'FlowInformation') and \
+            self.verify_metric_exists(prom, 'minifi_is_running', 'FlowInformation', {'component_name': 'FlowController'})
+
+    def verify_device_info_node_metrics(self):
+        prom = PrometheusConnect(url="http://localhost:9090", disable_ssl=True)
+        return self.verify_metric_exists(prom, 'minifi_physical_mem', 'DeviceInfoNode') and \
+            self.verify_metric_exists(prom, 'minifi_memory_usage', 'DeviceInfoNode') and \
+            self.verify_metric_exists(prom, 'minifi_cpu_utilization', 'DeviceInfoNode')
+
+    def verify_metric_exists(self, prometheus_client, metric_name, metric_class, labels={}):
+        labels['metric_class'] = metric_class
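+        # Queries the current value of e.g. minifi_queue_size{metric_class="QueueMetrics", ...};
+        # a non-empty result means the metric is currently exposed by the agent.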
+        return len(prometheus_client.get_current_metric_value(metric_name=metric_name, label_config=labels)) > 0
diff --git a/docker/test/integration/minifi/core/ImageStore.py b/docker/test/integration/minifi/core/ImageStore.py
index 1d2def6a4..8db07d264 100644
--- a/docker/test/integration/minifi/core/ImageStore.py
+++ b/docker/test/integration/minifi/core/ImageStore.py
@@ -58,6 +58,8 @@ class ImageStore:
             image = self.__build_splunk_image()
         elif container_engine == "tcp-client":
             image = self.__build_tcp_client_image()
+        elif container_engine == "prometheus":
+            image = self.__build_prometheus_image()
         else:
             raise Exception("There is no associated image for " + container_engine)
 
@@ -96,6 +98,9 @@ class ImageStore:
                     echo "Database = postgres" >> /etc/odbc.ini
                 RUN sed -i -e 's/INFO/DEBUG/g' {minifi_root}/conf/minifi-log.properties
                 RUN echo nifi.flow.engine.threads=5 >> {minifi_root}/conf/minifi.properties
+                RUN echo nifi.metrics.publisher.class=PrometheusMetricsPublisher >> {minifi_root}/conf/minifi.properties
+                RUN echo nifi.metrics.publisher.PrometheusMetricsPublisher.port=9936 >> {minifi_root}/conf/minifi.properties
+                RUN echo nifi.metrics.publisher.metrics=RepositoryMetrics,QueueMetrics,GetFileMetrics,GetTCPMetrics,FlowInformation,DeviceInfoNode >> {minifi_root}/conf/minifi.properties
                 USER minificpp
                 """.format(base_image='apacheminificpp:' + MinifiContainer.MINIFI_VERSION,
                            minifi_root=MinifiContainer.MINIFI_ROOT))
@@ -176,6 +181,9 @@ class ImageStore:
 
         return self.__build_image(dockerfile)
 
+    def __build_prometheus_image(self):
+        return self.__build_image_by_path(self.test_dir + "/resources/prometheus", 'minifi-prometheus')
+
     def __build_image(self, dockerfile, context_files=[]):
         conf_dockerfile_buffer = BytesIO()
         docker_context_buffer = BytesIO()
diff --git a/docker/test/integration/minifi/core/PrometheusContainer.py b/docker/test/integration/minifi/core/PrometheusContainer.py
new file mode 100644
index 000000000..7ba2068e9
--- /dev/null
+++ b/docker/test/integration/minifi/core/PrometheusContainer.py
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+from .Container import Container
+
+
+class PrometheusContainer(Container):
+    def __init__(self, name, vols, network, image_store, command=None):
+        super().__init__(name, 'prometheus', vols, network, image_store, command)
+
+    def get_startup_finished_log_entry(self):
+        return "Server is ready to receive web requests."
+
+    def deploy(self):
+        if not self.set_deployed():
+            return
+
+        logging.info('Creating and running Prometheus docker container...')
+        self.client.containers.run(
+            self.image_store.get_image(self.get_engine()),
+            detach=True,
+            name=self.name,
+            network=self.network.name,
+            ports={'9090/tcp': 9090},
+            entrypoint=self.command)
+        logging.info('Added container \'%s\'', self.name)
diff --git a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
index d98d4dfc5..15e0660f1 100644
--- a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
+++ b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
@@ -37,6 +37,7 @@ from .SyslogUdpClientContainer import SyslogUdpClientContainer
 from .SyslogTcpClientContainer import SyslogTcpClientContainer
 from .MinifiAsPodInKubernetesCluster import MinifiAsPodInKubernetesCluster
 from .TcpClientContainer import TcpClientContainer
+from .PrometheusContainer import PrometheusContainer
 
 
 class SingleNodeDockerCluster(Cluster):
@@ -124,6 +125,8 @@ class SingleNodeDockerCluster(Cluster):
             return self.containers.setdefault(name, SyslogTcpClientContainer(name, self.vols, self.network, self.image_store, command))
         elif engine == "tcp-client":
             return self.containers.setdefault(name, TcpClientContainer(name, self.vols, self.network, self.image_store, command))
+        elif engine == "prometheus":
+            return self.containers.setdefault(name, PrometheusContainer(name, self.vols, self.network, self.image_store, command))
         else:
             raise Exception('invalid flow engine: \'%s\'' % engine)
 
diff --git a/docker/test/integration/resources/prometheus/Dockerfile b/docker/test/integration/resources/prometheus/Dockerfile
new file mode 100644
index 000000000..566dbb25f
--- /dev/null
+++ b/docker/test/integration/resources/prometheus/Dockerfile
@@ -0,0 +1,2 @@
+FROM prom/prometheus:v2.35.0
+ADD conf/prometheus.yml /etc/prometheus/
diff --git a/docker/test/integration/resources/prometheus/conf/prometheus.yml b/docker/test/integration/resources/prometheus/conf/prometheus.yml
new file mode 100644
index 000000000..8a75e2aa7
--- /dev/null
+++ b/docker/test/integration/resources/prometheus/conf/prometheus.yml
@@ -0,0 +1,7 @@
+global:
+  scrape_interval: 2s
+  evaluation_interval: 15s
+scrape_configs:
+  - job_name: "minifi"
+    static_configs:
+      - targets: ["minifi-cpp-flow:9936"]
diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py
index db4d826a5..46e79b1de 100644
--- a/docker/test/integration/steps/steps.py
+++ b/docker/test/integration/steps/steps.py
@@ -819,3 +819,20 @@ def step_imp(context, content):
 def step_imp(context, content, source, source_type, host):
     attr = {"source": source, "sourcetype": source_type, "host": host}
     context.test.check_splunk_event_with_attributes("splunk", content, attr)
+
+
+# Prometheus
+@given("a Prometheus server is set up")
+def step_impl(context):
+    context.test.acquire_container("prometheus", "prometheus")
+
+
+@then("\"{metric_class}\" are published to the Prometheus server in less than {timeout_seconds:d} seconds")
+@then("\"{metric_class}\" is published to the Prometheus server in less than {timeout_seconds:d} seconds")
+def step_impl(context, metric_class, timeout_seconds):
+    context.test.check_metric_class_on_prometheus(metric_class, timeout_seconds)
+
+
+@then("\"{metric_class}\" processor metric is published to the Prometheus server in less than {timeout_seconds:d} seconds for \"{processor_name}\" processor")
+def step_impl(context, metric_class, timeout_seconds, processor_name):
+    context.test.check_processor_metric_on_prometheus(metric_class, timeout_seconds, processor_name)
diff --git a/encrypt-config/tests/ConfigFileEncryptorTests.cpp b/encrypt-config/tests/ConfigFileEncryptorTests.cpp
index 61439ed73..c9aa62cd2 100644
--- a/encrypt-config/tests/ConfigFileEncryptorTests.cpp
+++ b/encrypt-config/tests/ConfigFileEncryptorTests.cpp
@@ -51,10 +51,7 @@ bool check_encryption(const ConfigFile& test_file, const std::string& property_n
 }
 }  // namespace
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
 
 // NOTE(fgerlits): these ==/!= operators are in the test file on purpose, and should not be part of production code,
 // as they take a varying amount of time depending on which character in the line differs, so they would open up
@@ -68,10 +65,7 @@ bool operator==(const  ConfigFile& left, const  ConfigFile& right) { return left
 bool operator!=(const  ConfigFile& left, const  ConfigFile& right) { return !(left == right); }
 
 }  // namespace encrypt_config
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi
 
 TEST_CASE("ConfigFileEncryptor can encrypt the sensitive properties", "[encrypt-config][encryptSensitivePropertiesInFile]") {
   utils::crypto::Bytes KEY = utils::StringUtils::from_base64("6q9u8LEDy1/CdmSBm8oSqPS/Ds5UOD2nRouP8yUoK10=");
@@ -83,7 +77,7 @@ TEST_CASE("ConfigFileEncryptor can encrypt the sensitive properties", "[encrypt-
     uint32_t num_properties_encrypted = encryptSensitivePropertiesInFile(test_file, KEY);
 
     REQUIRE(num_properties_encrypted == 1);
-    REQUIRE(test_file.size() == 101);
+    REQUIRE(test_file.size() == 106);
     REQUIRE(check_encryption(test_file, Configuration::nifi_rest_api_password, original_password.length()));
 
     SECTION("calling encryptSensitiveProperties a second time does nothing") {
diff --git a/encrypt-config/tests/ConfigFileTests.cpp b/encrypt-config/tests/ConfigFileTests.cpp
index 04e2258a5..b8b24ccc0 100644
--- a/encrypt-config/tests/ConfigFileTests.cpp
+++ b/encrypt-config/tests/ConfigFileTests.cpp
@@ -90,7 +90,7 @@ TEST_CASE("ConfigFile creates an empty object from a nonexistent file", "[encryp
 
 TEST_CASE("ConfigFile can parse a simple config file", "[encrypt-config][constructor]") {
   ConfigFile test_file{std::ifstream{"resources/minifi.properties"}};
-  REQUIRE(test_file.size() == 100);
+  REQUIRE(test_file.size() == 105);
 }
 
 TEST_CASE("ConfigFile can test whether a key is present", "[encrypt-config][hasValue]") {
@@ -102,7 +102,7 @@ TEST_CASE("ConfigFile can test whether a key is present", "[encrypt-config][hasV
 
 TEST_CASE("ConfigFile can read empty properties correctly", "[encrypt-config][constructor]") {
   ConfigFile test_file{std::ifstream{"resources/with-additional-sensitive-props.minifi.properties"}};
-  REQUIRE(test_file.size() == 102);
+  REQUIRE(test_file.size() == 107);
 
   auto empty_property = test_file.getValue(Configuration::nifi_security_need_ClientAuth);
   REQUIRE(empty_property);
@@ -143,7 +143,7 @@ TEST_CASE("ConfigFile can add a new setting after an existing setting", "[encryp
 
   SECTION("valid key") {
     test_file.insertAfter(Configuration::nifi_rest_api_password, "nifi.rest.api.password.protected", "my-cipher-name");
-    REQUIRE(test_file.size() == 101);
+    REQUIRE(test_file.size() == 106);
     REQUIRE(test_file.getValue("nifi.rest.api.password.protected") == "my-cipher-name");
   }
 
@@ -158,7 +158,7 @@ TEST_CASE("ConfigFile can add a new setting at the end", "[encrypt-config][appen
   const std::string KEY = "nifi.bootstrap.sensitive.key";
   const std::string VALUE = "aa411f289c91685ef9d5a9e5a4fad9393ff4c7a78ab978484323488caed7a9ab";
   test_file.append(KEY, VALUE);
-  REQUIRE(test_file.size() == 101);
+  REQUIRE(test_file.size() == 106);
   REQUIRE(test_file.getValue(KEY) == std::make_optional(VALUE));
 }
 
diff --git a/encrypt-config/tests/resources/minifi.properties b/encrypt-config/tests/resources/minifi.properties
index d7cb215e0..7d1c73b01 100644
--- a/encrypt-config/tests/resources/minifi.properties
+++ b/encrypt-config/tests/resources/minifi.properties
@@ -74,11 +74,11 @@ nifi.c2.agent.identifier=EncryptConfigTester-001
 ## define metrics reported
 nifi.c2.root.class.definitions=metrics
 nifi.c2.root.class.definitions.metrics.name=metrics
-nifi.c2.root.class.definitions.metrics.metrics=typedmetrics
-nifi.c2.root.class.definitions.metrics.metrics.typedmetrics.name=RuntimeMetrics
-nifi.c2.root.class.definitions.metrics.metrics.queuemetrics.name=QueueMetrics
-nifi.c2.root.class.definitions.metrics.metrics.queuemetrics.classes=QueueMetrics
-nifi.c2.root.class.definitions.metrics.metrics.typedmetrics.classes=ProcessMetrics,SystemInformation
+nifi.c2.root.class.definitions.metrics.metrics=runtimemetrics,loadmetrics,processorMetrics
+nifi.c2.root.class.definitions.metrics.metrics.runtimemetrics.name=RuntimeMetrics
+nifi.c2.root.class.definitions.metrics.metrics.runtimemetrics.classes=DeviceInfoNode,FlowInformation
+nifi.c2.root.class.definitions.metrics.metrics.loadmetrics.name=LoadMetrics
+nifi.c2.root.class.definitions.metrics.metrics.loadmetrics.classes=QueueMetrics,RepositoryMetrics
 nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.name=ProcessorMetric
 nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.classes=GetFileMetrics
 
@@ -98,3 +98,8 @@ nifi.jvm.options=-Xmx1G
 nifi.python.processor.dir=${MINIFI_HOME}/minifi-python/
 nifi.c2.flow.id=
 nifi.c2.flow.url=
+
+# Publish metrics to external consumers
+# nifi.metrics.publisher.class=PrometheusMetricsPublisher
+# nifi.metrics.publisher.PrometheusMetricsPublisher.port=9936
+# nifi.metrics.publisher.metrics=QueueMetrics,RepositoryMetrics,GetFileMetrics,DeviceInfoNode,FlowInformation
diff --git a/encrypt-config/tests/resources/with-additional-sensitive-props.minifi.properties b/encrypt-config/tests/resources/with-additional-sensitive-props.minifi.properties
index e7f4b00bd..b3c7d8559 100644
--- a/encrypt-config/tests/resources/with-additional-sensitive-props.minifi.properties
+++ b/encrypt-config/tests/resources/with-additional-sensitive-props.minifi.properties
@@ -76,11 +76,11 @@ nifi.c2.agent.identifier=EncryptConfigTester-001
 ## define metrics reported
 nifi.c2.root.class.definitions=metrics
 nifi.c2.root.class.definitions.metrics.name=metrics
-nifi.c2.root.class.definitions.metrics.metrics=typedmetrics
-nifi.c2.root.class.definitions.metrics.metrics.typedmetrics.name=RuntimeMetrics
-nifi.c2.root.class.definitions.metrics.metrics.queuemetrics.name=QueueMetrics
-nifi.c2.root.class.definitions.metrics.metrics.queuemetrics.classes=QueueMetrics
-nifi.c2.root.class.definitions.metrics.metrics.typedmetrics.classes=ProcessMetrics,SystemInformation
+nifi.c2.root.class.definitions.metrics.metrics=runtimemetrics,loadmetrics,processorMetrics
+nifi.c2.root.class.definitions.metrics.metrics.runtimemetrics.name=RuntimeMetrics
+nifi.c2.root.class.definitions.metrics.metrics.runtimemetrics.classes=DeviceInfoNode,FlowInformation
+nifi.c2.root.class.definitions.metrics.metrics.loadmetrics.name=LoadMetrics
+nifi.c2.root.class.definitions.metrics.metrics.loadmetrics.classes=QueueMetrics,RepositoryMetrics
 nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.name=ProcessorMetric
 nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.classes=GetFileMetrics
 
@@ -100,3 +100,8 @@ nifi.jvm.options=-Xmx1G
 nifi.python.processor.dir=${MINIFI_HOME}/minifi-python/
 nifi.c2.flow.id=
 nifi.c2.flow.url=
+
+# Publish metrics to external consumers
+# nifi.metrics.publisher.class=PrometheusMetricsPublisher
+# nifi.metrics.publisher.PrometheusMetricsPublisher.port=9936
+# nifi.metrics.publisher.metrics=QueueMetrics,RepositoryMetrics,GetFileMetrics,DeviceInfoNode,FlowInformation
diff --git a/extensions/civetweb/CMakeLists.txt b/extensions/civetweb/CMakeLists.txt
index 1c875bce1..e7ac23da5 100644
--- a/extensions/civetweb/CMakeLists.txt
+++ b/extensions/civetweb/CMakeLists.txt
@@ -21,10 +21,6 @@ if (DISABLE_CIVET)
   return()
 endif()
 
-include(BundledCivetWeb)
-use_bundled_civetweb(${CMAKE_SOURCE_DIR} ${CMAKE_BINARY_DIR})
-list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/civetweb/dummy")
-
 include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
 
 include_directories(${CMAKE_SOURCE_DIR}/libminifi/include
diff --git a/extensions/http-curl/tests/C2EmptyMetricTest.cpp b/extensions/http-curl/tests/C2EmptyMetricTest.cpp
new file mode 100644
index 000000000..84e2435ac
--- /dev/null
+++ b/extensions/http-curl/tests/C2EmptyMetricTest.cpp
@@ -0,0 +1,115 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#include <string>
+#include <iostream>
+#include <filesystem>
+
+#include "TestBase.h"
+#include "HTTPIntegrationBase.h"
+#include "HTTPHandlers.h"
+#include "processors/TailFile.h"
+#include "state/ProcessorController.h"
+#include "utils/file/FileUtils.h"
+#include "utils/TestUtils.h"
+#include "processors/GetTCP.h"
+#include "utils/StringUtils.h"
+#include "utils/file/PathUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::test {
+
+class VerifyEmptyC2Metric : public VerifyC2Base {
+ public:
+  explicit VerifyEmptyC2Metric(const std::atomic_bool& metrics_found) : metrics_found_(metrics_found) {
+  }
+
+  void testSetup() override {
+    LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
+    LogTestController::getInstance().setTrace<minifi::c2::C2Client>();
+    LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
+    LogTestController::getInstance().setDebug<minifi::FlowController>();
+    LogTestController::getInstance().setOff<minifi::processors::GetTCP>();
+    VerifyC2Base::testSetup();
+  }
+
+  void runAssertions() override {
+    using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime;
+    assert(verifyEventHappenedInPollTime(40s, [&] { return metrics_found_.load(); }, 1s));
+  }
+
+ private:
+  const std::atomic_bool& metrics_found_;
+};
+
+class MetricsHandler: public HeartbeatHandler {
+ public:
+  explicit MetricsHandler(std::atomic_bool& metrics_found, std::shared_ptr<minifi::Configure> configuration)
+    : HeartbeatHandler(std::move(configuration)),
+      metrics_found_(metrics_found) {
+  }
+
+  void handleHeartbeat(const rapidjson::Document& root, struct mg_connection* conn) override {
+    verifyMetrics(root);
+    sendEmptyHeartbeatResponse(conn);
+  }
+
+ private:
+  static void sendEmptyHeartbeatResponse(struct mg_connection* conn) {
+    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
+  }
+
+  void verifyMetrics(const rapidjson::Document& root) {
+    auto initial_metrics_verified =
+      root.HasMember("metrics") &&
+      root["metrics"].HasMember("LoadMetrics") &&
+      verifyLoadMetrics(root["metrics"]["LoadMetrics"]);
+    if (initial_metrics_verified) {
+      metrics_found_ = true;
+    }
+  }
+
+  static bool verifyLoadMetrics(const rapidjson::Value& load_metrics) {
+    return load_metrics.HasMember("RepositoryMetrics") &&
+      !load_metrics.HasMember("QueueMetrics") &&
+      load_metrics["RepositoryMetrics"].HasMember("ff") &&
+      load_metrics["RepositoryMetrics"].HasMember("repo_name");
+  }
+
+  std::atomic_bool& metrics_found_;
+};
+
+}  // namespace org::apache::nifi::minifi::test
+
+int main(int argc, char **argv) {
+  std::atomic_bool metrics_found{false};
+  const cmd_args args = parse_cmdline_args(argc, argv, "api/heartbeat");
+  org::apache::nifi::minifi::test::VerifyEmptyC2Metric harness(metrics_found);
+  harness.getConfiguration()->set("nifi.c2.root.class.definitions", "metrics");
+  harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.name", "metrics");
+  harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics", "loadmetrics");
+  harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.loadmetrics.name", "LoadMetrics");
+  harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.loadmetrics.classes", "QueueMetrics,RepositoryMetrics");
+  harness.setKeyDir(args.key_dir);
+  org::apache::nifi::minifi::test::MetricsHandler handler(metrics_found, harness.getConfiguration());
+  harness.setUrl(args.url, &handler);
+  harness.run(args.test_file);
+  return 0;
+}
diff --git a/extensions/http-curl/tests/C2MetricsTest.cpp b/extensions/http-curl/tests/C2MetricsTest.cpp
new file mode 100644
index 000000000..7661fdd84
--- /dev/null
+++ b/extensions/http-curl/tests/C2MetricsTest.cpp
@@ -0,0 +1,212 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#include <string>
+#include <iostream>
+#include <filesystem>
+
+#include "TestBase.h"
+#include "HTTPIntegrationBase.h"
+#include "HTTPHandlers.h"
+#include "processors/TailFile.h"
+#include "state/ProcessorController.h"
+#include "utils/file/FileUtils.h"
+#include "utils/TestUtils.h"
+#include "processors/GetTCP.h"
+#include "utils/StringUtils.h"
+#include "utils/file/PathUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::test {
+
+class VerifyC2Metrics : public VerifyC2Base {
+ public:
+  explicit VerifyC2Metrics(const std::atomic_bool& metrics_updated_successfully) : metrics_updated_successfully_(metrics_updated_successfully) {
+  }
+
+  void testSetup() override {
+    LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
+    LogTestController::getInstance().setTrace<minifi::c2::C2Client>();
+    LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
+    LogTestController::getInstance().setDebug<minifi::FlowController>();
+    LogTestController::getInstance().setOff<minifi::processors::GetTCP>();
+    VerifyC2Base::testSetup();
+  }
+
+  void runAssertions() override {
+    using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime;
+    assert(verifyEventHappenedInPollTime(40s, [&] { return metrics_updated_successfully_.load(); }, 1s));
+  }
+
+ private:
+  const std::atomic_bool& metrics_updated_successfully_;
+};
+
+class MetricsHandler: public HeartbeatHandler {
+ public:
+  explicit MetricsHandler(std::atomic_bool& metrics_updated_successfully, std::shared_ptr<minifi::Configure> configuration, const std::string& replacement_config_path)
+    : HeartbeatHandler(std::move(configuration)),
+      metrics_updated_successfully_(metrics_updated_successfully),
+      replacement_config_(getReplacementConfigAsJsonValue(replacement_config_path)) {
+  }
+
+  void handleHeartbeat(const rapidjson::Document& root, struct mg_connection* conn) override {
+    switch (test_state_) {
+      case TestState::VERIFY_INITIAL_METRICS: {
+        verifyMetrics(root);
+        sendEmptyHeartbeatResponse(conn);
+        break;
+      }
+      case TestState::SEND_NEW_CONFIG: {
+        sendHeartbeatResponse("UPDATE", "configuration", "889348", conn, {{"configuration_data", replacement_config_}});
+        test_state_ = TestState::VERIFY_UPDATED_METRICS;
+        break;
+      }
+      case TestState::VERIFY_UPDATED_METRICS: {
+        verifyUpdatedMetrics(root);
+        sendEmptyHeartbeatResponse(conn);
+        break;
+      }
+    }
+  }
+
+ private:
+  enum class TestState {
+    VERIFY_INITIAL_METRICS,
+    SEND_NEW_CONFIG,
+    VERIFY_UPDATED_METRICS
+  };
+
+  static void sendEmptyHeartbeatResponse(struct mg_connection* conn) {
+    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
+  }
+
+  void verifyMetrics(const rapidjson::Document& root) {
+    auto initial_metrics_verified =
+      root.HasMember("metrics") &&
+      root["metrics"].HasMember("RuntimeMetrics") &&
+      root["metrics"].HasMember("LoadMetrics") &&
+      root["metrics"].HasMember("ProcessorMetrics") &&
+      verifyRuntimeMetrics(root["metrics"]["RuntimeMetrics"]) &&
+      verifyLoadMetrics(root["metrics"]["LoadMetrics"]) &&
+      verifyProcessorMetrics(root["metrics"]["ProcessorMetrics"]);
+    if (initial_metrics_verified) {
+      test_state_ = TestState::SEND_NEW_CONFIG;
+    }
+  }
+
+  void verifyUpdatedMetrics(const rapidjson::Document& root) {
+    auto updated_metrics_verified =
+      root.HasMember("metrics") &&
+      root["metrics"].HasMember("RuntimeMetrics") &&
+      root["metrics"].HasMember("LoadMetrics") &&
+      !root["metrics"].HasMember("ProcessorMetrics") &&
+      verifyUpdatedRuntimeMetrics(root["metrics"]["RuntimeMetrics"]) &&
+      verifyUpdatedLoadMetrics(root["metrics"]["LoadMetrics"]);
+
+    if (updated_metrics_verified) {
+      metrics_updated_successfully_ = true;
+    }
+  }
+
+  static bool verifyRuntimeMetrics(const rapidjson::Value& runtime_metrics) {
+    return runtime_metrics.HasMember("deviceInfo") &&
+      runtime_metrics.HasMember("flowInfo") &&
+      runtime_metrics["flowInfo"].HasMember("versionedFlowSnapshotURI") &&
+      runtime_metrics["flowInfo"].HasMember("queues") &&
+      runtime_metrics["flowInfo"].HasMember("components") &&
+      runtime_metrics["flowInfo"]["queues"].HasMember("2438e3c8-015a-1000-79ca-83af40ec1997") &&
+      runtime_metrics["flowInfo"]["components"].HasMember("FlowController") &&
+      runtime_metrics["flowInfo"]["components"].HasMember("GetTCP") &&
+      runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute");
+  }
+
+  static bool verifyUpdatedRuntimeMetrics(const rapidjson::Value& runtime_metrics) {
+    return runtime_metrics.HasMember("deviceInfo") &&
+      runtime_metrics.HasMember("flowInfo") &&
+      runtime_metrics["flowInfo"].HasMember("versionedFlowSnapshotURI") &&
+      runtime_metrics["flowInfo"].HasMember("queues") &&
+      runtime_metrics["flowInfo"].HasMember("components") &&
+      runtime_metrics["flowInfo"]["queues"].HasMember("8368e3c8-015a-1003-52ca-83af40ec1332") &&
+      runtime_metrics["flowInfo"]["components"].HasMember("FlowController") &&
+      runtime_metrics["flowInfo"]["components"].HasMember("GenerateFlowFile") &&
+      runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute");
+  }
+
+  static bool verifyLoadMetrics(const rapidjson::Value& load_metrics) {
+    return load_metrics.HasMember("RepositoryMetrics") &&
+      load_metrics.HasMember("QueueMetrics") &&
+      load_metrics["RepositoryMetrics"].HasMember("ff") &&
+      load_metrics["RepositoryMetrics"].HasMember("repo_name") &&
+      load_metrics["QueueMetrics"].HasMember("GetTCP/success/LogAttribute");
+  }
+
+  static bool verifyUpdatedLoadMetrics(const rapidjson::Value& load_metrics) {
+    return load_metrics.HasMember("RepositoryMetrics") &&
+      load_metrics.HasMember("QueueMetrics") &&
+      load_metrics["RepositoryMetrics"].HasMember("ff") &&
+      load_metrics["RepositoryMetrics"].HasMember("repo_name") &&
+      load_metrics["QueueMetrics"].HasMember("GenerateFlowFile/success/LogAttribute") &&
+      std::stoi(load_metrics["QueueMetrics"]["GenerateFlowFile/success/LogAttribute"]["queued"].GetString()) > 0;
+  }
+
+  static bool verifyProcessorMetrics(const rapidjson::Value& processor_metrics) {
+    return processor_metrics.HasMember("GetTCPMetrics") &&
+      processor_metrics["GetTCPMetrics"].HasMember("OnTriggerInvocations") &&
+      processor_metrics["GetTCPMetrics"]["OnTriggerInvocations"].GetUint() > 0;
+  }
+
+  [[nodiscard]] static std::string getReplacementConfigAsJsonValue(const std::string& replacement_config_path) {
+    std::ifstream is(replacement_config_path);
+    auto content = std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>());
+    content = utils::StringUtils::replaceAll(content, "\n", "\\n");
+    content = utils::StringUtils::replaceAll(content, "\"", "\\\"");
+    return content;
+  }
+
+  std::atomic_bool& metrics_updated_successfully_;
+  TestState test_state_ = TestState::VERIFY_INITIAL_METRICS;
+  std::string replacement_config_;
+};
+
+}  // namespace org::apache::nifi::minifi::test
+
+int main(int argc, char **argv) {
+  std::atomic_bool metrics_updated_successfully{false};
+  const cmd_args args = parse_cmdline_args(argc, argv, "api/heartbeat");
+  org::apache::nifi::minifi::test::VerifyC2Metrics harness(metrics_updated_successfully);
+  harness.getConfiguration()->set("nifi.c2.root.class.definitions", "metrics");
+  harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.name", "metrics");
+  harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics", "runtimemetrics,loadmetrics,processorMetrics");
+  harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.runtimemetrics.name", "RuntimeMetrics");
+  harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.runtimemetrics.classes", "DeviceInfoNode,FlowInformation");
+  harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.loadmetrics.name", "LoadMetrics");
+  harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.loadmetrics.classes", "QueueMetrics,RepositoryMetrics");
+  harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.name", "ProcessorMetrics");
+  harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.classes", "GetTCPMetrics");
+  harness.setKeyDir(args.key_dir);
+  auto replacement_path = args.test_file;
+  utils::StringUtils::replaceAll(replacement_path, "TestC2Metrics", "TestC2MetricsUpdate");
+  utils::StringUtils::replaceAll(replacement_path, "/", std::string(1, org::apache::nifi::minifi::utils::file::FileUtils::get_separator()));
+  org::apache::nifi::minifi::test::MetricsHandler handler(metrics_updated_successfully, harness.getConfiguration(), replacement_path);
+  harness.setUrl(args.url, &handler);
+  harness.run(args.test_file);
+  return 0;
+}
diff --git a/extensions/http-curl/tests/CMakeLists.txt b/extensions/http-curl/tests/CMakeLists.txt
index cd4ce560d..8bc0c134f 100644
--- a/extensions/http-curl/tests/CMakeLists.txt
+++ b/extensions/http-curl/tests/CMakeLists.txt
@@ -96,11 +96,13 @@ add_test(NAME SiteToSiteRestTest COMMAND SiteToSiteRestTest "${TEST_RESOURCES}/T
 add_test(NAME ControllerServiceIntegrationTests COMMAND ControllerServiceIntegrationTests "${TEST_RESOURCES}/TestControllerServices.yml" "${TEST_RESOURCES}/")
 add_test(NAME VerifyInvokeHTTPPostTest COMMAND VerifyInvokeHTTPPostTest "${TEST_RESOURCES}/TestInvokeHTTPPost.yml")
 add_test(NAME AbsoluteTimeoutTest COMMAND AbsoluteTimeoutTest)
-add_test(NAME C2PauseResumeTest COMMAND C2PauseResumeTest "${TEST_RESOURCES}/C2PauseResumeTest.yml"  "${TEST_RESOURCES}/")
+add_test(NAME C2PauseResumeTest COMMAND C2PauseResumeTest "${TEST_RESOURCES}/C2PauseResumeTest.yml")
 add_test(NAME C2LogHeartbeatTest COMMAND C2LogHeartbeatTest)
 add_test(NAME C2DebugBundleTest COMMAND C2DebugBundleTest)
 add_test(NAME C2PropertiesUpdateTests COMMAND C2PropertiesUpdateTests)
-add_test(NAME C2ClearCoreComponentStateTest COMMAND C2ClearCoreComponentStateTest "${TEST_RESOURCES}/TestC2DescribeCoreComponentState.yml"  "${TEST_RESOURCES}/")
-add_test(NAME C2MultipleCommandsTest COMMAND C2MultipleCommandsTest "${TEST_RESOURCES}/TestC2DescribeCoreComponentState.yml"  "${TEST_RESOURCES}/")
+add_test(NAME C2ClearCoreComponentStateTest COMMAND C2ClearCoreComponentStateTest "${TEST_RESOURCES}/TestC2DescribeCoreComponentState.yml")
+add_test(NAME C2MultipleCommandsTest COMMAND C2MultipleCommandsTest "${TEST_RESOURCES}/TestC2DescribeCoreComponentState.yml")
 add_test(NAME C2UpdateAssetTest COMMAND C2UpdateAssetTest)
 add_test(NAME C2CompressTest COMMAND C2CompressTest)
+add_test(NAME C2MetricsTest COMMAND C2MetricsTest "${TEST_RESOURCES}/TestC2Metrics.yml")
+add_test(NAME C2EmptyMetricTest COMMAND C2EmptyMetricTest "${TEST_RESOURCES}/TestEmpty.yml")
diff --git a/extensions/civetweb/CMakeLists.txt b/extensions/prometheus/CMakeLists.txt
similarity index 51%
copy from extensions/civetweb/CMakeLists.txt
copy to extensions/prometheus/CMakeLists.txt
index 1c875bce1..b6d1001a8 100644
--- a/extensions/civetweb/CMakeLists.txt
+++ b/extensions/prometheus/CMakeLists.txt
@@ -17,26 +17,19 @@
 # under the License.
 #
 
-if (DISABLE_CIVET)
+if (NOT (ENABLE_PROMETHEUS OR ENABLE_ALL))
   return()
 endif()
 
-include(BundledCivetWeb)
-use_bundled_civetweb(${CMAKE_SOURCE_DIR} ${CMAKE_BINARY_DIR})
-list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/civetweb/dummy")
-
+include(Prometheus)
 include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
 
-include_directories(${CMAKE_SOURCE_DIR}/libminifi/include
-                    ${CMAKE_SOURCE_DIR}/libminifi/include/core
-                    ${CMAKE_SOURCE_DIR}/thirdparty/
-                    ./include)
+file(GLOB SOURCES "*.cpp")
 
-file(GLOB SOURCES  "processors/*.cpp" "protocols/*.cpp")
+add_library(minifi-prometheus SHARED ${SOURCES})
 
-add_library(minifi-civet-extensions SHARED ${SOURCES})
-target_link_libraries(minifi-civet-extensions ${LIBMINIFI} Threads::Threads)
-target_link_libraries(minifi-civet-extensions CIVETWEB::c-library CIVETWEB::civetweb-cpp)
+target_link_libraries(minifi-prometheus ${LIBMINIFI} prometheus-cpp::core  prometheus-cpp::pull)
+target_include_directories(minifi-prometheus PUBLIC ${prometheus-cpp_INCLUDE_DIRS})
 
-register_extension(minifi-civet-extensions CIVETWEB CIVETWEB "This enables ListenHTTP" "extensions/civetweb/tests")
-register_extension_linter(minifi-civet-extensions-linter)
+register_extension(minifi-prometheus "PROMETHEUS EXTENSIONS" PROMETHEUS-EXTENSIONS "This enables Prometheus support" "extensions/prometheus/tests")
+register_extension_linter(minifi-prometheus-linter)
diff --git a/libminifi/src/core/state/nodes/AgentInformation.cpp b/extensions/prometheus/MetricsExposer.h
similarity index 61%
copy from libminifi/src/core/state/nodes/AgentInformation.cpp
copy to extensions/prometheus/MetricsExposer.h
index 01f404729..ff12d546e 100644
--- a/libminifi/src/core/state/nodes/AgentInformation.cpp
+++ b/extensions/prometheus/MetricsExposer.h
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,24 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include "core/state/nodes/AgentInformation.h"
-#include "core/Resource.h"
+#pragma once
+
+#include <memory>
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace state {
-namespace response {
+#include "PublishedMetricGaugeCollection.h"
 
-utils::ProcessCpuUsageTracker AgentStatus::cpu_load_tracker_;
-std::mutex AgentStatus::cpu_load_tracker_mutex_;
+namespace org::apache::nifi::minifi::extensions::prometheus {
 
-REGISTER_RESOURCE(AgentInformation, DescriptionOnly);
+class MetricsExposer {
+ public:
+  virtual void registerMetric(const std::shared_ptr<PublishedMetricGaugeCollection>& metric) = 0;
+  virtual void removeMetric(const std::shared_ptr<PublishedMetricGaugeCollection>& metric) = 0;
+  virtual ~MetricsExposer() = default;
+};
 
-} /* namespace response */
-} /* namespace state */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi::extensions::prometheus
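
The interface above is what lets the publisher be exercised without opening a real HTTP endpoint: any object that can register and remove gauge collections will do. As a rough, hypothetical sketch (RecordingMetricsExposer is not part of this commit; it mirrors the DummyMetricsExposer used in the unit test further down), an in-memory implementation only needs to track the registered collections:

    #include <algorithm>
    #include <memory>
    #include <vector>

    #include "MetricsExposer.h"

    namespace example {

    // Hypothetical in-memory exposer: records registered collections instead of serving them.
    class RecordingMetricsExposer : public org::apache::nifi::minifi::extensions::prometheus::MetricsExposer {
      using Collection = org::apache::nifi::minifi::extensions::prometheus::PublishedMetricGaugeCollection;

     public:
      void registerMetric(const std::shared_ptr<Collection>& metric) override {
        metrics_.push_back(metric);
      }

      void removeMetric(const std::shared_ptr<Collection>& metric) override {
        metrics_.erase(std::remove(metrics_.begin(), metrics_.end(), metric), metrics_.end());
      }

     private:
      std::vector<std::shared_ptr<Collection>> metrics_;
    };

    }  // namespace example
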
diff --git a/libminifi/src/core/state/nodes/AgentInformation.cpp b/extensions/prometheus/PrometheusExposerWrapper.cpp
similarity index 54%
copy from libminifi/src/core/state/nodes/AgentInformation.cpp
copy to extensions/prometheus/PrometheusExposerWrapper.cpp
index 01f404729..48ee9b71f 100644
--- a/libminifi/src/core/state/nodes/AgentInformation.cpp
+++ b/extensions/prometheus/PrometheusExposerWrapper.cpp
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,24 +14,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include "core/state/nodes/AgentInformation.h"
-#include "core/Resource.h"
+#include "PrometheusExposerWrapper.h"
+
+#include <cinttypes>
+
+namespace org::apache::nifi::minifi::extensions::prometheus {
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace state {
-namespace response {
+PrometheusExposerWrapper::PrometheusExposerWrapper(uint32_t port)
+    : exposer_(std::to_string(port)) {
+  logger_->log_info("Started Prometheus metrics publisher on port %" PRIu32, port);
+}
 
-utils::ProcessCpuUsageTracker AgentStatus::cpu_load_tracker_;
-std::mutex AgentStatus::cpu_load_tracker_mutex_;
+void PrometheusExposerWrapper::registerMetric(const std::shared_ptr<PublishedMetricGaugeCollection>& metric) {
+  exposer_.RegisterCollectable(metric);
+}
 
-REGISTER_RESOURCE(AgentInformation, DescriptionOnly);
+void PrometheusExposerWrapper::removeMetric(const std::shared_ptr<PublishedMetricGaugeCollection>& metric) {
+  exposer_.RemoveCollectable(metric);
+}
 
-} /* namespace response */
-} /* namespace state */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi::extensions::prometheus
diff --git a/libminifi/src/core/state/nodes/AgentInformation.cpp b/extensions/prometheus/PrometheusExposerWrapper.h
similarity index 51%
copy from libminifi/src/core/state/nodes/AgentInformation.cpp
copy to extensions/prometheus/PrometheusExposerWrapper.h
index 01f404729..2cbe992fa 100644
--- a/libminifi/src/core/state/nodes/AgentInformation.cpp
+++ b/extensions/prometheus/PrometheusExposerWrapper.h
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,24 +14,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include "core/state/nodes/AgentInformation.h"
-#include "core/Resource.h"
+#pragma once
+
+#include <memory>
+
+#include "MetricsExposer.h"
+#include "prometheus/exposer.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace state {
-namespace response {
+namespace org::apache::nifi::minifi::extensions::prometheus {
 
-utils::ProcessCpuUsageTracker AgentStatus::cpu_load_tracker_;
-std::mutex AgentStatus::cpu_load_tracker_mutex_;
+class PrometheusExposerWrapper : public MetricsExposer {
+ public:
+  explicit PrometheusExposerWrapper(uint32_t port);
+  void registerMetric(const std::shared_ptr<PublishedMetricGaugeCollection>& metric) override;
+  void removeMetric(const std::shared_ptr<PublishedMetricGaugeCollection>& metric) override;
 
-REGISTER_RESOURCE(AgentInformation, DescriptionOnly);
+ private:
+  ::prometheus::Exposer exposer_;
+  std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<PrometheusExposerWrapper>::getLogger()};
+};
 
-} /* namespace response */
-} /* namespace state */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi::extensions::prometheus
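
With the wrapper above and the PublishedMetricGaugeCollection it registers, putting a metric provider on the wire is a two-step affair; the PrometheusMetricsPublisher introduced next automates exactly this from minifi.properties. A minimal manual sketch, assuming only the constructor signatures shown in this patch (the helper name exposeProvider is hypothetical):

    #include <memory>
    #include <utility>

    #include "PrometheusExposerWrapper.h"
    #include "PublishedMetricGaugeCollection.h"

    namespace prom = org::apache::nifi::minifi::extensions::prometheus;

    // Hypothetical helper: register one metric provider with an already constructed exposer,
    // e.g. prom::PrometheusExposerWrapper exposer{9936}; the returned handle can later be
    // passed to removeMetric() to stop publishing it.
    std::shared_ptr<prom::PublishedMetricGaugeCollection> exposeProvider(
        prom::PrometheusExposerWrapper& exposer,
        std::shared_ptr<org::apache::nifi::minifi::state::PublishedMetricProvider> provider) {
      auto collection = std::make_shared<prom::PublishedMetricGaugeCollection>(std::move(provider));
      exposer.registerMetric(collection);  // the provider's metrics become scrapeable at http://<host>:<port>/metrics
      return collection;
    }
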
diff --git a/extensions/prometheus/PrometheusMetricsPublisher.cpp b/extensions/prometheus/PrometheusMetricsPublisher.cpp
new file mode 100644
index 000000000..007723367
--- /dev/null
+++ b/extensions/prometheus/PrometheusMetricsPublisher.cpp
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PrometheusMetricsPublisher.h"
+
+#include <utility>
+
+#include "core/Resource.h"
+#include "utils/StringUtils.h"
+#include "PrometheusExposerWrapper.h"
+
+namespace org::apache::nifi::minifi::extensions::prometheus {
+
+PrometheusMetricsPublisher::PrometheusMetricsPublisher(const std::string &name, const utils::Identifier &uuid, std::unique_ptr<MetricsExposer> exposer)
+  : CoreComponent(name, uuid),
+    exposer_(std::move(exposer)) {}
+
+void PrometheusMetricsPublisher::initialize(const std::shared_ptr<Configure>& configuration, state::response::ResponseNodeLoader& response_node_loader, core::ProcessGroup* root) {
+  gsl_Expects(configuration);
+  configuration_ = configuration;
+  response_node_loader_ = &response_node_loader;
+  if (!exposer_) {
+    exposer_ = std::make_unique<PrometheusExposerWrapper>(readPort());
+  }
+  loadMetricNodes(root);
+}
+
+uint32_t PrometheusMetricsPublisher::readPort() {
+  if (auto port = configuration_->get(Configuration::nifi_metrics_publisher_prometheus_metrics_publisher_port)) {
+    return std::stoul(*port);
+  }
+
+  throw Exception(GENERAL_EXCEPTION, "Port not configured for Prometheus metrics publisher!");
+}
+
+void PrometheusMetricsPublisher::clearMetricNodes() {
+  std::lock_guard<std::mutex> lock(registered_metrics_mutex_);
+  logger_->log_debug("Clearing all metric nodes.");
+  for (const auto& collection : gauge_collections_) {
+    exposer_->removeMetric(collection);
+  }
+  gauge_collections_.clear();
+}
+
+void PrometheusMetricsPublisher::loadMetricNodes(core::ProcessGroup* root) {
+  std::lock_guard<std::mutex> lock(registered_metrics_mutex_);
+  auto nodes = getMetricNodes(root);
+
+  for (const auto& metric_node : nodes) {
+    logger_->log_debug("Registering metric node '%s'", metric_node->getName());
+    gauge_collections_.push_back(std::make_shared<PublishedMetricGaugeCollection>(metric_node));
+    exposer_->registerMetric(gauge_collections_.back());
+  }
+}
+
+std::vector<std::shared_ptr<state::response::ResponseNode>> PrometheusMetricsPublisher::getMetricNodes(core::ProcessGroup* root) {
+  std::vector<std::shared_ptr<state::response::ResponseNode>> nodes;
+  if (auto metric_classes_str = configuration_->get(minifi::Configuration::nifi_metrics_publisher_metrics)) {
+    auto metric_classes = utils::StringUtils::split(*metric_classes_str, ",");
+    for (const std::string& clazz : metric_classes) {
+      auto response_node = response_node_loader_->loadResponseNode(clazz, root);
+      if (!response_node) {
+        logger_->log_warn("Metric class '%s' could not be loaded.", clazz);
+        continue;
+      }
+      nodes.push_back(response_node);
+    }
+  }
+  return nodes;
+}
+
+REGISTER_RESOURCE(PrometheusMetricsPublisher, DescriptionOnly);
+
+}  // namespace org::apache::nifi::minifi::extensions::prometheus
diff --git a/extensions/prometheus/PrometheusMetricsPublisher.h b/extensions/prometheus/PrometheusMetricsPublisher.h
new file mode 100644
index 000000000..b3ac09644
--- /dev/null
+++ b/extensions/prometheus/PrometheusMetricsPublisher.h
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+#include <vector>
+#include <string>
+#include <mutex>
+
+#include "core/state/MetricsPublisher.h"
+#include "PublishedMetricGaugeCollection.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/Id.h"
+#include "MetricsExposer.h"
+
+namespace org::apache::nifi::minifi::extensions::prometheus {
+
+class PrometheusMetricsPublisher : public core::CoreComponent, public state::MetricsPublisher {
+ public:
+  explicit PrometheusMetricsPublisher(const std::string &name, const utils::Identifier &uuid = {}, std::unique_ptr<MetricsExposer> exposer = nullptr);
+
+  EXTENSIONAPI static constexpr const char* Description = "HTTP server that exposes MiNiFi metrics for Prometheus to scrape";
+
+  void initialize(const std::shared_ptr<Configure>& configuration, state::response::ResponseNodeLoader& response_node_loader, core::ProcessGroup* root) override;
+  void clearMetricNodes() override;
+  void loadMetricNodes(core::ProcessGroup* root) override;
+
+ private:
+  uint32_t readPort();
+  std::vector<std::shared_ptr<state::response::ResponseNode>> getMetricNodes(core::ProcessGroup* root);
+
+  std::mutex registered_metrics_mutex_;
+  std::vector<std::shared_ptr<PublishedMetricGaugeCollection>> gauge_collections_;
+  std::unique_ptr<MetricsExposer> exposer_;
+  std::shared_ptr<Configure> configuration_;
+  state::response::ResponseNodeLoader* response_node_loader_ = nullptr;
+  std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<PrometheusMetricsPublisher>::getLogger()};
+};
+
+}  // namespace org::apache::nifi::minifi::extensions::prometheus
diff --git a/extensions/prometheus/PublishedMetricGaugeCollection.cpp b/extensions/prometheus/PublishedMetricGaugeCollection.cpp
new file mode 100644
index 000000000..e8974d386
--- /dev/null
+++ b/extensions/prometheus/PublishedMetricGaugeCollection.cpp
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PublishedMetricGaugeCollection.h"
+
+#include <utility>
+#include <algorithm>
+
+#include "prometheus/client_metric.h"
+#include "state/PublishedMetricProvider.h"
+#include "range/v3/range/conversion.hpp"
+#include "range/v3/view/transform.hpp"
+
+namespace org::apache::nifi::minifi::extensions::prometheus {
+
+PublishedMetricGaugeCollection::PublishedMetricGaugeCollection(std::shared_ptr<state::PublishedMetricProvider> metric) : metric_{std::move(metric)} {
+}
+
+std::vector<::prometheus::MetricFamily> PublishedMetricGaugeCollection::Collect() const {
+  std::vector<::prometheus::MetricFamily> collection;
+  for (const auto& metric : metric_->calculateMetrics()) {
+    ::prometheus::ClientMetric client_metric;
+    client_metric.label = ranges::views::transform(metric.labels, [](auto&& kvp) { return ::prometheus::ClientMetric::Label{kvp.first, kvp.second}; })
+      | ranges::to<std::vector<::prometheus::ClientMetric::Label>>;
+    client_metric.gauge = ::prometheus::ClientMetric::Gauge{metric.value};
+    collection.push_back({
+      .name = "minifi_" + metric.name,
+      .help = "",
+      .type = ::prometheus::MetricType::Gauge,
+      .metric = { std::move(client_metric) }
+    });
+  }
+  return collection;
+}
+
+}  // namespace org::apache::nifi::minifi::extensions::prometheus
diff --git a/libminifi/src/core/state/nodes/AgentInformation.cpp b/extensions/prometheus/PublishedMetricGaugeCollection.h
similarity index 56%
copy from libminifi/src/core/state/nodes/AgentInformation.cpp
copy to extensions/prometheus/PublishedMetricGaugeCollection.h
index 01f404729..8d8982a8f 100644
--- a/libminifi/src/core/state/nodes/AgentInformation.cpp
+++ b/extensions/prometheus/PublishedMetricGaugeCollection.h
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,24 +14,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include "core/state/nodes/AgentInformation.h"
-#include "core/Resource.h"
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include "state/PublishedMetricProvider.h"
+#include "prometheus/collectable.h"
+#include "prometheus/metric_family.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace state {
-namespace response {
+namespace org::apache::nifi::minifi::extensions::prometheus {
 
-utils::ProcessCpuUsageTracker AgentStatus::cpu_load_tracker_;
-std::mutex AgentStatus::cpu_load_tracker_mutex_;
+class PublishedMetricGaugeCollection : public ::prometheus::Collectable {
+ public:
+  explicit PublishedMetricGaugeCollection(std::shared_ptr<state::PublishedMetricProvider> metric);
+  std::vector<::prometheus::MetricFamily> Collect() const override;
 
-REGISTER_RESOURCE(AgentInformation, DescriptionOnly);
+ private:
+  std::shared_ptr<state::PublishedMetricProvider> metric_;
+};
 
-} /* namespace response */
-} /* namespace state */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi::extensions::prometheus
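
To make the mapping concrete: Collect() turns every PublishedMetric returned by calculateMetrics() into its own single-sample gauge family, prefixes the name with "minifi_" and copies the labels verbatim. A hypothetical provider (QueueDepthProvider is not part of this commit) and what a collection wrapping it would report:

    #include <memory>
    #include <vector>

    #include "PublishedMetricGaugeCollection.h"
    #include "state/PublishedMetricProvider.h"

    namespace example {

    namespace minifi = org::apache::nifi::minifi;

    // Hypothetical provider reporting a single queue depth.
    class QueueDepthProvider : public minifi::state::PublishedMetricProvider {
     public:
      std::vector<minifi::state::PublishedMetric> calculateMetrics() override {
        return {{"queue_size", 42.0, {{"connection_name", "GetTCP/success/LogAttribute"}}}};
      }
    };

    // Collect() on a collection wrapping this provider yields one MetricFamily named
    // "minifi_queue_size" of type Gauge, with a single sample whose value is 42 and whose
    // label set is {connection_name="GetTCP/success/LogAttribute"}.
    inline auto collectExample() {
      minifi::extensions::prometheus::PublishedMetricGaugeCollection collection{
          std::make_shared<QueueDepthProvider>()};
      return collection.Collect();
    }

    }  // namespace example
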
diff --git a/extensions/prometheus/tests/CMakeLists.txt b/extensions/prometheus/tests/CMakeLists.txt
new file mode 100644
index 000000000..c84dc55a8
--- /dev/null
+++ b/extensions/prometheus/tests/CMakeLists.txt
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+file(GLOB PROMETHEUS_TESTS  "*.cpp")
+
+SET(PROMETHEUS_TEST_COUNT 0)
+FOREACH(testfile ${PROMETHEUS_TESTS})
+    get_filename_component(testfilename "${testfile}" NAME_WE)
+    add_executable("${testfilename}" "${testfile}")
+    target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/prometheus")
+    target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/libminifi/test/")
+    createTests("${testfilename}")
+    target_link_libraries(${testfilename} ${CATCH_MAIN_LIB})
+    target_link_libraries(${testfilename} minifi-prometheus)
+    MATH(EXPR PROMETHEUS_TEST_COUNT "${PROMETHEUS_TEST_COUNT}+1")
+    add_test(NAME "${testfilename}" COMMAND "${testfilename}")
+ENDFOREACH()
+message("-- Finished building ${PROMETHEUS_TEST_COUNT} Prometheus related test file(s)...")
diff --git a/extensions/prometheus/tests/PrometheusMetricsPublisherTest.cpp b/extensions/prometheus/tests/PrometheusMetricsPublisherTest.cpp
new file mode 100644
index 000000000..111089909
--- /dev/null
+++ b/extensions/prometheus/tests/PrometheusMetricsPublisherTest.cpp
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <memory>
+#include <vector>
+#include <algorithm>
+
+#include "Catch.h"
+#include "PrometheusMetricsPublisher.h"
+#include "properties/Configure.h"
+#include "MetricsExposer.h"
+#include "core/state/nodes/ResponseNodeLoader.h"
+#include "core/RepositoryFactory.h"
+#include "range/v3/algorithm/find.hpp"
+
+namespace org::apache::nifi::minifi::extensions::prometheus::test {
+
+class DummyMetricsExposer : public MetricsExposer {
+ public:
+  void registerMetric(const std::shared_ptr<PublishedMetricGaugeCollection>& metric) override {
+    metrics_.push_back(metric);
+  }
+
+  void removeMetric(const std::shared_ptr<PublishedMetricGaugeCollection>&) override {
+  }
+
+  [[nodiscard]] std::vector<std::shared_ptr<PublishedMetricGaugeCollection>> getMetrics() const {
+    return metrics_;
+  }
+
+ private:
+  std::vector<std::shared_ptr<PublishedMetricGaugeCollection>> metrics_;
+};
+
+class PrometheusPublisherTestFixture {
+ public:
+  explicit PrometheusPublisherTestFixture(bool use_dummy_exposer)
+    : configuration_(std::make_shared<Configure>()),
+      provenance_repo_(core::createRepository("provenancerepository", true)),
+      flow_file_repo_(core::createRepository("flowfilerepository", true)),
+      response_node_loader_(configuration_, provenance_repo_, flow_file_repo_, nullptr) {
+    std::unique_ptr<DummyMetricsExposer> dummy_exposer;
+    if (use_dummy_exposer) {
+      dummy_exposer = std::make_unique<DummyMetricsExposer>();
+      exposer_ = dummy_exposer.get();
+    }
+    publisher_ = std::make_unique<PrometheusMetricsPublisher>("publisher", utils::Identifier(), std::move(dummy_exposer));
+  }
+
+ protected:
+  std::shared_ptr<Configure> configuration_;
+  std::shared_ptr<core::Repository> provenance_repo_;
+  std::shared_ptr<core::Repository> flow_file_repo_;
+  state::response::ResponseNodeLoader response_node_loader_;
+  std::unique_ptr<PrometheusMetricsPublisher> publisher_;
+  DummyMetricsExposer* exposer_ = nullptr;
+};
+
+class PrometheusPublisherTestFixtureWithRealExposer : public PrometheusPublisherTestFixture {
+ public:
+  PrometheusPublisherTestFixtureWithRealExposer() : PrometheusPublisherTestFixture(false) {}
+};
+
+class PrometheusPublisherTestFixtureWithDummyExposer : public PrometheusPublisherTestFixture {
+ public:
+  PrometheusPublisherTestFixtureWithDummyExposer() : PrometheusPublisherTestFixture(true) {}
+};
+
+TEST_CASE_METHOD(PrometheusPublisherTestFixtureWithRealExposer, "Test prometheus empty port", "[prometheusPublisherTest]") {
+  REQUIRE_THROWS_WITH(publisher_->initialize(configuration_, response_node_loader_, nullptr), "General Operation: Port not configured for Prometheus metrics publisher!");
+}
+
+TEST_CASE_METHOD(PrometheusPublisherTestFixtureWithRealExposer, "Test prometheus invalid port", "[prometheusPublisherTest]") {
+  configuration_->set(Configure::nifi_metrics_publisher_prometheus_metrics_publisher_port, "invalid");
+  REQUIRE_THROWS_AS(publisher_->initialize(configuration_, response_node_loader_, nullptr), std::exception);
+}
+
+TEST_CASE_METHOD(PrometheusPublisherTestFixtureWithDummyExposer, "Test adding metrics to exposer", "[prometheusPublisherTest]") {
+  configuration_->set(Configure::nifi_metrics_publisher_metrics, "QueueMetrics,RepositoryMetrics,DeviceInfoNode,FlowInformation,AgentInformation,InvalidMetrics,GetFileMetrics,GetTCPMetrics");
+  publisher_->initialize(configuration_, response_node_loader_, nullptr);
+  auto stored_metrics = exposer_->getMetrics();
+  std::vector<std::string> valid_metrics_without_flow = {"QueueMetrics", "RepositoryMetrics", "DeviceInfoNode", "FlowInformation", "AgentInformation"};
+  REQUIRE(stored_metrics.size() == valid_metrics_without_flow.size());
+  for (const auto& stored_metric : stored_metrics) {
+    auto collection = stored_metric->Collect();
+    for (const auto& metric_family : collection) {
+      for (const auto& prometheus_metric : metric_family.metric) {
+        for (const auto& label : prometheus_metric.label) {
+          if (label.name == "metric_class") {
+            REQUIRE(ranges::find(valid_metrics_without_flow, label.value) != ranges::end(valid_metrics_without_flow));
+          }
+        }
+      }
+    }
+  }
+}
+
+}  // namespace org::apache::nifi::minifi::extensions::prometheus::test
diff --git a/extensions/standard-processors/processors/GetFile.h b/extensions/standard-processors/processors/GetFile.h
index d142946bc..0ef739d54 100644
--- a/extensions/standard-processors/processors/GetFile.h
+++ b/extensions/standard-processors/processors/GetFile.h
@@ -15,14 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef EXTENSIONS_STANDARD_PROCESSORS_PROCESSORS_GETFILE_H_
-#define EXTENSIONS_STANDARD_PROCESSORS_PROCESSORS_GETFILE_H_
+#pragma once
 
 #include <memory>
 #include <queue>
 #include <string>
 #include <vector>
 #include <atomic>
+#include <utility>
 
 #include "core/state/nodes/MetricsBase.h"
 #include "FlowFileRecord.h"
@@ -32,11 +32,7 @@
 #include "core/logging/LoggerConfiguration.h"
 #include "utils/Export.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
+namespace org::apache::nifi::minifi::processors {
 
 struct GetFileRequest {
   bool recursive = true;
@@ -54,20 +50,11 @@ struct GetFileRequest {
 
 class GetFileMetrics : public state::response::ResponseNode {
  public:
-  GetFileMetrics()
-      : state::response::ResponseNode("GetFileMetrics") {
-    iterations_ = 0;
-    accepted_files_ = 0;
-    input_bytes_ = 0;
+  explicit GetFileMetrics(const CoreComponent& source_component)
+    : state::response::ResponseNode("GetFileMetrics"),
+      source_component_(source_component) {
   }
 
-  GetFileMetrics(const std::string& name, const utils::Identifier& uuid)
-      : state::response::ResponseNode(name, uuid) {
-    iterations_ = 0;
-    accepted_files_ = 0;
-    input_bytes_ = 0;
-  }
-  virtual ~GetFileMetrics() = default;
   std::string getName() const override {
     return core::Connectable::getName();
   }
@@ -96,19 +83,31 @@ class GetFileMetrics : public state::response::ResponseNode {
     return resp;
   }
 
+  std::vector<state::PublishedMetric> calculateMetrics() override {
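+    // Publish the invocation, accepted-file and input-byte counters as gauges labelled with the owning processor's name and UUID.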
+    return {
+      {"onTrigger_invocations", static_cast<double>(iterations_.load()),
+        {{"metric_class", "GetFileMetrics"}, {"processor_name", source_component_.getName()}, {"processor_uuid", source_component_.getUUIDStr()}}},
+      {"accepted_files", static_cast<double>(accepted_files_.load()),
+        {{"metric_class", "GetFileMetrics"}, {"processor_name", source_component_.getName()}, {"processor_uuid", source_component_.getUUIDStr()}}},
+      {"input_bytes", static_cast<double>(input_bytes_.load()),
+        {{"metric_class", "GetFileMetrics"}, {"processor_name", source_component_.getName()}, {"processor_uuid", source_component_.getUUIDStr()}}}
+    };
+  }
+
  protected:
   friend class GetFile;
 
-  std::atomic<size_t> iterations_;
-  std::atomic<size_t> accepted_files_;
-  std::atomic<size_t> input_bytes_;
+  const CoreComponent& source_component_;
+  std::atomic<size_t> iterations_{0};
+  std::atomic<size_t> accepted_files_{0};
+  std::atomic<size_t> input_bytes_{0};
 };
 
 class GetFile : public core::Processor, public state::response::MetricsNodeSource {
  public:
   explicit GetFile(const std::string& name, const utils::Identifier& uuid = {})
       : Processor(name, uuid),
-        metrics_(std::make_shared<GetFileMetrics>()) {
+        metrics_(std::make_shared<GetFileMetrics>(*this)) {
   }
   ~GetFile() override = default;
 
@@ -178,10 +177,4 @@ class GetFile : public core::Processor, public state::response::MetricsNodeSourc
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<GetFile>::getLogger();
 };
 
-}  // namespace processors
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
-
-#endif  // EXTENSIONS_STANDARD_PROCESSORS_PROCESSORS_GETFILE_H_
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/GetTCP.h b/extensions/standard-processors/processors/GetTCP.h
index f42fa59de..a2fa3b819 100644
--- a/extensions/standard-processors/processors/GetTCP.h
+++ b/extensions/standard-processors/processors/GetTCP.h
@@ -99,14 +99,11 @@ class DataHandler {
 
 class GetTCPMetrics : public state::response::ResponseNode {
  public:
-  GetTCPMetrics()
-      : state::response::ResponseNode("GetTCPMetrics") {
+  explicit GetTCPMetrics(const CoreComponent& source_component)
+    : state::response::ResponseNode("GetTCPMetrics"),
+      source_component_(source_component) {
   }
 
-  GetTCPMetrics(const std::string& name, const utils::Identifier& uuid)
-      : state::response::ResponseNode(name, uuid) {
-  }
-  ~GetTCPMetrics() override = default;
   std::string getName() const override {
     return core::Connectable::getName();
   }
@@ -120,41 +117,34 @@ class GetTCPMetrics : public state::response::ResponseNode {
 
     resp.push_back(iter);
 
-    state::response::SerializedResponseNode accepted_files;
-    accepted_files.name = "AcceptedFiles";
-    accepted_files.value = (uint32_t)accepted_files_.load();
-
-    resp.push_back(accepted_files);
-
-    state::response::SerializedResponseNode input_bytes;
-    input_bytes.name = "InputBytes";
-    input_bytes.value = (uint32_t)input_bytes_.load();
-
-    resp.push_back(input_bytes);
-
     return resp;
   }
 
+  std::vector<state::PublishedMetric> calculateMetrics() override {
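+    // Publish the onTrigger invocation counter labelled with the owning processor's name and UUID.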
+    return {
+      {"onTrigger_invocations", static_cast<double>(iterations_.load()),
+        {{"metric_class", getName()}, {"processor_name", source_component_.getName()}, {"processor_uuid", source_component_.getUUIDStr()}}}
+    };
+  }
+
  protected:
   friend class GetTCP;
 
+  const CoreComponent& source_component_;
   std::atomic<size_t> iterations_{0};
-  std::atomic<size_t> accepted_files_{0};
-  std::atomic<size_t> input_bytes_{0};
 };
 
 class GetTCP : public core::Processor, public state::response::MetricsNodeSource {
  public:
   explicit GetTCP(const std::string& name, const utils::Identifier& uuid = {})
-      : Processor(name, uuid),
-        running_(false),
-        stay_connected_(true),
-        concurrent_handlers_(2),
-        endOfMessageByte(static_cast<std::byte>(13)),
-        receive_buffer_size_(16 * 1024 * 1024),
-        connection_attempt_limit_(3),
-        ssl_service_(nullptr) {
-    metrics_ = std::make_shared<GetTCPMetrics>();
+    : Processor(name, uuid),
+      running_(false),
+      stay_connected_(true),
+      concurrent_handlers_(2),
+      endOfMessageByte(static_cast<std::byte>(13)),
+      receive_buffer_size_(16 * 1024 * 1024),
+      connection_attempt_limit_(3),
+      metrics_(std::make_shared<GetTCPMetrics>(*this)) {
   }
 
   ~GetTCP() override {
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index 92fa9aeb6..01edc0154 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -17,8 +17,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_FLOWCONTROLLER_H_
-#define LIBMINIFI_INCLUDE_FLOWCONTROLLER_H_
+#pragma once
 
 #include <algorithm>
 #include <atomic>
@@ -57,6 +56,8 @@
 #include "TimerDrivenSchedulingAgent.h"
 #include "utils/Id.h"
 #include "utils/file/FileSystem.h"
+#include "core/state/nodes/ResponseNodeLoader.h"
+#include "core/state/MetricsPublisher.h"
 
 namespace org::apache::nifi::minifi {
 
@@ -181,7 +182,7 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi
    * Retrieves the agent manifest to be sent as a response to C2 DESCRIBE manifest
    * @return the agent manifest response node
    */
-  std::shared_ptr<state::response::ResponseNode> getAgentManifest() override;
+  state::response::NodeReporter::ReportedNode getAgentManifest() override;
 
   uint64_t getUptime() override;
 
@@ -197,6 +198,8 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi
    */
   std::unique_ptr<core::ProcessGroup> loadInitialFlow();
 
+  void loadMetricsPublisher();
+
  protected:
   // function to load the flow file repo.
   void loadFlowRepo();
@@ -252,8 +255,7 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi
   // Thread pool for schedulers
   utils::ThreadPool<utils::TaskRescheduleInfo> thread_pool_;
   std::map<utils::Identifier, std::unique_ptr<state::ProcessorController>> processor_to_controller_;
+  std::unique_ptr<state::MetricsPublisher> metrics_publisher_;
 };
 
 }  // namespace org::apache::nifi::minifi
-
-#endif  // LIBMINIFI_INCLUDE_FLOWCONTROLLER_H_
diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h
index 5fb564786..f44ee4d23 100644
--- a/libminifi/include/c2/C2Agent.h
+++ b/libminifi/include/c2/C2Agent.h
@@ -86,11 +86,6 @@ class C2Agent : public state::UpdateController {
    */
   void performHeartBeat();
 
-  std::chrono::milliseconds getHeartBeatDelay() {
-    std::lock_guard<std::mutex> lock(heartbeat_mutex);
-    return heart_beat_period_;
-  }
-
   std::optional<std::string> fetchFlow(const std::string& uri) const;
 
  protected:
diff --git a/libminifi/include/c2/C2Client.h b/libminifi/include/c2/C2Client.h
index 7180b281e..26da4605b 100644
--- a/libminifi/include/c2/C2Client.h
+++ b/libminifi/include/c2/C2Client.h
@@ -24,6 +24,7 @@
 #include <optional>
 #include <string>
 #include <vector>
+#include <unordered_set>
 
 #include "c2/C2Agent.h"
 #include "core/controller/ControllerServiceProvider.h"
@@ -35,6 +36,8 @@
 #include "core/ProcessGroup.h"
 #include "core/Flow.h"
 #include "utils/file/FileSystem.h"
+#include "core/state/nodes/ResponseNodeLoader.h"
+#include "utils/Id.h"
 
 namespace org::apache::nifi::minifi::c2 {
 
@@ -48,22 +51,21 @@ class C2Client : public core::Flow, public state::response::NodeReporter {
       std::shared_ptr<core::logging::Logger> logger = core::logging::LoggerFactory<C2Client>::getLogger());
 
   void initialize(core::controller::ControllerServiceProvider *controller, state::Pausable *pause_handler, state::StateMonitor* update_sink);
-
-  std::shared_ptr<state::response::ResponseNode> getMetricsNode(const std::string& metrics_class) const override;
-
-  std::vector<std::shared_ptr<state::response::ResponseNode>> getHeartbeatNodes(bool include_manifest) const override;
+  std::optional<state::response::NodeReporter::ReportedNode> getMetricsNode(const std::string& metrics_class) const override;
+  std::vector<state::response::NodeReporter::ReportedNode> getHeartbeatNodes(bool include_manifest) const override;
 
   void stopC2();
+  void initializeResponseNodes(core::ProcessGroup* root);
+  void clearResponseNodes();
 
  protected:
   bool isC2Enabled() const;
   std::optional<std::string> fetchFlow(const std::string& uri) const;
-  void updateResponseNodeConnections();
 
  private:
-  void initializeComponentMetrics();
   void loadC2ResponseConfiguration(const std::string &prefix);
   std::shared_ptr<state::response::ResponseNode> loadC2ResponseConfiguration(const std::string &prefix, std::shared_ptr<state::response::ResponseNode> prev_node);
+  void loadNodeClasses(const std::string& class_definitions, const std::shared_ptr<state::response::ResponseNode>& new_node);
 
  protected:
   std::shared_ptr<Configure> configuration_;
@@ -74,14 +76,13 @@ class C2Client : public core::Flow, public state::response::NodeReporter {
   std::mutex initialization_mutex_;
   bool initialized_ = false;
   std::shared_ptr<core::logging::Logger> logger_;
-
   mutable std::mutex metrics_mutex_;
   std::map<std::string, std::shared_ptr<state::response::ResponseNode>> root_response_nodes_;
-  std::map<std::string, std::shared_ptr<state::response::ResponseNode>> component_metrics_;
 
  protected:
   std::atomic<bool> flow_update_{false};
   std::function<void()> request_restart_;
+  state::response::ResponseNodeLoader response_node_loader_;
 };
 
 }  // namespace org::apache::nifi::minifi::c2
diff --git a/libminifi/include/core/state/ConnectionStore.h b/libminifi/include/core/state/ConnectionStore.h
new file mode 100644
index 000000000..cd7ed2082
--- /dev/null
+++ b/libminifi/include/core/state/ConnectionStore.h
@@ -0,0 +1,60 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <string>
+#include <vector>
+#include <unordered_map>
+
+#include "Connection.h"
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::state {
+
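+// Tracks the flow's connections and derives queue metrics from them; used as a base by QueueMetrics and FlowMonitor.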
+class ConnectionStore {
+ public:
+  void updateConnection(minifi::Connection* connection) {
+    if (nullptr != connection) {
+      connections_[connection->getUUID()] = connection;
+    }
+  }
+
+  std::vector<PublishedMetric> calculateConnectionMetrics(const std::string& metric_class) {
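+    // One gauge per connection for current and maximum queue data size and queue length, labelled with the connection's UUID and name.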
+    std::vector<PublishedMetric> metrics;
+
+    for (const auto& [_, connection] : connections_) {
+      metrics.push_back({"queue_data_size", static_cast<double>(connection->getQueueDataSize()),
+        {{"connection_uuid", connection->getUUIDStr()}, {"connection_name", connection->getName()}, {"metric_class", metric_class}}});
+      metrics.push_back({"queue_data_size_max", static_cast<double>(connection->getMaxQueueDataSize()),
+        {{"connection_uuid", connection->getUUIDStr()}, {"connection_name", connection->getName()}, {"metric_class", metric_class}}});
+      metrics.push_back({"queue_size", static_cast<double>(connection->getQueueSize()),
+        {{"connection_uuid", connection->getUUIDStr()}, {"connection_name", connection->getName()}, {"metric_class", metric_class}}});
+      metrics.push_back({"queue_size_max", static_cast<double>(connection->getMaxQueueSize()),
+        {{"connection_uuid", connection->getUUIDStr()}, {"connection_name", connection->getName()}, {"metric_class", metric_class}}});
+    }
+
+    return metrics;
+  }
+
+  virtual ~ConnectionStore() = default;
+
+ protected:
+  std::unordered_map<utils::Identifier, minifi::Connection*> connections_;
+};
+
+}  // namespace org::apache::nifi::minifi::state
diff --git a/libminifi/src/core/state/nodes/AgentInformation.cpp b/libminifi/include/core/state/MetricsPublisher.h
similarity index 60%
copy from libminifi/src/core/state/nodes/AgentInformation.cpp
copy to libminifi/include/core/state/MetricsPublisher.h
index 01f404729..f7cbc0e7d 100644
--- a/libminifi/src/core/state/nodes/AgentInformation.cpp
+++ b/libminifi/include/core/state/MetricsPublisher.h
@@ -15,24 +15,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include "core/state/nodes/AgentInformation.h"
-#include "core/Resource.h"
+#pragma once
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace state {
-namespace response {
+#include <memory>
 
-utils::ProcessCpuUsageTracker AgentStatus::cpu_load_tracker_;
-std::mutex AgentStatus::cpu_load_tracker_mutex_;
+#include "nodes/ResponseNodeLoader.h"
+#include "properties/Configure.h"
 
-REGISTER_RESOURCE(AgentInformation, DescriptionOnly);
+namespace org::apache::nifi::minifi::state {
 
-} /* namespace response */
-} /* namespace state */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
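+// Interface for metric publishers (e.g. the Prometheus metrics publisher extension) loaded and driven by the FlowController.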
+class MetricsPublisher {
+ public:
+  virtual void initialize(const std::shared_ptr<Configure>& configuration, response::ResponseNodeLoader& response_node_loader, core::ProcessGroup* root) = 0;
+  virtual void clearMetricNodes() = 0;
+  virtual void loadMetricNodes(core::ProcessGroup* root) = 0;
+  virtual ~MetricsPublisher() = default;
+};
+
+}  // namespace org::apache::nifi::minifi::state
diff --git a/libminifi/src/core/state/nodes/AgentInformation.cpp b/libminifi/include/core/state/PublishedMetricProvider.h
similarity index 62%
copy from libminifi/src/core/state/nodes/AgentInformation.cpp
copy to libminifi/include/core/state/PublishedMetricProvider.h
index 01f404729..c70b2c055 100644
--- a/libminifi/src/core/state/nodes/AgentInformation.cpp
+++ b/libminifi/include/core/state/PublishedMetricProvider.h
@@ -15,24 +15,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include "core/state/nodes/AgentInformation.h"
-#include "core/Resource.h"
+#pragma once
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace state {
-namespace response {
+#include <unordered_map>
+#include <string>
+#include <vector>
 
-utils::ProcessCpuUsageTracker AgentStatus::cpu_load_tracker_;
-std::mutex AgentStatus::cpu_load_tracker_mutex_;
+namespace org::apache::nifi::minifi::state {
 
-REGISTER_RESOURCE(AgentInformation, DescriptionOnly);
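+// A single named metric value together with its labels (e.g. metric_class, processor_name).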
+struct PublishedMetric {
+  std::string name;
+  double value;
+  std::unordered_map<std::string, std::string> labels;
+};
 
-} /* namespace response */
-} /* namespace state */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+class PublishedMetricProvider {
+ public:
+  virtual std::vector<PublishedMetric> calculateMetrics() {
+    return {};
+  }
+  virtual ~PublishedMetricProvider() = default;
+};
+
+}  // namespace org::apache::nifi::minifi::state
diff --git a/libminifi/include/core/state/nodes/AgentInformation.h b/libminifi/include/core/state/nodes/AgentInformation.h
index c0f0bbfef..7c7408061 100644
--- a/libminifi/include/core/state/nodes/AgentInformation.h
+++ b/libminifi/include/core/state/nodes/AgentInformation.h
@@ -759,7 +759,7 @@ class AgentInformation : public AgentNode {
     setArray(false);
   }
 
-  MINIFIAPI static constexpr const char* Description = "Node part of an AST that defines all agent information, to include the manifest, and bundle information as part of a healthy hearbeat.";
+  MINIFIAPI static constexpr const char* Description = "Metric node that defines all agent information, to include the manifest, and bundle information as part of a healthy heartbeat.";
 
   std::string getName() const override {
     return "agentInfo";
diff --git a/libminifi/include/core/state/nodes/BuildInformation.h b/libminifi/include/core/state/nodes/BuildInformation.h
index aeff0a0d7..8331fc557 100644
--- a/libminifi/include/core/state/nodes/BuildInformation.h
+++ b/libminifi/include/core/state/nodes/BuildInformation.h
@@ -67,7 +67,7 @@ class BuildInformation : public DeviceInformation {
       : DeviceInformation(name) {
   }
 
-  MINIFIAPI static constexpr const char* Description = "Node part of an AST that defines the pertinent build information for this agent binary";
+  MINIFIAPI static constexpr const char* Description = "Metric node that defines the pertinent build information for this agent binary";
 
   std::string getName() const override {
     return "BuildInformation";
diff --git a/libminifi/include/core/state/nodes/ConfigurationChecksums.h b/libminifi/include/core/state/nodes/ConfigurationChecksums.h
index b18ccf5ae..901447893 100644
--- a/libminifi/include/core/state/nodes/ConfigurationChecksums.h
+++ b/libminifi/include/core/state/nodes/ConfigurationChecksums.h
@@ -31,7 +31,7 @@ class ConfigurationChecksums : public ResponseNode {
   ConfigurationChecksums() = default;
   explicit ConfigurationChecksums(const std::string& name, const utils::Identifier& uuid = {}) : ResponseNode(name, uuid) {}
 
-  MINIFIAPI static constexpr const char* Description = "Node part of an AST that defines checksums of configuration files in the C2 protocol";
+  MINIFIAPI static constexpr const char* Description = "Metric node that defines checksums of configuration files in the C2 protocol";
 
   void addChecksumCalculator(utils::ChecksumCalculator& checksum_calculator);
 
diff --git a/libminifi/include/core/state/nodes/DeviceInformation.h b/libminifi/include/core/state/nodes/DeviceInformation.h
index 4835f015b..0aa7b1cb1 100644
--- a/libminifi/include/core/state/nodes/DeviceInformation.h
+++ b/libminifi/include/core/state/nodes/DeviceInformation.h
@@ -303,7 +303,7 @@ class DeviceInfoNode : public DeviceInformation {
     device_id_ = device.device_id_;
   }
 
-  MINIFIAPI static constexpr const char* Description = "Node part of an AST that defines device characteristics to the C2 protocol";
+  MINIFIAPI static constexpr const char* Description = "Metric node that defines device characteristics to the C2 protocol";
 
   std::string getName() const override {
     return "deviceInfo";
@@ -319,6 +319,22 @@ class DeviceInfoNode : public DeviceInformation {
     return serialized;
   }
 
+  std::vector<PublishedMetric> calculateMetrics() override {
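+    // Publish total physical memory, current memory usage and CPU utilization as device-level gauges.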
+    double system_cpu_usage = -1.0;
+    {
+      std::lock_guard<std::mutex> guard(cpu_load_tracker_mutex_);
+      system_cpu_usage = cpu_load_tracker_.getCpuUsageAndRestartCollection();
+    }
+    return {
+      {"physical_mem", static_cast<double>(utils::OsUtils::getSystemTotalPhysicalMemory()), {{"metric_class", "DeviceInfoNode"}}},
+      {"memory_usage", static_cast<double>(utils::OsUtils::getSystemPhysicalMemoryUsage()), {{"metric_class", "DeviceInfoNode"}}},
+      {"cpu_utilization", system_cpu_usage, {{"metric_class", "DeviceInfoNode"}}},
+    };
+  }
+
  protected:
   SerializedResponseNode serializeIdentifier() const {
     SerializedResponseNode identifier;
diff --git a/libminifi/include/core/state/nodes/FlowInformation.h b/libminifi/include/core/state/nodes/FlowInformation.h
index 06b12a538..318c3c01d 100644
--- a/libminifi/include/core/state/nodes/FlowInformation.h
+++ b/libminifi/include/core/state/nodes/FlowInformation.h
@@ -39,6 +39,7 @@
 #include "../nodes/StateMonitor.h"
 #include "Connection.h"
 #include "io/ClientSocket.h"
+#include "../ConnectionStore.h"
 
 namespace org::apache::nifi::minifi::state::response {
 
@@ -124,7 +125,7 @@ class FlowVersion : public DeviceInformation {
   std::shared_ptr<FlowIdentifier> identifier;
 };
 
-class FlowMonitor : public StateMonitorNode {
+class FlowMonitor : public StateMonitorNode, public ConnectionStore {
  public:
   FlowMonitor(const std::string &name, const utils::Identifier &uuid)
       : StateMonitorNode(name, uuid) {
@@ -134,23 +135,12 @@ class FlowMonitor : public StateMonitorNode {
       : StateMonitorNode(name) {
   }
 
-  void updateConnection(minifi::Connection* connection) {
-    if (nullptr != connection) {
-      connections_[connection->getUUIDStr()] = connection;
-    }
-  }
-
-  void clearConnections() {
-    connections_.clear();
-  }
-
   void setFlowVersion(std::shared_ptr<state::response::FlowVersion> flow_version) {
     flow_version_ = std::move(flow_version);
   }
 
  protected:
   std::shared_ptr<state::response::FlowVersion> flow_version_;
-  std::map<std::string, minifi::Connection*> connections_;
 };
 
 /**
@@ -166,7 +156,7 @@ class FlowInformation : public FlowMonitor {
       : FlowMonitor(name) {
   }
 
-  MINIFIAPI static constexpr const char* Description = "Node part of an AST that defines the flow ID and flow URL deployed to this agent";
+  MINIFIAPI static constexpr const char* Description = "Metric node that defines the flow ID and flow URL deployed to this agent";
 
   std::string getName() const override {
     return "flowInfo";
@@ -257,7 +247,17 @@ class FlowInformation : public FlowMonitor {
     return serialized;
   }
 
- protected:
+  std::vector<PublishedMetric> calculateMetrics() override {
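+    // Queue gauges from the shared ConnectionStore plus an is_running gauge for every monitored component.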
+    std::vector<PublishedMetric> metrics = calculateConnectionMetrics("FlowInformation");
+
+    if (nullptr != monitor_) {
+      monitor_->executeOnAllComponents([&metrics](StateController& component){
+        metrics.push_back({"is_running", (component.isRunning() ? 1.0 : 0.0),
+          {{"component_uuid", component.getComponentUUID().to_string()}, {"component_name", component.getComponentName()}, {"metric_class", "FlowInformation"}}});
+      });
+    }
+    return metrics;
+  }
 };
 
 }  // namespace org::apache::nifi::minifi::state::response
diff --git a/libminifi/include/core/state/nodes/MetricsBase.h b/libminifi/include/core/state/nodes/MetricsBase.h
index d16c2bcc4..6fe7dcb54 100644
--- a/libminifi/include/core/state/nodes/MetricsBase.h
+++ b/libminifi/include/core/state/nodes/MetricsBase.h
@@ -15,29 +15,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_CORE_STATE_NODES_METRICSBASE_H_
-#define LIBMINIFI_INCLUDE_CORE_STATE_NODES_METRICSBASE_H_
+#pragma once
 
 #include <utility>
 #include <vector>
 #include <memory>
 #include <string>
+#include <optional>
 
 #include "../Value.h"
+#include "../PublishedMetricProvider.h"
 #include "core/Core.h"
 #include "core/Connectable.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace state {
-namespace response {
+namespace org::apache::nifi::minifi::state::response {
 
 /**
  * Purpose: Defines a metric. Serialization is intended to be thread safe.
  */
-class ResponseNode : public core::Connectable {
+class ResponseNode : public core::Connectable, public PublishedMetricProvider {
  public:
   ResponseNode()
       : core::Connectable("metric"),
@@ -108,11 +104,11 @@ class ObjectNode : public ResponseNode {
     nodes_.push_back(node);
   }
 
-  virtual std::string getName() const {
+  std::string getName() const override {
     return Connectable::getName();
   }
 
-  virtual std::vector<SerializedResponseNode> serialize() {
+  std::vector<SerializedResponseNode> serialize() override {
     std::vector<SerializedResponseNode> serialized;
 //    SerializedResponseNode outer_node;
     //  outer_node.name = getName();
@@ -120,7 +116,9 @@ class ObjectNode : public ResponseNode {
       SerializedResponseNode inner_node;
       inner_node.name = node->getName();
       for (auto &embed : node->serialize()) {
-        inner_node.children.push_back(std::move(embed));
+        if (!embed.empty() || embed.keep_empty) {
+          inner_node.children.push_back(std::move(embed));
+        }
       }
       serialized.push_back(std::move(inner_node));
     }
@@ -128,7 +126,7 @@ class ObjectNode : public ResponseNode {
     return serialized;
   }
 
-  virtual bool isEmpty() {
+  bool isEmpty() override {
     return nodes_.empty();
   }
 
@@ -186,6 +184,12 @@ class MetricsNodeSource : public ResponseNodeSource {
 
 class NodeReporter {
  public:
+  struct ReportedNode {
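+    // Snapshot of a response node: its name, whether it serializes as an array, and its serialized child nodes.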
+    std::string name;
+    bool is_array;
+    std::vector<SerializedResponseNode> serialized_nodes;
+  };
+
   NodeReporter() = default;
 
   virtual ~NodeReporter() = default;
@@ -194,20 +198,20 @@ class NodeReporter {
    * Retrieves metrics node
    * @return metrics response node
    */
-  virtual std::shared_ptr<ResponseNode> getMetricsNode(const std::string& metricsClass) const = 0;
+  virtual std::optional<ReportedNode> getMetricsNode(const std::string& metricsClass) const = 0;
 
   /**
    * Retrieves root nodes configured to be included in heartbeat
    * @param includeManifest -- determines if manifest is to be included
    * @return a list of response nodes
    */
-  virtual std::vector<std::shared_ptr<ResponseNode>> getHeartbeatNodes(bool includeManifest) const = 0;
+  virtual std::vector<ReportedNode> getHeartbeatNodes(bool includeManifest) const = 0;
 
   /**
    * Retrieves the agent manifest to be sent as a response to C2 DESCRIBE manifest
    * @return the agent manifest response node
    */
-  virtual std::shared_ptr<state::response::ResponseNode> getAgentManifest() = 0;
+  virtual ReportedNode getAgentManifest() = 0;
 };
 
 /**
@@ -238,11 +242,4 @@ class ResponseNodeSink {
   virtual int16_t setMetricsNodes(const std::shared_ptr<ResponseNode> &metrics) = 0;
 };
 
-}  // namespace response
-}  // namespace state
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
-
-#endif  // LIBMINIFI_INCLUDE_CORE_STATE_NODES_METRICSBASE_H_
+}  // namespace org::apache::nifi::minifi::state::response
diff --git a/libminifi/include/core/state/nodes/QueueMetrics.h b/libminifi/include/core/state/nodes/QueueMetrics.h
index 3cf516dd6..f89d34da6 100644
--- a/libminifi/include/core/state/nodes/QueueMetrics.h
+++ b/libminifi/include/core/state/nodes/QueueMetrics.h
@@ -15,31 +15,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_CORE_STATE_NODES_QUEUEMETRICS_H_
-#define LIBMINIFI_INCLUDE_CORE_STATE_NODES_QUEUEMETRICS_H_
+#pragma once
 
 #include <memory>
 #include <string>
 #include <utility>
 #include <vector>
-#include <sstream>
-#include <map>
 
 #include "../nodes/MetricsBase.h"
 #include "Connection.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace state {
-namespace response {
+#include "../ConnectionStore.h"
+
+namespace org::apache::nifi::minifi::state::response {
 
 /**
  * Justification and Purpose: Provides Connection queue metrics. Provides critical information to the
  * C2 server.
  *
  */
-class QueueMetrics : public ResponseNode {
+class QueueMetrics : public ResponseNode, public ConnectionStore {
  public:
   QueueMetrics(const std::string &name, const utils::Identifier &uuid)
       : ResponseNode(name, uuid) {
@@ -53,19 +47,15 @@ class QueueMetrics : public ResponseNode {
       : ResponseNode("QueueMetrics") {
   }
 
-  virtual std::string getName() const {
-    return "QueueMetrics";
-  }
+  MINIFIAPI static constexpr const char* Description = "Metric node that defines queue metric information";
 
-  void addConnection(std::unique_ptr<minifi::Connection> connection) {
-    if (nullptr != connection) {
-      connections.insert(std::make_pair(connection->getName(), std::move(connection)));
-    }
+  std::string getName() const override {
+    return "QueueMetrics";
   }
 
-  std::vector<SerializedResponseNode> serialize() {
+  std::vector<SerializedResponseNode> serialize() override {
     std::vector<SerializedResponseNode> serialized;
-    for (const auto& [_, connection] : connections) {
+    for (const auto& [_, connection] : connections_) {
       SerializedResponseNode parent;
       parent.name = connection->getName();
       SerializedResponseNode datasize;
@@ -94,15 +84,9 @@ class QueueMetrics : public ResponseNode {
     return serialized;
   }
 
- protected:
-  std::map<std::string, std::unique_ptr<minifi::Connection>> connections;
+  std::vector<PublishedMetric> calculateMetrics() override {
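+    // Delegate to the shared ConnectionStore, labelling each gauge with the QueueMetrics class.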
+    return calculateConnectionMetrics("QueueMetrics");
+  }
 };
 
-}  // namespace response
-}  // namespace state
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
-
-#endif  // LIBMINIFI_INCLUDE_CORE_STATE_NODES_QUEUEMETRICS_H_
+}  // namespace org::apache::nifi::minifi::state::response
diff --git a/libminifi/include/core/state/nodes/RepositoryMetrics.h b/libminifi/include/core/state/nodes/RepositoryMetrics.h
index 2c96cb1d2..009f577b3 100644
--- a/libminifi/include/core/state/nodes/RepositoryMetrics.h
+++ b/libminifi/include/core/state/nodes/RepositoryMetrics.h
@@ -15,8 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_CORE_STATE_NODES_REPOSITORYMETRICS_H_
-#define LIBMINIFI_INCLUDE_CORE_STATE_NODES_REPOSITORYMETRICS_H_
+#pragma once
 
 #include <memory>
 #include <string>
@@ -27,12 +26,8 @@
 
 #include "../nodes/MetricsBase.h"
 #include "Connection.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace state {
-namespace response {
+
+namespace org::apache::nifi::minifi::state::response {
 
 /**
  * Justification and Purpose: Provides repository metrics. Provides critical information to the
@@ -53,19 +48,21 @@ class RepositoryMetrics : public ResponseNode {
       : ResponseNode("RepositoryMetrics") {
   }
 
-  virtual std::string getName() const {
+  MINIFIAPI static constexpr const char* Description = "Metric node that defines repository metric information";
+
+  std::string getName() const override {
     return "RepositoryMetrics";
   }
 
   void addRepository(const std::shared_ptr<core::Repository> &repo) {
     if (nullptr != repo) {
-      repositories.insert(std::make_pair(repo->getName(), repo));
+      repositories_.insert(std::make_pair(repo->getName(), repo));
     }
   }
 
-  std::vector<SerializedResponseNode> serialize() {
+  std::vector<SerializedResponseNode> serialize() override {
     std::vector<SerializedResponseNode> serialized;
-    for (auto conn : repositories) {
+    for (auto conn : repositories_) {
       auto repo = conn.second;
       SerializedResponseNode parent;
       parent.name = repo->getName();
@@ -90,15 +87,18 @@ class RepositoryMetrics : public ResponseNode {
     return serialized;
   }
 
+  std::vector<PublishedMetric> calculateMetrics() override {
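+    // Per-repository running/full state and size, labelled with the repository name.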
+    std::vector<PublishedMetric> metrics;
+    for (const auto& [_, repo] : repositories_) {
+      metrics.push_back({"is_running", (repo->isRunning() ? 1.0 : 0.0), {{"metric_class", getName()}, {"repository_name", repo->getName()}}});
+      metrics.push_back({"is_full", (repo->isFull() ? 1.0 : 0.0), {{"metric_class", getName()}, {"repository_name", repo->getName()}}});
+      metrics.push_back({"repository_size", static_cast<double>(repo->getRepoSize()), {{"metric_class", getName()}, {"repository_name", repo->getName()}}});
+    }
+    return metrics;
+  }
+
  protected:
-  std::map<std::string, std::shared_ptr<core::Repository>> repositories;
+  std::map<std::string, std::shared_ptr<core::Repository>> repositories_;
 };
 
-}  // namespace response
-}  // namespace state
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
-
-#endif  // LIBMINIFI_INCLUDE_CORE_STATE_NODES_REPOSITORYMETRICS_H_
+}  // namespace org::apache::nifi::minifi::state::response
diff --git a/libminifi/include/core/state/nodes/ResponseNodeLoader.h b/libminifi/include/core/state/nodes/ResponseNodeLoader.h
new file mode 100644
index 000000000..ffd7a57f9
--- /dev/null
+++ b/libminifi/include/core/state/nodes/ResponseNodeLoader.h
@@ -0,0 +1,70 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <unordered_map>
+#include <map>
+#include <string>
+#include <memory>
+#include <mutex>
+#include <functional>
+
+#include "MetricsBase.h"
+#include "core/ProcessGroup.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "core/FlowConfiguration.h"
+#include "utils/gsl.h"
+#include "utils/Id.h"
+
+namespace org::apache::nifi::minifi::state::response {
+
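+// Instantiates response nodes by class name and wires them up (repositories, queues, agent and flow information) for C2 heartbeats and metric publishers.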
+class ResponseNodeLoader {
+ public:
+  ResponseNodeLoader(std::shared_ptr<Configure> configuration, std::shared_ptr<core::Repository> provenance_repo,
+    std::shared_ptr<core::Repository> flow_file_repo, core::FlowConfiguration* flow_configuration);
+  std::shared_ptr<ResponseNode> loadResponseNode(const std::string& clazz, core::ProcessGroup* root);
+  std::shared_ptr<state::response::ResponseNode> getComponentMetricsNode(const std::string& metrics_class) const;
+  void setControllerServiceProvider(core::controller::ControllerServiceProvider* controller);
+  void setStateMonitor(state::StateMonitor* update_sink);
+  void initializeComponentMetrics(core::ProcessGroup* root);
+
+ private:
+  std::shared_ptr<ResponseNode> getResponseNode(const std::string& clazz) const;
+  void initializeRepositoryMetrics(const std::shared_ptr<ResponseNode>& response_node);
+  static void initializeQueueMetrics(const std::shared_ptr<ResponseNode>& response_node, core::ProcessGroup* root);
+  void initializeAgentIdentifier(const std::shared_ptr<ResponseNode>& response_node);
+  void initializeAgentMonitor(const std::shared_ptr<ResponseNode>& response_node);
+  void initializeAgentNode(const std::shared_ptr<ResponseNode>& response_node);
+  void initializeConfigurationChecksums(const std::shared_ptr<ResponseNode>& response_node);
+  void initializeFlowMonitor(const std::shared_ptr<ResponseNode>& response_node, core::ProcessGroup* root);
+
+  mutable std::mutex component_metrics_mutex_;
+  std::unordered_map<std::string, std::shared_ptr<ResponseNode>> component_metrics_;
+  std::shared_ptr<Configure> configuration_;
+  std::shared_ptr<core::Repository> provenance_repo_;
+  std::shared_ptr<core::Repository> flow_file_repo_;
+  core::FlowConfiguration* flow_configuration_ = nullptr;
+  core::controller::ControllerServiceProvider* controller_ = nullptr;
+  state::StateMonitor* update_sink_ = nullptr;
+  std::mutex callback_mutex_;
+  std::map<utils::Identifier, std::function<void(core::ProcessGroup*)>> flow_change_callbacks_;
+  std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<ResponseNodeLoader>::getLogger()};
+};
+
+}  // namespace org::apache::nifi::minifi::state::response
diff --git a/libminifi/include/core/state/nodes/SchedulingNodes.h b/libminifi/include/core/state/nodes/SchedulingNodes.h
index ba3b561de..4d8acdac5 100644
--- a/libminifi/include/core/state/nodes/SchedulingNodes.h
+++ b/libminifi/include/core/state/nodes/SchedulingNodes.h
@@ -37,11 +37,11 @@ class SchedulingDefaults : public DeviceInformation {
       : DeviceInformation(name) {
   }
 
-  std::string getName() const {
+  std::string getName() const override {
     return "schedulingDefaults";
   }
 
-  std::vector<SerializedResponseNode> serialize() {
+  std::vector<SerializedResponseNode> serialize() override {
     std::vector<SerializedResponseNode> serialized;
 
     SerializedResponseNode schedulingDefaults;
diff --git a/libminifi/include/core/state/nodes/SupportedOperations.h b/libminifi/include/core/state/nodes/SupportedOperations.h
index 3ff8baa37..8aa147126 100644
--- a/libminifi/include/core/state/nodes/SupportedOperations.h
+++ b/libminifi/include/core/state/nodes/SupportedOperations.h
@@ -33,7 +33,7 @@ class SupportedOperations : public DeviceInformation {
   SupportedOperations(const std::string &name, const utils::Identifier &uuid);
   explicit SupportedOperations(const std::string &name);
 
-  MINIFIAPI static constexpr const char* Description = "Node part of an AST that defines the supported C2 operations in the Agent Manifest.";
+  MINIFIAPI static constexpr const char* Description = "Metric node that defines the supported C2 operations in the Agent Manifest.";
 
   std::string getName() const override;
   std::vector<SerializedResponseNode> serialize() override;
diff --git a/libminifi/include/properties/Configuration.h b/libminifi/include/properties/Configuration.h
index 8f6636c7c..c4c6abd9b 100644
--- a/libminifi/include/properties/Configuration.h
+++ b/libminifi/include/properties/Configuration.h
@@ -156,12 +156,17 @@ class Configuration : public Properties {
 
   static constexpr const char *nifi_asset_directory = "nifi.asset.directory";
 
+  // Metrics publisher options
+  static constexpr const char *nifi_metrics_publisher_class = "nifi.metrics.publisher.class";
+  static constexpr const char *nifi_metrics_publisher_prometheus_metrics_publisher_port = "nifi.metrics.publisher.PrometheusMetricsPublisher.port";
+  static constexpr const char *nifi_metrics_publisher_metrics = "nifi.metrics.publisher.metrics";
+
   MINIFIAPI static const std::vector<core::ConfigurationProperty> CONFIGURATION_PROPERTIES;
   MINIFIAPI static const std::array<const char*, 2> DEFAULT_SENSITIVE_PROPERTIES;
 
   static std::vector<std::string> mergeProperties(std::vector<std::string> properties,
                                                   const std::vector<std::string>& additional_properties);
-  static std::vector<std::string> getSensitiveProperties(std::function<std::optional<std::string>(const std::string&)> reader);
+  static std::vector<std::string> getSensitiveProperties(const std::function<std::optional<std::string>(const std::string&)>& reader);
   static bool validatePropertyValue(const std::string& property_name, const std::string& property_value);
 };
 
diff --git a/libminifi/include/utils/Id.h b/libminifi/include/utils/Id.h
index df5edb8a8..9cff1d76a 100644
--- a/libminifi/include/utils/Id.h
+++ b/libminifi/include/utils/Id.h
@@ -82,6 +82,8 @@ class Identifier {
   static std::optional<Identifier> parse(const std::string& str);
 
  private:
+  friend struct ::std::hash<org::apache::nifi::minifi::utils::Identifier>;
+
   static bool parseByte(Data& data, const uint8_t* input, int& charIdx, int& byteIdx);
 
   Data data_{};
@@ -132,3 +134,27 @@ class NonRepeatingStringGenerator {
 };
 
 }  // namespace org::apache::nifi::minifi::utils
+
+namespace std {
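+// Hash support so Identifier can be used as a key in unordered containers (e.g. the ConnectionStore's connection map).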
+template<>
+struct hash<org::apache::nifi::minifi::utils::Identifier> {
+  size_t operator()(const org::apache::nifi::minifi::utils::Identifier& id) const noexcept {
+    static_assert(sizeof(org::apache::nifi::minifi::utils::Identifier) % sizeof(size_t) == 0);
+    constexpr int slices = sizeof(org::apache::nifi::minifi::utils::Identifier) / sizeof(size_t);
+    const auto combine = [](size_t& seed, size_t new_hash) {
+      // from the boost hash_combine docs
+      seed ^= new_hash + 0x9e3779b9 + (seed << 6) + (seed >> 2);
+    };
+    const auto get_slice = [](const org::apache::nifi::minifi::utils::Identifier& id, size_t idx) -> size_t {
+      size_t result{};
+      memcpy(&result, reinterpret_cast<const unsigned char*>(&id.data_) + idx * sizeof(size_t), sizeof(size_t));
+      return result;
+    };
+    size_t hash = get_slice(id, 0);
+    for (size_t i = 1; i < slices; ++i) {
+      combine(hash, get_slice(id, i));
+    }
+    return hash;
+  }
+};
+}  // namespace std
diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp
index 8d89525ca..b471bf202 100644
--- a/libminifi/src/Configuration.cpp
+++ b/libminifi/src/Configuration.cpp
@@ -124,7 +124,10 @@ const std::vector<core::ConfigurationProperty> Configuration::CONFIGURATION_PROP
   core::ConfigurationProperty{Configuration::nifi_log_logger_root},
   core::ConfigurationProperty{Configuration::nifi_log_compression_cached_log_max_size, gsl::make_not_null(core::StandardValidators::get().DATA_SIZE_VALIDATOR.get())},
   core::ConfigurationProperty{Configuration::nifi_log_compression_compressed_log_max_size, gsl::make_not_null(core::StandardValidators::get().DATA_SIZE_VALIDATOR.get())},
-  core::ConfigurationProperty{Configuration::nifi_asset_directory}
+  core::ConfigurationProperty{Configuration::nifi_asset_directory},
+  core::ConfigurationProperty{Configuration::nifi_metrics_publisher_class},
+  core::ConfigurationProperty{Configuration::nifi_metrics_publisher_prometheus_metrics_publisher_port, gsl::make_not_null(core::StandardValidators::get().PORT_VALIDATOR.get())},
+  core::ConfigurationProperty{Configuration::nifi_metrics_publisher_metrics}
 };
 
 const std::array<const char*, 2> Configuration::DEFAULT_SENSITIVE_PROPERTIES = {Configuration::nifi_security_client_pass_phrase,
@@ -145,7 +148,7 @@ std::vector<std::string> Configuration::mergeProperties(std::vector<std::string>
   return properties;
 }
 
-std::vector<std::string> Configuration::getSensitiveProperties(std::function<std::optional<std::string>(const std::string&)> reader) {
+std::vector<std::string> Configuration::getSensitiveProperties(const std::function<std::optional<std::string>(const std::string&)>& reader) {
   std::vector<std::string> sensitive_properties(Configuration::DEFAULT_SENSITIVE_PROPERTIES.begin(), Configuration::DEFAULT_SENSITIVE_PROPERTIES.end());
   if (reader) {
     const auto additional_sensitive_props_list = reader(Configuration::nifi_sensitive_props_additional_keys);
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 25a1a4eb5..a7f9a5ce0 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -28,8 +28,6 @@
 
 #include "FlowController.h"
 #include "core/state/nodes/AgentInformation.h"
-#include "core/state/nodes/FlowInformation.h"
-#include "core/state/nodes/RepositoryMetrics.h"
 #include "core/state/ProcessorController.h"
 #include "c2/C2Agent.h"
 #include "core/ProcessGroup.h"
@@ -44,6 +42,7 @@
 #include "utils/HTTPClient.h"
 #include "io/NetworkPrioritizer.h"
 #include "io/FileStream.h"
+#include "core/ClassLoader.h"
 
 namespace org::apache::nifi::minifi {
 
@@ -70,6 +69,8 @@ FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo
   initialized_ = false;
 
   protocol_ = std::make_unique<FlowControlProtocol>(this, configuration_);
+  response_node_loader_.setControllerServiceProvider(this);
+  response_node_loader_.setStateMonitor(this);
 }
 
 FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo,
@@ -118,39 +119,43 @@ bool FlowController::applyConfiguration(const std::string &source, const std::st
   logger_->log_info("Starting to reload Flow Controller with flow control name %s, version %d", newRoot->getName(), newRoot->getVersion());
 
   updating_ = true;
-
-  std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
-  stop();
-  unload();
-  controller_map_->clear();
-  auto prevRoot = std::move(this->root_);
-  this->root_ = std::move(newRoot);
-  processor_to_controller_.clear();
-  updateResponseNodeConnections();
-  initialized_ = false;
   bool started = false;
-  try {
-    load(std::move(root_), true);
-    flow_update_ = true;
-    started = start() == 0;
-
-    updating_ = false;
-
-    if (started) {
-      auto flowVersion = flow_configuration_->getFlowVersion();
-      if (flowVersion) {
-        logger_->log_debug("Setting flow id to %s", flowVersion->getFlowId());
-        configuration_->set(Configure::nifi_c2_flow_id, flowVersion->getFlowId());
-        configuration_->set(Configure::nifi_c2_flow_url, flowVersion->getFlowIdentifier()->getRegistryUrl());
-      } else {
-        logger_->log_debug("Invalid flow version, not setting");
-      }
+
+  {
+    std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
+    stop();
+    unload();
+    controller_map_->clear();
+    clearResponseNodes();
+    if (metrics_publisher_) {
+      metrics_publisher_->clearMetricNodes();
+    }
+    auto prevRoot = std::move(this->root_);
+    this->root_ = std::move(newRoot);
+    processor_to_controller_.clear();
+    initialized_ = false;
+    try {
+      load(std::move(root_), true);
+      flow_update_ = true;
+      started = start() == 0;
+    } catch (...) {
+      this->root_ = std::move(prevRoot);
+      load(std::move(this->root_), true);
+      flow_update_ = true;
+    }
+  }
+
+  updating_ = false;
+
+  if (started) {
+    auto flowVersion = flow_configuration_->getFlowVersion();
+    if (flowVersion) {
+      logger_->log_debug("Setting flow id to %s", flowVersion->getFlowId());
+      configuration_->set(Configure::nifi_c2_flow_id, flowVersion->getFlowId());
+      configuration_->set(Configure::nifi_c2_flow_url, flowVersion->getFlowIdentifier()->getRegistryUrl());
+    } else {
+      logger_->log_debug("Invalid flow version, not setting");
     }
-  } catch (...) {
-    this->root_ = std::move(prevRoot);
-    load(std::move(this->root_), true);
-    flow_update_ = true;
-    updating_ = false;
   }
 
   return started;
@@ -278,7 +283,6 @@ void FlowController::load(std::unique_ptr<core::ProcessGroup> root, bool reload)
       logger_->log_info("Load Flow Controller from provided root");
       this->root_ = std::move(root);
       processor_to_controller_.clear();
-      updateResponseNodeConnections();
     } else {
       logger_->log_info("Instantiating new flow");
       this->root_ = loadInitialFlow();
@@ -291,6 +295,13 @@ void FlowController::load(std::unique_ptr<core::ProcessGroup> root, bool reload)
     logger_->log_info("Loaded root processor Group");
     logger_->log_info("Initializing timers");
     controller_service_provider_impl_ = flow_configuration_->getControllerServiceProvider();
+    response_node_loader_.initializeComponentMetrics(root_.get());
+    initializeResponseNodes(root_.get());
+    if (metrics_publisher_) {
+      metrics_publisher_->loadMetricNodes(root_.get());
+    } else {
+      loadMetricsPublisher();
+    }
 
     if (!thread_pool_.isRunning() || reload) {
       thread_pool_.shutdown();
@@ -425,19 +436,26 @@ int16_t FlowController::clearConnection(const std::string &connection) {
   return -1;
 }
 
-std::shared_ptr<state::response::ResponseNode> FlowController::getAgentManifest() {
-  auto agentInfo = std::make_shared<state::response::AgentInformation>("agentInfo");
-  agentInfo->setUpdatePolicyController(std::static_pointer_cast<controllers::UpdatePolicyControllerService>(getControllerService(c2::C2Agent::UPDATE_NAME)).get());
-  agentInfo->setAgentIdentificationProvider(configuration_);
-  agentInfo->setConfigurationReader([this](const std::string& key){
+state::response::NodeReporter::ReportedNode FlowController::getAgentManifest() {
+  state::response::AgentInformation agentInfo("agentInfo");
+  agentInfo.setUpdatePolicyController(std::static_pointer_cast<controllers::UpdatePolicyControllerService>(getControllerService(c2::C2Agent::UPDATE_NAME)).get());
+  agentInfo.setAgentIdentificationProvider(configuration_);
+  agentInfo.setConfigurationReader([this](const std::string& key){
     return configuration_->getRawValue(key);
   });
-  agentInfo->setStateMonitor(this);
-  agentInfo->includeAgentStatus(false);
-  return agentInfo;
+  agentInfo.setStateMonitor(this);
+  agentInfo.includeAgentStatus(false);
+  state::response::NodeReporter::ReportedNode reported_node;
+  reported_node.name = agentInfo.getName();
+  reported_node.is_array = agentInfo.isArray();
+  reported_node.serialized_nodes = agentInfo.serialize();
+  return reported_node;
 }
 
 void FlowController::executeOnAllComponents(std::function<void(state::StateController&)> func) {
+  if (updating_) {
+    return;
+  }
   std::lock_guard<std::recursive_mutex> lock(mutex_);
   for (auto* component: getAllComponents()) {
     func(*component);
@@ -445,6 +463,9 @@ void FlowController::executeOnAllComponents(std::function<void(state::StateContr
 }
 
 void FlowController::executeOnComponent(const std::string &name, std::function<void(state::StateController&)> func) {
+  if (updating_) {
+    return;
+  }
   std::lock_guard<std::recursive_mutex> lock(mutex_);
   if (auto* component = getComponent(name); component != nullptr) {
     func(*component);
@@ -552,4 +573,20 @@ state::StateController* FlowController::getProcessorController(const std::string
   return foundController.get();
 }
 
+void FlowController::loadMetricsPublisher() {
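+  // Instantiate the publisher class configured in nifi.metrics.publisher.class (if any) and initialize it with the response node loader and current flow root.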
+  if (auto metrics_publisher_class = configuration_->get(minifi::Configure::nifi_metrics_publisher_class)) {
+    auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(*metrics_publisher_class, *metrics_publisher_class);
+    if (!ptr) {
+      logger_->log_error("Configured metrics publisher class \"%s\" could not be instantiated.", *metrics_publisher_class);
+      return;
+    }
+    metrics_publisher_ = utils::dynamic_unique_cast<state::MetricsPublisher>(std::move(ptr));
+    if (!metrics_publisher_) {
+      logger_->log_error("Configured metrics publisher class \"%s\" is not a metrics publisher.", *metrics_publisher_class);
+      return;
+    }
+    metrics_publisher_->initialize(configuration_, response_node_loader_, root_.get());
+  }
+}
+
 }  // namespace org::apache::nifi::minifi
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index 82ceded8d..a0065daff 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -220,7 +220,7 @@ void C2Agent::performHeartBeat() {
   C2Payload payload(Operation::HEARTBEAT);
   logger_->log_trace("Performing heartbeat");
   auto reporter = dynamic_cast<state::response::NodeReporter*>(update_sink_);
-  std::vector<std::shared_ptr<state::response::ResponseNode>> metrics;
+  std::vector<state::response::NodeReporter::ReportedNode> metrics;
   if (reporter) {
     if (!manifest_sent_) {
       // include agent manifest for the first heartbeat
@@ -233,9 +233,9 @@ void C2Agent::performHeartBeat() {
     payload.reservePayloads(metrics.size());
     for (const auto& metric : metrics) {
       C2Payload child_metric_payload(Operation::HEARTBEAT);
-      child_metric_payload.setLabel(metric->getName());
-      child_metric_payload.setContainer(metric->isArray());
-      serializeMetrics(child_metric_payload, metric->getName(), metric->serialize(), metric->isArray());
+      child_metric_payload.setLabel(metric.name);
+      child_metric_payload.setContainer(metric.is_array);
+      serializeMetrics(child_metric_payload, metric.name, metric.serialized_nodes, metric.is_array);
       payload.addPayload(std::move(child_metric_payload));
     }
   }
@@ -504,7 +504,7 @@ void C2Agent::handle_describe(const C2ContentResponse &resp) {
         C2Payload metrics(Operation::ACKNOWLEDGE);
         metricsClass.empty() ? metrics.setLabel("metrics") : metrics.setLabel(metricsClass);
         if (metricsNode) {
-          serializeMetrics(metrics, metricsNode->getName(), metricsNode->serialize(), metricsNode->isArray());
+          serializeMetrics(metrics, metricsNode->name, metricsNode->serialized_nodes, metricsNode->is_array);
         }
         response.addPayload(std::move(metrics));
       }
@@ -523,7 +523,7 @@ void C2Agent::handle_describe(const C2ContentResponse &resp) {
         agentInfo.setLabel("agentInfo");
 
         const auto manifest = reporter->getAgentManifest();
-        serializeMetrics(agentInfo, manifest->getName(), manifest->serialize());
+        serializeMetrics(agentInfo, manifest.name, manifest.serialized_nodes);
         response.addPayload(std::move(agentInfo));
       }
       enqueue_c2_response(std::move(response));
diff --git a/libminifi/src/c2/C2Client.cpp b/libminifi/src/c2/C2Client.cpp
index 5eff01af9..14829aa75 100644
--- a/libminifi/src/c2/C2Client.cpp
+++ b/libminifi/src/c2/C2Client.cpp
@@ -44,7 +44,8 @@ C2Client::C2Client(
       configuration_(std::move(configuration)),
       filesystem_(std::move(filesystem)),
       logger_(std::move(logger)),
-      request_restart_(std::move(request_restart)) {}
+      request_restart_(std::move(request_restart)),
+      response_node_loader_(configuration_, provenance_repo_, flow_file_repo_, flow_configuration_.get()) {}
 
 void C2Client::stopC2() {
   if (c2_agent_) {
@@ -76,75 +77,13 @@ void C2Client::initialize(core::controller::ControllerServiceProvider *controlle
     configuration_->set(Configuration::nifi_c2_agent_identifier_fallback, agent_id, PropertyChangeLifetime::PERSISTENT);
   }
 
-  {
-    std::lock_guard<std::mutex> lock(initialization_mutex_);
-    if (initialized_ && !flow_update_) {
-      return;
-    }
-  }
-
-  // root_response_nodes_ was not cleared before, it is unclear if that was intentional
-
-  std::map<std::string, Connection*> connections;
-  if (root_ != nullptr) {
-    root_->getConnections(connections);
-  }
-
-  std::string class_csv;
-  if (configuration_->get(minifi::Configuration::nifi_c2_root_classes, class_csv)) {
-    std::vector<std::string> classes = utils::StringUtils::split(class_csv, ",");
-
-    for (const std::string& clazz : classes) {
-      auto instance = core::ClassLoader::getDefaultClassLoader().instantiate(clazz, clazz);
-      auto response_node = utils::dynamic_unique_cast<state::response::ResponseNode>(std::move(instance));
-      if (nullptr == response_node) {
-        logger_->log_error("No metric defined for %s", clazz);
-        continue;
-      }
-      auto identifier = dynamic_cast<state::response::AgentIdentifier*>(response_node.get());
-      if (identifier != nullptr) {
-        identifier->setAgentIdentificationProvider(configuration_);
-      }
-      auto monitor = dynamic_cast<state::response::AgentMonitor*>(response_node.get());
-      if (monitor != nullptr) {
-        monitor->addRepository(provenance_repo_);
-        monitor->addRepository(flow_file_repo_);
-        monitor->setStateMonitor(update_sink);
-      }
-      auto agent_node = dynamic_cast<state::response::AgentNode*>(response_node.get());
-      if (agent_node != nullptr && controller != nullptr) {
-        agent_node->setUpdatePolicyController(std::static_pointer_cast<controllers::UpdatePolicyControllerService>(controller->getControllerService(C2Agent::UPDATE_NAME)).get());
-      }
-      if (agent_node != nullptr) {
-        agent_node->setConfigurationReader([this](const std::string& key){
-          return configuration_->getRawValue(key);
-        });
-      }
-      auto configuration_checksums = dynamic_cast<state::response::ConfigurationChecksums*>(response_node.get());
-      if (configuration_checksums) {
-        configuration_checksums->addChecksumCalculator(configuration_->getChecksumCalculator());
-        configuration_checksums->addChecksumCalculator(flow_configuration_->getChecksumCalculator());
-      }
-      auto flowMonitor = dynamic_cast<state::response::FlowMonitor*>(response_node.get());
-      if (flowMonitor != nullptr) {
-        for (auto &con : connections) {
-          flowMonitor->updateConnection(con.second);
-        }
-        flowMonitor->setStateMonitor(update_sink);
-        flowMonitor->setFlowVersion(flow_configuration_->getFlowVersion());
-      }
-      const auto responseNodeName = response_node->getName();
-      std::lock_guard<std::mutex> guard(metrics_mutex_);
-      root_response_nodes_[responseNodeName] = std::move(response_node);
-    }
+  std::lock_guard<std::mutex> lock(initialization_mutex_);
+  if (initialized_ && !flow_update_) {
+    return;
   }
 
-  initializeComponentMetrics();
-
-  loadC2ResponseConfiguration(Configuration::nifi_c2_root_class_definitions);
-
-  std::lock_guard<std::mutex> lock(initialization_mutex_);
   if (!initialized_) {
+    initializeResponseNodes(root_.get());
     // C2Agent is initialized once, meaning that a C2-triggered flow/configuration update
     // might not be equal to a fresh restart
     c2_agent_ = std::make_unique<c2::C2Agent>(controller, pause_handler, update_sink, configuration_, filesystem_, request_restart_);
@@ -160,29 +99,14 @@ std::optional<std::string> C2Client::fetchFlow(const std::string& uri) const {
   return c2_agent_->fetchFlow(uri);
 }
 
-void C2Client::initializeComponentMetrics() {
-  {
-    std::lock_guard<std::mutex> guard(metrics_mutex_);
-    component_metrics_.clear();
-  }
-
-  if (root_ == nullptr) {
-    return;
-  }
-  std::vector<core::Processor*> processors;
-  root_->getAllProcessors(processors);
-  for (const auto processor : processors) {
-    auto rep = dynamic_cast<state::response::ResponseNodeSource*>(processor);
-    if (rep == nullptr) {
+void C2Client::loadNodeClasses(const std::string& class_definitions, const std::shared_ptr<state::response::ResponseNode>& new_node) {
+  auto classes = utils::StringUtils::split(class_definitions, ",");
+  for (const std::string& clazz : classes) {
+    auto response_node = response_node_loader_.loadResponseNode(clazz, root_.get());
+    if (!response_node) {
       continue;
     }
-    // we have a metrics source.
-    std::vector<std::shared_ptr<state::response::ResponseNode>> metric_vector;
-    rep->getResponseNodes(metric_vector);
-    std::lock_guard<std::mutex> guard(metrics_mutex_);
-    for (auto& metric : metric_vector) {
-      component_metrics_[metric->getName()] = metric;
-    }
+    std::static_pointer_cast<state::response::ObjectNode>(new_node)->add_node(response_node);
   }
 }
 
@@ -206,35 +130,13 @@ void C2Client::loadC2ResponseConfiguration(const std::string &prefix) {
       }
       std::shared_ptr<state::response::ResponseNode> new_node = std::make_shared<state::response::ObjectNode>(name);
       if (configuration_->get(classOption, class_definitions)) {
-        std::vector<std::string> classes = utils::StringUtils::split(class_definitions, ",");
-        for (const std::string& clazz : classes) {
-          // instantiate the object
-          std::shared_ptr<core::CoreComponent> ptr = core::ClassLoader::getDefaultClassLoader().instantiate(clazz, clazz);
-          if (nullptr == ptr) {
-            const bool found_metric = [&] {
-              std::lock_guard<std::mutex> guard{metrics_mutex_};
-              auto metric = component_metrics_.find(clazz);
-              if (metric != component_metrics_.end()) {
-                ptr = metric->second;
-                return true;
-              }
-              return false;
-            }();
-            if (!found_metric) {
-              logger_->log_error("No metric defined for %s", clazz);
-              continue;
-            }
-          }
-          auto node = std::dynamic_pointer_cast<state::response::ResponseNode>(ptr);
-          std::static_pointer_cast<state::response::ObjectNode>(new_node)->add_node(node);
-        }
-
+        loadNodeClasses(class_definitions, new_node);
       } else {
         std::string optionName = option + "." + name;
-        auto node = loadC2ResponseConfiguration(optionName, new_node);
+        loadC2ResponseConfiguration(optionName, new_node);
       }
 
-      std::lock_guard<std::mutex> guard{metrics_mutex_};
+      // No need to lock metrics_mutex_ here; it is already held by the initializeResponseNodes member function
       root_response_nodes_[name] = new_node;
     } catch (...) {
       logger_->log_error("Could not create metrics class %s", metricsClass);
@@ -264,37 +166,16 @@ std::shared_ptr<state::response::ResponseNode> C2Client::loadC2ResponseConfigura
         std::vector<std::string> sub_classes = utils::StringUtils::split(name, ",");
         for (const std::string& subClassStr : classes) {
           auto node = loadC2ResponseConfiguration(subClassStr, prev_node);
-          if (node != nullptr)
+          if (node != nullptr) {
             std::static_pointer_cast<state::response::ObjectNode>(prev_node)->add_node(node);
+          }
         }
       } else {
         if (configuration_->get(classOption, class_definitions)) {
-          std::vector<std::string> classes = utils::StringUtils::split(class_definitions, ",");
-          for (const std::string& clazz : classes) {
-            // instantiate the object
-            std::shared_ptr<core::CoreComponent> ptr = core::ClassLoader::getDefaultClassLoader().instantiate(clazz, clazz);
-            if (nullptr == ptr) {
-              const bool found_metric = [&] {
-                std::lock_guard<std::mutex> guard{metrics_mutex_};
-                auto metric = component_metrics_.find(clazz);
-                if (metric != component_metrics_.end()) {
-                  ptr = metric->second;
-                  return true;
-                }
-                return false;
-              }();
-              if (!found_metric) {
-                logger_->log_error("No metric defined for %s", clazz);
-                continue;
-              }
-            }
-
-            auto node = std::dynamic_pointer_cast<state::response::ResponseNode>(ptr);
-            std::static_pointer_cast<state::response::ObjectNode>(new_node)->add_node(node);
-          }
-          if (!new_node->isEmpty())
+          loadNodeClasses(class_definitions, new_node);
+          if (!new_node->isEmpty()) {
             std::static_pointer_cast<state::response::ObjectNode>(prev_node)->add_node(new_node);
-
+          }
         } else {
           std::string optionName = option + "." + name;
           auto sub_node = loadC2ResponseConfiguration(optionName, new_node);
@@ -308,57 +189,76 @@ std::shared_ptr<state::response::ResponseNode> C2Client::loadC2ResponseConfigura
   return prev_node;
 }
 
-std::shared_ptr<state::response::ResponseNode> C2Client::getMetricsNode(const std::string& metrics_class) const {
+std::optional<state::response::NodeReporter::ReportedNode> C2Client::getMetricsNode(const std::string& metrics_class) const {
+  std::lock_guard<std::mutex> guard{metrics_mutex_};
   if (!metrics_class.empty()) {
-    std::lock_guard<std::mutex> lock(metrics_mutex_);
-    const auto citer = component_metrics_.find(metrics_class);
-    if (citer != component_metrics_.end()) {
-      return citer->second;
+    auto metrics_node = response_node_loader_.getComponentMetricsNode(metrics_class);
+    if (metrics_node) {
+      state::response::NodeReporter::ReportedNode reported_node;
+      reported_node.is_array = metrics_node->isArray();
+      reported_node.name = metrics_node->getName();
+      reported_node.serialized_nodes = metrics_node->serialize();
+      return reported_node;
     }
   } else {
-    std::lock_guard<std::mutex> lock(metrics_mutex_);
     const auto iter = root_response_nodes_.find("metrics");
     if (iter != root_response_nodes_.end()) {
-      return iter->second;
+      state::response::NodeReporter::ReportedNode reported_node;
+      reported_node.is_array = iter->second->isArray();
+      reported_node.name = iter->second->getName();
+      reported_node.serialized_nodes = iter->second->serialize();
+      return reported_node;
     }
   }
-  return nullptr;
+  return std::nullopt;
 }
 
-std::vector<std::shared_ptr<state::response::ResponseNode>> C2Client::getHeartbeatNodes(bool include_manifest) const {
+std::vector<state::response::NodeReporter::ReportedNode> C2Client::getHeartbeatNodes(bool include_manifest) const {
   std::string fullHb{"true"};
   configuration_->get(minifi::Configuration::nifi_c2_full_heartbeat, fullHb);
   const bool include = include_manifest || fullHb == "true";
 
-  std::vector<std::shared_ptr<state::response::ResponseNode>> nodes;
+  std::vector<state::response::NodeReporter::ReportedNode> nodes;
+  std::lock_guard<std::mutex> guard{metrics_mutex_};
   nodes.reserve(root_response_nodes_.size());
-  std::lock_guard<std::mutex> lock(metrics_mutex_);
   for (const auto &entry : root_response_nodes_) {
     auto identifier = std::dynamic_pointer_cast<state::response::AgentIdentifier>(entry.second);
     if (identifier) {
       identifier->includeAgentManifest(include);
     }
-    nodes.push_back(entry.second);
+    if (entry.second) {
+      state::response::NodeReporter::ReportedNode reported_node;
+      reported_node.name = entry.second->getName();
+      reported_node.is_array = entry.second->isArray();
+      reported_node.serialized_nodes = entry.second->serialize();
+      nodes.push_back(reported_node);
+    }
   }
   return nodes;
 }
 
-void C2Client::updateResponseNodeConnections() {
-  std::map<std::string, Connection*> connections;
-  if (root_ != nullptr) {
-    root_->getConnections(connections);
-  }
+void C2Client::initializeResponseNodes(core::ProcessGroup* root) {
+  std::string class_csv;
+  std::lock_guard<std::mutex> guard{metrics_mutex_};
+  if (configuration_->get(minifi::Configuration::nifi_c2_root_classes, class_csv)) {
+    std::vector<std::string> classes = utils::StringUtils::split(class_csv, ",");
 
-  std::lock_guard<std::mutex> lock(metrics_mutex_);
-  for (auto& [_, responseNode] : root_response_nodes_) {
-    auto flowMonitor = dynamic_cast<state::response::FlowMonitor*>(responseNode.get());
-    if (flowMonitor != nullptr) {
-      flowMonitor->clearConnections();
-      for (const auto &con: connections) {
-        flowMonitor->updateConnection(con.second);
+    for (const std::string& clazz : classes) {
+      auto response_node = response_node_loader_.loadResponseNode(clazz, root);
+      if (!response_node) {
+        continue;
       }
+
+      root_response_nodes_[response_node->getName()] = std::move(response_node);
     }
   }
+
+  loadC2ResponseConfiguration(Configuration::nifi_c2_root_class_definitions);
+}
+
+void C2Client::clearResponseNodes() {
+  std::lock_guard<std::mutex> guard{metrics_mutex_};
+  root_response_nodes_.clear();
 }
 
 }  // namespace org::apache::nifi::minifi::c2
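
With this refactor, getMetricsNode() and getHeartbeatNodes() hand out pre-serialized NodeReporter::ReportedNode values instead of live ResponseNode pointers, so heartbeat consumers no longer share node ownership or metrics_mutex_. Below is a minimal consumer-side sketch; the ReportedNode struct is a simplified stand-in carrying only the two fields visible in this diff (the real type also holds the serialized child nodes), and logHeartbeatNodes is an illustrative name, not part of the tree:

    #include <iostream>
    #include <string>
    #include <vector>

    // Simplified stand-in for state::response::NodeReporter::ReportedNode.
    struct ReportedNode {
      std::string name;
      bool is_array = false;
    };

    // A heartbeat consumer only sees copied-out data, so it needs no access
    // to C2Client internals or its metrics_mutex_.
    void logHeartbeatNodes(const std::vector<ReportedNode>& nodes) {
      for (const auto& node : nodes) {
        std::cout << node.name << (node.is_array ? " [array]" : "") << '\n';
      }
    }

    int main() {
      logHeartbeatNodes({{"metrics", false}, {"AgentInformation", false}});
    }
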
diff --git a/libminifi/src/core/state/nodes/AgentInformation.cpp b/libminifi/src/core/state/nodes/AgentInformation.cpp
index 01f404729..0609f25ec 100644
--- a/libminifi/src/core/state/nodes/AgentInformation.cpp
+++ b/libminifi/src/core/state/nodes/AgentInformation.cpp
@@ -18,21 +18,11 @@
 #include "core/state/nodes/AgentInformation.h"
 #include "core/Resource.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace state {
-namespace response {
+namespace org::apache::nifi::minifi::state::response {
 
 utils::ProcessCpuUsageTracker AgentStatus::cpu_load_tracker_;
 std::mutex AgentStatus::cpu_load_tracker_mutex_;
 
 REGISTER_RESOURCE(AgentInformation, DescriptionOnly);
 
-} /* namespace response */
-} /* namespace state */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi::state::response
diff --git a/libminifi/src/core/state/nodes/AgentInformation.cpp b/libminifi/src/core/state/nodes/QueueMetrics.cpp
similarity index 64%
copy from libminifi/src/core/state/nodes/AgentInformation.cpp
copy to libminifi/src/core/state/nodes/QueueMetrics.cpp
index 01f404729..d37268102 100644
--- a/libminifi/src/core/state/nodes/AgentInformation.cpp
+++ b/libminifi/src/core/state/nodes/QueueMetrics.cpp
@@ -15,24 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include "core/state/nodes/AgentInformation.h"
+
+#include "core/state/nodes/QueueMetrics.h"
 #include "core/Resource.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace state {
-namespace response {
+namespace org::apache::nifi::minifi::state::response {
 
-utils::ProcessCpuUsageTracker AgentStatus::cpu_load_tracker_;
-std::mutex AgentStatus::cpu_load_tracker_mutex_;
+REGISTER_RESOURCE(QueueMetrics, DescriptionOnly);
 
-REGISTER_RESOURCE(AgentInformation, DescriptionOnly);
+}  // namespace org::apache::nifi::minifi::state::response
 
-} /* namespace response */
-} /* namespace state */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
diff --git a/libminifi/src/core/state/nodes/AgentInformation.cpp b/libminifi/src/core/state/nodes/RepositoryMetrics.cpp
similarity index 64%
copy from libminifi/src/core/state/nodes/AgentInformation.cpp
copy to libminifi/src/core/state/nodes/RepositoryMetrics.cpp
index 01f404729..a8d402b0b 100644
--- a/libminifi/src/core/state/nodes/AgentInformation.cpp
+++ b/libminifi/src/core/state/nodes/RepositoryMetrics.cpp
@@ -15,24 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include "core/state/nodes/AgentInformation.h"
+
+#include "core/state/nodes/RepositoryMetrics.h"
 #include "core/Resource.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace state {
-namespace response {
+namespace org::apache::nifi::minifi::state::response {
 
-utils::ProcessCpuUsageTracker AgentStatus::cpu_load_tracker_;
-std::mutex AgentStatus::cpu_load_tracker_mutex_;
+REGISTER_RESOURCE(RepositoryMetrics, DescriptionOnly);
 
-REGISTER_RESOURCE(AgentInformation, DescriptionOnly);
+}  // namespace org::apache::nifi::minifi::state::response
 
-} /* namespace response */
-} /* namespace state */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
diff --git a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
new file mode 100644
index 000000000..f98d29379
--- /dev/null
+++ b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
@@ -0,0 +1,193 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "core/state/nodes/ResponseNodeLoader.h"
+
+#include <vector>
+
+#include "core/Processor.h"
+#include "core/ClassLoader.h"
+#include "core/state/nodes/RepositoryMetrics.h"
+#include "core/state/nodes/QueueMetrics.h"
+#include "core/state/nodes/AgentInformation.h"
+#include "core/state/nodes/ConfigurationChecksums.h"
+#include "c2/C2Agent.h"
+
+namespace org::apache::nifi::minifi::state::response {
+
+ResponseNodeLoader::ResponseNodeLoader(std::shared_ptr<Configure> configuration, std::shared_ptr<core::Repository> provenance_repo,
+    std::shared_ptr<core::Repository> flow_file_repo, core::FlowConfiguration* flow_configuration)
+  : configuration_(std::move(configuration)),
+    provenance_repo_(std::move(provenance_repo)),
+    flow_file_repo_(std::move(flow_file_repo)),
+    flow_configuration_(flow_configuration) {
+}
+
+void ResponseNodeLoader::initializeComponentMetrics(core::ProcessGroup* root) {
+  {
+    std::lock_guard<std::mutex> guard(component_metrics_mutex_);
+    component_metrics_.clear();
+  }
+
+  if (!root) {
+    return;
+  }
+
+  std::vector<core::Processor*> processors;
+  root->getAllProcessors(processors);
+  for (const auto processor : processors) {
+    auto node_source = dynamic_cast<ResponseNodeSource*>(processor);
+    if (node_source == nullptr) {
+      continue;
+    }
+    // we have a metrics source.
+    std::vector<std::shared_ptr<ResponseNode>> metric_vector;
+    node_source->getResponseNodes(metric_vector);
+    std::lock_guard<std::mutex> guard(component_metrics_mutex_);
+    for (const auto& metric : metric_vector) {
+      component_metrics_[metric->getName()] = metric;
+    }
+  }
+}
+
+std::shared_ptr<ResponseNode> ResponseNodeLoader::getResponseNode(const std::string& clazz) const {
+  std::shared_ptr<core::CoreComponent> ptr = core::ClassLoader::getDefaultClassLoader().instantiate(clazz, clazz);
+  if (ptr == nullptr) {
+    return getComponentMetricsNode(clazz);
+  }
+  return std::dynamic_pointer_cast<ResponseNode>(ptr);
+}
+
+void ResponseNodeLoader::initializeRepositoryMetrics(const std::shared_ptr<ResponseNode>& response_node) {
+  auto repository_metrics = dynamic_cast<RepositoryMetrics*>(response_node.get());
+  if (repository_metrics != nullptr) {
+    repository_metrics->addRepository(provenance_repo_);
+    repository_metrics->addRepository(flow_file_repo_);
+  }
+}
+
+void ResponseNodeLoader::initializeQueueMetrics(const std::shared_ptr<ResponseNode>& response_node, core::ProcessGroup* root) {
+  if (!root) {
+    return;
+  }
+
+  auto queue_metrics = dynamic_cast<QueueMetrics*>(response_node.get());
+  if (queue_metrics != nullptr) {
+    std::map<std::string, Connection*> connections;
+    root->getConnections(connections);
+    for (const auto &con : connections) {
+      queue_metrics->updateConnection(con.second);
+    }
+  }
+}
+
+void ResponseNodeLoader::initializeAgentIdentifier(const std::shared_ptr<ResponseNode>& response_node) {
+  auto identifier = dynamic_cast<state::response::AgentIdentifier*>(response_node.get());
+  if (identifier != nullptr) {
+    identifier->setAgentIdentificationProvider(configuration_);
+  }
+}
+
+void ResponseNodeLoader::initializeAgentMonitor(const std::shared_ptr<ResponseNode>& response_node) {
+  auto monitor = dynamic_cast<state::response::AgentMonitor*>(response_node.get());
+  if (monitor != nullptr) {
+    monitor->addRepository(provenance_repo_);
+    monitor->addRepository(flow_file_repo_);
+    monitor->setStateMonitor(update_sink_);
+  }
+}
+
+void ResponseNodeLoader::initializeAgentNode(const std::shared_ptr<ResponseNode>& response_node) {
+  auto agent_node = dynamic_cast<state::response::AgentNode*>(response_node.get());
+  if (agent_node != nullptr && controller_ != nullptr) {
+    agent_node->setUpdatePolicyController(std::static_pointer_cast<controllers::UpdatePolicyControllerService>(controller_->getControllerService(c2::C2Agent::UPDATE_NAME)).get());
+  }
+  if (agent_node != nullptr) {
+    agent_node->setConfigurationReader([this](const std::string& key){
+      return configuration_->getRawValue(key);
+    });
+  }
+}
+
+void ResponseNodeLoader::initializeConfigurationChecksums(const std::shared_ptr<ResponseNode>& response_node) {
+  auto configuration_checksums = dynamic_cast<state::response::ConfigurationChecksums*>(response_node.get());
+  if (configuration_checksums) {
+    configuration_checksums->addChecksumCalculator(configuration_->getChecksumCalculator());
+    if (flow_configuration_) {
+      configuration_checksums->addChecksumCalculator(flow_configuration_->getChecksumCalculator());
+    }
+  }
+}
+
+void ResponseNodeLoader::initializeFlowMonitor(const std::shared_ptr<ResponseNode>& response_node, core::ProcessGroup* root) {
+  auto flowMonitor = dynamic_cast<state::response::FlowMonitor*>(response_node.get());
+  if (flowMonitor == nullptr) {
+    return;
+  }
+
+  std::map<std::string, Connection*> connections;
+  if (root) {
+    root->getConnections(connections);
+  }
+
+  for (auto &con : connections) {
+    flowMonitor->updateConnection(con.second);
+  }
+  flowMonitor->setStateMonitor(update_sink_);
+  if (flow_configuration_) {
+    flowMonitor->setFlowVersion(flow_configuration_->getFlowVersion());
+  }
+}
+
+std::shared_ptr<ResponseNode> ResponseNodeLoader::loadResponseNode(const std::string& clazz, core::ProcessGroup* root) {
+  auto response_node = getResponseNode(clazz);
+  if (!response_node) {
+    logger_->log_error("No metric defined for %s", clazz);
+    return nullptr;
+  }
+
+  initializeRepositoryMetrics(response_node);
+  initializeQueueMetrics(response_node, root);
+  initializeAgentIdentifier(response_node);
+  initializeAgentMonitor(response_node);
+  initializeAgentNode(response_node);
+  initializeConfigurationChecksums(response_node);
+  initializeFlowMonitor(response_node, root);
+  return response_node;
+}
+
+std::shared_ptr<state::response::ResponseNode> ResponseNodeLoader::getComponentMetricsNode(const std::string& metrics_class) const {
+  if (!metrics_class.empty()) {
+    std::lock_guard<std::mutex> lock(component_metrics_mutex_);
+    const auto citer = component_metrics_.find(metrics_class);
+    if (citer != component_metrics_.end()) {
+      return citer->second;
+    }
+  }
+  return nullptr;
+}
+
+void ResponseNodeLoader::setControllerServiceProvider(core::controller::ControllerServiceProvider* controller) {
+  controller_ = controller;
+}
+
+void ResponseNodeLoader::setStateMonitor(state::StateMonitor* update_sink) {
+  update_sink_ = update_sink;
+}
+
+}  // namespace org::apache::nifi::minifi::state::response
+
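
The new ResponseNodeLoader is the single entry point for building response nodes, whether a class is resolved through the ClassLoader or through processor-provided component metrics. Below is a minimal wiring sketch under the assumption that the configuration, repositories and flow configuration come from the usual agent setup and that ResponseNodeLoader.h pulls in the referenced core types; makeQueueMetricsNode is an illustrative helper, not part of the tree:

    #include <memory>
    #include <utility>

    #include "core/state/nodes/ResponseNodeLoader.h"

    namespace minifi = org::apache::nifi::minifi;

    // Builds a single response node the same way C2Client::initializeResponseNodes
    // does, one class name at a time.
    std::shared_ptr<minifi::state::response::ResponseNode> makeQueueMetricsNode(
        std::shared_ptr<minifi::Configure> configuration,
        std::shared_ptr<minifi::core::Repository> provenance_repo,
        std::shared_ptr<minifi::core::Repository> flow_file_repo,
        minifi::core::FlowConfiguration* flow_configuration,
        minifi::core::ProcessGroup* root) {
      minifi::state::response::ResponseNodeLoader loader(
          std::move(configuration), std::move(provenance_repo), std::move(flow_file_repo), flow_configuration);
      loader.initializeComponentMetrics(root);  // collect processor-provided metrics first
      return loader.loadResponseNode("QueueMetrics", root);  // nullptr if the class cannot be resolved
    }
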
diff --git a/libminifi/test/resources/TestC2Metrics.yml b/libminifi/test/resources/TestC2Metrics.yml
new file mode 100644
index 000000000..ea3b7eb74
--- /dev/null
+++ b/libminifi/test/resources/TestC2Metrics.yml
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+Flow Controller:
+    name: MiNiFi Flow
+    id: 2438e3c8-015a-1000-79ca-83af40ec1990
+Processors:
+    - name: GetTCP
+      id: 2438e3c8-015a-1000-79ca-83af40ec1991
+      class: org.apache.nifi.processors.standard.GetTCP
+      max concurrent tasks: 1
+      scheduling strategy: TIMER_DRIVEN
+      scheduling period: 100 msec
+      penalization period: 30 sec
+      yield period: 10 sec
+      run duration nanos: 0
+      auto-terminated relationships list:
+      Properties:
+          endpoint-list: localhost:8776
+          end-of-message-byte: d
+          reconnect-interval: 100ms
+          connection-attempt-timeout: 2000
+    - name: LogAttribute
+      id: 2438e3c8-015a-1000-79ca-83af40ec1992
+      class: org.apache.nifi.processors.standard.LogAttribute
+      max concurrent tasks: 1
+      scheduling strategy: TIMER_DRIVEN
+      scheduling period: 30 sec
+      penalization period: 30 sec
+      yield period: 1 sec
+      run duration nanos: 0
+      auto-terminated relationships list:
+        - response
+      Properties:
+        Log Level: info
+        Log Payload: true
+
+Connections:
+    - name: GetTCP/success/LogAttribute
+      id: 2438e3c8-015a-1000-79ca-83af40ec1997
+      source name: GetTCP
+      source id: 2438e3c8-015a-1000-79ca-83af40ec1991
+      source relationship names:
+      - success
+      - failure
+      destination name: LogAttribute
+      destination id: 2438e3c8-015a-1000-79ca-83af40ec1992
+      max work queue size: 0
+      max work queue data size: 1 MB
+      flowfile expiration: 60 sec
+
+Controller Services: []
+Remote Processing Groups:
+
diff --git a/libminifi/test/resources/TestC2MetricsUpdate.yml b/libminifi/test/resources/TestC2MetricsUpdate.yml
new file mode 100644
index 000000000..477402a53
--- /dev/null
+++ b/libminifi/test/resources/TestC2MetricsUpdate.yml
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+Flow Controller:
+    name: MiNiFi Flow
+    id: 2438e3c8-015a-1000-79ca-83af40ec1990
+Processors:
+    - id: 4fe2d51d-076a-49b0-88de-5cf5adf52b8f
+      name: GenerateFlowFile
+      class: org.apache.nifi.minifi.processors.GenerateFlowFile
+      max concurrent tasks: 1
+      scheduling strategy: TIMER_DRIVEN
+      scheduling period: 300 ms
+      penalization period: 1000 ms
+      yield period: 1000 ms
+      run duration nanos: 0
+      auto-terminated relationships list: []
+      Properties:
+        Batch Size: '1'
+        Data Format: Binary
+        File Size: 10 B
+        Unique FlowFiles: 'true'
+    - name: LogAttribute
+      id: 5128e3c8-015a-1000-79ca-83af40ec1990
+      class: org.apache.nifi.processors.standard.LogAttribute
+      max concurrent tasks: 1
+      scheduling strategy: TIMER_DRIVEN
+      scheduling period: 30 sec
+      penalization period: 30 sec
+      yield period: 1 sec
+      run duration nanos: 0
+      auto-terminated relationships list:
+        - response
+      Properties:
+        Log Level: info
+        Log Payload: true
+Connections:
+    - name: GenerateFlowFile/success/LogAttribute
+      id: 8368e3c8-015a-1003-52ca-83af40ec1332
+      source name: GenerateFlowFile
+      source id: 4fe2d51d-076a-49b0-88de-5cf5adf52b8f
+      source relationship names:
+      - success
+      destination name: LogAttribute
+      destination id: 5128e3c8-015a-1000-79ca-83af40ec1990
+      max work queue size: 0
+      max work queue data size: 1 MB
+      flowfile expiration: 60 sec
+
+Controller Services: []
+Remote Processing Groups:
diff --git a/libminifi/test/resources/encrypted.minifi.properties b/libminifi/test/resources/encrypted.minifi.properties
index 43d49f823..11c38e182 100644
--- a/libminifi/test/resources/encrypted.minifi.properties
+++ b/libminifi/test/resources/encrypted.minifi.properties
@@ -77,11 +77,11 @@ c2.agent.identifier.protected=xsalsa20poly1305
 ## define metrics reported
 nifi.c2.root.class.definitions=metrics
 nifi.c2.root.class.definitions.metrics.name=metrics
-nifi.c2.root.class.definitions.metrics.metrics=typedmetrics
-nifi.c2.root.class.definitions.metrics.metrics.typedmetrics.name=RuntimeMetrics
-nifi.c2.root.class.definitions.metrics.metrics.queuemetrics.name=QueueMetrics
-nifi.c2.root.class.definitions.metrics.metrics.queuemetrics.classes=QueueMetrics
-nifi.c2.root.class.definitions.metrics.metrics.typedmetrics.classes=ProcessMetrics,SystemInformation
+nifi.c2.root.class.definitions.metrics.metrics=runtimemetrics,loadmetrics,processorMetrics
+nifi.c2.root.class.definitions.metrics.metrics.runtimemetrics.name=RuntimeMetrics
+nifi.c2.root.class.definitions.metrics.metrics.runtimemetrics.classes=DeviceInfoNode,FlowInformation
+nifi.c2.root.class.definitions.metrics.metrics.loadmetrics.name=LoadMetrics
+nifi.c2.root.class.definitions.metrics.metrics.loadmetrics.classes=QueueMetrics,RepositoryMetrics
 nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.name=ProcessorMetric
 nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.classes=GetFileMetrics
 
@@ -102,3 +102,8 @@ nifi.c2.flow.id=
 nifi.c2.flow.url=
 
 nifi.sensitive.props.additional.keys=c2.agent.identifier
+
+# Publish metrics to external consumers
+# nifi.metrics.publisher.class=PrometheusMetricsPublisher
+# nifi.metrics.publisher.PrometheusMetricsPublisher.port=9936
+# nifi.metrics.publisher.metrics=QueueMetrics,RepositoryMetrics,GetFileMetrics,DeviceInfoNode,FlowInformation
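
The reshuffled definitions above group the reported nodes into RuntimeMetrics, LoadMetrics and ProcessorMetric, and the commented-out block shows how the new Prometheus publisher would be switched on. Each nested group is discovered by composing keys from the definition prefix; the sketch below only illustrates that lookup pattern with the values from this file (it is not agent code):

    #include <iostream>
    #include <string>

    int main() {
      const std::string prefix = "nifi.c2.root.class.definitions.metrics.metrics";
      for (const char* name : {"runtimemetrics", "loadmetrics", "processorMetrics"}) {
        // For every listed group the agent looks up "<prefix>.<name>.name" and
        // "<prefix>.<name>.classes"; when ".classes" is absent, the group is
        // treated as another nested definition level.
        std::cout << prefix + "." + name + ".name" << '\n';
        std::cout << prefix + "." + name + ".classes" << '\n';
      }
    }
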
diff --git a/libminifi/test/unit/C2MetricsTests.cpp b/libminifi/test/unit/MetricsTests.cpp
similarity index 99%
rename from libminifi/test/unit/C2MetricsTests.cpp
rename to libminifi/test/unit/MetricsTests.cpp
index 7ea9af005..6c4faafbb 100644
--- a/libminifi/test/unit/C2MetricsTests.cpp
+++ b/libminifi/test/unit/MetricsTests.cpp
@@ -51,7 +51,7 @@ TEST_CASE("QueueMetricsTestConnections", "[c2m3]") {
   connection->setMaxQueueDataSize(1024);
   connection->setMaxQueueSize(1024);
 
-  metrics.addConnection(std::move(connection));
+  metrics.updateConnection(connection.get());
 
   REQUIRE(1 == metrics.serialize().size());
 
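
The rename also reflects an API change visible in the hunk above: QueueMetrics now tracks connections through non-owning pointers via updateConnection() rather than taking ownership with addConnection(). A short sketch of the updated contract follows; the helper name and include paths are illustrative, and the connection must outlive the metrics node:

    #include <cstddef>

    #include "core/state/nodes/QueueMetrics.h"
    #include "Connection.h"

    namespace minifi = org::apache::nifi::minifi;

    // Registers one connection with the metrics node and returns how many
    // queue entries serialize() reports afterwards (one per tracked connection).
    std::size_t countSerializedQueues(minifi::state::response::QueueMetrics& metrics,
                                      minifi::Connection* connection) {
      connection->setMaxQueueDataSize(1024);
      connection->setMaxQueueSize(1024);
      metrics.updateConnection(connection);  // raw, non-owning pointer
      return metrics.serialize().size();
    }
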
diff --git a/thirdparty/prometheus-cpp/remove-find_package.patch b/thirdparty/prometheus-cpp/remove-find_package.patch
new file mode 100644
index 000000000..52e0964ea
--- /dev/null
+++ b/thirdparty/prometheus-cpp/remove-find_package.patch
@@ -0,0 +1,13 @@
+diff --git a/pull/CMakeLists.txt b/pull/CMakeLists.txt
+index 8de1069..1322ab8 100644
+--- a/pull/CMakeLists.txt
++++ b/pull/CMakeLists.txt
+@@ -8,7 +8,7 @@ if(USE_THIRDPARTY_LIBRARIES)
+     PUBLIC_HEADER DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/prometheus
+   )
+ else()
+-  find_package(civetweb CONFIG REQUIRED)
++  # find_package(civetweb CONFIG REQUIRED)
+ 
+   # work-around https://github.com/civetweb/civetweb/pull/918
+   if(WIN32 AND NOT TARGET WINSOCK::WINSOCK)
diff --git a/win_build_vs.bat b/win_build_vs.bat
index e69bb1461..35d3d1f02 100755
--- a/win_build_vs.bat
+++ b/win_build_vs.bat
@@ -41,6 +41,7 @@ set redist=
 set build_linter=OFF
 set build_nanofi=OFF
 set build_opencv=OFF
+set build_prometheus=OFF
 set real_odbc=OFF
 
 set arg_counter=0
@@ -63,6 +64,7 @@ for %%x in (%*) do (
     if [%%~x] EQU [/Z]           set build_azure=ON
     if [%%~x] EQU [/N]           set build_nanofi=ON
     if [%%~x] EQU [/O]           set build_opencv=ON
+    if [%%~x] EQU [/PR]          set build_prometheus=ON
     if [%%~x] EQU [/64]          set build_platform=x64
     if [%%~x] EQU [/D]           set cmake_build_type=RelWithDebInfo
     if [%%~x] EQU [/DD]          set cmake_build_type=Debug
@@ -75,7 +77,7 @@ for %%x in (%*) do (
 mkdir %builddir%
 pushd %builddir%\
 
-cmake -G %generator% -A %build_platform% -DINSTALLER_MERGE_MODULES=%installer_merge_modules% -DTEST_CUSTOM_WEL_PROVIDER=%test_custom_wel_provider% -DENABLE_SQL=%build_SQL% -DUSE_REAL_ODBC_TEST_DRIVER=%real_odbc% -DCMAKE_BUILD_TYPE_INIT=%cmake_build_type% -DCMAKE_BUILD_TYPE=%cmake_build_type% -DWIN32=WIN32 -DENABLE_LIBRDKAFKA=%build_kafka% -DENABLE_JNI=%build_jni% -DOPENSSL_OFF=OFF -DENABLE_COAP=%build_coap% -DENABLE_AWS=%build_AWS% -DENABLE_PDH=%build_PDH% -DENABLE_AZURE=%build_azure% -D [...]
+cmake -G %generator% -A %build_platform% -DINSTALLER_MERGE_MODULES=%installer_merge_modules% -DTEST_CUSTOM_WEL_PROVIDER=%test_custom_wel_provider% -DENABLE_SQL=%build_SQL% -DUSE_REAL_ODBC_TEST_DRIVER=%real_odbc% -DCMAKE_BUILD_TYPE_INIT=%cmake_build_type% -DCMAKE_BUILD_TYPE=%cmake_build_type% -DWIN32=WIN32 -DENABLE_LIBRDKAFKA=%build_kafka% -DENABLE_JNI=%build_jni% -DOPENSSL_OFF=OFF -DENABLE_COAP=%build_coap% -DENABLE_AWS=%build_AWS% -DENABLE_PDH=%build_PDH% -DENABLE_AZURE=%build_azure% -D [...]
 IF %ERRORLEVEL% NEQ 0 EXIT /b %ERRORLEVEL%
 if [%cpack%] EQU [ON] (
     cpack -C %cmake_build_type%