You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ad...@apache.org on 2023/01/30 10:17:16 UTC

[nifi-minifi-cpp] branch main updated (2dc589158 -> 169cd6822)

This is an automated email from the ASF dual-hosted git repository.

adebreceni pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


    from 2dc589158 MINIFICPP-1983 Mount files in minifi test containers
     new 417756064 MINIFICPP-1983 Modify correct minifi.properties file for test containers
     new 9f2040abc MINIFICPP-2008 Differentiate successful onTriggers from throwing onTriggers in ScheduleAgents
     new 169cd6822 MINIFICPP-2036 Upgrade gcc to version 11

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 README.md                                          |  2 +-
 centos.sh                                          |  4 ++--
 docker/Dockerfile                                  |  3 ++-
 docker/centos/Dockerfile                           |  2 +-
 .../cluster/containers/MinifiContainer.py          |  7 +++---
 .../resources/minifi-c2-server-ssl/config.yml      |  8 +++----
 libminifi/include/CronDrivenSchedulingAgent.h      |  2 --
 libminifi/include/EventDrivenSchedulingAgent.h     | 25 +++----------------
 libminifi/include/SchedulingAgent.h                | 18 +++++---------
 libminifi/include/ThreadedSchedulingAgent.h        | 26 +++-----------------
 libminifi/include/TimerDrivenSchedulingAgent.h     | 27 +++------------------
 libminifi/src/CronDrivenSchedulingAgent.cpp        | 17 ++++++-------
 libminifi/src/EventDrivenSchedulingAgent.cpp       | 16 +++----------
 libminifi/src/SchedulingAgent.cpp                  | 28 +++++++++++++---------
 libminifi/src/TimerDrivenSchedulingAgent.cpp       | 23 ++++--------------
 15 files changed, 61 insertions(+), 147 deletions(-)


[nifi-minifi-cpp] 01/03: MINIFICPP-1983 Modify correct minifi.properties file for test containers

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adebreceni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 41775606425a9382c7733db066f2c5d732b93344
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Mon Jan 30 11:11:17 2023 +0100

    MINIFICPP-1983 Modify correct minifi.properties file for test containers
    
    Signed-off-by: Adam Debreceni <ad...@apache.org>
    
    This closes #1496
---
 docker/test/integration/cluster/containers/MinifiContainer.py     | 7 ++++---
 docker/test/integration/resources/minifi-c2-server-ssl/config.yml | 8 ++++----
 2 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/docker/test/integration/cluster/containers/MinifiContainer.py b/docker/test/integration/cluster/containers/MinifiContainer.py
index f5dde6a62..93d65ddda 100644
--- a/docker/test/integration/cluster/containers/MinifiContainer.py
+++ b/docker/test/integration/cluster/containers/MinifiContainer.py
@@ -65,7 +65,7 @@ class MinifiContainer(FlowContainer):
             config_file.write(test_flow_yaml.encode('utf-8'))
 
     def _create_properties(self):
-        properties_file_path = os.path.join(self.config_dir, 'minifi.properties')
+        properties_file_path = os.path.join(self.container_specific_config_dir, 'minifi.properties')
         with open(properties_file_path, 'a') as f:
             if self.options.enable_c2:
                 f.write("nifi.c2.enable=true\n")
@@ -82,6 +82,7 @@ class MinifiContainer(FlowContainer):
                 f.write("nifi.c2.rest.url.ack=https://minifi-c2-server:10090/c2/config/acknowledge\n")
                 f.write("nifi.c2.rest.ssl.context.service=SSLContextService\n")
                 f.write("nifi.c2.flow.base.url=https://minifi-c2-server:10090/c2/config/\n")
+                f.write("nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation\n")
                 f.write("nifi.c2.full.heartbeat=false\n")
                 f.write("nifi.c2.agent.class=minifi-test-class\n")
                 f.write("nifi.c2.agent.identifier=minifi-test-id\n")
@@ -99,9 +100,9 @@ class MinifiContainer(FlowContainer):
         self._create_config()
         self._create_properties()
 
