You are viewing a plain-text version of this content. The canonical link to it (a hyperlink on the original archive page) is not preserved in this copy.
Posted to commits@nifi.apache.org by fg...@apache.org on 2023/01/18 19:31:11 UTC
[nifi-minifi-cpp] 01/02: MINIFICPP-1973 Refactor ResourceQueue
This is an automated email from the ASF dual-hosted git repository.
fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit e6d8271870c37a4dcea87a35b9b70e5e5b178ee5
Author: Martin Zink <ma...@apache.org>
AuthorDate: Tue Nov 29 13:10:00 2022 +0100
MINIFICPP-1973 Refactor ResourceQueue
Signed-off-by: Ferenc Gerlits <fg...@gmail.com>
This closes #1473
---
extensions/http-curl/processors/InvokeHTTP.cpp | 15 +++--
extensions/script/ExecuteScript.cpp | 10 ++--
extensions/splunk/PutSplunkHTTP.cpp | 48 ++++++++--------
libminifi/include/utils/ResourceQueue.h | 32 ++++++++---
libminifi/test/unit/ResourceQueueTests.cpp | 77 ++++++++++++++++++--------
5 files changed, 117 insertions(+), 65 deletions(-)
diff --git a/extensions/http-curl/processors/InvokeHTTP.cpp b/extensions/http-curl/processors/InvokeHTTP.cpp
index b7397badd..9e803eec1 100644
--- a/extensions/http-curl/processors/InvokeHTTP.cpp
+++ b/extensions/http-curl/processors/InvokeHTTP.cpp
@@ -263,7 +263,15 @@ void InvokeHTTP::onSchedule(const std::shared_ptr<core::ProcessContext>& context
gsl_Expects(context);
setupMembersFromProperties(*context);
- client_queue_ = utils::ResourceQueue<extensions::curl::HTTPClient>::create(getMaxConcurrentTasks(), logger_);
+ std::weak_ptr<core::ProcessContext> weak_context = context;
+ auto create_client = [this, weak_context]() -> std::unique_ptr<minifi::extensions::curl::HTTPClient> {
+ if (auto context = weak_context.lock())
+ return createHTTPClientFromPropertiesAndMembers(*context);
+ else
+ return nullptr;
+ };
+
+ client_queue_ = utils::ResourceQueue<extensions::curl::HTTPClient>::create(create_client, getMaxConcurrentTasks(), std::nullopt, logger_);
}
bool InvokeHTTP::shouldEmitFlowFile(minifi::extensions::curl::HTTPClient& client) {
@@ -306,11 +314,8 @@ bool InvokeHTTP::appendHeaders(const core::FlowFile& flow_file, /*std::invocable
void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
gsl_Expects(session && context && client_queue_);
- auto create_client = [&]() -> std::unique_ptr<minifi::extensions::curl::HTTPClient> {
- return createHTTPClientFromPropertiesAndMembers(*context);
- };
- auto client = client_queue_->getResource(create_client);
+ auto client = client_queue_->getResource();
onTriggerWithClient(context, session, *client);
}
diff --git a/extensions/script/ExecuteScript.cpp b/extensions/script/ExecuteScript.cpp
index 28717c122..7124abbba 100644
--- a/extensions/script/ExecuteScript.cpp
+++ b/extensions/script/ExecuteScript.cpp
@@ -71,7 +71,10 @@ void ExecuteScript::initialize() {
void ExecuteScript::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory* /*sessionFactory*/) {
#ifdef LUA_SUPPORT
- lua_script_engine_queue_ = utils::ResourceQueue<lua::LuaScriptEngine>::create(getMaxConcurrentTasks(), logger_);
+ auto create_engine = [this]() -> std::unique_ptr<lua::LuaScriptEngine> {
+ return engine_factory_.createEngine<lua::LuaScriptEngine>();
+ };
+ lua_script_engine_queue_ = utils::ResourceQueue<lua::LuaScriptEngine>::create(create_engine, getMaxConcurrentTasks(), std::nullopt, logger_);
#endif // LUA_SUPPORT
#ifdef PYTHON_SUPPORT
python_script_engine_ = engine_factory_.createEngine<python::PythonScriptEngine>();
@@ -114,11 +117,8 @@ void ExecuteScript::onTrigger(const std::shared_ptr<core::ProcessContext> &conte
} else if (script_engine_ == ScriptEngineOption::LUA) {
#ifdef LUA_SUPPORT
gsl_Expects(lua_script_engine_queue_);
- auto create_engine = [&]() -> std::unique_ptr<lua::LuaScriptEngine> {
- return engine_factory_.createEngine<lua::LuaScriptEngine>();
- };
- lua_script_engine.emplace(lua_script_engine_queue_->getResource(create_engine));
+ lua_script_engine.emplace(lua_script_engine_queue_->getResource());
engine = lua_script_engine->get();
#else
throw std::runtime_error("Lua support is disabled in this build.");
diff --git a/extensions/splunk/PutSplunkHTTP.cpp b/extensions/splunk/PutSplunkHTTP.cpp
index 0969d836e..f51f4f3af 100644
--- a/extensions/splunk/PutSplunkHTTP.cpp
+++ b/extensions/splunk/PutSplunkHTTP.cpp
@@ -40,32 +40,26 @@ void PutSplunkHTTP::initialize() {
setSupportedRelationships(relationships());
}
-void PutSplunkHTTP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
- SplunkHECProcessor::onSchedule(context, sessionFactory);
- client_queue_ = utils::ResourceQueue<extensions::curl::HTTPClient>::create(getMaxConcurrentTasks(), logger_);
-}
-
namespace {
std::optional<std::string> getContentType(core::ProcessContext& context, const core::FlowFile& flow_file) {
return context.getProperty(PutSplunkHTTP::ContentType) | utils::orElse ([&flow_file] {return flow_file.getAttribute("mime.type");});
}
-std::string getEndpoint(core::ProcessContext& context, const gsl::not_null<std::shared_ptr<core::FlowFile>>& flow_file, curl::HTTPClient& client) {
+std::string getEndpoint(core::ProcessContext& context, curl::HTTPClient& client) {
std::stringstream endpoint;
endpoint << "/services/collector/raw";
std::vector<std::string> parameters;
- std::string prop_value;
- if (context.getProperty(PutSplunkHTTP::SourceType, prop_value, flow_file)) {
- parameters.push_back("sourcetype=" + client.escape(prop_value));
+ if (auto source_type = context.getProperty(PutSplunkHTTP::SourceType)) {
+ parameters.push_back("sourcetype=" + client.escape(*source_type));
}
- if (context.getProperty(PutSplunkHTTP::Source, prop_value, flow_file)) {
- parameters.push_back("source=" + client.escape(prop_value));
+ if (auto source = context.getProperty(PutSplunkHTTP::Source)) {
+ parameters.push_back("source=" + client.escape(*source));
}
- if (context.getProperty(PutSplunkHTTP::Host, prop_value, flow_file)) {
- parameters.push_back("host=" + client.escape(prop_value));
+ if (auto host = context.getProperty(PutSplunkHTTP::Host)) {
+ parameters.push_back("host=" + client.escape(*host));
}
- if (context.getProperty(PutSplunkHTTP::Index, prop_value, flow_file)) {
- parameters.push_back("index=" + client.escape(prop_value));
+ if (auto index = context.getProperty(PutSplunkHTTP::Index)) {
+ parameters.push_back("index=" + client.escape(*index));
}
if (!parameters.empty()) {
endpoint << "?" << utils::StringUtils::join("&", parameters);
@@ -117,6 +111,21 @@ void setFlowFileAsPayload(core::ProcessSession& session,
}
} // namespace
+void PutSplunkHTTP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
+ SplunkHECProcessor::onSchedule(context, sessionFactory);
+ std::weak_ptr<core::ProcessContext> weak_context = context;
+ auto create_client = [this, weak_context]() -> std::unique_ptr<minifi::extensions::curl::HTTPClient> {
+ if (auto context = weak_context.lock()) {
+ auto client = std::make_unique<curl::HTTPClient>();
+ initializeClient(*client, getNetworkLocation().append(getEndpoint(*context, *client)), getSSLContextService(*context));
+ return client;
+ }
+ return nullptr;
+ };
+
+ client_queue_ = utils::ResourceQueue<extensions::curl::HTTPClient>::create(create_client, getMaxConcurrentTasks(), std::nullopt, logger_);
+}
+
void PutSplunkHTTP::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
gsl_Expects(context && session && client_queue_);
@@ -127,13 +136,7 @@ void PutSplunkHTTP::onTrigger(const std::shared_ptr<core::ProcessContext>& conte
}
auto flow_file = gsl::not_null(std::move(ff));
- auto create_client = [&]() -> std::unique_ptr<minifi::extensions::curl::HTTPClient> {
- auto client = std::make_unique<curl::HTTPClient>();
- initializeClient(*client, getNetworkLocation().append(getEndpoint(*context, flow_file, *client)), getSSLContextService(*context));
- return client;
- };
-
- auto client = client_queue_->getResource(create_client);
+ auto client = client_queue_->getResource();
setFlowFileAsPayload(*session, *context, *client, flow_file);
@@ -145,4 +148,3 @@ void PutSplunkHTTP::onTrigger(const std::shared_ptr<core::ProcessContext>& conte
}
} // namespace org::apache::nifi::minifi::extensions::splunk
-
diff --git a/libminifi/include/utils/ResourceQueue.h b/libminifi/include/utils/ResourceQueue.h
index acb5be4fa..7385c18c4 100644
--- a/libminifi/include/utils/ResourceQueue.h
+++ b/libminifi/include/utils/ResourceQueue.h
@@ -65,10 +65,12 @@ class ResourceQueue : public std::enable_shared_from_this<ResourceQueue<Resource
std::unique_ptr<ResourceType> resource_;
};
- static auto create(std::optional<size_t> maximum_number_of_creatable_resources, std::shared_ptr<core::logging::Logger> logger);
+ static auto create(std::function<std::unique_ptr<ResourceType>()> creator,
+ std::optional<size_t> maximum_number_of_creatable_resources = std::nullopt,
+ std::optional<std::function<void(ResourceType&)>> reset_fn = std::nullopt,
+ std::shared_ptr<core::logging::Logger> logger = nullptr);
- template<typename Fn>
- [[nodiscard]] std::enable_if_t<std::is_invocable_v<std::unique_ptr<ResourceType>()>, ResourceWrapper> getResource(const Fn& create_resource) {
+ [[nodiscard]] ResourceWrapper getResource() {
std::unique_ptr<ResourceType> resource;
// Use an existing resource, if one is available
if (internal_queue_.tryDequeue(resource)) {
@@ -78,7 +80,7 @@ class ResourceQueue : public std::enable_shared_from_this<ResourceQueue<Resource
const std::lock_guard<std::mutex> lock(counter_mutex_);
if (!maximum_number_of_creatable_resources_ || resources_created_ < maximum_number_of_creatable_resources_) {
++resources_created_;
- resource = create_resource();
+ resource = create_new_resource_();
logDebug("Created new [%p] resource instance. Number of instances: %d%s.",
resource.get(),
resources_created_,
@@ -94,14 +96,21 @@ class ResourceQueue : public std::enable_shared_from_this<ResourceQueue<Resource
}
protected:
- ResourceQueue(std::optional<size_t> maximum_number_of_creatable_resources, std::shared_ptr<core::logging::Logger> logger)
- : maximum_number_of_creatable_resources_(maximum_number_of_creatable_resources),
+ ResourceQueue(std::function<std::unique_ptr<ResourceType>()> create_new_resource,
+ std::optional<size_t> maximum_number_of_creatable_resources,
+ std::optional<std::function<void(ResourceType&)>> reset_fn,
+ std::shared_ptr<core::logging::Logger> logger)
+ : create_new_resource_(std::move(create_new_resource)),
+ maximum_number_of_creatable_resources_(maximum_number_of_creatable_resources),
+ reset_fn_(std::move(reset_fn)),
logger_(std::move(logger)) {
}
private:
void returnResource(std::unique_ptr<ResourceType> resource) {
logDebug("Returning [%p] resource", resource.get());
+ if (reset_fn_)
+ reset_fn_.value()(*resource);
internal_queue_.enqueue(std::move(resource));
}
@@ -111,8 +120,10 @@ class ResourceQueue : public std::enable_shared_from_this<ResourceQueue<Resource
logger_->log_debug(format, std::forward<Args>(args)...);
}
+ const std::function<std::unique_ptr<ResourceType>()> create_new_resource_;
const std::optional<size_t> maximum_number_of_creatable_resources_;
- std::shared_ptr<core::logging::Logger> logger_;
+ const std::optional<std::function<void(ResourceType&)>> reset_fn_;
+ const std::shared_ptr<core::logging::Logger> logger_;
ConditionConcurrentQueue<std::unique_ptr<ResourceType>> internal_queue_;
size_t resources_created_ = 0;
std::mutex counter_mutex_;
@@ -126,7 +137,10 @@ struct ResourceQueue<ResourceType>::make_shared_enabler : public ResourceQueue<R
};
template<class ResourceType>
-auto ResourceQueue<ResourceType>::create(std::optional<size_t> maximum_number_of_creatable_resources, std::shared_ptr<core::logging::Logger> logger) {
- return std::make_shared<make_shared_enabler>(maximum_number_of_creatable_resources, std::move(logger));
+auto ResourceQueue<ResourceType>::create(std::function<std::unique_ptr<ResourceType>()> creator,
+ std::optional<size_t> maximum_number_of_creatable_resources,
+ std::optional<std::function<void(ResourceType&)>> reset_fn,
+ std::shared_ptr<core::logging::Logger> logger) {
+ return std::make_shared<make_shared_enabler>(std::move(creator), maximum_number_of_creatable_resources, std::move(reset_fn), std::move(logger));
}
} // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/test/unit/ResourceQueueTests.cpp b/libminifi/test/unit/ResourceQueueTests.cpp
index 526050477..8266d3b50 100644
--- a/libminifi/test/unit/ResourceQueueTests.cpp
+++ b/libminifi/test/unit/ResourceQueueTests.cpp
@@ -29,23 +29,26 @@ using namespace std::literals::chrono_literals;
namespace org::apache::nifi::minifi::utils::testing {
TEST_CASE("Limiting resource queue to a maximum of 2 resources", "[utils::ResourceQueue]") {
- std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<ResourceQueue<int>>::getLogger()};
- LogTestController::getInstance().setTrace<ResourceQueue<int>>();
+ using std::chrono::steady_clock;
+
+ std::shared_ptr<core::logging::Logger> logger{core::logging::LoggerFactory<ResourceQueue<steady_clock::time_point>>::getLogger()};
+
+ LogTestController::getInstance().setTrace<ResourceQueue<steady_clock::time_point>>();
std::mutex resources_created_mutex;
- std::set<int> resources_created;
+ std::set<steady_clock::time_point> resources_created;
- auto worker = [&](int value, const std::shared_ptr<ResourceQueue<int>>& resource_queue) {
- auto resource = resource_queue->getResource([value]{return std::make_unique<int>(value);});
+ auto worker = [&](const std::shared_ptr<ResourceQueue<steady_clock::time_point>>& resource_queue) {
+ auto resource = resource_queue->getResource();
std::this_thread::sleep_for(10ms);
std::lock_guard<std::mutex> lock(resources_created_mutex);
resources_created.emplace(*resource);
};
- auto resource_queue = ResourceQueue<int>::create(2, logger_);
- std::thread thread_one{[&] { worker(1, resource_queue); }};
- std::thread thread_two{[&] { worker(2, resource_queue); }};
- std::thread thread_three{[&] { worker(3, resource_queue); }};
+ auto resource_queue = utils::ResourceQueue<steady_clock::time_point>::create([]{ return std::make_unique<steady_clock::time_point>(steady_clock::now()); }, 2, std::nullopt, logger);
+ std::thread thread_one{[&] { worker(resource_queue); }};
+ std::thread thread_two{[&] { worker(resource_queue); }};
+ std::thread thread_three{[&] { worker(resource_queue); }};
thread_one.join();
thread_two.join();
@@ -56,15 +59,17 @@ TEST_CASE("Limiting resource queue to a maximum of 2 resources", "[utils::Resour
}
TEST_CASE("Resource limitation is not set to the resource queue", "[utils::ResourceQueue]") {
- std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<ResourceQueue<int>>::getLogger()};
- LogTestController::getInstance().setTrace<ResourceQueue<int>>();
+ using std::chrono::steady_clock;
+
+ std::shared_ptr<core::logging::Logger> logger{core::logging::LoggerFactory<ResourceQueue<steady_clock::time_point>>::getLogger()};
+ LogTestController::getInstance().setTrace<ResourceQueue<steady_clock::time_point>>();
LogTestController::getInstance().clear();
- auto resource_queue = ResourceQueue<int>::create(std::nullopt, logger_);
- std::set<int> resources_created;
+ auto resource_queue = utils::ResourceQueue<steady_clock::time_point>::create([]{ return std::make_unique<steady_clock::time_point>(steady_clock::now()); }, std::nullopt, std::nullopt, logger);
+ std::set<steady_clock::time_point> resources_created;
- auto resource_wrapper_one = resource_queue->getResource([]{return std::make_unique<int>(1);});
- auto resource_wrapper_two = resource_queue->getResource([]{return std::make_unique<int>(2);});
- auto resource_wrapper_three = resource_queue->getResource([]{return std::make_unique<int>(3);});
+ auto resource_wrapper_one = resource_queue->getResource();
+ auto resource_wrapper_two = resource_queue->getResource();
+ auto resource_wrapper_three = resource_queue->getResource();
resources_created.emplace(*resource_wrapper_one);
resources_created.emplace(*resource_wrapper_two);
@@ -76,20 +81,46 @@ TEST_CASE("Resource limitation is not set to the resource queue", "[utils::Resou
}
TEST_CASE("resource returns when it goes out of scope", "[utils::ResourceQueue]") {
- auto queue = utils::ResourceQueue<int>::create(std::nullopt, nullptr);
+ using std::chrono::steady_clock;
+ auto queue = utils::ResourceQueue<steady_clock::time_point>::create([]{ return std::make_unique<steady_clock::time_point>(steady_clock::time_point::min()); });
+ {
+ auto resource = queue->getResource();
+ CHECK(*resource == steady_clock::time_point::min());
+ *resource = steady_clock::now();
+ }
+ {
+ auto resource = queue->getResource();
+ CHECK(*resource != steady_clock::time_point::min());
+ }
+}
+
+TEST_CASE("resource resets when it goes out of scope", "[utils::ResourceQueue]") {
+ using std::chrono::steady_clock;
+ std::shared_ptr<core::logging::Logger> logger{core::logging::LoggerFactory<ResourceQueue<steady_clock::time_point>>::getLogger()};
+ LogTestController::getInstance().setTrace<ResourceQueue<steady_clock::time_point>>();
+ LogTestController::getInstance().clear();
+ auto queue = utils::ResourceQueue<steady_clock::time_point>::create([]{ return std::make_unique<steady_clock::time_point>(steady_clock::time_point::min()); },
+ std::nullopt,
+ [](steady_clock::time_point& resource){ resource = steady_clock::time_point::min();},
+ logger);
{
- auto resource = queue->getResource([] { return std::make_unique<int>(1); });
- CHECK(*resource == 1);
+ auto resource = queue->getResource();
+ CHECK(*resource == steady_clock::time_point::min());
+ *resource = steady_clock::now();
}
{
- auto resource = queue->getResource([] { return std::make_unique<int>(2); });
- CHECK(*resource == 1);
+ CHECK(LogTestController::getInstance().matchesRegex("Returning .* resource", 0ms));
+ auto resource = queue->getResource();
+ CHECK(*resource == steady_clock::time_point::min());
+ CHECK(LogTestController::getInstance().matchesRegex("Using available .* resource instance", 0ms));
}
}
TEST_CASE("queue destroyed before resource", "[utils::ResourceQueue]") {
- auto queue = utils::ResourceQueue<int>::create(std::nullopt, nullptr);
- auto resource = queue->getResource([]{ return std::make_unique<int>(1); });
+ using std::chrono::steady_clock;
+ auto queue = utils::ResourceQueue<steady_clock::time_point>::create([]{ return std::make_unique<steady_clock::time_point>(steady_clock::time_point::min()); });
+ auto resource = queue->getResource();
REQUIRE_NOTHROW(queue.reset());
+ REQUIRE_NOTHROW(*resource);
}
} // namespace org::apache::nifi::minifi::utils::testing