You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2017/11/20 18:17:30 UTC

kudu git commit: KUDU-2215. kernel_stack_watchdog: avoid blocking thread exit

Repository: kudu
Updated Branches:
  refs/heads/master 5bb3f430d -> fb637ce92


KUDU-2215. kernel_stack_watchdog: avoid blocking thread exit

This changes the stack watchdog so that thread unregistration no longer
blocks if the watchdog thread is in the middle of dumping a stack.

This is to try to avoid cases where a user thread is waiting to join on
another thread, but that thread is blocked due to watchdog interference.

A new stress-test/benchmark verifies the improvement. It simulates slow
stack trace collection by injecting latency into the watchdog thread,
and then starts and joins threads in a loop for 5 seconds. Without the
fix, it was only able to start about 1,000 threads/second, whereas with
the fix it is able to start 10,000 threads/second.

Change-Id: Ib6a349666e8484c00b2f43c5918205ec1a4c09ab
Reviewed-on: http://gerrit.cloudera.org:8080/8536
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <aw...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/fb637ce9
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/fb637ce9
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/fb637ce9

Branch: refs/heads/master
Commit: fb637ce9259a4e087ac5f6840ba977806c2bf6b0
Parents: 5bb3f43
Author: Todd Lipcon <to...@apache.org>
Authored: Mon Nov 13 12:17:58 2017 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Mon Nov 20 18:17:13 2017 +0000

----------------------------------------------------------------------
 src/kudu/util/fault_injection.cc       |  4 ++
 src/kudu/util/fault_injection.h        | 13 +++++
 src/kudu/util/kernel_stack_watchdog.cc | 85 +++++++++++++++++++++++------
 src/kudu/util/kernel_stack_watchdog.h  | 25 ++++++++-
 src/kudu/util/stack_watchdog-test.cc   | 43 +++++++++++++++
 5 files changed, 149 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/fb637ce9/src/kudu/util/fault_injection.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/fault_injection.cc b/src/kudu/util/fault_injection.cc
index 32cb954..61b1437 100644
--- a/src/kudu/util/fault_injection.cc
+++ b/src/kudu/util/fault_injection.cc
@@ -65,6 +65,10 @@ void DoInjectRandomLatency(double max_latency_ms) {
   SleepFor(MonoDelta::FromMilliseconds(g_random->NextDoubleFraction() * max_latency_ms));
 }
 
+void DoInjectFixedLatency(int32_t latency_ms) {
+  SleepFor(MonoDelta::FromMilliseconds(latency_ms));
+}
+
 bool DoMaybeTrue(double fraction) {
   GoogleOnceInit(&g_random_once, InitRandom);
   return PREDICT_FALSE(g_random->NextDoubleFraction() <= fraction);

http://git-wip-us.apache.org/repos/asf/kudu/blob/fb637ce9/src/kudu/util/fault_injection.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/fault_injection.h b/src/kudu/util/fault_injection.h
index c46b118..7a71698 100644
--- a/src/kudu/util/fault_injection.h
+++ b/src/kudu/util/fault_injection.h
@@ -17,6 +17,8 @@
 #ifndef KUDU_UTIL_FAULT_INJECTION_H
 #define KUDU_UTIL_FAULT_INJECTION_H
 
+#include <stdint.h>
+
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/port.h"
 #include "kudu/util/status.h"
@@ -43,6 +45,10 @@
 #define MAYBE_INJECT_RANDOM_LATENCY(max_ms_flag) \
   kudu::fault_injection::MaybeInjectRandomLatency(max_ms_flag)
 
+// Inject a fixed amount of latency, in milliseconds. No-op if 'ms_flag' is <= 0.
+#define MAYBE_INJECT_FIXED_LATENCY(ms_flag) \
+  kudu::fault_injection::MaybeInjectFixedLatency(ms_flag)
+
 // With some probability, return the status described by 'status_expr'.
 // This will not evaluate 'status_expr' if 'fraction_flag' is zero.
 #define MAYBE_RETURN_FAILURE(fraction_flag, status_expr) \
@@ -63,6 +69,7 @@ constexpr int kExitStatus = 85;
 // Out-of-line implementation.
 void DoMaybeFault(const char* fault_str, double fraction);
 void DoInjectRandomLatency(double max_latency_ms);
+void DoInjectFixedLatency(int32_t latency_ms);
 bool DoMaybeTrue(double fraction);
 
 inline bool MaybeTrue(double fraction) {
@@ -80,6 +87,12 @@ inline void MaybeInjectRandomLatency(double max_latency) {
   DoInjectRandomLatency(max_latency);
 }
 
+inline void MaybeInjectFixedLatency(int32_t latency) {
+  if (PREDICT_TRUE(latency <= 0)) return;
+  DoInjectFixedLatency(latency);
+}
+
+
 } // namespace fault_injection
 } // namespace kudu
 #endif /* KUDU_UTIL_FAULT_INJECTION_H */

