You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by fg...@apache.org on 2023/01/18 19:31:12 UTC

[nifi-minifi-cpp] 02/02: MINIFICPP-1987 Configuring processor metrics with regular expressions

This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 2173e13e17585a8f3f60edd59c28475bf7669f46
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Tue Nov 22 17:37:37 2022 +0100

    MINIFICPP-1987 Configuring processor metrics with regular expressions
    
    Signed-off-by: Ferenc Gerlits <fg...@gmail.com>
    This closes #1459
---
 C2.md                                              |   4 +
 METRICS.md                                         |  16 ++-
 docker/test/integration/minifi/core/ImageStore.py  |   2 +-
 .../http-curl/tests/C2DescribeMetricsTest.cpp      |  30 ++++-
 extensions/http-curl/tests/C2MetricsTest.cpp       |   2 +-
 .../include/core/state/nodes/ResponseNodeLoader.h  |  21 ++--
 libminifi/src/c2/C2Client.cpp                      |   2 +-
 .../src/core/state/nodes/ResponseNodeLoader.cpp    |  40 ++++--
 libminifi/test/unit/ResponseNodeLoaderTests.cpp    | 134 +++++++++++++++++++++
 9 files changed, 219 insertions(+), 32 deletions(-)

diff --git a/C2.md b/C2.md
index 14c17090a..71c40f192 100644
--- a/C2.md
+++ b/C2.md
@@ -117,6 +117,10 @@ a configuration of an agent
     nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.name=ProcessorMetric
     nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.classes=GetFileMetrics
 
+Processor metrics can also be configured using regular expressions with the `processorMetrics/` prefix, so the following definition is also valid:
+
+    nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.classes=processorMetrics/Get.*Metrics
+
 This example shows a metrics sub tree defined by the option 'nifi.c2.root.class.definitions'.
 
 This is a comma separated list of all sub trees. In the example, above, only one sub tree exists: metrics.
diff --git a/METRICS.md b/METRICS.md
index fa0732c06..bf6ed91e4 100644
--- a/METRICS.md
+++ b/METRICS.md
@@ -59,13 +59,13 @@ The following option defines which metric classes should be exposed through the
 
     # in minifi.properties
 
-    nifi.metrics.publisher.metrics=QueueMetrics,RepositoryMetrics,GetFileMetrics,DeviceInfoNode,FlowInformation
+    nifi.metrics.publisher.metrics=QueueMetrics,RepositoryMetrics,GetFileMetrics,DeviceInfoNode,FlowInformation,processorMetrics/Tail.*
 
 An agent identifier should also be defined to identify which agent the metric is exposed from. If not set, the hostname is used as the identifier.
 
-	# in minifi.properties
+    # in minifi.properties
 
-	nifi.metrics.publisher.agent.identifier=Agent1
+    nifi.metrics.publisher.agent.identifier=Agent1
 
 ## System Metrics
 
@@ -168,6 +168,16 @@ AgentStatus is a system level metric that defines current agent status including
 
 Processor level metrics can be accessed for any processor provided by MiNiFi. These metrics correspond to the name of the processor appended by the "Metrics" suffix (e.g. GetFileMetrics, TailFileMetrics, etc.).
 
+Besides configuring processor metrics directly, they can also be configured using regular expressions with the `processorMetrics/` prefix.
+
+All available processor metrics can be requested in the `minifi.properties` by using the following configuration:
+
+    nifi.metrics.publisher.metrics=processorMetrics/.*
+
+Regular expressions can also be used for requesting multiple processor metrics at once, like GetFileMetrics and GetTCPMetrics with the following configuration:
+
+    nifi.metrics.publisher.metrics=processorMetrics/Get.*Metrics
+
 ### General Metrics
 
 There are general metrics that are available for all processors. Besides these metrics processors can implement additional metrics that are speicific to that processor.
