You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2018/01/23 21:42:13 UTC
nifi-minifi-cpp git commit: MINIFICPP-378: Detach new threads if daemon threads is true
Repository: nifi-minifi-cpp
Updated Branches:
refs/heads/master 994ab3d29 -> 465ae42d8
MINIFICPP-378: Detach new threads if daemon threads is true
Resolve issue with starving single tasks on reduced thread pool
This closes #247.
Signed-off-by: Aldrin Piri <al...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/465ae42d
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/465ae42d
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/465ae42d
Branch: refs/heads/master
Commit: 465ae42d8dbce74dc9d4de85b220ed56fe6180c3
Parents: 994ab3d
Author: Marc Parisi <ph...@apache.org>
Authored: Mon Jan 22 11:45:54 2018 -0500
Committer: Aldrin Piri <al...@apache.org>
Committed: Tue Jan 23 16:41:44 2018 -0500
----------------------------------------------------------------------
libminifi/include/utils/ThreadPool.h | 100 +++++++++++++++++-------------
1 file changed, 57 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/465ae42d/libminifi/include/utils/ThreadPool.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h
index f04e319..9fc47f5 100644
--- a/libminifi/include/utils/ThreadPool.h
+++ b/libminifi/include/utils/ThreadPool.h
@@ -220,6 +220,7 @@ class ThreadPool {
running_(false),
controller_service_provider_(controller_service_provider) {
current_workers_ = 0;
+ task_count_ = 0;
thread_manager_ = nullptr;
}
@@ -232,6 +233,7 @@ class ThreadPool {
controller_service_provider_(std::move(other.controller_service_provider_)),
thread_manager_(std::move(other.thread_manager_)) {
current_workers_ = 0;
+ task_count_ = 0;
}
~ThreadPool() {
@@ -339,6 +341,7 @@ class ThreadPool {
int max_worker_threads_;
// current worker tasks.
std::atomic<int> current_workers_;
+ std::atomic<int> task_count_;
// thread queue
std::vector<std::shared_ptr<WorkerThread>> thread_queue_;
// manager thread
@@ -383,7 +386,6 @@ class ThreadPool {
template<typename T>
bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) {
-
{
std::unique_lock<std::mutex> lock(worker_queue_mutex_);
task_status_[task.getIdentifier()] = true;
@@ -393,6 +395,9 @@ bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) {
if (running_) {
tasks_available_.notify_one();
}
+
+ task_count_++;
+
return enqueued;
}
@@ -428,6 +433,9 @@ void ThreadPool<T>::manageWorkers() {
std::unique_lock<std::mutex> lock(worker_queue_mutex_);
auto worker_thread = std::make_shared<WorkerThread>();
worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
+ if (daemon_threads_) {
+ worker_thread->thread_.detach();
+ }
thread_queue_.push_back(worker_thread);
current_workers_++;
}
@@ -456,7 +464,6 @@ void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) {
uint64_t wait_decay_ = 0;
uint64_t yield_backoff = 10; // start at 10 ms
while (running_.load()) {
-
if (UNLIKELY(thread_reduction_count_ > 0)) {
if (--thread_reduction_count_ >= 0) {
deceased_thread_queue_.enqueue(thread);
@@ -493,66 +500,73 @@ void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) {
yield_backoff = 10;
}
Worker<T> task;
- if (!worker_queue_.try_dequeue(task)) {
- std::unique_lock<std::mutex> lock(worker_queue_mutex_);
- if (worker_priority_queue_.size() > 0) {
- // this is safe as we are going to immediately pop the queue
- while (!worker_priority_queue_.empty()) {
- task = std::move(const_cast<Worker<T>&>(worker_priority_queue_.top()));
- worker_priority_queue_.pop();
- worker_queue_.enqueue(std::move(task));
- continue;
- }
- }
- tasks_available_.wait_for(lock, waitperiod);
- continue;
- } else {
- std::unique_lock<std::mutex> lock(worker_queue_mutex_);
- if (!task_status_[task.getIdentifier()]) {
- continue;
- }
- }
+ bool prioritized_task = false;
- bool wait_to_run = false;
- if (task.getTimeSlice() > 1) {
- double wt = (double) task.getWaitTime();
- auto now = std::chrono::system_clock::now().time_since_epoch();
- auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now).count();
- // if our differential is < 10% of the wait time we will not put the task into a wait state
- // since requeuing will break the time slice contract.
- if ((double) task.getTimeSlice() > ms && ((double) (task.getTimeSlice() - ms)) > (wt * .10)) {
- wait_to_run = true;
- }
- }
- // if we have to wait we re-queue the worker.
- if (wait_to_run) {
- {
+ if (!prioritized_task) {
+ if (!worker_queue_.try_dequeue(task)) {
+ std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+ if (worker_priority_queue_.size() > 0) {
+ // this is safe as we are going to immediately pop the queue
+ while (!worker_priority_queue_.empty()) {
+ task = std::move(const_cast<Worker<T>&>(worker_priority_queue_.top()));
+ worker_priority_queue_.pop();
+ worker_queue_.enqueue(std::move(task));
+ continue;
+ }
+
+ }
+ tasks_available_.wait_for(lock, waitperiod);
+ continue;
+ } else {
std::unique_lock<std::mutex> lock(worker_queue_mutex_);
if (!task_status_[task.getIdentifier()]) {
continue;
}
- // put it on the priority queue
- worker_priority_queue_.push(std::move(task));
}
- //worker_queue_.enqueue(std::move(task));
- wait_decay_ += 25;
- continue;
- }
+ bool wait_to_run = false;
+ if (task.getTimeSlice() > 1) {
+ double wt = (double) task.getWaitTime();
+ auto now = std::chrono::system_clock::now().time_since_epoch();
+ auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now).count();
+ // if our differential is < 10% of the wait time we will not put the task into a wait state
+ // since requeuing will break the time slice contract.
+ if ((double) task.getTimeSlice() > ms && ((double) (task.getTimeSlice() - ms)) > (wt * .10)) {
+ wait_to_run = true;
+ }
+ }
+ // if we have to wait we re-queue the worker.
+ if (wait_to_run) {
+ {
+ std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+ if (!task_status_[task.getIdentifier()]) {
+ continue;
+ }
+ // put it on the priority queue
+ worker_priority_queue_.push(std::move(task));
+ }
+
+ wait_decay_ += 25;
+ continue;
+ }
+ }
const bool task_renew = task.run();
wait_decay_ = 0;
if (task_renew) {
- {
+ if (UNLIKELY(task_count_ > current_workers_)) {
// even if we have more work to do we will not
std::unique_lock<std::mutex> lock(worker_queue_mutex_);
if (!task_status_[task.getIdentifier()]) {
continue;
}
+
+ worker_priority_queue_.push(std::move(task));
+ } else {
+ worker_queue_.enqueue(std::move(task));
}
- worker_queue_.enqueue(std::move(task));
}
}
current_workers_--;