http://git-wip-us.apache.org/repos/asf/kudu/blob/fb637ce9/src/kudu/util/kernel_stack_watchdog.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/kernel_stack_watchdog.cc b/src/kudu/util/kernel_stack_watchdog.cc
index b11c305..a7e4ec8 100644
--- a/src/kudu/util/kernel_stack_watchdog.cc
+++ b/src/kudu/util/kernel_stack_watchdog.cc
@@ -29,29 +29,36 @@
 #include <gflags/gflags.h>
 
 #include "kudu/gutil/dynamic_annotations.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/debug-util.h"
 #include "kudu/util/debug/leakcheck_disabler.h"
 #include "kudu/util/env.h"
 #include "kudu/util/faststring.h"
+#include "kudu/util/fault_injection.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/monotime.h"
-#include "kudu/util/thread.h"
 #include "kudu/util/status.h"
-#include "kudu/gutil/map-util.h"
-#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/thread.h"
 
 DEFINE_int32(hung_task_check_interval_ms, 200,
              "Number of milliseconds in between checks for hung threads");
 TAG_FLAG(hung_task_check_interval_ms, hidden);
 
+DEFINE_int32(inject_latency_on_kernel_stack_lookup_ms, 0,
+             "Number of milliseconds of latency to inject when reading a thread's "
+             "kernel stack");
+TAG_FLAG(inject_latency_on_kernel_stack_lookup_ms, hidden);
+
 using std::lock_guard;
 using std::string;
