You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2019/01/03 01:54:01 UTC

[3/3] kudu git commit: [util] use lighter locking scheme for ThreadMgr

[util] use lighter locking scheme for ThreadMgr

Use the rw_spinlock primitive to guard the ThreadMgr's registry
threads instead of Mutex.  Also, use the shared lock pattern to deal
with write and read access to the guarded entities of the ThreadMgr.
In addition, do not hold the lock for the whole duration of generating
the /threadz page for the embedded webserver.

With this patch, ThreadMgr now uses std::unordered_map as a container
for category-specific thread information and enforces stricter
consistency rules while removing thread information from its registry.
The process of adding an information about a thread into the registry
is not guarded by stricter consistency checks (i.e. it stays as it was
before this patch); see below for explanation.

NOTE:
  The stricter consistency around adding a thread into the ThreadMgr's
  registry caused issues with multiprocessing in Python tests,
  and I spent some time trying to work around that.  However, that was
  not fruitful.  I think the proper solution would be keeping the thread
  registry bound to some top-level object (like Kudu client or
  ServerBase object) and cleaning it up in accordance with the object's
  life cycle.

  In essence, the problem happens due to the combination of the
  following:
    * The way how workers are spawned by the multiprocessing.Pool,
      i.e. calling fork() at the point where Pool is instantiated.
    * The fact that pthread_t handles might be the same for threads
      in different processes.

  In detail, if multiprocessing.Pool is spawning worker processes after
  an instance of Kudu client has been created in the main test process,
  every worker gets a copy of the thread registry in its address space.
  The unexpected copy of the registry in a worker process is populated
  with the information on threads spawned due to the activity of the
  Kudu client in the main test process.  Later on, if a worker
  instantiates a Kudu client on its own, the newly spawned threads by
  the worker's Kudu client might have the same pthread_t handles
  as the threads whose information records are inadvertently inherited
  from the main test process.

  Also, it turned out it's impossible to handle worker's crash in a
  multiprocessing.Pool, and that's by design: for details see:
    https://stackoverflow.com/questions/24894682/

  BTW, in Python 3 the problem with the duplicated address space
  in worker processes has been resolved by using context and additional
  worker spawn modes (e.g., 'forkserver').

Change-Id: I4d49c1c39392e01c45019844430a4fe3d116c277
Reviewed-on: http://gerrit.cloudera.org:8080/12112
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/c7a2d69f
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/c7a2d69f
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/c7a2d69f

Branch: refs/heads/master
Commit: c7a2d69fbd4f0a6f3b0938c31a439049f3733767
Parents: 7337cf5
Author: Alexey Serbin <al...@apache.org>
Authored: Wed Dec 19 13:23:03 2018 -0800
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Thu Jan 3 01:53:05 2019 +0000

----------------------------------------------------------------------
 src/kudu/util/os-util.cc |  23 ++---
 src/kudu/util/thread.cc  | 206 +++++++++++++++++++++++++-----------------
 2 files changed, 131 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/c7a2d69f/src/kudu/util/os-util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/os-util.cc b/src/kudu/util/os-util.cc
index df7761f..85ceb9f 100644
--- a/src/kudu/util/os-util.cc
+++ b/src/kudu/util/os-util.cc
@@ -29,7 +29,7 @@
 #include <unistd.h>
 
 #include <cstddef>
-#include <fstream>
+#include <ostream>
 #include <string>
 #include <utility>
 #include <vector>
@@ -45,10 +45,8 @@
 #include "kudu/util/env.h"
 #include "kudu/util/faststring.h"
 #include "kudu/util/logging.h"
+#include "kudu/util/status.h"
 
-using std::ifstream;
-using std::istreambuf_iterator;
-using std::ostringstream;
 using std::string;
 using std::vector;
 using strings::Split;
@@ -75,7 +73,7 @@ static const int64_t kIoWait = 41 - 2;
 // Largest offset we are interested in, to check we get a well formed stat file.
 static const int64_t kMaxOffset = kIoWait;
 
