You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2020/03/18 06:45:21 UTC

[kudu] 03/03: thread: simplify Thread::Create API

This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit f37e7a6e0a249d32568707bdd2f5305c3663e0b3
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Sat Mar 14 02:07:20 2020 -0700

    thread: simplify Thread::Create API
    
    I set out to remove boost::bind from this API, but after that I found it
    cleaner to force callers to create the lambdas rather than retain the
    Thread::Create multi-arg overloads.
    
    Change-Id: I581378b093a8d2e8536b7b07cca5520fdaa7672d
    Reviewed-on: http://gerrit.cloudera.org:8080/15448
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/clock/builtin_ntp.cc                    |  2 +-
 src/kudu/hms/hms_catalog.cc                      |  2 +-
 src/kudu/master/catalog_manager.cc               |  2 +-
 src/kudu/master/hms_notification_log_listener.cc |  2 +-
 src/kudu/rpc/acceptor_pool.cc                    |  2 +-
 src/kudu/rpc/reactor.cc                          |  3 +-
 src/kudu/rpc/result_tracker.cc                   |  4 +-
 src/kudu/rpc/retriable_rpc.h                     |  2 +
 src/kudu/rpc/service_pool.cc                     |  2 +-
 src/kudu/server/diagnostics_log.cc               |  3 +-
 src/kudu/server/server_base.cc                   | 10 ++-
 src/kudu/subprocess/server.cc                    | 24 +++---
 src/kudu/tserver/heartbeater.cc                  |  2 +-
 src/kudu/tserver/scanners.cc                     |  2 +-
 src/kudu/tserver/tablet_copy_service.cc          |  3 +-
 src/kudu/util/cloud/instance_detector.cc         |  6 +-
 src/kudu/util/debug-util-test.cc                 | 68 ++++++++--------
 src/kudu/util/file_cache.cc                      |  2 +-
 src/kudu/util/kernel_stack_watchdog.cc           |  3 +-
 src/kudu/util/maintenance_manager.cc             |  5 +-
 src/kudu/util/minidump.cc                        |  4 +-
 src/kudu/util/pstack_watcher.cc                  |  3 +-
 src/kudu/util/thread-test.cc                     | 13 ++--
 src/kudu/util/thread.cc                          | 25 +++---
 src/kudu/util/thread.h                           | 99 ++++++------------------
 src/kudu/util/threadpool.cc                      |  2 +-
 src/kudu/util/trace-test.cc                      |  7 +-
 src/kudu/util/ttl_cache.h                        |  2 +-
 28 files changed, 132 insertions(+), 172 deletions(-)

diff --git a/src/kudu/clock/builtin_ntp.cc b/src/kudu/clock/builtin_ntp.cc
index e0b74db..613a71e 100644
--- a/src/kudu/clock/builtin_ntp.cc
+++ b/src/kudu/clock/builtin_ntp.cc
@@ -630,7 +630,7 @@ Status BuiltInNtp::InitImpl() {
   RETURN_NOT_OK_PREPEND(socket_.SetRecvTimeout(MonoDelta::FromSeconds(0.5)),
                         "could not set socket recv timeout");
   RETURN_NOT_OK_PREPEND(
-      Thread::Create("ntp", "ntp client", &BuiltInNtp::PollThread, this, &thread_),
+      Thread::Create("ntp", "ntp client", [this]() { this->PollThread(); }, &thread_),
       "could not start NTP client thread");
   return Status::OK();
 }
diff --git a/src/kudu/hms/hms_catalog.cc b/src/kudu/hms/hms_catalog.cc
index 2f08fba..da0e5cd 100644
--- a/src/kudu/hms/hms_catalog.cc
+++ b/src/kudu/hms/hms_catalog.cc
@@ -130,7 +130,7 @@ Status HmsCatalog::Start(HmsClientVerifyKuduSyncConfig verify_service_config) {
   RETURN_NOT_OK(ha_client_.Start(std::move(addresses), std::move(options)));
 
   RETURN_NOT_OK(Thread::Create("hms_catalog", "fetch_uuid",
-                               &HmsCatalog::LoopInitializeUuid, this,
+                               [this](){ this->LoopInitializeUuid(); },
                                &uuid_initializing_thread_));
   return Status::OK();
 }
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 122d0b8..193d69c 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -578,7 +578,7 @@ class CatalogManagerBgTasks {
 
 Status CatalogManagerBgTasks::Init() {
   RETURN_NOT_OK(kudu::Thread::Create("catalog manager", "bgtasks",
-      &CatalogManagerBgTasks::Run, this, &thread_));
+                                     [this]() { this->Run(); }, &thread_));
   return Status::OK();
 }
 
