You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ad...@apache.org on 2023/01/30 10:17:18 UTC
[nifi-minifi-cpp] 02/03: MINIFICPP-2008 Differentiate successful onTriggers from throwing onTriggers in ScheduleAgents
This is an automated email from the ASF dual-hosted git repository.
adebreceni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 9f2040abc6d66602e82a2c1c0ac708ddfe9f6570
Author: Martin Zink <ma...@apache.org>
AuthorDate: Mon Jan 30 11:13:51 2023 +0100
MINIFICPP-2008 Differentiate successful onTriggers from throwing onTriggers in ScheduleAgents
Signed-off-by: Adam Debreceni <ad...@apache.org>
This closes #1474
---
libminifi/include/CronDrivenSchedulingAgent.h | 2 --
libminifi/include/EventDrivenSchedulingAgent.h | 25 +++--------------------
libminifi/include/SchedulingAgent.h | 18 ++++++-----------
libminifi/include/ThreadedSchedulingAgent.h | 26 +++---------------------
libminifi/include/TimerDrivenSchedulingAgent.h | 27 +++----------------------
libminifi/src/CronDrivenSchedulingAgent.cpp | 17 +++++++---------
libminifi/src/EventDrivenSchedulingAgent.cpp | 16 +++------------
libminifi/src/SchedulingAgent.cpp | 28 ++++++++++++++++----------
libminifi/src/TimerDrivenSchedulingAgent.cpp | 23 +++++----------------
9 files changed, 47 insertions(+), 135 deletions(-)
diff --git a/libminifi/include/CronDrivenSchedulingAgent.h b/libminifi/include/CronDrivenSchedulingAgent.h
index 3595f302f..88ee42e6b 100644
--- a/libminifi/include/CronDrivenSchedulingAgent.h
+++ b/libminifi/include/CronDrivenSchedulingAgent.h
@@ -45,8 +45,6 @@ class CronDrivenSchedulingAgent : public ThreadedSchedulingAgent {
: ThreadedSchedulingAgent(controller_service_provider, std::move(repo), std::move(flow_repo), std::move(content_repo), std::move(configuration), thread_pool) {
}
- CronDrivenSchedulingAgent(const CronDrivenSchedulingAgent& parent) = delete;
- CronDrivenSchedulingAgent& operator=(const CronDrivenSchedulingAgent& parent) = delete;
~CronDrivenSchedulingAgent() override = default;
utils::TaskRescheduleInfo run(core::Processor *processor,
diff --git a/libminifi/include/EventDrivenSchedulingAgent.h b/libminifi/include/EventDrivenSchedulingAgent.h
index e9439a925..d017da38c 100644
--- a/libminifi/include/EventDrivenSchedulingAgent.h
+++ b/libminifi/include/EventDrivenSchedulingAgent.h
@@ -17,8 +17,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#ifndef LIBMINIFI_INCLUDE_EVENTDRIVENSCHEDULINGAGENT_H_
-#define LIBMINIFI_INCLUDE_EVENTDRIVENSCHEDULINGAGENT_H_
+#pragma once
#include <memory>
#include <string>
@@ -31,18 +30,10 @@
#include "core/ProcessSessionFactory.h"
#include "ThreadedSchedulingAgent.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
-// EventDrivenSchedulingAgent Class
class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent {
public:
- // Constructor
- /*!
- * Create a new event driven scheduling agent.
- */
EventDrivenSchedulingAgent(const gsl::not_null<core::controller::ControllerServiceProvider*> controller_service_provider, std::shared_ptr<core::Repository> repo,
std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration,
utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
@@ -56,21 +47,11 @@ class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent {
void schedule(core::Processor* processor) override;
- // Run function for the thread
utils::TaskRescheduleInfo run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext,
const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
private:
- // Prevent default copy constructor and assignment operation
- // Only support pass by reference or pointer
- EventDrivenSchedulingAgent(const EventDrivenSchedulingAgent &parent);
- EventDrivenSchedulingAgent &operator=(const EventDrivenSchedulingAgent &parent);
-
std::chrono::milliseconds time_slice_;
};
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
-#endif // LIBMINIFI_INCLUDE_EVENTDRIVENSCHEDULINGAGENT_H_
+} // namespace org::apache::nifi::minifi
diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h
index 24992c928..968d03860 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -30,6 +30,7 @@
#include <algorithm>
#include <thread>
#include "utils/CallBackTimer.h"
+#include "utils/expected.h"
#include "utils/Monitors.h"
#include "utils/TimeUtil.h"
#include "utils/ThreadPool.h"
@@ -49,7 +50,6 @@
namespace org::apache::nifi::minifi {
-// SchedulingAgent Class
class SchedulingAgent {
public:
// Constructor
@@ -87,14 +87,15 @@ class SchedulingAgent {
logger_->log_trace("Destroying scheduling agent");
}
- // onTrigger, return whether the yield is need
- bool onTrigger(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory);
- // start
+ nonstd::expected<void, std::exception_ptr> onTrigger(core::Processor* processor,
+ const std::shared_ptr<core::ProcessContext>& process_context,
+ const std::shared_ptr<core::ProcessSessionFactory>& session_factory);
+
void start() {
running_ = true;
thread_pool_.start();
}
- // stop
+
virtual void stop() {
running_ = false;
}
@@ -110,13 +111,9 @@ class SchedulingAgent {
SchedulingAgent &operator=(const SchedulingAgent &parent) = delete;
protected:
- // Mutex for protection
std::mutex mutex_;
- // Whether it is running
std::atomic<bool> running_;
- // AdministrativeYieldDuration
std::chrono::milliseconds admin_yield_duration_;
- // BoredYieldDuration
std::chrono::milliseconds bored_yield_duration_;
std::shared_ptr<Configure> configure_;
@@ -126,9 +123,7 @@ class SchedulingAgent {
std::shared_ptr<core::Repository> flow_repo_;
std::shared_ptr<core::ContentRepository> content_repo_;
- // thread pool for components.
utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool_;
- // controller service provider reference
gsl::not_null<core::controller::ControllerServiceProvider*> controller_service_provider_;
private:
@@ -148,7 +143,6 @@ class SchedulingAgent {
}
};
- // Logger
std::shared_ptr<core::logging::Logger> logger_;
mutable std::mutex watchdog_mtx_; // used to protect the set below
std::set<SchedulingInfo> scheduled_processors_; // set was chosen to avoid iterator invalidation
diff --git a/libminifi/include/ThreadedSchedulingAgent.h b/libminifi/include/ThreadedSchedulingAgent.h
index c53249096..e51279154 100644
--- a/libminifi/include/ThreadedSchedulingAgent.h
+++ b/libminifi/include/ThreadedSchedulingAgent.h
@@ -17,8 +17,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#ifndef LIBMINIFI_INCLUDE_THREADEDSCHEDULINGAGENT_H_
-#define LIBMINIFI_INCLUDE_THREADEDSCHEDULINGAGENT_H_
+#pragma once
#include <memory>
#include <set>
@@ -31,10 +30,7 @@
#include "core/ProcessContext.h"
#include "SchedulingAgent.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
/**
* An abstract scheduling agent which creates and manages a pool of threads for
@@ -42,42 +38,26 @@ namespace minifi {
*/
class ThreadedSchedulingAgent : public SchedulingAgent {
public:
- // Constructor
- /*!
- * Create a new threaded scheduling agent.
- */
ThreadedSchedulingAgent(const gsl::not_null<core::controller::ControllerServiceProvider*> controller_service_provider, std::shared_ptr<core::Repository> repo,
std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo,
std::shared_ptr<Configure> configuration, utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
: SchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration, thread_pool) {
}
- // Destructor
~ThreadedSchedulingAgent() override = default;
- // Run function for the thread
virtual utils::TaskRescheduleInfo run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext,
const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) = 0;
public:
- // schedule, overwritten by different DrivenTimerDrivenSchedulingAgent
void schedule(core::Processor* processor) override;
- // unschedule, overwritten by different DrivenTimerDrivenSchedulingAgent
void unschedule(core::Processor* processor) override;
void stop() override;
private:
- // Prevent default copy constructor and assignment operation
- // Only support pass by reference or pointer
- ThreadedSchedulingAgent(const ThreadedSchedulingAgent &parent);
- ThreadedSchedulingAgent &operator=(const ThreadedSchedulingAgent &parent);
std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ThreadedSchedulingAgent>::getLogger();
std::set<utils::Identifier> processors_running_; // Set just for easy usage
};
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
-#endif // LIBMINIFI_INCLUDE_THREADEDSCHEDULINGAGENT_H_
+} // namespace org::apache::nifi::minifi
diff --git a/libminifi/include/TimerDrivenSchedulingAgent.h b/libminifi/include/TimerDrivenSchedulingAgent.h
index b4322a1f3..9e74fec2d 100644
--- a/libminifi/include/TimerDrivenSchedulingAgent.h
+++ b/libminifi/include/TimerDrivenSchedulingAgent.h
@@ -17,8 +17,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#ifndef LIBMINIFI_INCLUDE_TIMERDRIVENSCHEDULINGAGENT_H_
-#define LIBMINIFI_INCLUDE_TIMERDRIVENSCHEDULINGAGENT_H_
+#pragma once
#include <memory>
@@ -28,40 +27,20 @@
#include "core/Repository.h"
#include "ThreadedSchedulingAgent.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-// TimerDrivenSchedulingAgent Class
+namespace org::apache::nifi::minifi {
class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent {
public:
- // Constructor
- /*!
- * Create a new processor
- */
TimerDrivenSchedulingAgent(const gsl::not_null<core::controller::ControllerServiceProvider*> controller_service_provider, std::shared_ptr<core::Repository> repo,
std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configure,
utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
: ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configure, thread_pool) {
}
- /**
- * Run function that accepts the processor, context and session factory.
- */
utils::TaskRescheduleInfo run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext,
const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
private:
- // Prevent default copy constructor and assignment operation
- // Only support pass by reference or pointer
- TimerDrivenSchedulingAgent(const TimerDrivenSchedulingAgent &parent);
- TimerDrivenSchedulingAgent &operator=(const TimerDrivenSchedulingAgent &parent);
-
std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<TimerDrivenSchedulingAgent>::getLogger();
};
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
-#endif // LIBMINIFI_INCLUDE_TIMERDRIVENSCHEDULINGAGENT_H_
+} // namespace org::apache::nifi::minifi
diff --git a/libminifi/src/CronDrivenSchedulingAgent.cpp b/libminifi/src/CronDrivenSchedulingAgent.cpp
index 3ad94f1ea..0e12bd3ae 100644
--- a/libminifi/src/CronDrivenSchedulingAgent.cpp
+++ b/libminifi/src/CronDrivenSchedulingAgent.cpp
@@ -52,19 +52,16 @@ utils::TaskRescheduleInfo CronDrivenSchedulingAgent::run(core::Processor* proces
if (*next_to_last_trigger > current_time.get_local_time())
return utils::TaskRescheduleInfo::RetryIn(ceil<milliseconds>(*next_to_last_trigger-current_time.get_local_time()));
- last_exec_[uuid] = current_time.get_local_time();
- bool shouldYield = this->onTrigger(processor, processContext, sessionFactory);
+ auto on_trigger_result = this->onTrigger(processor, processContext, sessionFactory);
- if (processor->isYield()) {
+ if (on_trigger_result)
+ last_exec_[uuid] = current_time.get_local_time();
+
+ if (processor->isYield())
return utils::TaskRescheduleInfo::RetryIn(processor->getYieldTime());
- } else if (shouldYield && this->bored_yield_duration_ > 0ms) {
- return utils::TaskRescheduleInfo::RetryIn(this->bored_yield_duration_);
- }
- auto next_trigger = schedules_.at(uuid).calculateNextTrigger(current_time.get_local_time());
- if (!next_trigger)
- return utils::TaskRescheduleInfo::Done();
- return utils::TaskRescheduleInfo::RetryIn(ceil<milliseconds>(*next_trigger-current_time.get_local_time()));
+ if (auto next_trigger = schedules_.at(uuid).calculateNextTrigger(current_time.get_local_time()))
+ return utils::TaskRescheduleInfo::RetryIn(ceil<milliseconds>(*next_trigger-current_time.get_local_time()));
}
return utils::TaskRescheduleInfo::Done();
}
diff --git a/libminifi/src/EventDrivenSchedulingAgent.cpp b/libminifi/src/EventDrivenSchedulingAgent.cpp
index 10edd6405..3a23ba839 100644
--- a/libminifi/src/EventDrivenSchedulingAgent.cpp
+++ b/libminifi/src/EventDrivenSchedulingAgent.cpp
@@ -26,10 +26,7 @@
using namespace std::literals::chrono_literals;
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
void EventDrivenSchedulingAgent::schedule(core::Processor* processor) {
if (!processor->hasIncomingConnections()) {
@@ -44,13 +41,9 @@ utils::TaskRescheduleInfo EventDrivenSchedulingAgent::run(core::Processor* proce
auto start_time = std::chrono::steady_clock::now();
// trigger processor until it has work to do, but no more than half a sec
while (processor->isRunning() && (std::chrono::steady_clock::now() - start_time < time_slice_)) {
- bool shouldYield = this->onTrigger(processor, processContext, sessionFactory);
+ this->onTrigger(processor, processContext, sessionFactory);
if (processor->isYield()) {
- // Honor the yield
return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(processor->getYieldTime()));
- } else if (shouldYield) {
- // No work to do or need to apply back pressure
- return utils::TaskRescheduleInfo::RetryIn(this->bored_yield_duration_ > 0ms ? this->bored_yield_duration_ : 10ms); // No work left to do, stand by
}
}
return utils::TaskRescheduleInfo::RetryImmediately(); // Let's continue work as soon as a thread is available
@@ -58,7 +51,4 @@ utils::TaskRescheduleInfo EventDrivenSchedulingAgent::run(core::Processor* proce
return utils::TaskRescheduleInfo::Done();
}
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
+} // namespace org::apache::nifi::minifi
diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp
index 9dfca5c58..2f8ccdb6b 100644
--- a/libminifi/src/SchedulingAgent.cpp
+++ b/libminifi/src/SchedulingAgent.cpp
@@ -25,6 +25,8 @@
#include "core/Processor.h"
#include "utils/gsl.h"
+using namespace std::literals::chrono_literals;
+
namespace {
bool hasWorkToDo(org::apache::nifi::minifi::core::Processor* processor) {
// Whether it has work to do
@@ -34,25 +36,28 @@ bool hasWorkToDo(org::apache::nifi::minifi::core::Processor* processor) {
namespace org::apache::nifi::minifi {
-bool SchedulingAgent::onTrigger(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext,
- const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+nonstd::expected<void, std::exception_ptr> SchedulingAgent::onTrigger(core::Processor* processor,
+ const std::shared_ptr<core::ProcessContext> &process_context,
+ const std::shared_ptr<core::ProcessSessionFactory> &session_factory) {
gsl_Expects(processor);
if (processor->isYield()) {
logger_->log_debug("Not running %s since it must yield", processor->getName());
- return false;
+ return {};
}
// No need to yield, reset yield expiration to 0
processor->clearYield();
+ auto bored_yield_duration = bored_yield_duration_ > 0ms ? bored_yield_duration_ : 10ms;
+
if (!hasWorkToDo(processor)) {
- // No work to do, yield
- return true;
+ processor->yield(bored_yield_duration);
+ return {};
}
if (processor->isThrottledByBackpressure()) {
logger_->log_debug("backpressure applied because too much outgoing for %s %s", processor->getUUIDStr(), processor->getName());
- // need to apply backpressure
- return true;
+ processor->yield(bored_yield_duration);
+ return {};
}
auto schedule_it = scheduled_processors_.end();
@@ -68,20 +73,21 @@ bool SchedulingAgent::onTrigger(core::Processor* processor, const std::shared_pt
});
processor->incrementActiveTasks();
+ auto decrement_task = gsl::finally([processor]() { processor->decrementActiveTask(); });
try {
- processor->onTrigger(processContext, sessionFactory);
+ processor->onTrigger(process_context, session_factory);
} catch (const std::exception& exception) {
logger_->log_warn("Caught Exception during SchedulingAgent::onTrigger of processor %s (uuid: %s), type: %s, what: %s",
processor->getName(), processor->getUUIDStr(), typeid(exception).name(), exception.what());
processor->yield(admin_yield_duration_);
+ return nonstd::make_unexpected(std::current_exception());
} catch (...) {
logger_->log_warn("Caught Exception during SchedulingAgent::onTrigger of processor %s (uuid: %s), type: %s",
processor->getName(), processor->getUUIDStr(), getCurrentExceptionTypeName());
processor->yield(admin_yield_duration_);
+ return nonstd::make_unexpected(std::current_exception());
}
- processor->decrementActiveTask();
-
- return false;
+ return {};
}
void SchedulingAgent::watchDogFunc() {
diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp b/libminifi/src/TimerDrivenSchedulingAgent.cpp
index 7864a7945..2b4072170 100644
--- a/libminifi/src/TimerDrivenSchedulingAgent.cpp
+++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp
@@ -19,35 +19,22 @@
*/
#include "TimerDrivenSchedulingAgent.h"
#include <chrono>
-#include <thread>
#include <memory>
-#include <iostream>
-#include "core/Property.h"
using namespace std::literals::chrono_literals;
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
utils::TaskRescheduleInfo TimerDrivenSchedulingAgent::run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext,
const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
if (this->running_ && processor->isRunning()) {
- bool shouldYield = this->onTrigger(processor, processContext, sessionFactory);
- if (processor->isYield()) {
- // Honor the yield
+ this->onTrigger(processor, processContext, sessionFactory);
+ if (processor->isYield())
return utils::TaskRescheduleInfo::RetryIn(processor->getYieldTime());
- } else if (shouldYield && this->bored_yield_duration_ > 0ms) {
- // No work to do or need to apply back pressure
- return utils::TaskRescheduleInfo::RetryIn(this->bored_yield_duration_);
- }
+
return utils::TaskRescheduleInfo::RetryIn(std::chrono::duration_cast<std::chrono::milliseconds>(processor->getSchedulingPeriodNano()));
}
return utils::TaskRescheduleInfo::Done();
}
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+} // namespace org::apache::nifi::minifi