Posted to commits@nifi.apache.org by al...@apache.org on 2018/01/23 21:42:13 UTC

nifi-minifi-cpp git commit: MINIFICPP-378: Detach new threads if daemon threads is true

Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 994ab3d29 -> 465ae42d8


MINIFICPP-378: Detach new threads if daemon threads is true

Resolve an issue where single tasks could starve on a reduced thread pool

This closes #247.

Signed-off-by: Aldrin Piri <al...@apache.org>
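
The change has two parts. When daemon_threads_ is true, newly created worker
threads are detached immediately, so a shrinking pool never blocks on a join
for a worker that is mid-task. In addition, a task_count_ counter tracks
outstanding tasks; when a renewed task finds more tasks than workers, it is
routed through the priority queue instead of straight back into the worker
queue, which keeps a single recurring task from starving the others.

A minimal sketch of the detach behavior, assuming an invented SketchPool
class; only the daemon_threads_ flag and the detach() call mirror the actual
ThreadPool change:

#include <functional>
#include <thread>
#include <utility>
#include <vector>

class SketchPool {
 public:
  explicit SketchPool(bool daemon_threads)
      : daemon_threads_(daemon_threads) {}

  void spawnWorker(std::function<void()> work) {
    std::thread t(std::move(work));
    if (daemon_threads_) {
      // Detached workers need no join on shutdown, so reducing the
      // pool cannot stall waiting on a long-running task.
      t.detach();
    } else {
      threads_.push_back(std::move(t));
    }
  }

  ~SketchPool() {
    // Only non-daemon workers are joined; detached ones exit on their own.
    for (auto &t : threads_) {
      if (t.joinable()) t.join();
    }
  }

 private:
  bool daemon_threads_;
  std::vector<std::thread> threads_;
};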


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/465ae42d
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/465ae42d
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/465ae42d

Branch: refs/heads/master
Commit: 465ae42d8dbce74dc9d4de85b220ed56fe6180c3
Parents: 994ab3d
Author: Marc Parisi <ph...@apache.org>
Authored: Mon Jan 22 11:45:54 2018 -0500
Committer: Aldrin Piri <al...@apache.org>
Committed: Tue Jan 23 16:41:44 2018 -0500

----------------------------------------------------------------------
 libminifi/include/utils/ThreadPool.h | 100 +++++++++++++++++-------------
 1 file changed, 57 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/465ae42d/libminifi/include/utils/ThreadPool.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h
index f04e319..9fc47f5 100644
--- a/libminifi/include/utils/ThreadPool.h
+++ b/libminifi/include/utils/ThreadPool.h
@@ -220,6 +220,7 @@ class ThreadPool {
         running_(false),
         controller_service_provider_(controller_service_provider) {
     current_workers_ = 0;
+    task_count_ = 0;
     thread_manager_ = nullptr;
   }
 
@@ -232,6 +233,7 @@ class ThreadPool {
         controller_service_provider_(std::move(other.controller_service_provider_)),
         thread_manager_(std::move(other.thread_manager_)) {
     current_workers_ = 0;
+    task_count_ = 0;
   }
 
   ~ThreadPool() {
@@ -339,6 +341,7 @@ class ThreadPool {
   int max_worker_threads_;
 // current worker tasks.
   std::atomic<int> current_workers_;
+  std::atomic<int> task_count_;
 // thread queue
   std::vector<std::shared_ptr<WorkerThread>> thread_queue_;
 // manager thread
@@ -383,7 +386,6 @@ class ThreadPool {
 
 template<typename T>
 bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) {
-
   {
     std::unique_lock<std::mutex> lock(worker_queue_mutex_);
     task_status_[task.getIdentifier()] = true;
@@ -393,6 +395,9 @@ bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) {
   if (running_) {
     tasks_available_.notify_one();
   }
+
+  task_count_++;
+
   return enqueued;
 }
 
@@ -428,6 +433,9 @@ void ThreadPool<T>::manageWorkers() {
           std::unique_lock<std::mutex> lock(worker_queue_mutex_);
           auto worker_thread = std::make_shared<WorkerThread>();
           worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
+          if (daemon_threads_) {
+            worker_thread->thread_.detach();
+          }
           thread_queue_.push_back(worker_thread);
           current_workers_++;
         }
@@ -456,7 +464,6 @@ void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) {
   uint64_t wait_decay_ = 0;
   uint64_t yield_backoff = 10;  // start at 10 ms
   while (running_.load()) {
-
     if (UNLIKELY(thread_reduction_count_ > 0)) {
       if (--thread_reduction_count_ >= 0) {
         deceased_thread_queue_.enqueue(thread);
@@ -493,66 +500,73 @@ void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) {
       yield_backoff = 10;
     }
     Worker<T> task;
-    if (!worker_queue_.try_dequeue(task)) {
-      std::unique_lock<std::mutex> lock(worker_queue_mutex_);
-      if (worker_priority_queue_.size() > 0) {
-        // this is safe as we are going to immediately pop the queue
-        while (!worker_priority_queue_.empty()) {
-          task = std::move(const_cast<Worker<T>&>(worker_priority_queue_.top()));
-          worker_priority_queue_.pop();
-          worker_queue_.enqueue(std::move(task));
-          continue;
-        }
 
-      }
-      tasks_available_.wait_for(lock, waitperiod);
-      continue;
-    } else {
-      std::unique_lock<std::mutex> lock(worker_queue_mutex_);
-      if (!task_status_[task.getIdentifier()]) {
-        continue;
-      }
-    }
+    bool prioritized_task = false;
 
-    bool wait_to_run = false;
-    if (task.getTimeSlice() > 1) {
-      double wt = (double) task.getWaitTime();
-      auto now = std::chrono::system_clock::now().time_since_epoch();
-      auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now).count();
-      // if our differential is < 10% of the wait time we will not put the task into a wait state
-      // since requeuing will break the time slice contract.
-      if ((double) task.getTimeSlice() > ms && ((double) (task.getTimeSlice() - ms)) > (wt * .10)) {
-        wait_to_run = true;
-      }
-    }
-    // if we have to wait we re-queue the worker.
-    if (wait_to_run) {
-      {
+    if (!prioritized_task) {
+      if (!worker_queue_.try_dequeue(task)) {
+        std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+        if (worker_priority_queue_.size() > 0) {
+          // this is safe as we are going to immediately pop the queue
+          while (!worker_priority_queue_.empty()) {
+            task = std::move(const_cast<Worker<T>&>(worker_priority_queue_.top()));
+            worker_priority_queue_.pop();
+            worker_queue_.enqueue(std::move(task));
+            continue;
+          }
+
+        }
+        tasks_available_.wait_for(lock, waitperiod);
+        continue;
+      } else {
         std::unique_lock<std::mutex> lock(worker_queue_mutex_);
         if (!task_status_[task.getIdentifier()]) {
           continue;
         }
-        // put it on the priority queue
-        worker_priority_queue_.push(std::move(task));
       }
-      //worker_queue_.enqueue(std::move(task));
 
-      wait_decay_ += 25;
-      continue;
-    }
+      bool wait_to_run = false;
+      if (task.getTimeSlice() > 1) {
+        double wt = (double) task.getWaitTime();
+        auto now = std::chrono::system_clock::now().time_since_epoch();
+        auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now).count();
 
+        // if our differential is < 10% of the wait time we will not put the task into a wait state
+        // since requeuing will break the time slice contract.
+        if ((double) task.getTimeSlice() > ms && ((double) (task.getTimeSlice() - ms)) > (wt * .10)) {
+          wait_to_run = true;
+        }
+      }
+      // if we have to wait we re-queue the worker.
+      if (wait_to_run) {
+        {
+          std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+          if (!task_status_[task.getIdentifier()]) {
+            continue;
+          }
+          // put it on the priority queue
+          worker_priority_queue_.push(std::move(task));
+        }
+
+        wait_decay_ += 25;
+        continue;
+      }
+    }
     const bool task_renew = task.run();
     wait_decay_ = 0;
     if (task_renew) {
 
-      {
+      if (UNLIKELY(task_count_ > current_workers_)) {
         // even if we have more work to do we will not requeue directly;
         // route through the priority queue so other tasks get a turn
         std::unique_lock<std::mutex> lock(worker_queue_mutex_);
         if (!task_status_[task.getIdentifier()]) {
           continue;
         }
+
+        worker_priority_queue_.push(std::move(task));
+      } else {
+        worker_queue_.enqueue(std::move(task));
       }
-      worker_queue_.enqueue(std::move(task));
     }
   }
   current_workers_--;
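
For reference, the requeue decision at the end of run_tasks() can be read in
isolation: a task that asks to run again is pushed onto the priority queue
only when outstanding tasks outnumber workers, and re-enters the plain worker
queue otherwise (the dequeue path drains the priority queue back into the
worker queue before waiting). A standalone sketch, assuming stand-in Task,
queue, and lock members; only the task_count_ > current_workers_ comparison
mirrors the patch:

#include <atomic>
#include <mutex>
#include <queue>
#include <utility>

struct Task {
  int priority = 0;
  bool operator<(const Task &other) const { return priority < other.priority; }
};

struct RequeueSketch {
  std::atomic<int> task_count_{0};
  std::atomic<int> current_workers_{0};
  std::queue<Task> worker_queue_;                    // lock-free FIFO in MiNiFi; plain queue here
  std::priority_queue<Task> worker_priority_queue_;  // guarded by worker_queue_mutex_
  std::mutex worker_queue_mutex_;

  void requeueRenewedTask(Task &&task) {
    if (task_count_ > current_workers_) {
      // More tasks than workers: go through the priority queue so a
      // single long-lived task cannot monopolize a reduced pool.
      std::unique_lock<std::mutex> lock(worker_queue_mutex_);
      worker_priority_queue_.push(std::move(task));
    } else {
      worker_queue_.push(std::move(task));
    }
  }
};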