You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2020/09/08 20:59:31 UTC

[kudu] branch master updated: [threadpool] optimize queue overload detection

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new c4997af  [threadpool] optimize queue overload detection
c4997af is described below

commit c4997af4b90b4a102597775e575320c95eb3c1ba
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Sun Sep 6 17:15:41 2020 -0700

    [threadpool] optimize queue overload detection
    
    This patch simplifies the queue overload detection and makes it a bit
    more robust, according to 'perf stat' running
    
      KUDU_ALLOW_SLOW_TESTS=1 perf stat ./bin/threadpool-test \
        --gtest_filter='*ThreadPoolPerformanceTest.ConcurrentAndSerialTasksMix/1'
    
    Before:
         147699.960062 task-clock                #   44.348 CPUs utilized
               371,519 context-switches          #    0.003 M/sec
                   653 cpu-migrations            #    0.004 K/sec
                 3,664 page-faults               #    0.025 K/sec
       423,794,029,838 cycles                    #    2.869 GHz
        94,656,186,092 instructions              #    0.22  insns per cycle
        34,018,899,188 branches                  #  230.324 M/sec
            22,374,862 branch-misses             #    0.07% of all branches
    
           3.330492839 seconds time elapsed
    
    After:
         126357.374737 task-clock                #   40.768 CPUs utilized
               350,907 context-switches          #    0.003 M/sec
                 3,167 cpu-migrations            #    0.025 K/sec
                 3,478 page-faults               #    0.028 K/sec
       362,348,476,629 cycles                    #    2.868 GHz
        82,964,368,394 instructions              #    0.23  insns per cycle
        29,553,336,891 branches                  #  233.887 M/sec
            16,945,558 branch-misses             #    0.06% of all branches
    
           3.099419461 seconds time elapsed
    
    Change-Id: Ic01ca7419beba92d7067c60ef520811136d67587
    Reviewed-on: http://gerrit.cloudera.org:8080/16424
    Tested-by: Kudu Jenkins
    Reviewed-by: Grant Henke <gr...@apache.org>
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/util/threadpool.cc | 28 ++++++++++++++++------------
 src/kudu/util/threadpool.h  | 11 ++++++++---
 2 files changed, 24 insertions(+), 15 deletions(-)

diff --git a/src/kudu/util/threadpool.cc b/src/kudu/util/threadpool.cc
index 6e2097d..569c968 100644
--- a/src/kudu/util/threadpool.cc
+++ b/src/kudu/util/threadpool.cc
@@ -771,6 +771,7 @@ ThreadPool::QueueLoadMeter::QueueLoadMeter(
     : tpool_(tpool),
       queue_time_threshold_(queue_time_threshold),
       queue_time_history_length_(queue_time_history_length),
+      over_queue_threshold_num_(0),
       queue_head_submit_time_(MonoTime()),
       overloaded_since_(MonoTime()),
       has_spare_thread_(true) {
@@ -826,18 +827,23 @@ void ThreadPool::QueueLoadMeter::UpdateQueueInfoUnlocked(
     const MonoTime& queue_head_submit_time,
     bool has_spare_thread) {
   tpool_.lock_.AssertAcquired();
+
   if (task_queue_time.Initialized()) {
-    // TODO(aserbin): any better way of tracking the running minimum of N numbers?
     queue_times_.emplace_back(task_queue_time);
-    queue_times_ordered_.insert(task_queue_time);
+    // Given the criterion to detect whether the queue is overloaded, it's not
+    // exactly necessary to track the running minimum of queue times for the
+    // specified history window. It's enough just to keep an eye on whether the
+    // window contains at least one element that's not over the threshold.
+    if (task_queue_time > queue_time_threshold_) {
+      ++over_queue_threshold_num_;
+    }
     if (queue_times_.size() > queue_time_history_length_) {
       const auto& elem = queue_times_.front();
-      auto it = queue_times_ordered_.find(elem);
-      DCHECK(it != queue_times_ordered_.end());
-      queue_times_ordered_.erase(it);
+      if (elem > queue_time_threshold_) {
+        --over_queue_threshold_num_;
+      }
       queue_times_.pop_front();
     }
-    min_queue_time_ = *queue_times_.begin();
   }
   queue_head_submit_time_.store(queue_head_submit_time);
   has_spare_thread_.store(has_spare_thread);
@@ -848,11 +854,10 @@ void ThreadPool::QueueLoadMeter::UpdateQueueInfoUnlocked(
   const bool queue_empty = !queue_head_submit_time.Initialized();
   const auto queue_time = queue_empty
       ? MonoDelta::FromSeconds(0) : now - queue_head_submit_time;
-  const auto min_queue_time = min_queue_time_.Initialized()
-      ? min_queue_time_ : MonoDelta::FromSeconds(0);
   const bool was_overloaded = overloaded_since_.load().Initialized();
   const bool overloaded = !has_spare_thread &&
-      std::max(min_queue_time, queue_time) > queue_time_threshold_;
+      (over_queue_threshold_num_ == queue_time_history_length_ ||
+       queue_time > queue_time_threshold_);
   // Track the state transitions and update overloaded_since_.
   if (!was_overloaded && overloaded) {
     VLOG(3) << Substitute("state transition: normal --> overloaded");
@@ -863,13 +868,12 @@ void ThreadPool::QueueLoadMeter::UpdateQueueInfoUnlocked(
   }
   VLOG(4) << Substitute("state refreshed: overloaded since $0, queue $1, "
                         "has $2 thread, queue head submit time $3, "
-                        "queue time $4, min queue time $5",
+                        "queue time $4",
                         overloaded_since_.load().ToString(),
                         queue_empty ? "empty" : "not empty",
                         has_spare_thread ? "spare" : "no spare",
                         queue_head_submit_time.ToString(),
-                        queue_time.ToString(),
-                        min_queue_time.ToString());
+                        queue_time.ToString());
 }
 
 std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s) {
diff --git a/src/kudu/util/threadpool.h b/src/kudu/util/threadpool.h
index 403acbd..ff1fae8 100644
--- a/src/kudu/util/threadpool.h
+++ b/src/kudu/util/threadpool.h
@@ -16,13 +16,14 @@
 // under the License.
 #pragma once
 
+#include <sys/types.h>
+
 #include <atomic>
 #include <cstddef>
 #include <deque>
 #include <functional>
 #include <iosfwd>
 #include <memory>
-#include <set>
 #include <string>
 #include <unordered_set>
 
@@ -340,11 +341,15 @@ class ThreadPool {
     // to the parameter 'N' in the description of the algorithm above.
     const size_t queue_time_history_length_;
 
+    // Number of elements in the queue history measurement window which are
+    // over the threshold specified by 'queue_time_threshold_'. Using the
+    // terminology from above, (min(QT_historic(M)) > T_overload) iff
+    // (over_queue_threshold_num_ == M).
+    ssize_t over_queue_threshold_num_;
+
     // Queue timings of the most recent samples. The size of these containers
     // is kept under queue_time_history_length_ limit.
     std::deque<MonoDelta> queue_times_;
-    std::multiset<MonoDelta> queue_times_ordered_;
-    MonoDelta min_queue_time_;
 
     // Below fields are to store the latest snapshot of the information about
     // the task queue of the pool the meter is attached to.