You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@kudu.apache.org by la...@apache.org on 2022/02/16 04:54:06 UTC

[kudu] branch master updated: [thread] Small refactor to improve ThreadMgr performance

This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 31a9b01  [thread] Small refactor to improve ThreadMgr performance
31a9b01 is described below

commit 31a9b0158984fc000f35cff483856eaec5338132
Author: Yingchun Lai <ac...@gmail.com>
AuthorDate: Sun Feb 13 23:53:49 2022 +0800

    [thread] Small refactor to improve ThreadMgr performance
    
    Move threads_started_metric_ and threads_running_metric_ out from under
    the lock and make them std::atomic counters to improve performance.
    
    Change-Id: I219c509c5818b618e864c534bbf40cc4f9a7dc13
    Reviewed-on: http://gerrit.cloudera.org:8080/18229
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Tested-by: Yingchun Lai <ac...@gmail.com>
---
 src/kudu/util/thread.cc | 54 +++++++++++++++++++++++--------------------------
 1 file changed, 25 insertions(+), 29 deletions(-)

diff --git a/src/kudu/util/thread.cc b/src/kudu/util/thread.cc
index 79d3e85..6cfa891 100644
--- a/src/kudu/util/thread.cc
+++ b/src/kudu/util/thread.cc
@@ -27,6 +27,7 @@
 #include <unistd.h>
 
 #include <algorithm>
+#include <atomic>
 #include <cerrno>
 #include <cstring>
 #include <memory>
@@ -62,6 +63,7 @@
 #include "kudu/util/url-coding.h"
 #include "kudu/util/web_callback_registry.h"
 
+using std::atomic;
 using std::ostringstream;
 using std::pair;
 using std::shared_ptr;
@@ -185,7 +187,6 @@ class ThreadMgr {
   }
 
   ~ThreadMgr() {
-    thread_categories_.clear();
   }
 
   static void SetThreadName(const string& name, int64_t tid);
@@ -196,14 +197,16 @@ class ThreadMgr {
   // Registers a thread to the supplied category. The key is a pthread_t,
   // not the system TID, since pthread_t is less prone to being recycled.
   void AddThread(const pthread_t& pthread_id, const string& name, const string& category,
-      int64_t tid);
+                 int64_t tid);
 
   // Removes a thread from the supplied category. If the thread has
   // already been removed, this is a no-op.
   void RemoveThread(const pthread_t& pthread_id, const string& category);
 
   // Metric callback for number of threads running. Also used for error messages.
-  uint64_t ReadThreadsRunning() const;
+  uint64_t ReadThreadsRunning() const {
+    return threads_running_metric_;
+  }
 
  private:
   // Container class for any details we want to capture about a thread
