You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2020/09/08 20:59:31 UTC
[kudu] branch master updated: [threadpool] optimize queue overload detection
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new c4997af [threadpool] optimize queue overload detection
c4997af is described below
commit c4997af4b90b4a102597775e575320c95eb3c1ba
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Sun Sep 6 17:15:41 2020 -0700
[threadpool] optimize queue overload detection
This patch simplifies the queue overload detection and makes it a bit
more robust and efficient, as shown by the 'perf stat' output from running:
KUDU_ALLOW_SLOW_TESTS=1 perf stat ./bin/threadpool-test \
--gtest_filter='*ThreadPoolPerformanceTest.ConcurrentAndSerialTasksMix/1'
Before:
147699.960062 task-clock # 44.348 CPUs utilized
371,519 context-switches # 0.003 M/sec
653 cpu-migrations # 0.004 K/sec
3,664 page-faults # 0.025 K/sec
423,794,029,838 cycles # 2.869 GHz
94,656,186,092 instructions # 0.22 insns per cycle
34,018,899,188 branches # 230.324 M/sec
22,374,862 branch-misses # 0.07% of all branches
3.330492839 seconds time elapsed
After:
126357.374737 task-clock # 40.768 CPUs utilized
350,907 context-switches # 0.003 M/sec
3,167 cpu-migrations # 0.025 K/sec
3,478 page-faults # 0.028 K/sec
362,348,476,629 cycles # 2.868 GHz
82,964,368,394 instructions # 0.23 insns per cycle
29,553,336,891 branches # 233.887 M/sec
16,945,558 branch-misses # 0.06% of all branches
3.099419461 seconds time elapsed
Change-Id: Ic01ca7419beba92d7067c60ef520811136d67587
Reviewed-on: http://gerrit.cloudera.org:8080/16424
Tested-by: Kudu Jenkins
Reviewed-by: Grant Henke <gr...@apache.org>
Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
src/kudu/util/threadpool.cc | 28 ++++++++++++++++------------
src/kudu/util/threadpool.h | 11 ++++++++---
2 files changed, 24 insertions(+), 15 deletions(-)
diff --git a/src/kudu/util/threadpool.cc b/src/kudu/util/threadpool.cc
index 6e2097d..569c968 100644
--- a/src/kudu/util/threadpool.cc
+++ b/src/kudu/util/threadpool.cc
@@ -771,6 +771,7 @@ ThreadPool::QueueLoadMeter::QueueLoadMeter(
: tpool_(tpool),
queue_time_threshold_(queue_time_threshold),
queue_time_history_length_(queue_time_history_length),
+ over_queue_threshold_num_(0),
queue_head_submit_time_(MonoTime()),
overloaded_since_(MonoTime()),
has_spare_thread_(true) {
@@ -826,18 +827,23 @@ void ThreadPool::QueueLoadMeter::UpdateQueueInfoUnlocked(
const MonoTime& queue_head_submit_time,
bool has_spare_thread) {
tpool_.lock_.AssertAcquired();
+
if (task_queue_time.Initialized()) {
- // TODO(aserbin): any better way of tracking the running minimum of N numbers?
queue_times_.emplace_back(task_queue_time);
- queue_times_ordered_.insert(task_queue_time);
+ // Given the criterion to detect whether the queue is overloaded, it's not
+ // exactly necessary to track the running minimum of queue times for the
+ // specified history window. It's enough just to keep an eye on whether the
+ // window contains at least one element that's not over the threshold.
+ if (task_queue_time > queue_time_threshold_) {
+ ++over_queue_threshold_num_;
+ }
if (queue_times_.size() > queue_time_history_length_) {
const auto& elem = queue_times_.front();
- auto it = queue_times_ordered_.find(elem);
- DCHECK(it != queue_times_ordered_.end());
- queue_times_ordered_.erase(it);
+ if (elem > queue_time_threshold_) {
+ --over_queue_threshold_num_;
+ }
queue_times_.pop_front();
}
- min_queue_time_ = *queue_times_.begin();
}
queue_head_submit_time_.store(queue_head_submit_time);
has_spare_thread_.store(has_spare_thread);
@@ -848,11 +854,10 @@ void ThreadPool::QueueLoadMeter::UpdateQueueInfoUnlocked(
const bool queue_empty = !queue_head_submit_time.Initialized();
const auto queue_time = queue_empty
? MonoDelta::FromSeconds(0) : now - queue_head_submit_time;
- const auto min_queue_time = min_queue_time_.Initialized()
- ? min_queue_time_ : MonoDelta::FromSeconds(0);
const bool was_overloaded = overloaded_since_.load().Initialized();
const bool overloaded = !has_spare_thread &&
- std::max(min_queue_time, queue_time) > queue_time_threshold_;
+ (over_queue_threshold_num_ == queue_time_history_length_ ||
+ queue_time > queue_time_threshold_);
// Track the state transitions and update overloaded_since_.
if (!was_overloaded && overloaded) {
VLOG(3) << Substitute("state transition: normal --> overloaded");
@@ -863,13 +868,12 @@ void ThreadPool::QueueLoadMeter::UpdateQueueInfoUnlocked(
}
VLOG(4) << Substitute("state refreshed: overloaded since $0, queue $1, "
"has $2 thread, queue head submit time $3, "
- "queue time $4, min queue time $5",
+ "queue time $4",
overloaded_since_.load().ToString(),
queue_empty ? "empty" : "not empty",
has_spare_thread ? "spare" : "no spare",
queue_head_submit_time.ToString(),
- queue_time.ToString(),
- min_queue_time.ToString());
+ queue_time.ToString());
}
std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s) {
diff --git a/src/kudu/util/threadpool.h b/src/kudu/util/threadpool.h
index 403acbd..ff1fae8 100644
--- a/src/kudu/util/threadpool.h
+++ b/src/kudu/util/threadpool.h
@@ -16,13 +16,14 @@
// under the License.
#pragma once
+#include <sys/types.h>
+
#include <atomic>
#include <cstddef>
#include <deque>
#include <functional>
#include <iosfwd>
#include <memory>
-#include <set>
#include <string>
#include <unordered_set>
@@ -340,11 +341,15 @@ class ThreadPool {
// to the parameter 'N' in the description of the algorithm above.
const size_t queue_time_history_length_;
+ // Number of elements in the queue history measurement window that are
+ // over the threshold specified by 'queue_time_threshold_'. Using the
+ // terminology from above, with a full window of N samples,
+ // (min(QT_historic(N)) > T_overload) iff (over_queue_threshold_num_ == N).
+ ssize_t over_queue_threshold_num_;
+
// Queue timings of the most recent samples. The size of these containers
// is kept under queue_time_history_length_ limit.
std::deque<MonoDelta> queue_times_;
- std::multiset<MonoDelta> queue_times_ordered_;
- MonoDelta min_queue_time_;
// Below fields are to store the latest snapshot of the information about
// the task queue of the pool the meter is attached to.