You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2020/03/18 06:45:21 UTC
[kudu] 03/03: thread: simplify Thread::Create API
This is an automated email from the ASF dual-hosted git repository.
adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit f37e7a6e0a249d32568707bdd2f5305c3663e0b3
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Sat Mar 14 02:07:20 2020 -0700
thread: simplify Thread::Create API
I set out to remove boost::bind from this API. Having done that, I found it
cleaner to drop the multi-arg Thread::Create overloads entirely and require
callers to pass a lambda instead.
Change-Id: I581378b093a8d2e8536b7b07cca5520fdaa7672d
Reviewed-on: http://gerrit.cloudera.org:8080/15448
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Reviewed-by: Andrew Wong <aw...@cloudera.com>
Tested-by: Andrew Wong <aw...@cloudera.com>
---
src/kudu/clock/builtin_ntp.cc | 2 +-
src/kudu/hms/hms_catalog.cc | 2 +-
src/kudu/master/catalog_manager.cc | 2 +-
src/kudu/master/hms_notification_log_listener.cc | 2 +-
src/kudu/rpc/acceptor_pool.cc | 2 +-
src/kudu/rpc/reactor.cc | 3 +-
src/kudu/rpc/result_tracker.cc | 4 +-
src/kudu/rpc/retriable_rpc.h | 2 +
src/kudu/rpc/service_pool.cc | 2 +-
src/kudu/server/diagnostics_log.cc | 3 +-
src/kudu/server/server_base.cc | 10 ++-
src/kudu/subprocess/server.cc | 24 +++---
src/kudu/tserver/heartbeater.cc | 2 +-
src/kudu/tserver/scanners.cc | 2 +-
src/kudu/tserver/tablet_copy_service.cc | 3 +-
src/kudu/util/cloud/instance_detector.cc | 6 +-
src/kudu/util/debug-util-test.cc | 68 ++++++++--------
src/kudu/util/file_cache.cc | 2 +-
src/kudu/util/kernel_stack_watchdog.cc | 3 +-
src/kudu/util/maintenance_manager.cc | 5 +-
src/kudu/util/minidump.cc | 4 +-
src/kudu/util/pstack_watcher.cc | 3 +-
src/kudu/util/thread-test.cc | 13 ++--
src/kudu/util/thread.cc | 25 +++---
src/kudu/util/thread.h | 99 ++++++------------------
src/kudu/util/threadpool.cc | 2 +-
src/kudu/util/trace-test.cc | 7 +-
src/kudu/util/ttl_cache.h | 2 +-
28 files changed, 132 insertions(+), 172 deletions(-)
diff --git a/src/kudu/clock/builtin_ntp.cc b/src/kudu/clock/builtin_ntp.cc
index e0b74db..613a71e 100644
--- a/src/kudu/clock/builtin_ntp.cc
+++ b/src/kudu/clock/builtin_ntp.cc
@@ -630,7 +630,7 @@ Status BuiltInNtp::InitImpl() {
RETURN_NOT_OK_PREPEND(socket_.SetRecvTimeout(MonoDelta::FromSeconds(0.5)),
"could not set socket recv timeout");
RETURN_NOT_OK_PREPEND(
- Thread::Create("ntp", "ntp client", &BuiltInNtp::PollThread, this, &thread_),
+ Thread::Create("ntp", "ntp client", [this]() { this->PollThread(); }, &thread_),
"could not start NTP client thread");
return Status::OK();
}
diff --git a/src/kudu/hms/hms_catalog.cc b/src/kudu/hms/hms_catalog.cc
index 2f08fba..da0e5cd 100644
--- a/src/kudu/hms/hms_catalog.cc
+++ b/src/kudu/hms/hms_catalog.cc
@@ -130,7 +130,7 @@ Status HmsCatalog::Start(HmsClientVerifyKuduSyncConfig verify_service_config) {
RETURN_NOT_OK(ha_client_.Start(std::move(addresses), std::move(options)));
RETURN_NOT_OK(Thread::Create("hms_catalog", "fetch_uuid",
- &HmsCatalog::LoopInitializeUuid, this,
+ [this](){ this->LoopInitializeUuid(); },
&uuid_initializing_thread_));
return Status::OK();
}
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 122d0b8..193d69c 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -578,7 +578,7 @@ class CatalogManagerBgTasks {
Status CatalogManagerBgTasks::Init() {
RETURN_NOT_OK(kudu::Thread::Create("catalog manager", "bgtasks",
- &CatalogManagerBgTasks::Run, this, &thread_));
+ [this]() { this->Run(); }, &thread_));
return Status::OK();
}
diff --git a/src/kudu/master/hms_notification_log_listener.cc b/src/kudu/master/hms_notification_log_listener.cc
index d310e40..8198651 100644
--- a/src/kudu/master/hms_notification_log_listener.cc
+++ b/src/kudu/master/hms_notification_log_listener.cc
@@ -92,7 +92,7 @@ HmsNotificationLogListenerTask::~HmsNotificationLogListenerTask() {
Status HmsNotificationLogListenerTask::Init() {
CHECK(!thread_) << "HmsNotificationLogListenerTask is already initialized";
return kudu::Thread::Create("catalog manager", "hms-notification-log-listener",
- &HmsNotificationLogListenerTask::RunLoop, this, &thread_);
+ [this]() { this->RunLoop(); }, &thread_);
}
void HmsNotificationLogListenerTask::Shutdown() {
diff --git a/src/kudu/rpc/acceptor_pool.cc b/src/kudu/rpc/acceptor_pool.cc
index 88d48da..9971557 100644
--- a/src/kudu/rpc/acceptor_pool.cc
+++ b/src/kudu/rpc/acceptor_pool.cc
@@ -84,7 +84,7 @@ Status AcceptorPool::Start(int num_threads) {
for (int i = 0; i < num_threads; i++) {
scoped_refptr<kudu::Thread> new_thread;
Status s = kudu::Thread::Create("acceptor pool", "acceptor",
- &AcceptorPool::RunThread, this, &new_thread);
+ [this]() { this->RunThread(); }, &new_thread);
if (!s.ok()) {
Shutdown();
return s;
diff --git a/src/kudu/rpc/reactor.cc b/src/kudu/rpc/reactor.cc
index e9f807d..231783b 100644
--- a/src/kudu/rpc/reactor.cc
+++ b/src/kudu/rpc/reactor.cc
@@ -185,7 +185,8 @@ Status ReactorThread::Init() {
ev_set_invoke_pending_cb(loop_, &ReactorThread::InvokePendingCb);
// Create Reactor thread.
- return kudu::Thread::Create("reactor", "rpc reactor", &ReactorThread::RunThread, this, &thread_);
+ return kudu::Thread::Create("reactor", "rpc reactor",
+ [this]() { this->RunThread(); }, &thread_);
}
void ReactorThread::InvokePendingCb(struct ev_loop* loop) {
diff --git a/src/kudu/rpc/result_tracker.cc b/src/kudu/rpc/result_tracker.cc
index f8f5c46..a96710f 100644
--- a/src/kudu/rpc/result_tracker.cc
+++ b/src/kudu/rpc/result_tracker.cc
@@ -459,8 +459,8 @@ void ResultTracker::FailAndRespond(const RequestIdPB& request_id,
void ResultTracker::StartGCThread() {
CHECK(!gc_thread_);
- CHECK_OK(Thread::Create("server", "result-tracker", &ResultTracker::RunGCThread,
- this, &gc_thread_));
+ CHECK_OK(Thread::Create("server", "result-tracker",
+ [this]() { this->RunGCThread(); }, &gc_thread_));
}
void ResultTracker::RunGCThread() {
diff --git a/src/kudu/rpc/retriable_rpc.h b/src/kudu/rpc/retriable_rpc.h
index 9a0b8f5..6e19e89 100644
--- a/src/kudu/rpc/retriable_rpc.h
+++ b/src/kudu/rpc/retriable_rpc.h
@@ -19,6 +19,8 @@
#include <memory>
#include <string>
+#include <boost/bind.hpp>
+
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/messenger.h"
diff --git a/src/kudu/rpc/service_pool.cc b/src/kudu/rpc/service_pool.cc
index a12a34d..57643b1 100644
--- a/src/kudu/rpc/service_pool.cc
+++ b/src/kudu/rpc/service_pool.cc
@@ -90,7 +90,7 @@ Status ServicePool::Init(int num_threads) {
for (int i = 0; i < num_threads; i++) {
scoped_refptr<kudu::Thread> new_thread;
CHECK_OK(kudu::Thread::Create("service pool", "rpc worker",
- &ServicePool::RunThread, this, &new_thread));
+ [this]() { this->RunThread(); }, &new_thread));
threads_.push_back(new_thread);
}
return Status::OK();
diff --git a/src/kudu/server/diagnostics_log.cc b/src/kudu/server/diagnostics_log.cc
index 2b11540..03a8a1b 100644
--- a/src/kudu/server/diagnostics_log.cc
+++ b/src/kudu/server/diagnostics_log.cc
@@ -140,8 +140,7 @@ Status DiagnosticsLog::Start() {
RETURN_NOT_OK_PREPEND(l->Open(), "unable to open diagnostics log");
log_ = std::move(l);
Status s = Thread::Create("server", "diag-logger",
- &DiagnosticsLog::RunThread,
- this, &thread_);
+ [this]() { this->RunThread(); }, &thread_);
if (!s.ok()) {
// Don't leave the log open if we failed to start our thread.
log_.reset();
diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc
index b666d6f..828b3b2 100644
--- a/src/kudu/server/server_base.cc
+++ b/src/kudu/server/server_base.cc
@@ -709,8 +709,9 @@ Status ServerBase::StartExcessLogFileDeleterThread() {
}
RETURN_NOT_OK_PREPEND(minidump_handler_->DeleteExcessMinidumpFiles(options_.env),
"Unable to delete excess minidump files");
- return Thread::Create("server", "excess-log-deleter", &ServerBase::ExcessLogFileDeleterThread,
- this, &excess_log_deleter_thread_);
+ return Thread::Create("server", "excess-log-deleter",
+ [this]() { this->ExcessLogFileDeleterThread(); },
+ &excess_log_deleter_thread_);
}
void ServerBase::ExcessLogFileDeleterThread() {
@@ -754,8 +755,9 @@ void ServerBase::ShutdownImpl() {
#ifdef TCMALLOC_ENABLED
Status ServerBase::StartTcmallocMemoryGcThread() {
- return Thread::Create("server", "tcmalloc-memory-gc", &ServerBase::TcmallocMemoryGcThread,
- this, &tcmalloc_memory_gc_thread_);
+ return Thread::Create("server", "tcmalloc-memory-gc",
+ [this]() { this->TcmallocMemoryGcThread(); },
+ &tcmalloc_memory_gc_thread_);
}
void ServerBase::TcmallocMemoryGcThread() {
diff --git a/src/kudu/subprocess/server.cc b/src/kudu/subprocess/server.cc
index 6687e3b..1b961e0 100644
--- a/src/kudu/subprocess/server.cc
+++ b/src/kudu/subprocess/server.cc
@@ -114,8 +114,9 @@ Status SubprocessServer::Init() {
VLOG(2) << "Starting the subprocess";
Synchronizer sync;
auto cb = sync.AsStdStatusCallback();
- RETURN_NOT_OK(Thread::Create("subprocess", "start", &SubprocessServer::StartSubprocessThread,
- this, cb, &read_thread_));
+ RETURN_NOT_OK(Thread::Create("subprocess", "start",
+ [this, &cb]() { this->StartSubprocessThread(cb); },
+ &read_thread_));
RETURN_NOT_OK_PREPEND(sync.Wait(), "Failed to start subprocess");
// Start the message protocol.
@@ -127,16 +128,19 @@ Status SubprocessServer::Init() {
const int num_threads = FLAGS_subprocess_num_responder_threads;
responder_threads_.resize(num_threads);
for (int i = 0; i < num_threads; i++) {
- RETURN_NOT_OK(Thread::Create("subprocess", "responder", &SubprocessServer::ResponderThread,
- this, &responder_threads_[i]));
+ RETURN_NOT_OK(Thread::Create("subprocess", "responder",
+ [this]() { this->ResponderThread(); },
+ &responder_threads_[i]));
}
- RETURN_NOT_OK(Thread::Create("subprocess", "reader", &SubprocessServer::ReceiveMessagesThread,
- this, &read_thread_));
- RETURN_NOT_OK(Thread::Create("subprocess", "writer", &SubprocessServer::SendMessagesThread,
- this, &write_thread_));
+ RETURN_NOT_OK(Thread::Create("subprocess", "reader",
+ [this]() { this->ReceiveMessagesThread(); },
+ &read_thread_));
+ RETURN_NOT_OK(Thread::Create("subprocess", "writer",
+ [this]() { this->SendMessagesThread(); },
+ &write_thread_));
return Thread::Create("subprocess", "deadline-checker",
- &SubprocessServer::CheckDeadlinesThread,
- this, &deadline_checker_);
+ [this]() { this->CheckDeadlinesThread(); },
+ &deadline_checker_);
}
Status SubprocessServer::Execute(SubprocessRequestPB* req,
diff --git a/src/kudu/tserver/heartbeater.cc b/src/kudu/tserver/heartbeater.cc
index d88ebf5..c212bfa 100644
--- a/src/kudu/tserver/heartbeater.cc
+++ b/src/kudu/tserver/heartbeater.cc
@@ -644,7 +644,7 @@ Status Heartbeater::Thread::Start() {
should_run_ = true;
return kudu::Thread::Create("heartbeater", "heartbeat",
- &Heartbeater::Thread::RunThread, this, &thread_);
+ [this]() { this->RunThread(); }, &thread_);
}
Status Heartbeater::Thread::Stop() {
diff --git a/src/kudu/tserver/scanners.cc b/src/kudu/tserver/scanners.cc
index 418bbdc..a98f251 100644
--- a/src/kudu/tserver/scanners.cc
+++ b/src/kudu/tserver/scanners.cc
@@ -113,7 +113,7 @@ ScannerManager::~ScannerManager() {
Status ScannerManager::StartRemovalThread() {
RETURN_NOT_OK(Thread::Create("scanners", "removal_thread",
- &ScannerManager::RunRemovalThread, this,
+ [this]() { this->RunRemovalThread(); },
&removal_thread_));
return Status::OK();
}
diff --git a/src/kudu/tserver/tablet_copy_service.cc b/src/kudu/tserver/tablet_copy_service.cc
index 3fb028c..b9088ea 100644
--- a/src/kudu/tserver/tablet_copy_service.cc
+++ b/src/kudu/tserver/tablet_copy_service.cc
@@ -46,6 +46,7 @@
#include "kudu/util/metrics.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/random_util.h"
+#include "kudu/util/thread.h"
#define RPC_RETURN_NOT_OK(expr, app_err, message, context) \
do { \
@@ -109,7 +110,7 @@ TabletCopyServiceImpl::TabletCopyServiceImpl(
shutdown_latch_(1),
tablet_copy_metrics_(server->metric_entity()) {
CHECK_OK(Thread::Create("tablet-copy", "tc-session-exp",
- &TabletCopyServiceImpl::EndExpiredSessions, this,
+ [this]() { this->EndExpiredSessions(); },
&session_expiration_thread_));
}
diff --git a/src/kudu/util/cloud/instance_detector.cc b/src/kudu/util/cloud/instance_detector.cc
index 4dd7321..183139d 100644
--- a/src/kudu/util/cloud/instance_detector.cc
+++ b/src/kudu/util/cloud/instance_detector.cc
@@ -71,8 +71,10 @@ Status InstanceDetector::Detect(unique_ptr<InstanceMetadata>* metadata) {
CHECK(d.metadata);
CHECK(!d.runner);
scoped_refptr<Thread> runner;
- RETURN_NOT_OK(Thread::Create("cloud detector", TypeToString(d.metadata->type()),
- &InstanceDetector::GetInstanceInfo, this, d.metadata.get(), idx, &runner));
+ RETURN_NOT_OK(Thread::Create(
+ "cloud detector", TypeToString(d.metadata->type()),
+ [this, &d, idx]() { this->GetInstanceInfo(d.metadata.get(), idx); },
+ &runner));
d.runner = std::move(runner);
}
diff --git a/src/kudu/util/debug-util-test.cc b/src/kudu/util/debug-util-test.cc
index 25e4ae0..1c498f2 100644
--- a/src/kudu/util/debug-util-test.cc
+++ b/src/kudu/util/debug-util-test.cc
@@ -19,12 +19,15 @@
#ifdef __linux__
#include <link.h>
#endif
+#include "kudu/util/debug-util.h"
+
#include <unistd.h>
#include <algorithm>
#include <csignal>
#include <cstddef>
#include <cstdint>
+#include <initializer_list>
#include <memory>
#include <ostream>
#include <string>
@@ -39,7 +42,6 @@
#include "kudu/gutil/walltime.h"
#include "kudu/util/array_view.h"
#include "kudu/util/countdown_latch.h"
-#include "kudu/util/debug-util.h"
#include "kudu/util/kernel_stack_watchdog.h"
#include "kudu/util/monotime.h"
#include "kudu/util/scoped_cleanup.h"
@@ -107,12 +109,12 @@ TEST_F(DebugUtilTest, TestStackTraceMainThread) {
TEST_F(DebugUtilTest, TestSignalStackTrace) {
CountDownLatch l(1);
scoped_refptr<Thread> t;
- ASSERT_OK(Thread::Create("test", "test thread", &SleeperThread, &l, &t));
- auto cleanup_thr = MakeScopedCleanup([&]() {
- // Allow the thread to finish.
- l.CountDown();
- t->Join();
- });
+ ASSERT_OK(Thread::Create("test", "test thread", [&l]() { SleeperThread(&l); }, &t));
+ SCOPED_CLEANUP({
+ // Allow the thread to finish.
+ l.CountDown();
+ t->Join();
+ });
// We have to loop a little bit because it takes a little while for the thread
// to start up and actually call our function.
@@ -193,16 +195,17 @@ TEST_F(DebugUtilTest, TestSnapshot) {
CountDownLatch l(1);
vector<scoped_refptr<Thread>> threads(kNumThreads);
for (int i = 0; i < kNumThreads; i++) {
- ASSERT_OK(Thread::Create("test", "test thread", &SleeperThread, &l, &threads[i]));
+ ASSERT_OK(Thread::Create("test", "test thread",
+ [&l]() { SleeperThread(&l); }, &threads[i]));
}
SCOPED_CLEANUP({
- // Allow the thread to finish.
- l.CountDown();
- for (auto& t : threads) {
- t->Join();
- }
- });
+ // Allow the thread to finish.
+ l.CountDown();
+ for (auto& t : threads) {
+ t->Join();
+ }
+ });
StackTraceSnapshot snap;
ASSERT_OK(snap.SnapshotAllStacks());
@@ -233,12 +236,12 @@ TEST_F(DebugUtilTest, TestSnapshot) {
TEST_F(DebugUtilTest, Benchmark) {
CountDownLatch l(1);
scoped_refptr<Thread> t;
- ASSERT_OK(Thread::Create("test", "test thread", &SleeperThread, &l, &t));
+ ASSERT_OK(Thread::Create("test", "test thread", [&l]() { SleeperThread(&l); }, &t));
SCOPED_CLEANUP({
- // Allow the thread to finish.
- l.CountDown();
- t->Join();
- });
+ // Allow the thread to finish.
+ l.CountDown();
+ t->Join();
+ });
for (bool symbolize : {false, true}) {
MonoTime end_time = MonoTime::Now() + MonoDelta::FromSeconds(1);
@@ -356,14 +359,15 @@ TEST_P(RaceTest, TestStackTraceRaces) {
DangerousOp op = GetParam();
CountDownLatch l(1);
scoped_refptr<Thread> t;
- ASSERT_OK(Thread::Create("test", "test thread", &DangerousOperationThread, op, &l, &t));
+ ASSERT_OK(Thread::Create("test", "test thread",
+ [op, &l]() { DangerousOperationThread(op, &l); }, &t));
SCOPED_CLEANUP({
- // Allow the thread to finish.
- l.CountDown();
- // Crash if we can't join the thread after a reasonable amount of time.
- // That probably indicates a deadlock.
- CHECK_OK(ThreadJoiner(t.get()).give_up_after_ms(10000).Join());
- });
+ // Allow the thread to finish.
+ l.CountDown();
+ // Crash if we can't join the thread after a reasonable amount of time.
+ // That probably indicates a deadlock.
+ CHECK_OK(ThreadJoiner(t.get()).give_up_after_ms(10000).Join());
+ });
MonoTime end_time = MonoTime::Now() + MonoDelta::FromSeconds(1);
while (MonoTime::Now() < end_time) {
StackTrace trace;
@@ -399,12 +403,12 @@ TEST_F(DebugUtilTest, TestTimeouts) {
CountDownLatch l(1);
scoped_refptr<Thread> t;
- ASSERT_OK(Thread::Create("test", "test thread", &SleeperThread, &l, &t));
- auto cleanup_thr = MakeScopedCleanup([&]() {
- // Allow the thread to finish.
- l.CountDown();
- t->Join();
- });
+ ASSERT_OK(Thread::Create("test", "test thread", [&l]() { SleeperThread(&l); }, &t));
+ SCOPED_CLEANUP({
+ // Allow the thread to finish.
+ l.CountDown();
+ t->Join();
+ });
// First, time a few stack traces to determine how long a non-timed-out stack
// trace takes.
diff --git a/src/kudu/util/file_cache.cc b/src/kudu/util/file_cache.cc
index 0bbbfe7..4e39b71 100644
--- a/src/kudu/util/file_cache.cc
+++ b/src/kudu/util/file_cache.cc
@@ -486,7 +486,7 @@ FileCache::~FileCache() {
Status FileCache::Init() {
return Thread::Create("cache", Substitute("$0-evict", cache_name_),
- &FileCache::RunDescriptorExpiry, this,
+ [this]() { this->RunDescriptorExpiry(); },
&descriptor_expiry_thread_);
}
diff --git a/src/kudu/util/kernel_stack_watchdog.cc b/src/kudu/util/kernel_stack_watchdog.cc
index 27a259c..4fa6de8 100644
--- a/src/kudu/util/kernel_stack_watchdog.cc
+++ b/src/kudu/util/kernel_stack_watchdog.cc
@@ -24,7 +24,6 @@
#include <string>
#include <utility>
-#include <boost/bind.hpp>
#include <glog/logging.h>
#include <gflags/gflags.h>
@@ -70,7 +69,7 @@ KernelStackWatchdog::KernelStackWatchdog()
// try to call back into initializing the stack watchdog, and will self-deadlock.
CHECK_OK(Thread::CreateWithFlags(
"kernel-watchdog", "kernel-watcher",
- boost::bind(&KernelStackWatchdog::RunThread, this),
+ [this]() { this->RunThread(); },
Thread::NO_STACK_WATCHDOG,
&thread_));
}
diff --git a/src/kudu/util/maintenance_manager.cc b/src/kudu/util/maintenance_manager.cc
index fb7043e..4495077 100644
--- a/src/kudu/util/maintenance_manager.cc
+++ b/src/kudu/util/maintenance_manager.cc
@@ -28,7 +28,6 @@
#include <utility>
#include <vector>
-#include <boost/bind.hpp>
#include <gflags/gflags.h>
#include "kudu/gutil/dynamic_annotations.h"
@@ -192,8 +191,8 @@ MaintenanceManager::~MaintenanceManager() {
Status MaintenanceManager::Start() {
CHECK(!monitor_thread_);
RETURN_NOT_OK(Thread::Create("maintenance", "maintenance_scheduler",
- boost::bind(&MaintenanceManager::RunSchedulerThread, this),
- &monitor_thread_));
+ [this]() { this->RunSchedulerThread(); },
+ &monitor_thread_));
return Status::OK();
}
diff --git a/src/kudu/util/minidump.cc b/src/kudu/util/minidump.cc
index 9a301f4..0a9c417 100644
--- a/src/kudu/util/minidump.cc
+++ b/src/kudu/util/minidump.cc
@@ -281,8 +281,8 @@ void MinidumpExceptionHandler::UnregisterMinidumpExceptionHandler() {
Status MinidumpExceptionHandler::StartUserSignalHandlerThread() {
user_signal_handler_thread_running_.store(true, std::memory_order_relaxed);
return Thread::Create("minidump", "sigusr1-handler",
- &MinidumpExceptionHandler::RunUserSignalHandlerThread,
- this, &user_signal_handler_thread_);
+ [this]() { this->RunUserSignalHandlerThread(); },
+ &user_signal_handler_thread_);
}
void MinidumpExceptionHandler::StopUserSignalHandlerThread() {
diff --git a/src/kudu/util/pstack_watcher.cc b/src/kudu/util/pstack_watcher.cc
index 4f67e70..f1e97c4 100644
--- a/src/kudu/util/pstack_watcher.cc
+++ b/src/kudu/util/pstack_watcher.cc
@@ -25,7 +25,6 @@
#include <string>
#include <vector>
-#include <boost/bind.hpp>
#include <glog/logging.h>
#include "kudu/gutil/macros.h"
@@ -51,7 +50,7 @@ using strings::Substitute;
PstackWatcher::PstackWatcher(MonoDelta timeout)
: timeout_(timeout), running_(true), cond_(&lock_) {
CHECK_OK(Thread::Create("pstack_watcher", "pstack_watcher",
- boost::bind(&PstackWatcher::Run, this), &thread_));
+ [this]() { this->Run(); }, &thread_));
}
PstackWatcher::~PstackWatcher() {
diff --git a/src/kudu/util/thread-test.cc b/src/kudu/util/thread-test.cc
index d3ee733..bea5007 100644
--- a/src/kudu/util/thread-test.cc
+++ b/src/kudu/util/thread-test.cc
@@ -17,7 +17,6 @@
#include "kudu/util/thread.h"
-#include <sys/types.h>
#include <unistd.h>
#include <ostream>
@@ -52,7 +51,8 @@ TEST_F(ThreadTest, TestJoinAndWarn) {
}
scoped_refptr<Thread> holder;
- ASSERT_OK(Thread::Create("test", "sleeper thread", usleep, 1000*1000, &holder));
+ ASSERT_OK(Thread::Create("test", "sleeper thread",
+ []() { usleep(1000*1000); }, &holder));
ASSERT_OK(ThreadJoiner(holder.get())
.warn_after_ms(10)
.warn_every_ms(100)
@@ -66,7 +66,8 @@ TEST_F(ThreadTest, TestFailedJoin) {
}
scoped_refptr<Thread> holder;
- ASSERT_OK(Thread::Create("test", "sleeper thread", usleep, 1000*1000, &holder));
+ ASSERT_OK(Thread::Create("test", "sleeper thread",
+ []() { usleep(1000*1000); }, &holder));
Status s = ThreadJoiner(holder.get())
.give_up_after_ms(50)
.Join();
@@ -89,7 +90,8 @@ TEST_F(ThreadTest, TestJoinOnSelf) {
TEST_F(ThreadTest, TestDoubleJoinIsNoOp) {
scoped_refptr<Thread> holder;
- ASSERT_OK(Thread::Create("test", "sleeper thread", usleep, 0, &holder));
+ ASSERT_OK(Thread::Create("test", "sleeper thread",
+ []() { usleep(0); }, &holder));
ThreadJoiner joiner(holder.get());
ASSERT_OK(joiner.Join());
ASSERT_OK(joiner.Join());
@@ -99,7 +101,8 @@ TEST_F(ThreadTest, ThreadStartBenchmark) {
std::vector<scoped_refptr<Thread>> threads(1000);
LOG_TIMING(INFO, "starting threads") {
for (auto& t : threads) {
- ASSERT_OK(Thread::Create("test", "TestCallOnExit", usleep, 0, &t));
+ ASSERT_OK(Thread::Create("test", "TestCallOnExit",
+ []() { usleep(0); }, &t));
}
}
LOG_TIMING(INFO, "waiting for all threads to publish TIDs") {
diff --git a/src/kudu/util/thread.cc b/src/kudu/util/thread.cc
index 6697aa6..2fab076 100644
--- a/src/kudu/util/thread.cc
+++ b/src/kudu/util/thread.cc
@@ -23,6 +23,7 @@
#include <sys/prctl.h>
#endif // defined(__linux__)
#include <sys/resource.h>
+#include <sys/time.h>
#include <unistd.h>
#include <algorithm>
@@ -35,7 +36,6 @@
#include <utility>
#include <vector>
-#include <boost/bind.hpp>
#include <boost/smart_ptr/shared_ptr.hpp>
#include <gflags/gflags.h>
#include <glog/logging.h>
@@ -64,8 +64,6 @@
#include "kudu/util/url-coding.h"
#include "kudu/util/web_callback_registry.h"
-using boost::bind;
-using boost::mem_fn;
using std::ostringstream;
using std::pair;
using std::shared_ptr;
@@ -307,11 +305,13 @@ Status ThreadMgr::StartInstrumentation(const scoped_refptr<MetricEntity>& metric
Bind(&GetInVoluntaryContextSwitches)));
if (web) {
- auto thread_callback = bind<void>(mem_fn(&ThreadMgr::ThreadPathHandler),
- this, _1, _2);
- DCHECK_NOTNULL(web)->RegisterPathHandler("/threadz", "Threads", thread_callback,
- /* is_styled= */ true,
- /* is_on_nav_bar= */ true);
+ DCHECK_NOTNULL(web)->RegisterPathHandler(
+ "/threadz", "Threads", [this](const WebCallbackRegistry::WebRequest& req,
+ WebCallbackRegistry::WebResponse* resp) {
+ this->ThreadPathHandler(req, resp);
+ },
+ /* is_styled= */ true,
+ /* is_on_nav_bar= */ true);
}
return Status::OK();
}
@@ -573,8 +573,8 @@ int64_t Thread::WaitForTid() const {
}
-Status Thread::StartThread(const string& category, const string& name,
- const ThreadFunctor& functor, uint64_t flags,
+Status Thread::StartThread(string category, string name,
+ std::function<void()> functor, uint64_t flags,
scoped_refptr<Thread> *holder) {
TRACE_COUNTER_INCREMENT("threads_started", 1);
TRACE_COUNTER_SCOPE_LATENCY_US("thread_start_us");
@@ -584,7 +584,8 @@ Status Thread::StartThread(const string& category, const string& name,
SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500 /* ms */, log_prefix, "starting thread");
// Temporary reference for the duration of this function.
- scoped_refptr<Thread> t(new Thread(category, name, functor));
+ scoped_refptr<Thread> t(new Thread(
+ std::move(category), std::move(name), std::move(functor)));
// Optional, and only set if the thread was successfully created.
//
@@ -639,7 +640,7 @@ Status Thread::StartThread(const string& category, const string& name,
t->joinable_ = true;
cleanup.cancel();
- VLOG(2) << "Started thread " << t->tid()<< " - " << category << ":" << name;
+ VLOG(2) << Substitute("Started thread $0 - $1: $2", t->tid(), t->category(), t->name());
return Status::OK();
}
diff --git a/src/kudu/util/thread.h b/src/kudu/util/thread.h
index 443b739..91e4480 100644
--- a/src/kudu/util/thread.h
+++ b/src/kudu/util/thread.h
@@ -14,11 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
-// Copied from Impala and adapted to Kudu.
-
-#ifndef KUDU_UTIL_THREAD_H
-#define KUDU_UTIL_THREAD_H
+#pragma once
#include <pthread.h>
#if defined(__linux__)
@@ -29,12 +25,10 @@
#include <unistd.h>
#include <cstdint>
+#include <functional>
#include <string>
#include <utility>
-#include <boost/bind.hpp> // IWYU pragma: keep
-#include <boost/function.hpp> // IWYU pragma: keep
-
#include "kudu/gutil/atomicops.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
@@ -131,70 +125,26 @@ class Thread : public RefCountedThreadSafe<Thread> {
NO_STACK_WATCHDOG = 1 << 0
};
- // This constructor pattern mimics that in boost::thread. There is
- // one constructor for each number of arguments that the thread
- // function accepts. To extend the set of acceptable signatures, add
- // another constructor with <class F, class A1.... class An>.
- //
- // In general:
- // - category: string identifying the thread category to which this thread belongs,
- // used for organising threads together on the debug UI.
- // - name: name of this thread. Will be appended with "-<thread-id>" to ensure
- // uniqueness.
- // - F - a method type that supports operator(), and the instance passed to the
- // constructor is executed immediately in a separate thread.
- // - A1...An - argument types whose instances are passed to f(...)
- // - holder - optional shared pointer to hold a reference to the created thread.
- template <class F>
- static Status CreateWithFlags(const std::string& category, const std::string& name,
- const F& f, uint64_t flags,
+ // Creates and starts a new thread.
+ // - category: string identifying the thread category to which this thread
+ // belongs, used for organising threads together on the debug UI.
+ // - name: name of this thread. Will be appended with "-<thread-id>" to
+ // ensure uniqueness.
+ // - f: function passed to the constructor and executed immediately in the
+ // separate thread.
+ // - holder: optional shared pointer to hold a reference to the created thread.
+ static Status CreateWithFlags(std::string category, std::string name,
+ std::function<void()> f, uint64_t flags,
scoped_refptr<Thread>* holder) {
- return StartThread(category, name, f, flags, holder);
-
- }
- template <class F>
- static Status Create(const std::string& category, const std::string& name, const F& f,
- scoped_refptr<Thread>* holder) {
- return StartThread(category, name, f, NO_FLAGS, holder);
- }
-
- template <class F, class A1>
- static Status Create(const std::string& category, const std::string& name, const F& f,
- const A1& a1, scoped_refptr<Thread>* holder) {
- return StartThread(category, name, boost::bind(f, a1), NO_FLAGS, holder);
- }
+ return StartThread(std::move(category), std::move(name), std::move(f),
+ flags, holder);
- template <class F, class A1, class A2>
- static Status Create(const std::string& category, const std::string& name, const F& f,
- const A1& a1, const A2& a2, scoped_refptr<Thread>* holder) {
- return StartThread(category, name, boost::bind(f, a1, a2), NO_FLAGS, holder);
}
-
- template <class F, class A1, class A2, class A3>
- static Status Create(const std::string& category, const std::string& name, const F& f,
- const A1& a1, const A2& a2, const A3& a3, scoped_refptr<Thread>* holder) {
- return StartThread(category, name, boost::bind(f, a1, a2, a3), NO_FLAGS, holder);
- }
-
- template <class F, class A1, class A2, class A3, class A4>
- static Status Create(const std::string& category, const std::string& name, const F& f,
- const A1& a1, const A2& a2, const A3& a3, const A4& a4,
- scoped_refptr<Thread>* holder) {
- return StartThread(category, name, boost::bind(f, a1, a2, a3, a4), NO_FLAGS, holder);
- }
-
- template <class F, class A1, class A2, class A3, class A4, class A5>
- static Status Create(const std::string& category, const std::string& name, const F& f,
- const A1& a1, const A2& a2, const A3& a3, const A4& a4, const A5& a5,
+ static Status Create(std::string category, std::string name,
+ std::function<void()> f,
scoped_refptr<Thread>* holder) {
- return StartThread(category, name, boost::bind(f, a1, a2, a3, a4, a5), NO_FLAGS, holder);
- }
-
- template <class F, class A1, class A2, class A3, class A4, class A5, class A6>
- static Status Create(const std::string& category, const std::string& name, const F& f,
- const A1& a1, const A2& a2, const A3& a3, const A4& a4, const A5& a5,
- const A6& a6, scoped_refptr<Thread>* holder) {
- return StartThread(category, name, boost::bind(f, a1, a2, a3, a4, a5, a6), NO_FLAGS, holder);
+ return StartThread(std::move(category), std::move(name), std::move(f),
+ NO_FLAGS, holder);
}
// Emulates boost::thread and detaches.
@@ -287,10 +237,7 @@ class Thread : public RefCountedThreadSafe<Thread> {
PARENT_WAITING_TID = -2,
};
- // Function object that wraps the user-supplied function to run in a separate thread.
- typedef boost::function<void ()> ThreadFunctor;
-
- Thread(std::string category, std::string name, ThreadFunctor functor)
+ Thread(std::string category, std::string name, std::function<void()> functor)
: thread_(0),
category_(std::move(category)),
name_(std::move(name)),
@@ -317,7 +264,7 @@ class Thread : public RefCountedThreadSafe<Thread> {
int64_t tid_;
// User function to be executed by this thread.
- const ThreadFunctor functor_;
+ const std::function<void()> functor_;
// Joiners wait on this latch to be notified if the thread is done.
//
@@ -339,8 +286,8 @@ class Thread : public RefCountedThreadSafe<Thread> {
// initialised and its TID has been read. Waits for notification from the started
// thread that initialisation is complete before returning. On success, stores a
// reference to the thread in holder.
- static Status StartThread(const std::string& category, const std::string& name,
- const ThreadFunctor& functor, uint64_t flags,
+ static Status StartThread(std::string category, std::string name,
+ std::function<void()> functor, uint64_t flags,
scoped_refptr<Thread>* holder);
// Wrapper for the user-supplied function. Invoked from the new thread,
@@ -364,5 +311,3 @@ class Thread : public RefCountedThreadSafe<Thread> {
Status StartThreadInstrumentation(const scoped_refptr<MetricEntity>& server_metrics,
WebCallbackRegistry* web);
} // namespace kudu
-
-#endif /* KUDU_UTIL_THREAD_H */
diff --git a/src/kudu/util/threadpool.cc b/src/kudu/util/threadpool.cc
index 5199086..9d8cc4a 100644
--- a/src/kudu/util/threadpool.cc
+++ b/src/kudu/util/threadpool.cc
@@ -697,7 +697,7 @@ void ThreadPool::DispatchThread() {
Status ThreadPool::CreateThread() {
return kudu::Thread::Create("thread pool", strings::Substitute("$0 [worker]", name_),
- &ThreadPool::DispatchThread, this, nullptr);
+ [this]() { this->DispatchThread(); }, nullptr);
}
void ThreadPool::CheckNotPoolThreadUnlocked() {
diff --git a/src/kudu/util/trace-test.cc b/src/kudu/util/trace-test.cc
index a1388ee..90679d6 100644
--- a/src/kudu/util/trace-test.cc
+++ b/src/kudu/util/trace-test.cc
@@ -20,7 +20,6 @@
#include <cctype>
#include <cstdint>
#include <cstring>
-#include <functional>
#include <map>
#include <ostream>
#include <string>
@@ -173,7 +172,7 @@ TEST_F(TraceTest, TestChromeTracing) {
for (int i = 0; i < kNumThreads; i++) {
CHECK_OK(Thread::CreateWithFlags(
"test", "gen-traces",
- std::bind(GenerateTraceEvents, i, kEventsPerThread),
+ [i, kEventsPerThread]() { GenerateTraceEvents(i, kEventsPerThread); },
Thread::NO_STACK_WATCHDOG, &threads[i]));
}
@@ -208,7 +207,7 @@ TEST_F(TraceTest, TestTraceFromExitedThread) {
int kNumEvents = 10;
scoped_refptr<Thread> t;
CHECK_OK(Thread::CreateWithFlags(
- "test", "gen-traces", std::bind(GenerateTraceEvents, 1, kNumEvents),
+ "test", "gen-traces", [kNumEvents]() { GenerateTraceEvents(1, kNumEvents); },
Thread::NO_STACK_WATCHDOG, &t));
t->Join();
tl->SetDisabled();
@@ -286,7 +285,7 @@ TEST_F(TraceTest, TestStartAndStopCollection) {
scoped_refptr<Thread> t;
CHECK_OK(Thread::CreateWithFlags(
"test", "gen-traces",
- std::bind(GenerateTracesUntilLatch, &num_events_generated, &latch),
+ [&]() { GenerateTracesUntilLatch(&num_events_generated, &latch); },
Thread::NO_STACK_WATCHDOG, &t));
const int num_flushes = AllowSlowTests() ? 50 : 3;
diff --git a/src/kudu/util/ttl_cache.h b/src/kudu/util/ttl_cache.h
index cff37f4..1a0c3ee 100644
--- a/src/kudu/util/ttl_cache.h
+++ b/src/kudu/util/ttl_cache.h
@@ -138,7 +138,7 @@ class TTLCache {
scrubbing_thread_running_.Reset(1);
CHECK_OK(Thread::Create(
"cache", strings::Substitute("$0-scrubbing", cache_name),
- &TTLCache::ScrubExpiredEntries, this, &scrubbing_thread_));
+ [this]() { this->ScrubExpiredEntries(); }, &scrubbing_thread_));
VLOG(1) << strings::Substitute(
"started scrubbing thread for TTL cache '$0' with period of $1",
cache_name, scrubbing_period_.ToString());