@@ -252,7 +255,7 @@ class ThreadMgr {
   // All thread categories, keyed on the category name.
   typedef unordered_map<string, ThreadCategory> ThreadCategoryMap;
 
-  // Protects thread_categories_ and thread metrics.
+  // Protects thread_categories_.
   mutable rw_spinlock lock_;
 
   // All thread categories that ever contained a thread, even if empty.
@@ -260,11 +263,13 @@ class ThreadMgr {
 
   // Counters to track all-time total number of threads, and the
   // current number of running threads.
-  uint64_t threads_started_metric_;
-  uint64_t threads_running_metric_;
+  atomic<uint64_t> threads_started_metric_;
+  atomic<uint64_t> threads_running_metric_;
 
   // Metric callback for number of threads started.
-  uint64_t ReadThreadsStarted() const;
+  uint64_t ReadThreadsStarted() const {
+    return threads_started_metric_;
+  }
 
   // Webpage callback; prints all threads by category.
   void ThreadPathHandler(const WebCallbackRegistry::WebRequest& req,
@@ -336,16 +341,6 @@ Status ThreadMgr::StartInstrumentation(const scoped_refptr<MetricEntity>& metric
   return Status::OK();
 }
 
-uint64_t ThreadMgr::ReadThreadsStarted() const {
-  shared_lock<decltype(lock_)> l(lock_);
-  return threads_started_metric_;
-}
-
-uint64_t ThreadMgr::ReadThreadsRunning() const {
-  shared_lock<decltype(lock_)> l(lock_);
-  return threads_running_metric_;
-}
-
 void ThreadMgr::AddThread(const pthread_t& pthread_id, const string& name,
     const string& category, int64_t tid) {
   // These annotations cause TSAN to ignore the synchronization on lock_
@@ -375,9 +370,10 @@ void ThreadMgr::AddThread(const pthread_t& pthread_id, const string& name,
     // TODO(aserbin): maybe, keep the thread_categories_ registry not in a
     //   global static container, but bind the container with the life cycle
     //   of some top-level object that uses the ThreadMgr as a singleton.
-    std::lock_guard<decltype(lock_)> l(lock_);
-    thread_categories_[category][pthread_id] =
-        ThreadDescriptor(category, name, tid);
+    {
+      std::lock_guard<decltype(lock_)> l(lock_);
+      thread_categories_[category][pthread_id] = ThreadDescriptor(category, name, tid);
+    }
     ++threads_running_metric_;
     ++threads_started_metric_;
   }
@@ -393,8 +389,8 @@ void ThreadMgr::RemoveThread(const pthread_t& pthread_id, const string& category
     auto& threads = FindOrDie(thread_categories_, category);
     auto num_erased = threads.erase(pthread_id);
     CHECK_EQ(1, num_erased);
-    --threads_running_metric_;
   }
+  --threads_running_metric_;
   ANNOTATE_IGNORE_SYNC_END();
   ANNOTATE_IGNORE_READS_AND_WRITES_END();
 }
@@ -454,18 +450,16 @@ void ThreadMgr::ThreadPathHandler(const WebCallbackRegistry::WebRequest& req,
   } else {
     // List all thread groups and the number of threads running in each.
     vector<pair<string, uint64_t>> thread_categories_info;
-    uint64_t running;
     {
       // See comment above regarding short critical sections.
       shared_lock<decltype(lock_)> l(lock_);
-      running = threads_running_metric_;
       thread_categories_info.reserve(thread_categories_.size());
       for (const auto& category : thread_categories_) {
         thread_categories_info.emplace_back(category.first, category.second.size());
       }
     }
 
-    output["total_threads_running"] = running;
+    uint64_t running = 0;
     EasyJson groups = output.Set("groups", EasyJson::kArray);
     for (const auto& elem : thread_categories_info) {
       string category_arg;
@@ -483,7 +477,9 @@ void ThreadMgr::ThreadPathHandler(const WebCallbackRegistry::WebRequest& req,
       g["encoded_group_name"] = category_arg;
       g["group_name"] = elem.first;
       g["threads_running"] = elem.second;
+      running += elem.second;
     }
+    output["total_threads_running"] = running;
   }
 }
 
@@ -623,10 +619,10 @@ Status Thread::StartThread(string category, string name,
   t->AddRef();
 
   auto cleanup = MakeScopedCleanup([&]() {
-      // If we failed to create the thread, we need to undo all of our prep work.
-      t->tid_ = INVALID_TID;
-      t->Release();
-    });
+    // If we failed to create the thread, we need to undo all of our prep work.
+    t->tid_ = INVALID_TID;
+    t->Release();
+  });
 
   if (PREDICT_FALSE(FLAGS_thread_inject_start_latency_ms > 0)) {
     LOG(INFO) << "Injecting " << FLAGS_thread_inject_start_latency_ms << "ms sleep on thread start";
@@ -638,7 +634,7 @@ Status Thread::StartThread(string category, string name,
     SCOPED_WATCH_STACK((flags & NO_STACK_WATCHDOG) ? 0 : 250);
     int ret = pthread_create(&t->thread_, nullptr, &Thread::SuperviseThread, t.get());
     if (ret) {
-      string msg = "";
+      string msg;
       if (ret == EAGAIN) {
         uint64_t rlimit_nproc = Env::Default()->GetResourceLimit(
             Env::ResourceLimitType::RUNNING_THREADS_PER_EUID);