You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/09/12 17:51:30 UTC

[07/11] nifi-minifi-cpp git commit: MINIFI-338: Improve wait decay per pull request comments

MINIFI-338: Improve wait decay per pull request comments


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/35d23d09
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/35d23d09
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/35d23d09

Branch: refs/heads/master
Commit: 35d23d091a9b3c6be7eb09fa47b6d8c65bb5334f
Parents: ead9e84
Author: Marc <ma...@gmail.com>
Authored: Thu Jul 20 19:28:26 2017 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Tue Sep 12 13:51:03 2017 -0400

----------------------------------------------------------------------
 libminifi/include/utils/ThreadPool.h | 23 +++++++++++++++++++----
 1 file changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/35d23d09/libminifi/include/utils/ThreadPool.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h
index 8ff3975..5335c81 100644
--- a/libminifi/include/utils/ThreadPool.h
+++ b/libminifi/include/utils/ThreadPool.h
@@ -129,6 +129,10 @@ class Worker {
   virtual uint64_t getTimeSlice() {
     return time_slice_;
   }
+  // Returns the wait time reported by run_determinant_ (forwards to wait_time()). NOTE(review): units are not visible here - confirm.
+  virtual uint64_t getWaitTime(){
+    return run_determinant_->wait_time();
+  }
 
   Worker<T>(const Worker<T>&) = delete;
   Worker<T>& operator =(const Worker<T>&) = delete;
@@ -352,11 +356,19 @@ void ThreadPool<T>::run_tasks() {
   uint64_t wait_decay_ = 0;
   while (running_.load()) {
 
+    // If the wait decay exceeds 500ms because no task could be run even though tasks are available (that is, the
+    // thread pool has not been shut down and the tasks are in a runnable state, BUT they have been continually
+    // time sliced), we lower the wait decay to 100ms and continue incrementing from there. This ensures that we
+    // don't have arbitrarily long sleep cycles.
+    if (wait_decay_ > 500000000L){
+     wait_decay_ = 100000000L;
+    }
     // if we are spinning, perform a wait. If something changes in the worker such that the timeslice has changed, we will pick that information up. Note that it's possible
     // we could starve for processing time if all workers are waiting. In the event that the number of workers far exceeds the number of threads, threads will spin and potentially
     // wait until they arrive at a task that can be run. In this case we reset the wait_decay and attempt to pick up a new task. This means that threads that recently ran should
     // be more likely to run. This is intentional.
-    if (wait_decay_ > 1000) {
+    
+    if (wait_decay_ > 2000) {
       std::this_thread::sleep_for(std::chrono::nanoseconds(wait_decay_));
     }
     Worker<T> task;
@@ -376,9 +388,12 @@ void ThreadPool<T>::run_tasks() {
 
     bool wait_to_run = false;
     if (task.getTimeSlice() > 1) {
+      double wt = (double)task.getWaitTime();
       auto now = std::chrono::system_clock::now().time_since_epoch();
-      auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now);
-      if (task.getTimeSlice() > ms.count()) {
+      auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now).count();
+      // If the difference between the time slice and the current time does not exceed 10% of the wait time, we do
+      // not put the task into a wait state, since requeuing it would break the time slice contract.
+      if (task.getTimeSlice() > ms && (task.getTimeSlice() - ms) > (wt*.10)) {
         wait_to_run = true;
       }
     }
@@ -392,7 +407,7 @@ void ThreadPool<T>::run_tasks() {
       }
       worker_queue_.enqueue(std::move(task));
 
-      wait_decay_ += 100;
+      wait_decay_ += 25;
       continue;
     }