+using std::unique_ptr;
+using std::vector;
 using strings::Substitute;
 
 namespace kudu {
 
-DEFINE_STATIC_THREAD_LOCAL(KernelStackWatchdog::TLS,
-                           KernelStackWatchdog, tls_);
+__thread KernelStackWatchdog::TLS* KernelStackWatchdog::tls_;
 
 KernelStackWatchdog::KernelStackWatchdog()
   : log_collector_(nullptr),
@@ -95,12 +102,25 @@ void KernelStackWatchdog::Register(TLS* tls) {
 
 void KernelStackWatchdog::Unregister() {
   int64_t tid = Thread::CurrentThreadId();
-  MutexLock l(unregister_lock_);
-  lock_guard<simple_spinlock> l2(tls_lock_);
-  CHECK(tls_by_tid_.erase(tid));
+
+  std::unique_ptr<TLS> tls(tls_);
+  {
+    std::unique_lock<Mutex> l(unregister_lock_, std::try_to_lock);
+    lock_guard<simple_spinlock> l2(tls_lock_);
+    CHECK(tls_by_tid_.erase(tid));
+    if (!l.owns_lock()) {
+      // The watchdog is in the middle of running and might be accessing
+      // 'tls', so just enqueue it for later deletion. Otherwise it
+      // will go out of scope at the end of this function and get
+      // deleted here.
+      pending_delete_.emplace_back(std::move(tls));
+    }
+  }
+  tls_ = nullptr;
 }
 
 Status GetKernelStack(pid_t p, string* ret) {
+  MAYBE_INJECT_FIXED_LATENCY(FLAGS_inject_latency_on_kernel_stack_lookup_ms);
   faststring buf;
   RETURN_NOT_OK(ReadFileToString(Env::Default(), Substitute("/proc/$0/stack", p), &buf));
   *ret = buf.ToString();
@@ -115,21 +135,27 @@ void KernelStackWatchdog::RunThread() {
       break;
     }
 
-    // Prevent threads from unregistering between the snapshot loop and the sending of
-    // signals. This makes it safe for us to access their TLS. We might delay the thread
-    // exit a bit, but it would be unusual for any code to block on a thread exit, whereas
-    // it's relatively important for threads to _start_ quickly.
+    // Prevent threads from deleting their TLS objects between the snapshot loop and the sending of
+    // signals. This makes it safe for us to access their TLS.
+    //
+    // NOTE: it's still possible that the thread will have exited in between grabbing its pointer
+    // and sending a signal, but DumpThreadStack() is already careful not to send a signal
+    // to some other non-Kudu thread.
     MutexLock l(unregister_lock_);
 
     // Take the snapshot of the thread information under a short lock.
     //
-    // 'lock_' prevents new threads from starting, so we don't want to do any lengthy work
+    // 'tls_lock_' prevents new threads from starting, so we don't want to do any lengthy work
     // (such as gathering stack traces) under this lock.
     TLSMap tls_map_copy;
+    vector<unique_ptr<TLS>> to_delete;
     {
       lock_guard<simple_spinlock> l(tls_lock_);
+      to_delete.swap(pending_delete_);
       tls_map_copy = tls_by_tid_;
     }
+    // Actually delete the no-longer-used TLS entries outside of the lock.
+    to_delete.clear();
 
     MicrosecondsInt64 now = GetMonoTimeMicros();
     for (const auto& entry : tls_map_copy) {
@@ -151,6 +177,16 @@ void KernelStackWatchdog::RunThread() {
 
           string user_stack = DumpThreadStack(p);
 
+          // If the thread exited the frame we're looking at in between when we started
+          // grabbing the stack and now, then our stack isn't correct. We shouldn't log it.
+          //
+          // We just use unprotected reads here since this is a somewhat best-effort
+          // check.
+          if (ANNOTATE_UNPROTECTED_READ(tls->depth_) < tls_copy.depth_ ||
+              ANNOTATE_UNPROTECTED_READ(tls->frames_[i].start_time_) != frame->start_time_) {
+            break;
+          }
+
           lock_guard<simple_spinlock> l(log_lock_);
           LOG_STRING(WARNING, log_collector_.get())
               << "Thread " << p << " stuck at " << frame->status_
@@ -163,21 +199,34 @@ void KernelStackWatchdog::RunThread() {
   }
 }
 
-KernelStackWatchdog::TLS* KernelStackWatchdog::GetTLS() {
+void KernelStackWatchdog::ThreadExiting(void* /* unused */) {
+  KernelStackWatchdog::GetInstance()->Unregister();
+}
+
+void KernelStackWatchdog::CreateAndRegisterTLS() {
+  DCHECK(!tls_);
   // Disable leak check. LSAN sometimes gets false positives on thread locals.
   // See: https://github.com/google/sanitizers/issues/757
   debug::ScopedLeakCheckDisabler d;
-  INIT_STATIC_THREAD_LOCAL(KernelStackWatchdog::TLS, tls_);
-  return tls_;
+  auto* tls = new TLS();
+  KernelStackWatchdog::GetInstance()->Register(tls);
+  tls_ = tls;
+
+  // We manually install a thread-exit function here making use of the internal
+  // functionality of the thread-local module, rather than using Thread::CallAtExit().
+  // This is because we may use the stack watchdog in contexts such as the client
+  // where it's likely that threads aren't associated with a kudu::Thread instance.
+  auto* dtor_list = new threadlocal::internal::PerThreadDestructorList();
+  dtor_list->destructor = &ThreadExiting;
+  dtor_list->arg = nullptr;
+  kudu::threadlocal::internal::AddDestructor(dtor_list);
 }
 
 KernelStackWatchdog::TLS::TLS() {
   memset(&data_, 0, sizeof(data_));
-  KernelStackWatchdog::GetInstance()->Register(this);
 }
 
 KernelStackWatchdog::TLS::~TLS() {
-  KernelStackWatchdog::GetInstance()->Unregister();
 }
 
 // Optimistic concurrency control approach to snapshot the value of another

http://git-wip-us.apache.org/repos/asf/kudu/blob/fb637ce9/src/kudu/util/kernel_stack_watchdog.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/kernel_stack_watchdog.h b/src/kudu/util/kernel_stack_watchdog.h
index 26b8253..6ec7b50 100644
--- a/src/kudu/util/kernel_stack_watchdog.h
+++ b/src/kudu/util/kernel_stack_watchdog.h
@@ -54,6 +54,7 @@
 #define KUDU_UTIL_KERNEL_STACK_WATCHDOG_H
 
 #include <ctime>
+#include <memory>
 #include <string>
 #include <unordered_map>
 #include <vector>
@@ -164,14 +165,27 @@ class KernelStackWatchdog {
   ~KernelStackWatchdog();
 
   // Get or create the TLS for the current thread.
-  static TLS* GetTLS();
+  static TLS* GetTLS() {
+    if (PREDICT_FALSE(!tls_)) {
+      CreateAndRegisterTLS();
+    }
+    return tls_;
+  }
+
+  // Create a new TLS for the current thread, and register it with the watchdog.
+  // Installs a callback to automatically unregister the thread upon its exit.
+  static void CreateAndRegisterTLS();
+
+  // Callback which is registered to run at thread-exit time by CreateAndRegisterTLS().
+  static void ThreadExiting(void* tls_void);
 
   // Register a new thread's TLS with the watchdog.
   // Called by any thread the first time it enters a watched section, when its TLS
   // is constructed.
   void Register(TLS* tls);
 
-  // Called when a thread's TLS is destructed (i.e. when the thread exits).
+  // Called when a thread is in the process of exiting, and has a registered TLS
+  // object.
   void Unregister();
 
   // The actual watchdog loop that the watchdog thread runs.
@@ -182,6 +196,11 @@ class KernelStackWatchdog {
   typedef std::unordered_map<pid_t, TLS*> TLSMap;
   TLSMap tls_by_tid_;
 
+  // If a thread exits while the watchdog is in the middle of accessing the TLS
+  // objects, we can't immediately delete the TLS struct. Instead, the thread
+  // enqueues it here for later deletion by the watchdog thread within RunThread().
+  std::vector<std::unique_ptr<TLS>> pending_delete_;
+
   // If non-NULL, warnings will be emitted into this vector instead of glog.
   // Used by tests.
   gscoped_ptr<std::vector<std::string> > log_collector_;
@@ -189,7 +208,7 @@ class KernelStackWatchdog {
   // Lock protecting log_collector_.
   mutable simple_spinlock log_lock_;
 
-  // Lock protecting tls_by_tid_.
+  // Lock protecting tls_by_tid_ and pending_delete_.
   mutable simple_spinlock tls_lock_;
 
   // Lock which prevents threads from unregistering while the watchdog

http://git-wip-us.apache.org/repos/asf/kudu/blob/fb637ce9/src/kudu/util/stack_watchdog-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/stack_watchdog-test.cc b/src/kudu/util/stack_watchdog-test.cc
index a08887f..aefe220 100644
--- a/src/kudu/util/stack_watchdog-test.cc
+++ b/src/kudu/util/stack_watchdog-test.cc
@@ -17,12 +17,16 @@
 
 #include "kudu/util/kernel_stack_watchdog.h"
 
+#include <ostream>
 #include <string>
+#include <thread>
 #include <vector>
 
 #include <gflags/gflags_declare.h>
+#include <glog/logging.h>
 #include <gtest/gtest.h>
 
+#include "kudu/gutil/dynamic_annotations.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -32,10 +36,12 @@
 #include "kudu/util/test_util.h"
 
 using std::string;
+using std::thread;
 using std::vector;
 using strings::Substitute;
 
 DECLARE_int32(hung_task_check_interval_ms);
+DECLARE_int32(inject_latency_on_kernel_stack_lookup_ms);
 
 namespace kudu {
 
@@ -44,6 +50,8 @@ class StackWatchdogTest : public KuduTest {
   virtual void SetUp() OVERRIDE {
     KuduTest::SetUp();
     KernelStackWatchdog::GetInstance()->SaveLogsForTests(true);
+    ANNOTATE_BENIGN_RACE(&FLAGS_hung_task_check_interval_ms, "");
+    ANNOTATE_BENIGN_RACE(&FLAGS_inject_latency_on_kernel_stack_lookup_ms, "");
     FLAGS_hung_task_check_interval_ms = 10;
   }
 };
@@ -106,4 +114,39 @@ TEST_F(StackWatchdogTest, TestPerformance) {
     }
   }
 }
+
+// Stress test to ensure that we properly handle the case where threads are short-lived
+// and the watchdog may try to grab a stack of a thread that has already exited.
+//
+// This also serves as a benchmark -- we make the stack-grabbing especially slow and
+// ensure that we can still start and join threads quickly.
+TEST_F(StackWatchdogTest, TestShortLivedThreadsStress) {
+  // Run the stack watchdog continuously.
+  FLAGS_hung_task_check_interval_ms = 0;
+
+  // Make the actual stack trace collection slow. In practice we find that
+  // stack trace collection can often take quite some time due to symbolization, etc.
+  FLAGS_inject_latency_on_kernel_stack_lookup_ms = 1000;
+
+  MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(5);
+  vector<thread> threads(100);
+  int started = 0;
+  while (MonoTime::Now() < deadline) {
+    thread* t = &threads[started % threads.size()];
+    if (t->joinable()) {
+      t->join();
+    }
+    *t = thread([&]() {
+        // Trigger watchdog at 1ms, but then sleep for 2ms, to ensure that
+        // the watchdog has plenty of work to do.
+        SCOPED_WATCH_STACK(1);
+        SleepFor(MonoDelta::FromMilliseconds(2));
+      });
+    started++;
+  }
+  for (auto& t : threads) {
+    if (t.joinable()) t.join();
+  }
+  LOG(INFO) << "started and joined " << started << " threads";
+}
 } // namespace kudu