You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ad...@apache.org on 2023/01/30 10:17:16 UTC
[nifi-minifi-cpp] branch main updated (2dc589158 -> 169cd6822)
This is an automated email from the ASF dual-hosted git repository.
adebreceni pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
from 2dc589158 MINIFICPP-1983 Mount files in minifi test containers
new 417756064 MINIFICPP-1983 Modify correct minifi.properties file for test containers
new 9f2040abc MINIFICPP-2008 Differentiate successful onTriggers from throwing onTriggers in ScheduleAgents
new 169cd6822 MINIFICPP-2036 Upgrade gcc to version 11
The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
README.md | 2 +-
centos.sh | 4 ++--
docker/Dockerfile | 3 ++-
docker/centos/Dockerfile | 2 +-
.../cluster/containers/MinifiContainer.py | 7 +++---
.../resources/minifi-c2-server-ssl/config.yml | 8 +++----
libminifi/include/CronDrivenSchedulingAgent.h | 2 --
libminifi/include/EventDrivenSchedulingAgent.h | 25 +++----------------
libminifi/include/SchedulingAgent.h | 18 +++++---------
libminifi/include/ThreadedSchedulingAgent.h | 26 +++-----------------
libminifi/include/TimerDrivenSchedulingAgent.h | 27 +++------------------
libminifi/src/CronDrivenSchedulingAgent.cpp | 17 ++++++-------
libminifi/src/EventDrivenSchedulingAgent.cpp | 16 +++----------
libminifi/src/SchedulingAgent.cpp | 28 +++++++++++++---------
libminifi/src/TimerDrivenSchedulingAgent.cpp | 23 ++++--------------
15 files changed, 61 insertions(+), 147 deletions(-)
[nifi-minifi-cpp] 01/03: MINIFICPP-1983 Modify correct minifi.properties file for test containers
Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
adebreceni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 41775606425a9382c7733db066f2c5d732b93344
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Mon Jan 30 11:11:17 2023 +0100
MINIFICPP-1983 Modify correct minifi.properties file for test containers
Signed-off-by: Adam Debreceni <ad...@apache.org>
This closes #1496
---
docker/test/integration/cluster/containers/MinifiContainer.py | 7 ++++---
docker/test/integration/resources/minifi-c2-server-ssl/config.yml | 8 ++++----
2 files changed, 8 insertions(+), 7 deletions(-)
diff --git a/docker/test/integration/cluster/containers/MinifiContainer.py b/docker/test/integration/cluster/containers/MinifiContainer.py
index f5dde6a62..93d65ddda 100644
--- a/docker/test/integration/cluster/containers/MinifiContainer.py
+++ b/docker/test/integration/cluster/containers/MinifiContainer.py
@@ -65,7 +65,7 @@ class MinifiContainer(FlowContainer):
config_file.write(test_flow_yaml.encode('utf-8'))
def _create_properties(self):
- properties_file_path = os.path.join(self.config_dir, 'minifi.properties')
+ properties_file_path = os.path.join(self.container_specific_config_dir, 'minifi.properties')
with open(properties_file_path, 'a') as f:
if self.options.enable_c2:
f.write("nifi.c2.enable=true\n")
@@ -82,6 +82,7 @@ class MinifiContainer(FlowContainer):
f.write("nifi.c2.rest.url.ack=https://minifi-c2-server:10090/c2/config/acknowledge\n")
f.write("nifi.c2.rest.ssl.context.service=SSLContextService\n")
f.write("nifi.c2.flow.base.url=https://minifi-c2-server:10090/c2/config/\n")
+ f.write("nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation\n")
f.write("nifi.c2.full.heartbeat=false\n")
f.write("nifi.c2.agent.class=minifi-test-class\n")
f.write("nifi.c2.agent.identifier=minifi-test-id\n")
@@ -99,9 +100,9 @@ class MinifiContainer(FlowContainer):
self._create_config()
self._create_properties()
- self.vols[os.path.join(self.config_dir, 'minifi.properties')] = {"bind": os.path.join(MinifiContainer.MINIFI_ROOT, 'conf', 'minifi.properties'), "mode": "rw"}
+ self.vols[os.path.join(self.container_specific_config_dir, 'minifi.properties')] = {"bind": os.path.join(MinifiContainer.MINIFI_ROOT, 'conf', 'minifi.properties'), "mode": "rw"}
self.vols[os.path.join(self.container_specific_config_dir, 'config.yml')] = {"bind": os.path.join(MinifiContainer.MINIFI_ROOT, 'conf', 'config.yml'), "mode": "rw"}
- self.vols[os.path.join(self.config_dir, 'minifi-log.properties')] = {"bind": os.path.join(MinifiContainer.MINIFI_ROOT, 'conf', 'minifi-log.properties'), "mode": "rw"}
+ self.vols[os.path.join(self.container_specific_config_dir, 'minifi-log.properties')] = {"bind": os.path.join(MinifiContainer.MINIFI_ROOT, 'conf', 'minifi-log.properties'), "mode": "rw"}
def deploy(self):
if not self.set_deployed():
diff --git a/docker/test/integration/resources/minifi-c2-server-ssl/config.yml b/docker/test/integration/resources/minifi-c2-server-ssl/config.yml
index d4c69534c..239541288 100644
--- a/docker/test/integration/resources/minifi-c2-server-ssl/config.yml
+++ b/docker/test/integration/resources/minifi-c2-server-ssl/config.yml
@@ -33,11 +33,11 @@ Controller Services:
class: SSLContextService
Properties:
Client Certificate:
- - value: /tmp/minifi-c2-server-ssl/minifi-c2-client.crt
+ - value: /tmp/resources/minifi-c2-server-ssl/minifi-cpp-flow.crt
Private Key:
- - value: /tmp/minifi-c2-server-ssl/minifi-c2-client.key
+ - value: /tmp/resources/minifi-c2-server-ssl/minifi-cpp-flow.key
Passphrase:
- - value: abcdefgh
+ - value: abcdefgh
CA Certificate:
- - value: /tmp/minifi-c2-server-ssl/minifi-c2-server.crt
+ - value: /tmp/resources/minifi-c2-server-ssl/root-ca.pem
Remote Process Groups: []
[nifi-minifi-cpp] 02/03: MINIFICPP-2008 Differentiate successful onTriggers from throwing onTriggers in ScheduleAgents
Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
adebreceni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 9f2040abc6d66602e82a2c1c0ac708ddfe9f6570
Author: Martin Zink <ma...@apache.org>
AuthorDate: Mon Jan 30 11:13:51 2023 +0100
MINIFICPP-2008 Differentiate successful onTriggers from throwing onTriggers in ScheduleAgents
Signed-off-by: Adam Debreceni <ad...@apache.org>
This closes #1474
---
libminifi/include/CronDrivenSchedulingAgent.h | 2 --
libminifi/include/EventDrivenSchedulingAgent.h | 25 +++--------------------
libminifi/include/SchedulingAgent.h | 18 ++++++-----------
libminifi/include/ThreadedSchedulingAgent.h | 26 +++---------------------
libminifi/include/TimerDrivenSchedulingAgent.h | 27 +++----------------------
libminifi/src/CronDrivenSchedulingAgent.cpp | 17 +++++++---------
libminifi/src/EventDrivenSchedulingAgent.cpp | 16 +++------------
libminifi/src/SchedulingAgent.cpp | 28 ++++++++++++++++----------
libminifi/src/TimerDrivenSchedulingAgent.cpp | 23 +++++----------------
9 files changed, 47 insertions(+), 135 deletions(-)
diff --git a/libminifi/include/CronDrivenSchedulingAgent.h b/libminifi/include/CronDrivenSchedulingAgent.h
index 3595f302f..88ee42e6b 100644
--- a/libminifi/include/CronDrivenSchedulingAgent.h
+++ b/libminifi/include/CronDrivenSchedulingAgent.h
@@ -45,8 +45,6 @@ class CronDrivenSchedulingAgent : public ThreadedSchedulingAgent {
: ThreadedSchedulingAgent(controller_service_provider, std::move(repo), std::move(flow_repo), std::move(content_repo), std::move(configuration), thread_pool) {
}
- CronDrivenSchedulingAgent(const CronDrivenSchedulingAgent& parent) = delete;
- CronDrivenSchedulingAgent& operator=(const CronDrivenSchedulingAgent& parent) = delete;
~CronDrivenSchedulingAgent() override = default;
utils::TaskRescheduleInfo run(core::Processor *processor,
diff --git a/libminifi/include/EventDrivenSchedulingAgent.h b/libminifi/include/EventDrivenSchedulingAgent.h
index e9439a925..d017da38c 100644
--- a/libminifi/include/EventDrivenSchedulingAgent.h
+++ b/libminifi/include/EventDrivenSchedulingAgent.h
@@ -17,8 +17,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#ifndef LIBMINIFI_INCLUDE_EVENTDRIVENSCHEDULINGAGENT_H_
-#define LIBMINIFI_INCLUDE_EVENTDRIVENSCHEDULINGAGENT_H_
+#pragma once
#include <memory>
#include <string>
@@ -31,18 +30,10 @@
#include "core/ProcessSessionFactory.h"
#include "ThreadedSchedulingAgent.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
-// EventDrivenSchedulingAgent Class
class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent {
public:
- // Constructor
- /*!
- * Create a new event driven scheduling agent.
- */
EventDrivenSchedulingAgent(const gsl::not_null<core::controller::ControllerServiceProvider*> controller_service_provider, std::shared_ptr<core::Repository> repo,
std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration,
utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
@@ -56,21 +47,11 @@ class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent {
void schedule(core::Processor* processor) override;
- // Run function for the thread
utils::TaskRescheduleInfo run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext,
const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
private:
- // Prevent default copy constructor and assignment operation
- // Only support pass by reference or pointer
- EventDrivenSchedulingAgent(const EventDrivenSchedulingAgent &parent);
- EventDrivenSchedulingAgent &operator=(const EventDrivenSchedulingAgent &parent);
-
std::chrono::milliseconds time_slice_;
};
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
-#endif // LIBMINIFI_INCLUDE_EVENTDRIVENSCHEDULINGAGENT_H_
+} // namespace org::apache::nifi::minifi
diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h
index 24992c928..968d03860 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -30,6 +30,7 @@
#include <algorithm>
#include <thread>
#include "utils/CallBackTimer.h"
+#include "utils/expected.h"
#include "utils/Monitors.h"
#include "utils/TimeUtil.h"
#include "utils/ThreadPool.h"
@@ -49,7 +50,6 @@
namespace org::apache::nifi::minifi {
-// SchedulingAgent Class
class SchedulingAgent {
public:
// Constructor
@@ -87,14 +87,15 @@ class SchedulingAgent {
logger_->log_trace("Destroying scheduling agent");
}
- // onTrigger, return whether the yield is need
- bool onTrigger(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory);
- // start
+ nonstd::expected<void, std::exception_ptr> onTrigger(core::Processor* processor,
+ const std::shared_ptr<core::ProcessContext>& process_context,
+ const std::shared_ptr<core::ProcessSessionFactory>& session_factory);
+
void start() {
running_ = true;
thread_pool_.start();
}
- // stop
+
virtual void stop() {
running_ = false;
}
@@ -110,13 +111,9 @@ class SchedulingAgent {
SchedulingAgent &operator=(const SchedulingAgent &parent) = delete;
protected:
- // Mutex for protection
std::mutex mutex_;
- // Whether it is running
std::atomic<bool> running_;
- // AdministrativeYieldDuration
std::chrono::milliseconds admin_yield_duration_;
- // BoredYieldDuration
std::chrono::milliseconds bored_yield_duration_;
std::shared_ptr<Configure> configure_;
@@ -126,9 +123,7 @@ class SchedulingAgent {
std::shared_ptr<core::Repository> flow_repo_;
std::shared_ptr<core::ContentRepository> content_repo_;
- // thread pool for components.
utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool_;
- // controller service provider reference
gsl::not_null<core::controller::ControllerServiceProvider*> controller_service_provider_;
private:
@@ -148,7 +143,6 @@ class SchedulingAgent {
}
};
- // Logger
std::shared_ptr<core::logging::Logger> logger_;
mutable std::mutex watchdog_mtx_; // used to protect the set below
std::set<SchedulingInfo> scheduled_processors_; // set was chosen to avoid iterator invalidation
diff --git a/libminifi/include/ThreadedSchedulingAgent.h b/libminifi/include/ThreadedSchedulingAgent.h
index c53249096..e51279154 100644
--- a/libminifi/include/ThreadedSchedulingAgent.h
+++ b/libminifi/include/ThreadedSchedulingAgent.h
@@ -17,8 +17,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#ifndef LIBMINIFI_INCLUDE_THREADEDSCHEDULINGAGENT_H_
-#define LIBMINIFI_INCLUDE_THREADEDSCHEDULINGAGENT_H_
+#pragma once
#include <memory>
#include <set>
@@ -31,10 +30,7 @@
#include "core/ProcessContext.h"
#include "SchedulingAgent.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
/**
* An abstract scheduling agent which creates and manages a pool of threads for
@@ -42,42 +38,26 @@ namespace minifi {
*/
class ThreadedSchedulingAgent : public SchedulingAgent {
public:
- // Constructor
- /*!
- * Create a new threaded scheduling agent.
- */
ThreadedSchedulingAgent(const gsl::not_null<core::controller::ControllerServiceProvider*> controller_service_provider, std::shared_ptr<core::Repository> repo,
std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo,
std::shared_ptr<Configure> configuration, utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
: SchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration, thread_pool) {
}
- // Destructor
~ThreadedSchedulingAgent() override = default;
- // Run function for the thread
virtual utils::TaskRescheduleInfo run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext,
const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) = 0;
public:
- // schedule, overwritten by different DrivenTimerDrivenSchedulingAgent
void schedule(core::Processor* processor) override;
- // unschedule, overwritten by different DrivenTimerDrivenSchedulingAgent
void unschedule(core::Processor* processor) override;
void stop() override;
private:
- // Prevent default copy constructor and assignment operation
- // Only support pass by reference or pointer
- ThreadedSchedulingAgent(const ThreadedSchedulingAgent &parent);
- ThreadedSchedulingAgent &operator=(const ThreadedSchedulingAgent &parent);
std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ThreadedSchedulingAgent>::getLogger();
std::set<utils::Identifier> processors_running_; // Set just for easy usage
};
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
-#endif // LIBMINIFI_INCLUDE_THREADEDSCHEDULINGAGENT_H_
+} // namespace org::apache::nifi::minifi
diff --git a/libminifi/include/TimerDrivenSchedulingAgent.h b/libminifi/include/TimerDrivenSchedulingAgent.h
index b4322a1f3..9e74fec2d 100644
--- a/libminifi/include/TimerDrivenSchedulingAgent.h
+++ b/libminifi/include/TimerDrivenSchedulingAgent.h
@@ -17,8 +17,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#ifndef LIBMINIFI_INCLUDE_TIMERDRIVENSCHEDULINGAGENT_H_
-#define LIBMINIFI_INCLUDE_TIMERDRIVENSCHEDULINGAGENT_H_
+#pragma once
#include <memory>
@@ -28,40 +27,20 @@
#include "core/Repository.h"
#include "ThreadedSchedulingAgent.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-// TimerDrivenSchedulingAgent Class
+namespace org::apache::nifi::minifi {
class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent {
public:
- // Constructor
- /*!
- * Create a new processor
- */
TimerDrivenSchedulingAgent(const gsl::not_null<core::controller::ControllerServiceProvider*> controller_service_provider, std::shared_ptr<core::Repository> repo,
std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configure,
utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
: ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configure, thread_pool) {
}
- /**
- * Run function that accepts the processor, context and session factory.
- */
utils::TaskRescheduleInfo run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext,
const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
private:
- // Prevent default copy constructor and assignment operation
- // Only support pass by reference or pointer
- TimerDrivenSchedulingAgent(const TimerDrivenSchedulingAgent &parent);
- TimerDrivenSchedulingAgent &operator=(const TimerDrivenSchedulingAgent &parent);
-
std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<TimerDrivenSchedulingAgent>::getLogger();
};
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
-#endif // LIBMINIFI_INCLUDE_TIMERDRIVENSCHEDULINGAGENT_H_
+} // namespace org::apache::nifi::minifi
diff --git a/libminifi/src/CronDrivenSchedulingAgent.cpp b/libminifi/src/CronDrivenSchedulingAgent.cpp
index 3ad94f1ea..0e12bd3ae 100644
--- a/libminifi/src/CronDrivenSchedulingAgent.cpp
+++ b/libminifi/src/CronDrivenSchedulingAgent.cpp
@@ -52,19 +52,16 @@ utils::TaskRescheduleInfo CronDrivenSchedulingAgent::run(core::Processor* proces
if (*next_to_last_trigger > current_time.get_local_time())
return utils::TaskRescheduleInfo::RetryIn(ceil<milliseconds>(*next_to_last_trigger-current_time.get_local_time()));
- last_exec_[uuid] = current_time.get_local_time();
- bool shouldYield = this->onTrigger(processor, processContext, sessionFactory);
+ auto on_trigger_result = this->onTrigger(processor, processContext, sessionFactory);
- if (processor->isYield()) {
+ if (on_trigger_result)
+ last_exec_[uuid] = current_time.get_local_time();
+
+ if (processor->isYield())
return utils::TaskRescheduleInfo::RetryIn(processor->getYieldTime());
- } else if (shouldYield && this->bored_yield_duration_ > 0ms) {
- return utils::TaskRescheduleInfo::RetryIn(this->bored_yield_duration_);
- }
- auto next_trigger = schedules_.at(uuid).calculateNextTrigger(current_time.get_local_time());
- if (!next_trigger)
- return utils::TaskRescheduleInfo::Done();
- return utils::TaskRescheduleInfo::RetryIn(ceil<milliseconds>(*next_trigger-current_time.get_local_time()));
+ if (auto next_trigger = schedules_.at(uuid).calculateNextTrigger(current_time.get_local_time()))
+ return utils::TaskRescheduleInfo::RetryIn(ceil<milliseconds>(*next_trigger-current_time.get_local_time()));
}
return utils::TaskRescheduleInfo::Done();
}
diff --git a/libminifi/src/EventDrivenSchedulingAgent.cpp b/libminifi/src/EventDrivenSchedulingAgent.cpp
index 10edd6405..3a23ba839 100644
--- a/libminifi/src/EventDrivenSchedulingAgent.cpp
+++ b/libminifi/src/EventDrivenSchedulingAgent.cpp
@@ -26,10 +26,7 @@
using namespace std::literals::chrono_literals;
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
void EventDrivenSchedulingAgent::schedule(core::Processor* processor) {
if (!processor->hasIncomingConnections()) {
@@ -44,13 +41,9 @@ utils::TaskRescheduleInfo EventDrivenSchedulingAgent::run(core::Processor* proce
auto start_time = std::chrono::steady_clock::now();
// trigger processor until it has work to do, but no more than half a sec
while (processor->isRunning() && (std::chrono::steady_clock::now() - start_time < time_slice_)) {
- bool shouldYield = this->onTrigger(processor, processContext, sessionFactory);
+ this->onTrigger(processor, processContext, sessionFactory);
if (processor->isYield()) {
- // Honor the yield
return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(processor->getYieldTime()));
- } else if (shouldYield) {
- // No work to do or need to apply back pressure
- return utils::TaskRescheduleInfo::RetryIn(this->bored_yield_duration_ > 0ms ? this->bored_yield_duration_ : 10ms); // No work left to do, stand by
}
}
return utils::TaskRescheduleInfo::RetryImmediately(); // Let's continue work as soon as a thread is available
@@ -58,7 +51,4 @@ utils::TaskRescheduleInfo EventDrivenSchedulingAgent::run(core::Processor* proce
return utils::TaskRescheduleInfo::Done();
}
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
+} // namespace org::apache::nifi::minifi
diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp
index 9dfca5c58..2f8ccdb6b 100644
--- a/libminifi/src/SchedulingAgent.cpp
+++ b/libminifi/src/SchedulingAgent.cpp
@@ -25,6 +25,8 @@
#include "core/Processor.h"
#include "utils/gsl.h"
+using namespace std::literals::chrono_literals;
+
namespace {
bool hasWorkToDo(org::apache::nifi::minifi::core::Processor* processor) {
// Whether it has work to do
@@ -34,25 +36,28 @@ bool hasWorkToDo(org::apache::nifi::minifi::core::Processor* processor) {
namespace org::apache::nifi::minifi {
-bool SchedulingAgent::onTrigger(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext,
- const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+nonstd::expected<void, std::exception_ptr> SchedulingAgent::onTrigger(core::Processor* processor,
+ const std::shared_ptr<core::ProcessContext> &process_context,
+ const std::shared_ptr<core::ProcessSessionFactory> &session_factory) {
gsl_Expects(processor);
if (processor->isYield()) {
logger_->log_debug("Not running %s since it must yield", processor->getName());
- return false;
+ return {};
}
// No need to yield, reset yield expiration to 0
processor->clearYield();
+ auto bored_yield_duration = bored_yield_duration_ > 0ms ? bored_yield_duration_ : 10ms;
+
if (!hasWorkToDo(processor)) {
- // No work to do, yield
- return true;
+ processor->yield(bored_yield_duration);
+ return {};
}
if (processor->isThrottledByBackpressure()) {
logger_->log_debug("backpressure applied because too much outgoing for %s %s", processor->getUUIDStr(), processor->getName());
- // need to apply backpressure
- return true;
+ processor->yield(bored_yield_duration);
+ return {};
}
auto schedule_it = scheduled_processors_.end();
@@ -68,20 +73,21 @@ bool SchedulingAgent::onTrigger(core::Processor* processor, const std::shared_pt
});
processor->incrementActiveTasks();
+ auto decrement_task = gsl::finally([processor]() { processor->decrementActiveTask(); });
try {
- processor->onTrigger(processContext, sessionFactory);
+ processor->onTrigger(process_context, session_factory);
} catch (const std::exception& exception) {
logger_->log_warn("Caught Exception during SchedulingAgent::onTrigger of processor %s (uuid: %s), type: %s, what: %s",
processor->getName(), processor->getUUIDStr(), typeid(exception).name(), exception.what());
processor->yield(admin_yield_duration_);
+ return nonstd::make_unexpected(std::current_exception());
} catch (...) {
logger_->log_warn("Caught Exception during SchedulingAgent::onTrigger of processor %s (uuid: %s), type: %s",
processor->getName(), processor->getUUIDStr(), getCurrentExceptionTypeName());
processor->yield(admin_yield_duration_);
+ return nonstd::make_unexpected(std::current_exception());
}
- processor->decrementActiveTask();
-
- return false;
+ return {};
}
void SchedulingAgent::watchDogFunc() {
diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp b/libminifi/src/TimerDrivenSchedulingAgent.cpp
index 7864a7945..2b4072170 100644
--- a/libminifi/src/TimerDrivenSchedulingAgent.cpp
+++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp
@@ -19,35 +19,22 @@
*/
#include "TimerDrivenSchedulingAgent.h"
#include <chrono>
-#include <thread>
#include <memory>
-#include <iostream>
-#include "core/Property.h"
using namespace std::literals::chrono_literals;
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
utils::TaskRescheduleInfo TimerDrivenSchedulingAgent::run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext,
const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
if (this->running_ && processor->isRunning()) {
- bool shouldYield = this->onTrigger(processor, processContext, sessionFactory);
- if (processor->isYield()) {
- // Honor the yield
+ this->onTrigger(processor, processContext, sessionFactory);
+ if (processor->isYield())
return utils::TaskRescheduleInfo::RetryIn(processor->getYieldTime());
- } else if (shouldYield && this->bored_yield_duration_ > 0ms) {
- // No work to do or need to apply back pressure
- return utils::TaskRescheduleInfo::RetryIn(this->bored_yield_duration_);
- }
+
return utils::TaskRescheduleInfo::RetryIn(std::chrono::duration_cast<std::chrono::milliseconds>(processor->getSchedulingPeriodNano()));
}
return utils::TaskRescheduleInfo::Done();
}
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+} // namespace org::apache::nifi::minifi
[nifi-minifi-cpp] 03/03: MINIFICPP-2036 Upgrade gcc to version 11
Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
adebreceni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 169cd68226a69c9815abedc174cc6713ba9bc957
Author: Ferenc Gerlits <fg...@gmail.com>
AuthorDate: Mon Jan 30 11:15:38 2023 +0100
MINIFICPP-2036 Upgrade gcc to version 11
- on centos: use devtoolset-11 instead of devtoolset-10
- on alpine: use version 3.16 instead of 3.13
- on both: do not install the boost package, as it is no longer required
Signed-off-by: Adam Debreceni <ad...@apache.org>
This closes #1495
---
README.md | 2 +-
centos.sh | 4 ++--
docker/Dockerfile | 3 ++-
docker/centos/Dockerfile | 2 +-
4 files changed, 6 insertions(+), 5 deletions(-)
diff --git a/README.md b/README.md
index 096cea87d..9a19458ea 100644
--- a/README.md
+++ b/README.md
@@ -117,7 +117,7 @@ Through JNI extensions you can run NiFi processors using NARs. The JNI extension
#### Utilities
* CMake 3.17 or greater
-* gcc 10 or greater
+* gcc 11 or greater
* bison 3.0.x+ (3.2 has been shown to fail builds)
* flex 2.6 or greater
diff --git a/centos.sh b/centos.sh
index a01482273..279275604 100644
--- a/centos.sh
+++ b/centos.sh
@@ -18,8 +18,8 @@
get_toolset_name() {
case "$OS_MAJOR" in
- 7) TOOLSET_NAME=devtoolset-10 ;;
- 8) TOOLSET_NAME=gcc-toolset-10 ;;
+ 7) TOOLSET_NAME=devtoolset-11 ;;
+ 8) TOOLSET_NAME=gcc-toolset-12 ;;
9) TOOLSET_NAME=gcc-toolset-12 ;;
esac
}
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 58bf2a3a6..d5bdb0117 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -16,7 +16,7 @@
# under the License.
#
-ARG BASE_ALPINE_IMAGE="alpine:3.13"
+ARG BASE_ALPINE_IMAGE="alpine:3.16"
# Build image
FROM ${BASE_ALPINE_IMAGE} AS build
@@ -36,6 +36,7 @@ ARG DOCKER_SKIP_TESTS=ON
RUN apk --no-cache add gcc \
g++ \
make \
+ bash \
bison \
flex \
flex-dev \
diff --git a/docker/centos/Dockerfile b/docker/centos/Dockerfile
index e426acfda..7b8307bf4 100644
--- a/docker/centos/Dockerfile
+++ b/docker/centos/Dockerfile
@@ -57,7 +57,7 @@ USER ${USER}
# Perform the build
RUN cd $MINIFI_BASE_DIR && \
cd build && \
- source /opt/rh/devtoolset-10/enable && \
+ source /opt/rh/devtoolset-11/enable && \
export PATH=/usr/lib64/ccache${PATH:+:${PATH}} && \
cmake3 -DSTATIC_BUILD= -DSKIP_TESTS=${DOCKER_SKIP_TESTS} ${MINIFI_OPTIONS} -DAWS_ENABLE_UNITY_BUILD=OFF -DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" .. && \
make -j "$(nproc)" package