diff --git a/docker/test/integration/minifi/core/ImageStore.py b/docker/test/integration/minifi/core/ImageStore.py
index c0c158852..20d212c69 100644
--- a/docker/test/integration/minifi/core/ImageStore.py
+++ b/docker/test/integration/minifi/core/ImageStore.py
@@ -111,7 +111,7 @@ class ImageStore:
                 RUN echo nifi.metrics.publisher.agent.identifier=Agent1 >> {minifi_root}/conf/minifi.properties
                 RUN echo nifi.metrics.publisher.class=PrometheusMetricsPublisher >> {minifi_root}/conf/minifi.properties
                 RUN echo nifi.metrics.publisher.PrometheusMetricsPublisher.port=9936 >> {minifi_root}/conf/minifi.properties
-                RUN echo nifi.metrics.publisher.metrics=RepositoryMetrics,QueueMetrics,GetFileMetrics,GetTCPMetrics,PutFileMetrics,FlowInformation,DeviceInfoNode,AgentStatus >> {minifi_root}/conf/minifi.properties
+                RUN echo nifi.metrics.publisher.metrics=RepositoryMetrics,QueueMetrics,PutFileMetrics,processorMetrics/Get.*,FlowInformation,DeviceInfoNode,AgentStatus >> {minifi_root}/conf/minifi.properties
                 RUN echo nifi.c2.enable=true  >> {minifi_root}/conf/minifi.properties
                 RUN echo nifi.c2.rest.url=http://minifi-c2-server:10090/c2/config/heartbeat  >> {minifi_root}/conf/minifi.properties
                 RUN echo nifi.c2.rest.url.ack=http://minifi-c2-server:10090/c2/config/acknowledge  >> {minifi_root}/conf/minifi.properties
diff --git a/extensions/http-curl/tests/C2DescribeMetricsTest.cpp b/extensions/http-curl/tests/C2DescribeMetricsTest.cpp
index acf5d80cb..0e177d5fd 100644
--- a/extensions/http-curl/tests/C2DescribeMetricsTest.cpp
+++ b/extensions/http-curl/tests/C2DescribeMetricsTest.cpp
@@ -66,10 +66,14 @@ class MetricsHandler: public HeartbeatHandler {
 
   void handleHeartbeat(const rapidjson::Document&, struct mg_connection* conn) override {
     switch (state_) {
-      case TestState::DESCRIBE_SPECIFIC_METRIC: {
+      case TestState::DESCRIBE_SPECIFIC_PROCESSOR_METRIC: {
         sendHeartbeatResponse("DESCRIBE", "metrics", "889347", conn, {{"metricsClass", "GetFileMetrics"}});
         break;
       }
+      case TestState::DESCRIBE_SPECIFIC_SYSTEM_METRIC: {
+        sendHeartbeatResponse("DESCRIBE", "metrics", "889347", conn, {{"metricsClass", "QueueMetrics"}});
+        break;
+      }
       case TestState::DESCRIBE_ALL_METRICS: {
         sendHeartbeatResponse("DESCRIBE", "metrics", "889347", conn);
         break;
@@ -81,8 +85,12 @@ class MetricsHandler: public HeartbeatHandler {
 
   void handleAcknowledge(const rapidjson::Document& root) override {
     switch (state_) {
-      case TestState::DESCRIBE_SPECIFIC_METRIC: {
-        verifySpecificMetrics(root);
+      case TestState::DESCRIBE_SPECIFIC_PROCESSOR_METRIC: {
+        verifySpecificProcessorMetrics(root);
+        break;
+      }
+      case TestState::DESCRIBE_SPECIFIC_SYSTEM_METRIC: {
+        verifySpecificSystemMetrics(root);
         break;
       }
       case TestState::DESCRIBE_ALL_METRICS: {
@@ -96,7 +104,8 @@ class MetricsHandler: public HeartbeatHandler {
 
  private:
   enum class TestState {
-    DESCRIBE_SPECIFIC_METRIC,
+    DESCRIBE_SPECIFIC_PROCESSOR_METRIC,
+    DESCRIBE_SPECIFIC_SYSTEM_METRIC,
     DESCRIBE_ALL_METRICS
   };
 
@@ -109,12 +118,21 @@ class MetricsHandler: public HeartbeatHandler {
     mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
   }
 
-  void verifySpecificMetrics(const rapidjson::Document& root) {
+  void verifySpecificProcessorMetrics(const rapidjson::Document& root) {
     auto getfile_metrics_verified =
       !root.HasMember("metrics") &&
       root.HasMember("GetFileMetrics") &&
       root["GetFileMetrics"].HasMember(GETFILE1_UUID) &&
       root["GetFileMetrics"].HasMember(GETFILE2_UUID);
+    if (getfile_metrics_verified) {
+      state_ = TestState::DESCRIBE_SPECIFIC_SYSTEM_METRIC;
+    }
+  }
+
+  void verifySpecificSystemMetrics(const rapidjson::Document& root) {
+    auto getfile_metrics_verified =
+      !root.HasMember("metrics") &&
+      root.HasMember("QueueMetrics");
     if (getfile_metrics_verified) {
       state_ = TestState::DESCRIBE_ALL_METRICS;
     }
@@ -136,7 +154,7 @@ class MetricsHandler: public HeartbeatHandler {
     }
   }
 
-  TestState state_ = TestState::DESCRIBE_SPECIFIC_METRIC;
+  TestState state_ = TestState::DESCRIBE_SPECIFIC_PROCESSOR_METRIC;
   std::atomic_bool& metrics_found_;
 };
 
diff --git a/extensions/http-curl/tests/C2MetricsTest.cpp b/extensions/http-curl/tests/C2MetricsTest.cpp
index 16dd9fcbc..2d717e2e7 100644
--- a/extensions/http-curl/tests/C2MetricsTest.cpp
+++ b/extensions/http-curl/tests/C2MetricsTest.cpp
@@ -207,7 +207,7 @@ int main(int argc, char **argv) {
   harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.loadmetrics.name", "LoadMetrics");
   harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.loadmetrics.classes", "QueueMetrics,RepositoryMetrics");
   harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.name", "ProcessorMetrics");
-  harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.classes", "GetTCPMetrics");
+  harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.classes", "processorMetrics/GetTCP.*");
   harness.setKeyDir(args.key_dir);
   auto replacement_path = args.test_file;
   minifi::utils::StringUtils::replaceAll(replacement_path, "TestC2Metrics", "TestC2MetricsUpdate");
diff --git a/libminifi/include/core/state/nodes/ResponseNodeLoader.h b/libminifi/include/core/state/nodes/ResponseNodeLoader.h
index 8ad250f9f..b30125673 100644
--- a/libminifi/include/core/state/nodes/ResponseNodeLoader.h
+++ b/libminifi/include/core/state/nodes/ResponseNodeLoader.h
@@ -39,22 +39,23 @@ class ResponseNodeLoader {
  public:
   ResponseNodeLoader(std::shared_ptr<Configure> configuration, std::shared_ptr<core::Repository> provenance_repo,
     std::shared_ptr<core::Repository> flow_file_repo, core::FlowConfiguration* flow_configuration);
-  std::vector<std::shared_ptr<ResponseNode>> loadResponseNodes(const std::string& clazz, core::ProcessGroup* root);
-  std::vector<std::shared_ptr<ResponseNode>> getComponentMetricsNodes(const std::string& metrics_class) const;
+  void initializeComponentMetrics(core::ProcessGroup* root);
   void setControllerServiceProvider(core::controller::ControllerServiceProvider* controller);
   void setStateMonitor(state::StateMonitor* update_sink);
-  void initializeComponentMetrics(core::ProcessGroup* root);
+  std::vector<std::shared_ptr<ResponseNode>> loadResponseNodes(const std::string& clazz, core::ProcessGroup* root) const;
 
  private:
+  std::vector<std::shared_ptr<ResponseNode>> getComponentMetricsNodes(const std::string& metrics_class) const;
   std::vector<std::shared_ptr<ResponseNode>> getResponseNodes(const std::string& clazz) const;
-  void initializeRepositoryMetrics(const std::shared_ptr<ResponseNode>& response_node);
+  void initializeRepositoryMetrics(const std::shared_ptr<ResponseNode>& response_node) const;
   static void initializeQueueMetrics(const std::shared_ptr<ResponseNode>& response_node, core::ProcessGroup* root);
-  void initializeAgentIdentifier(const std::shared_ptr<ResponseNode>& response_node);
-  void initializeAgentMonitor(const std::shared_ptr<ResponseNode>& response_node);
-  void initializeAgentNode(const std::shared_ptr<ResponseNode>& response_node);
-  void initializeAgentStatus(const std::shared_ptr<ResponseNode>& response_node);
-  void initializeConfigurationChecksums(const std::shared_ptr<ResponseNode>& response_node);
-  void initializeFlowMonitor(const std::shared_ptr<ResponseNode>& response_node, core::ProcessGroup* root);
+  void initializeAgentIdentifier(const std::shared_ptr<ResponseNode>& response_node) const;
+  void initializeAgentMonitor(const std::shared_ptr<ResponseNode>& response_node) const;
+  void initializeAgentNode(const std::shared_ptr<ResponseNode>& response_node) const;
+  void initializeAgentStatus(const std::shared_ptr<ResponseNode>& response_node) const;
+  void initializeConfigurationChecksums(const std::shared_ptr<ResponseNode>& response_node) const;
+  void initializeFlowMonitor(const std::shared_ptr<ResponseNode>& response_node, core::ProcessGroup* root) const;
+  std::vector<std::shared_ptr<ResponseNode>> getMatchingComponentMetricsNodes(const std::string& regex_str) const;
 
   mutable std::mutex component_metrics_mutex_;
   std::unordered_map<std::string, std::vector<std::shared_ptr<ResponseNode>>> component_metrics_;
diff --git a/libminifi/src/c2/C2Client.cpp b/libminifi/src/c2/C2Client.cpp
index 39ec5131f..26b6c6817 100644
--- a/libminifi/src/c2/C2Client.cpp
+++ b/libminifi/src/c2/C2Client.cpp
@@ -210,7 +210,7 @@ std::optional<state::response::NodeReporter::ReportedNode> C2Client::getMetricsN
   };
 
   if (!metrics_class.empty()) {
-    auto metrics_nodes = response_node_loader_.getComponentMetricsNodes(metrics_class);
+    auto metrics_nodes = response_node_loader_.loadResponseNodes(metrics_class, root_.get());
     if (!metrics_nodes.empty()) {
       return createReportedNode(metrics_nodes);
     }
diff --git a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
index 149fe14e1..df2f8720e 100644
--- a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
+++ b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
@@ -27,6 +27,8 @@
 #include "core/state/nodes/ConfigurationChecksums.h"
 #include "c2/C2Agent.h"
 #include "utils/gsl.h"
+#include "utils/RegexUtils.h"
+#include "utils/StringUtils.h"
 
 namespace org::apache::nifi::minifi::state::response {
 
@@ -75,7 +77,7 @@ std::vector<std::shared_ptr<ResponseNode>> ResponseNodeLoader::getResponseNodes(
   return {response_node};
 }
 
-void ResponseNodeLoader::initializeRepositoryMetrics(const std::shared_ptr<ResponseNode>& response_node) {
+void ResponseNodeLoader::initializeRepositoryMetrics(const std::shared_ptr<ResponseNode>& response_node) const {
   auto repository_metrics = dynamic_cast<RepositoryMetrics*>(response_node.get());
   if (repository_metrics != nullptr) {
     repository_metrics->addRepository(provenance_repo_);
@@ -98,14 +100,14 @@ void ResponseNodeLoader::initializeQueueMetrics(const std::shared_ptr<ResponseNo
   }
 }
 
-void ResponseNodeLoader::initializeAgentIdentifier(const std::shared_ptr<ResponseNode>& response_node) {
+void ResponseNodeLoader::initializeAgentIdentifier(const std::shared_ptr<ResponseNode>& response_node) const {
   auto identifier = dynamic_cast<state::response::AgentIdentifier*>(response_node.get());
   if (identifier != nullptr) {
     identifier->setAgentIdentificationProvider(configuration_);
   }
 }
 
-void ResponseNodeLoader::initializeAgentMonitor(const std::shared_ptr<ResponseNode>& response_node) {
+void ResponseNodeLoader::initializeAgentMonitor(const std::shared_ptr<ResponseNode>& response_node) const {
   auto monitor = dynamic_cast<state::response::AgentMonitor*>(response_node.get());
   if (monitor != nullptr) {
     monitor->addRepository(provenance_repo_);
@@ -114,7 +116,7 @@ void ResponseNodeLoader::initializeAgentMonitor(const std::shared_ptr<ResponseNo
   }
 }
 
-void ResponseNodeLoader::initializeAgentNode(const std::shared_ptr<ResponseNode>& response_node) {
+void ResponseNodeLoader::initializeAgentNode(const std::shared_ptr<ResponseNode>& response_node) const {
   auto agent_node = dynamic_cast<state::response::AgentNode*>(response_node.get());
   if (agent_node != nullptr && controller_ != nullptr) {
     agent_node->setUpdatePolicyController(std::static_pointer_cast<controllers::UpdatePolicyControllerService>(controller_->getControllerService(c2::C2Agent::UPDATE_NAME)).get());
@@ -126,7 +128,7 @@ void ResponseNodeLoader::initializeAgentNode(const std::shared_ptr<ResponseNode>
   }
 }
 
-void ResponseNodeLoader::initializeAgentStatus(const std::shared_ptr<ResponseNode>& response_node) {
+void ResponseNodeLoader::initializeAgentStatus(const std::shared_ptr<ResponseNode>& response_node) const {
   auto agent_status = dynamic_cast<state::response::AgentStatus*>(response_node.get());
   if (agent_status != nullptr) {
     agent_status->addRepository(provenance_repo_);
@@ -135,7 +137,7 @@ void ResponseNodeLoader::initializeAgentStatus(const std::shared_ptr<ResponseNod
   }
 }
 
-void ResponseNodeLoader::initializeConfigurationChecksums(const std::shared_ptr<ResponseNode>& response_node) {
+void ResponseNodeLoader::initializeConfigurationChecksums(const std::shared_ptr<ResponseNode>& response_node) const {
   auto configuration_checksums = dynamic_cast<state::response::ConfigurationChecksums*>(response_node.get());
   if (configuration_checksums) {
     configuration_checksums->addChecksumCalculator(configuration_->getChecksumCalculator());
@@ -145,7 +147,7 @@ void ResponseNodeLoader::initializeConfigurationChecksums(const std::shared_ptr<
   }
 }
 
-void ResponseNodeLoader::initializeFlowMonitor(const std::shared_ptr<ResponseNode>& response_node, core::ProcessGroup* root) {
+void ResponseNodeLoader::initializeFlowMonitor(const std::shared_ptr<ResponseNode>& response_node, core::ProcessGroup* root) const {
   auto flowMonitor = dynamic_cast<state::response::FlowMonitor*>(response_node.get());
   if (flowMonitor == nullptr) {
     return;
@@ -165,7 +167,7 @@ void ResponseNodeLoader::initializeFlowMonitor(const std::shared_ptr<ResponseNod
   }
 }
 
-std::vector<std::shared_ptr<ResponseNode>> ResponseNodeLoader::loadResponseNodes(const std::string& clazz, core::ProcessGroup* root) {
+std::vector<std::shared_ptr<ResponseNode>> ResponseNodeLoader::loadResponseNodes(const std::string& clazz, core::ProcessGroup* root) const {
   auto response_nodes = getResponseNodes(clazz);
   if (response_nodes.empty()) {
     logger_->log_error("No metric defined for %s", clazz);
@@ -185,9 +187,27 @@ std::vector<std::shared_ptr<ResponseNode>> ResponseNodeLoader::loadResponseNodes
   return response_nodes;
 }
 
+std::vector<std::shared_ptr<ResponseNode>> ResponseNodeLoader::getMatchingComponentMetricsNodes(const std::string& regex_str) const {
+  std::vector<std::shared_ptr<ResponseNode>> result;
+  for (const auto& [metric_name, metrics] : component_metrics_) {
+    utils::Regex regex(regex_str);
+    if (utils::regexMatch(metric_name, regex)) {
+      result.insert(result.end(), metrics.begin(), metrics.end());
+    }
+  }
+  return result;
+}
+
 std::vector<std::shared_ptr<ResponseNode>> ResponseNodeLoader::getComponentMetricsNodes(const std::string& metrics_class) const {
-  if (!metrics_class.empty()) {
-    std::lock_guard<std::mutex> lock(component_metrics_mutex_);
+  if (metrics_class.empty()) {
+    return {};
+  }
+  std::lock_guard<std::mutex> lock(component_metrics_mutex_);
+  static const std::string PROCESSOR_FILTER_PREFIX = "processorMetrics/";
+  if (utils::StringUtils::startsWith(metrics_class, PROCESSOR_FILTER_PREFIX)) {
+    auto regex_str = metrics_class.substr(PROCESSOR_FILTER_PREFIX.size());
+    return getMatchingComponentMetricsNodes(regex_str);
+  } else {
     const auto citer = component_metrics_.find(metrics_class);
     if (citer != component_metrics_.end()) {
       return citer->second;
diff --git a/libminifi/test/unit/ResponseNodeLoaderTests.cpp b/libminifi/test/unit/ResponseNodeLoaderTests.cpp
new file mode 100644
index 000000000..222346764
--- /dev/null
+++ b/libminifi/test/unit/ResponseNodeLoaderTests.cpp
@@ -0,0 +1,134 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include "../Catch.h"
+#include "core/state/nodes/ResponseNodeLoader.h"
+#include "../ReadFromFlowFileTestProcessor.h"
+#include "../WriteToFlowFileTestProcessor.h"
+#include "core/repository/VolatileContentRepository.h"
+#include "utils/Id.h"
+#include "ProvenanceTestHelper.h"
+
+namespace org::apache::nifi::minifi::test {
+
+class ResponseNodeLoaderTestFixture {
+ public:
+  ResponseNodeLoaderTestFixture()
+    : root_(std::make_unique<minifi::core::ProcessGroup>(minifi::core::ProcessGroupType::ROOT_PROCESS_GROUP, "root")),
+      configuration_(std::make_shared<minifi::Configure>()),
+      prov_repo_(std::make_shared<TestRepository>()),
+      ff_repository_(std::make_shared<TestRepository>()),
+      content_repo_(std::make_shared<minifi::core::repository::VolatileContentRepository>()),
+      response_node_loader_(configuration_, prov_repo_, ff_repository_, nullptr) {
+    ff_repository_->initialize(configuration_);
+    content_repo_->initialize(configuration_);
+    auto uuid1 = addProcessor<minifi::processors::WriteToFlowFileTestProcessor>("WriteToFlowFileTestProcessor1");
+    auto uuid2 = addProcessor<minifi::processors::WriteToFlowFileTestProcessor>("WriteToFlowFileTestProcessor1");
+    addConnection("Connection", "success", uuid1, uuid2);
+    auto uuid3 = addProcessor<minifi::processors::ReadFromFlowFileTestProcessor>("ReadFromFlowFileTestProcessor");
+    addConnection("Connection", "success", uuid2, uuid3);
+    response_node_loader_.initializeComponentMetrics(root_.get());
+  }
+
+ protected:
+  template<typename T, typename = typename std::enable_if_t<std::is_base_of_v<minifi::core::Processor, T>>>
+  minifi::utils::Identifier addProcessor(const std::string& name) {
+    auto processor = std::make_unique<T>(name);
+    auto uuid = processor->getUUID();
+    root_->addProcessor(std::move(processor));
+    return uuid;
+  }
+
+  void addConnection(const std::string& connection_name, const std::string& relationship_name, const minifi::utils::Identifier& src_uuid, const minifi::utils::Identifier& dst_uuid) {
+    auto connection = std::make_unique<minifi::Connection>(ff_repository_, content_repo_, connection_name);
+    connection->setRelationship({relationship_name, "d"});
+    connection->setDestinationUUID(src_uuid);
+    connection->setSourceUUID(dst_uuid);
+    root_->addConnection(std::move(connection));
+  }
+
+  std::unique_ptr<minifi::core::ProcessGroup> root_;
+  std::shared_ptr<minifi::Configure> configuration_;
+  std::shared_ptr<TestRepository> prov_repo_;
+  std::shared_ptr<TestRepository> ff_repository_;
+  std::shared_ptr<minifi::core::ContentRepository> content_repo_;
+  minifi::state::response::ResponseNodeLoader response_node_loader_;
+};
+
+TEST_CASE_METHOD(ResponseNodeLoaderTestFixture, "Load non-existent response node", "[responseNodeLoaderTest]") {
+  auto nodes = response_node_loader_.loadResponseNodes("NonExistentNode", root_.get());
+  REQUIRE(nodes.empty());
+}
+
+TEST_CASE_METHOD(ResponseNodeLoaderTestFixture, "Load processor metrics node not part of the flow config", "[responseNodeLoaderTest]") {
+  auto nodes = response_node_loader_.loadResponseNodes("TailFileMetrics", root_.get());
+  REQUIRE(nodes.empty());
+}
+
+TEST_CASE_METHOD(ResponseNodeLoaderTestFixture, "Load system metrics node", "[responseNodeLoaderTest]") {
+  auto nodes = response_node_loader_.loadResponseNodes("QueueMetrics", root_.get());
+  REQUIRE(nodes.size() == 1);
+  REQUIRE(nodes[0]->getName() == "QueueMetrics");
+}
+
+TEST_CASE_METHOD(ResponseNodeLoaderTestFixture, "Load processor metrics node part of the flow config", "[responseNodeLoaderTest]") {
+  auto nodes = response_node_loader_.loadResponseNodes("ReadFromFlowFileTestProcessorMetrics", root_.get());
+  REQUIRE(nodes.size() == 1);
+  REQUIRE(nodes[0]->getName() == "ReadFromFlowFileTestProcessorMetrics");
+}
+
+TEST_CASE_METHOD(ResponseNodeLoaderTestFixture, "Load multiple processor metrics nodes of the same type in a single flow", "[responseNodeLoaderTest]") {
+  auto nodes = response_node_loader_.loadResponseNodes("WriteToFlowFileTestProcessorMetrics", root_.get());
+  REQUIRE(nodes.size() == 2);
+  REQUIRE(nodes[0]->getName() == "WriteToFlowFileTestProcessorMetrics");
+  REQUIRE(nodes[1]->getName() == "WriteToFlowFileTestProcessorMetrics");
+}
+
+TEST_CASE_METHOD(ResponseNodeLoaderTestFixture, "Use regex to filter processor metrics", "[responseNodeLoaderTest]") {
+  SECTION("Load all processor metrics with regex") {
+    auto nodes = response_node_loader_.loadResponseNodes("processorMetrics/.*", root_.get());
+    std::unordered_map<std::string, uint32_t> metric_counts;
+    REQUIRE(nodes.size() == 3);
+    for (const auto& node : nodes) {
+      metric_counts[node->getName()]++;
+    }
+    REQUIRE(metric_counts["WriteToFlowFileTestProcessorMetrics"] == 2);
+    REQUIRE(metric_counts["ReadFromFlowFileTestProcessorMetrics"] == 1);
+  }
+
+  SECTION("Filter for a single processor") {
+    auto nodes = response_node_loader_.loadResponseNodes("processorMetrics/Read.*", root_.get());
+    REQUIRE(nodes.size() == 1);
+    REQUIRE(nodes[0]->getName() == "ReadFromFlowFileTestProcessorMetrics");
+  }
+
+  SECTION("Full match") {
+    auto nodes = response_node_loader_.loadResponseNodes("processorMetrics/ReadFromFlowFileTestProcessorMetrics", root_.get());
+    REQUIRE(nodes.size() == 1);
+    REQUIRE(nodes[0]->getName() == "ReadFromFlowFileTestProcessorMetrics");
+  }
+
+  SECTION("No partial match is allowed") {
+    auto nodes = response_node_loader_.loadResponseNodes("processorMetrics/Read", root_.get());
+    REQUIRE(nodes.empty());
+  }
+}
+
+}  // namespace org::apache::nifi::minifi::test