You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/09/12 17:51:30 UTC
[07/11] nifi-minifi-cpp git commit: MINIFI-338: Improve wait decay per pull request comments
MINIFI-338: Improve wait decay per pull request comments
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/35d23d09
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/35d23d09
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/35d23d09
Branch: refs/heads/master
Commit: 35d23d091a9b3c6be7eb09fa47b6d8c65bb5334f
Parents: ead9e84
Author: Marc <ma...@gmail.com>
Authored: Thu Jul 20 19:28:26 2017 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Tue Sep 12 13:51:03 2017 -0400
----------------------------------------------------------------------
libminifi/include/utils/ThreadPool.h | 23 +++++++++++++++++++----
1 file changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/35d23d09/libminifi/include/utils/ThreadPool.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h
index 8ff3975..5335c81 100644
--- a/libminifi/include/utils/ThreadPool.h
+++ b/libminifi/include/utils/ThreadPool.h
@@ -129,6 +129,10 @@ class Worker {
virtual uint64_t getTimeSlice() {
return time_slice_;
}
+
+ virtual uint64_t getWaitTime(){
+ return run_determinant_->wait_time();
+ }
Worker<T>(const Worker<T>&) = delete;
Worker<T>& operator =(const Worker<T>&) = delete;
@@ -352,11 +356,19 @@ void ThreadPool<T>::run_tasks() {
uint64_t wait_decay_ = 0;
while (running_.load()) {
+ // wait_decay_ is measured in nanoseconds. If it exceeds 500ms because we could not run any task even though tasks
+ // are available -- that is, the thread pool isn't shut down and the tasks are in a runnable state, BUT they have
+ // been continually timesliced -- we lower the wait decay to 100ms and continue incrementing from
+ // there. This ensures we don't have arbitrarily long sleep cycles.
+ if (wait_decay_ > 500000000L){
+ wait_decay_ = 100000000L;
+ }
// if we are spinning, perform a wait. If something changes in the worker such that the timeslice has changed, we will pick that information up. Note that it's possible
// we could starve for processing time if all workers are waiting. In the event that the number of workers far exceeds the number of threads, threads will spin and potentially
// wait until they arrive at a task that can be run. In this case we reset the wait_decay and attempt to pick up a new task. This means that threads that recently ran should
// be more likely to run. This is intentional.
- if (wait_decay_ > 1000) {
+
+ if (wait_decay_ > 2000) {
std::this_thread::sleep_for(std::chrono::nanoseconds(wait_decay_));
}
Worker<T> task;
@@ -376,9 +388,12 @@ void ThreadPool<T>::run_tasks() {
bool wait_to_run = false;
if (task.getTimeSlice() > 1) {
+ double wt = (double)task.getWaitTime();
auto now = std::chrono::system_clock::now().time_since_epoch();
- auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now);
- if (task.getTimeSlice() > ms.count()) {
+ auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now).count();
+ // If the difference between the task's time slice and the current time is no more than 10% of the task's wait
+ // time, we will not put the task into a wait state, since requeuing it would break the time slice contract.
+ if (task.getTimeSlice() > ms && (task.getTimeSlice() - ms) > (wt*.10)) {
wait_to_run = true;
}
}
@@ -392,7 +407,7 @@ void ThreadPool<T>::run_tasks() {
}
worker_queue_.enqueue(std::move(task));
- wait_decay_ += 100;
+ wait_decay_ += 25;
continue;
}