-        self.vols[os.path.join(self.config_dir, 'minifi.properties')] = {"bind": os.path.join(MinifiContainer.MINIFI_ROOT, 'conf', 'minifi.properties'), "mode": "rw"}
+        self.vols[os.path.join(self.container_specific_config_dir, 'minifi.properties')] = {"bind": os.path.join(MinifiContainer.MINIFI_ROOT, 'conf', 'minifi.properties'), "mode": "rw"}
         self.vols[os.path.join(self.container_specific_config_dir, 'config.yml')] = {"bind": os.path.join(MinifiContainer.MINIFI_ROOT, 'conf', 'config.yml'), "mode": "rw"}
-        self.vols[os.path.join(self.config_dir, 'minifi-log.properties')] = {"bind": os.path.join(MinifiContainer.MINIFI_ROOT, 'conf', 'minifi-log.properties'), "mode": "rw"}
+        self.vols[os.path.join(self.container_specific_config_dir, 'minifi-log.properties')] = {"bind": os.path.join(MinifiContainer.MINIFI_ROOT, 'conf', 'minifi-log.properties'), "mode": "rw"}
 
     def deploy(self):
         if not self.set_deployed():
diff --git a/docker/test/integration/resources/minifi-c2-server-ssl/config.yml b/docker/test/integration/resources/minifi-c2-server-ssl/config.yml
index d4c69534c..239541288 100644
--- a/docker/test/integration/resources/minifi-c2-server-ssl/config.yml
+++ b/docker/test/integration/resources/minifi-c2-server-ssl/config.yml
@@ -33,11 +33,11 @@ Controller Services:
     class: SSLContextService
     Properties:
       Client Certificate:
-          - value: /tmp/minifi-c2-server-ssl/minifi-c2-client.crt
+        - value: /tmp/resources/minifi-c2-server-ssl/minifi-cpp-flow.crt
       Private Key:
-          - value: /tmp/minifi-c2-server-ssl/minifi-c2-client.key
+        - value: /tmp/resources/minifi-c2-server-ssl/minifi-cpp-flow.key
       Passphrase:
-          - value: abcdefgh
+        - value: abcdefgh
       CA Certificate:
-          - value: /tmp/minifi-c2-server-ssl/minifi-c2-server.crt
+        - value: /tmp/resources/minifi-c2-server-ssl/root-ca.pem
 Remote Process Groups: []


[nifi-minifi-cpp] 02/03: MINIFICPP-2008 Differentiate successful onTriggers from throwing onTriggers in ScheduleAgents

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adebreceni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 9f2040abc6d66602e82a2c1c0ac708ddfe9f6570
Author: Martin Zink <ma...@apache.org>
AuthorDate: Mon Jan 30 11:13:51 2023 +0100

    MINIFICPP-2008 Differentiate successful onTriggers from throwing onTriggers in ScheduleAgents
    
    Signed-off-by: Adam Debreceni <ad...@apache.org>
    
    This closes #1474
---
 libminifi/include/CronDrivenSchedulingAgent.h  |  2 --
 libminifi/include/EventDrivenSchedulingAgent.h | 25 +++--------------------
 libminifi/include/SchedulingAgent.h            | 18 ++++++-----------
 libminifi/include/ThreadedSchedulingAgent.h    | 26 +++---------------------
 libminifi/include/TimerDrivenSchedulingAgent.h | 27 +++----------------------
 libminifi/src/CronDrivenSchedulingAgent.cpp    | 17 +++++++---------
 libminifi/src/EventDrivenSchedulingAgent.cpp   | 16 +++------------
 libminifi/src/SchedulingAgent.cpp              | 28 ++++++++++++++++----------
 libminifi/src/TimerDrivenSchedulingAgent.cpp   | 23 +++++----------------
 9 files changed, 47 insertions(+), 135 deletions(-)

