Posted to commits@nifi.apache.org by fg...@apache.org on 2023/01/18 19:31:10 UTC

[nifi-minifi-cpp] branch main updated (8d18ef1ff -> 2173e13e1)

This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


    from 8d18ef1ff MINIFICPP-2026 Make isRunning member functions const
     new e6d827187 MINIFICPP-1973 Refactor ResourceQueue
     new 2173e13e1 MINIFICPP-1987 Configuring processor metrics with regular expressions

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 C2.md                                              |   4 +
 METRICS.md                                         |  16 ++-
 docker/test/integration/minifi/core/ImageStore.py  |   2 +-
 extensions/http-curl/processors/InvokeHTTP.cpp     |  15 ++-
 .../http-curl/tests/C2DescribeMetricsTest.cpp      |  30 ++++-
 extensions/http-curl/tests/C2MetricsTest.cpp       |   2 +-
 extensions/script/ExecuteScript.cpp                |  10 +-
 extensions/splunk/PutSplunkHTTP.cpp                |  48 ++++----
 .../include/core/state/nodes/ResponseNodeLoader.h  |  21 ++--
 libminifi/include/utils/ResourceQueue.h            |  32 +++--
 libminifi/src/c2/C2Client.cpp                      |   2 +-
 .../src/core/state/nodes/ResponseNodeLoader.cpp    |  40 ++++--
 libminifi/test/unit/ResourceQueueTests.cpp         |  77 ++++++++----
 libminifi/test/unit/ResponseNodeLoaderTests.cpp    | 134 +++++++++++++++++++++
 14 files changed, 336 insertions(+), 97 deletions(-)
 create mode 100644 libminifi/test/unit/ResponseNodeLoaderTests.cpp


[nifi-minifi-cpp] 02/02: MINIFICPP-1987 Configuring processor metrics with regular expressions

Posted by fg...@apache.org.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 2173e13e17585a8f3f60edd59c28475bf7669f46
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Tue Nov 22 17:37:37 2022 +0100

    MINIFICPP-1987 Configuring processor metrics with regular expressions
    
    Signed-off-by: Ferenc Gerlits <fg...@gmail.com>
    This closes #1459
---
 C2.md                                              |   4 +
 METRICS.md                                         |  16 ++-
 docker/test/integration/minifi/core/ImageStore.py  |   2 +-
 .../http-curl/tests/C2DescribeMetricsTest.cpp      |  30 ++++-
 extensions/http-curl/tests/C2MetricsTest.cpp       |   2 +-
 .../include/core/state/nodes/ResponseNodeLoader.h  |  21 ++--
 libminifi/src/c2/C2Client.cpp                      |   2 +-
 .../src/core/state/nodes/ResponseNodeLoader.cpp    |  40 ++++--
 libminifi/test/unit/ResponseNodeLoaderTests.cpp    | 134 +++++++++++++++++++++
 9 files changed, 219 insertions(+), 32 deletions(-)

diff --git a/C2.md b/C2.md
index 14c17090a..71c40f192 100644
--- a/C2.md
+++ b/C2.md
@@ -117,6 +117,10 @@ a configuration of an agent
     nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.name=ProcessorMetric
     nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.classes=GetFileMetrics
 
+Processor metrics can also be configured using regular expressions with the `processorMetrics/` prefix, so the following definition is also valid:
+
+    nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.classes=processorMetrics/Get.*Metrics
+
 This example shows a metrics sub tree defined by the option 'nifi.c2.root.class.definitions'.
 
 This is a comma separated list of all sub trees. In the example, above, only one sub tree exists: metrics.
diff --git a/METRICS.md b/METRICS.md
index fa0732c06..bf6ed91e4 100644
--- a/METRICS.md
+++ b/METRICS.md
@@ -59,13 +59,13 @@ The following option defines which metric classes should be exposed through the
 
     # in minifi.properties
 
-    nifi.metrics.publisher.metrics=QueueMetrics,RepositoryMetrics,GetFileMetrics,DeviceInfoNode,FlowInformation
+    nifi.metrics.publisher.metrics=QueueMetrics,RepositoryMetrics,GetFileMetrics,DeviceInfoNode,FlowInformation,processorMetrics/Tail.*
 
 An agent identifier should also be defined to identify which agent the metric is exposed from. If not set, the hostname is used as the identifier.
 
-	# in minifi.properties
+    # in minifi.properties
 
-	nifi.metrics.publisher.agent.identifier=Agent1
+    nifi.metrics.publisher.agent.identifier=Agent1
 
 ## System Metrics
 
@@ -168,6 +168,16 @@ AgentStatus is a system level metric that defines current agent status including
 
 Processor level metrics can be accessed for any processor provided by MiNiFi. These metrics correspond to the name of the processor appended by the "Metrics" suffix (e.g. GetFileMetrics, TailFileMetrics, etc.).
 
+Besides configuring processor metrics directly, they can also be configured using regular expressions with the `processorMetrics/` prefix.
+
+All available processor metrics can be requested in the `minifi.properties` by using the following configuration:
+
+    nifi.metrics.publisher.metrics=processorMetrics/.*
+
+Regular expressions can also be used for requesting multiple processor metrics at once, like GetFileMetrics and GetTCPMetrics with the following configuration:
+
+    nifi.metrics.publisher.metrics=processorMetrics/Get.*Metrics
+
 ### General Metrics
 
 There are general metrics that are available for all processors. Besides these metrics, processors can implement additional metrics that are specific to that processor.
diff --git a/docker/test/integration/minifi/core/ImageStore.py b/docker/test/integration/minifi/core/ImageStore.py
index c0c158852..20d212c69 100644
--- a/docker/test/integration/minifi/core/ImageStore.py
+++ b/docker/test/integration/minifi/core/ImageStore.py
@@ -111,7 +111,7 @@ class ImageStore:
                 RUN echo nifi.metrics.publisher.agent.identifier=Agent1 >> {minifi_root}/conf/minifi.properties
                 RUN echo nifi.metrics.publisher.class=PrometheusMetricsPublisher >> {minifi_root}/conf/minifi.properties
                 RUN echo nifi.metrics.publisher.PrometheusMetricsPublisher.port=9936 >> {minifi_root}/conf/minifi.properties
-                RUN echo nifi.metrics.publisher.metrics=RepositoryMetrics,QueueMetrics,GetFileMetrics,GetTCPMetrics,PutFileMetrics,FlowInformation,DeviceInfoNode,AgentStatus >> {minifi_root}/conf/minifi.properties
+                RUN echo nifi.metrics.publisher.metrics=RepositoryMetrics,QueueMetrics,PutFileMetrics,processorMetrics/Get.*,FlowInformation,DeviceInfoNode,AgentStatus >> {minifi_root}/conf/minifi.properties
                 RUN echo nifi.c2.enable=true  >> {minifi_root}/conf/minifi.properties
                 RUN echo nifi.c2.rest.url=http://minifi-c2-server:10090/c2/config/heartbeat  >> {minifi_root}/conf/minifi.properties
                 RUN echo nifi.c2.rest.url.ack=http://minifi-c2-server:10090/c2/config/acknowledge  >> {minifi_root}/conf/minifi.properties
diff --git a/extensions/http-curl/tests/C2DescribeMetricsTest.cpp b/extensions/http-curl/tests/C2DescribeMetricsTest.cpp
index acf5d80cb..0e177d5fd 100644
--- a/extensions/http-curl/tests/C2DescribeMetricsTest.cpp
+++ b/extensions/http-curl/tests/C2DescribeMetricsTest.cpp
@@ -66,10 +66,14 @@ class MetricsHandler: public HeartbeatHandler {
 
   void handleHeartbeat(const rapidjson::Document&, struct mg_connection* conn) override {
     switch (state_) {
-      case TestState::DESCRIBE_SPECIFIC_METRIC: {
+      case TestState::DESCRIBE_SPECIFIC_PROCESSOR_METRIC: {
         sendHeartbeatResponse("DESCRIBE", "metrics", "889347", conn, {{"metricsClass", "GetFileMetrics"}});
         break;
       }
+      case TestState::DESCRIBE_SPECIFIC_SYSTEM_METRIC: {
+        sendHeartbeatResponse("DESCRIBE", "metrics", "889347", conn, {{"metricsClass", "QueueMetrics"}});
+        break;
+      }
       case TestState::DESCRIBE_ALL_METRICS: {
         sendHeartbeatResponse("DESCRIBE", "metrics", "889347", conn);
         break;
@@ -81,8 +85,12 @@ class MetricsHandler: public HeartbeatHandler {
 
   void handleAcknowledge(const rapidjson::Document& root) override {
     switch (state_) {
-      case TestState::DESCRIBE_SPECIFIC_METRIC: {
-        verifySpecificMetrics(root);
+      case TestState::DESCRIBE_SPECIFIC_PROCESSOR_METRIC: {
+        verifySpecificProcessorMetrics(root);
+        break;
+      }
+      case TestState::DESCRIBE_SPECIFIC_SYSTEM_METRIC: {
+        verifySpecificSystemMetrics(root);
         break;
       }
       case TestState::DESCRIBE_ALL_METRICS: {
@@ -96,7 +104,8 @@ class MetricsHandler: public HeartbeatHandler {
 
  private:
   enum class TestState {
-    DESCRIBE_SPECIFIC_METRIC,
+    DESCRIBE_SPECIFIC_PROCESSOR_METRIC,
+    DESCRIBE_SPECIFIC_SYSTEM_METRIC,
     DESCRIBE_ALL_METRICS
   };
 
@@ -109,12 +118,21 @@ class MetricsHandler: public HeartbeatHandler {
     mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
   }
 
-  void verifySpecificMetrics(const rapidjson::Document& root) {
+  void verifySpecificProcessorMetrics(const rapidjson::Document& root) {
     auto getfile_metrics_verified =
       !root.HasMember("metrics") &&
       root.HasMember("GetFileMetrics") &&
       root["GetFileMetrics"].HasMember(GETFILE1_UUID) &&
       root["GetFileMetrics"].HasMember(GETFILE2_UUID);
+    if (getfile_metrics_verified) {
+      state_ = TestState::DESCRIBE_SPECIFIC_SYSTEM_METRIC;
+    }
+  }
+
+  void verifySpecificSystemMetrics(const rapidjson::Document& root) {
+    auto getfile_metrics_verified =
+      !root.HasMember("metrics") &&
+      root.HasMember("QueueMetrics");
     if (getfile_metrics_verified) {
       state_ = TestState::DESCRIBE_ALL_METRICS;
     }
@@ -136,7 +154,7 @@ class MetricsHandler: public HeartbeatHandler {
     }
   }
 
-  TestState state_ = TestState::DESCRIBE_SPECIFIC_METRIC;
+  TestState state_ = TestState::DESCRIBE_SPECIFIC_PROCESSOR_METRIC;
   std::atomic_bool& metrics_found_;
 };
 
diff --git a/extensions/http-curl/tests/C2MetricsTest.cpp b/extensions/http-curl/tests/C2MetricsTest.cpp
index 16dd9fcbc..2d717e2e7 100644
--- a/extensions/http-curl/tests/C2MetricsTest.cpp
+++ b/extensions/http-curl/tests/C2MetricsTest.cpp
@@ -207,7 +207,7 @@ int main(int argc, char **argv) {
   harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.loadmetrics.name", "LoadMetrics");
   harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.loadmetrics.classes", "QueueMetrics,RepositoryMetrics");
   harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.name", "ProcessorMetrics");
-  harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.classes", "GetTCPMetrics");
+  harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.classes", "processorMetrics/GetTCP.*");
   harness.setKeyDir(args.key_dir);
   auto replacement_path = args.test_file;
   minifi::utils::StringUtils::replaceAll(replacement_path, "TestC2Metrics", "TestC2MetricsUpdate");
diff --git a/libminifi/include/core/state/nodes/ResponseNodeLoader.h b/libminifi/include/core/state/nodes/ResponseNodeLoader.h
index 8ad250f9f..b30125673 100644
--- a/libminifi/include/core/state/nodes/ResponseNodeLoader.h
+++ b/libminifi/include/core/state/nodes/ResponseNodeLoader.h
@@ -39,22 +39,23 @@ class ResponseNodeLoader {
  public:
   ResponseNodeLoader(std::shared_ptr<Configure> configuration, std::shared_ptr<core::Repository> provenance_repo,
     std::shared_ptr<core::Repository> flow_file_repo, core::FlowConfiguration* flow_configuration);
-  std::vector<std::shared_ptr<ResponseNode>> loadResponseNodes(const std::string& clazz, core::ProcessGroup* root);
-  std::vector<std::shared_ptr<ResponseNode>> getComponentMetricsNodes(const std::string& metrics_class) const;
+  void initializeComponentMetrics(core::ProcessGroup* root);
   void setControllerServiceProvider(core::controller::ControllerServiceProvider* controller);
   void setStateMonitor(state::StateMonitor* update_sink);
-  void initializeComponentMetrics(core::ProcessGroup* root);
+  std::vector<std::shared_ptr<ResponseNode>> loadResponseNodes(const std::string& clazz, core::ProcessGroup* root) const;
 
  private:
+  std::vector<std::shared_ptr<ResponseNode>> getComponentMetricsNodes(const std::string& metrics_class) const;
   std::vector<std::shared_ptr<ResponseNode>> getResponseNodes(const std::string& clazz) const;
-  void initializeRepositoryMetrics(const std::shared_ptr<ResponseNode>& response_node);
+  void initializeRepositoryMetrics(const std::shared_ptr<ResponseNode>& response_node) const;
   static void initializeQueueMetrics(const std::shared_ptr<ResponseNode>& response_node, core::ProcessGroup* root);
-  void initializeAgentIdentifier(const std::shared_ptr<ResponseNode>& response_node);
-  void initializeAgentMonitor(const std::shared_ptr<ResponseNode>& response_node);
-  void initializeAgentNode(const std::shared_ptr<ResponseNode>& response_node);
-  void initializeAgentStatus(const std::shared_ptr<ResponseNode>& response_node);
-  void initializeConfigurationChecksums(const std::shared_ptr<ResponseNode>& response_node);
-  void initializeFlowMonitor(const std::shared_ptr<ResponseNode>& response_node, core::ProcessGroup* root);
+  void initializeAgentIdentifier(const std::shared_ptr<ResponseNode>& response_node) const;
+  void initializeAgentMonitor(const std::shared_ptr<ResponseNode>& response_node) const;
+  void initializeAgentNode(const std::shared_ptr<ResponseNode>& response_node) const;
+  void initializeAgentStatus(const std::shared_ptr<ResponseNode>& response_node) const;
+  void initializeConfigurationChecksums(const std::shared_ptr<ResponseNode>& response_node) const;
+  void initializeFlowMonitor(const std::shared_ptr<ResponseNode>& response_node, core::ProcessGroup* root) const;
+  std::vector<std::shared_ptr<ResponseNode>> getMatchingComponentMetricsNodes(const std::string& regex_str) const;
 
   mutable std::mutex component_metrics_mutex_;
   std::unordered_map<std::string, std::vector<std::shared_ptr<ResponseNode>>> component_metrics_;
diff --git a/libminifi/src/c2/C2Client.cpp b/libminifi/src/c2/C2Client.cpp
index 39ec5131f..26b6c6817 100644
--- a/libminifi/src/c2/C2Client.cpp
+++ b/libminifi/src/c2/C2Client.cpp
@@ -210,7 +210,7 @@ std::optional<state::response::NodeReporter::ReportedNode> C2Client::getMetricsN
   };
 
   if (!metrics_class.empty()) {
-    auto metrics_nodes = response_node_loader_.getComponentMetricsNodes(metrics_class);
+    auto metrics_nodes = response_node_loader_.loadResponseNodes(metrics_class, root_.get());
     if (!metrics_nodes.empty()) {
       return createReportedNode(metrics_nodes);
     }
diff --git a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
index 149fe14e1..df2f8720e 100644
--- a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
+++ b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
@@ -27,6 +27,8 @@
 #include "core/state/nodes/ConfigurationChecksums.h"
 #include "c2/C2Agent.h"
 #include "utils/gsl.h"
+#include "utils/RegexUtils.h"
+#include "utils/StringUtils.h"
 
 namespace org::apache::nifi::minifi::state::response {
 
@@ -75,7 +77,7 @@ std::vector<std::shared_ptr<ResponseNode>> ResponseNodeLoader::getResponseNodes(
   return {response_node};
 }
 
-void ResponseNodeLoader::initializeRepositoryMetrics(const std::shared_ptr<ResponseNode>& response_node) {
+void ResponseNodeLoader::initializeRepositoryMetrics(const std::shared_ptr<ResponseNode>& response_node) const {
   auto repository_metrics = dynamic_cast<RepositoryMetrics*>(response_node.get());
   if (repository_metrics != nullptr) {
     repository_metrics->addRepository(provenance_repo_);
@@ -98,14 +100,14 @@ void ResponseNodeLoader::initializeQueueMetrics(const std::shared_ptr<ResponseNo
   }
 }
 
-void ResponseNodeLoader::initializeAgentIdentifier(const std::shared_ptr<ResponseNode>& response_node) {
+void ResponseNodeLoader::initializeAgentIdentifier(const std::shared_ptr<ResponseNode>& response_node) const {
   auto identifier = dynamic_cast<state::response::AgentIdentifier*>(response_node.get());
   if (identifier != nullptr) {
     identifier->setAgentIdentificationProvider(configuration_);
   }
 }
 
-void ResponseNodeLoader::initializeAgentMonitor(const std::shared_ptr<ResponseNode>& response_node) {
+void ResponseNodeLoader::initializeAgentMonitor(const std::shared_ptr<ResponseNode>& response_node) const {
   auto monitor = dynamic_cast<state::response::AgentMonitor*>(response_node.get());
   if (monitor != nullptr) {
     monitor->addRepository(provenance_repo_);
@@ -114,7 +116,7 @@ void ResponseNodeLoader::initializeAgentMonitor(const std::shared_ptr<ResponseNo
   }
 }
 
-void ResponseNodeLoader::initializeAgentNode(const std::shared_ptr<ResponseNode>& response_node) {
+void ResponseNodeLoader::initializeAgentNode(const std::shared_ptr<ResponseNode>& response_node) const {
   auto agent_node = dynamic_cast<state::response::AgentNode*>(response_node.get());
   if (agent_node != nullptr && controller_ != nullptr) {
     agent_node->setUpdatePolicyController(std::static_pointer_cast<controllers::UpdatePolicyControllerService>(controller_->getControllerService(c2::C2Agent::UPDATE_NAME)).get());
@@ -126,7 +128,7 @@ void ResponseNodeLoader::initializeAgentNode(const std::shared_ptr<ResponseNode>
   }
 }
 
-void ResponseNodeLoader::initializeAgentStatus(const std::shared_ptr<ResponseNode>& response_node) {
+void ResponseNodeLoader::initializeAgentStatus(const std::shared_ptr<ResponseNode>& response_node) const {
   auto agent_status = dynamic_cast<state::response::AgentStatus*>(response_node.get());
   if (agent_status != nullptr) {
     agent_status->addRepository(provenance_repo_);
@@ -135,7 +137,7 @@ void ResponseNodeLoader::initializeAgentStatus(const std::shared_ptr<ResponseNod
   }
 }
 
-void ResponseNodeLoader::initializeConfigurationChecksums(const std::shared_ptr<ResponseNode>& response_node) {
+void ResponseNodeLoader::initializeConfigurationChecksums(const std::shared_ptr<ResponseNode>& response_node) const {
   auto configuration_checksums = dynamic_cast<state::response::ConfigurationChecksums*>(response_node.get());
   if (configuration_checksums) {
     configuration_checksums->addChecksumCalculator(configuration_->getChecksumCalculator());
@@ -145,7 +147,7 @@ void ResponseNodeLoader::initializeConfigurationChecksums(const std::shared_ptr<
   }
 }
 
-void ResponseNodeLoader::initializeFlowMonitor(const std::shared_ptr<ResponseNode>& response_node, core::ProcessGroup* root) {
+void ResponseNodeLoader::initializeFlowMonitor(const std::shared_ptr<ResponseNode>& response_node, core::ProcessGroup* root) const {
   auto flowMonitor = dynamic_cast<state::response::FlowMonitor*>(response_node.get());
   if (flowMonitor == nullptr) {
     return;
@@ -165,7 +167,7 @@ void ResponseNodeLoader::initializeFlowMonitor(const std::shared_ptr<ResponseNod
   }
 }
 
-std::vector<std::shared_ptr<ResponseNode>> ResponseNodeLoader::loadResponseNodes(const std::string& clazz, core::ProcessGroup* root) {
+std::vector<std::shared_ptr<ResponseNode>> ResponseNodeLoader::loadResponseNodes(const std::string& clazz, core::ProcessGroup* root) const {
   auto response_nodes = getResponseNodes(clazz);
   if (response_nodes.empty()) {
     logger_->log_error("No metric defined for %s", clazz);
@@ -185,9 +187,27 @@ std::vector<std::shared_ptr<ResponseNode>> ResponseNodeLoader::loadResponseNodes
   return response_nodes;
 }
 
+std::vector<std::shared_ptr<ResponseNode>> ResponseNodeLoader::getMatchingComponentMetricsNodes(const std::string& regex_str) const {
+  std::vector<std::shared_ptr<ResponseNode>> result;
+  for (const auto& [metric_name, metrics] : component_metrics_) {
+    utils::Regex regex(regex_str);
+    if (utils::regexMatch(metric_name, regex)) {
+      result.insert(result.end(), metrics.begin(), metrics.end());
+    }
+  }
+  return result;
+}
+
 std::vector<std::shared_ptr<ResponseNode>> ResponseNodeLoader::getComponentMetricsNodes(const std::string& metrics_class) const {
-  if (!metrics_class.empty()) {
-    std::lock_guard<std::mutex> lock(component_metrics_mutex_);
+  if (metrics_class.empty()) {
+    return {};
+  }
+  std::lock_guard<std::mutex> lock(component_metrics_mutex_);
+  static const std::string PROCESSOR_FILTER_PREFIX = "processorMetrics/";
+  if (utils::StringUtils::startsWith(metrics_class, PROCESSOR_FILTER_PREFIX)) {
+    auto regex_str = metrics_class.substr(PROCESSOR_FILTER_PREFIX.size());
+    return getMatchingComponentMetricsNodes(regex_str);
+  } else {
     const auto citer = component_metrics_.find(metrics_class);
     if (citer != component_metrics_.end()) {
       return citer->second;
diff --git a/libminifi/test/unit/ResponseNodeLoaderTests.cpp b/libminifi/test/unit/ResponseNodeLoaderTests.cpp
new file mode 100644
index 000000000..222346764
--- /dev/null
+++ b/libminifi/test/unit/ResponseNodeLoaderTests.cpp
@@ -0,0 +1,134 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include "../Catch.h"
+#include "core/state/nodes/ResponseNodeLoader.h"
+#include "../ReadFromFlowFileTestProcessor.h"
+#include "../WriteToFlowFileTestProcessor.h"
+#include "core/repository/VolatileContentRepository.h"
+#include "utils/Id.h"
+#include "ProvenanceTestHelper.h"
+
+namespace org::apache::nifi::minifi::test {
+
+class ResponseNodeLoaderTestFixture {
+ public:
+  ResponseNodeLoaderTestFixture()
+    : root_(std::make_unique<minifi::core::ProcessGroup>(minifi::core::ProcessGroupType::ROOT_PROCESS_GROUP, "root")),
+      configuration_(std::make_shared<minifi::Configure>()),
+      prov_repo_(std::make_shared<TestRepository>()),
+      ff_repository_(std::make_shared<TestRepository>()),
+      content_repo_(std::make_shared<minifi::core::repository::VolatileContentRepository>()),
+      response_node_loader_(configuration_, prov_repo_, ff_repository_, nullptr) {
+    ff_repository_->initialize(configuration_);
+    content_repo_->initialize(configuration_);
+    auto uuid1 = addProcessor<minifi::processors::WriteToFlowFileTestProcessor>("WriteToFlowFileTestProcessor1");
+    auto uuid2 = addProcessor<minifi::processors::WriteToFlowFileTestProcessor>("WriteToFlowFileTestProcessor1");
+    addConnection("Connection", "success", uuid1, uuid2);
+    auto uuid3 = addProcessor<minifi::processors::ReadFromFlowFileTestProcessor>("ReadFromFlowFileTestProcessor");
+    addConnection("Connection", "success", uuid2, uuid3);
+    response_node_loader_.initializeComponentMetrics(root_.get());
+  }
+
+ protected:
+  template<typename T, typename = typename std::enable_if_t<std::is_base_of_v<minifi::core::Processor, T>>>
+  minifi::utils::Identifier addProcessor(const std::string& name) {
+    auto processor = std::make_unique<T>(name);
+    auto uuid = processor->getUUID();
+    root_->addProcessor(std::move(processor));
+    return uuid;
+  }
+
+  void addConnection(const std::string& connection_name, const std::string& relationship_name, const minifi::utils::Identifier& src_uuid, const minifi::utils::Identifier& dst_uuid) {
+    auto connection = std::make_unique<minifi::Connection>(ff_repository_, content_repo_, connection_name);
+    connection->setRelationship({relationship_name, "d"});
+    connection->setSourceUUID(src_uuid);
+    connection->setDestinationUUID(dst_uuid);
+    root_->addConnection(std::move(connection));
+  }
+
+  std::unique_ptr<minifi::core::ProcessGroup> root_;
+  std::shared_ptr<minifi::Configure> configuration_;
+  std::shared_ptr<TestRepository> prov_repo_;
+  std::shared_ptr<TestRepository> ff_repository_;
+  std::shared_ptr<minifi::core::ContentRepository> content_repo_;
+  minifi::state::response::ResponseNodeLoader response_node_loader_;
+};
+
+TEST_CASE_METHOD(ResponseNodeLoaderTestFixture, "Load non-existent response node", "[responseNodeLoaderTest]") {
+  auto nodes = response_node_loader_.loadResponseNodes("NonExistentNode", root_.get());
+  REQUIRE(nodes.empty());
+}
+
+TEST_CASE_METHOD(ResponseNodeLoaderTestFixture, "Load processor metrics node not part of the flow config", "[responseNodeLoaderTest]") {
+  auto nodes = response_node_loader_.loadResponseNodes("TailFileMetrics", root_.get());
+  REQUIRE(nodes.empty());
+}
+
+TEST_CASE_METHOD(ResponseNodeLoaderTestFixture, "Load system metrics node", "[responseNodeLoaderTest]") {
+  auto nodes = response_node_loader_.loadResponseNodes("QueueMetrics", root_.get());
+  REQUIRE(nodes.size() == 1);
+  REQUIRE(nodes[0]->getName() == "QueueMetrics");
+}
+
+TEST_CASE_METHOD(ResponseNodeLoaderTestFixture, "Load processor metrics node part of the flow config", "[responseNodeLoaderTest]") {
+  auto nodes = response_node_loader_.loadResponseNodes("ReadFromFlowFileTestProcessorMetrics", root_.get());
+  REQUIRE(nodes.size() == 1);
+  REQUIRE(nodes[0]->getName() == "ReadFromFlowFileTestProcessorMetrics");
+}
+
+TEST_CASE_METHOD(ResponseNodeLoaderTestFixture, "Load multiple processor metrics nodes of the same type in a single flow", "[responseNodeLoaderTest]") {
+  auto nodes = response_node_loader_.loadResponseNodes("WriteToFlowFileTestProcessorMetrics", root_.get());
+  REQUIRE(nodes.size() == 2);
+  REQUIRE(nodes[0]->getName() == "WriteToFlowFileTestProcessorMetrics");
+  REQUIRE(nodes[1]->getName() == "WriteToFlowFileTestProcessorMetrics");
+}
+
+TEST_CASE_METHOD(ResponseNodeLoaderTestFixture, "Use regex to filter processor metrics", "[responseNodeLoaderTest]") {
+  SECTION("Load all processor metrics with regex") {
+    auto nodes = response_node_loader_.loadResponseNodes("processorMetrics/.*", root_.get());
+    std::unordered_map<std::string, uint32_t> metric_counts;
+    REQUIRE(nodes.size() == 3);
+    for (const auto& node : nodes) {
+      metric_counts[node->getName()]++;
+    }
+    REQUIRE(metric_counts["WriteToFlowFileTestProcessorMetrics"] == 2);
+    REQUIRE(metric_counts["ReadFromFlowFileTestProcessorMetrics"] == 1);
+  }
+
+  SECTION("Filter for a single processor") {
+    auto nodes = response_node_loader_.loadResponseNodes("processorMetrics/Read.*", root_.get());
+    REQUIRE(nodes.size() == 1);
+    REQUIRE(nodes[0]->getName() == "ReadFromFlowFileTestProcessorMetrics");
+  }
+
+  SECTION("Full match") {
+    auto nodes = response_node_loader_.loadResponseNodes("processorMetrics/ReadFromFlowFileTestProcessorMetrics", root_.get());
+    REQUIRE(nodes.size() == 1);
+    REQUIRE(nodes[0]->getName() == "ReadFromFlowFileTestProcessorMetrics");
+  }
+
+  SECTION("No partial match is allowed") {
+    auto nodes = response_node_loader_.loadResponseNodes("processorMetrics/Read", root_.get());
+    REQUIRE(nodes.empty());
+  }
+}
+
+}  // namespace org::apache::nifi::minifi::test
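
The prefix handling added to ResponseNodeLoader::getComponentMetricsNodes in the commit above can be sketched in isolation. This is a minimal illustration under stated assumptions, not the MiNiFi implementation: std::regex stands in for utils::Regex, MetricsMap and selectMetrics are hypothetical names, and plain strings stand in for the ResponseNode pointers. It assumes, as the "No partial match is allowed" test case suggests, that the pattern has to match the whole metric name.

```cpp
#include <regex>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical stand-in for component_metrics_: metric class name -> node labels.
using MetricsMap = std::unordered_map<std::string, std::vector<std::string>>;

// Hypothetical helper: returns the nodes selected by metrics_class.
// A "processorMetrics/" prefix means the rest of the string is a regular
// expression that must match an entire metric name; otherwise the string
// is looked up as an exact key.
std::vector<std::string> selectMetrics(const MetricsMap& metrics, const std::string& metrics_class) {
  static const std::string prefix = "processorMetrics/";
  if (metrics_class.empty()) {
    return {};
  }
  if (metrics_class.compare(0, prefix.size(), prefix) != 0) {
    const auto it = metrics.find(metrics_class);
    return it != metrics.end() ? it->second : std::vector<std::string>{};
  }
  // Assumption: std::regex replaces utils::Regex; regex_match is a full match.
  const std::regex pattern(metrics_class.substr(prefix.size()));
  std::vector<std::string> result;
  for (const auto& [name, nodes] : metrics) {
    if (std::regex_match(name, pattern)) {
      result.insert(result.end(), nodes.begin(), nodes.end());
    }
  }
  return result;
}
```

With a map holding GetFileMetrics, GetTCPMetrics and TailFileMetrics, the value processorMetrics/Get.*Metrics would select the first two, while processorMetrics/Get would select nothing because the pattern does not cover the full name.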


[nifi-minifi-cpp] 01/02: MINIFICPP-1973 Refactor ResourceQueue

Posted by fg...@apache.org.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit e6d8271870c37a4dcea87a35b9b70e5e5b178ee5
Author: Martin Zink <ma...@apache.org>
AuthorDate: Tue Nov 29 13:10:00 2022 +0100

    MINIFICPP-1973 Refactor ResourceQueue
    
    Signed-off-by: Ferenc Gerlits <fg...@gmail.com>
    This closes #1473
---
 extensions/http-curl/processors/InvokeHTTP.cpp | 15 +++--
 extensions/script/ExecuteScript.cpp            | 10 ++--
 extensions/splunk/PutSplunkHTTP.cpp            | 48 ++++++++--------
 libminifi/include/utils/ResourceQueue.h        | 32 ++++++++---
 libminifi/test/unit/ResourceQueueTests.cpp     | 77 ++++++++++++++++++--------
 5 files changed, 117 insertions(+), 65 deletions(-)

diff --git a/extensions/http-curl/processors/InvokeHTTP.cpp b/extensions/http-curl/processors/InvokeHTTP.cpp
index b7397badd..9e803eec1 100644
--- a/extensions/http-curl/processors/InvokeHTTP.cpp
+++ b/extensions/http-curl/processors/InvokeHTTP.cpp
@@ -263,7 +263,15 @@ void InvokeHTTP::onSchedule(const std::shared_ptr<core::ProcessContext>& context
   gsl_Expects(context);
 
   setupMembersFromProperties(*context);
-  client_queue_ = utils::ResourceQueue<extensions::curl::HTTPClient>::create(getMaxConcurrentTasks(), logger_);
+  std::weak_ptr<core::ProcessContext> weak_context = context;
+  auto create_client = [this, weak_context]() -> std::unique_ptr<minifi::extensions::curl::HTTPClient> {
+    if (auto context = weak_context.lock())
+      return createHTTPClientFromPropertiesAndMembers(*context);
+    else
+      return nullptr;
+  };
+
+  client_queue_ = utils::ResourceQueue<extensions::curl::HTTPClient>::create(create_client, getMaxConcurrentTasks(), std::nullopt, logger_);
 }
 
 bool InvokeHTTP::shouldEmitFlowFile(minifi::extensions::curl::HTTPClient& client) {
@@ -306,11 +314,8 @@ bool InvokeHTTP::appendHeaders(const core::FlowFile& flow_file, /*std::invocable
 
 void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
   gsl_Expects(session && context && client_queue_);
-  auto create_client = [&]() -> std::unique_ptr<minifi::extensions::curl::HTTPClient> {
-    return createHTTPClientFromPropertiesAndMembers(*context);
-  };
 
-  auto client = client_queue_->getResource(create_client);
+  auto client = client_queue_->getResource();
 
   onTriggerWithClient(context, session, *client);
 }
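
The InvokeHTTP change above moves the creator lambda from onTrigger to onSchedule and captures the context through a std::weak_ptr, so the stored callback does not keep the context alive. The pattern can be sketched on its own as follows. Context, Client, Creator and makeCreator are hypothetical stand-ins for illustration only; they are not the MiNiFi types, and the ResourceQueue interface itself is not reproduced here.

```cpp
#include <functional>
#include <memory>
#include <string>

// Hypothetical stand-ins for core::ProcessContext and curl::HTTPClient.
struct Context { std::string url; };
struct Client { std::string url; };

using Creator = std::function<std::unique_ptr<Client>()>;

// Builds the creator once, at scheduling time. The lambda holds only a
// weak reference, so it cannot extend the lifetime of the context.
Creator makeCreator(const std::shared_ptr<Context>& context) {
  std::weak_ptr<Context> weak_context = context;
  return [weak_context]() -> std::unique_ptr<Client> {
    if (auto locked = weak_context.lock()) {
      return std::make_unique<Client>(Client{locked->url});
    }
    return nullptr;  // the context has already been destroyed
  };
}
```

A consumer of such a creator would presumably need to handle the nullptr case, since the callback can outlive the context it was built from.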
diff --git a/extensions/script/ExecuteScript.cpp b/extensions/script/ExecuteScript.cpp
index 28717c122..7124abbba 100644
--- a/extensions/script/ExecuteScript.cpp
+++ b/extensions/script/ExecuteScript.cpp
@@ -71,7 +71,10 @@ void ExecuteScript::initialize() {
 
 void ExecuteScript::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory* /*sessionFactory*/) {
 #ifdef LUA_SUPPORT
-  lua_script_engine_queue_ = utils::ResourceQueue<lua::LuaScriptEngine>::create(getMaxConcurrentTasks(), logger_);
+  auto create_engine = [this]() -> std::unique_ptr<lua::LuaScriptEngine> {
+    return engine_factory_.createEngine<lua::LuaScriptEngine>();
+  };
+  lua_script_engine_queue_ = utils::ResourceQueue<lua::LuaScriptEngine>::create(create_engine, getMaxConcurrentTasks(), std::nullopt, logger_);
 #endif  // LUA_SUPPORT
 #ifdef PYTHON_SUPPORT
   python_script_engine_ = engine_factory_.createEngine<python::PythonScriptEngine>();
@@ -114,11 +117,8 @@ void ExecuteScript::onTrigger(const std::shared_ptr<core::ProcessContext> &conte
   } else if (script_engine_ == ScriptEngineOption::LUA) {
 #ifdef LUA_SUPPORT
     gsl_Expects(lua_script_engine_queue_);
-    auto create_engine = [&]() -> std::unique_ptr<lua::LuaScriptEngine> {
-      return engine_factory_.createEngine<lua::LuaScriptEngine>();
-    };
 
-    lua_script_engine.emplace(lua_script_engine_queue_->getResource(create_engine));
+    lua_script_engine.emplace(lua_script_engine_queue_->getResource());
     engine = lua_script_engine->get();
 #else
     throw std::runtime_error("Lua support is disabled in this build.");
diff --git a/extensions/splunk/PutSplunkHTTP.cpp b/extensions/splunk/PutSplunkHTTP.cpp
index 0969d836e..f51f4f3af 100644
--- a/extensions/splunk/PutSplunkHTTP.cpp
+++ b/extensions/splunk/PutSplunkHTTP.cpp
@@ -40,32 +40,26 @@ void PutSplunkHTTP::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void PutSplunkHTTP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
-  SplunkHECProcessor::onSchedule(context, sessionFactory);
-  client_queue_ = utils::ResourceQueue<extensions::curl::HTTPClient>::create(getMaxConcurrentTasks(), logger_);
-}
-
 namespace {
 std::optional<std::string> getContentType(core::ProcessContext& context, const core::FlowFile& flow_file) {
   return context.getProperty(PutSplunkHTTP::ContentType) | utils::orElse ([&flow_file] {return flow_file.getAttribute("mime.type");});
 }
 
-std::string getEndpoint(core::ProcessContext& context, const gsl::not_null<std::shared_ptr<core::FlowFile>>& flow_file, curl::HTTPClient& client) {
+std::string getEndpoint(core::ProcessContext& context, curl::HTTPClient& client) {
   std::stringstream endpoint;
   endpoint << "/services/collector/raw";
   std::vector<std::string> parameters;
-  std::string prop_value;
-  if (context.getProperty(PutSplunkHTTP::SourceType, prop_value, flow_file)) {
-    parameters.push_back("sourcetype=" + client.escape(prop_value));
+  if (auto source_type = context.getProperty(PutSplunkHTTP::SourceType)) {
+    parameters.push_back("sourcetype=" + client.escape(*source_type));
   }
-  if (context.getProperty(PutSplunkHTTP::Source, prop_value, flow_file)) {
-    parameters.push_back("source=" + client.escape(prop_value));
+  if (auto source = context.getProperty(PutSplunkHTTP::Source)) {
+    parameters.push_back("source=" + client.escape(*source));
   }
-  if (context.getProperty(PutSplunkHTTP::Host, prop_value, flow_file)) {
-    parameters.push_back("host=" + client.escape(prop_value));
+  if (auto host = context.getProperty(PutSplunkHTTP::Host)) {
+    parameters.push_back("host=" + client.escape(*host));
   }
-  if (context.getProperty(PutSplunkHTTP::Index, prop_value, flow_file)) {
-    parameters.push_back("index=" + client.escape(prop_value));
+  if (auto index = context.getProperty(PutSplunkHTTP::Index)) {
+    parameters.push_back("index=" + client.escape(*index));
   }
   if (!parameters.empty()) {
     endpoint << "?" << utils::StringUtils::join("&", parameters);
@@ -117,6 +111,21 @@ void setFlowFileAsPayload(core::ProcessSession& session,
 }
 }  // namespace
 
+void PutSplunkHTTP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
+  SplunkHECProcessor::onSchedule(context, sessionFactory);
+  std::weak_ptr<core::ProcessContext> weak_context = context;
+  auto create_client = [this, weak_context]() -> std::unique_ptr<minifi::extensions::curl::HTTPClient> {
+    if (auto context = weak_context.lock()) {
+      auto client = std::make_unique<curl::HTTPClient>();
+      initializeClient(*client, getNetworkLocation().append(getEndpoint(*context, *client)), getSSLContextService(*context));
+      return client;
+    }
+    return nullptr;
+  };
+
+  client_queue_ = utils::ResourceQueue<extensions::curl::HTTPClient>::create(create_client, getMaxConcurrentTasks(), std::nullopt, logger_);
+}
+
 void PutSplunkHTTP::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
   gsl_Expects(context && session && client_queue_);
 
@@ -127,13 +136,7 @@ void PutSplunkHTTP::onTrigger(const std::shared_ptr<core::ProcessContext>& conte
   }
   auto flow_file = gsl::not_null(std::move(ff));
 
-  auto create_client = [&]() -> std::unique_ptr<minifi::extensions::curl::HTTPClient> {
-    auto client = std::make_unique<curl::HTTPClient>();
-    initializeClient(*client, getNetworkLocation().append(getEndpoint(*context, flow_file, *client)), getSSLContextService(*context));
-    return client;
-  };
-
-  auto client = client_queue_->getResource(create_client);
+  auto client = client_queue_->getResource();
 
   setFlowFileAsPayload(*session, *context, *client, flow_file);
 
@@ -145,4 +148,3 @@ void PutSplunkHTTP::onTrigger(const std::shared_ptr<core::ProcessContext>& conte
 }
 
 }  // namespace org::apache::nifi::minifi::extensions::splunk
-
diff --git a/libminifi/include/utils/ResourceQueue.h b/libminifi/include/utils/ResourceQueue.h
index acb5be4fa..7385c18c4 100644
--- a/libminifi/include/utils/ResourceQueue.h
+++ b/libminifi/include/utils/ResourceQueue.h
@@ -65,10 +65,12 @@ class ResourceQueue : public std::enable_shared_from_this<ResourceQueue<Resource
     std::unique_ptr<ResourceType> resource_;
   };
 
-  static auto create(std::optional<size_t> maximum_number_of_creatable_resources, std::shared_ptr<core::logging::Logger> logger);
+  static auto create(std::function<std::unique_ptr<ResourceType>()> creator,
+                     std::optional<size_t> maximum_number_of_creatable_resources = std::nullopt,
+                     std::optional<std::function<void(ResourceType&)>> reset_fn = std::nullopt,
+                     std::shared_ptr<core::logging::Logger> logger = nullptr);
 
-  template<typename Fn>
-  [[nodiscard]] std::enable_if_t<std::is_invocable_v<std::unique_ptr<ResourceType>()>, ResourceWrapper> getResource(const Fn& create_resource) {
+  [[nodiscard]] ResourceWrapper getResource() {
     std::unique_ptr<ResourceType> resource;
     // Use an existing resource, if one is available
     if (internal_queue_.tryDequeue(resource)) {
@@ -78,7 +80,7 @@ class ResourceQueue : public std::enable_shared_from_this<ResourceQueue<Resource
       const std::lock_guard<std::mutex> lock(counter_mutex_);
       if (!maximum_number_of_creatable_resources_ || resources_created_ < maximum_number_of_creatable_resources_) {
         ++resources_created_;
-        resource = create_resource();
+        resource = create_new_resource_();
         logDebug("Created new [%p] resource instance. Number of instances: %d%s.",
                  resource.get(),
                  resources_created_,
@@ -94,14 +96,21 @@ class ResourceQueue : public std::enable_shared_from_this<ResourceQueue<Resource
   }
 
  protected:
-  ResourceQueue(std::optional<size_t> maximum_number_of_creatable_resources, std::shared_ptr<core::logging::Logger> logger)
-      : maximum_number_of_creatable_resources_(maximum_number_of_creatable_resources),
+  ResourceQueue(std::function<std::unique_ptr<ResourceType>()> create_new_resource,
+                std::optional<size_t> maximum_number_of_creatable_resources,
+                std::optional<std::function<void(ResourceType&)>> reset_fn,
+                std::shared_ptr<core::logging::Logger> logger)
+      : create_new_resource_(std::move(create_new_resource)),
+        maximum_number_of_creatable_resources_(maximum_number_of_creatable_resources),
+        reset_fn_(std::move(reset_fn)),
         logger_(std::move(logger)) {
   }
 
  private:
   void returnResource(std::unique_ptr<ResourceType> resource) {
     logDebug("Returning [%p] resource", resource.get());
+    if (reset_fn_)
+      reset_fn_.value()(*resource);
     internal_queue_.enqueue(std::move(resource));
   }
 
@@ -111,8 +120,10 @@ class ResourceQueue : public std::enable_shared_from_this<ResourceQueue<Resource
       logger_->log_debug(format, std::forward<Args>(args)...);
   }
 
+  const std::function<std::unique_ptr<ResourceType>()> create_new_resource_;
   const std::optional<size_t> maximum_number_of_creatable_resources_;
-  std::shared_ptr<core::logging::Logger> logger_;
+  const std::optional<std::function<void(ResourceType&)>> reset_fn_;
+  const std::shared_ptr<core::logging::Logger> logger_;
   ConditionConcurrentQueue<std::unique_ptr<ResourceType>> internal_queue_;
   size_t resources_created_ = 0;
   std::mutex counter_mutex_;
@@ -126,7 +137,10 @@ struct ResourceQueue<ResourceType>::make_shared_enabler : public ResourceQueue<R
 };
 
 template<class ResourceType>
-auto ResourceQueue<ResourceType>::create(std::optional<size_t> maximum_number_of_creatable_resources, std::shared_ptr<core::logging::Logger> logger) {
-  return std::make_shared<make_shared_enabler>(maximum_number_of_creatable_resources, std::move(logger));
+auto ResourceQueue<ResourceType>::create(std::function<std::unique_ptr<ResourceType>()> creator,
+                                         std::optional<size_t> maximum_number_of_creatable_resources,
+                                         std::optional<std::function<void(ResourceType&)>> reset_fn,
+                                         std::shared_ptr<core::logging::Logger> logger) {
+  return std::make_shared<make_shared_enabler>(std::move(creator), maximum_number_of_creatable_resources, std::move(reset_fn), std::move(logger));
 }
 }  // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/test/unit/ResourceQueueTests.cpp b/libminifi/test/unit/ResourceQueueTests.cpp
index 526050477..8266d3b50 100644
--- a/libminifi/test/unit/ResourceQueueTests.cpp
+++ b/libminifi/test/unit/ResourceQueueTests.cpp
@@ -29,23 +29,26 @@ using namespace std::literals::chrono_literals;
 namespace org::apache::nifi::minifi::utils::testing {
 
 TEST_CASE("Limiting resource queue to a maximum of 2 resources", "[utils::ResourceQueue]") {
-  std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<ResourceQueue<int>>::getLogger()};
-  LogTestController::getInstance().setTrace<ResourceQueue<int>>();
+  using std::chrono::steady_clock;
+
+  std::shared_ptr<core::logging::Logger> logger{core::logging::LoggerFactory<ResourceQueue<steady_clock::time_point>>::getLogger()};
+
+  LogTestController::getInstance().setTrace<ResourceQueue<steady_clock::time_point>>();
 
   std::mutex resources_created_mutex;
-  std::set<int> resources_created;
+  std::set<steady_clock::time_point> resources_created;
 
-  auto worker = [&](int value, const std::shared_ptr<ResourceQueue<int>>& resource_queue) {
-    auto resource = resource_queue->getResource([value]{return std::make_unique<int>(value);});
+  auto worker = [&](const std::shared_ptr<ResourceQueue<steady_clock::time_point>>& resource_queue) {
+    auto resource = resource_queue->getResource();
     std::this_thread::sleep_for(10ms);
     std::lock_guard<std::mutex> lock(resources_created_mutex);
     resources_created.emplace(*resource);
   };
 
-  auto resource_queue = ResourceQueue<int>::create(2, logger_);
-  std::thread thread_one{[&] { worker(1, resource_queue); }};
-  std::thread thread_two{[&] { worker(2, resource_queue); }};
-  std::thread thread_three{[&] { worker(3, resource_queue); }};
+  auto resource_queue = utils::ResourceQueue<steady_clock::time_point>::create([]{ return std::make_unique<steady_clock::time_point>(steady_clock::now()); }, 2, std::nullopt, logger);
+  std::thread thread_one{[&] { worker(resource_queue); }};
+  std::thread thread_two{[&] { worker(resource_queue); }};
+  std::thread thread_three{[&] { worker(resource_queue); }};
 
   thread_one.join();
   thread_two.join();
@@ -56,15 +59,17 @@ TEST_CASE("Limiting resource queue to a maximum of 2 resources", "[utils::Resour
 }
 
 TEST_CASE("Resource limitation is not set to the resource queue", "[utils::ResourceQueue]") {
-  std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<ResourceQueue<int>>::getLogger()};
-  LogTestController::getInstance().setTrace<ResourceQueue<int>>();
+  using std::chrono::steady_clock;
+
+  std::shared_ptr<core::logging::Logger> logger{core::logging::LoggerFactory<ResourceQueue<steady_clock::time_point>>::getLogger()};
+  LogTestController::getInstance().setTrace<ResourceQueue<steady_clock::time_point>>();
   LogTestController::getInstance().clear();
-  auto resource_queue = ResourceQueue<int>::create(std::nullopt, logger_);
-  std::set<int> resources_created;
+  auto resource_queue = utils::ResourceQueue<steady_clock::time_point>::create([]{ return std::make_unique<steady_clock::time_point>(steady_clock::now()); }, std::nullopt, std::nullopt, logger);
+  std::set<steady_clock::time_point> resources_created;
 
-  auto resource_wrapper_one = resource_queue->getResource([]{return std::make_unique<int>(1);});
-  auto resource_wrapper_two = resource_queue->getResource([]{return std::make_unique<int>(2);});
-  auto resource_wrapper_three = resource_queue->getResource([]{return std::make_unique<int>(3);});
+  auto resource_wrapper_one = resource_queue->getResource();
+  auto resource_wrapper_two = resource_queue->getResource();
+  auto resource_wrapper_three = resource_queue->getResource();
 
   resources_created.emplace(*resource_wrapper_one);
   resources_created.emplace(*resource_wrapper_two);
@@ -76,20 +81,46 @@ TEST_CASE("Resource limitation is not set to the resource queue", "[utils::Resou
 }
 
 TEST_CASE("resource returns when it goes out of scope", "[utils::ResourceQueue]") {
-  auto queue = utils::ResourceQueue<int>::create(std::nullopt, nullptr);
+  using std::chrono::steady_clock;
+  auto queue = utils::ResourceQueue<steady_clock::time_point>::create([]{ return std::make_unique<steady_clock::time_point>(steady_clock::time_point::min()); });
+  {
+    auto resource = queue->getResource();
+    CHECK(*resource == steady_clock::time_point::min());
+    *resource = steady_clock::now();
+  }
+  {
+    auto resource = queue->getResource();
+    CHECK(*resource != steady_clock::time_point::min());
+  }
+}
+
+TEST_CASE("resource resets when it goes out of scope", "[utils::ResourceQueue]") {
+  using std::chrono::steady_clock;
+  std::shared_ptr<core::logging::Logger> logger{core::logging::LoggerFactory<ResourceQueue<steady_clock::time_point>>::getLogger()};
+  LogTestController::getInstance().setTrace<ResourceQueue<steady_clock::time_point>>();
+  LogTestController::getInstance().clear();
+  auto queue = utils::ResourceQueue<steady_clock::time_point>::create([]{ return std::make_unique<steady_clock::time_point>(steady_clock::time_point::min()); },
+                                                                      std::nullopt,
+                                                                      [](steady_clock::time_point& resource){ resource = steady_clock::time_point::min();},
+                                                                      logger);
   {
-    auto resource = queue->getResource([] { return std::make_unique<int>(1); });
-    CHECK(*resource == 1);
+    auto resource = queue->getResource();
+    CHECK(*resource == steady_clock::time_point::min());
+    *resource = steady_clock::now();
   }
   {
-    auto resource = queue->getResource([] { return std::make_unique<int>(2); });
-    CHECK(*resource == 1);
+    CHECK(LogTestController::getInstance().matchesRegex("Returning .* resource", 0ms));
+    auto resource = queue->getResource();
+    CHECK(*resource == steady_clock::time_point::min());
+    CHECK(LogTestController::getInstance().matchesRegex("Using available .* resource instance", 0ms));
   }
 }
 
 TEST_CASE("queue destroyed before resource", "[utils::ResourceQueue]") {
-  auto queue = utils::ResourceQueue<int>::create(std::nullopt, nullptr);
-  auto resource = queue->getResource([]{ return std::make_unique<int>(1); });
+  using std::chrono::steady_clock;
+  auto queue = utils::ResourceQueue<steady_clock::time_point>::create([]{ return std::make_unique<steady_clock::time_point>(steady_clock::time_point::min()); });
+  auto resource = queue->getResource();
   REQUIRE_NOTHROW(queue.reset());
+  REQUIRE_NOTHROW(*resource);
 }
 }  // namespace org::apache::nifi::minifi::utils::testing
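
Note on the pattern in this diff: the refactored ResourceQueue takes its creator function (and an optional reset function) once at construction, so getResource() needs no arguments and a returned resource can be reset before reuse. The block below is a minimal sketch of that idea, not the MiNiFi implementation. SimplePool, Handle, acquire, giveBack and createdCount are hypothetical names chosen for illustration, and the sketch leaves out the maximum-resource limit, the blocking wait and the logging that the real class has.

```cpp
#include <cstddef>
#include <functional>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-in for utils::ResourceQueue (illustration only).
template<typename T>
class SimplePool : public std::enable_shared_from_this<SimplePool<T>> {
 public:
  // RAII handle: gives the resource back to the pool when it goes out of scope.
  class Handle {
   public:
    Handle(std::weak_ptr<SimplePool> pool, std::unique_ptr<T> resource)
        : pool_(std::move(pool)), resource_(std::move(resource)) {}
    Handle(Handle&&) = default;
    Handle(const Handle&) = delete;
    Handle& operator=(const Handle&) = delete;
    Handle& operator=(Handle&&) = delete;
    ~Handle() {
      if (!resource_) return;  // moved-from handle owns nothing
      // If the pool is already gone, the resource is simply destroyed.
      if (auto pool = pool_.lock()) pool->giveBack(std::move(resource_));
    }
    T& operator*() const { return *resource_; }
    T* operator->() const { return resource_.get(); }

   private:
    std::weak_ptr<SimplePool> pool_;
    std::unique_ptr<T> resource_;
  };

  // The creator is stored once; reset_fn may be empty, meaning "no reset".
  static std::shared_ptr<SimplePool> create(std::function<std::unique_ptr<T>()> creator,
                                            std::function<void(T&)> reset_fn = nullptr) {
    return std::shared_ptr<SimplePool>(new SimplePool(std::move(creator), std::move(reset_fn)));
  }

  // Reuse an idle resource if one exists, otherwise create a new one.
  Handle acquire() {
    std::unique_ptr<T> resource;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      if (!idle_.empty()) {
        resource = std::move(idle_.back());
        idle_.pop_back();
      } else {
        resource = creator_();
        ++created_;
      }
    }
    return Handle(std::weak_ptr<SimplePool>(this->shared_from_this()), std::move(resource));
  }

  std::size_t createdCount() {
    std::lock_guard<std::mutex> lock(mutex_);
    return created_;
  }

 private:
  SimplePool(std::function<std::unique_ptr<T>()> creator, std::function<void(T&)> reset_fn)
      : creator_(std::move(creator)), reset_fn_(std::move(reset_fn)) {}

  // Called by Handle's destructor: reset (if configured), then mark as idle.
  void giveBack(std::unique_ptr<T> resource) {
    if (reset_fn_) reset_fn_(*resource);
    std::lock_guard<std::mutex> lock(mutex_);
    idle_.push_back(std::move(resource));
  }

  const std::function<std::unique_ptr<T>()> creator_;
  const std::function<void(T&)> reset_fn_;
  std::mutex mutex_;
  std::vector<std::unique_ptr<T>> idle_;
  std::size_t created_ = 0;
};
```

With a reset function, a value written through one handle is not visible to the next caller. Without one, the modified value is handed out again. This mirrors the "resource resets when it goes out of scope" and "resource returns when it goes out of scope" test cases above. Holding only a weak reference in the handle is what lets the pool be destroyed before an outstanding resource, as in the "queue destroyed before resource" test.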