diff --git a/src/kudu/master/hms_notification_log_listener.cc b/src/kudu/master/hms_notification_log_listener.cc
index d310e40..8198651 100644
--- a/src/kudu/master/hms_notification_log_listener.cc
+++ b/src/kudu/master/hms_notification_log_listener.cc
@@ -92,7 +92,7 @@ HmsNotificationLogListenerTask::~HmsNotificationLogListenerTask() {
 Status HmsNotificationLogListenerTask::Init() {
   CHECK(!thread_) << "HmsNotificationLogListenerTask is already initialized";
   return kudu::Thread::Create("catalog manager", "hms-notification-log-listener",
-                              &HmsNotificationLogListenerTask::RunLoop, this, &thread_);
+                              [this]() { this->RunLoop(); }, &thread_);
 }
 
 void HmsNotificationLogListenerTask::Shutdown() {
diff --git a/src/kudu/rpc/acceptor_pool.cc b/src/kudu/rpc/acceptor_pool.cc
index 88d48da..9971557 100644
--- a/src/kudu/rpc/acceptor_pool.cc
+++ b/src/kudu/rpc/acceptor_pool.cc
@@ -84,7 +84,7 @@ Status AcceptorPool::Start(int num_threads) {
   for (int i = 0; i < num_threads; i++) {
     scoped_refptr<kudu::Thread> new_thread;
     Status s = kudu::Thread::Create("acceptor pool", "acceptor",
-        &AcceptorPool::RunThread, this, &new_thread);
+                                    [this]() { this->RunThread(); }, &new_thread);
     if (!s.ok()) {
       Shutdown();
       return s;
diff --git a/src/kudu/rpc/reactor.cc b/src/kudu/rpc/reactor.cc
index e9f807d..231783b 100644
--- a/src/kudu/rpc/reactor.cc
+++ b/src/kudu/rpc/reactor.cc
@@ -185,7 +185,8 @@ Status ReactorThread::Init() {
   ev_set_invoke_pending_cb(loop_, &ReactorThread::InvokePendingCb);
 
   // Create Reactor thread.
-  return kudu::Thread::Create("reactor", "rpc reactor", &ReactorThread::RunThread, this, &thread_);
+  return kudu::Thread::Create("reactor", "rpc reactor",
+                              [this]() { this->RunThread(); }, &thread_);
 }
 
 void ReactorThread::InvokePendingCb(struct ev_loop* loop) {
diff --git a/src/kudu/rpc/result_tracker.cc b/src/kudu/rpc/result_tracker.cc
index f8f5c46..a96710f 100644
--- a/src/kudu/rpc/result_tracker.cc
+++ b/src/kudu/rpc/result_tracker.cc
@@ -459,8 +459,8 @@ void ResultTracker::FailAndRespond(const RequestIdPB& request_id,
 
 void ResultTracker::StartGCThread() {
   CHECK(!gc_thread_);
-  CHECK_OK(Thread::Create("server", "result-tracker", &ResultTracker::RunGCThread,
-                          this, &gc_thread_));
+  CHECK_OK(Thread::Create("server", "result-tracker",
+                          [this]() { this->RunGCThread(); }, &gc_thread_));
 }
 
 void ResultTracker::RunGCThread() {
diff --git a/src/kudu/rpc/retriable_rpc.h b/src/kudu/rpc/retriable_rpc.h
index 9a0b8f5..6e19e89 100644
--- a/src/kudu/rpc/retriable_rpc.h
+++ b/src/kudu/rpc/retriable_rpc.h
@@ -19,6 +19,8 @@
 #include <memory>
 #include <string>
 
+#include <boost/bind.hpp>
+
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/messenger.h"
diff --git a/src/kudu/rpc/service_pool.cc b/src/kudu/rpc/service_pool.cc
index a12a34d..57643b1 100644
--- a/src/kudu/rpc/service_pool.cc
+++ b/src/kudu/rpc/service_pool.cc
@@ -90,7 +90,7 @@ Status ServicePool::Init(int num_threads) {
   for (int i = 0; i < num_threads; i++) {
     scoped_refptr<kudu::Thread> new_thread;
     CHECK_OK(kudu::Thread::Create("service pool", "rpc worker",
-        &ServicePool::RunThread, this, &new_thread));
+                                  [this]() { this->RunThread(); }, &new_thread));
     threads_.push_back(new_thread);
   }
   return Status::OK();
diff --git a/src/kudu/server/diagnostics_log.cc b/src/kudu/server/diagnostics_log.cc
index 2b11540..03a8a1b 100644
--- a/src/kudu/server/diagnostics_log.cc
+++ b/src/kudu/server/diagnostics_log.cc
@@ -140,8 +140,7 @@ Status DiagnosticsLog::Start() {
   RETURN_NOT_OK_PREPEND(l->Open(), "unable to open diagnostics log");
   log_ = std::move(l);
   Status s = Thread::Create("server", "diag-logger",
-                            &DiagnosticsLog::RunThread,
-                            this, &thread_);
+                            [this]() { this->RunThread(); }, &thread_);
   if (!s.ok()) {
     // Don't leave the log open if we failed to start our thread.
     log_.reset();
diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc
index b666d6f..828b3b2 100644
--- a/src/kudu/server/server_base.cc
+++ b/src/kudu/server/server_base.cc
@@ -709,8 +709,9 @@ Status ServerBase::StartExcessLogFileDeleterThread() {
   }
   RETURN_NOT_OK_PREPEND(minidump_handler_->DeleteExcessMinidumpFiles(options_.env),
                         "Unable to delete excess minidump files");
-  return Thread::Create("server", "excess-log-deleter", &ServerBase::ExcessLogFileDeleterThread,
-                        this, &excess_log_deleter_thread_);
+  return Thread::Create("server", "excess-log-deleter",
+                        [this]() { this->ExcessLogFileDeleterThread(); },
+                        &excess_log_deleter_thread_);
 }
 
 void ServerBase::ExcessLogFileDeleterThread() {
@@ -754,8 +755,9 @@ void ServerBase::ShutdownImpl() {
 
 #ifdef TCMALLOC_ENABLED
 Status ServerBase::StartTcmallocMemoryGcThread() {
-  return Thread::Create("server", "tcmalloc-memory-gc", &ServerBase::TcmallocMemoryGcThread,
-                        this, &tcmalloc_memory_gc_thread_);
+  return Thread::Create("server", "tcmalloc-memory-gc",
+                        [this]() { this->TcmallocMemoryGcThread(); },
+                        &tcmalloc_memory_gc_thread_);
 }
 
 void ServerBase::TcmallocMemoryGcThread() {
diff --git a/src/kudu/subprocess/server.cc b/src/kudu/subprocess/server.cc
index 6687e3b..1b961e0 100644
--- a/src/kudu/subprocess/server.cc
+++ b/src/kudu/subprocess/server.cc
@@ -114,8 +114,9 @@ Status SubprocessServer::Init() {
   VLOG(2) << "Starting the subprocess";
   Synchronizer sync;
   auto cb = sync.AsStdStatusCallback();
-  RETURN_NOT_OK(Thread::Create("subprocess", "start", &SubprocessServer::StartSubprocessThread,
-                               this, cb, &read_thread_));
+  RETURN_NOT_OK(Thread::Create("subprocess", "start",
+                               [this, &cb]() { this->StartSubprocessThread(cb); },
+                               &read_thread_));
   RETURN_NOT_OK_PREPEND(sync.Wait(), "Failed to start subprocess");
 
   // Start the message protocol.
@@ -127,16 +128,19 @@ Status SubprocessServer::Init() {
   const int num_threads = FLAGS_subprocess_num_responder_threads;
   responder_threads_.resize(num_threads);
   for (int i = 0; i < num_threads; i++) {
-    RETURN_NOT_OK(Thread::Create("subprocess", "responder", &SubprocessServer::ResponderThread,
-                                 this, &responder_threads_[i]));
+    RETURN_NOT_OK(Thread::Create("subprocess", "responder",
+                                 [this]() { this->ResponderThread(); },
+                                 &responder_threads_[i]));
   }
-  RETURN_NOT_OK(Thread::Create("subprocess", "reader", &SubprocessServer::ReceiveMessagesThread,
-                               this, &read_thread_));
-  RETURN_NOT_OK(Thread::Create("subprocess", "writer", &SubprocessServer::SendMessagesThread,
-                               this, &write_thread_));
+  RETURN_NOT_OK(Thread::Create("subprocess", "reader",
+                               [this]() { this->ReceiveMessagesThread(); },
+                               &read_thread_));
+  RETURN_NOT_OK(Thread::Create("subprocess", "writer",
+                               [this]() { this->SendMessagesThread(); },
+                               &write_thread_));
   return Thread::Create("subprocess", "deadline-checker",
-                        &SubprocessServer::CheckDeadlinesThread,
-                        this, &deadline_checker_);
+                        [this]() { this->CheckDeadlinesThread(); },
+                        &deadline_checker_);
 }
 
 Status SubprocessServer::Execute(SubprocessRequestPB* req,
diff --git a/src/kudu/tserver/heartbeater.cc b/src/kudu/tserver/heartbeater.cc
index d88ebf5..c212bfa 100644
--- a/src/kudu/tserver/heartbeater.cc
+++ b/src/kudu/tserver/heartbeater.cc
@@ -644,7 +644,7 @@ Status Heartbeater::Thread::Start() {
 
   should_run_ = true;
   return kudu::Thread::Create("heartbeater", "heartbeat",
-      &Heartbeater::Thread::RunThread, this, &thread_);
+                              [this]() { this->RunThread(); }, &thread_);
 }
 
 Status Heartbeater::Thread::Stop() {
diff --git a/src/kudu/tserver/scanners.cc b/src/kudu/tserver/scanners.cc
index 418bbdc..a98f251 100644
--- a/src/kudu/tserver/scanners.cc
+++ b/src/kudu/tserver/scanners.cc
@@ -113,7 +113,7 @@ ScannerManager::~ScannerManager() {
 
 Status ScannerManager::StartRemovalThread() {
   RETURN_NOT_OK(Thread::Create("scanners", "removal_thread",
-                               &ScannerManager::RunRemovalThread, this,
+                               [this]() { this->RunRemovalThread(); },
                                &removal_thread_));
   return Status::OK();
 }
diff --git a/src/kudu/tserver/tablet_copy_service.cc b/src/kudu/tserver/tablet_copy_service.cc
index 3fb028c..b9088ea 100644
--- a/src/kudu/tserver/tablet_copy_service.cc
+++ b/src/kudu/tserver/tablet_copy_service.cc
@@ -46,6 +46,7 @@
 #include "kudu/util/metrics.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/random_util.h"
+#include "kudu/util/thread.h"
 
 #define RPC_RETURN_NOT_OK(expr, app_err, message, context) \
   do { \
@@ -109,7 +110,7 @@ TabletCopyServiceImpl::TabletCopyServiceImpl(
       shutdown_latch_(1),
       tablet_copy_metrics_(server->metric_entity()) {
   CHECK_OK(Thread::Create("tablet-copy", "tc-session-exp",
-                          &TabletCopyServiceImpl::EndExpiredSessions, this,
+                          [this]() { this->EndExpiredSessions(); },
                           &session_expiration_thread_));
 }
 
diff --git a/src/kudu/util/cloud/instance_detector.cc b/src/kudu/util/cloud/instance_detector.cc
index 4dd7321..183139d 100644
--- a/src/kudu/util/cloud/instance_detector.cc
+++ b/src/kudu/util/cloud/instance_detector.cc
@@ -71,8 +71,10 @@ Status InstanceDetector::Detect(unique_ptr<InstanceMetadata>* metadata) {
     CHECK(d.metadata);
     CHECK(!d.runner);
     scoped_refptr<Thread> runner;
-    RETURN_NOT_OK(Thread::Create("cloud detector", TypeToString(d.metadata->type()),
-        &InstanceDetector::GetInstanceInfo, this, d.metadata.get(), idx, &runner));
+    RETURN_NOT_OK(Thread::Create(
+        "cloud detector", TypeToString(d.metadata->type()),
+        [this, &d, idx]() { this->GetInstanceInfo(d.metadata.get(), idx); },
+        &runner));
     d.runner = std::move(runner);
   }
 
diff --git a/src/kudu/util/debug-util-test.cc b/src/kudu/util/debug-util-test.cc
index 25e4ae0..1c498f2 100644
--- a/src/kudu/util/debug-util-test.cc
+++ b/src/kudu/util/debug-util-test.cc
@@ -19,12 +19,15 @@
 #ifdef __linux__
 #include <link.h>
 #endif
+#include "kudu/util/debug-util.h"
+
 #include <unistd.h>
 
 #include <algorithm>
 #include <csignal>
 #include <cstddef>
 #include <cstdint>
+#include <initializer_list>
 #include <memory>
 #include <ostream>
 #include <string>
@@ -39,7 +42,6 @@
 #include "kudu/gutil/walltime.h"
 #include "kudu/util/array_view.h"
 #include "kudu/util/countdown_latch.h"
-#include "kudu/util/debug-util.h"
 #include "kudu/util/kernel_stack_watchdog.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/scoped_cleanup.h"
@@ -107,12 +109,12 @@ TEST_F(DebugUtilTest, TestStackTraceMainThread) {
 TEST_F(DebugUtilTest, TestSignalStackTrace) {
   CountDownLatch l(1);
   scoped_refptr<Thread> t;
-  ASSERT_OK(Thread::Create("test", "test thread", &SleeperThread, &l, &t));
-  auto cleanup_thr = MakeScopedCleanup([&]() {
-      // Allow the thread to finish.
-      l.CountDown();
-      t->Join();
-    });
+  ASSERT_OK(Thread::Create("test", "test thread", [&l]() { SleeperThread(&l); }, &t));
+  SCOPED_CLEANUP({
+    // Allow the thread to finish.
+    l.CountDown();
+    t->Join();
+  });
 
   // We have to loop a little bit because it takes a little while for the thread
   // to start up and actually call our function.
@@ -193,16 +195,17 @@ TEST_F(DebugUtilTest, TestSnapshot) {
   CountDownLatch l(1);
   vector<scoped_refptr<Thread>> threads(kNumThreads);
   for (int i = 0; i < kNumThreads; i++) {
-    ASSERT_OK(Thread::Create("test", "test thread", &SleeperThread, &l, &threads[i]));
+    ASSERT_OK(Thread::Create("test", "test thread",
+                             [&l]() { SleeperThread(&l); }, &threads[i]));
   }
 
   SCOPED_CLEANUP({
-      // Allow the thread to finish.
-      l.CountDown();
-      for (auto& t : threads) {
-        t->Join();
-      }
-    });
+    // Allow the thread to finish.
+    l.CountDown();
+    for (auto& t : threads) {
+      t->Join();
+    }
+  });
 
   StackTraceSnapshot snap;
   ASSERT_OK(snap.SnapshotAllStacks());
@@ -233,12 +236,12 @@ TEST_F(DebugUtilTest, TestSnapshot) {
 TEST_F(DebugUtilTest, Benchmark) {
   CountDownLatch l(1);
   scoped_refptr<Thread> t;
-  ASSERT_OK(Thread::Create("test", "test thread", &SleeperThread, &l, &t));
+  ASSERT_OK(Thread::Create("test", "test thread", [&l]() { SleeperThread(&l); }, &t));
   SCOPED_CLEANUP({
-      // Allow the thread to finish.
-      l.CountDown();
-      t->Join();
-    });
+    // Allow the thread to finish.
+    l.CountDown();
+    t->Join();
+  });
 
   for (bool symbolize : {false, true}) {
     MonoTime end_time = MonoTime::Now() + MonoDelta::FromSeconds(1);
@@ -356,14 +359,15 @@ TEST_P(RaceTest, TestStackTraceRaces) {
   DangerousOp op = GetParam();
   CountDownLatch l(1);
   scoped_refptr<Thread> t;
-  ASSERT_OK(Thread::Create("test", "test thread", &DangerousOperationThread, op, &l, &t));
+  ASSERT_OK(Thread::Create("test", "test thread",
+                           [op, &l]() { DangerousOperationThread(op, &l); }, &t));
   SCOPED_CLEANUP({
-      // Allow the thread to finish.
-      l.CountDown();
-      // Crash if we can't join the thread after a reasonable amount of time.
-      // That probably indicates a deadlock.
-      CHECK_OK(ThreadJoiner(t.get()).give_up_after_ms(10000).Join());
-    });
+    // Allow the thread to finish.
+    l.CountDown();
+    // Crash if we can't join the thread after a reasonable amount of time.
+    // That probably indicates a deadlock.
+    CHECK_OK(ThreadJoiner(t.get()).give_up_after_ms(10000).Join());
+  });
   MonoTime end_time = MonoTime::Now() + MonoDelta::FromSeconds(1);
   while (MonoTime::Now() < end_time) {
     StackTrace trace;
@@ -399,12 +403,12 @@ TEST_F(DebugUtilTest, TestTimeouts) {
 
   CountDownLatch l(1);
   scoped_refptr<Thread> t;
-  ASSERT_OK(Thread::Create("test", "test thread", &SleeperThread, &l, &t));
-  auto cleanup_thr = MakeScopedCleanup([&]() {
-      // Allow the thread to finish.
-      l.CountDown();
-      t->Join();
-    });
+  ASSERT_OK(Thread::Create("test", "test thread", [&l]() { SleeperThread(&l); }, &t));
+  SCOPED_CLEANUP({
+    // Allow the thread to finish.
+    l.CountDown();
+    t->Join();
+  });
 
   // First, time a few stack traces to determine how long a non-timed-out stack
   // trace takes.
diff --git a/src/kudu/util/file_cache.cc b/src/kudu/util/file_cache.cc
index 0bbbfe7..4e39b71 100644
--- a/src/kudu/util/file_cache.cc
+++ b/src/kudu/util/file_cache.cc
@@ -486,7 +486,7 @@ FileCache::~FileCache() {
 
 Status FileCache::Init() {
   return Thread::Create("cache", Substitute("$0-evict", cache_name_),
-                        &FileCache::RunDescriptorExpiry, this,
+                        [this]() { this->RunDescriptorExpiry(); },
                         &descriptor_expiry_thread_);
 }
 
diff --git a/src/kudu/util/kernel_stack_watchdog.cc b/src/kudu/util/kernel_stack_watchdog.cc
index 27a259c..4fa6de8 100644
--- a/src/kudu/util/kernel_stack_watchdog.cc
+++ b/src/kudu/util/kernel_stack_watchdog.cc
@@ -24,7 +24,6 @@
 #include <string>
 #include <utility>
 
-#include <boost/bind.hpp>
 #include <glog/logging.h>
 #include <gflags/gflags.h>
 
@@ -70,7 +69,7 @@ KernelStackWatchdog::KernelStackWatchdog()
   // try to call back into initializing the stack watchdog, and will self-deadlock.
   CHECK_OK(Thread::CreateWithFlags(
       "kernel-watchdog", "kernel-watcher",
-      boost::bind(&KernelStackWatchdog::RunThread, this),
+      [this]() { this->RunThread(); },
       Thread::NO_STACK_WATCHDOG,
       &thread_));
 }
diff --git a/src/kudu/util/maintenance_manager.cc b/src/kudu/util/maintenance_manager.cc
index fb7043e..4495077 100644
--- a/src/kudu/util/maintenance_manager.cc
+++ b/src/kudu/util/maintenance_manager.cc
@@ -28,7 +28,6 @@
 #include <utility>
 #include <vector>
 
-#include <boost/bind.hpp>
 #include <gflags/gflags.h>
 
 #include "kudu/gutil/dynamic_annotations.h"
@@ -192,8 +191,8 @@ MaintenanceManager::~MaintenanceManager() {
 Status MaintenanceManager::Start() {
   CHECK(!monitor_thread_);
   RETURN_NOT_OK(Thread::Create("maintenance", "maintenance_scheduler",
-      boost::bind(&MaintenanceManager::RunSchedulerThread, this),
-      &monitor_thread_));
+                               [this]() { this->RunSchedulerThread(); },
+                               &monitor_thread_));
   return Status::OK();
 }
 
diff --git a/src/kudu/util/minidump.cc b/src/kudu/util/minidump.cc
index 9a301f4..0a9c417 100644
--- a/src/kudu/util/minidump.cc
+++ b/src/kudu/util/minidump.cc
@@ -281,8 +281,8 @@ void MinidumpExceptionHandler::UnregisterMinidumpExceptionHandler() {
 Status MinidumpExceptionHandler::StartUserSignalHandlerThread() {
   user_signal_handler_thread_running_.store(true, std::memory_order_relaxed);
   return Thread::Create("minidump", "sigusr1-handler",
-                        &MinidumpExceptionHandler::RunUserSignalHandlerThread,
-                        this, &user_signal_handler_thread_);
+                        [this]() { this->RunUserSignalHandlerThread(); },
+                        &user_signal_handler_thread_);
 }
 
 void MinidumpExceptionHandler::StopUserSignalHandlerThread() {
diff --git a/src/kudu/util/pstack_watcher.cc b/src/kudu/util/pstack_watcher.cc
index 4f67e70..f1e97c4 100644
--- a/src/kudu/util/pstack_watcher.cc
+++ b/src/kudu/util/pstack_watcher.cc
@@ -25,7 +25,6 @@
 #include <string>
 #include <vector>
 
-#include <boost/bind.hpp>
 #include <glog/logging.h>
 
 #include "kudu/gutil/macros.h"
@@ -51,7 +50,7 @@ using strings::Substitute;
 PstackWatcher::PstackWatcher(MonoDelta timeout)
     : timeout_(timeout), running_(true), cond_(&lock_) {
   CHECK_OK(Thread::Create("pstack_watcher", "pstack_watcher",
-                 boost::bind(&PstackWatcher::Run, this), &thread_));
+                          [this]() { this->Run(); }, &thread_));
 }
 
 PstackWatcher::~PstackWatcher() {
diff --git a/src/kudu/util/thread-test.cc b/src/kudu/util/thread-test.cc
index d3ee733..bea5007 100644
--- a/src/kudu/util/thread-test.cc
+++ b/src/kudu/util/thread-test.cc
@@ -17,7 +17,6 @@
 
 #include "kudu/util/thread.h"
 
-#include <sys/types.h>
 #include <unistd.h>
 
 #include <ostream>
@@ -52,7 +51,8 @@ TEST_F(ThreadTest, TestJoinAndWarn) {
   }
 
   scoped_refptr<Thread> holder;
-  ASSERT_OK(Thread::Create("test", "sleeper thread", usleep, 1000*1000, &holder));
+  ASSERT_OK(Thread::Create("test", "sleeper thread",
+                           []() { usleep(1000*1000); }, &holder));
   ASSERT_OK(ThreadJoiner(holder.get())
                    .warn_after_ms(10)
                    .warn_every_ms(100)
@@ -66,7 +66,8 @@ TEST_F(ThreadTest, TestFailedJoin) {
   }
 
   scoped_refptr<Thread> holder;
-  ASSERT_OK(Thread::Create("test", "sleeper thread", usleep, 1000*1000, &holder));
+  ASSERT_OK(Thread::Create("test", "sleeper thread",
+                           []() { usleep(1000*1000); }, &holder));
   Status s = ThreadJoiner(holder.get())
     .give_up_after_ms(50)
     .Join();
@@ -89,7 +90,8 @@ TEST_F(ThreadTest, TestJoinOnSelf) {
 
 TEST_F(ThreadTest, TestDoubleJoinIsNoOp) {
   scoped_refptr<Thread> holder;
-  ASSERT_OK(Thread::Create("test", "sleeper thread", usleep, 0, &holder));
+  ASSERT_OK(Thread::Create("test", "sleeper thread",
+                           []() { usleep(0); }, &holder));
   ThreadJoiner joiner(holder.get());
   ASSERT_OK(joiner.Join());
   ASSERT_OK(joiner.Join());
@@ -99,7 +101,8 @@ TEST_F(ThreadTest, ThreadStartBenchmark) {
   std::vector<scoped_refptr<Thread>> threads(1000);
   LOG_TIMING(INFO, "starting threads") {
     for (auto& t : threads) {
-      ASSERT_OK(Thread::Create("test", "TestCallOnExit", usleep, 0, &t));
+      ASSERT_OK(Thread::Create("test", "TestCallOnExit",
+                               []() { usleep(0); }, &t));
     }
   }
   LOG_TIMING(INFO, "waiting for all threads to publish TIDs") {
diff --git a/src/kudu/util/thread.cc b/src/kudu/util/thread.cc
index 6697aa6..2fab076 100644
--- a/src/kudu/util/thread.cc
+++ b/src/kudu/util/thread.cc
@@ -23,6 +23,7 @@
 #include <sys/prctl.h>
 #endif // defined(__linux__)
 #include <sys/resource.h>
+#include <sys/time.h>
 #include <unistd.h>
 
 #include <algorithm>
@@ -35,7 +36,6 @@
 #include <utility>
 #include <vector>
 
-#include <boost/bind.hpp>
 #include <boost/smart_ptr/shared_ptr.hpp>
 #include <gflags/gflags.h>
 #include <glog/logging.h>
@@ -64,8 +64,6 @@
 #include "kudu/util/url-coding.h"
 #include "kudu/util/web_callback_registry.h"
 
-using boost::bind;
-using boost::mem_fn;
 using std::ostringstream;
 using std::pair;
 using std::shared_ptr;
@@ -307,11 +305,13 @@ Status ThreadMgr::StartInstrumentation(const scoped_refptr<MetricEntity>& metric
         Bind(&GetInVoluntaryContextSwitches)));
 
   if (web) {
-    auto thread_callback = bind<void>(mem_fn(&ThreadMgr::ThreadPathHandler),
-                                      this, _1, _2);
-    DCHECK_NOTNULL(web)->RegisterPathHandler("/threadz", "Threads", thread_callback,
-                                             /* is_styled= */ true,
-                                             /* is_on_nav_bar= */ true);
+    DCHECK_NOTNULL(web)->RegisterPathHandler(
+        "/threadz", "Threads", [this](const WebCallbackRegistry::WebRequest& req,
+                                      WebCallbackRegistry::WebResponse* resp) {
+          this->ThreadPathHandler(req, resp);
+        },
+        /* is_styled= */ true,
+        /* is_on_nav_bar= */ true);
   }
   return Status::OK();
 }
@@ -573,8 +573,8 @@ int64_t Thread::WaitForTid() const {
 }
 
 
-Status Thread::StartThread(const string& category, const string& name,
-                           const ThreadFunctor& functor, uint64_t flags,
+Status Thread::StartThread(string category, string name,
+                           std::function<void()> functor, uint64_t flags,
                            scoped_refptr<Thread> *holder) {
   TRACE_COUNTER_INCREMENT("threads_started", 1);
   TRACE_COUNTER_SCOPE_LATENCY_US("thread_start_us");
@@ -584,7 +584,8 @@ Status Thread::StartThread(const string& category, const string& name,
   SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500 /* ms */, log_prefix, "starting thread");
 
   // Temporary reference for the duration of this function.
-  scoped_refptr<Thread> t(new Thread(category, name, functor));
+  scoped_refptr<Thread> t(new Thread(
+      std::move(category), std::move(name), std::move(functor)));
 
   // Optional, and only set if the thread was successfully created.
   //
@@ -639,7 +640,7 @@ Status Thread::StartThread(const string& category, const string& name,
   t->joinable_ = true;
   cleanup.cancel();
 
-  VLOG(2) << "Started thread " << t->tid()<< " - " << category << ":" << name;
+  VLOG(2) << Substitute("Started thread $0 - $1: $2", t->tid(), t->category(), t->name());
   return Status::OK();
 }
 
diff --git a/src/kudu/util/thread.h b/src/kudu/util/thread.h
index 443b739..91e4480 100644
--- a/src/kudu/util/thread.h
+++ b/src/kudu/util/thread.h
@@ -14,11 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
-// Copied from Impala and adapted to Kudu.
-
-#ifndef KUDU_UTIL_THREAD_H
-#define KUDU_UTIL_THREAD_H
+#pragma once
 
 #include <pthread.h>
 #if defined(__linux__)
@@ -29,12 +25,10 @@
 #include <unistd.h>
 
 #include <cstdint>
+#include <functional>
 #include <string>
 #include <utility>
 
-#include <boost/bind.hpp>     // IWYU pragma: keep
-#include <boost/function.hpp> // IWYU pragma: keep
-
 #include "kudu/gutil/atomicops.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
@@ -131,70 +125,26 @@ class Thread : public RefCountedThreadSafe<Thread> {
     NO_STACK_WATCHDOG = 1 << 0
   };
 
-  // This constructor pattern mimics that in boost::thread. There is
-  // one constructor for each number of arguments that the thread
-  // function accepts. To extend the set of acceptable signatures, add
-  // another constructor with <class F, class A1.... class An>.
-  //
-  // In general:
-  //  - category: string identifying the thread category to which this thread belongs,
-  //    used for organising threads together on the debug UI.
-  //  - name: name of this thread. Will be appended with "-<thread-id>" to ensure
-  //    uniqueness.
-  //  - F - a method type that supports operator(), and the instance passed to the
-  //    constructor is executed immediately in a separate thread.
-  //  - A1...An - argument types whose instances are passed to f(...)
-  //  - holder - optional shared pointer to hold a reference to the created thread.
-  template <class F>
-  static Status CreateWithFlags(const std::string& category, const std::string& name,
-                                const F& f, uint64_t flags,
+  // Creates and starts a new thread.
+  //  - category: string identifying the thread category to which this thread
+  //    belongs, used for organising threads together on the debug UI.
+  //  - name: name of this thread. Will be appended with "-<thread-id>" to
+  //    ensure uniqueness.
+  //  - f: function passed to the constructor and executed immediately in the
+  //    separate thread.
+  //  - holder: optional shared pointer to hold a reference to the created thread.
+  static Status CreateWithFlags(std::string category, std::string name,
+                                std::function<void()> f, uint64_t flags,
                                 scoped_refptr<Thread>* holder) {
-    return StartThread(category, name, f, flags, holder);
-
-  }
-  template <class F>
-  static Status Create(const std::string& category, const std::string& name, const F& f,
-                       scoped_refptr<Thread>* holder) {
-    return StartThread(category, name, f, NO_FLAGS, holder);
-  }
-
-  template <class F, class A1>
-  static Status Create(const std::string& category, const std::string& name, const F& f,
-                       const A1& a1, scoped_refptr<Thread>* holder) {
-    return StartThread(category, name, boost::bind(f, a1), NO_FLAGS, holder);
-  }
+    return StartThread(std::move(category), std::move(name), std::move(f),
+                       flags, holder);
 
-  template <class F, class A1, class A2>
-  static Status Create(const std::string& category, const std::string& name, const F& f,
-                       const A1& a1, const A2& a2, scoped_refptr<Thread>* holder) {
-    return StartThread(category, name, boost::bind(f, a1, a2), NO_FLAGS, holder);
   }
-
-  template <class F, class A1, class A2, class A3>
-  static Status Create(const std::string& category, const std::string& name, const F& f,
-                       const A1& a1, const A2& a2, const A3& a3, scoped_refptr<Thread>* holder) {
-    return StartThread(category, name, boost::bind(f, a1, a2, a3), NO_FLAGS, holder);
-  }
-
-  template <class F, class A1, class A2, class A3, class A4>
-  static Status Create(const std::string& category, const std::string& name, const F& f,
-                       const A1& a1, const A2& a2, const A3& a3, const A4& a4,
-                       scoped_refptr<Thread>* holder) {
-    return StartThread(category, name, boost::bind(f, a1, a2, a3, a4), NO_FLAGS, holder);
-  }
-
-  template <class F, class A1, class A2, class A3, class A4, class A5>
-  static Status Create(const std::string& category, const std::string& name, const F& f,
-                       const A1& a1, const A2& a2, const A3& a3, const A4& a4, const A5& a5,
+  static Status Create(std::string category, std::string name,
+                       std::function<void()> f,
                        scoped_refptr<Thread>* holder) {
-    return StartThread(category, name, boost::bind(f, a1, a2, a3, a4, a5), NO_FLAGS, holder);
-  }
-
-  template <class F, class A1, class A2, class A3, class A4, class A5, class A6>
-  static Status Create(const std::string& category, const std::string& name, const F& f,
-                       const A1& a1, const A2& a2, const A3& a3, const A4& a4, const A5& a5,
-                       const A6& a6, scoped_refptr<Thread>* holder) {
-    return StartThread(category, name, boost::bind(f, a1, a2, a3, a4, a5, a6), NO_FLAGS, holder);
+    return StartThread(std::move(category), std::move(name), std::move(f),
+                       NO_FLAGS, holder);
   }
 
   // Emulates boost::thread and detaches.
@@ -287,10 +237,7 @@ class Thread : public RefCountedThreadSafe<Thread> {
     PARENT_WAITING_TID = -2,
   };
 
-  // Function object that wraps the user-supplied function to run in a separate thread.
-  typedef boost::function<void ()> ThreadFunctor;
-
-  Thread(std::string category, std::string name, ThreadFunctor functor)
+  Thread(std::string category, std::string name, std::function<void()> functor)
       : thread_(0),
         category_(std::move(category)),
         name_(std::move(name)),
@@ -317,7 +264,7 @@ class Thread : public RefCountedThreadSafe<Thread> {
   int64_t tid_;
 
   // User function to be executed by this thread.
-  const ThreadFunctor functor_;
+  const std::function<void()> functor_;
 
   // Joiners wait on this latch to be notified if the thread is done.
   //
@@ -339,8 +286,8 @@ class Thread : public RefCountedThreadSafe<Thread> {
   // initialised and its TID has been read. Waits for notification from the started
   // thread that initialisation is complete before returning. On success, stores a
   // reference to the thread in holder.
-  static Status StartThread(const std::string& category, const std::string& name,
-                            const ThreadFunctor& functor, uint64_t flags,
+  static Status StartThread(std::string category, std::string name,
+                            std::function<void()> functor, uint64_t flags,
                             scoped_refptr<Thread>* holder);
 
   // Wrapper for the user-supplied function. Invoked from the new thread,
@@ -364,5 +311,3 @@ class Thread : public RefCountedThreadSafe<Thread> {
 Status StartThreadInstrumentation(const scoped_refptr<MetricEntity>& server_metrics,
                                   WebCallbackRegistry* web);
 } // namespace kudu
-
-#endif /* KUDU_UTIL_THREAD_H */
diff --git a/src/kudu/util/threadpool.cc b/src/kudu/util/threadpool.cc
index 5199086..9d8cc4a 100644
--- a/src/kudu/util/threadpool.cc
+++ b/src/kudu/util/threadpool.cc
@@ -697,7 +697,7 @@ void ThreadPool::DispatchThread() {
 
 Status ThreadPool::CreateThread() {
   return kudu::Thread::Create("thread pool", strings::Substitute("$0 [worker]", name_),
-                              &ThreadPool::DispatchThread, this, nullptr);
+                              [this]() { this->DispatchThread(); }, nullptr);
 }
 
 void ThreadPool::CheckNotPoolThreadUnlocked() {
diff --git a/src/kudu/util/trace-test.cc b/src/kudu/util/trace-test.cc
index a1388ee..90679d6 100644
--- a/src/kudu/util/trace-test.cc
+++ b/src/kudu/util/trace-test.cc
@@ -20,7 +20,6 @@
 #include <cctype>
 #include <cstdint>
 #include <cstring>
-#include <functional>
 #include <map>
 #include <ostream>
 #include <string>
@@ -173,7 +172,7 @@ TEST_F(TraceTest, TestChromeTracing) {
   for (int i = 0; i < kNumThreads; i++) {
     CHECK_OK(Thread::CreateWithFlags(
         "test", "gen-traces",
-        std::bind(GenerateTraceEvents, i, kEventsPerThread),
+        [i, kEventsPerThread]() { GenerateTraceEvents(i, kEventsPerThread); },
         Thread::NO_STACK_WATCHDOG, &threads[i]));
   }
 
@@ -208,7 +207,7 @@ TEST_F(TraceTest, TestTraceFromExitedThread) {
   int kNumEvents = 10;
   scoped_refptr<Thread> t;
   CHECK_OK(Thread::CreateWithFlags(
-      "test", "gen-traces", std::bind(GenerateTraceEvents, 1, kNumEvents),
+      "test", "gen-traces", [kNumEvents]() { GenerateTraceEvents(1, kNumEvents); },
       Thread::NO_STACK_WATCHDOG, &t));
   t->Join();
   tl->SetDisabled();
@@ -286,7 +285,7 @@ TEST_F(TraceTest, TestStartAndStopCollection) {
   scoped_refptr<Thread> t;
   CHECK_OK(Thread::CreateWithFlags(
       "test", "gen-traces",
-      std::bind(GenerateTracesUntilLatch, &num_events_generated, &latch),
+      [&]() { GenerateTracesUntilLatch(&num_events_generated, &latch); },
       Thread::NO_STACK_WATCHDOG, &t));
 
   const int num_flushes = AllowSlowTests() ? 50 : 3;
diff --git a/src/kudu/util/ttl_cache.h b/src/kudu/util/ttl_cache.h
index cff37f4..1a0c3ee 100644
--- a/src/kudu/util/ttl_cache.h
+++ b/src/kudu/util/ttl_cache.h
@@ -138,7 +138,7 @@ class TTLCache {
       scrubbing_thread_running_.Reset(1);
       CHECK_OK(Thread::Create(
           "cache", strings::Substitute("$0-scrubbing", cache_name),
-          &TTLCache::ScrubExpiredEntries, this, &scrubbing_thread_));
+          [this]() { this->ScrubExpiredEntries(); }, &scrubbing_thread_));
       VLOG(1) << strings::Substitute(
           "started scrubbing thread for TTL cache '$0' with period of $1",
           cache_name, scrubbing_period_.ToString());