You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by fg...@apache.org on 2023/01/18 19:31:11 UTC

[nifi-minifi-cpp] 01/02: MINIFICPP-1973 Refactor ResourceQueue

This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit e6d8271870c37a4dcea87a35b9b70e5e5b178ee5
Author: Martin Zink <ma...@apache.org>
AuthorDate: Tue Nov 29 13:10:00 2022 +0100

    MINIFICPP-1973 Refactor ResourceQueue
    
    Signed-off-by: Ferenc Gerlits <fg...@gmail.com>
    This closes #1473
---
 extensions/http-curl/processors/InvokeHTTP.cpp | 15 +++--
 extensions/script/ExecuteScript.cpp            | 10 ++--
 extensions/splunk/PutSplunkHTTP.cpp            | 48 ++++++++--------
 libminifi/include/utils/ResourceQueue.h        | 32 ++++++++---
 libminifi/test/unit/ResourceQueueTests.cpp     | 77 ++++++++++++++++++--------
 5 files changed, 117 insertions(+), 65 deletions(-)

diff --git a/extensions/http-curl/processors/InvokeHTTP.cpp b/extensions/http-curl/processors/InvokeHTTP.cpp
index b7397badd..9e803eec1 100644
--- a/extensions/http-curl/processors/InvokeHTTP.cpp
+++ b/extensions/http-curl/processors/InvokeHTTP.cpp
@@ -263,7 +263,15 @@ void InvokeHTTP::onSchedule(const std::shared_ptr<core::ProcessContext>& context
   gsl_Expects(context);
 
   setupMembersFromProperties(*context);
-  client_queue_ = utils::ResourceQueue<extensions::curl::HTTPClient>::create(getMaxConcurrentTasks(), logger_);
+  std::weak_ptr<core::ProcessContext> weak_context = context;
+  auto create_client = [this, weak_context]() -> std::unique_ptr<minifi::extensions::curl::HTTPClient> {
+    if (auto context = weak_context.lock())
+      return createHTTPClientFromPropertiesAndMembers(*context);
+    else
+      return nullptr;
+  };
+
+  client_queue_ = utils::ResourceQueue<extensions::curl::HTTPClient>::create(create_client, getMaxConcurrentTasks(), std::nullopt, logger_);
 }
 
 bool InvokeHTTP::shouldEmitFlowFile(minifi::extensions::curl::HTTPClient& client) {
@@ -306,11 +314,8 @@ bool InvokeHTTP::appendHeaders(const core::FlowFile& flow_file, /*std::invocable
 
 void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
   gsl_Expects(session && context && client_queue_);
-  auto create_client = [&]() -> std::unique_ptr<minifi::extensions::curl::HTTPClient> {
-    return createHTTPClientFromPropertiesAndMembers(*context);
-  };
 
-  auto client = client_queue_->getResource(create_client);
+  auto client = client_queue_->getResource();
 
   onTriggerWithClient(context, session, *client);
 }
diff --git a/extensions/script/ExecuteScript.cpp b/extensions/script/ExecuteScript.cpp
index 28717c122..7124abbba 100644
--- a/extensions/script/ExecuteScript.cpp
+++ b/extensions/script/ExecuteScript.cpp
@@ -71,7 +71,10 @@ void ExecuteScript::initialize() {
 
 void ExecuteScript::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory* /*sessionFactory*/) {
 #ifdef LUA_SUPPORT
-  lua_script_engine_queue_ = utils::ResourceQueue<lua::LuaScriptEngine>::create(getMaxConcurrentTasks(), logger_);
+  auto create_engine = [this]() -> std::unique_ptr<lua::LuaScriptEngine> {
+    return engine_factory_.createEngine<lua::LuaScriptEngine>();
+  };
+  lua_script_engine_queue_ = utils::ResourceQueue<lua::LuaScriptEngine>::create(create_engine, getMaxConcurrentTasks(), std::nullopt, logger_);
 #endif  // LUA_SUPPORT
 #ifdef PYTHON_SUPPORT
   python_script_engine_ = engine_factory_.createEngine<python::PythonScriptEngine>();
@@ -114,11 +117,8 @@ void ExecuteScript::onTrigger(const std::shared_ptr<core::ProcessContext> &conte
   } else if (script_engine_ == ScriptEngineOption::LUA) {
 #ifdef LUA_SUPPORT
     gsl_Expects(lua_script_engine_queue_);
-    auto create_engine = [&]() -> std::unique_ptr<lua::LuaScriptEngine> {
-      return engine_factory_.createEngine<lua::LuaScriptEngine>();
-    };
 
-    lua_script_engine.emplace(lua_script_engine_queue_->getResource(create_engine));
+    lua_script_engine.emplace(lua_script_engine_queue_->getResource());
     engine = lua_script_engine->get();
 #else
     throw std::runtime_error("Lua support is disabled in this build.");
diff --git a/extensions/splunk/PutSplunkHTTP.cpp b/extensions/splunk/PutSplunkHTTP.cpp
index 0969d836e..f51f4f3af 100644
--- a/extensions/splunk/PutSplunkHTTP.cpp
+++ b/extensions/splunk/PutSplunkHTTP.cpp
@@ -40,32 +40,26 @@ void PutSplunkHTTP::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void PutSplunkHTTP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
-  SplunkHECProcessor::onSchedule(context, sessionFactory);
-  client_queue_ = utils::ResourceQueue<extensions::curl::HTTPClient>::create(getMaxConcurrentTasks(), logger_);
-}
-
 namespace {
 std::optional<std::string> getContentType(core::ProcessContext& context, const core::FlowFile& flow_file) {
   return context.getProperty(PutSplunkHTTP::ContentType) | utils::orElse ([&flow_file] {return flow_file.getAttribute("mime.type");});
 }
 
-std::string getEndpoint(core::ProcessContext& context, const gsl::not_null<std::shared_ptr<core::FlowFile>>& flow_file, curl::HTTPClient& client) {
+std::string getEndpoint(core::ProcessContext& context, curl::HTTPClient& client) {
   std::stringstream endpoint;
   endpoint << "/services/collector/raw";
   std::vector<std::string> parameters;
-  std::string prop_value;
-  if (context.getProperty(PutSplunkHTTP::SourceType, prop_value, flow_file)) {
-    parameters.push_back("sourcetype=" + client.escape(prop_value));
+  if (auto source_type = context.getProperty(PutSplunkHTTP::SourceType)) {
+    parameters.push_back("sourcetype=" + client.escape(*source_type));
   }
-  if (context.getProperty(PutSplunkHTTP::Source, prop_value, flow_file)) {
-    parameters.push_back("source=" + client.escape(prop_value));
+  if (auto source = context.getProperty(PutSplunkHTTP::Source)) {
+    parameters.push_back("source=" + client.escape(*source));
   }
-  if (context.getProperty(PutSplunkHTTP::Host, prop_value, flow_file)) {
-    parameters.push_back("host=" + client.escape(prop_value));
+  if (auto host = context.getProperty(PutSplunkHTTP::Host)) {
+    parameters.push_back("host=" + client.escape(*host));
   }
-  if (context.getProperty(PutSplunkHTTP::Index, prop_value, flow_file)) {
-    parameters.push_back("index=" + client.escape(prop_value));
+  if (auto index = context.getProperty(PutSplunkHTTP::Index)) {
+    parameters.push_back("index=" + client.escape(*index));
   }
   if (!parameters.empty()) {
     endpoint << "?" << utils::StringUtils::join("&", parameters);
@@ -117,6 +111,21 @@ void setFlowFileAsPayload(core::ProcessSession& session,
 }
 }  // namespace
 
+void PutSplunkHTTP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
+  SplunkHECProcessor::onSchedule(context, sessionFactory);
+  std::weak_ptr<core::ProcessContext> weak_context = context;
+  auto create_client = [this, weak_context]() -> std::unique_ptr<minifi::extensions::curl::HTTPClient> {
+    if (auto context = weak_context.lock()) {
+      auto client = std::make_unique<curl::HTTPClient>();
+      initializeClient(*client, getNetworkLocation().append(getEndpoint(*context, *client)), getSSLContextService(*context));
+      return client;
+    }
+    return nullptr;
+  };
+
+  client_queue_ = utils::ResourceQueue<extensions::curl::HTTPClient>::create(create_client, getMaxConcurrentTasks(), std::nullopt, logger_);
+}
+
 void PutSplunkHTTP::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
   gsl_Expects(context && session && client_queue_);
 
@@ -127,13 +136,7 @@ void PutSplunkHTTP::onTrigger(const std::shared_ptr<core::ProcessContext>& conte
   }
   auto flow_file = gsl::not_null(std::move(ff));
 
-  auto create_client = [&]() -> std::unique_ptr<minifi::extensions::curl::HTTPClient> {
-    auto client = std::make_unique<curl::HTTPClient>();
-    initializeClient(*client, getNetworkLocation().append(getEndpoint(*context, flow_file, *client)), getSSLContextService(*context));
-    return client;
-  };
-
-  auto client = client_queue_->getResource(create_client);
+  auto client = client_queue_->getResource();
 
   setFlowFileAsPayload(*session, *context, *client, flow_file);
 
@@ -145,4 +148,3 @@ void PutSplunkHTTP::onTrigger(const std::shared_ptr<core::ProcessContext>& conte
 }
 
 }  // namespace org::apache::nifi::minifi::extensions::splunk
-
diff --git a/libminifi/include/utils/ResourceQueue.h b/libminifi/include/utils/ResourceQueue.h
index acb5be4fa..7385c18c4 100644
--- a/libminifi/include/utils/ResourceQueue.h
+++ b/libminifi/include/utils/ResourceQueue.h
@@ -65,10 +65,12 @@ class ResourceQueue : public std::enable_shared_from_this<ResourceQueue<Resource
     std::unique_ptr<ResourceType> resource_;
   };
 
-  static auto create(std::optional<size_t> maximum_number_of_creatable_resources, std::shared_ptr<core::logging::Logger> logger);
+  static auto create(std::function<std::unique_ptr<ResourceType>()> creator,
+                     std::optional<size_t> maximum_number_of_creatable_resources = std::nullopt,
+                     std::optional<std::function<void(ResourceType&)>> reset_fn = std::nullopt,
+                     std::shared_ptr<core::logging::Logger> logger = nullptr);
 
-  template<typename Fn>
-  [[nodiscard]] std::enable_if_t<std::is_invocable_v<std::unique_ptr<ResourceType>()>, ResourceWrapper> getResource(const Fn& create_resource) {
+  [[nodiscard]] ResourceWrapper getResource() {
     std::unique_ptr<ResourceType> resource;
     // Use an existing resource, if one is available
     if (internal_queue_.tryDequeue(resource)) {
@@ -78,7 +80,7 @@ class ResourceQueue : public std::enable_shared_from_this<ResourceQueue<Resource
       const std::lock_guard<std::mutex> lock(counter_mutex_);
       if (!maximum_number_of_creatable_resources_ || resources_created_ < maximum_number_of_creatable_resources_) {
         ++resources_created_;
-        resource = create_resource();
+        resource = create_new_resource_();
         logDebug("Created new [%p] resource instance. Number of instances: %d%s.",
                  resource.get(),
                  resources_created_,
@@ -94,14 +96,21 @@ class ResourceQueue : public std::enable_shared_from_this<ResourceQueue<Resource
   }
 
  protected:
-  ResourceQueue(std::optional<size_t> maximum_number_of_creatable_resources, std::shared_ptr<core::logging::Logger> logger)
-      : maximum_number_of_creatable_resources_(maximum_number_of_creatable_resources),
+  ResourceQueue(std::function<std::unique_ptr<ResourceType>()> create_new_resource,
+                std::optional<size_t> maximum_number_of_creatable_resources,
+                std::optional<std::function<void(ResourceType&)>> reset_fn,
+                std::shared_ptr<core::logging::Logger> logger)
+      : create_new_resource_(std::move(create_new_resource)),
+        maximum_number_of_creatable_resources_(maximum_number_of_creatable_resources),
+        reset_fn_(std::move(reset_fn)),
         logger_(std::move(logger)) {
   }
 
  private:
   void returnResource(std::unique_ptr<ResourceType> resource) {
     logDebug("Returning [%p] resource", resource.get());
+    if (reset_fn_)
+      reset_fn_.value()(*resource);
     internal_queue_.enqueue(std::move(resource));
   }
 
@@ -111,8 +120,10 @@ class ResourceQueue : public std::enable_shared_from_this<ResourceQueue<Resource
       logger_->log_debug(format, std::forward<Args>(args)...);
   }
 
+  const std::function<std::unique_ptr<ResourceType>()> create_new_resource_;
   const std::optional<size_t> maximum_number_of_creatable_resources_;
-  std::shared_ptr<core::logging::Logger> logger_;
+  const std::optional<std::function<void(ResourceType&)>> reset_fn_;
+  const std::shared_ptr<core::logging::Logger> logger_;
   ConditionConcurrentQueue<std::unique_ptr<ResourceType>> internal_queue_;
   size_t resources_created_ = 0;
   std::mutex counter_mutex_;
@@ -126,7 +137,10 @@ struct ResourceQueue<ResourceType>::make_shared_enabler : public ResourceQueue<R
 };
 
 template<class ResourceType>
-auto ResourceQueue<ResourceType>::create(std::optional<size_t> maximum_number_of_creatable_resources, std::shared_ptr<core::logging::Logger> logger) {
-  return std::make_shared<make_shared_enabler>(maximum_number_of_creatable_resources, std::move(logger));
+auto ResourceQueue<ResourceType>::create(std::function<std::unique_ptr<ResourceType>()> creator,
+                                         std::optional<size_t> maximum_number_of_creatable_resources,
+                                         std::optional<std::function<void(ResourceType&)>> reset_fn,
+                                         std::shared_ptr<core::logging::Logger> logger) {
+  return std::make_shared<make_shared_enabler>(std::move(creator), maximum_number_of_creatable_resources, std::move(reset_fn), std::move(logger));
 }
 }  // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/test/unit/ResourceQueueTests.cpp b/libminifi/test/unit/ResourceQueueTests.cpp
index 526050477..8266d3b50 100644
--- a/libminifi/test/unit/ResourceQueueTests.cpp
+++ b/libminifi/test/unit/ResourceQueueTests.cpp
@@ -29,23 +29,26 @@ using namespace std::literals::chrono_literals;
 namespace org::apache::nifi::minifi::utils::testing {
 
 TEST_CASE("Limiting resource queue to a maximum of 2 resources", "[utils::ResourceQueue]") {
-  std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<ResourceQueue<int>>::getLogger()};
-  LogTestController::getInstance().setTrace<ResourceQueue<int>>();
+  using std::chrono::steady_clock;
+
+  std::shared_ptr<core::logging::Logger> logger{core::logging::LoggerFactory<ResourceQueue<steady_clock::time_point>>::getLogger()};
+
+  LogTestController::getInstance().setTrace<ResourceQueue<steady_clock::time_point>>();
 
   std::mutex resources_created_mutex;
-  std::set<int> resources_created;
+  std::set<steady_clock::time_point> resources_created;
 
-  auto worker = [&](int value, const std::shared_ptr<ResourceQueue<int>>& resource_queue) {
-    auto resource = resource_queue->getResource([value]{return std::make_unique<int>(value);});
+  auto worker = [&](const std::shared_ptr<ResourceQueue<steady_clock::time_point>>& resource_queue) {
+    auto resource = resource_queue->getResource();
     std::this_thread::sleep_for(10ms);
     std::lock_guard<std::mutex> lock(resources_created_mutex);
     resources_created.emplace(*resource);
   };
 
-  auto resource_queue = ResourceQueue<int>::create(2, logger_);
-  std::thread thread_one{[&] { worker(1, resource_queue); }};
-  std::thread thread_two{[&] { worker(2, resource_queue); }};
-  std::thread thread_three{[&] { worker(3, resource_queue); }};
+  auto resource_queue = utils::ResourceQueue<steady_clock::time_point>::create([]{ return std::make_unique<steady_clock::time_point>(steady_clock::now()); }, 2, std::nullopt, logger);
+  std::thread thread_one{[&] { worker(resource_queue); }};
+  std::thread thread_two{[&] { worker(resource_queue); }};
+  std::thread thread_three{[&] { worker(resource_queue); }};
 
   thread_one.join();
   thread_two.join();
@@ -56,15 +59,17 @@ TEST_CASE("Limiting resource queue to a maximum of 2 resources", "[utils::Resour
 }
 
 TEST_CASE("Resource limitation is not set to the resource queue", "[utils::ResourceQueue]") {
-  std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<ResourceQueue<int>>::getLogger()};
-  LogTestController::getInstance().setTrace<ResourceQueue<int>>();
+  using std::chrono::steady_clock;
+
+  std::shared_ptr<core::logging::Logger> logger{core::logging::LoggerFactory<ResourceQueue<steady_clock::time_point>>::getLogger()};
+  LogTestController::getInstance().setTrace<ResourceQueue<steady_clock::time_point>>();
   LogTestController::getInstance().clear();
-  auto resource_queue = ResourceQueue<int>::create(std::nullopt, logger_);
-  std::set<int> resources_created;
+  auto resource_queue = utils::ResourceQueue<steady_clock::time_point>::create([]{ return std::make_unique<steady_clock::time_point>(steady_clock::now()); }, std::nullopt, std::nullopt, logger);
+  std::set<steady_clock::time_point> resources_created;
 
-  auto resource_wrapper_one = resource_queue->getResource([]{return std::make_unique<int>(1);});
-  auto resource_wrapper_two = resource_queue->getResource([]{return std::make_unique<int>(2);});
-  auto resource_wrapper_three = resource_queue->getResource([]{return std::make_unique<int>(3);});
+  auto resource_wrapper_one = resource_queue->getResource();
+  auto resource_wrapper_two = resource_queue->getResource();
+  auto resource_wrapper_three = resource_queue->getResource();
 
   resources_created.emplace(*resource_wrapper_one);
   resources_created.emplace(*resource_wrapper_two);
@@ -76,20 +81,46 @@ TEST_CASE("Resource limitation is not set to the resource queue", "[utils::Resou
 }
 
 TEST_CASE("resource returns when it goes out of scope", "[utils::ResourceQueue]") {
-  auto queue = utils::ResourceQueue<int>::create(std::nullopt, nullptr);
+  using std::chrono::steady_clock;
+  auto queue = utils::ResourceQueue<steady_clock::time_point>::create([]{ return std::make_unique<steady_clock::time_point>(steady_clock::time_point::min()); });
+  {
+    auto resource = queue->getResource();
+    CHECK(*resource == steady_clock::time_point::min());
+    *resource = steady_clock::now();
+  }
+  {
+    auto resource = queue->getResource();
+    CHECK(*resource != steady_clock::time_point::min());
+  }
+}
+
+TEST_CASE("resource resets when it goes out of scope", "[utils::ResourceQueue]") {
+  using std::chrono::steady_clock;
+  std::shared_ptr<core::logging::Logger> logger{core::logging::LoggerFactory<ResourceQueue<steady_clock::time_point>>::getLogger()};
+  LogTestController::getInstance().setTrace<ResourceQueue<steady_clock::time_point>>();
+  LogTestController::getInstance().clear();
+  auto queue = utils::ResourceQueue<steady_clock::time_point>::create([]{ return std::make_unique<steady_clock::time_point>(steady_clock::time_point::min()); },
+                                                                      std::nullopt,
+                                                                      [](steady_clock::time_point& resource){ resource = steady_clock::time_point::min();},
+                                                                      logger);
   {
-    auto resource = queue->getResource([] { return std::make_unique<int>(1); });
-    CHECK(*resource == 1);
+    auto resource = queue->getResource();
+    CHECK(*resource == steady_clock::time_point::min());
+    *resource = steady_clock::now();
   }
   {
-    auto resource = queue->getResource([] { return std::make_unique<int>(2); });
-    CHECK(*resource == 1);
+    CHECK(LogTestController::getInstance().matchesRegex("Returning .* resource", 0ms));
+    auto resource = queue->getResource();
+    CHECK(*resource == steady_clock::time_point::min());
+    CHECK(LogTestController::getInstance().matchesRegex("Using available .* resource instance", 0ms));
   }
 }
 
 TEST_CASE("queue destroyed before resource", "[utils::ResourceQueue]") {
-  auto queue = utils::ResourceQueue<int>::create(std::nullopt, nullptr);
-  auto resource = queue->getResource([]{ return std::make_unique<int>(1); });
+  using std::chrono::steady_clock;
+  auto queue = utils::ResourceQueue<steady_clock::time_point>::create([]{ return std::make_unique<steady_clock::time_point>(steady_clock::time_point::min()); });
+  auto resource = queue->getResource();
   REQUIRE_NOTHROW(queue.reset());
+  REQUIRE_NOTHROW(*resource);
 }
 }  // namespace org::apache::nifi::minifi::utils::testing