-Status ParseStat(const std::string& buffer, std::string* name, ThreadStats* stats) {
+Status ParseStat(const string& buffer, string* name, ThreadStats* stats) {
   DCHECK(stats != nullptr);
 
   // The thread name should be the only field with parentheses. But the name
@@ -117,18 +115,11 @@ Status GetThreadStats(int64_t tid, ThreadStats* stats) {
   if (kTicksPerSec <= 0) {
     return Status::NotSupported("ThreadStats not supported");
   }
+  faststring buf;
+  RETURN_NOT_OK(ReadFileToString(
+      Env::Default(), Substitute("/proc/self/task/$0/stat", tid), &buf));
 
-  ostringstream proc_path;
-  proc_path << "/proc/self/task/" << tid << "/stat";
-  ifstream proc_file(proc_path.str().c_str());
-  if (!proc_file.is_open()) {
-    return Status::IOError("Could not open ifstream");
-  }
-
-  string buffer((istreambuf_iterator<char>(proc_file)),
-      istreambuf_iterator<char>());
-
-  return ParseStat(buffer, nullptr, stats); // don't want the name
+  return ParseStat(buf.ToString(), nullptr, stats);
 }
 
 void DisableCoreDumps() {

http://git-wip-us.apache.org/repos/asf/kudu/blob/c7a2d69f/src/kudu/util/thread.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/thread.cc b/src/kudu/util/thread.cc
index bb53f5a..e134a27 100644
--- a/src/kudu/util/thread.cc
+++ b/src/kudu/util/thread.cc
@@ -30,6 +30,7 @@
 #include <cstring>
 #include <map>
 #include <memory>
+#include <mutex>
 #include <sstream>
 #include <unordered_map>
 #include <utility>
@@ -44,6 +45,7 @@
 #include "kudu/gutil/bind.h"
 #include "kudu/gutil/bind_helpers.h"
 #include "kudu/gutil/dynamic_annotations.h"
+#include "kudu/gutil/map-util.h"
 #include "kudu/gutil/mathlimits.h"
 #include "kudu/gutil/once.h"
 #include "kudu/gutil/port.h"
@@ -51,10 +53,10 @@
 #include "kudu/util/env.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/kernel_stack_watchdog.h"
+#include "kudu/util/locks.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
-#include "kudu/util/mutex.h"
 #include "kudu/util/os-util.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/stopwatch.h"
@@ -64,12 +66,12 @@
 
 using boost::bind;
 using boost::mem_fn;
-using std::endl;
 using std::map;
 using std::ostringstream;
 using std::shared_ptr;
 using std::string;
 using std::vector;
+using std::unordered_map;
 using strings::Substitute;
 
 METRIC_DEFINE_gauge_uint64(server, threads_started,
@@ -162,13 +164,13 @@ class ThreadMgr {
   }
 
   ~ThreadMgr() {
-    MutexLock l(lock_);
     thread_categories_.clear();
   }
 
-  static void SetThreadName(const std::string& name, int64_t tid);
+  static void SetThreadName(const string& name, int64_t tid);
 
-  Status StartInstrumentation(const scoped_refptr<MetricEntity>& metrics, WebCallbackRegistry* web);
+  Status StartInstrumentation(const scoped_refptr<MetricEntity>& metrics,
+                              WebCallbackRegistry* web) const;
 
   // Registers a thread to the supplied category. The key is a pthread_t,
   // not the system TID, since pthread_t is less prone to being recycled.
@@ -180,7 +182,7 @@ class ThreadMgr {
   void RemoveThread(const pthread_t& pthread_id, const string& category);
 
   // Metric callback for number of threads running. Also used for error messages.
-  uint64_t ReadThreadsRunning();
+  uint64_t ReadThreadsRunning() const;
 
  private:
   // Container class for any details we want to capture about a thread
@@ -204,18 +206,29 @@ class ThreadMgr {
     int64_t thread_id_;
   };
 
+  struct ThreadIdHash {
+    size_t operator()(pthread_t thread_id) const noexcept {
+      return std::hash<pthread_t>()(thread_id);
+    }
+  };
+
+  struct ThreadIdEqual {
+    bool operator()(pthread_t lhs, pthread_t rhs) const {
+      return pthread_equal(lhs, rhs) != 0;
+    }
+  };
+
   // A ThreadCategory is a set of threads that are logically related.
-  // TODO: unordered_map is incompatible with pthread_t, but would be more
-  // efficient here.
-  typedef map<const pthread_t, ThreadDescriptor> ThreadCategory;
+  typedef unordered_map<const pthread_t, ThreadDescriptor,
+                        ThreadIdHash, ThreadIdEqual> ThreadCategory;
 
-  // All thread categorys, keyed on the category name.
-  typedef map<string, ThreadCategory> ThreadCategoryMap;
+  // All thread categories, keyed on the category name.
+  typedef unordered_map<string, ThreadCategory> ThreadCategoryMap;
 
   // Protects thread_categories_ and thread metrics.
-  Mutex lock_;
+  mutable rw_spinlock lock_;
 
-  // All thread categorys that ever contained a thread, even if empty
+  // All thread categories that ever contained a thread, even if empty.
   ThreadCategoryMap thread_categories_;
 
   // Counters to track all-time total number of threads, and the
@@ -224,12 +237,13 @@ class ThreadMgr {
   uint64_t threads_running_metric_;
 
   // Metric callback for number of threads started.
-  uint64_t ReadThreadsStarted();
+  uint64_t ReadThreadsStarted() const;
 
   // Webpage callback; prints all threads by category.
   void ThreadPathHandler(const WebCallbackRegistry::WebRequest& req,
-                         WebCallbackRegistry::PrerenderedWebResponse* resp);
-  void PrintThreadCategoryRows(const ThreadCategory& category, ostringstream* output);
+                         WebCallbackRegistry::PrerenderedWebResponse* resp) const;
+  void PrintThreadDescriptorRow(const ThreadDescriptor& desc,
+                                ostringstream* output) const;
 };
 
 void ThreadMgr::SetThreadName(const string& name, int64_t tid) {
@@ -258,9 +272,7 @@ void ThreadMgr::SetThreadName(const string& name, int64_t tid) {
 }
 
 Status ThreadMgr::StartInstrumentation(const scoped_refptr<MetricEntity>& metrics,
-                                       WebCallbackRegistry* web) {
-  MutexLock l(lock_);
-
+                                       WebCallbackRegistry* web) const {
   // Use function gauges here so that we can register a unique copy of these metrics in
   // multiple tservers, even though the ThreadMgr is itself a singleton.
   metrics->NeverRetire(
@@ -292,13 +304,13 @@ Status ThreadMgr::StartInstrumentation(const scoped_refptr<MetricEntity>& metric
   return Status::OK();
 }
 
-uint64_t ThreadMgr::ReadThreadsStarted() {
-  MutexLock l(lock_);
+uint64_t ThreadMgr::ReadThreadsStarted() const {
+  shared_lock<decltype(lock_)> l(lock_);
   return threads_started_metric_;
 }
 
-uint64_t ThreadMgr::ReadThreadsRunning() {
-  MutexLock l(lock_);
+uint64_t ThreadMgr::ReadThreadsRunning() const {
+  shared_lock<decltype(lock_)> l(lock_);
   return threads_running_metric_;
 }
 
@@ -319,10 +331,23 @@ void ThreadMgr::AddThread(const pthread_t& pthread_id, const string& name,
   ANNOTATE_IGNORE_SYNC_BEGIN();
   ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN();
   {
-    MutexLock l(lock_);
-    thread_categories_[category][pthread_id] = ThreadDescriptor(category, name, tid);
-    threads_running_metric_++;
-    threads_started_metric_++;
+    // NOTE: Not using EmplaceOrDie() here -- that's because in environments
+    //   where fork() is called after some threads have been spawned, child
+    //   processes will inadvertently inherit the contents of the thread
+    //   registry (i.e. the entries in the thread_categories_ container).
+    //   For some platforms, pthread_t handles for threads in different
+    //   processes might be the same, so using EmplaceOrDie() would induce
+    //   a crash when ThreadMgr::AddThread() is called for a new thread
+    //   in the child process.
+    //
+    // TODO(aserbin): maybe, keep the thread_categories_ registry not in a
+    //   global static container, but bind the container with the life cycle
+    //   of some top-level object that uses the ThreadMgr as a singleton.
+    std::lock_guard<decltype(lock_)> l(lock_);
+    thread_categories_[category][pthread_id] =
+        ThreadDescriptor(category, name, tid);
+    ++threads_running_metric_;
+    ++threads_started_metric_;
   }
   ANNOTATE_IGNORE_SYNC_END();
   ANNOTATE_IGNORE_READS_AND_WRITES_END();
@@ -332,77 +357,94 @@ void ThreadMgr::RemoveThread(const pthread_t& pthread_id, const string& category
   ANNOTATE_IGNORE_SYNC_BEGIN();
   ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN();
   {
-    MutexLock l(lock_);
-    auto category_it = thread_categories_.find(category);
-    DCHECK(category_it != thread_categories_.end());
-    category_it->second.erase(pthread_id);
-    threads_running_metric_--;
+    std::lock_guard<decltype(lock_)> l(lock_);
+    auto& threads = FindOrDie(thread_categories_, category);
+    auto num_erased = threads.erase(pthread_id);
+    CHECK_EQ(1, num_erased);
+    --threads_running_metric_;
   }
   ANNOTATE_IGNORE_SYNC_END();
   ANNOTATE_IGNORE_READS_AND_WRITES_END();
 }
 
-void ThreadMgr::PrintThreadCategoryRows(const ThreadCategory& category,
-    ostringstream* output) {
-  for (const ThreadCategory::value_type& thread : category) {
-    ThreadStats stats;
-    Status status = GetThreadStats(thread.second.thread_id(), &stats);
-    if (!status.ok()) {
-      KLOG_EVERY_N(INFO, 100) << "Could not get per-thread statistics: "
-                              << status.ToString();
-    }
-    (*output) << "<tr><td>" << thread.second.name() << "</td><td>"
-              << (static_cast<double>(stats.user_ns) / 1e9) << "</td><td>"
-              << (static_cast<double>(stats.kernel_ns) / 1e9) << "</td><td>"
-              << (static_cast<double>(stats.iowait_ns) / 1e9) << "</td></tr>";
+void ThreadMgr::PrintThreadDescriptorRow(const ThreadDescriptor& desc,
+                                         ostringstream* output) const {
+  ThreadStats stats;
+  Status status = GetThreadStats(desc.thread_id(), &stats);
+  if (!status.ok()) {
+    KLOG_EVERY_N(INFO, 100) << "Could not get per-thread statistics: "
+                            << status.ToString();
   }
+  (*output) << "<tr><td>" << desc.name() << "</td><td>"
+            << (static_cast<double>(stats.user_ns) / 1e9) << "</td><td>"
+            << (static_cast<double>(stats.kernel_ns) / 1e9) << "</td><td>"
+            << (static_cast<double>(stats.iowait_ns) / 1e9) << "</td></tr>";
 }
 
-void ThreadMgr::ThreadPathHandler(const WebCallbackRegistry::WebRequest& req,
-                                  WebCallbackRegistry::PrerenderedWebResponse* resp) {
-  ostringstream* output = resp->output;
-  MutexLock l(lock_);
-  vector<const ThreadCategory*> categories_to_print;
-  auto category_name = req.parsed_args.find("group");
+void ThreadMgr::ThreadPathHandler(
+    const WebCallbackRegistry::WebRequest& req,
+    WebCallbackRegistry::PrerenderedWebResponse* resp) const {
+  ostringstream& output = *(resp->output);
+  vector<ThreadDescriptor> descriptors_to_print;
+  const auto category_name = req.parsed_args.find("group");
   if (category_name != req.parsed_args.end()) {
-    string group = EscapeForHtmlToString(category_name->second);
-    (*output) << "<h2>Thread Group: " << group << "</h2>" << endl;
+    const auto& group = category_name->second;
+    const auto& group_esc = EscapeForHtmlToString(group);
+    output << "<h2>Thread Group: " << group_esc << "</h2>";
     if (group != "all") {
-      ThreadCategoryMap::const_iterator category = thread_categories_.find(group);
-      if (category == thread_categories_.end()) {
-        (*output) << "Thread group '" << group << "' not found" << endl;
+      shared_lock<decltype(lock_)> l(lock_);
+      const auto it = thread_categories_.find(group);
+      if (it == thread_categories_.end()) {
+        output << "Thread group '" << group_esc << "' not found";
         return;
       }
-      categories_to_print.push_back(&category->second);
-      (*output) << "<h3>" << category->first << " : " << category->second.size()
-                << "</h3>";
+      for (const auto& elem : it->second) {
+        descriptors_to_print.push_back(elem.second);
+      }
+      output << "<h3>" << it->first << " : " << it->second.size() << "</h3>";
     } else {
-      for (const ThreadCategoryMap::value_type& category : thread_categories_) {
-        categories_to_print.push_back(&category.second);
+      shared_lock<decltype(lock_)> l(lock_);
+      for (const auto& category : thread_categories_) {
+        for (const auto& elem : category.second) {
+          descriptors_to_print.push_back(elem.second);
+        }
       }
-      (*output) << "<h3>All Threads : </h3>";
+      output << "<h3>All Threads : </h3>";
     }
-
-    (*output) << "<table class='table table-hover table-border'>";
-    (*output) << "<thead><tr><th>Thread name</th><th>Cumulative User CPU(s)</th>"
-              << "<th>Cumulative Kernel CPU(s)</th>"
-              << "<th>Cumulative IO-wait(s)</th></tr></thead>";
-    (*output) << "<tbody>\n";
-
-    for (const ThreadCategory* category : categories_to_print) {
-      PrintThreadCategoryRows(*category, output);
+    output << "<table class='table table-hover table-border'>"
+              "<thead><tr><th>Thread name</th><th>Cumulative User CPU(s)</th>"
+              "<th>Cumulative Kernel CPU(s)</th>"
+              "<th>Cumulative IO-wait(s)</th></tr></thead>"
+              "<tbody>\n";
+    // Sort the entries in the table by the name of a thread.
+    // TODO(aserbin): use "mustache + fancy table" instead.
+    std::sort(descriptors_to_print.begin(), descriptors_to_print.end(),
+              [](const ThreadDescriptor& lhs, const ThreadDescriptor& rhs) {
+                return lhs.name() < rhs.name();
+              });
+    for (const auto& desc : descriptors_to_print) {
+      PrintThreadDescriptorRow(desc, &output);
     }
-    (*output) << "</tbody></table>";
+    output << "</tbody></table>";
   } else {
-    (*output) << "<h2>Thread Groups</h2>";
-    (*output) << "<h4>" << threads_running_metric_ << " thread(s) running";
-    (*output) << "<a href='/threadz?group=all'><h3>All Threads</h3>";
-
-    for (const ThreadCategoryMap::value_type& category : thread_categories_) {
+    // Using the tree map (std::map) to have the list of the thread categories
+    // at the '/threadz' page sorted alphabetically.
+    // TODO(aserbin): use "mustache + fancy table" instead.
+    map<string, size_t> thread_categories_info;
+    {
+      shared_lock<decltype(lock_)> l(lock_);
+      output << "<h2>Thread Groups</h2>"
+                "<h4>" << threads_running_metric_ << " thread(s) running"
+                "<a href='/threadz?group=all'><h3>All Threads</h3>";
+      for (const auto& category : thread_categories_) {
+        thread_categories_info.emplace(category.first, category.second.size());
+      }
+    }
+    for (const auto& elem : thread_categories_info) {
       string category_arg;
-      UrlEncode(category.first, &category_arg);
-      (*output) << "<a href='/threadz?group=" << category_arg << "'><h3>"
-                << category.first << " : " << category.second.size() << "</h3></a>";
+      UrlEncode(elem.first, &category_arg);
+      output << "<a href='/threadz?group=" << category_arg << "'><h3>"
+             << elem.first << " : " << elem.second << "</h3></a>";
     }
   }
 }
@@ -496,7 +538,7 @@ Thread::~Thread() {
   }
 }
 
-std::string Thread::ToString() const {
+string Thread::ToString() const {
   return Substitute("Thread $0 (name: \"$1\", category: \"$2\")", tid(), name_, category_);
 }
 
@@ -513,7 +555,7 @@ int64_t Thread::WaitForTid() const {
 }
 
 
-Status Thread::StartThread(const std::string& category, const std::string& name,
+Status Thread::StartThread(const string& category, const string& name,
                            const ThreadFunctor& functor, uint64_t flags,
                            scoped_refptr<Thread> *holder) {
   TRACE_COUNTER_INCREMENT("threads_started", 1);