You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/09/12 17:51:27 UTC
[04/11] nifi-minifi-cpp git commit: MINIFI-338: Convert processor
threads to use thread pools
MINIFI-338: Convert processor threads to use thread pools
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/747691d7
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/747691d7
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/747691d7
Branch: refs/heads/master
Commit: 747691d7f3a787d7b38a4f67bac56d26001cf400
Parents: 5fca46f
Author: Marc Parisi <ph...@apache.org>
Authored: Fri Jun 30 10:05:15 2017 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Tue Sep 12 13:51:03 2017 -0400
----------------------------------------------------------------------
libminifi/include/EventDrivenSchedulingAgent.h | 2 +-
libminifi/include/SchedulingAgent.h | 2 +-
libminifi/include/ThreadedSchedulingAgent.h | 56 +++++-
libminifi/include/TimerDrivenSchedulingAgent.h | 2 +-
libminifi/include/core/Processor.h | 5 +-
libminifi/include/utils/ThreadPool.h | 180 +++++++++++++++++++-
libminifi/src/EventDrivenSchedulingAgent.cpp | 13 +-
libminifi/src/FlowController.cpp | 15 +-
libminifi/src/SchedulingAgent.cpp | 4 +-
libminifi/src/ThreadedSchedulingAgent.cpp | 44 ++---
libminifi/src/TimerDrivenSchedulingAgent.cpp | 13 +-
libminifi/test/unit/SocketTests.cpp | 2 +-
libminifi/test/unit/ThreadPoolTests.cpp | 2 +-
13 files changed, 281 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/include/EventDrivenSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/EventDrivenSchedulingAgent.h b/libminifi/include/EventDrivenSchedulingAgent.h
index c838b11..ca9f021 100644
--- a/libminifi/include/EventDrivenSchedulingAgent.h
+++ b/libminifi/include/EventDrivenSchedulingAgent.h
@@ -46,7 +46,7 @@ class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent {
virtual ~EventDrivenSchedulingAgent() {
}
// Run function for the thread
- void run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory);
+ uint64_t run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory);
private:
// Prevent default copy constructor and assignment operation
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/include/SchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h
index 1ff3fac..130c088 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -84,7 +84,7 @@ class SchedulingAgent {
running_ = true;
}
// stop
- void stop() {
+ virtual void stop() {
running_ = false;
component_lifecycle_thread_pool_.shutdown();
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/include/ThreadedSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ThreadedSchedulingAgent.h b/libminifi/include/ThreadedSchedulingAgent.h
index b4db4bf..27b8b3a 100644
--- a/libminifi/include/ThreadedSchedulingAgent.h
+++ b/libminifi/include/ThreadedSchedulingAgent.h
@@ -20,6 +20,7 @@
#ifndef __THREADED_SCHEDULING_AGENT_H__
#define __THREADED_SCHEDULING_AGENT_H__
+#include <chrono>
#include "properties/Configure.h"
#include "core/logging/LoggerConfiguration.h"
#include "core/Processor.h"
@@ -33,6 +34,47 @@ namespace nifi {
namespace minifi {
/**
+ * Uses the wait time for a given worker to determine if it is eligible to run
+ */
+class TimerAwareMonitor : public utils::AfterExecute<uint64_t> {
+ public:
+ TimerAwareMonitor(std::atomic<bool> *run_monitor)
+ : run_monitor_(run_monitor),
+ current_wait_(0) {
+
+ }
+ explicit TimerAwareMonitor(TimerAwareMonitor &&other)
+ : AfterExecute(std::move(other)),
+ run_monitor_(std::move(other.run_monitor_)) {
+ current_wait_.store(other.current_wait_.load());
+ }
+ virtual bool isFinished(const uint64_t &result) {
+ current_wait_.store(result);
+ if (*run_monitor_) {
+ return false;
+ }
+ return true;
+ }
+ virtual bool isCancelled(const uint64_t &result) {
+ if (*run_monitor_) {
+ return false;
+ }
+ return true;
+ }
+ /**
+ * Time to wait before re-running this task if necessary
+ * @return milliseconds to wait, measured from now, before this task is eligible to re-run.
+ */
+ virtual int64_t wait_time() {
+ return current_wait_.load();
+ }
+ private:
+
+ std::atomic<uint64_t> current_wait_;
+ std::atomic<bool> *run_monitor_;
+};
+
+/**
* An abstract scheduling agent which creates and manages a pool of threads for
* each processor scheduled.
*/
@@ -48,13 +90,18 @@ class ThreadedSchedulingAgent : public SchedulingAgent {
std::shared_ptr<Configure> configuration)
: SchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration),
logger_(logging::LoggerFactory<ThreadedSchedulingAgent>::getLogger()) {
+
+ utils::ThreadPool<uint64_t> pool = utils::ThreadPool<uint64_t>(configure_->getInt(Configure::nifi_flow_engine_threads, 8), true);
+ thread_pool_ = std::move(pool);
+ thread_pool_.start();
+
}
// Destructor
virtual ~ThreadedSchedulingAgent() {
}
// Run function for the thread
- virtual void run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) = 0;
+ virtual uint64_t run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) = 0;
public:
// schedule, overwritten by different DrivenTimerDrivenSchedulingAgent
@@ -62,9 +109,12 @@ class ThreadedSchedulingAgent : public SchedulingAgent {
// unschedule, overwritten by different DrivenTimerDrivenSchedulingAgent
virtual void unschedule(std::shared_ptr<core::Processor> processor);
+ virtual void stop();
+ protected:
+ utils::ThreadPool<uint64_t> thread_pool_;
+
protected:
- // Threads
- std::map<std::string, std::vector<std::thread *>> _threads;
+
private:
// Prevent default copy constructor and assignment operation
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/include/TimerDrivenSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/TimerDrivenSchedulingAgent.h b/libminifi/include/TimerDrivenSchedulingAgent.h
index 816bcec..1502c47 100644
--- a/libminifi/include/TimerDrivenSchedulingAgent.h
+++ b/libminifi/include/TimerDrivenSchedulingAgent.h
@@ -49,7 +49,7 @@ class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent {
/**
* Run function that accepts the processor, context and session factory.
*/
- void run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory);
+ uint64_t run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory);
private:
// Prevent default copy constructor and assignment operation
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/include/core/Processor.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h
index 251ec47..0853c11 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -220,6 +220,9 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
virtual void onSchedule(ProcessContext *context, ProcessSessionFactory *sessionFactory) {
}
+ // Check all incoming connections for work
+ bool isWorkAvailable();
+
protected:
// Processor state
@@ -246,8 +249,6 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
// Yield Expiration
std::atomic<uint64_t> yield_expiration_;
- // Check all incoming connections for work
- bool isWorkAvailable();
// Prevent default copy constructor and assignment operation
// Only support pass by reference or pointer
Processor(const Processor &parent);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/include/utils/ThreadPool.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h
index 77772cd..8ff3975 100644
--- a/libminifi/include/utils/ThreadPool.h
+++ b/libminifi/include/utils/ThreadPool.h
@@ -33,6 +33,35 @@ namespace minifi {
namespace utils {
/**
+ * Worker task helper that determines, after each execution,
+ * whether or not the task will be run again
+ */
+template<typename T>
+class AfterExecute {
+ public:
+ virtual ~AfterExecute() {
+
+ }
+
+ explicit AfterExecute() {
+
+ }
+
+ explicit AfterExecute(AfterExecute &&other) {
+
+ }
+ virtual bool isFinished(const T &result) = 0;
+ virtual bool isCancelled(const T &result) = 0;
+ /**
+ * Time to wait before re-running this task if necessary
+ * @return milliseconds to wait, measured from now, before this task is eligible to re-run.
+ */
+ virtual int64_t wait_time() {
+ return 0;
+ }
+};
+
+/**
* Worker task
* purpose: Provides a wrapper for the functor
* and returns a future based on the template argument.
@@ -40,12 +69,29 @@ namespace utils {
template<typename T>
class Worker {
public:
- explicit Worker(std::function<T()> &task)
- : task(task) {
+ explicit Worker(std::function<T()> &task, const std::string &identifier, std::unique_ptr<AfterExecute<T>> run_determinant)
+ : task(task),
+ run_determinant_(std::move(run_determinant)),
+ identifier_(identifier),
+ time_slice_(0) {
+ promise = std::make_shared<std::promise<T>>();
+ }
+
+ explicit Worker(std::function<T()> &task, const std::string &identifier)
+ : task(task),
+ run_determinant_(nullptr),
+ identifier_(identifier),
+ time_slice_(0) {
promise = std::make_shared<std::promise<T>>();
}
- explicit Worker() {
+ explicit Worker(const std::string identifier = "")
+ : identifier_(identifier),
+ time_slice_(0) {
+ }
+
+ virtual ~Worker() {
+
}
/**
@@ -53,16 +99,35 @@ class Worker {
*/
Worker(Worker &&other)
: task(std::move(other.task)),
- promise(other.promise) {
+ promise(other.promise),
+ time_slice_(std::move(other.time_slice_)),
+ identifier_(std::move(other.identifier_)),
+ run_determinant_(std::move(other.run_determinant_)) {
}
/**
- * Runs the task and takes the output from the funtor
+ * Runs the task and takes the output from the functor
* setting the result into the promise
+ * @return whether or not to continue running
+ * false == finished || error
+ * true == run again
*/
- void run() {
+ virtual bool run() {
T result = task();
- promise->set_value(result);
+ if (run_determinant_ == nullptr || (run_determinant_->isFinished(result) || run_determinant_->isCancelled(result))) {
+ promise->set_value(result);
+ return false;
+ }
+ time_slice_ = increment_time(run_determinant_->wait_time());
+ return true;
+ }
+
+ virtual void setIdentifier(const std::string identifier) {
+ identifier_ = identifier;
+ }
+
+ virtual uint64_t getTimeSlice() {
+ return time_slice_;
}
Worker<T>(const Worker<T>&) = delete;
@@ -72,8 +137,22 @@ class Worker {
std::shared_ptr<std::promise<T>> getPromise();
- private:
+ const std::string &getIdentifier() {
+ return identifier_;
+ }
+ protected:
+
+ inline uint64_t increment_time(const uint64_t &time) {
+ std::chrono::time_point<std::chrono::system_clock> now =
+ std::chrono::system_clock::now();
+ auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
+ return millis + time;
+ }
+
+ std::string identifier_;
+ uint64_t time_slice_;
std::function<T()> task;
+ std::unique_ptr<AfterExecute<T>> run_determinant_;
std::shared_ptr<std::promise<T>> promise;
};
@@ -81,6 +160,9 @@ template<typename T>
Worker<T>& Worker<T>::operator =(Worker<T> && other) {
task = std::move(other.task);
promise = other.promise;
+ time_slice_ = std::move(other.time_slice_);
+ identifier_ = std::move(other.identifier_);
+ run_determinant_ = std::move(other.run_determinant_);
return *this;
}
@@ -125,6 +207,21 @@ class ThreadPool {
* @return true if future can be created and thread pool is in a running state.
*/
bool execute(Worker<T> &&task, std::future<T> &future);
+
+ /**
+ * attempts to stop tasks with the provided identifier.
+ * @param identifier for worker tasks. Note that these tasks won't
+ * immediately stop.
+ */
+ void stopTasks(const std::string &identifier);
+
+ /**
+ * Returns true if a task is running.
+ */
+ bool isRunning(const std::string &identifier) {
+ return task_status_[identifier] == true;
+ }
+
/**
* Starts the Thread Pool
*/
@@ -199,6 +296,8 @@ class ThreadPool {
moodycamel::ConcurrentQueue<Worker<T>> worker_queue_;
// notification for available work
std::condition_variable tasks_available_;
+ // map from task identifier to whether tasks with that identifier should keep running
+ std::map<std::string, bool> task_status_;
// manager mutex
std::recursive_mutex manager_mutex_;
// work queue mutex
@@ -218,6 +317,10 @@ class ThreadPool {
template<typename T>
bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) {
+ {
+ std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+ task_status_[task.getIdentifier()] = true;
+ }
future = std::move(task.getPromise()->get_future());
bool enqueued = worker_queue_.enqueue(std::move(task));
if (running_) {
@@ -246,15 +349,67 @@ void ThreadPool<T>::startWorkers() {
template<typename T>
void ThreadPool<T>::run_tasks() {
auto waitperiod = std::chrono::milliseconds(1) * 100;
+ uint64_t wait_decay_ = 0;
while (running_.load()) {
+ // if we are spinning, perform a wait. If something changes in the worker such that the timeslice has changed, we will pick that information up. Note that it's possible
+ // we could starve for processing time if all workers are waiting. In the event that the number of workers far exceeds the number of threads, threads will spin and potentially
+ // wait until they arrive at a task that can be run. In this case we reset the wait_decay and attempt to pick up a new task. This means that threads that recently ran should
+ // be more likely to run. This is intentional.
+ if (wait_decay_ > 1000) {
+ std::this_thread::sleep_for(std::chrono::nanoseconds(wait_decay_));
+ }
Worker<T> task;
if (!worker_queue_.try_dequeue(task)) {
+
std::unique_lock<std::mutex> lock(worker_queue_mutex_);
tasks_available_.wait_for(lock, waitperiod);
continue;
}
- task.run();
+ else {
+
+ std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+ if (!task_status_[task.getIdentifier()]) {
+ continue;
+ }
+ }
+
+ bool wait_to_run = false;
+ if (task.getTimeSlice() > 1) {
+ auto now = std::chrono::system_clock::now().time_since_epoch();
+ auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now);
+ if (task.getTimeSlice() > ms.count()) {
+ wait_to_run = true;
+ }
+ }
+ // if we have to wait we re-queue the worker.
+ if (wait_to_run) {
+ {
+ std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+ if (!task_status_[task.getIdentifier()]) {
+ continue;
+ }
+ }
+ worker_queue_.enqueue(std::move(task));
+
+ wait_decay_ += 100;
+ continue;
+ }
+
+ const bool task_renew = task.run();
+ wait_decay_ = 0;
+ if (task_renew) {
+
+ {
+ // even if we have more work to do we will not re-queue a task whose identifier has been stopped
+ std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+ if (!task_status_[task.getIdentifier()]) {
+ continue;
+ }
+ }
+ worker_queue_.enqueue(std::move(task));
+
+ }
}
current_workers_--;
@@ -272,12 +427,19 @@ void ThreadPool<T>::start() {
}
template<typename T>
+void ThreadPool<T>::stopTasks(const std::string &identifier) {
+ std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+ task_status_[identifier] = false;
+}
+
+template<typename T>
void ThreadPool<T>::shutdown() {
if (running_.load()) {
std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
running_.store(false);
drain();
+ task_status_.clear();
if (manager_thread_.joinable())
manager_thread_.join();
{
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/src/EventDrivenSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/EventDrivenSchedulingAgent.cpp b/libminifi/src/EventDrivenSchedulingAgent.cpp
index 8a2a874..db5ca08 100644
--- a/libminifi/src/EventDrivenSchedulingAgent.cpp
+++ b/libminifi/src/EventDrivenSchedulingAgent.cpp
@@ -32,22 +32,27 @@ namespace apache {
namespace nifi {
namespace minifi {
-void EventDrivenSchedulingAgent::run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) {
+uint64_t EventDrivenSchedulingAgent::run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) {
while (this->running_) {
bool shouldYield = this->onTrigger(processor, processContext, sessionFactory);
if (processor->isYield()) {
// Honor the yield
- std::this_thread::sleep_for(std::chrono::milliseconds(processor->getYieldTime()));
+ return processor->getYieldTime();
} else if (shouldYield && this->bored_yield_duration_ > 0) {
// No work to do or need to apply back pressure
- std::this_thread::sleep_for(std::chrono::milliseconds(this->bored_yield_duration_));
+ return this->bored_yield_duration_;
}
// Block until work is available
+
processor->waitForWork(1000);
+
+ if (!processor->isWorkAvailable()) {
+ return 1000;
+ }
}
- return;
+ return 0;
}
} /* namespace minifi */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 6358ed0..32fd298 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -183,17 +183,16 @@ void FlowController::stop(bool force) {
std::lock_guard < std::recursive_mutex > flow_lock(mutex_);
if (running_) {
// immediately indicate that we are not running
- running_ = false;
-
logger_->log_info("Stop Flow Controller");
- this->timer_scheduler_->stop();
- this->event_scheduler_->stop();
- this->flow_file_repo_->stop();
- this->provenance_repo_->stop();
- // Wait for sometime for thread stop
- std::this_thread::sleep_for(std::chrono::milliseconds(3000));
if (this->root_)
this->root_->stopProcessing(this->timer_scheduler_.get(), this->event_scheduler_.get());
+ this->flow_file_repo_->stop();
+ this->provenance_repo_->stop();
+ // stop after we've attempted to stop the processors.
+ this->timer_scheduler_->stop();
+ this->event_scheduler_->stop();
+ running_ = false;
+
}
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/src/SchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp
index 1060830..e228ba5 100644
--- a/libminifi/src/SchedulingAgent.cpp
+++ b/libminifi/src/SchedulingAgent.cpp
@@ -46,7 +46,7 @@ void SchedulingAgent::enableControllerService(std::shared_ptr<core::controller::
return serviceNode->enable();
};
// create a functor that will be submitted to the thread pool.
- utils::Worker<bool> functor(f_ex);
+ utils::Worker<bool> functor(f_ex, serviceNode->getUUIDStr());
// move the functor into the thread pool. While a future is returned
// we aren't terribly concerned with the result.
std::future<bool> future;
@@ -59,7 +59,7 @@ void SchedulingAgent::disableControllerService(std::shared_ptr<core::controller:
return serviceNode->disable();
};
// create a functor that will be submitted to the thread pool.
- utils::Worker<bool> functor(f_ex);
+ utils::Worker<bool> functor(f_ex, serviceNode->getUUIDStr());
// move the functor into the thread pool. While a future is returned
// we aren't terribly concerned with the result.
std::future<bool> future;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/src/ThreadedSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp b/libminifi/src/ThreadedSchedulingAgent.cpp
index 7b4ce85..d6b8fae 100644
--- a/libminifi/src/ThreadedSchedulingAgent.cpp
+++ b/libminifi/src/ThreadedSchedulingAgent.cpp
@@ -61,8 +61,7 @@ void ThreadedSchedulingAgent::schedule(std::shared_ptr<core::Processor> processo
return;
}
- std::map<std::string, std::vector<std::thread *>>::iterator it = _threads.find(processor->getUUIDStr());
- if (it != _threads.end()) {
+ if (thread_pool_.isRunning(processor->getUUIDStr())) {
logger_->log_info("Can not schedule threads for processor %s because there are existing threads running");
return;
}
@@ -74,20 +73,33 @@ void ThreadedSchedulingAgent::schedule(std::shared_ptr<core::Processor> processo
processor->onSchedule(processContext.get(), sessionFactory.get());
std::vector<std::thread *> threads;
+
+ ThreadedSchedulingAgent *agent = this;
for (int i = 0; i < processor->getMaxConcurrentTasks(); i++) {
- ThreadedSchedulingAgent *agent = this;
- std::thread *thread = new std::thread([agent, processor, processContext, sessionFactory] () {
- agent->run(processor, processContext.get(), sessionFactory.get());
- });
- thread->detach();
- threads.push_back(thread);
- logger_->log_info("Scheduled thread %d running for process %s", thread->get_id(), processor->getName().c_str());
+
+ // wrap the agent's run call for this processor in a functor
+ std::function<uint64_t()> f_ex = [agent, processor, processContext, sessionFactory] () {
+ return agent->run(processor, processContext.get(), sessionFactory.get());
+ };
+ // create a functor that will be submitted to the thread pool.
+ std::unique_ptr<TimerAwareMonitor> monitor = std::unique_ptr<TimerAwareMonitor>(new TimerAwareMonitor(&running_));
+ utils::Worker<uint64_t> functor(f_ex, processor->getUUIDStr(), std::move(monitor));
+ // move the functor into the thread pool. While a future is returned
+ // we aren't terribly concerned with the result.
+ std::future<uint64_t> future;
+ thread_pool_.execute(std::move(functor), future);
+
}
- _threads[processor->getUUIDStr().c_str()] = threads;
+ logger_->log_info("Scheduled thread %d concurrent workers for for process %s", processor->getMaxConcurrentTasks(), processor->getName().c_str());
return;
}
+void ThreadedSchedulingAgent::stop() {
+ SchedulingAgent::stop();
+ thread_pool_.shutdown();
+}
+
void ThreadedSchedulingAgent::unschedule(std::shared_ptr<core::Processor> processor) {
std::lock_guard < std::mutex > lock(mutex_);
logger_->log_info("Shutting down threads for processor %s/%s", processor->getName().c_str(), processor->getUUIDStr().c_str());
@@ -97,18 +109,8 @@ void ThreadedSchedulingAgent::unschedule(std::shared_ptr<core::Processor> proces
return;
}
- std::map<std::string, std::vector<std::thread *>>::iterator it = _threads.find(processor->getUUIDStr());
+ thread_pool_.stopTasks(processor->getUUIDStr());
- if (it == _threads.end()) {
- logger_->log_info("Cannot unschedule threads for processor %s because there are no existing threads running", processor->getName().c_str());
- return;
- }
- for (std::vector<std::thread *>::iterator itThread = it->second.begin(); itThread != it->second.end(); ++itThread) {
- std::thread *thread = *itThread;
- logger_->log_info("Scheduled thread %d deleted for process %s", thread->get_id(), processor->getName().c_str());
- delete thread;
- }
- _threads.erase(processor->getUUIDStr());
processor->clearActiveTask();
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/src/TimerDrivenSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp b/libminifi/src/TimerDrivenSchedulingAgent.cpp
index b9a41ea..3276470 100644
--- a/libminifi/src/TimerDrivenSchedulingAgent.cpp
+++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp
@@ -29,19 +29,22 @@ namespace apache {
namespace nifi {
namespace minifi {
-void TimerDrivenSchedulingAgent::run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) {
+uint64_t TimerDrivenSchedulingAgent::run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) {
while (this->running_) {
bool shouldYield = this->onTrigger(processor, processContext, sessionFactory);
if (processor->isYield()) {
// Honor the yield
- std::this_thread::sleep_for(std::chrono::milliseconds(processor->getYieldTime()));
+ return processor->getYieldTime();
} else if (shouldYield && this->bored_yield_duration_ > 0) {
// No work to do or need to apply back pressure
- std::this_thread::sleep_for(std::chrono::milliseconds(this->bored_yield_duration_));
+ //std::this_thread::sleep_for(std::chrono::milliseconds(x));
+ return this->bored_yield_duration_;
}
- std::this_thread::sleep_for(std::chrono::nanoseconds(processor->getSchedulingPeriodNano()));
+ return processor->getSchedulingPeriodNano() / 1000000;
+ //std::this_thread::sleep_for(std::chrono::nanoseconds(processor->getSchedulingPeriodNano()));
}
- return;
+ return 0;
+ //return;
}
} /* namespace minifi */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/test/unit/SocketTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/SocketTests.cpp b/libminifi/test/unit/SocketTests.cpp
index a791b3f..0576d5f 100644
--- a/libminifi/test/unit/SocketTests.cpp
+++ b/libminifi/test/unit/SocketTests.cpp
@@ -200,7 +200,7 @@ TEST_CASE("TestTLSContextCreation", "[TestSocket6]") {
std::vector<std::future<bool>> futures;
for (int i = 0; i < 20; i++) {
std::function<bool()> f_ex = createSocket;
- utils::Worker<bool> functor(f_ex);
+ utils::Worker<bool> functor(f_ex, "id");
std::future<bool> fut;
REQUIRE(true == pool.execute(std::move(functor), fut));
futures.push_back(std::move(fut));
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/747691d7/libminifi/test/unit/ThreadPoolTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ThreadPoolTests.cpp b/libminifi/test/unit/ThreadPoolTests.cpp
index 0bba767..670958a 100644
--- a/libminifi/test/unit/ThreadPoolTests.cpp
+++ b/libminifi/test/unit/ThreadPoolTests.cpp
@@ -29,7 +29,7 @@ bool function() {
TEST_CASE("ThreadPoolTest1", "[TPT1]") {
utils::ThreadPool<bool> pool(5);
std::function<bool()> f_ex = function;
- utils::Worker<bool> functor(f_ex);
+ utils::Worker<bool> functor(f_ex, "id");
pool.start();
std::future<bool> fut;
REQUIRE(true == pool.execute(std::move(functor), fut));