You are viewing a plain-text version of this content; the link to the canonical version ("here") did not survive the plain-text conversion.
Posted to commits@kudu.apache.org by to...@apache.org on 2017/11/20 18:17:30 UTC
kudu git commit: KUDU-2215. kernel_stack_watchdog: avoid blocking thread exit
Repository: kudu
Updated Branches:
refs/heads/master 5bb3f430d -> fb637ce92
KUDU-2215. kernel_stack_watchdog: avoid blocking thread exit
This changes the stack watchdog so that thread unregistration no longer
blocks if the watchdog thread is in the middle of dumping a stack.
This is intended to avoid cases where a user thread is waiting to join another
thread that is itself blocked from exiting because the watchdog holds the unregistration lock while it dumps stacks.
A new stress-test/benchmark verifies the improvement. It simulates slow
stack trace collection by injecting latency into the watchdog thread,
and then starts and joins threads in a loop for 5 seconds. Without the
fix, it was only able to start about 1000 threads/second, whereas with
the fix it's able to start 10,000 threads/second.
Change-Id: Ib6a349666e8484c00b2f43c5918205ec1a4c09ab
Reviewed-on: http://gerrit.cloudera.org:8080/8536
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <aw...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/fb637ce9
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/fb637ce9
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/fb637ce9
Branch: refs/heads/master
Commit: fb637ce9259a4e087ac5f6840ba977806c2bf6b0
Parents: 5bb3f43
Author: Todd Lipcon <to...@apache.org>
Authored: Mon Nov 13 12:17:58 2017 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Mon Nov 20 18:17:13 2017 +0000
----------------------------------------------------------------------
src/kudu/util/fault_injection.cc | 4 ++
src/kudu/util/fault_injection.h | 13 +++++
src/kudu/util/kernel_stack_watchdog.cc | 85 +++++++++++++++++++++++------
src/kudu/util/kernel_stack_watchdog.h | 25 ++++++++-
src/kudu/util/stack_watchdog-test.cc | 43 +++++++++++++++
5 files changed, 149 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/fb637ce9/src/kudu/util/fault_injection.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/fault_injection.cc b/src/kudu/util/fault_injection.cc
index 32cb954..61b1437 100644
--- a/src/kudu/util/fault_injection.cc
+++ b/src/kudu/util/fault_injection.cc
@@ -65,6 +65,10 @@ void DoInjectRandomLatency(double max_latency_ms) {
SleepFor(MonoDelta::FromMilliseconds(g_random->NextDoubleFraction() * max_latency_ms));
}
+// Out-of-line slow path of MaybeInjectFixedLatency(): sleeps the calling
+// thread for 'latency_ms' milliseconds.
+void DoInjectFixedLatency(int32_t latency_ms) {
+ SleepFor(MonoDelta::FromMilliseconds(latency_ms));
+}
+
bool DoMaybeTrue(double fraction) {
GoogleOnceInit(&g_random_once, InitRandom);
return PREDICT_FALSE(g_random->NextDoubleFraction() <= fraction);
http://git-wip-us.apache.org/repos/asf/kudu/blob/fb637ce9/src/kudu/util/fault_injection.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/fault_injection.h b/src/kudu/util/fault_injection.h
index c46b118..7a71698 100644
--- a/src/kudu/util/fault_injection.h
+++ b/src/kudu/util/fault_injection.h
@@ -17,6 +17,8 @@
#ifndef KUDU_UTIL_FAULT_INJECTION_H
#define KUDU_UTIL_FAULT_INJECTION_H
+#include <stdint.h>
+
#include "kudu/gutil/macros.h"
#include "kudu/gutil/port.h"
#include "kudu/util/status.h"
@@ -43,6 +45,10 @@
#define MAYBE_INJECT_RANDOM_LATENCY(max_ms_flag) \
kudu::fault_injection::MaybeInjectRandomLatency(max_ms_flag)
+// Inject a specific amount of latency: sleeps for 'ms_flag' milliseconds.
+// A value <= 0 disables the injection.
+#define MAYBE_INJECT_FIXED_LATENCY(ms_flag) \
+ kudu::fault_injection::MaybeInjectFixedLatency(ms_flag)
+
// With some probability, return the status described by 'status_expr'.
// This will not evaluate 'status_expr' if 'fraction_flag' is zero.
#define MAYBE_RETURN_FAILURE(fraction_flag, status_expr) \
@@ -63,6 +69,7 @@ constexpr int kExitStatus = 85;
// Out-of-line implementation.
void DoMaybeFault(const char* fault_str, double fraction);
void DoInjectRandomLatency(double max_latency_ms);
+void DoInjectFixedLatency(int32_t latency_ms);
bool DoMaybeTrue(double fraction);
inline bool MaybeTrue(double fraction) {
@@ -80,6 +87,12 @@ inline void MaybeInjectRandomLatency(double max_latency) {
DoInjectRandomLatency(max_latency);
}
+// Sleeps for 'latency' milliseconds if it is positive; otherwise returns
+// immediately. The check is inline (the sleep itself is out-of-line) because
+// injection is expected to be disabled in the common case.
+inline void MaybeInjectFixedLatency(int32_t latency) {
+ if (PREDICT_TRUE(latency <= 0)) return;
+ DoInjectFixedLatency(latency);
+}
+
+
} // namespace fault_injection
} // namespace kudu
#endif /* KUDU_UTIL_FAULT_INJECTION_H */
http://git-wip-us.apache.org/repos/asf/kudu/blob/fb637ce9/src/kudu/util/kernel_stack_watchdog.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/kernel_stack_watchdog.cc b/src/kudu/util/kernel_stack_watchdog.cc
index b11c305..a7e4ec8 100644
--- a/src/kudu/util/kernel_stack_watchdog.cc
+++ b/src/kudu/util/kernel_stack_watchdog.cc
@@ -29,29 +29,36 @@
#include <gflags/gflags.h>
#include "kudu/gutil/dynamic_annotations.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/debug-util.h"
#include "kudu/util/debug/leakcheck_disabler.h"
#include "kudu/util/env.h"
#include "kudu/util/faststring.h"
+#include "kudu/util/fault_injection.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/monotime.h"
-#include "kudu/util/thread.h"
#include "kudu/util/status.h"
-#include "kudu/gutil/map-util.h"
-#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/thread.h"
DEFINE_int32(hung_task_check_interval_ms, 200,
"Number of milliseconds in between checks for hung threads");
TAG_FLAG(hung_task_check_interval_ms, hidden);
+DEFINE_int32(inject_latency_on_kernel_stack_lookup_ms, 0,
+ "Number of milliseconds of latency to inject when reading a thread's "
+ "kernel stack");
+TAG_FLAG(inject_latency_on_kernel_stack_lookup_ms, hidden);
+
using std::lock_guard;
using std::string;
+using std::unique_ptr;
+using std::vector;
using strings::Substitute;
namespace kudu {
-DEFINE_STATIC_THREAD_LOCAL(KernelStackWatchdog::TLS,
- KernelStackWatchdog, tls_);
+// Per-thread pointer to the current thread's TLS. It is nullptr until GetTLS()
+// creates one, and is reset to nullptr by Unregister() when the thread exits.
+__thread KernelStackWatchdog::TLS* KernelStackWatchdog::tls_;
KernelStackWatchdog::KernelStackWatchdog()
: log_collector_(nullptr),
@@ -95,12 +102,25 @@ void KernelStackWatchdog::Register(TLS* tls) {
void KernelStackWatchdog::Unregister() {
int64_t tid = Thread::CurrentThreadId();
- MutexLock l(unregister_lock_);
- lock_guard<simple_spinlock> l2(tls_lock_);
- CHECK(tls_by_tid_.erase(tid));
+
+ // Take ownership of this thread's TLS. Unless it is handed off to
+ // 'pending_delete_' below, it is deleted when 'tls' goes out of scope.
+ std::unique_ptr<TLS> tls(tls_);
+ {
+ // Only *try* to take 'unregister_lock_': the watchdog thread holds it while
+ // it snapshots and inspects registered threads (see RunThread()), and
+ // blocking on it here is what previously stalled thread exit.
+ std::unique_lock<Mutex> l(unregister_lock_, std::try_to_lock);
+ lock_guard<simple_spinlock> l2(tls_lock_);
+ CHECK(tls_by_tid_.erase(tid));
+ if (!l.owns_lock()) {
+ // The watchdog is in the middle of running and might be accessing
+ // 'tls', so just enqueue it for later deletion. Otherwise it
+ // will go out of scope at the end of this function and get
+ // deleted here.
+ pending_delete_.emplace_back(std::move(tls));
+ }
+ }
+ // Clear the thread-local pointer so that GetTLS() on this thread would
+ // create and register a fresh TLS instead of returning a stale pointer.
+ tls_ = nullptr;
}
Status GetKernelStack(pid_t p, string* ret) {
+ MAYBE_INJECT_FIXED_LATENCY(FLAGS_inject_latency_on_kernel_stack_lookup_ms);
faststring buf;
RETURN_NOT_OK(ReadFileToString(Env::Default(), Substitute("/proc/$0/stack", p), &buf));
*ret = buf.ToString();
@@ -115,21 +135,27 @@ void KernelStackWatchdog::RunThread() {
break;
}
- // Prevent threads from unregistering between the snapshot loop and the sending of
- // signals. This makes it safe for us to access their TLS. We might delay the thread
- // exit a bit, but it would be unusual for any code to block on a thread exit, whereas
- // it's relatively important for threads to _start_ quickly.
+ // Prevent threads from deleting their TLS objects between the snapshot loop and the sending of
+ // signals. This makes it safe for us to access their TLS.
+ //
+ // NOTE: it's still possible that the thread will have exited in between grabbing its pointer
+ // and sending a signal, but DumpThreadStack() already is safe about not sending a signal
+ // to some other non-Kudu thread.
MutexLock l(unregister_lock_);
// Take the snapshot of the thread information under a short lock.
//
- // 'lock_' prevents new threads from starting, so we don't want to do any lengthy work
+ // 'tls_lock_' prevents new threads from starting, so we don't want to do any lengthy work
// (such as gathering stack traces) under this lock.
TLSMap tls_map_copy;
+ vector<unique_ptr<TLS>> to_delete;
{
lock_guard<simple_spinlock> l(tls_lock_);
+ to_delete.swap(pending_delete_);
tls_map_copy = tls_by_tid_;
}
+ // Actually delete the no-longer-used TLS entries outside of the lock.
+ to_delete.clear();
MicrosecondsInt64 now = GetMonoTimeMicros();
for (const auto& entry : tls_map_copy) {
@@ -151,6 +177,16 @@ void KernelStackWatchdog::RunThread() {
string user_stack = DumpThreadStack(p);
+ // If the thread exited the frame we're looking at in between when we started
+ // grabbing the stack and now, then our stack isn't correct. We shouldn't log it.
+ //
+ // We just use unprotected reads here since this is a somewhat best-effort
+ // check.
+ if (ANNOTATE_UNPROTECTED_READ(tls->depth_) < tls_copy.depth_ ||
+ ANNOTATE_UNPROTECTED_READ(tls->frames_[i].start_time_) != frame->start_time_) {
+ break;
+ }
+
lock_guard<simple_spinlock> l(log_lock_);
LOG_STRING(WARNING, log_collector_.get())
<< "Thread " << p << " stuck at " << frame->status_
@@ -163,21 +199,34 @@ void KernelStackWatchdog::RunThread() {
}
}
-KernelStackWatchdog::TLS* KernelStackWatchdog::GetTLS() {
+// Thread-exit hook installed by CreateAndRegisterTLS(). The argument is
+// unused: the hook is always registered with a nullptr 'arg'.
+void KernelStackWatchdog::ThreadExiting(void* /* unused */) {
+ KernelStackWatchdog::GetInstance()->Unregister();
+}
+
+// Slow path of GetTLS(): allocates this thread's TLS, registers it with the
+// watchdog, publishes it in 'tls_', and installs a thread-exit hook that
+// calls Unregister().
+void KernelStackWatchdog::CreateAndRegisterTLS() {
+ DCHECK(!tls_);
// Disable leak check. LSAN sometimes gets false positives on thread locals.
// See: https://github.com/google/sanitizers/issues/757
debug::ScopedLeakCheckDisabler d;
- INIT_STATIC_THREAD_LOCAL(KernelStackWatchdog::TLS, tls_);
- return tls_;
+ auto* tls = new TLS();
+ KernelStackWatchdog::GetInstance()->Register(tls);
+ tls_ = tls;
+
+ // We manually install a thread-exit function here making use of the internal
+ // functionality of the thread-local module, rather than using Thread::CallAtExit().
+ // This is because we may use the stack watchdog in contexts such as the client
+ // where it's likely that threads aren't associated with a kudu::Thread instance.
+ // NOTE(review): 'dtor_list' is handed to the thread-local module; presumably
+ // that module owns and frees it at thread exit -- confirm.
+ auto* dtor_list = new threadlocal::internal::PerThreadDestructorList();
+ dtor_list->destructor = &ThreadExiting;
+ dtor_list->arg = nullptr;
+ kudu::threadlocal::internal::AddDestructor(dtor_list);
}
KernelStackWatchdog::TLS::TLS() {
+ // Registration with the watchdog is done by CreateAndRegisterTLS(), not here.
memset(&data_, 0, sizeof(data_));
- KernelStackWatchdog::GetInstance()->Register(this);
}
KernelStackWatchdog::TLS::~TLS() {
+ // Unregistration is done by Unregister() (via the thread-exit hook), which
+ // also takes ownership of this object and deletes it.
- KernelStackWatchdog::GetInstance()->Unregister();
}
// Optimistic concurrency control approach to snapshot the value of another
http://git-wip-us.apache.org/repos/asf/kudu/blob/fb637ce9/src/kudu/util/kernel_stack_watchdog.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/kernel_stack_watchdog.h b/src/kudu/util/kernel_stack_watchdog.h
index 26b8253..6ec7b50 100644
--- a/src/kudu/util/kernel_stack_watchdog.h
+++ b/src/kudu/util/kernel_stack_watchdog.h
@@ -54,6 +54,7 @@
#define KUDU_UTIL_KERNEL_STACK_WATCHDOG_H
#include <ctime>
+#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
@@ -164,14 +165,27 @@ class KernelStackWatchdog {
~KernelStackWatchdog();
// Get or create the TLS for the current thread.
- static TLS* GetTLS();
+ static TLS* GetTLS() {
+ // Only the first call from each thread takes the slow path that creates
+ // and registers the TLS.
+ if (PREDICT_FALSE(!tls_)) {
+ CreateAndRegisterTLS();
+ }
+ return tls_;
+ }
+
+ // Create a new TLS for the current thread, and register it with the watchdog.
+ // Installs a callback to automatically unregister the thread upon its exit.
+ static void CreateAndRegisterTLS();
+
+ // Callback which is registered to run at thread-exit time by CreateAndRegisterTLS().
+ // The argument is not used; CreateAndRegisterTLS() registers it as nullptr.
+ static void ThreadExiting(void* tls_void);
// Register a new thread's TLS with the watchdog.
// Called by any thread the first time it enters a watched section, when its TLS
// is constructed.
void Register(TLS* tls);
- // Called when a thread's TLS is destructed (i.e. when the thread exits).
+ // Called when a thread is in the process of exiting, and has a registered TLS
+ // object.
void Unregister();
// The actual watchdog loop that the watchdog thread runs.
@@ -182,6 +196,11 @@ class KernelStackWatchdog {
typedef std::unordered_map<pid_t, TLS*> TLSMap;
TLSMap tls_by_tid_;
+ // If a thread exits while the watchdog is in the middle of accessing the TLS
+ // objects, we can't immediately delete the TLS struct. Instead, the thread
+ // enqueues it here for later deletion by the watchdog thread within RunThread().
+ std::vector<std::unique_ptr<TLS>> pending_delete_;
+
// If non-NULL, warnings will be emitted into this vector instead of glog.
// Used by tests.
gscoped_ptr<std::vector<std::string> > log_collector_;
@@ -189,7 +208,7 @@ class KernelStackWatchdog {
// Lock protecting log_collector_.
mutable simple_spinlock log_lock_;
- // Lock protecting tls_by_tid_.
+ // Lock protecting tls_by_tid_ and pending_delete_.
mutable simple_spinlock tls_lock_;
// Lock which prevents threads from unregistering while the watchdog
http://git-wip-us.apache.org/repos/asf/kudu/blob/fb637ce9/src/kudu/util/stack_watchdog-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/stack_watchdog-test.cc b/src/kudu/util/stack_watchdog-test.cc
index a08887f..aefe220 100644
--- a/src/kudu/util/stack_watchdog-test.cc
+++ b/src/kudu/util/stack_watchdog-test.cc
@@ -17,12 +17,16 @@
#include "kudu/util/kernel_stack_watchdog.h"
+#include <ostream>
#include <string>
+#include <thread>
#include <vector>
#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
#include <gtest/gtest.h>
+#include "kudu/gutil/dynamic_annotations.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/substitute.h"
@@ -32,10 +36,12 @@
#include "kudu/util/test_util.h"
using std::string;
+using std::thread;
using std::vector;
using strings::Substitute;
DECLARE_int32(hung_task_check_interval_ms);
+DECLARE_int32(inject_latency_on_kernel_stack_lookup_ms);
namespace kudu {
@@ -44,6 +50,8 @@ class StackWatchdogTest : public KuduTest {
virtual void SetUp() OVERRIDE {
KuduTest::SetUp();
KernelStackWatchdog::GetInstance()->SaveLogsForTests(true);
+ // Tests write these flags while the watchdog thread presumably reads them;
+ // mark those accesses as benign races for race detectors.
+ ANNOTATE_BENIGN_RACE(&FLAGS_hung_task_check_interval_ms, "");
+ ANNOTATE_BENIGN_RACE(&FLAGS_inject_latency_on_kernel_stack_lookup_ms, "");
FLAGS_hung_task_check_interval_ms = 10;
}
};
@@ -106,4 +114,39 @@ TEST_F(StackWatchdogTest, TestPerformance) {
}
}
}
+
+// Stress test to ensure that we properly handle the case where threads are short-lived
+// and the watchdog may try to grab a stack of a thread that has already exited.
+//
+// This also serves as a benchmark -- we make the stack-grabbing especially slow and
+// ensure that we can still start and join threads quickly.
+TEST_F(StackWatchdogTest, TestShortLivedThreadsStress) {
+ // Run the stack watchdog continuously.
+ FLAGS_hung_task_check_interval_ms = 0;
+
+ // Make the actual stack trace collection slow. In practice we find that
+ // stack trace collection can often take quite some time due to symbolization, etc.
+ FLAGS_inject_latency_on_kernel_stack_lookup_ms = 1000;
+
+ MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(5);
+ // Fixed ring of 100 thread slots: before a slot is reused, the thread
+ // previously started in it is joined, so at most 100 threads are live at once.
+ vector<thread> threads(100);
+ int started = 0;
+ while (MonoTime::Now() < deadline) {
+ thread* t = &threads[started % threads.size()];
+ if (t->joinable()) {
+ t->join();
+ }
+ *t = thread([&]() {
+ // Trigger watchdog at 1ms, but then sleep for 2ms, to ensure that
+ // the watchdog has plenty of work to do.
+ SCOPED_WATCH_STACK(1);
+ SleepFor(MonoDelta::FromMilliseconds(2));
+ });
+ started++;
+ }
+ // Join the threads still running from the last pass over the ring.
+ for (auto& t : threads) {
+ if (t.joinable()) t.join();
+ }
+ LOG(INFO) << "started and joined " << started << " threads";
+}
} // namespace kudu