You are viewing a plain-text version of this content. The canonical link for it was a hyperlink in the original page and is not reproduced in this text.
Posted to commits@nifi.apache.org by ab...@apache.org on 2020/03/11 12:41:26 UTC
[nifi-minifi-cpp] 01/02: MINIFICPP-1158 - Event driven processors can starve each other
This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit bab6f93fa5b1e565cace7293bea57cd9372ba197
Author: Arpad Boda <ab...@apache.org>
AuthorDate: Wed Feb 12 16:36:25 2020 +0100
MINIFICPP-1158 - Event driven processors can starve each other
Signed-off-by: Arpad Boda <ab...@apache.org>
Approved by szaszm and bakaid on GitHub
This closes #735
---
extensions/coap/tests/CoapIntegrationBase.h | 2 +-
.../integration/UpdateAttributeIntegrationTest.cpp | 30 +-
.../standard-processors/processors/GetTCP.cpp | 3 +-
extensions/standard-processors/processors/GetTCP.h | 4 +-
libminifi/include/CronDrivenSchedulingAgent.h | 10 +-
libminifi/include/EventDrivenSchedulingAgent.h | 21 +-
libminifi/include/FlowController.h | 30 +-
libminifi/include/SchedulingAgent.h | 82 +----
libminifi/include/ThreadedSchedulingAgent.h | 8 +-
libminifi/include/TimerDrivenSchedulingAgent.h | 12 +-
.../core/controller/ControllerServiceProvider.h | 5 +-
.../controller/StandardControllerServiceProvider.h | 14 +-
libminifi/include/core/state/UpdateController.h | 4 +-
libminifi/include/properties/Configure.h | 1 +
libminifi/include/utils/Monitors.h | 174 +++++++++
libminifi/include/utils/ThreadPool.h | 387 +++------------------
libminifi/src/CPPLINT.cfg | 1 -
libminifi/src/Configure.cpp | 1 +
libminifi/src/CronDrivenSchedulingAgent.cpp | 13 +-
libminifi/src/EventDrivenSchedulingAgent.cpp | 38 +-
libminifi/src/FlowController.cpp | 33 +-
libminifi/src/SchedulingAgent.cpp | 29 +-
libminifi/src/ThreadedSchedulingAgent.cpp | 22 +-
libminifi/src/TimerDrivenSchedulingAgent.cpp | 13 +-
libminifi/src/core/Processor.cpp | 1 +
libminifi/src/utils/ThreadPool.cpp | 250 +++++++++++++
libminifi/test/resources/TestUpdateAttribute.yml | 18 +-
libminifi/test/unit/BackTraceTests.cpp | 8 +-
libminifi/test/unit/ThreadPoolTests.cpp | 8 +-
29 files changed, 630 insertions(+), 592 deletions(-)
diff --git a/extensions/coap/tests/CoapIntegrationBase.h b/extensions/coap/tests/CoapIntegrationBase.h
index 85af8c6..83935bf 100644
--- a/extensions/coap/tests/CoapIntegrationBase.h
+++ b/extensions/coap/tests/CoapIntegrationBase.h
@@ -34,7 +34,7 @@ int ssl_enable(void *ssl_context, void *user_data) {
class CoapIntegrationBase : public IntegrationBase {
public:
- CoapIntegrationBase(uint64_t waitTime = 60000)
+ CoapIntegrationBase(uint64_t waitTime = 5000)
: IntegrationBase(waitTime),
server(nullptr) {
}
diff --git a/extensions/expression-language/tests/integration/UpdateAttributeIntegrationTest.cpp b/extensions/expression-language/tests/integration/UpdateAttributeIntegrationTest.cpp
index a525404..40da9e2 100644
--- a/extensions/expression-language/tests/integration/UpdateAttributeIntegrationTest.cpp
+++ b/extensions/expression-language/tests/integration/UpdateAttributeIntegrationTest.cpp
@@ -36,37 +36,25 @@
class TestHarness : public IntegrationBase {
public:
- TestHarness() {
- log_entry_found = false;
- }
-
- void testSetup() {
+ void testSetup() override {
LogTestController::getInstance().setTrace<minifi::FlowController>();
- LogTestController::getInstance().setTrace<core::ProcessSession>();
- LogTestController::getInstance().setTrace<core::ProcessContextExpr>();
+ LogTestController::getInstance().setTrace<core::ProcessSession>();
+ LogTestController::getInstance().setTrace<core::ProcessContextExpr>();
LogTestController::getInstance().setInfo<processors::LogAttribute>();
}
- void cleanup() {
- }
-
- void runAssertions() {
- assert(log_entry_found);
- }
+ void cleanup() override {}
- void waitToVerifyProcessor() {
- // This test takes a while to complete -> wait at most 10 secs
- log_entry_found = LogTestController::getInstance().contains("key:route_check_attr value:good", std::chrono::seconds(10));
- log_entry_found = LogTestController::getInstance().contains("key:variable_attribute value:replacement_value", std::chrono::seconds(10));
+ void runAssertions() override {
+ assert(LogTestController::getInstance().contains("key:route_check_attr value:good"));
+ assert(LogTestController::getInstance().contains("key:variable_attribute value:replacement_value"));
+ assert(LogTestController::getInstance().contains("ProcessSession rollback", std::chrono::seconds(1)) == false); // No rollback happened
}
- void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
+ void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) override {
// inject the variable into the context.
configuration->set("nifi.variable.test", "replacement_value");
}
-
- protected:
- bool log_entry_found;
};
int main(int argc, char **argv) {
diff --git a/extensions/standard-processors/processors/GetTCP.cpp b/extensions/standard-processors/processors/GetTCP.cpp
index afa32dc..5a8ab19 100644
--- a/extensions/standard-processors/processors/GetTCP.cpp
+++ b/extensions/standard-processors/processors/GetTCP.cpp
@@ -225,8 +225,7 @@ void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, co
}
}
- utils::ThreadPool<int> pool = utils::ThreadPool<int>(concurrent_handlers_);
- client_thread_pool_ = std::move(pool);
+ client_thread_pool_.setMaxConcurrentTasks(concurrent_handlers_);
client_thread_pool_.start();
running_ = true;
diff --git a/extensions/standard-processors/processors/GetTCP.h b/extensions/standard-processors/processors/GetTCP.h
index 7b9e0a5..8796669 100644
--- a/extensions/standard-processors/processors/GetTCP.h
+++ b/extensions/standard-processors/processors/GetTCP.h
@@ -69,9 +69,9 @@ class SocketAfterExecute : public utils::AfterExecute<int> {
return false;
}
- virtual int64_t wait_time() {
+ virtual std::chrono::milliseconds wait_time() {
// wait 500ms
- return 500;
+ return std::chrono::milliseconds(500);
}
protected:
diff --git a/libminifi/include/CronDrivenSchedulingAgent.h b/libminifi/include/CronDrivenSchedulingAgent.h
index 0943570..9abe3b5 100644
--- a/libminifi/include/CronDrivenSchedulingAgent.h
+++ b/libminifi/include/CronDrivenSchedulingAgent.h
@@ -41,16 +41,18 @@ class CronDrivenSchedulingAgent : public ThreadedSchedulingAgent {
* Create a new event driven scheduling agent.
*/
CronDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo,
- std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration)
- : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration) {
+ std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration,
+ utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
+ : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration, thread_pool) {
}
// Destructor
virtual ~CronDrivenSchedulingAgent() {
}
// Run function for the thread
- uint64_t run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory);
+ utils::TaskRescheduleInfo run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
+ const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
- virtual void stop() {
+ void stop() override {
std::lock_guard<std::mutex> locK(mutex_);
schedules_.clear();
last_exec_.clear();
diff --git a/libminifi/include/EventDrivenSchedulingAgent.h b/libminifi/include/EventDrivenSchedulingAgent.h
index b434de5..295becc 100644
--- a/libminifi/include/EventDrivenSchedulingAgent.h
+++ b/libminifi/include/EventDrivenSchedulingAgent.h
@@ -20,6 +20,8 @@
#ifndef __EVENT_DRIVEN_SCHEDULING_AGENT_H__
#define __EVENT_DRIVEN_SCHEDULING_AGENT_H__
+#define DEFAULT_TIME_SLICE_MS 500
+
#include "core/logging/Logger.h"
#include "core/Processor.h"
#include "core/ProcessContext.h"
@@ -39,14 +41,19 @@ class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent {
* Create a new event driven scheduling agent.
*/
EventDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo,
- std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration)
- : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration) {
- }
- // Destructor
- virtual ~EventDrivenSchedulingAgent() {
+ std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration,
+ utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
+ : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration, thread_pool) {
+ int slice = configuration->getInt(Configure::nifi_flow_engine_event_driven_time_slice, DEFAULT_TIME_SLICE_MS);
+ if (slice < 10 || 1000 < slice) {
+ throw Exception(FLOW_EXCEPTION, std::string(Configure::nifi_flow_engine_event_driven_time_slice) + " is out of reasonable range!");
+ }
+ time_slice_ = std::chrono::milliseconds(slice);
}
+
// Run function for the thread
- uint64_t run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory);
+ utils::TaskRescheduleInfo run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
+ const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
private:
// Prevent default copy constructor and assignment operation
@@ -54,6 +61,8 @@ class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent {
EventDrivenSchedulingAgent(const EventDrivenSchedulingAgent &parent);
EventDrivenSchedulingAgent &operator=(const EventDrivenSchedulingAgent &parent);
+ std::chrono::milliseconds time_slice_;
+
};
} /* namespace minifi */
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index 2e40bb9..5602452 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -63,9 +63,6 @@ namespace minifi {
*/
class FlowController : public core::controller::ControllerServiceProvider, public state::StateManager {
public:
- static const int DEFAULT_MAX_TIMER_DRIVEN_THREAD = 10;
- static const int DEFAULT_MAX_EVENT_DRIVEN_THREAD = 5;
-
/**
* Flow controller constructor
*/
@@ -86,22 +83,6 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
// Destructor
virtual ~FlowController();
- // Set MAX TimerDrivenThreads
- virtual void setMaxTimerDrivenThreads(int number) {
- max_timer_driven_threads_ = number;
- }
- // Get MAX TimerDrivenThreads
- virtual int getMaxTimerDrivenThreads() {
- return max_timer_driven_threads_;
- }
- // Set MAX EventDrivenThreads
- virtual void setMaxEventDrivenThreads(int number) {
- max_event_driven_threads_ = number;
- }
- // Get MAX EventDrivenThreads
- virtual int getMaxEventDrivenThreads() {
- return max_event_driven_threads_;
- }
// Get the provenance repository
virtual std::shared_ptr<core::Repository> getProvenanceRepository() {
return this->provenance_repo_;
@@ -222,7 +203,7 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
* Enables the controller service services
* @param serviceNode service node which will be disabled, along with linked services.
*/
- virtual std::future<uint64_t> enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+ virtual std::future<utils::TaskRescheduleInfo> enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
/**
* Enables controller services
@@ -234,7 +215,7 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
* Disables controller services
* @param serviceNode service node which will be disabled, along with linked services.
*/
- virtual std::future<uint64_t> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+ virtual std::future<utils::TaskRescheduleInfo> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
/**
* Gets all controller services.
@@ -355,11 +336,6 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
std::string properties_file_name_;
// Root Process Group
std::shared_ptr<core::ProcessGroup> root_;
- // MAX Timer Driven Threads
- int max_timer_driven_threads_;
- // MAX Event Driven Threads
- int max_event_driven_threads_;
- // FlowFile Repo
// Whether it is running
std::atomic<bool> running_;
std::atomic<bool> updating_;
@@ -380,6 +356,8 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
std::shared_ptr<core::ContentRepository> content_repo_;
+ // Thread pool for schedulers
+ utils::ThreadPool<utils::TaskRescheduleInfo> thread_pool_;
// Flow Engines
// Flow Timer Scheduler
std::shared_ptr<TimerDrivenSchedulingAgent> timer_scheduler_;
diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h
index 6993302..13d7ded 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -28,6 +28,7 @@
#include <algorithm>
#include <thread>
#include "utils/CallBackTimer.h"
+#include "utils/Monitors.h"
#include "utils/TimeUtil.h"
#include "utils/ThreadPool.h"
#include "utils/BackTrace.h"
@@ -49,67 +50,6 @@ namespace apache {
namespace nifi {
namespace minifi {
-/**
- * Uses the wait time for a given worker to determine if it is eligible to run
- */
-class TimerAwareMonitor : public utils::AfterExecute<uint64_t> {
- public:
- TimerAwareMonitor(std::atomic<bool> *run_monitor)
- : current_wait_(0),
- run_monitor_(run_monitor) {
- }
- explicit TimerAwareMonitor(TimerAwareMonitor &&other)
- : AfterExecute(std::move(other)),
- run_monitor_(std::move(other.run_monitor_)) {
- current_wait_.store(other.current_wait_.load());
- }
- virtual bool isFinished(const uint64_t &result) {
- current_wait_.store(result);
- if (*run_monitor_) {
- return false;
- }
- return true;
- }
- virtual bool isCancelled(const uint64_t &result) {
- if (*run_monitor_) {
- return false;
- }
- return true;
- }
- /**
- * Time to wait before re-running this task if necessary
- * @return milliseconds since epoch after which we are eligible to re-run this task.
- */
- virtual int64_t wait_time() {
- return current_wait_.load();
- }
- protected:
-
- std::atomic<uint64_t> current_wait_;
- std::atomic<bool> *run_monitor_;
-};
-
-class SingleRunMonitor : public TimerAwareMonitor {
- public:
- SingleRunMonitor(std::atomic<bool> *run_monitor)
- : TimerAwareMonitor(run_monitor) {
- }
- explicit SingleRunMonitor(TimerAwareMonitor &&other)
- : TimerAwareMonitor(std::move(other)) {
- }
- virtual bool isFinished(const uint64_t &result) {
- if (result == 0) {
- return true;
- } else {
- current_wait_.store(result);
- if (*run_monitor_) {
- return false;
- }
- return true;
- }
- }
-};
-
// SchedulingAgent Class
class SchedulingAgent {
public:
@@ -118,25 +58,18 @@ class SchedulingAgent {
* Create a new scheduling agent.
*/
SchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_repo,
- std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration)
- : admin_yield_duration_(0),
+ std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration, utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
+ : admin_yield_duration_(),
bored_yield_duration_(0),
configure_(configuration),
content_repo_(content_repo),
+ thread_pool_(thread_pool),
controller_service_provider_(controller_service_provider),
logger_(logging::LoggerFactory<SchedulingAgent>::getLogger()),
alert_time_(configuration->getInt(Configure::nifi_flow_engine_alert_period, SCHEDULING_WATCHDOG_DEFAULT_ALERT_PERIOD_MS)) {
running_ = false;
repo_ = repo;
flow_repo_ = flow_repo;
- /**
- * To facilitate traces we cannot use daemon threads -- this could potentially cause blocking on I/O; however, it's a better path
- * to be able to debug why an agent doesn't work and still allow a restart via updates in these cases.
- */
- auto csThreads = configure_->getInt(Configure::nifi_flow_engine_threads, 2);
- auto pool = utils::ThreadPool<uint64_t>(csThreads, false, controller_service_provider, "SchedulingAgent");
- thread_pool_ = std::move(pool);
- thread_pool_.start();
if (alert_time_ > std::chrono::milliseconds(0)) {
std::function<void(void)> f = std::bind(&SchedulingAgent::watchDogFunc, this);
@@ -166,7 +99,6 @@ class SchedulingAgent {
// stop
virtual void stop() {
running_ = false;
- thread_pool_.shutdown();
}
std::vector<BackTrace> getTraces() {
@@ -175,8 +107,8 @@ class SchedulingAgent {
void watchDogFunc();
- virtual std::future<uint64_t> enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
- virtual std::future<uint64_t> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+ virtual std::future<utils::TaskRescheduleInfo> enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+ virtual std::future<utils::TaskRescheduleInfo> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
// schedule, overwritten by different DrivenSchedulingAgent
virtual void schedule(std::shared_ptr<core::Processor> processor) = 0;
// unschedule, overwritten by different DrivenSchedulingAgent
@@ -202,7 +134,7 @@ class SchedulingAgent {
std::shared_ptr<core::ContentRepository> content_repo_;
// thread pool for components.
- utils::ThreadPool<uint64_t> thread_pool_;
+ utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool_;
// controller service provider reference
std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider_;
diff --git a/libminifi/include/ThreadedSchedulingAgent.h b/libminifi/include/ThreadedSchedulingAgent.h
index ba0998a..c530796 100644
--- a/libminifi/include/ThreadedSchedulingAgent.h
+++ b/libminifi/include/ThreadedSchedulingAgent.h
@@ -45,8 +45,8 @@ class ThreadedSchedulingAgent : public SchedulingAgent {
* Create a new threaded scheduling agent.
*/
ThreadedSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_repo,
- std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration)
- : SchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration),
+ std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration, utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
+ : SchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration, thread_pool),
logger_(logging::LoggerFactory<ThreadedSchedulingAgent>::getLogger()) {
}
// Destructor
@@ -54,7 +54,7 @@ class ThreadedSchedulingAgent : public SchedulingAgent {
}
// Run function for the thread
- virtual uint64_t run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
+ virtual utils::TaskRescheduleInfo run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) = 0;
public:
@@ -73,6 +73,8 @@ class ThreadedSchedulingAgent : public SchedulingAgent {
ThreadedSchedulingAgent(const ThreadedSchedulingAgent &parent);
ThreadedSchedulingAgent &operator=(const ThreadedSchedulingAgent &parent);
std::shared_ptr<logging::Logger> logger_;
+
+ std::set<std::string> processors_running_; // Set just for easy usage
};
} /* namespace minifi */
diff --git a/libminifi/include/TimerDrivenSchedulingAgent.h b/libminifi/include/TimerDrivenSchedulingAgent.h
index 8398b3a..10aaf77 100644
--- a/libminifi/include/TimerDrivenSchedulingAgent.h
+++ b/libminifi/include/TimerDrivenSchedulingAgent.h
@@ -38,17 +38,17 @@ class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent {
* Create a new processor
*/
TimerDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo,
- std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configure)
- : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configure),
+ std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configure,
+ utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
+ : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configure, thread_pool),
logger_(logging::LoggerFactory<TimerDrivenSchedulingAgent>::getLogger()) {
}
- // Destructor
- virtual ~TimerDrivenSchedulingAgent() {
- }
+
/**
* Run function that accepts the processor, context and session factory.
*/
- uint64_t run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory);
+ utils::TaskRescheduleInfo run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
+ const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
private:
// Prevent default copy constructor and assignment operation
diff --git a/libminifi/include/core/controller/ControllerServiceProvider.h b/libminifi/include/core/controller/ControllerServiceProvider.h
index 18ac5de..6a147e1 100644
--- a/libminifi/include/core/controller/ControllerServiceProvider.h
+++ b/libminifi/include/core/controller/ControllerServiceProvider.h
@@ -26,6 +26,7 @@
#include "ControllerServiceNode.h"
#include "ControllerServiceMap.h"
#include "core/ClassLoader.h"
+#include "utils/Monitors.h"
namespace org {
namespace apache {
@@ -97,7 +98,7 @@ class ControllerServiceProvider : public CoreComponent, public ConfigurableCompo
* Enables the provided controller service
* @param serviceNode controller service node.
*/
- virtual std::future<uint64_t> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) = 0;
+ virtual std::future<utils::TaskRescheduleInfo> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) = 0;
/**
* Enables the provided controller service nodes
@@ -109,7 +110,7 @@ class ControllerServiceProvider : public CoreComponent, public ConfigurableCompo
* Disables the provided controller service node
* @param serviceNode controller service node.
*/
- virtual std::future<uint64_t> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
+ virtual std::future<utils::TaskRescheduleInfo> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
/**
* Gets a list of all controller services.
diff --git a/libminifi/include/core/controller/StandardControllerServiceProvider.h b/libminifi/include/core/controller/StandardControllerServiceProvider.h
index cc1d51e..6ce6651 100644
--- a/libminifi/include/core/controller/StandardControllerServiceProvider.h
+++ b/libminifi/include/core/controller/StandardControllerServiceProvider.h
@@ -103,15 +103,12 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ
}
- std::future<uint64_t> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) {
+ std::future<utils::TaskRescheduleInfo> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) {
if (serviceNode->canEnable()) {
return agent_->enableControllerService(serviceNode);
} else {
- std::future<uint64_t> no_run = std::async(std::launch::async, []() {
- uint64_t ret = 0;
- return ret;
- });
+ std::future<utils::TaskRescheduleInfo> no_run = std::async(std::launch::deferred, utils::TaskRescheduleInfo::Done);
return no_run;
}
}
@@ -135,14 +132,11 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ
}
}
- std::future<uint64_t> disableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) {
+ std::future<utils::TaskRescheduleInfo> disableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) {
if (!IsNullOrEmpty(serviceNode.get()) && serviceNode->enabled()) {
return agent_->disableControllerService(serviceNode);
} else {
- std::future<uint64_t> no_run = std::async(std::launch::async, []() {
- uint64_t ret = 0;
- return ret;
- });
+ std::future<utils::TaskRescheduleInfo> no_run = std::async(std::launch::deferred, utils::TaskRescheduleInfo::Done);
return no_run;
}
}
diff --git a/libminifi/include/core/state/UpdateController.h b/libminifi/include/core/state/UpdateController.h
index 4d70d99..8c76e58 100644
--- a/libminifi/include/core/state/UpdateController.h
+++ b/libminifi/include/core/state/UpdateController.h
@@ -150,14 +150,14 @@ class UpdateRunner : public utils::AfterExecute<Update> {
return !*running_;
}
- virtual int64_t wait_time() {
+ virtual std::chrono::milliseconds wait_time() {
return delay_;
}
protected:
std::atomic<bool> *running_;
- int64_t delay_;
+ std::chrono::milliseconds delay_;
};
diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h
index 89dbe0e..9250b5b 100644
--- a/libminifi/include/properties/Configure.h
+++ b/libminifi/include/properties/Configure.h
@@ -47,6 +47,7 @@ class Configure : public Properties {
static const char *nifi_flow_configuration_file_backup_update;
static const char *nifi_flow_engine_threads;
static const char *nifi_flow_engine_alert_period;
+ static const char *nifi_flow_engine_event_driven_time_slice;
static const char *nifi_administrative_yield_duration;
static const char *nifi_bored_yield_duration;
static const char *nifi_graceful_shutdown_seconds;
diff --git a/libminifi/include/utils/Monitors.h b/libminifi/include/utils/Monitors.h
new file mode 100644
index 0000000..a9ff485
--- /dev/null
+++ b/libminifi/include/utils/Monitors.h
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_MONITORS_H
+#define NIFI_MINIFI_CPP_MONITORS_H
+
+#include <chrono>
+#include <atomic>
+#if defined(WIN32)
+#include <future> // This is required to work around a VS2017 bug, see the details below
+#endif
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+/**
+ * Worker task helper that determines
+ * whether or not we will run
+ */
+template<typename T>
+class AfterExecute {
+ public:
+ virtual ~AfterExecute() {
+
+ }
+
+ explicit AfterExecute() {
+
+ }
+
+ explicit AfterExecute(AfterExecute &&other) {
+
+ }
+ virtual bool isFinished(const T &result) = 0;
+ virtual bool isCancelled(const T &result) = 0;
+ /**
+ * Time to wait before re-running this task if necessary
+ * @return milliseconds since epoch after which we are eligible to re-run this task.
+ */
+ virtual std::chrono::milliseconds wait_time() = 0;
+};
+
+/**
+ * Uses the wait time for a given worker to determine if it is eligible to run
+ */
+class TimerAwareMonitor : public utils::AfterExecute<std::chrono::milliseconds> {
+ public:
+ TimerAwareMonitor(std::atomic<bool> *run_monitor)
+ : current_wait_(std::chrono::milliseconds(0)),
+ run_monitor_(run_monitor) {
+ }
+ virtual bool isFinished(const std::chrono::milliseconds &result) override {
+ current_wait_.store(result);
+ if (*run_monitor_) {
+ return false;
+ }
+ return true;
+ }
+ virtual bool isCancelled(const std::chrono::milliseconds &result) override {
+ if (*run_monitor_) {
+ return false;
+ }
+ return true;
+ }
+ /**
+ * Time to wait before re-running this task if necessary
+ * @return milliseconds since epoch after which we are eligible to re-run this task.
+ */
+ virtual std::chrono::milliseconds wait_time() override {
+ return current_wait_.load();
+ }
+ protected:
+
+ std::atomic<std::chrono::milliseconds> current_wait_;
+ std::atomic<bool> *run_monitor_;
+};
+
+class SingleRunMonitor : public utils::AfterExecute<bool>{
+ public:
+ SingleRunMonitor(std::chrono::milliseconds retry_interval = std::chrono::milliseconds(100))
+ : retry_interval_(retry_interval) {
+ }
+
+ virtual bool isFinished(const bool &result) override {
+ return result;
+ }
+ virtual bool isCancelled(const bool &result) override {
+ return false;
+ }
+ virtual std::chrono::milliseconds wait_time() override {
+ return retry_interval_;
+ }
+ protected:
+ const std::chrono::milliseconds retry_interval_;
+};
+
+
+struct TaskRescheduleInfo {
+ TaskRescheduleInfo(bool result, std::chrono::milliseconds wait_time)
+ : finished_(result), wait_time_(wait_time){}
+ std::chrono::milliseconds wait_time_;
+ bool finished_;
+
+ static TaskRescheduleInfo Done() {
+ return TaskRescheduleInfo(true, std::chrono::milliseconds(0));
+ }
+
+ static TaskRescheduleInfo RetryIn(std::chrono::milliseconds interval) {
+ return TaskRescheduleInfo(false, interval);
+ }
+
+ static TaskRescheduleInfo RetryImmediately() {
+ return TaskRescheduleInfo(false, std::chrono::milliseconds(0));
+ }
+
+#if defined(WIN32)
+ // https://developercommunity.visualstudio.com/content/problem/60897/c-shared-state-futuresstate-default-constructs-the.html
+ // Because of this bug we need to have this object default constructible, which makes no sense otherwise. Hack.
+ private:
+ TaskRescheduleInfo() : wait_time_(std::chrono::milliseconds(0)), finished_(true) {}
+ friend class std::_Associated_state<TaskRescheduleInfo>;
+#endif
+};
+
+class ComplexMonitor : public utils::AfterExecute<TaskRescheduleInfo> {
+ public:
+ ComplexMonitor() = default;
+
+ virtual bool isFinished(const TaskRescheduleInfo &result) override {
+ if (result.finished_) {
+ return true;
+ }
+ current_wait_.store(result.wait_time_);
+ return false;
+ }
+ virtual bool isCancelled(const TaskRescheduleInfo &result) override {
+ return false;
+ }
+ /**
+ * Time to wait before re-running this task if necessary
+ * @return milliseconds since epoch after which we are eligible to re-run this task.
+ */
+ virtual std::chrono::milliseconds wait_time() override {
+ return current_wait_.load();
+ }
+
+ private:
+ std::atomic<std::chrono::milliseconds> current_wait_ {std::chrono::milliseconds(0)};
+};
+
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif //NIFI_MINIFI_CPP_MONITORS_H
\ No newline at end of file
diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h
index 5bbd3f6..2554dc2 100644
--- a/libminifi/include/utils/ThreadPool.h
+++ b/libminifi/include/utils/ThreadPool.h
@@ -30,6 +30,7 @@
#include <functional>
#include "BackTrace.h"
+#include "Monitors.h"
#include "core/expect.h"
#include "controllers/ThreadManagementService.h"
#include "concurrentqueue.h"
@@ -42,33 +43,6 @@ namespace minifi {
namespace utils {
/**
- * Worker task helper that determines
- * whether or not we will run
- */
-template<typename T>
-class AfterExecute {
- public:
- virtual ~AfterExecute() {
-
- }
-
- explicit AfterExecute() {
-
- }
-
- explicit AfterExecute(AfterExecute &&other) {
-
- }
- virtual bool isFinished(const T &result) = 0;
- virtual bool isCancelled(const T &result) = 0;
- /**
- * Time to wait before re-running this task if necessary
- * @return milliseconds since epoch after which we are eligible to re-run this task.
- */
- virtual int64_t wait_time() = 0;
-};
-
-/**
* Worker task
* purpose: Provides a wrapper for the functor
* and returns a future based on the template argument.
@@ -78,7 +52,7 @@ class Worker {
public:
explicit Worker(std::function<T()> &task, const std::string &identifier, std::unique_ptr<AfterExecute<T>> run_determinant)
: identifier_(identifier),
- time_slice_(0),
+ next_exec_time_(std::chrono::steady_clock::now()),
task(task),
run_determinant_(std::move(run_determinant)) {
promise = std::make_shared<std::promise<T>>();
@@ -86,7 +60,7 @@ class Worker {
explicit Worker(std::function<T()> &task, const std::string &identifier)
: identifier_(identifier),
- time_slice_(0),
+ next_exec_time_(std::chrono::steady_clock::now()),
task(task),
run_determinant_(nullptr) {
promise = std::make_shared<std::promise<T>>();
@@ -94,7 +68,7 @@ class Worker {
explicit Worker(const std::string identifier = "")
: identifier_(identifier),
- time_slice_(0) {
+ next_exec_time_(std::chrono::steady_clock::now()) {
}
virtual ~Worker() {
@@ -104,9 +78,9 @@ class Worker {
/**
* Move constructor for worker tasks
*/
- Worker(Worker &&other)
+ Worker (Worker &&other) noexcept
: identifier_(std::move(other.identifier_)),
- time_slice_(std::move(other.time_slice_)),
+ next_exec_time_(std::move(other.next_exec_time_)),
task(std::move(other.task)),
run_determinant_(std::move(other.run_determinant_)),
promise(other.promise) {
@@ -125,7 +99,7 @@ class Worker {
promise->set_value(result);
return false;
}
- time_slice_ = increment_time(run_determinant_->wait_time());
+ next_exec_time_ += run_determinant_->wait_time();
return true;
}
@@ -133,59 +107,52 @@ class Worker {
identifier_ = identifier;
}
- virtual uint64_t getTimeSlice() {
- return time_slice_;
+ virtual std::chrono::time_point<std::chrono::steady_clock> getNextExecutionTime() const {
+ return next_exec_time_;
}
- virtual uint64_t getWaitTime() {
+ virtual std::chrono::milliseconds getWaitTime() const {
return run_determinant_->wait_time();
}
Worker<T>(const Worker<T>&) = delete;
- Worker<T>& operator =(const Worker<T>&) = delete;
+ Worker<T>& operator= (const Worker<T>&) = delete;
- Worker<T>& operator =(Worker<T> &&);
+ Worker<T>& operator= (Worker<T> &&) noexcept;
- std::shared_ptr<std::promise<T>> getPromise();
+ std::shared_ptr<std::promise<T>> getPromise() const;
- const std::string &getIdentifier() {
+ const std::string &getIdentifier() const {
return identifier_;
}
- protected:
-
- inline uint64_t increment_time(const uint64_t &time) {
- std::chrono::time_point<std::chrono::system_clock> now = std::chrono::system_clock::now();
- auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
- return millis + time;
- }
-
+protected:
std::string identifier_;
- uint64_t time_slice_;
+ std::chrono::time_point<std::chrono::steady_clock> next_exec_time_;
std::function<T()> task;
std::unique_ptr<AfterExecute<T>> run_determinant_;
std::shared_ptr<std::promise<T>> promise;
};
template<typename T>
-class WorkerComparator {
+class DelayedTaskComparator {
public:
bool operator()(Worker<T> &a, Worker<T> &b) {
- return a.getTimeSlice() < b.getTimeSlice();
+ return a.getNextExecutionTime() > b.getNextExecutionTime();
}
};
template<typename T>
-Worker<T>& Worker<T>::operator =(Worker<T> && other) {
+Worker<T>& Worker<T>::operator =(Worker<T> && other) noexcept {
task = std::move(other.task);
promise = other.promise;
- time_slice_ = std::move(other.time_slice_);
+ next_exec_time_ = std::move(other.next_exec_time_);
identifier_ = std::move(other.identifier_);
run_determinant_ = std::move(other.run_determinant_);
return *this;
}
template<typename T>
-std::shared_ptr<std::promise<T>> Worker<T>::getPromise() {
+std::shared_ptr<std::promise<T>> Worker<T>::getPromise() const {
return promise;
}
@@ -231,18 +198,10 @@ class ThreadPool {
thread_manager_ = nullptr;
}
- ThreadPool(const ThreadPool<T> &&other)
- : daemon_threads_(std::move(other.daemon_threads_)),
- thread_reduction_count_(0),
- max_worker_threads_(std::move(other.max_worker_threads_)),
- adjust_threads_(false),
- running_(false),
- controller_service_provider_(std::move(other.controller_service_provider_)),
- thread_manager_(std::move(other.thread_manager_)),
- name_(std::move(other.name_)) {
- current_workers_ = 0;
- task_count_ = 0;
- }
+ ThreadPool(const ThreadPool<T> &other) = delete;
+ ThreadPool<T>& operator=(const ThreadPool<T> &other) = delete;
+ ThreadPool(ThreadPool<T> &&other) = delete;
+ ThreadPool<T>& operator=(ThreadPool<T> &&other) = delete;
~ThreadPool() {
shutdown();
@@ -268,8 +227,16 @@ class ThreadPool {
/**
* Returns true if a task is running.
*/
- bool isRunning(const std::string &identifier) {
- return task_status_[identifier] == true;
+ bool isTaskRunning(const std::string &identifier) const {
+ // unknown identifiers count as "not running"; find() avoids throwing for that ordinary case
+ // NOTE(review): task_status_ is read without worker_queue_mutex_ - confirm callers cannot race execute()/stopTasks()
+ const auto status = task_status_.find(identifier);
+ if (status == task_status_.end()) return false;
+ return status->second;
+ }
+
+ bool isRunning() const {
+ return running_.load();
}
std::vector<BackTrace> getTraces() {
@@ -304,44 +271,24 @@ class ThreadPool {
*/
void setMaxConcurrentTasks(uint16_t max) {
std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
- if (running_) {
+ bool was_running = running_;
+ if (was_running) {
shutdown();
}
max_worker_threads_ = max;
- if (!running_)
+ if (was_running)
start();
}
- ThreadPool<T> operator=(const ThreadPool<T> &other) = delete;
- ThreadPool(const ThreadPool<T> &other) = delete;
-
- ThreadPool<T> &operator=(ThreadPool<T> &&other) {
+ void setControllerServiceProvider(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider) {
std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
- if (other.running_) {
- other.shutdown();
- }
- if (running_) {
+ bool was_running = running_;
+ if (was_running) {
shutdown();
}
- max_worker_threads_ = std::move(other.max_worker_threads_);
- daemon_threads_ = std::move(other.daemon_threads_);
- current_workers_ = 0;
- thread_reduction_count_ = 0;
-
- thread_queue_ = std::move(other.thread_queue_);
- worker_queue_ = std::move(other.worker_queue_);
-
- controller_service_provider_ = std::move(other.controller_service_provider_);
- thread_manager_ = std::move(other.thread_manager_);
-
- adjust_threads_ = false;
-
- if (!running_) {
+ controller_service_provider_ = controller_service_provider;
+ if (was_running)
start();
- }
-
- name_ = other.name_;
- return *this;
}
protected:
@@ -372,6 +319,8 @@ class ThreadPool {
std::vector<std::shared_ptr<WorkerThread>> thread_queue_;
// manager thread
std::thread manager_thread_;
+// the thread responsible for moving delayed tasks to the worker queue once they are due
+ std::thread delayed_scheduler_thread_;
// conditional that's used to adjust the threads
std::atomic<bool> adjust_threads_;
// atomic running boolean
@@ -384,9 +333,11 @@ class ThreadPool {
moodycamel::ConcurrentQueue<std::shared_ptr<WorkerThread>> deceased_thread_queue_;
// worker queue of worker objects
moodycamel::ConcurrentQueue<Worker<T>> worker_queue_;
- std::priority_queue<Worker<T>, std::vector<Worker<T>>, WorkerComparator<T>> worker_priority_queue_;
+ std::priority_queue<Worker<T>, std::vector<Worker<T>>, DelayedTaskComparator<T>> delayed_worker_queue_;
// notification for available work
std::condition_variable tasks_available_;
+// notification for a new delayed task that is due earlier than the ones already waiting
+ std::condition_variable delayed_task_available_;
// map to identify if a task should be
std::map<std::string, bool> task_status_;
// manager mutex
@@ -410,247 +361,9 @@ class ThreadPool {
* Runs worker tasks
*/
void run_tasks(std::shared_ptr<WorkerThread> thread);
-};
-template<typename T>
-bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) {
- {
- std::unique_lock<std::mutex> lock(worker_queue_mutex_);
- task_status_[task.getIdentifier()] = true;
- }
- future = std::move(task.getPromise()->get_future());
- bool enqueued = worker_queue_.enqueue(std::move(task));
- if (running_) {
- tasks_available_.notify_one();
- }
-
- task_count_++;
-
- return enqueued;
-}
-
-template<typename T>
-void ThreadPool<T>::manageWorkers() {
- for (int i = 0; i < max_worker_threads_; i++) {
- std::stringstream thread_name;
- thread_name << name_ << " #" << i;
- auto worker_thread = std::make_shared<WorkerThread>(thread_name.str());
- worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
- thread_queue_.push_back(worker_thread);
- current_workers_++;
- }
-
- if (daemon_threads_) {
- for (auto &thread : thread_queue_) {
- thread->thread_.detach();
- }
- }
-
-// likely don't have a thread manager
- if (LIKELY(nullptr != thread_manager_)) {
- while (running_) {
- auto waitperiod = std::chrono::milliseconds(1) * 500;
- {
- if (thread_manager_->isAboveMax(current_workers_)) {
- auto max = thread_manager_->getMaxConcurrentTasks();
- auto differential = current_workers_ - max;
- thread_reduction_count_ += differential;
- } else if (thread_manager_->shouldReduce()) {
- if (current_workers_ > 1)
- thread_reduction_count_++;
- thread_manager_->reduce();
- } else if (thread_manager_->canIncrease() && max_worker_threads_ - current_workers_ > 0) { // increase slowly
- std::unique_lock<std::mutex> lock(worker_queue_mutex_);
- auto worker_thread = std::make_shared<WorkerThread>();
- worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
- if (daemon_threads_) {
- worker_thread->thread_.detach();
- }
- thread_queue_.push_back(worker_thread);
- current_workers_++;
- }
- }
- {
- std::shared_ptr<WorkerThread> thread_ref;
- while (deceased_thread_queue_.try_dequeue(thread_ref)) {
- std::unique_lock<std::mutex> lock(worker_queue_mutex_);
- if (thread_ref->thread_.joinable())
- thread_ref->thread_.join();
- thread_queue_.erase(std::remove(thread_queue_.begin(), thread_queue_.end(), thread_ref), thread_queue_.end());
- }
- }
- std::this_thread::sleep_for(waitperiod);
- }
- } else {
- for (auto &thread : thread_queue_) {
- if (thread->thread_.joinable())
- thread->thread_.join();
- }
- }
-}
-template<typename T>
-void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) {
- auto waitperiod = std::chrono::milliseconds(1) * 100;
- thread->is_running_ = true;
- uint64_t wait_decay_ = 0;
- uint64_t yield_backoff = 10; // start at 10 ms
- while (running_.load()) {
- if (UNLIKELY(thread_reduction_count_ > 0)) {
- if (--thread_reduction_count_ >= 0) {
- deceased_thread_queue_.enqueue(thread);
- thread->is_running_ = false;
- break;
- } else {
- thread_reduction_count_++;
- }
- }
- // if we exceed 500ms of wait due to not being able to run any tasks and there are tasks available, meaning
- // they are eligible to run per the fact that the thread pool isn't shut down and the tasks are in a runnable state
- // BUT they've been continually timesliced, we will lower the wait decay to 100ms and continue incrementing from
- // there. This ensures we don't have arbitrarily long sleep cycles.
- if (wait_decay_ > 500000000L) {
- wait_decay_ = 100000000L;
- }
- // if we are spinning, perform a wait. If something changes in the worker such that the timeslice has changed, we will pick that information up. Note that it's possible
- // we could starve for processing time if all workers are waiting. In the event that the number of workers far exceeds the number of threads, threads will spin and potentially
- // wait until they arrive at a task that can be run. In this case we reset the wait_decay and attempt to pick up a new task. This means that threads that recently ran should
- // be more likely to run. This is intentional.
-
- if (wait_decay_ > 2000) {
- std::this_thread::sleep_for(std::chrono::nanoseconds(wait_decay_));
- }
-
- if (current_workers_ < max_worker_threads_) {
- // we are in a reduced state. due to thread management
- // let's institute a backoff up to 500ms
- if (yield_backoff < 500) {
- yield_backoff += 10;
- }
- std::this_thread::sleep_for(std::chrono::milliseconds(yield_backoff));
- } else {
- yield_backoff = 10;
- }
- Worker<T> task;
-
- bool prioritized_task = false;
-
- if (!prioritized_task) {
- if (!worker_queue_.try_dequeue(task)) {
- std::unique_lock<std::mutex> lock(worker_queue_mutex_);
- if (worker_priority_queue_.size() > 0) {
- // this is safe as we are going to immediately pop the queue
- while (!worker_priority_queue_.empty()) {
- task = std::move(const_cast<Worker<T>&>(worker_priority_queue_.top()));
- worker_priority_queue_.pop();
- worker_queue_.enqueue(std::move(task));
- continue;
- }
-
- }
- tasks_available_.wait_for(lock, waitperiod);
- continue;
- } else {
- std::unique_lock<std::mutex> lock(worker_queue_mutex_);
- if (!task_status_[task.getIdentifier()]) {
- continue;
- }
- }
-
- bool wait_to_run = false;
- if (task.getTimeSlice() > 1) {
- double wt = (double) task.getWaitTime();
- auto now = std::chrono::system_clock::now().time_since_epoch();
- auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now).count();
-
- // if our differential is < 10% of the wait time we will not put the task into a wait state
- // since requeuing will break the time slice contract.
- if ((double) task.getTimeSlice() > ms && ((double) (task.getTimeSlice() - ms)) > (wt * .10)) {
- wait_to_run = true;
- }
- }
- // if we have to wait we re-queue the worker.
- if (wait_to_run) {
- {
- std::unique_lock<std::mutex> lock(worker_queue_mutex_);
- if (!task_status_[task.getIdentifier()]) {
- continue;
- }
- // put it on the priority queue
- worker_priority_queue_.push(std::move(task));
- }
-
- wait_decay_ += 25;
- continue;
- }
- }
- const bool task_renew = task.run();
- wait_decay_ = 0;
- if (task_renew) {
-
- if (UNLIKELY(task_count_ > current_workers_)) {
- // even if we have more work to do we will not
- std::unique_lock<std::mutex> lock(worker_queue_mutex_);
- if (!task_status_[task.getIdentifier()]) {
- continue;
- }
-
- worker_priority_queue_.push(std::move(task));
- } else {
- worker_queue_.enqueue(std::move(task));
- }
- }
- }
- current_workers_--;
-}
-template<typename T>
-void ThreadPool<T>::start() {
- if (nullptr != controller_service_provider_) {
- auto thread_man = controller_service_provider_->getControllerService("ThreadPoolManager");
- thread_manager_ = thread_man != nullptr ? std::dynamic_pointer_cast<controllers::ThreadManagementService>(thread_man) : nullptr;
- } else {
- thread_manager_ = nullptr;
- }
- std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
- if (!running_) {
- running_ = true;
- manager_thread_ = std::move(std::thread(&ThreadPool::manageWorkers, this));
- if (worker_queue_.size_approx() > 0) {
- tasks_available_.notify_all();
- }
- }
-}
-
-template<typename T>
-void ThreadPool<T>::stopTasks(const std::string &identifier) {
- std::unique_lock<std::mutex> lock(worker_queue_mutex_);
- task_status_[identifier] = false;
-}
-
-template<typename T>
-void ThreadPool<T>::shutdown() {
- if (running_.load()) {
- std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
- running_.store(false);
-
- drain();
- task_status_.clear();
- if (manager_thread_.joinable())
- manager_thread_.join();
- {
- std::unique_lock<std::mutex> lock(worker_queue_mutex_);
- for(const auto &thread : thread_queue_){
- if (thread->thread_.joinable())
- thread->thread_.join();
- }
- thread_queue_.clear();
- current_workers_ = 0;
- while (worker_queue_.size_approx() > 0) {
- Worker<T> task;
- worker_queue_.try_dequeue(task);
- }
- }
- }
-}
+ void manage_delayed_queue();
+};
} /* namespace utils */
} /* namespace minifi */
diff --git a/libminifi/src/CPPLINT.cfg b/libminifi/src/CPPLINT.cfg
index 9205687..bba5060 100644
--- a/libminifi/src/CPPLINT.cfg
+++ b/libminifi/src/CPPLINT.cfg
@@ -1,3 +1,2 @@
-set noparent
filter=-build/include_order,-build/include_alpha
exclude_files=ResourceClaim.cpp
diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp
index a874a93..38b9c19 100644
--- a/libminifi/src/Configure.cpp
+++ b/libminifi/src/Configure.cpp
@@ -29,6 +29,7 @@ const char *Configure::nifi_flow_configuration_file_exit_failure = "nifi.flow.co
const char *Configure::nifi_flow_configuration_file_backup_update = "nifi.flow.configuration.backup.on.update";
const char *Configure::nifi_flow_engine_threads = "nifi.flow.engine.threads";
const char *Configure::nifi_flow_engine_alert_period = "nifi.flow.engine.alert.period";
+const char *Configure::nifi_flow_engine_event_driven_time_slice = "nifi.flow.engine.event.driven.time.slice";
const char *Configure::nifi_administrative_yield_duration = "nifi.administrative.yield.duration";
const char *Configure::nifi_bored_yield_duration = "nifi.bored.yield.duration";
const char *Configure::nifi_graceful_shutdown_seconds = "nifi.flowcontroller.graceful.shutdown.period";
diff --git a/libminifi/src/CronDrivenSchedulingAgent.cpp b/libminifi/src/CronDrivenSchedulingAgent.cpp
index 41ffa96..53c2522 100644
--- a/libminifi/src/CronDrivenSchedulingAgent.cpp
+++ b/libminifi/src/CronDrivenSchedulingAgent.cpp
@@ -32,7 +32,7 @@ namespace apache {
namespace nifi {
namespace minifi {
-uint64_t CronDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
+utils::TaskRescheduleInfo CronDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
if (this->running_ && processor->isRunning()) {
std::chrono::system_clock::time_point leap_nanos;
@@ -52,7 +52,7 @@ uint64_t CronDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &
// we may be woken up a little early so that we can honor our time.
// in this case we can return the next time to run with the expectation
// that the wakeup mechanism gets more granular.
- return std::chrono::duration_cast<std::chrono::milliseconds>(result - from).count();
+ return utils::TaskRescheduleInfo::RetryIn(std::chrono::duration_cast<std::chrono::milliseconds>(result - from));
}
} else {
Bosma::Cron schedule(processor->getCronPeriod());
@@ -67,16 +67,15 @@ uint64_t CronDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &
if (processor->isYield()) {
// Honor the yield
- return processor->getYieldTime();
+ return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(processor->getYieldTime()));
} else if (shouldYield && this->bored_yield_duration_ > 0) {
// No work to do or need to apply back pressure
- return this->bored_yield_duration_;
+ return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(this->bored_yield_duration_));
}
}
- auto sleep_time = std::chrono::duration_cast<std::chrono::milliseconds>(result - from).count();
- return sleep_time;
+ return utils::TaskRescheduleInfo::RetryIn(std::chrono::duration_cast<std::chrono::milliseconds>(result - from));
}
- return 0;
+ return utils::TaskRescheduleInfo::Done();
}
} /* namespace minifi */
diff --git a/libminifi/src/EventDrivenSchedulingAgent.cpp b/libminifi/src/EventDrivenSchedulingAgent.cpp
index c56ac58..ef771b9 100644
--- a/libminifi/src/EventDrivenSchedulingAgent.cpp
+++ b/libminifi/src/EventDrivenSchedulingAgent.cpp
@@ -19,9 +19,6 @@
*/
#include "EventDrivenSchedulingAgent.h"
#include <chrono>
-#include <memory>
-#include <thread>
-#include <iostream>
#include "core/Processor.h"
#include "core/ProcessContext.h"
#include "core/ProcessSessionFactory.h"
@@ -32,28 +29,25 @@ namespace apache {
namespace nifi {
namespace minifi {
-uint64_t EventDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
+utils::TaskRescheduleInfo EventDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
- while (this->running_) {
- bool shouldYield = this->onTrigger(processor, processContext, sessionFactory);
-
- if (processor->isYield()) {
- // Honor the yield
- return processor->getYieldTime();
- } else if (shouldYield && this->bored_yield_duration_ > 0) {
- // No work to do or need to apply back pressure
- return this->bored_yield_duration_;
- }
-
- // Block until work is available
-
- processor->waitForWork(1000);
-
- if (!processor->isWorkAvailable()) {
- return 1000;
+ if (this->running_) {
+ auto start_time = std::chrono::steady_clock::now();
+ // keep triggering the processor while it has work to do, but for no longer than the configured time_slice_
+ while (processor->isRunning() && (std::chrono::steady_clock::now() - start_time < time_slice_)) {
+ bool shouldYield = this->onTrigger(processor, processContext, sessionFactory);
+ if (processor->isYield()) {
+ // Honor the yield
+ return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(processor->getYieldTime()));
+ } else if (shouldYield) {
+ // No work to do or need to apply back pressure
+ return utils::TaskRescheduleInfo::RetryIn(
+ std::chrono::milliseconds((this->bored_yield_duration_ > 0) ? this->bored_yield_duration_ : 10)); // No work left to do, stand by
+ }
}
+ return utils::TaskRescheduleInfo::RetryImmediately(); // Let's continue work as soon as a thread is available
}
- return 0;
+ return utils::TaskRescheduleInfo::Done();  // agent is no longer running: do not reschedule
}
} /* namespace minifi */
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 9de7822..31160c8 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -74,8 +74,6 @@ FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo
std::unique_ptr<core::FlowConfiguration> flow_configuration, std::shared_ptr<core::ContentRepository> content_repo, const std::string name, bool headless_mode)
: core::controller::ControllerServiceProvider(core::getClassName<FlowController>()),
root_(nullptr),
- max_timer_driven_threads_(0),
- max_event_driven_threads_(0),
running_(false),
updating_(false),
c2_enabled_(true),
@@ -84,6 +82,7 @@ FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo
flow_file_repo_(flow_file_repo),
protocol_(0),
controller_service_map_(std::make_shared<core::controller::ControllerServiceMap>()),
+ thread_pool_(2, false, nullptr, "Flowcontroller threadpool"),
timer_scheduler_(nullptr),
event_scheduler_(nullptr),
cron_scheduler_(nullptr),
@@ -101,14 +100,11 @@ FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo
}
id_generator_->generate(uuid_);
setUUID(uuid_);
-
flow_update_ = false;
// Setup the default values
if (flow_configuration_ != nullptr) {
configuration_filename_ = flow_configuration_->getConfigurationPath();
}
- max_event_driven_threads_ = DEFAULT_MAX_EVENT_DRIVEN_THREAD;
- max_timer_driven_threads_ = DEFAULT_MAX_TIMER_DRIVEN_THREAD;
running_ = false;
initialized_ = false;
c2_initialized_ = false;
@@ -247,6 +243,7 @@ int16_t FlowController::stop(bool force, uint64_t timeToWait) {
this->timer_scheduler_->stop();
this->event_scheduler_->stop();
this->cron_scheduler_->stop();
+ this->thread_pool_.shutdown();
running_ = false;
}
return 0;
@@ -313,21 +310,25 @@ void FlowController::load(const std::shared_ptr<core::ProcessGroup> &root, bool
controller_service_provider_ = flow_configuration_->getControllerServiceProvider();
+ auto base_shared_ptr = std::dynamic_pointer_cast<core::controller::ControllerServiceProvider>(shared_from_this());
+
+ if (!thread_pool_.isRunning() || reload) {
+ thread_pool_.shutdown();
+ thread_pool_.setMaxConcurrentTasks(configuration_->getInt(Configure::nifi_flow_engine_threads, 2));
+ thread_pool_.setControllerServiceProvider(base_shared_ptr);
+ thread_pool_.start();
+ }
+
if (nullptr == timer_scheduler_ || reload) {
- timer_scheduler_ = std::make_shared<TimerDrivenSchedulingAgent>(
- std::static_pointer_cast<core::controller::ControllerServiceProvider>(std::dynamic_pointer_cast<FlowController>(shared_from_this())), provenance_repo_, flow_file_repo_, content_repo_,
- configuration_);
+ timer_scheduler_ = std::make_shared<TimerDrivenSchedulingAgent>(base_shared_ptr, provenance_repo_, flow_file_repo_, content_repo_, configuration_, thread_pool_);
}
+
if (nullptr == event_scheduler_ || reload) {
- event_scheduler_ = std::make_shared<EventDrivenSchedulingAgent>(
- std::static_pointer_cast<core::controller::ControllerServiceProvider>(std::dynamic_pointer_cast<FlowController>(shared_from_this())), provenance_repo_, flow_file_repo_, content_repo_,
- configuration_);
+ event_scheduler_ = std::make_shared<EventDrivenSchedulingAgent>(base_shared_ptr, provenance_repo_, flow_file_repo_, content_repo_, configuration_, thread_pool_);
}
if (nullptr == cron_scheduler_ || reload) {
- cron_scheduler_ = std::make_shared<CronDrivenSchedulingAgent>(
- std::static_pointer_cast<core::controller::ControllerServiceProvider>(std::dynamic_pointer_cast<FlowController>(shared_from_this())), provenance_repo_, flow_file_repo_, content_repo_,
- configuration_);
+ cron_scheduler_ = std::make_shared<CronDrivenSchedulingAgent>(base_shared_ptr, provenance_repo_, flow_file_repo_, content_repo_, configuration_, thread_pool_);
}
std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_)->setRootGroup(root_);
@@ -769,7 +770,7 @@ void FlowController::removeControllerService(const std::shared_ptr<core::control
* Enables the controller service services
* @param serviceNode service node which will be disabled, along with linked services.
*/
-std::future<uint64_t> FlowController::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+std::future<utils::TaskRescheduleInfo> FlowController::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
return controller_service_provider_->enableControllerService(serviceNode);
}
@@ -784,7 +785,7 @@ void FlowController::enableControllerServices(std::vector<std::shared_ptr<core::
* Disables controller services
* @param serviceNode service node which will be disabled, along with linked services.
*/
-std::future<uint64_t> FlowController::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+std::future<utils::TaskRescheduleInfo> FlowController::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
return controller_service_provider_->disableControllerService(serviceNode);
}
diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp
index a8684c3..65a79ae 100644
--- a/libminifi/src/SchedulingAgent.cpp
+++ b/libminifi/src/SchedulingAgent.cpp
@@ -26,6 +26,7 @@
#include "Exception.h"
#include "core/Processor.h"
#include "utils/ScopeGuard.h"
+#include "utils/GeneralUtils.h"
namespace org {
namespace apache {
@@ -40,41 +41,41 @@ bool SchedulingAgent::hasWorkToDo(std::shared_ptr<core::Processor> processor) {
return false;
}
-std::future<uint64_t> SchedulingAgent::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+std::future<utils::TaskRescheduleInfo> SchedulingAgent::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
logger_->log_info("Enabling CSN in SchedulingAgent %s", serviceNode->getName());
// reference the enable function from serviceNode
- std::function<uint64_t()> f_ex = [serviceNode] {
+ std::function<utils::TaskRescheduleInfo()> f_ex = [serviceNode] {
serviceNode->enable();
- return 0;
+ return utils::TaskRescheduleInfo::Done();
};
// only need to run this once.
- std::unique_ptr<SingleRunMonitor> monitor = std::unique_ptr<SingleRunMonitor>(new SingleRunMonitor(&running_));
- utils::Worker<uint64_t> functor(f_ex, serviceNode->getUUIDStr(), std::move(monitor));
+ auto monitor = utils::make_unique<utils::ComplexMonitor>();
+ utils::Worker<utils::TaskRescheduleInfo> functor(f_ex, serviceNode->getUUIDStr(), std::move(monitor));
// move the functor into the thread pool. While a future is returned
// we aren't terribly concerned with the result.
- std::future<uint64_t> future;
+ std::future<utils::TaskRescheduleInfo> future;
thread_pool_.execute(std::move(functor), future);
if (future.valid())
future.wait();
return future;
}
-std::future<uint64_t> SchedulingAgent::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+std::future<utils::TaskRescheduleInfo> SchedulingAgent::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
logger_->log_info("Disabling CSN in SchedulingAgent %s", serviceNode->getName());
// reference the disable function from serviceNode
- std::function<uint64_t()> f_ex = [serviceNode] {
+ std::function<utils::TaskRescheduleInfo()> f_ex = [serviceNode] {
serviceNode->disable();
- return 0;
+ return utils::TaskRescheduleInfo::Done();
};
// only need to run this once.
- std::unique_ptr<SingleRunMonitor> monitor = std::unique_ptr<SingleRunMonitor>(new SingleRunMonitor(&running_));
- utils::Worker<uint64_t> functor(f_ex, serviceNode->getUUIDStr(), std::move(monitor));
+ auto monitor = utils::make_unique<utils::ComplexMonitor>();
+ utils::Worker<utils::TaskRescheduleInfo> functor(f_ex, serviceNode->getUUIDStr(), std::move(monitor));
// move the functor into the thread pool. While a future is returned
// we aren't terribly concerned with the result.
- std::future<uint64_t> future;
+ std::future<utils::TaskRescheduleInfo> future;
thread_pool_.execute(std::move(functor), future);
if (future.valid())
future.wait();
@@ -121,10 +122,6 @@ bool SchedulingAgent::onTrigger(const std::shared_ptr<core::Processor> &processo
try {
processor->onTrigger(processContext, sessionFactory);
processor->decrementActiveTask();
- } catch (Exception &exception) {
- // Normal exception
- logger_->log_debug("Caught Exception %s", exception.what());
- processor->decrementActiveTask();
} catch (std::exception &exception) {
logger_->log_debug("Caught Exception %s", exception.what());
processor->yield(admin_yield_duration_);
diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp b/libminifi/src/ThreadedSchedulingAgent.cpp
index 0b072d8..c096fbb 100644
--- a/libminifi/src/ThreadedSchedulingAgent.cpp
+++ b/libminifi/src/ThreadedSchedulingAgent.cpp
@@ -35,6 +35,7 @@
#include "core/ProcessContextBuilder.h"
#include "core/ProcessSession.h"
#include "core/ProcessSessionFactory.h"
+#include "utils/GeneralUtils.h"
namespace org {
namespace apache {
@@ -44,7 +45,7 @@ namespace minifi {
void ThreadedSchedulingAgent::schedule(std::shared_ptr<core::Processor> processor) {
std::lock_guard<std::mutex> lock(mutex_);
- admin_yield_duration_ = 0;
+ admin_yield_duration_ = 100; // We should prevent burning CPU in case of rollbacks
std::string yieldValue;
if (configure_->get(Configure::nifi_administrative_yield_duration, yieldValue)) {
@@ -67,7 +68,7 @@ void ThreadedSchedulingAgent::schedule(std::shared_ptr<core::Processor> processo
return;
}
- if (thread_pool_.isRunning(processor->getUUIDStr())) {
+ if (thread_pool_.isTaskRunning(processor->getUUIDStr())) { // NOTE(review): presumably true while the pool still tracks a live task for this UUID - confirm in ThreadPool.h
logger_->log_warn("Can not schedule threads for processor %s because there are existing threads running", processor->getName());
return;
}
@@ -92,25 +93,30 @@ void ThreadedSchedulingAgent::schedule(std::shared_ptr<core::Processor> processo
// reference the disable function from serviceNode
processor->incrementActiveTasks();
- std::function<uint64_t()> f_ex = [agent, processor, processContext, sessionFactory] () {
+ std::function<utils::TaskRescheduleInfo()> f_ex = [agent, processor, processContext, sessionFactory] () { // the returned TaskRescheduleInfo drives when the pool re-runs this task
return agent->run(processor, processContext, sessionFactory);
};
// create a functor that will be submitted to the thread pool.
- std::unique_ptr<TimerAwareMonitor> monitor = std::unique_ptr<TimerAwareMonitor>(new TimerAwareMonitor(&running_));
- utils::Worker<uint64_t> functor(f_ex, processor->getUUIDStr(), std::move(monitor));
+ auto monitor = utils::make_unique<utils::ComplexMonitor>();
+ utils::Worker<utils::TaskRescheduleInfo> functor(f_ex, processor->getUUIDStr(), std::move(monitor)); // the processor UUID is the task identifier used by isTaskRunning()/stopTasks()
// move the functor into the thread pool. While a future is returned
// we aren't terribly concerned with the result.
- std::future<uint64_t> future;
+ std::future<utils::TaskRescheduleInfo> future;
thread_pool_.execute(std::move(functor), future);
}
logger_->log_debug("Scheduled thread %d concurrent workers for for process %s", processor->getMaxConcurrentTasks(), processor->getName());
+ processors_running_.insert(processor->getUUIDStr()); // still under mutex_; stop() cancels whatever is left in this set
return;
}
void ThreadedSchedulingAgent::stop() {
SchedulingAgent::stop();
- thread_pool_.shutdown();
+ std::lock_guard<std::mutex> lock(mutex_); // processors_running_ is modified under mutex_, as in schedule()
+ for (const auto& p : processors_running_) { // processors that were scheduled but never unscheduled
+ logger_->log_error("SchedulingAgent is stopped before processor was unscheduled: %s", p);
+ thread_pool_.stopTasks(p); // the pool itself is no longer shut down here; only tasks with this UUID are stopped
+ }
}
void ThreadedSchedulingAgent::unschedule(std::shared_ptr<core::Processor> processor) {
@@ -127,6 +133,8 @@ void ThreadedSchedulingAgent::unschedule(std::shared_ptr<core::Processor> proces
processor->clearActiveTask();
processor->setScheduledState(core::STOPPED);
+
+ processors_running_.erase(processor->getUUIDStr()); // stop() will no longer report/cancel it. NOTE(review): confirm mutex_ is held here, as schedule()/stop() hold it for this set
}
} /* namespace minifi */
diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp b/libminifi/src/TimerDrivenSchedulingAgent.cpp
index 13a3439..1b6b7f6 100644
--- a/libminifi/src/TimerDrivenSchedulingAgent.cpp
+++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp
@@ -29,20 +29,21 @@ namespace apache {
namespace nifi {
namespace minifi {
-uint64_t TimerDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
+utils::TaskRescheduleInfo TimerDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext, // returns when the pool should run this processor again
const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
- while (this->running_ && processor->isRunning()) {
+ if (this->running_ && processor->isRunning()) { // a single trigger per call (formerly a loop); re-running is left to the thread pool
bool shouldYield = this->onTrigger(processor, processContext, sessionFactory);
if (processor->isYield()) {
// Honor the yield
- return processor->getYieldTime();
+ return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(processor->getYieldTime()));
} else if (shouldYield && this->bored_yield_duration_ > 0) {
// No work to do or need to apply back pressure
- return this->bored_yield_duration_;
+ return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(this->bored_yield_duration_));
}
- return processor->getSchedulingPeriodNano() / 1000000;
+ return utils::TaskRescheduleInfo::RetryIn(std::chrono::duration_cast<std::chrono::milliseconds>( // duration_cast truncates: sub-millisecond periods become 0ms
+ std::chrono::nanoseconds(processor->getSchedulingPeriodNano())));
}
- return processor->getSchedulingPeriodNano() / 1000000;
+ return utils::TaskRescheduleInfo::Done(); // agent or processor not running; presumably ends the task - confirm in TaskRescheduleInfo
}
} /* namespace minifi */
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index d3e579f..46d0794 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -268,6 +268,7 @@ void Processor::onTrigger(const std::shared_ptr<ProcessContext> &context, const
bool Processor::isWorkAvailable() {
// We have work if any incoming connection has work
+ std::lock_guard<std::mutex> lock(mutex_);
bool hasWork = false;
try {
diff --git a/libminifi/src/utils/ThreadPool.cpp b/libminifi/src/utils/ThreadPool.cpp
new file mode 100644
index 0000000..039e136
--- /dev/null
+++ b/libminifi/src/utils/ThreadPool.cpp
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/ThreadPool.h"
+#include "core/state/StateManager.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template<typename T>
+void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) {  // body of every worker thread
+  thread->is_running_ = true;
+  while (running_.load()) {
+    if (UNLIKELY(thread_reduction_count_ > 0)) {  // the manager asked for fewer workers
+      if (--thread_reduction_count_ >= 0) {  // this thread claimed one reduction: retire it
+        deceased_thread_queue_.enqueue(thread);
+        thread->is_running_ = false;
+        break;
+      } else {  // another worker claimed the last pending reduction first: undo
+        thread_reduction_count_++;
+      }
+    }
+
+    Worker<T> task;
+    if (worker_queue_.try_dequeue(task)) {
+      {
+        std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+        if (!task_status_[task.getIdentifier()]) {  // stopped via stopTasks() (or unknown id): drop the task
+          continue;
+        }
+      }
+      if (task.run()) {  // true: the task wants to run again at getNextExecutionTime()
+        if (task.getNextExecutionTime() <= std::chrono::steady_clock::now()) {
+          // it can be rescheduled again as soon as there is a worker available
+          worker_queue_.enqueue(std::move(task));
+          continue;
+        }
+        // Task will be put to the delayed queue as next exec time is in the future
+        std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+        bool need_to_notify =
+            delayed_worker_queue_.empty() ||
+            task.getNextExecutionTime() < delayed_worker_queue_.top().getNextExecutionTime();
+
+        delayed_worker_queue_.push(std::move(task));
+        if (need_to_notify) {
+          delayed_task_available_.notify_all();
+        }
+      }
+    } else {
+      std::unique_lock<std::mutex> lock(worker_queue_mutex_);  // notifies are sent unlocked: re-check the queue and bound the wait so a missed wakeup or pending reduction cannot park us forever
+      tasks_available_.wait_for(lock, std::chrono::milliseconds(100), [this] { return !running_ || worker_queue_.size_approx() > 0; });
+    }
+  }
+  current_workers_--;
+}
+
+template<typename T>
+void ThreadPool<T>::manage_delayed_queue() {  // moves delayed tasks into worker_queue_ once their execution time has come
+  while (running_) {
+    std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+
+    // Put the tasks ready to run in the worker queue
+    while (!delayed_worker_queue_.empty() &&
+           delayed_worker_queue_.top().getNextExecutionTime() <= std::chrono::steady_clock::now()) {
+      // priority_queue::top() only exposes a const reference; cast it away to move the task out right before pop().
+      Worker<T> task = std::move(const_cast<Worker<T>&>(delayed_worker_queue_.top()));
+      delayed_worker_queue_.pop();
+      worker_queue_.enqueue(std::move(task));
+      tasks_available_.notify_one();
+    }
+    if (delayed_worker_queue_.empty()) {  // sleep until a task is pushed or the pool stops; the predicate also absorbs spurious wakeups
+      delayed_task_available_.wait(lock, [this] { return !running_ || !delayed_worker_queue_.empty(); });
+    } else {
+      auto wait_time = std::chrono::duration_cast<std::chrono::milliseconds>(
+          delayed_worker_queue_.top().getNextExecutionTime() - std::chrono::steady_clock::now());
+      delayed_task_available_.wait_for(lock, (std::max)(wait_time, std::chrono::milliseconds(1)));  // parenthesized, presumably to dodge a Windows max() macro
+    }
+  }
+}
+
+template<typename T>
+bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) {  // queues task; `future` is bound to the task's promise
+  {
+    std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+    task_status_[task.getIdentifier()] = true;  // (re)enable this identifier; stopTasks() sets it back to false
+  }
+  future = task.getPromise()->get_future();  // get_future() already returns a prvalue; std::move was redundant
+  bool enqueued = worker_queue_.enqueue(std::move(task));
+  if (running_) {
+    tasks_available_.notify_one();
+  }
+
+  task_count_++;
+
+  return enqueued;
+}
+
+template<typename T>
+void ThreadPool<T>::manageWorkers() {  // starts the workers; with a ThreadManagementService, resizes the pool every ~500ms
+  for (int i = 0; i < max_worker_threads_; i++) {
+    std::stringstream thread_name;
+    thread_name << name_ << " #" << i;
+    auto worker_thread = std::make_shared<WorkerThread>(thread_name.str());
+    worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
+    thread_queue_.push_back(worker_thread);
+    current_workers_++;
+  }
+
+  if (daemon_threads_) {
+    for (auto &thread : thread_queue_) {
+      thread->thread_.detach();
+    }
+  }
+  if (nullptr != thread_manager_) {
+    while (running_) {
+      auto waitperiod = std::chrono::milliseconds(500);
+      {
+        std::unique_lock<std::recursive_mutex> lock(manager_mutex_, std::try_to_lock);
+        if (!lock.owns_lock()) {
+          // Threadpool is being stopped/started or config is being changed, better wait a bit
+          std::this_thread::sleep_for(std::chrono::milliseconds(10));
+          continue;  // and retry (re-checking running_) instead of using thread_manager_ without manager_mutex_
+        }
+        if (thread_manager_->isAboveMax(current_workers_)) {
+          auto max = thread_manager_->getMaxConcurrentTasks();
+          int differential = static_cast<int>(current_workers_) - static_cast<int>(max) - static_cast<int>(thread_reduction_count_);  // minus reductions idle workers have not carried out yet
+          if (differential > 0) thread_reduction_count_ += differential;
+        } else if (thread_manager_->shouldReduce()) {
+          if (current_workers_ > 1)
+            thread_reduction_count_++;
+          thread_manager_->reduce();
+        } else if (thread_manager_->canIncrease() && max_worker_threads_ > current_workers_) { // increase slowly
+          std::unique_lock<std::mutex> queue_lock(worker_queue_mutex_);
+          auto worker_thread = std::make_shared<WorkerThread>();
+          worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
+          if (daemon_threads_) {
+            worker_thread->thread_.detach();
+          }
+          thread_queue_.push_back(worker_thread);
+          current_workers_++;
+        }
+        std::shared_ptr<WorkerThread> thread_ref;
+        while (deceased_thread_queue_.try_dequeue(thread_ref)) {  // reap workers that retired themselves in run_tasks()
+          std::unique_lock<std::mutex> queue_lock(worker_queue_mutex_);
+          if (thread_ref->thread_.joinable())
+            thread_ref->thread_.join();
+          thread_queue_.erase(std::remove(thread_queue_.begin(), thread_queue_.end(), thread_ref), thread_queue_.end());
+        }
+      }
+      std::this_thread::sleep_for(waitperiod);
+    }
+  } else {
+    for (auto &thread : thread_queue_) {
+      if (thread->thread_.joinable())
+        thread->thread_.join();
+    }
+  }
+}
+
+template<typename T>
+void ThreadPool<T>::start() {
+  // Resolve the optional "ThreadPoolManager" service before taking manager_mutex_, but publish it only while
+  // holding that lock: on an already running pool, manageWorkers() reads thread_manager_ from its own thread
+  // and tries to take manager_mutex_ before doing so, so an unlocked assignment races with it.
+  auto thread_man = nullptr != controller_service_provider_ ? controller_service_provider_->getControllerService("ThreadPoolManager") : nullptr;
+
+  std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+  thread_manager_ = std::dynamic_pointer_cast<controllers::ThreadManagementService>(thread_man);  // empty if absent or of another type
+  if (!running_) {
+    running_ = true;
+    manager_thread_ = std::thread(&ThreadPool::manageWorkers, this);
+    // Nudge workers for tasks that were queued before the pool was started.
+    if (worker_queue_.size_approx() > 0) {
+      tasks_available_.notify_all();
+    }
+
+    std::lock_guard<std::mutex> queue_lock(worker_queue_mutex_);
+    delayed_scheduler_thread_ = std::thread(&ThreadPool<T>::manage_delayed_queue, this);
+  }
+}
+
+
+template<typename T>
+void ThreadPool<T>::stopTasks(const std::string &identifier) {  // marks `identifier` as stopped; workers then drop its queued runs
+  std::lock_guard<std::mutex> status_guard(worker_queue_mutex_);
+  task_status_[identifier] = false;
+}
+
+template<typename T>
+void ThreadPool<T>::shutdown() {  // stops the pool: joins its (non-detached) threads and discards all queued tasks
+  if (running_.load()) {
+    std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+    running_.store(false);
+
+    drain();
+    if (manager_thread_.joinable()) {
+      manager_thread_.join();
+    }
+    {
+      // task_status_ and delayed_task_available_ are otherwise only used while holding worker_queue_mutex_.
+      std::lock_guard<std::mutex> queue_lock(worker_queue_mutex_);
+      task_status_.clear();
+      delayed_task_available_.notify_all();
+    }
+    if (delayed_scheduler_thread_.joinable()) {
+      delayed_scheduler_thread_.join();
+    }
+
+    for (const auto &thread : thread_queue_) {
+      if (thread->thread_.joinable())
+        thread->thread_.join();
+    }
+
+    thread_queue_.clear();
+    current_workers_ = 0;
+    while (!delayed_worker_queue_.empty()) {
+      delayed_worker_queue_.pop();
+    }
+    while (worker_queue_.size_approx() > 0) {
+      Worker<T> task;
+      worker_queue_.try_dequeue(task);
+    }
+  }
+}
+
+template class ThreadPool<TaskRescheduleInfo>;  // member definitions live in this .cpp, so each T used with ThreadPool is instantiated here
+template class ThreadPool<int>;
+template class ThreadPool<bool>;
+template class ThreadPool<state::Update>;
+
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/libminifi/test/resources/TestUpdateAttribute.yml b/libminifi/test/resources/TestUpdateAttribute.yml
index f741bfb..522484e 100644
--- a/libminifi/test/resources/TestUpdateAttribute.yml
+++ b/libminifi/test/resources/TestUpdateAttribute.yml
@@ -35,36 +35,30 @@ Processors:
id: 2438e3c8-015a-1000-79ca-83af40ec1992
class: org.apache.nifi.processors.standard.UpdateAttribute
max concurrent tasks: 1
- scheduling strategy: TIMER_DRIVEN
- scheduling period: 1 sec
+ scheduling strategy: EVENT_DRIVEN
penalization period: 30 sec
yield period: 1 sec
run duration nanos: 0
- auto-terminated relationships list: failure
Properties:
route_condition_attr: true
- name: roa
id: 2438e3c8-015a-1000-79ca-83af40ec1993
class: org.apache.nifi.processors.standard.RouteOnAttribute
max concurrent tasks: 1
- scheduling strategy: TIMER_DRIVEN
- scheduling period: 1 sec
+ scheduling strategy: EVENT_DRIVEN
penalization period: 30 sec
yield period: 1 sec
run duration nanos: 0
- auto-terminated relationships list: failure
Properties:
route_matched: ${route_condition_attr}
- name: up2
id: 2438e3c8-015a-1000-79ca-83af40ec1994
class: org.apache.nifi.processors.standard.UpdateAttribute
max concurrent tasks: 1
- scheduling strategy: TIMER_DRIVEN
- scheduling period: 1 sec
+ scheduling strategy: EVENT_DRIVEN
penalization period: 30 sec
yield period: 1 sec
run duration nanos: 0
- auto-terminated relationships list: failure
Properties:
route_check_attr: good
variable_attribute: ${nifi.variable.test}
@@ -72,12 +66,12 @@ Processors:
id: 2438e3c8-015a-1000-79ca-83af40ec1995
class: org.apache.nifi.processors.standard.LogAttribute
max concurrent tasks: 1
- scheduling strategy: TIMER_DRIVEN
- scheduling period: 1 sec
+ scheduling strategy: EVENT_DRIVEN
penalization period: 30 sec
yield period: 1 sec
run duration nanos: 0
- auto-terminated relationships list: success
+ auto-terminated relationships list:
+ - success
Properties:
Connections:
diff --git a/libminifi/test/unit/BackTraceTests.cpp b/libminifi/test/unit/BackTraceTests.cpp
index 816ff63..806f95c 100644
--- a/libminifi/test/unit/BackTraceTests.cpp
+++ b/libminifi/test/unit/BackTraceTests.cpp
@@ -41,14 +41,14 @@ class WorkerNumberExecutions : public utils::AfterExecute<int> {
~WorkerNumberExecutions() {
}
- virtual bool isFinished(const int &result) {
+ bool isFinished(const int &result) override { // keeps going while result > 0 and the incremented run count stays below `tasks`
if (result > 0 && ++runs < tasks) {
return false;
} else {
return true;
}
}
- virtual bool isCancelled(const int &result) {
+ bool isCancelled(const int &result) override { // never cancels
return false;
}
@@ -56,9 +56,9 @@ class WorkerNumberExecutions : public utils::AfterExecute<int> {
return runs;
}
- virtual int64_t wait_time() {
+ std::chrono::milliseconds wait_time() override { // typed duration replaces the former bare int64_t millisecond count
// wait 50ms
- return 50;
+ return std::chrono::milliseconds(50);
}
protected:
diff --git a/libminifi/test/unit/ThreadPoolTests.cpp b/libminifi/test/unit/ThreadPoolTests.cpp
index 6849aa6..48301b1 100644
--- a/libminifi/test/unit/ThreadPoolTests.cpp
+++ b/libminifi/test/unit/ThreadPoolTests.cpp
@@ -41,14 +41,14 @@ class WorkerNumberExecutions : public utils::AfterExecute<int> {
~WorkerNumberExecutions() {
}
- virtual bool isFinished(const int &result) {
+ bool isFinished(const int &result) override { // keeps going while result > 0 and the incremented run count stays below `tasks`
if (result > 0 && ++runs < tasks) {
return false;
} else {
return true;
}
}
- virtual bool isCancelled(const int &result) {
+ bool isCancelled(const int &result) override { // never cancels
return false;
}
@@ -56,9 +56,9 @@ class WorkerNumberExecutions : public utils::AfterExecute<int> {
return runs;
}
- virtual int64_t wait_time() {
+ std::chrono::milliseconds wait_time() override { // typed duration replaces the former bare int64_t millisecond count
// wait 50ms
- return 50;
+ return std::chrono::milliseconds(50);
}
protected: