You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ad...@apache.org on 2023/01/30 10:17:18 UTC

[nifi-minifi-cpp] 02/03: MINIFICPP-2008 Differentiate successful onTriggers from throwing onTriggers in ScheduleAgents

This is an automated email from the ASF dual-hosted git repository.

adebreceni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 9f2040abc6d66602e82a2c1c0ac708ddfe9f6570
Author: Martin Zink <ma...@apache.org>
AuthorDate: Mon Jan 30 11:13:51 2023 +0100

    MINIFICPP-2008 Differentiate successful onTriggers from throwing onTriggers in ScheduleAgents
    
    Signed-off-by: Adam Debreceni <ad...@apache.org>
    
    This closes #1474
---
 libminifi/include/CronDrivenSchedulingAgent.h  |  2 --
 libminifi/include/EventDrivenSchedulingAgent.h | 25 +++--------------------
 libminifi/include/SchedulingAgent.h            | 18 ++++++-----------
 libminifi/include/ThreadedSchedulingAgent.h    | 26 +++---------------------
 libminifi/include/TimerDrivenSchedulingAgent.h | 27 +++----------------------
 libminifi/src/CronDrivenSchedulingAgent.cpp    | 17 +++++++---------
 libminifi/src/EventDrivenSchedulingAgent.cpp   | 16 +++------------
 libminifi/src/SchedulingAgent.cpp              | 28 ++++++++++++++++----------
 libminifi/src/TimerDrivenSchedulingAgent.cpp   | 23 +++++----------------
 9 files changed, 47 insertions(+), 135 deletions(-)