diff --git a/libminifi/include/CronDrivenSchedulingAgent.h b/libminifi/include/CronDrivenSchedulingAgent.h
index 3595f302f..88ee42e6b 100644
--- a/libminifi/include/CronDrivenSchedulingAgent.h
+++ b/libminifi/include/CronDrivenSchedulingAgent.h
@@ -45,8 +45,6 @@ class CronDrivenSchedulingAgent : public ThreadedSchedulingAgent {
       : ThreadedSchedulingAgent(controller_service_provider, std::move(repo), std::move(flow_repo), std::move(content_repo), std::move(configuration), thread_pool) {
   }
 
-  CronDrivenSchedulingAgent(const CronDrivenSchedulingAgent& parent) = delete;
-  CronDrivenSchedulingAgent& operator=(const CronDrivenSchedulingAgent& parent) = delete;
   ~CronDrivenSchedulingAgent() override = default;
 
   utils::TaskRescheduleInfo run(core::Processor *processor,
diff --git a/libminifi/include/EventDrivenSchedulingAgent.h b/libminifi/include/EventDrivenSchedulingAgent.h
index e9439a925..d017da38c 100644
--- a/libminifi/include/EventDrivenSchedulingAgent.h
+++ b/libminifi/include/EventDrivenSchedulingAgent.h
@@ -17,8 +17,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_EVENTDRIVENSCHEDULINGAGENT_H_
-#define LIBMINIFI_INCLUDE_EVENTDRIVENSCHEDULINGAGENT_H_
+#pragma once
 
 #include <memory>
 #include <string>
@@ -31,18 +30,10 @@
 #include "core/ProcessSessionFactory.h"
 #include "ThreadedSchedulingAgent.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
 
-// EventDrivenSchedulingAgent Class
 class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent {
  public:
-  // Constructor
-  /*!
-   * Create a new event driven scheduling agent.
-   */
   EventDrivenSchedulingAgent(const gsl::not_null<core::controller::ControllerServiceProvider*> controller_service_provider, std::shared_ptr<core::Repository> repo,
                              std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration,
                              utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
@@ -56,21 +47,11 @@ class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent {
 
   void schedule(core::Processor* processor) override;
 
-  // Run function for the thread
   utils::TaskRescheduleInfo run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext,
       const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
 
  private:
-  // Prevent default copy constructor and assignment operation
-  // Only support pass by reference or pointer
-  EventDrivenSchedulingAgent(const EventDrivenSchedulingAgent &parent);
-  EventDrivenSchedulingAgent &operator=(const EventDrivenSchedulingAgent &parent);
-
   std::chrono::milliseconds time_slice_;
 };
 
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
-#endif  // LIBMINIFI_INCLUDE_EVENTDRIVENSCHEDULINGAGENT_H_
+}  // namespace org::apache::nifi::minifi
diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h
index 24992c928..968d03860 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -30,6 +30,7 @@
 #include <algorithm>
 #include <thread>
 #include "utils/CallBackTimer.h"
+#include "utils/expected.h"
 #include "utils/Monitors.h"
 #include "utils/TimeUtil.h"
 #include "utils/ThreadPool.h"
@@ -49,7 +50,6 @@
 
 namespace org::apache::nifi::minifi {
 
-// SchedulingAgent Class
 class SchedulingAgent {
  public:
   // Constructor
@@ -87,14 +87,15 @@ class SchedulingAgent {
     logger_->log_trace("Destroying scheduling agent");
   }
 
-  // onTrigger, return whether the yield is need
-  bool onTrigger(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory);
-  // start
+  nonstd::expected<void, std::exception_ptr> onTrigger(core::Processor* processor,
+      const std::shared_ptr<core::ProcessContext>& process_context,
+      const std::shared_ptr<core::ProcessSessionFactory>& session_factory);
+
   void start() {
     running_ = true;
     thread_pool_.start();
   }
-  // stop
+
   virtual void stop() {
     running_ = false;
   }
@@ -110,13 +111,9 @@ class SchedulingAgent {
   SchedulingAgent &operator=(const SchedulingAgent &parent) = delete;
 
  protected:
-  // Mutex for protection
   std::mutex mutex_;
-  // Whether it is running
   std::atomic<bool> running_;
-  // AdministrativeYieldDuration
   std::chrono::milliseconds admin_yield_duration_;
-  // BoredYieldDuration
   std::chrono::milliseconds bored_yield_duration_;
 
   std::shared_ptr<Configure> configure_;
@@ -126,9 +123,7 @@ class SchedulingAgent {
   std::shared_ptr<core::Repository> flow_repo_;
 
   std::shared_ptr<core::ContentRepository> content_repo_;
-  // thread pool for components.
   utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool_;
-  // controller service provider reference
   gsl::not_null<core::controller::ControllerServiceProvider*> controller_service_provider_;
 
  private:
@@ -148,7 +143,6 @@ class SchedulingAgent {
     }
   };
 
-  // Logger
   std::shared_ptr<core::logging::Logger> logger_;
   mutable std::mutex watchdog_mtx_;  // used to protect the set below
   std::set<SchedulingInfo> scheduled_processors_;  // set was chosen to avoid iterator invalidation
diff --git a/libminifi/include/ThreadedSchedulingAgent.h b/libminifi/include/ThreadedSchedulingAgent.h
index c53249096..e51279154 100644
--- a/libminifi/include/ThreadedSchedulingAgent.h
+++ b/libminifi/include/ThreadedSchedulingAgent.h
@@ -17,8 +17,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_THREADEDSCHEDULINGAGENT_H_
-#define LIBMINIFI_INCLUDE_THREADEDSCHEDULINGAGENT_H_
+#pragma once
 
 #include <memory>
 #include <set>
@@ -31,10 +30,7 @@
 #include "core/ProcessContext.h"
 #include "SchedulingAgent.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
 
 /**
  * An abstract scheduling agent which creates and manages a pool of threads for
@@ -42,42 +38,26 @@ namespace minifi {
  */
 class ThreadedSchedulingAgent : public SchedulingAgent {
  public:
-  // Constructor
-  /*!
-   * Create a new threaded scheduling agent.
-   */
   ThreadedSchedulingAgent(const gsl::not_null<core::controller::ControllerServiceProvider*> controller_service_provider, std::shared_ptr<core::Repository> repo,
         std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo,
         std::shared_ptr<Configure> configuration,  utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
       : SchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration, thread_pool) {
   }
-  // Destructor
   ~ThreadedSchedulingAgent() override = default;
 
-  // Run function for the thread
   virtual utils::TaskRescheduleInfo run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext,
                        const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) = 0;
 
  public:
-  // schedule, overwritten by different DrivenTimerDrivenSchedulingAgent
   void schedule(core::Processor* processor) override;
-  // unschedule, overwritten by different DrivenTimerDrivenSchedulingAgent
   void unschedule(core::Processor* processor) override;
 
   void stop() override;
 
  private:
-  // Prevent default copy constructor and assignment operation
-  // Only support pass by reference or pointer
-  ThreadedSchedulingAgent(const ThreadedSchedulingAgent &parent);
-  ThreadedSchedulingAgent &operator=(const ThreadedSchedulingAgent &parent);
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ThreadedSchedulingAgent>::getLogger();
 
   std::set<utils::Identifier> processors_running_;  // Set just for easy usage
 };
 
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
-#endif  // LIBMINIFI_INCLUDE_THREADEDSCHEDULINGAGENT_H_
+}  // namespace org::apache::nifi::minifi
diff --git a/libminifi/include/TimerDrivenSchedulingAgent.h b/libminifi/include/TimerDrivenSchedulingAgent.h
index b4322a1f3..9e74fec2d 100644
--- a/libminifi/include/TimerDrivenSchedulingAgent.h
+++ b/libminifi/include/TimerDrivenSchedulingAgent.h
@@ -17,8 +17,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_TIMERDRIVENSCHEDULINGAGENT_H_
-#define LIBMINIFI_INCLUDE_TIMERDRIVENSCHEDULINGAGENT_H_
+#pragma once
 
 #include <memory>
 
@@ -28,40 +27,20 @@
 #include "core/Repository.h"
 #include "ThreadedSchedulingAgent.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-//  TimerDrivenSchedulingAgent Class
+namespace org::apache::nifi::minifi {
 class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent {
  public:
-  //  Constructor
-  /*!
-   * Create a new processor
-   */
   TimerDrivenSchedulingAgent(const gsl::not_null<core::controller::ControllerServiceProvider*> controller_service_provider, std::shared_ptr<core::Repository> repo,
                              std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configure,
                              utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
       : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configure, thread_pool) {
   }
 
-  /**
-   * Run function that accepts the processor, context and session factory.
-   */
   utils::TaskRescheduleInfo run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext,
       const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
 
  private:
-  // Prevent default copy constructor and assignment operation
-  // Only support pass by reference or pointer
-  TimerDrivenSchedulingAgent(const TimerDrivenSchedulingAgent &parent);
-  TimerDrivenSchedulingAgent &operator=(const TimerDrivenSchedulingAgent &parent);
-
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<TimerDrivenSchedulingAgent>::getLogger();
 };
 
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
-#endif  // LIBMINIFI_INCLUDE_TIMERDRIVENSCHEDULINGAGENT_H_
+}  // namespace org::apache::nifi::minifi
diff --git a/libminifi/src/CronDrivenSchedulingAgent.cpp b/libminifi/src/CronDrivenSchedulingAgent.cpp
index 3ad94f1ea..0e12bd3ae 100644
--- a/libminifi/src/CronDrivenSchedulingAgent.cpp
+++ b/libminifi/src/CronDrivenSchedulingAgent.cpp
@@ -52,19 +52,16 @@ utils::TaskRescheduleInfo CronDrivenSchedulingAgent::run(core::Processor* proces
     if (*next_to_last_trigger > current_time.get_local_time())
       return utils::TaskRescheduleInfo::RetryIn(ceil<milliseconds>(*next_to_last_trigger-current_time.get_local_time()));
 
-    last_exec_[uuid] = current_time.get_local_time();
-    bool shouldYield = this->onTrigger(processor, processContext, sessionFactory);
+    auto on_trigger_result = this->onTrigger(processor, processContext, sessionFactory);
 
-    if (processor->isYield()) {
+    if (on_trigger_result)
+      last_exec_[uuid] = current_time.get_local_time();
+
+    if (processor->isYield())
       return utils::TaskRescheduleInfo::RetryIn(processor->getYieldTime());
-    } else if (shouldYield && this->bored_yield_duration_ > 0ms) {
-      return utils::TaskRescheduleInfo::RetryIn(this->bored_yield_duration_);
-    }
 
-    auto next_trigger = schedules_.at(uuid).calculateNextTrigger(current_time.get_local_time());
-    if (!next_trigger)
-      return utils::TaskRescheduleInfo::Done();
-    return utils::TaskRescheduleInfo::RetryIn(ceil<milliseconds>(*next_trigger-current_time.get_local_time()));
+    if (auto next_trigger = schedules_.at(uuid).calculateNextTrigger(current_time.get_local_time()))
+      return utils::TaskRescheduleInfo::RetryIn(ceil<milliseconds>(*next_trigger-current_time.get_local_time()));
   }
   return utils::TaskRescheduleInfo::Done();
 }
diff --git a/libminifi/src/EventDrivenSchedulingAgent.cpp b/libminifi/src/EventDrivenSchedulingAgent.cpp
index 10edd6405..3a23ba839 100644
--- a/libminifi/src/EventDrivenSchedulingAgent.cpp
+++ b/libminifi/src/EventDrivenSchedulingAgent.cpp
@@ -26,10 +26,7 @@
 
 using namespace std::literals::chrono_literals;
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
 
 void EventDrivenSchedulingAgent::schedule(core::Processor* processor) {
   if (!processor->hasIncomingConnections()) {
@@ -44,13 +41,9 @@ utils::TaskRescheduleInfo EventDrivenSchedulingAgent::run(core::Processor* proce
     auto start_time = std::chrono::steady_clock::now();
     // trigger processor until it has work to do, but no more than half a sec
     while (processor->isRunning() && (std::chrono::steady_clock::now() - start_time < time_slice_)) {
-      bool shouldYield = this->onTrigger(processor, processContext, sessionFactory);
+      this->onTrigger(processor, processContext, sessionFactory);
       if (processor->isYield()) {
-        // Honor the yield
         return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(processor->getYieldTime()));
-      } else if (shouldYield) {
-        // No work to do or need to apply back pressure
-        return utils::TaskRescheduleInfo::RetryIn(this->bored_yield_duration_ > 0ms ? this->bored_yield_duration_ : 10ms);  // No work left to do, stand by
       }
     }
     return utils::TaskRescheduleInfo::RetryImmediately();  // Let's continue work as soon as a thread is available
@@ -58,7 +51,4 @@ utils::TaskRescheduleInfo EventDrivenSchedulingAgent::run(core::Processor* proce
   return utils::TaskRescheduleInfo::Done();
 }
 
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi
diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp
index 9dfca5c58..2f8ccdb6b 100644
--- a/libminifi/src/SchedulingAgent.cpp
+++ b/libminifi/src/SchedulingAgent.cpp
@@ -25,6 +25,8 @@
 #include "core/Processor.h"
 #include "utils/gsl.h"
 
+using namespace std::literals::chrono_literals;
+
 namespace {
 bool hasWorkToDo(org::apache::nifi::minifi::core::Processor* processor) {
   // Whether it has work to do
@@ -34,25 +36,28 @@ bool hasWorkToDo(org::apache::nifi::minifi::core::Processor* processor) {
 
 namespace org::apache::nifi::minifi {
 
-bool SchedulingAgent::onTrigger(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext,
-                                const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+nonstd::expected<void, std::exception_ptr> SchedulingAgent::onTrigger(core::Processor* processor,
+    const std::shared_ptr<core::ProcessContext> &process_context,
+    const std::shared_ptr<core::ProcessSessionFactory> &session_factory) {
   gsl_Expects(processor);
   if (processor->isYield()) {
     logger_->log_debug("Not running %s since it must yield", processor->getName());
-    return false;
+    return {};
   }
 
   // No need to yield, reset yield expiration to 0
   processor->clearYield();
 
+  auto bored_yield_duration = bored_yield_duration_ > 0ms ? bored_yield_duration_ : 10ms;
+
   if (!hasWorkToDo(processor)) {
-    // No work to do, yield
-    return true;
+    processor->yield(bored_yield_duration);
+    return {};
   }
   if (processor->isThrottledByBackpressure()) {
     logger_->log_debug("backpressure applied because too much outgoing for %s %s", processor->getUUIDStr(), processor->getName());
-    // need to apply backpressure
-    return true;
+    processor->yield(bored_yield_duration);
+    return {};
   }
 
   auto schedule_it = scheduled_processors_.end();
@@ -68,20 +73,21 @@ bool SchedulingAgent::onTrigger(core::Processor* processor, const std::shared_pt
   });
 
   processor->incrementActiveTasks();
+  auto decrement_task = gsl::finally([processor]() { processor->decrementActiveTask(); });
   try {
-    processor->onTrigger(processContext, sessionFactory);
+    processor->onTrigger(process_context, session_factory);
   } catch (const std::exception& exception) {
     logger_->log_warn("Caught Exception during SchedulingAgent::onTrigger of processor %s (uuid: %s), type: %s, what: %s",
         processor->getName(), processor->getUUIDStr(), typeid(exception).name(), exception.what());
     processor->yield(admin_yield_duration_);
+    return nonstd::make_unexpected(std::current_exception());
   } catch (...) {
     logger_->log_warn("Caught Exception during SchedulingAgent::onTrigger of processor %s (uuid: %s), type: %s",
         processor->getName(), processor->getUUIDStr(), getCurrentExceptionTypeName());
     processor->yield(admin_yield_duration_);
+    return nonstd::make_unexpected(std::current_exception());
   }
-  processor->decrementActiveTask();
-
-  return false;
+  return {};
 }
 
 void SchedulingAgent::watchDogFunc() {
diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp b/libminifi/src/TimerDrivenSchedulingAgent.cpp
index 7864a7945..2b4072170 100644
--- a/libminifi/src/TimerDrivenSchedulingAgent.cpp
+++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp
@@ -19,35 +19,22 @@
  */
 #include "TimerDrivenSchedulingAgent.h"
 #include <chrono>
-#include <thread>
 #include <memory>
-#include <iostream>
-#include "core/Property.h"
 
 using namespace std::literals::chrono_literals;
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
 
 utils::TaskRescheduleInfo TimerDrivenSchedulingAgent::run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext,
                                          const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
   if (this->running_ && processor->isRunning()) {
-    bool shouldYield = this->onTrigger(processor, processContext, sessionFactory);
-    if (processor->isYield()) {
-      // Honor the yield
+    this->onTrigger(processor, processContext, sessionFactory);
+    if (processor->isYield())
       return utils::TaskRescheduleInfo::RetryIn(processor->getYieldTime());
-    } else if (shouldYield && this->bored_yield_duration_ > 0ms) {
-      // No work to do or need to apply back pressure
-      return utils::TaskRescheduleInfo::RetryIn(this->bored_yield_duration_);
-    }
+
     return utils::TaskRescheduleInfo::RetryIn(std::chrono::duration_cast<std::chrono::milliseconds>(processor->getSchedulingPeriodNano()));
   }
   return utils::TaskRescheduleInfo::Done();
 }
 
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi


[nifi-minifi-cpp] 03/03: MINIFICPP-2036 Upgrade gcc to version 11

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adebreceni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 169cd68226a69c9815abedc174cc6713ba9bc957
Author: Ferenc Gerlits <fg...@gmail.com>
AuthorDate: Mon Jan 30 11:15:38 2023 +0100

    MINIFICPP-2036 Upgrade gcc to version 11
    
      - on centos: use devtoolset-11 instead of devtoolset-10
      - on alpine: use version 3.16 instead of 3.13
      - on both: do not install the boost package, as it is no longer required
    
    Signed-off-by: Adam Debreceni <ad...@apache.org>
    
    This closes #1495
---
 README.md                | 2 +-
 centos.sh                | 4 ++--
 docker/Dockerfile        | 3 ++-
 docker/centos/Dockerfile | 2 +-
 4 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/README.md b/README.md
index 096cea87d..9a19458ea 100644
--- a/README.md
+++ b/README.md
@@ -117,7 +117,7 @@ Through JNI extensions you can run NiFi processors using NARs. The JNI extension
 
 #### Utilities
 * CMake 3.17 or greater
-* gcc 10 or greater
+* gcc 11 or greater
 * bison 3.0.x+ (3.2 has been shown to fail builds)
 * flex 2.6 or greater
 
diff --git a/centos.sh b/centos.sh
index a01482273..279275604 100644
--- a/centos.sh
+++ b/centos.sh
@@ -18,8 +18,8 @@
 
 get_toolset_name() {
     case "$OS_MAJOR" in
-        7) TOOLSET_NAME=devtoolset-10 ;;
-        8) TOOLSET_NAME=gcc-toolset-10 ;;
+        7) TOOLSET_NAME=devtoolset-11 ;;
+        8) TOOLSET_NAME=gcc-toolset-12 ;;
         9) TOOLSET_NAME=gcc-toolset-12 ;;
     esac
 }
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 58bf2a3a6..d5bdb0117 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -16,7 +16,7 @@
 # under the License.
 #
 
-ARG BASE_ALPINE_IMAGE="alpine:3.13"
+ARG BASE_ALPINE_IMAGE="alpine:3.16"
 
 # Build image
 FROM ${BASE_ALPINE_IMAGE} AS build
@@ -36,6 +36,7 @@ ARG DOCKER_SKIP_TESTS=ON
 RUN apk --no-cache add gcc \
   g++ \
   make \
+  bash \
   bison \
   flex \
   flex-dev \
diff --git a/docker/centos/Dockerfile b/docker/centos/Dockerfile
index e426acfda..7b8307bf4 100644
--- a/docker/centos/Dockerfile
+++ b/docker/centos/Dockerfile
@@ -57,7 +57,7 @@ USER ${USER}
 # Perform the build
 RUN cd $MINIFI_BASE_DIR && \
     cd build && \
-    source /opt/rh/devtoolset-10/enable && \
+    source /opt/rh/devtoolset-11/enable && \
     export PATH=/usr/lib64/ccache${PATH:+:${PATH}} && \
     cmake3 -DSTATIC_BUILD= -DSKIP_TESTS=${DOCKER_SKIP_TESTS} ${MINIFI_OPTIONS} -DAWS_ENABLE_UNITY_BUILD=OFF -DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" .. && \
     make -j "$(nproc)" package