diff --git a/libminifi/include/CronDrivenSchedulingAgent.h b/libminifi/include/CronDrivenSchedulingAgent.h
index 3595f302f..88ee42e6b 100644
--- a/libminifi/include/CronDrivenSchedulingAgent.h
+++ b/libminifi/include/CronDrivenSchedulingAgent.h
@@ -45,8 +45,6 @@ class CronDrivenSchedulingAgent : public ThreadedSchedulingAgent {
       : ThreadedSchedulingAgent(controller_service_provider, std::move(repo), std::move(flow_repo), std::move(content_repo), std::move(configuration), thread_pool) {
   }
 
-  CronDrivenSchedulingAgent(const CronDrivenSchedulingAgent& parent) = delete;
-  CronDrivenSchedulingAgent& operator=(const CronDrivenSchedulingAgent& parent) = delete;
   ~CronDrivenSchedulingAgent() override = default;
 
   utils::TaskRescheduleInfo run(core::Processor *processor,
diff --git a/libminifi/include/EventDrivenSchedulingAgent.h b/libminifi/include/EventDrivenSchedulingAgent.h
index e9439a925..d017da38c 100644
--- a/libminifi/include/EventDrivenSchedulingAgent.h
+++ b/libminifi/include/EventDrivenSchedulingAgent.h
@@ -17,8 +17,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_EVENTDRIVENSCHEDULINGAGENT_H_
-#define LIBMINIFI_INCLUDE_EVENTDRIVENSCHEDULINGAGENT_H_
+#pragma once
 
 #include <memory>
 #include <string>
@@ -31,18 +30,10 @@
 #include "core/ProcessSessionFactory.h"
 #include "ThreadedSchedulingAgent.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
 
-// EventDrivenSchedulingAgent Class
 class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent {
  public:
-  // Constructor
-  /*!
-   * Create a new event driven scheduling agent.
-   */
   EventDrivenSchedulingAgent(const gsl::not_null<core::controller::ControllerServiceProvider*> controller_service_provider, std::shared_ptr<core::Repository> repo,
                              std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration,
                              utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
@@ -56,21 +47,11 @@ class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent {
 
   void schedule(core::Processor* processor) override;
 
-  // Run function for the thread
   utils::TaskRescheduleInfo run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext,
       const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
 
  private:
-  // Prevent default copy constructor and assignment operation
-  // Only support pass by reference or pointer
-  EventDrivenSchedulingAgent(const EventDrivenSchedulingAgent &parent);
-  EventDrivenSchedulingAgent &operator=(const EventDrivenSchedulingAgent &parent);
-
   std::chrono::milliseconds time_slice_;
 };
 
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
-#endif  // LIBMINIFI_INCLUDE_EVENTDRIVENSCHEDULINGAGENT_H_
+}  // namespace org::apache::nifi::minifi
diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h
index 24992c928..968d03860 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -30,6 +30,7 @@
 #include <algorithm>
 #include <thread>
 #include "utils/CallBackTimer.h"
+#include "utils/expected.h"
 #include "utils/Monitors.h"
 #include "utils/TimeUtil.h"
 #include "utils/ThreadPool.h"
@@ -49,7 +50,6 @@
 
 namespace org::apache::nifi::minifi {
 
-// SchedulingAgent Class
 class SchedulingAgent {
  public:
   // Constructor
@@ -87,14 +87,15 @@ class SchedulingAgent {
     logger_->log_trace("Destroying scheduling agent");
   }
 
-  // onTrigger, return whether the yield is need
-  bool onTrigger(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory);
-  // start
+  nonstd::expected<void, std::exception_ptr> onTrigger(core::Processor* processor,
+      const std::shared_ptr<core::ProcessContext>& process_context,
+      const std::shared_ptr<core::ProcessSessionFactory>& session_factory);
+
   void start() {
     running_ = true;
     thread_pool_.start();
   }
-  // stop
+
   virtual void stop() {
     running_ = false;
   }
@@ -110,13 +111,9 @@ class SchedulingAgent {
   SchedulingAgent &operator=(const SchedulingAgent &parent) = delete;
 
  protected:
-  // Mutex for protection
   std::mutex mutex_;
-  // Whether it is running
   std::atomic<bool> running_;
-  // AdministrativeYieldDuration
   std::chrono::milliseconds admin_yield_duration_;
-  // BoredYieldDuration
   std::chrono::milliseconds bored_yield_duration_;
 
   std::shared_ptr<Configure> configure_;
@@ -126,9 +123,7 @@ class SchedulingAgent {
   std::shared_ptr<core::Repository> flow_repo_;
 
   std::shared_ptr<core::ContentRepository> content_repo_;
-  // thread pool for components.
   utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool_;
-  // controller service provider reference
   gsl::not_null<core::controller::ControllerServiceProvider*> controller_service_provider_;
 
  private:
@@ -148,7 +143,6 @@ class SchedulingAgent {
     }
   };
 
-  // Logger
   std::shared_ptr<core::logging::Logger> logger_;
   mutable std::mutex watchdog_mtx_;  // used to protect the set below
   std::set<SchedulingInfo> scheduled_processors_;  // set was chosen to avoid iterator invalidation
diff --git a/libminifi/include/ThreadedSchedulingAgent.h b/libminifi/include/ThreadedSchedulingAgent.h
index c53249096..e51279154 100644
--- a/libminifi/include/ThreadedSchedulingAgent.h
+++ b/libminifi/include/ThreadedSchedulingAgent.h
@@ -17,8 +17,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_THREADEDSCHEDULINGAGENT_H_
-#define LIBMINIFI_INCLUDE_THREADEDSCHEDULINGAGENT_H_
+#pragma once
 
 #include <memory>
 #include <set>
@@ -31,10 +30,7 @@
 #include "core/ProcessContext.h"
 #include "SchedulingAgent.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
 
 /**
  * An abstract scheduling agent which creates and manages a pool of threads for
@@ -42,42 +38,26 @@ namespace minifi {
  */
 class ThreadedSchedulingAgent : public SchedulingAgent {
  public:
-  // Constructor
-  /*!
-   * Create a new threaded scheduling agent.
-   */
   ThreadedSchedulingAgent(const gsl::not_null<core::controller::ControllerServiceProvider*> controller_service_provider, std::shared_ptr<core::Repository> repo,
         std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo,
         std::shared_ptr<Configure> configuration,  utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
       : SchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration, thread_pool) {
   }
-  // Destructor
   ~ThreadedSchedulingAgent() override = default;
 
-  // Run function for the thread
   virtual utils::TaskRescheduleInfo run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext,
                        const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) = 0;
 
  public:
-  // schedule, overwritten by different DrivenTimerDrivenSchedulingAgent
   void schedule(core::Processor* processor) override;
-  // unschedule, overwritten by different DrivenTimerDrivenSchedulingAgent
   void unschedule(core::Processor* processor) override;
 
   void stop() override;
 
  private:
-  // Prevent default copy constructor and assignment operation
-  // Only support pass by reference or pointer
-  ThreadedSchedulingAgent(const ThreadedSchedulingAgent &parent);
-  ThreadedSchedulingAgent &operator=(const ThreadedSchedulingAgent &parent);
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ThreadedSchedulingAgent>::getLogger();
 
   std::set<utils::Identifier> processors_running_;  // Set just for easy usage
 };
 
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
-#endif  // LIBMINIFI_INCLUDE_THREADEDSCHEDULINGAGENT_H_
+}  // namespace org::apache::nifi::minifi
diff --git a/libminifi/include/TimerDrivenSchedulingAgent.h b/libminifi/include/TimerDrivenSchedulingAgent.h
index b4322a1f3..9e74fec2d 100644
--- a/libminifi/include/TimerDrivenSchedulingAgent.h
+++ b/libminifi/include/TimerDrivenSchedulingAgent.h
@@ -17,8 +17,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_TIMERDRIVENSCHEDULINGAGENT_H_
-#define LIBMINIFI_INCLUDE_TIMERDRIVENSCHEDULINGAGENT_H_
+#pragma once
 
 #include <memory>
 
@@ -28,40 +27,20 @@
 #include "core/Repository.h"
 #include "ThreadedSchedulingAgent.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-//  TimerDrivenSchedulingAgent Class
+namespace org::apache::nifi::minifi {
 class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent {
  public:
-  //  Constructor
-  /*!
-   * Create a new processor
-   */
   TimerDrivenSchedulingAgent(const gsl::not_null<core::controller::ControllerServiceProvider*> controller_service_provider, std::shared_ptr<core::Repository> repo,
                              std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configure,
                              utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
       : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configure, thread_pool) {
   }
 
-  /**
-   * Run function that accepts the processor, context and session factory.
-   */
   utils::TaskRescheduleInfo run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext,
       const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
 
  private:
-  // Prevent default copy constructor and assignment operation
-  // Only support pass by reference or pointer
-  TimerDrivenSchedulingAgent(const TimerDrivenSchedulingAgent &parent);
-  TimerDrivenSchedulingAgent &operator=(const TimerDrivenSchedulingAgent &parent);
-
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<TimerDrivenSchedulingAgent>::getLogger();
 };
 
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
-#endif  // LIBMINIFI_INCLUDE_TIMERDRIVENSCHEDULINGAGENT_H_
+}  // namespace org::apache::nifi::minifi
diff --git a/libminifi/src/CronDrivenSchedulingAgent.cpp b/libminifi/src/CronDrivenSchedulingAgent.cpp
index 3ad94f1ea..0e12bd3ae 100644
--- a/libminifi/src/CronDrivenSchedulingAgent.cpp
+++ b/libminifi/src/CronDrivenSchedulingAgent.cpp
@@ -52,19 +52,16 @@ utils::TaskRescheduleInfo CronDrivenSchedulingAgent::run(core::Processor* proces
     if (*next_to_last_trigger > current_time.get_local_time())
       return utils::TaskRescheduleInfo::RetryIn(ceil<milliseconds>(*next_to_last_trigger-current_time.get_local_time()));
 
-    last_exec_[uuid] = current_time.get_local_time();
-    bool shouldYield = this->onTrigger(processor, processContext, sessionFactory);
+    auto on_trigger_result = this->onTrigger(processor, processContext, sessionFactory);
 
-    if (processor->isYield()) {
+    if (on_trigger_result)
+      last_exec_[uuid] = current_time.get_local_time();
+
+    if (processor->isYield())
       return utils::TaskRescheduleInfo::RetryIn(processor->getYieldTime());
-    } else if (shouldYield && this->bored_yield_duration_ > 0ms) {
-      return utils::TaskRescheduleInfo::RetryIn(this->bored_yield_duration_);
-    }
 
-    auto next_trigger = schedules_.at(uuid).calculateNextTrigger(current_time.get_local_time());
-    if (!next_trigger)
-      return utils::TaskRescheduleInfo::Done();
-    return utils::TaskRescheduleInfo::RetryIn(ceil<milliseconds>(*next_trigger-current_time.get_local_time()));
+    if (auto next_trigger = schedules_.at(uuid).calculateNextTrigger(current_time.get_local_time()))
+      return utils::TaskRescheduleInfo::RetryIn(ceil<milliseconds>(*next_trigger-current_time.get_local_time()));
   }
   return utils::TaskRescheduleInfo::Done();
 }
diff --git a/libminifi/src/EventDrivenSchedulingAgent.cpp b/libminifi/src/EventDrivenSchedulingAgent.cpp
index 10edd6405..3a23ba839 100644
--- a/libminifi/src/EventDrivenSchedulingAgent.cpp
+++ b/libminifi/src/EventDrivenSchedulingAgent.cpp
@@ -26,10 +26,7 @@
 
 using namespace std::literals::chrono_literals;
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
 
 void EventDrivenSchedulingAgent::schedule(core::Processor* processor) {
   if (!processor->hasIncomingConnections()) {
@@ -44,13 +41,9 @@ utils::TaskRescheduleInfo EventDrivenSchedulingAgent::run(core::Processor* proce
     auto start_time = std::chrono::steady_clock::now();
     // trigger processor until it has work to do, but no more than half a sec
     while (processor->isRunning() && (std::chrono::steady_clock::now() - start_time < time_slice_)) {
-      bool shouldYield = this->onTrigger(processor, processContext, sessionFactory);
+      this->onTrigger(processor, processContext, sessionFactory);
       if (processor->isYield()) {
-        // Honor the yield
         return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(processor->getYieldTime()));
-      } else if (shouldYield) {
-        // No work to do or need to apply back pressure
-        return utils::TaskRescheduleInfo::RetryIn(this->bored_yield_duration_ > 0ms ? this->bored_yield_duration_ : 10ms);  // No work left to do, stand by
       }
     }
     return utils::TaskRescheduleInfo::RetryImmediately();  // Let's continue work as soon as a thread is available
@@ -58,7 +51,4 @@ utils::TaskRescheduleInfo EventDrivenSchedulingAgent::run(core::Processor* proce
   return utils::TaskRescheduleInfo::Done();
 }
 
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi
diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp
index 9dfca5c58..2f8ccdb6b 100644
--- a/libminifi/src/SchedulingAgent.cpp
+++ b/libminifi/src/SchedulingAgent.cpp
@@ -25,6 +25,8 @@
 #include "core/Processor.h"
 #include "utils/gsl.h"
 
+using namespace std::literals::chrono_literals;
+
 namespace {
 bool hasWorkToDo(org::apache::nifi::minifi::core::Processor* processor) {
   // Whether it has work to do
@@ -34,25 +36,28 @@ bool hasWorkToDo(org::apache::nifi::minifi::core::Processor* processor) {
 
 namespace org::apache::nifi::minifi {
 
-bool SchedulingAgent::onTrigger(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext,
-                                const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+nonstd::expected<void, std::exception_ptr> SchedulingAgent::onTrigger(core::Processor* processor,
+    const std::shared_ptr<core::ProcessContext> &process_context,
+    const std::shared_ptr<core::ProcessSessionFactory> &session_factory) {
   gsl_Expects(processor);
   if (processor->isYield()) {
     logger_->log_debug("Not running %s since it must yield", processor->getName());
-    return false;
+    return {};
   }
 
   // No need to yield, reset yield expiration to 0
   processor->clearYield();
 
+  auto bored_yield_duration = bored_yield_duration_ > 0ms ? bored_yield_duration_ : 10ms;
+
   if (!hasWorkToDo(processor)) {
-    // No work to do, yield
-    return true;
+    processor->yield(bored_yield_duration);
+    return {};
   }
   if (processor->isThrottledByBackpressure()) {
     logger_->log_debug("backpressure applied because too much outgoing for %s %s", processor->getUUIDStr(), processor->getName());
-    // need to apply backpressure
-    return true;
+    processor->yield(bored_yield_duration);
+    return {};
   }
 
   auto schedule_it = scheduled_processors_.end();
@@ -68,20 +73,21 @@ bool SchedulingAgent::onTrigger(core::Processor* processor, const std::shared_pt
   });
 
   processor->incrementActiveTasks();
+  auto decrement_task = gsl::finally([processor]() { processor->decrementActiveTask(); });
   try {
-    processor->onTrigger(processContext, sessionFactory);
+    processor->onTrigger(process_context, session_factory);
   } catch (const std::exception& exception) {
     logger_->log_warn("Caught Exception during SchedulingAgent::onTrigger of processor %s (uuid: %s), type: %s, what: %s",
         processor->getName(), processor->getUUIDStr(), typeid(exception).name(), exception.what());
     processor->yield(admin_yield_duration_);
+    return nonstd::make_unexpected(std::current_exception());
   } catch (...) {
     logger_->log_warn("Caught Exception during SchedulingAgent::onTrigger of processor %s (uuid: %s), type: %s",
         processor->getName(), processor->getUUIDStr(), getCurrentExceptionTypeName());
     processor->yield(admin_yield_duration_);
+    return nonstd::make_unexpected(std::current_exception());
   }
-  processor->decrementActiveTask();
-
-  return false;
+  return {};
 }
 
 void SchedulingAgent::watchDogFunc() {
diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp b/libminifi/src/TimerDrivenSchedulingAgent.cpp
index 7864a7945..2b4072170 100644
--- a/libminifi/src/TimerDrivenSchedulingAgent.cpp
+++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp
@@ -19,35 +19,22 @@
  */
 #include "TimerDrivenSchedulingAgent.h"
 #include <chrono>
-#include <thread>
 #include <memory>
-#include <iostream>
-#include "core/Property.h"
 
 using namespace std::literals::chrono_literals;
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
 
 utils::TaskRescheduleInfo TimerDrivenSchedulingAgent::run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext,
                                          const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
   if (this->running_ && processor->isRunning()) {
-    bool shouldYield = this->onTrigger(processor, processContext, sessionFactory);
-    if (processor->isYield()) {
-      // Honor the yield
+    this->onTrigger(processor, processContext, sessionFactory);
+    if (processor->isYield())
       return utils::TaskRescheduleInfo::RetryIn(processor->getYieldTime());
-    } else if (shouldYield && this->bored_yield_duration_ > 0ms) {
-      // No work to do or need to apply back pressure
-      return utils::TaskRescheduleInfo::RetryIn(this->bored_yield_duration_);
-    }
+
     return utils::TaskRescheduleInfo::RetryIn(std::chrono::duration_cast<std::chrono::milliseconds>(processor->getSchedulingPeriodNano()));
   }
   return utils::TaskRescheduleInfo::Done();
 }
 
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi