You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2020/03/12 23:40:47 UTC

[kudu] branch master updated: threadpool: simplify Submit API

This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new e133157  threadpool: simplify Submit API
e133157 is described below

commit e13315794e911556fc1702247d169c5aa29786c5
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Tue Mar 10 14:44:59 2020 -0700

    threadpool: simplify Submit API
    
    In an effort to modernize our codebase, I'm working to reduce the usage of
    boost::bind and std::bind. They can be actively harmful[1], and by
    comparison, lambdas offer the compiler more opportunities to inline. Also,
    std::bind tends to be aggressive with using allocations for storage, though
    boost::bind is better about this.
    
    This patch modifies ThreadPool to expose just a single Submit API which
    expects an std::function. Furthermore, all callers have been converted away
    from bind-based function creation and towards lambdas, which implicitly cast
    to std::function.
    
    There are contortions in a couple places to accommodate the lack of C++14
    lambda support (e.g. we need capture-by-move semantics in RaftConsensus),
    but these are minimal.
    
    1. https://abseil.io/tips/108
    
    Change-Id: I2bd2d59809225e4cde7e273e95428478a282aa2d
    Reviewed-on: http://gerrit.cloudera.org:8080/15401
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/codegen/compilation_manager.cc            |  17 +-
 src/kudu/consensus/consensus-test-util.h           |  10 +-
 src/kudu/consensus/consensus_peers.cc              |   4 +-
 src/kudu/consensus/consensus_queue.cc              |  66 +++----
 src/kudu/consensus/log.cc                          |   7 +-
 src/kudu/consensus/raft_consensus.cc               |  38 ++--
 src/kudu/fs/dir_manager.cc                         |   2 +-
 src/kudu/master/catalog_manager.cc                 |   3 +-
 src/kudu/master/master.cc                          |   4 +-
 src/kudu/rpc/reactor.cc                            |   5 +-
 src/kudu/tablet/tablet_replica.cc                  |   2 +-
 src/kudu/tablet/transactions/transaction_driver.cc |   6 +-
 src/kudu/thrift/client.h                           |   2 +-
 src/kudu/tools/ksck.cc                             |   4 +-
 src/kudu/tools/ksck_checksum.cc                    |   5 +-
 src/kudu/tools/ksck_remote.cc                      |   7 +-
 src/kudu/tools/table_scanner.cc                    |  13 +-
 src/kudu/tserver/ts_tablet_manager.cc              |  99 +++++-----
 src/kudu/tserver/ts_tablet_manager.h               |   2 +-
 src/kudu/util/countdown_latch-test.cc              |   4 +-
 src/kudu/util/curl_util-test.cc                    |   2 +-
 src/kudu/util/maintenance_manager.cc               |   3 +-
 src/kudu/util/net/dns_resolver.cc                  |   7 +-
 src/kudu/util/net/dns_resolver.h                   |   4 +-
 src/kudu/util/threadpool-test.cc                   | 200 +++++++++------------
 src/kudu/util/threadpool.cc                        |  67 +------
 src/kudu/util/threadpool.h                         |  41 +----
 27 files changed, 264 insertions(+), 360 deletions(-)

diff --git a/src/kudu/codegen/compilation_manager.cc b/src/kudu/codegen/compilation_manager.cc
index 2e45be4..3a7e5af 100644
--- a/src/kudu/codegen/compilation_manager.cc
+++ b/src/kudu/codegen/compilation_manager.cc
@@ -44,6 +44,7 @@
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/threadpool.h"
 
+using std::make_shared;
 using std::shared_ptr;
 using std::unique_ptr;
 
@@ -72,10 +73,10 @@ namespace codegen {
 
 namespace {
 
-// A CompilationTask is a ThreadPool's Runnable which, given a
-// pair of schemas and a cache to refer to, will generate code pertaining
-// to the two schemas and store it in the cache when run.
-class CompilationTask : public Runnable {
+// A CompilationTask is a task which, given a pair of schemas and a cache to
+// refer to, will generate code pertaining to the two schemas and store it in
+// the cache when run.
+class CompilationTask {
  public:
   // Requires that the cache and generator are valid for the lifetime
   // of this object.
@@ -87,7 +88,7 @@ class CompilationTask : public Runnable {
       generator_(generator) {}
 
   // Can only be run once.
-  void Run() override {
+  void Run() {
     // We need to fail softly because the user could have just given
     // a malformed projection schema pair, but could be long gone by
     // now so there's nowhere to return the status to.
@@ -188,9 +189,9 @@ bool CompilationManager::RequestRowProjector(const Schema* base_schema,
 
   // If not cached, add a request to compilation pool
   if (!cached) {
-    shared_ptr<Runnable> task(
-      new CompilationTask(*base_schema, *projection, &cache_, &generator_));
-    WARN_NOT_OK(pool_->Submit(task),
+    shared_ptr<CompilationTask> task(make_shared<CompilationTask>(
+        *base_schema, *projection, &cache_, &generator_));
+    WARN_NOT_OK(pool_->Submit([task]() { task->Run(); }),
                 "RowProjector compilation request failed");
     return false;
   }
diff --git a/src/kudu/consensus/consensus-test-util.h b/src/kudu/consensus/consensus-test-util.h
index 65fd16e..b749611 100644
--- a/src/kudu/consensus/consensus-test-util.h
+++ b/src/kudu/consensus/consensus-test-util.h
@@ -168,7 +168,7 @@ class TestPeerProxy : public PeerProxy {
     }
     // If the peer has been closed while a response was in-flight, this can
     // return a bad Status, but that's fine.
-    ignore_result(pool_->SubmitFunc(callback));
+    ignore_result(pool_->Submit(std::move(callback)));
   }
 
   void RegisterCallbackAndRespond(Method method, const rpc::ResponseCallback& callback) {
@@ -478,8 +478,7 @@ class LocalTestPeerProxy : public TestPeerProxy {
                    rpc::RpcController* /*controller*/,
                    const rpc::ResponseCallback& callback) override {
     RegisterCallback(kUpdate, callback);
-    CHECK_OK(pool_->SubmitFunc(boost::bind(&LocalTestPeerProxy::SendUpdateRequest,
-                                           this, request, response)));
+    CHECK_OK(pool_->Submit([=]() { this->SendUpdateRequest(request, response); }));
   }
 
   void StartElectionAsync(const RunLeaderElectionRequestPB& /*request*/,
@@ -494,8 +493,7 @@ class LocalTestPeerProxy : public TestPeerProxy {
                                  rpc::RpcController* /*controller*/,
                                  const rpc::ResponseCallback& callback) override {
     RegisterCallback(kRequestVote, callback);
-    CHECK_OK(pool_->SubmitFunc(boost::bind(&LocalTestPeerProxy::SendVoteRequest,
-                                           this, request, response)));
+    CHECK_OK(pool_->Submit([=]() { this->SendVoteRequest(request, response); }));
   }
 
   template<class Response>
@@ -661,7 +659,7 @@ class TestDriver {
       return;
     }
     CHECK_OK(status);
-    CHECK_OK(pool_->SubmitFunc(boost::bind(&TestDriver::Apply, this)));
+    CHECK_OK(pool_->Submit([this]() { this->Apply(); }));
   }
 
   // Called in all modes to delete the transaction and, transitively, the consensus
diff --git a/src/kudu/consensus/consensus_peers.cc b/src/kudu/consensus/consensus_peers.cc
index d759ca1..8e34bec 100644
--- a/src/kudu/consensus/consensus_peers.cc
+++ b/src/kudu/consensus/consensus_peers.cc
@@ -177,7 +177,7 @@ Status Peer::SignalRequest(bool even_if_queue_empty) {
   // Capture a weak_ptr reference into the submitted functor so that we can
   // safely handle the functor outliving its peer.
   weak_ptr<Peer> w_this = shared_from_this();
-  RETURN_NOT_OK(raft_pool_token_->SubmitFunc([even_if_queue_empty, w_this]() {
+  RETURN_NOT_OK(raft_pool_token_->Submit([even_if_queue_empty, w_this]() {
     if (auto p = w_this.lock()) {
       p->SendNextRequest(even_if_queue_empty);
     }
@@ -377,7 +377,7 @@ void Peer::ProcessResponse() {
   // Capture a weak_ptr reference into the submitted functor so that we can
   // safely handle the functor outliving its peer.
   weak_ptr<Peer> w_this = shared_from_this();
-  Status s = raft_pool_token_->SubmitFunc([w_this]() {
+  Status s = raft_pool_token_->Submit([w_this]() {
     if (auto p = w_this.lock()) {
       p->DoProcessResponse();
 
diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index 619bc18..c79d74a 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -1455,59 +1455,65 @@ bool PeerMessageQueue::IsOpInLog(const OpId& desired_op) const {
 }
 
 void PeerMessageQueue::NotifyObserversOfCommitIndexChange(int64_t new_commit_index) {
-  WARN_NOT_OK(raft_pool_observers_token_->SubmitClosure(
-      Bind(&PeerMessageQueue::NotifyObserversTask, Unretained(this),
-           [=](PeerMessageQueueObserver* observer) {
-             observer->NotifyCommitIndex(new_commit_index);
-           })),
+  WARN_NOT_OK(raft_pool_observers_token_->Submit(
+      [=](){ this->NotifyObserversTask(
+          [=](PeerMessageQueueObserver* observer) {
+            observer->NotifyCommitIndex(new_commit_index);
+          });
+      }),
       LogPrefixUnlocked() + "Unable to notify RaftConsensus of commit index change.");
 }
 
 void PeerMessageQueue::NotifyObserversOfTermChange(int64_t term) {
-  WARN_NOT_OK(raft_pool_observers_token_->SubmitClosure(
-      Bind(&PeerMessageQueue::NotifyObserversTask, Unretained(this),
-           [=](PeerMessageQueueObserver* observer) {
-             observer->NotifyTermChange(term);
-           })),
+  WARN_NOT_OK(raft_pool_observers_token_->Submit(
+      [=](){ this->NotifyObserversTask(
+          [=](PeerMessageQueueObserver* observer) {
+            observer->NotifyTermChange(term);
+          });
+      }),
       LogPrefixUnlocked() + "Unable to notify RaftConsensus of term change.");
 }
 
 void PeerMessageQueue::NotifyObserversOfFailedFollower(const string& uuid,
                                                        int64_t term,
                                                        const string& reason) {
-  WARN_NOT_OK(raft_pool_observers_token_->SubmitClosure(
-      Bind(&PeerMessageQueue::NotifyObserversTask, Unretained(this),
-           [=](PeerMessageQueueObserver* observer) {
-             observer->NotifyFailedFollower(uuid, term, reason);
-           })),
+  WARN_NOT_OK(raft_pool_observers_token_->Submit(
+      [=](){ this->NotifyObserversTask(
+          [=](PeerMessageQueueObserver* observer) {
+            observer->NotifyFailedFollower(uuid, term, reason);
+          });
+      }),
       LogPrefixUnlocked() + "Unable to notify RaftConsensus of abandoned follower.");
 }
 
 void PeerMessageQueue::NotifyObserversOfPeerToPromote(const string& peer_uuid) {
-  WARN_NOT_OK(raft_pool_observers_token_->SubmitClosure(
-      Bind(&PeerMessageQueue::NotifyObserversTask, Unretained(this),
-           [=](PeerMessageQueueObserver* observer) {
-             observer->NotifyPeerToPromote(peer_uuid);
-           })),
+  WARN_NOT_OK(raft_pool_observers_token_->Submit(
+      [=](){ this->NotifyObserversTask(
+          [=](PeerMessageQueueObserver* observer) {
+            observer->NotifyPeerToPromote(peer_uuid);
+          });
+      }),
       LogPrefixUnlocked() + "Unable to notify RaftConsensus of peer to promote.");
 }
 
 void PeerMessageQueue::NotifyObserversOfSuccessor(const string& peer_uuid) {
   DCHECK(queue_lock_.is_locked());
-  WARN_NOT_OK(raft_pool_observers_token_->SubmitClosure(
-      Bind(&PeerMessageQueue::NotifyObserversTask, Unretained(this),
-           [=](PeerMessageQueueObserver* observer) {
-             observer->NotifyPeerToStartElection(peer_uuid);
-           })),
+  WARN_NOT_OK(raft_pool_observers_token_->Submit(
+      [=](){ this->NotifyObserversTask(
+          [=](PeerMessageQueueObserver* observer) {
+            observer->NotifyPeerToStartElection(peer_uuid);
+          });
+      }),
       LogPrefixUnlocked() + "Unable to notify RaftConsensus of available successor.");
 }
 
 void PeerMessageQueue::NotifyObserversOfPeerHealthChange() {
-  WARN_NOT_OK(raft_pool_observers_token_->SubmitClosure(
-      Bind(&PeerMessageQueue::NotifyObserversTask, Unretained(this),
-           [](PeerMessageQueueObserver* observer) {
-             observer->NotifyPeerHealthChange();
-           })),
+  WARN_NOT_OK(raft_pool_observers_token_->Submit(
+      [=](){ this->NotifyObserversTask(
+          [](PeerMessageQueueObserver* observer) {
+            observer->NotifyPeerHealthChange();
+          });
+      }),
       LogPrefixUnlocked() + "Unable to notify RaftConsensus peer health change.");
 }
 
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index 48e4be6..7079003 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -34,8 +34,6 @@
 #include "kudu/consensus/log_util.h"
 #include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/atomicops.h"
-#include "kudu/gutil/bind.h"
-#include "kudu/gutil/bind_helpers.h"
 #include "kudu/gutil/dynamic_annotations.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
@@ -296,7 +294,7 @@ void Log::AppendThread::Wake() {
   auto old_status = base::subtle::NoBarrier_CompareAndSwap(
       &thread_state_, IDLE, ACTIVE);
   if (old_status == IDLE) {
-    CHECK_OK(append_pool_->SubmitClosure(Bind(&Log::AppendThread::ProcessQueue, Unretained(this))));
+    CHECK_OK(append_pool_->Submit([this]() { this->ProcessQueue(); }));
   }
 }
 
@@ -623,8 +621,7 @@ Status SegmentAllocator::AsyncAllocateSegmentUnlocked() {
   DCHECK_EQ(kAllocationNotStarted, allocation_state_);
   allocation_status_.Reset();
   allocation_state_ = kAllocationInProgress;
-  return allocation_pool_->SubmitClosure(
-      Bind(&SegmentAllocator::AllocationTask, Unretained(this)));
+  return allocation_pool_->Submit([this]() { this->AllocationTask(); });
 }
 
 void SegmentAllocator::AllocationTask() {
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 2aeffb6..2e9eb97 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -667,8 +667,8 @@ void RaftConsensus::ReportFailureDetected() {
   //
   // There's no need to reenable the failure detector; if this fails, it's a
   // sign that RaftConsensus has stopped and we no longer need failure detection.
-  Status s = raft_pool_token_->SubmitFunc(std::bind(
-      &RaftConsensus::ReportFailureDetectedTask, shared_from_this()));
+  auto self = shared_from_this();
+  Status s = raft_pool_token_->Submit([self]() { self->ReportFailureDetectedTask(); });
   if (PREDICT_FALSE(!s.ok())) {
     static const char* msg = "failed to submit failure detected task";
     CHECK(s.IsServiceUnavailable()) << LogPrefixThreadSafe() << msg;
@@ -907,29 +907,29 @@ void RaftConsensus::NotifyFailedFollower(const string& uuid,
   }
 
   // Run config change on thread pool after dropping lock.
-  WARN_NOT_OK(raft_pool_token_->SubmitFunc(std::bind(&RaftConsensus::TryRemoveFollowerTask,
-                                                     shared_from_this(),
-                                                     uuid,
-                                                     committed_config,
-                                                     reason)),
-              LogPrefixThreadSafe() + "Unable to start TryRemoveFollowerTask");
+  auto self = shared_from_this();
+  WARN_NOT_OK(raft_pool_token_->Submit(
+      [self, uuid, committed_config, reason]() {
+        self->TryRemoveFollowerTask(uuid, committed_config, reason);
+      }),
+    LogPrefixThreadSafe() + "Unable to start TryRemoveFollowerTask");
 }
 
 void RaftConsensus::NotifyPeerToPromote(const string& peer_uuid) {
   // Run the config change on the raft thread pool.
-  WARN_NOT_OK(raft_pool_token_->SubmitFunc(std::bind(&RaftConsensus::TryPromoteNonVoterTask,
-                                                     shared_from_this(),
-                                                     peer_uuid)),
-              LogPrefixThreadSafe() + "Unable to start TryPromoteNonVoterTask");
+  auto self = shared_from_this();
+  WARN_NOT_OK(raft_pool_token_->Submit(
+      [self, peer_uuid]() { self->TryPromoteNonVoterTask(peer_uuid); }),
+    LogPrefixThreadSafe() + "Unable to start TryPromoteNonVoterTask");
 }
 
 void RaftConsensus::NotifyPeerToStartElection(const string& peer_uuid) {
   const auto& log_prefix = LogPrefixThreadSafe();
   LOG(INFO) << log_prefix << ": Instructing follower " << peer_uuid << " to start an election";
-  WARN_NOT_OK(raft_pool_token_->SubmitFunc(std::bind(&RaftConsensus::TryStartElectionOnPeerTask,
-                                                     shared_from_this(),
-                                                     peer_uuid)),
-              log_prefix + "Unable to start TryStartElectionOnPeerTask");
+  auto self = shared_from_this();
+  WARN_NOT_OK(raft_pool_token_->Submit(
+      [self, peer_uuid]() { self->TryStartElectionOnPeerTask(peer_uuid); }),
+    log_prefix + "Unable to start TryStartElectionOnPeerTask");
 }
 
 void RaftConsensus::NotifyPeerHealthChange() {
@@ -2610,8 +2610,8 @@ void RaftConsensus::ElectionCallback(ElectionReason reason, const ElectionResult
   //
   // There's no need to reenable the failure detector; if this fails, it's a
   // sign that RaftConsensus has stopped and we no longer need failure detection.
-  Status s = raft_pool_token_->SubmitFunc(std::bind(
-      &RaftConsensus::DoElectionCallback, shared_from_this(), reason, result));
+  auto self = shared_from_this();
+  Status s = raft_pool_token_->Submit([=]() { self->DoElectionCallback(reason, result); });
   if (!s.ok()) {
     static const char* msg = "unable to run election callback";
     CHECK(s.IsServiceUnavailable()) << LogPrefixThreadSafe() << msg;
@@ -2777,7 +2777,7 @@ log::RetentionIndexes RaftConsensus::GetRetentionIndexes() {
 }
 
 void RaftConsensus::MarkDirty(const string& reason) {
-  WARN_NOT_OK(raft_pool_token_->SubmitClosure(Bind(mark_dirty_clbk_, reason)),
+  WARN_NOT_OK(raft_pool_token_->Submit([=]() { this->mark_dirty_clbk_.Run(reason); }),
               LogPrefixThreadSafe() + "Unable to run MarkDirty callback");
 }
 
diff --git a/src/kudu/fs/dir_manager.cc b/src/kudu/fs/dir_manager.cc
index ea420c2..8f7de67 100644
--- a/src/kudu/fs/dir_manager.cc
+++ b/src/kudu/fs/dir_manager.cc
@@ -103,7 +103,7 @@ void Dir::Shutdown() {
 }
 
 void Dir::ExecClosure(const Closure& task) {
-  Status s = pool_->SubmitClosure(task);
+  Status s = pool_->Submit([task]() { task.Run(); });
   if (!s.ok()) {
     WARN_NOT_OK(
         s, "Could not submit task to thread pool, running it synchronously");
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 562516e..122d0b8 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -844,8 +844,7 @@ Status CatalogManager::Init(bool is_first_run) {
 }
 
 Status CatalogManager::ElectedAsLeaderCb() {
-  return leader_election_pool_->SubmitClosure(
-      Bind(&CatalogManager::PrepareForLeadershipTask, Unretained(this)));
+  return leader_election_pool_->Submit([this]() { this->PrepareForLeadershipTask(); });
 }
 
 Status CatalogManager::WaitUntilCaughtUpAsLeader(const MonoDelta& timeout) {
diff --git a/src/kudu/master/master.cc b/src/kudu/master/master.cc
index 23e2104..e612f84 100644
--- a/src/kudu/master/master.cc
+++ b/src/kudu/master/master.cc
@@ -35,7 +35,6 @@
 #include "kudu/fs/error_manager.h"
 #include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/bind.h"
-#include "kudu/gutil/bind_helpers.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/master/catalog_manager.h"
@@ -190,8 +189,7 @@ Status Master::StartAsync() {
   RETURN_NOT_OK(InitMasterRegistration());
 
   // Start initializing the catalog manager.
-  RETURN_NOT_OK(init_pool_->SubmitClosure(Bind(&Master::InitCatalogManagerTask,
-                                               Unretained(this))));
+  RETURN_NOT_OK(init_pool_->Submit([this]() { this->InitCatalogManagerTask(); }));
   state_ = kRunning;
 
   return Status::OK();
diff --git a/src/kudu/rpc/reactor.cc b/src/kudu/rpc/reactor.cc
index 4ed2f5b..e9f807d 100644
--- a/src/kudu/rpc/reactor.cc
+++ b/src/kudu/rpc/reactor.cc
@@ -605,8 +605,9 @@ Status ReactorThread::StartConnectionNegotiation(const scoped_refptr<Connection>
   auto encryption = reactor()->messenger()->encryption();
   ThreadPool* negotiation_pool =
       reactor()->messenger()->negotiation_pool(conn->direction());
-  RETURN_NOT_OK(negotiation_pool->SubmitClosure(
-        Bind(&Negotiation::RunNegotiation, conn, authentication, encryption, deadline)));
+  RETURN_NOT_OK(negotiation_pool->Submit([conn, authentication, encryption, deadline]() {
+        Negotiation::RunNegotiation(conn, authentication, encryption, deadline);
+      }));
   return Status::OK();
 }
 
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
index b02b137..a6ed025 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -693,7 +693,7 @@ void TabletReplica::FinishConsensusOnlyRound(ConsensusRound* round) {
     // TabletReplica::Stop() stops RaftConsensus before it stops the prepare
     // pool token and this callback is invoked while the RaftConsensus lock is
     // held.
-    CHECK_OK(prepare_pool_token_->SubmitFunc([this, ts] {
+    CHECK_OK(prepare_pool_token_->Submit([this, ts] {
       std::lock_guard<simple_spinlock> l(lock_);
       if (state_ == RUNNING || state_ == BOOTSTRAPPING) {
         tablet_->mvcc_manager()->AdjustNewTransactionLowerBound(Timestamp(ts));
diff --git a/src/kudu/tablet/transactions/transaction_driver.cc b/src/kudu/tablet/transactions/transaction_driver.cc
index c9d26e5..2212356 100644
--- a/src/kudu/tablet/transactions/transaction_driver.cc
+++ b/src/kudu/tablet/transactions/transaction_driver.cc
@@ -34,7 +34,6 @@
 #include "kudu/consensus/raft_consensus.h"
 #include "kudu/consensus/time_manager.h"
 #include "kudu/gutil/bind.h"
-#include "kudu/gutil/bind_helpers.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/strcat.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -226,8 +225,7 @@ Status TransactionDriver::ExecuteAsync() {
   }
 
   if (s.ok()) {
-    s = prepare_pool_token_->SubmitClosure(
-      Bind(&TransactionDriver::PrepareTask, Unretained(this)));
+    s = prepare_pool_token_->Submit([this]() { this->PrepareTask(); });
   }
 
   if (!s.ok()) {
@@ -493,7 +491,7 @@ Status TransactionDriver::ApplyAsync() {
   }
 
   TRACE_EVENT_FLOW_BEGIN0("txn", "ApplyTask", this);
-  return apply_pool_->SubmitClosure(Bind(&TransactionDriver::ApplyTask, Unretained(this)));
+  return apply_pool_->Submit([this]() { this->ApplyTask(); });
 }
 
 void TransactionDriver::ApplyTask() {
diff --git a/src/kudu/thrift/client.h b/src/kudu/thrift/client.h
index 867cf2b..ed5f2ee 100644
--- a/src/kudu/thrift/client.h
+++ b/src/kudu/thrift/client.h
@@ -205,7 +205,7 @@ Status HaClient<Service>::Execute(std::function<Status(Service*)> task) {
   // object. Note that the Thrift client classes already have LOG_IF_SLOW calls
   // internally.
 
-  RETURN_NOT_OK(threadpool_->SubmitFunc([=] {
+  RETURN_NOT_OK(threadpool_->Submit([=] {
     // The main run routine of the threadpool thread. Runs the task with
     // exclusive access to the Thrift service client. If the task fails, it will
     // be retried, unless the failure type is non-retriable or the maximum
diff --git a/src/kudu/tools/ksck.cc b/src/kudu/tools/ksck.cc
index 31643ad..6025473 100644
--- a/src/kudu/tools/ksck.cc
+++ b/src/kudu/tools/ksck.cc
@@ -278,7 +278,7 @@ Status Ksck::CheckMasterHealth() {
   RETURN_NOT_OK(StringToFlagsCategories(FLAGS_flags_categories_to_check,
                                         &flags_categories_to_fetch));
   for (const auto& master : cluster_->masters()) {
-    RETURN_NOT_OK(pool_->SubmitFunc([&]() {
+    RETURN_NOT_OK(pool_->Submit([&]() {
       ServerHealthSummary sh;
       Status s = master->FetchInfo().AndThen([&]() {
         return master->FetchConsensusState();
@@ -480,7 +480,7 @@ Status Ksck::FetchInfoFromTabletServers() {
                                         &flags_categories_to_fetch));
   for (const auto& entry : cluster_->tablet_servers()) {
     const auto& ts = entry.second;
-    RETURN_NOT_OK(pool_->SubmitFunc([&]() {
+    RETURN_NOT_OK(pool_->Submit([&]() {
       VLOG(1) << "Going to connect to tablet server: " << ts->uuid();
       ServerHealth health;
       Status s = ts->FetchInfo(&health).AndThen([&ts, &health]() {
diff --git a/src/kudu/tools/ksck_checksum.cc b/src/kudu/tools/ksck_checksum.cc
index 55c6b59..cf9802a 100644
--- a/src/kudu/tools/ksck_checksum.cc
+++ b/src/kudu/tools/ksck_checksum.cc
@@ -19,7 +19,6 @@
 
 #include <algorithm>
 #include <cstdint>
-#include <functional>
 #include <iostream>
 #include <map>
 #include <set>
@@ -280,8 +279,8 @@ void KsckChecksumManager::ReportResult(const string& tablet_id,
   }
 
   responses_.CountDown();
-  WARN_NOT_OK(find_tablets_to_checksum_pool_->SubmitFunc(
-      std::bind(&KsckChecksumManager::StartTabletChecksums, this)),
+  WARN_NOT_OK(find_tablets_to_checksum_pool_->Submit(
+      [this]() { this->StartTabletChecksums(); }),
       "failed to submit task to start additional tablet checksums");
 }
 
diff --git a/src/kudu/tools/ksck_remote.cc b/src/kudu/tools/ksck_remote.cc
index 2804eca..29083a8 100644
--- a/src/kudu/tools/ksck_remote.cc
+++ b/src/kudu/tools/ksck_remote.cc
@@ -19,7 +19,6 @@
 
 #include <atomic>
 #include <cstdint>
-#include <functional>
 #include <map>
 #include <mutex>
 #include <ostream>
@@ -603,7 +602,7 @@ Status RemoteKsckCluster::RetrieveTablesList() {
       continue;
     }
     tables_count++;
-    RETURN_NOT_OK(pool_->SubmitFunc([&]() {
+    RETURN_NOT_OK(pool_->Submit([&]() {
       client::sp::shared_ptr<KuduTable> t;
       Status s = client_->OpenTable(table_name, &t);
       if (!s.ok()) {
@@ -640,8 +639,8 @@ Status RemoteKsckCluster::RetrieveAllTablets() {
   }
 
   for (const auto& table : tables_) {
-    RETURN_NOT_OK(pool_->SubmitFunc(
-        std::bind(&KsckCluster::RetrieveTabletsList, this, table)));
+    RETURN_NOT_OK(pool_->Submit(
+        [this, table]() { this->RetrieveTabletsList(table); }));
   }
   pool_->Wait();
 
diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc
index af34e70..4242415 100644
--- a/src/kudu/tools/table_scanner.cc
+++ b/src/kudu/tools/table_scanner.cc
@@ -28,7 +28,6 @@
 #include <set>
 
 #include <boost/algorithm/string/predicate.hpp>
-#include <boost/bind.hpp>
 #include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
 #include <glog/logging.h>
@@ -580,7 +579,7 @@ Status TableScanner::StartWork(WorkType type) {
   const set<string>& tablet_id_filters = Split(FLAGS_tablets, ",", strings::SkipWhitespace());
   map<int, vector<KuduScanToken*>> thread_tokens;
   int i = 0;
-  for (auto token : tokens) {
+  for (auto* token : tokens) {
     if (tablet_id_filters.empty() || ContainsKey(tablet_id_filters, token->tablet().id())) {
       thread_tokens[i++ % FLAGS_num_threads].emplace_back(token);
     }
@@ -598,13 +597,15 @@ Status TableScanner::StartWork(WorkType type) {
   Stopwatch sw(Stopwatch::THIS_THREAD);
   sw.start();
   for (i = 0; i < FLAGS_num_threads; ++i) {
+    auto* t_tokens = &thread_tokens[i];
+    auto* t_status = &thread_statuses[i];
     if (type == WorkType::kScan) {
-      RETURN_NOT_OK(thread_pool_->SubmitFunc(
-        boost::bind(&TableScanner::ScanTask, this, thread_tokens[i], &thread_statuses[i])));
+      RETURN_NOT_OK(thread_pool_->Submit([this, t_tokens, t_status]()
+                                         { this->ScanTask(*t_tokens, t_status); }));
     } else {
       CHECK(type == WorkType::kCopy);
-      RETURN_NOT_OK(thread_pool_->SubmitFunc(
-        boost::bind(&TableScanner::CopyTask, this, thread_tokens[i], &thread_statuses[i])));
+      RETURN_NOT_OK(thread_pool_->Submit([this, t_tokens, t_status]()
+                                         { this->CopyTask(*t_tokens, t_status); }));
     }
   }
   while (!thread_pool_->WaitFor(MonoDelta::FromSeconds(5))) {
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index 9497e36..09c33f0 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -197,6 +197,34 @@ METRIC_DEFINE_gauge_int32(server, tablets_num_shutdown,
 
 DECLARE_int32(heartbeat_interval_ms);
 
+using kudu::consensus::ConsensusMetadata;
+using kudu::consensus::ConsensusMetadataCreateMode;
+using kudu::consensus::ConsensusMetadataManager;
+using kudu::consensus::ConsensusStatePB;
+using kudu::consensus::EXCLUDE_HEALTH_REPORT;
+using kudu::consensus::INCLUDE_HEALTH_REPORT;
+using kudu::consensus::OpId;
+using kudu::consensus::OpIdToString;
+using kudu::consensus::RECEIVED_OPID;
+using kudu::consensus::RaftConfigPB;
+using kudu::consensus::RaftConsensus;
+using kudu::consensus::StartTabletCopyRequestPB;
+using kudu::consensus::kMinimumTerm;
+using kudu::fs::DataDirManager;
+using kudu::log::Log;
+using kudu::master::ReportedTabletPB;
+using kudu::master::TabletReportPB;
+using kudu::tablet::ReportedTabletStatsPB;
+using kudu::tablet::Tablet;
+using kudu::tablet::TABLET_DATA_COPYING;
+using kudu::tablet::TABLET_DATA_DELETED;
+using kudu::tablet::TABLET_DATA_READY;
+using kudu::tablet::TABLET_DATA_TOMBSTONED;
+using kudu::tablet::TabletDataState;
+using kudu::tablet::TabletMetadata;
+using kudu::tablet::TabletReplica;
+using kudu::tserver::TabletCopyClient;
+using std::make_shared;
 using std::set;
 using std::shared_ptr;
 using std::string;
@@ -205,34 +233,6 @@ using strings::Substitute;
 
 namespace kudu {
 
-using consensus::ConsensusMetadata;
-using consensus::ConsensusMetadataCreateMode;
-using consensus::ConsensusMetadataManager;
-using consensus::ConsensusStatePB;
-using consensus::EXCLUDE_HEALTH_REPORT;
-using consensus::INCLUDE_HEALTH_REPORT;
-using consensus::OpId;
-using consensus::OpIdToString;
-using consensus::RECEIVED_OPID;
-using consensus::RaftConfigPB;
-using consensus::RaftConsensus;
-using consensus::StartTabletCopyRequestPB;
-using consensus::kMinimumTerm;
-using fs::DataDirManager;
-using log::Log;
-using master::ReportedTabletPB;
-using master::TabletReportPB;
-using tablet::ReportedTabletStatsPB;
-using tablet::Tablet;
-using tablet::TABLET_DATA_COPYING;
-using tablet::TABLET_DATA_DELETED;
-using tablet::TABLET_DATA_READY;
-using tablet::TABLET_DATA_TOMBSTONED;
-using tablet::TabletDataState;
-using tablet::TabletMetadata;
-using tablet::TabletReplica;
-using tserver::TabletCopyClient;
-
 namespace tserver {
 
 namespace {
@@ -302,9 +302,9 @@ TSTabletManager::TSTabletManager(TabletServer* server)
       ->AutoDetach(&metric_detacher_);
 }
 
-// Base class for Runnables submitted against TSTabletManager threadpools whose
+// Base class for tasks submitted against TSTabletManager threadpools whose
 // whose callback must fire, for example if the callback responds to an RPC.
-class TabletManagerRunnable : public Runnable {
+class TabletManagerRunnable {
 public:
   TabletManagerRunnable(TSTabletManager* ts_tablet_manager,
                         std::function<void(const Status&, TabletServerErrorPB::Code)> cb)
@@ -313,16 +313,19 @@ public:
   }
 
   virtual ~TabletManagerRunnable() {
-    // If the Runnable is destroyed without the Run() method being invoked, we
+    // If the task is destroyed without the Run() method being invoked, we
     // must invoke the user callback ourselves in order to free request
     // resources. This may happen when the ThreadPool is shut down while the
-    // Runnable is enqueued.
+    // task is enqueued.
     if (!cb_invoked_) {
       cb_(Status::ServiceUnavailable("Tablet server shutting down"),
           TabletServerErrorPB::THROTTLED);
     }
   }
 
+  // Runs the task itself.
+  virtual void Run() = 0;
+
   // Disable automatic invocation of the callback by the destructor.
   // Does not disable invocation of the callback by Run().
   void DisableCallback() {
@@ -412,8 +415,9 @@ Status TSTabletManager::Init() {
 
     scoped_refptr<TabletReplica> replica;
     RETURN_NOT_OK(CreateAndRegisterTabletReplica(meta, NEW_REPLICA, &replica));
-    RETURN_NOT_OK(open_tablet_pool_->SubmitFunc(boost::bind(&TSTabletManager::OpenTablet,
-                                                            this, replica, deleter)));
+    RETURN_NOT_OK(open_tablet_pool_->Submit([this, replica, deleter]() {
+          this->OpenTablet(replica, deleter);
+        }));
     registered_count++;
   }
   LOG(INFO) << Substitute("Registered $0 tablets", registered_count);
@@ -501,8 +505,9 @@ Status TSTabletManager::CreateNewTablet(const string& table_id,
   RETURN_NOT_OK(CreateAndRegisterTabletReplica(meta, NEW_REPLICA, &new_replica));
 
   // We can run this synchronously since there is nothing to bootstrap.
-  RETURN_NOT_OK(open_tablet_pool_->SubmitFunc(boost::bind(&TSTabletManager::OpenTablet,
-                                                          this, new_replica, deleter)));
+  RETURN_NOT_OK(open_tablet_pool_->Submit([this, new_replica, deleter]() {
+        this->OpenTablet(new_replica, deleter);
+      }));
 
   if (replica) {
     *replica = new_replica;
@@ -555,8 +560,8 @@ void TSTabletManager::StartTabletCopy(
   // immediately check whether the tablet is already being copied, and if so,
   // return ALREADY_INPROGRESS.
   string tablet_id = req->tablet_id();
-  shared_ptr<TabletCopyRunnable> runnable(new TabletCopyRunnable(this, req, cb));
-  Status s = tablet_copy_pool_->Submit(runnable);
+  auto runnable = make_shared<TabletCopyRunnable>(this, req, cb);
+  Status s = tablet_copy_pool_->Submit([runnable]() { runnable->Run(); });
   if (PREDICT_TRUE(s.ok())) {
     return;
   }
@@ -879,7 +884,7 @@ Status TSTabletManager::BeginReplicaStateTransition(
 class DeleteTabletRunnable : public TabletManagerRunnable {
 public:
   DeleteTabletRunnable(TSTabletManager* ts_tablet_manager,
-                       std::string tablet_id,
+                       string tablet_id,
                        tablet::TabletDataState delete_type,
                        const boost::optional<int64_t>& cas_config_index, // NOLINT
                        std::function<void(const Status&, TabletServerErrorPB::Code)> cb)
@@ -905,13 +910,13 @@ private:
 };
 
 void TSTabletManager::DeleteTabletAsync(
-    const std::string& tablet_id,
+    const string& tablet_id,
     tablet::TabletDataState delete_type,
     const boost::optional<int64_t>& cas_config_index,
-    std::function<void(const Status&, TabletServerErrorPB::Code)> cb) {
-  auto runnable = std::make_shared<DeleteTabletRunnable>(this, tablet_id, delete_type,
-                                                         cas_config_index, cb);
-  Status s = delete_tablet_pool_->Submit(runnable);
+    const std::function<void(const Status&, TabletServerErrorPB::Code)>& cb) {
+  auto runnable = make_shared<DeleteTabletRunnable>(this, tablet_id, delete_type,
+                                                    cas_config_index, cb);
+  Status s = delete_tablet_pool_->Submit([runnable]() { runnable->Run(); });
   if (PREDICT_TRUE(s.ok())) {
     return;
   }
@@ -1237,7 +1242,7 @@ void TSTabletManager::Shutdown() {
   }
 }
 
-void TSTabletManager::RegisterTablet(const std::string& tablet_id,
+void TSTabletManager::RegisterTablet(const string& tablet_id,
                                      const scoped_refptr<TabletReplica>& replica,
                                      RegisterTabletReplicaMode mode) {
   std::lock_guard<RWMutex> lock(lock_);
@@ -1555,7 +1560,7 @@ void TSTabletManager::FailTabletAndScheduleShutdown(const string& tablet_id) {
     replica->MakeUnavailable(Status::IOError("failing tablet"));
 
     // Submit a request to actually shut down the tablet asynchronously.
-    CHECK_OK(open_tablet_pool_->SubmitFunc([tablet_id, this]() {
+    CHECK_OK(open_tablet_pool_->Submit([tablet_id, this]() {
       scoped_refptr<TabletReplica> replica;
       scoped_refptr<TransitionInProgressDeleter> deleter;
       TabletServerErrorPB::Code error;
@@ -1575,7 +1580,7 @@ void TSTabletManager::FailTabletAndScheduleShutdown(const string& tablet_id) {
       // Only proceed if there is no Tablet (e.g. a bootstrap terminated early
       // due to error before creating the Tablet) or if the tablet has been
       // stopped (e.g. due to the above call to MakeUnavailable).
-      std::shared_ptr<Tablet> tablet = replica->shared_tablet();
+      auto tablet = replica->shared_tablet();
       if (s.ok() && (!tablet || tablet->HasBeenStopped())) {
         replica->Shutdown();
       }
diff --git a/src/kudu/tserver/ts_tablet_manager.h b/src/kudu/tserver/ts_tablet_manager.h
index 21f94a1..042f418 100644
--- a/src/kudu/tserver/ts_tablet_manager.h
+++ b/src/kudu/tserver/ts_tablet_manager.h
@@ -140,7 +140,7 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
   void DeleteTabletAsync(const std::string& tablet_id,
                          tablet::TabletDataState delete_type,
                          const boost::optional<int64_t>& cas_config_index,
-                         std::function<void(const Status&, TabletServerErrorPB::Code)> cb);
+                         const std::function<void(const Status&, TabletServerErrorPB::Code)>& cb);
 
   // Delete the specified tablet synchronously.
   // See DeleteTabletAsync() for more information.
diff --git a/src/kudu/util/countdown_latch-test.cc b/src/kudu/util/countdown_latch-test.cc
index 32a673f..420248d 100644
--- a/src/kudu/util/countdown_latch-test.cc
+++ b/src/kudu/util/countdown_latch-test.cc
@@ -48,13 +48,13 @@ TEST(TestCountDownLatch, TestLatch) {
 
   // Decrement the count by 1 in another thread, this should not fire the
   // latch.
-  ASSERT_OK(pool->SubmitFunc(boost::bind(DecrementLatch, &latch, 1)));
+  ASSERT_OK(pool->Submit([&latch]() { DecrementLatch(&latch, 1); }));
   ASSERT_FALSE(latch.WaitFor(MonoDelta::FromMilliseconds(200)));
   ASSERT_EQ(999, latch.count());
 
   // Now decrement by 1000 this should decrement to 0 and fire the latch
   // (even though 1000 is one more than the current count).
-  ASSERT_OK(pool->SubmitFunc(boost::bind(DecrementLatch, &latch, 1000)));
+  ASSERT_OK(pool->Submit([&latch]() { DecrementLatch(&latch, 1000); }));
   latch.Wait();
   ASSERT_EQ(0, latch.count());
 }
diff --git a/src/kudu/util/curl_util-test.cc b/src/kudu/util/curl_util-test.cc
index 7f750e7..59ed975 100644
--- a/src/kudu/util/curl_util-test.cc
+++ b/src/kudu/util/curl_util-test.cc
@@ -49,7 +49,7 @@ TEST(CurlUtilTest, NonSharedObjectsBetweenThreads) {
       .Build(&pool);
 
   for (int i = 0; i < kThreadCount; i++) {
-    ASSERT_OK(pool->SubmitFunc([&]() {
+    ASSERT_OK(pool->Submit([&]() {
       EasyCurl curl;
     }));
   }
diff --git a/src/kudu/util/maintenance_manager.cc b/src/kudu/util/maintenance_manager.cc
index 3ace96e..fb7043e 100644
--- a/src/kudu/util/maintenance_manager.cc
+++ b/src/kudu/util/maintenance_manager.cc
@@ -326,8 +326,7 @@ bool MaintenanceManager::FindAndLaunchOp(std::unique_lock<Mutex>* guard) {
   LOG_AND_TRACE_WITH_PREFIX("maintenance", INFO)
       << Substitute("Scheduling $0: $1", op->name(), note);
   // Run the maintenance operation.
-  CHECK_OK(thread_pool_->SubmitFunc(boost::bind(&MaintenanceManager::LaunchOp, this, op)));
-
+  CHECK_OK(thread_pool_->Submit([this, op]() { this->LaunchOp(op); }));
   return true;
 }
 
diff --git a/src/kudu/util/net/dns_resolver.cc b/src/kudu/util/net/dns_resolver.cc
index 5038c2f..2e22073 100644
--- a/src/kudu/util/net/dns_resolver.cc
+++ b/src/kudu/util/net/dns_resolver.cc
@@ -17,7 +17,6 @@
 
 #include "kudu/util/net/dns_resolver.h"
 
-#include <functional>
 #include <memory>
 #include <utility>
 #include <vector>
@@ -25,7 +24,6 @@
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 
-#include "kudu/gutil/callback.h"
 #include "kudu/gutil/port.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/malloc.h"
@@ -86,8 +84,9 @@ void DnsResolver::ResolveAddressesAsync(const HostPort& hostport,
   if (GetCachedAddresses(hostport, addresses)) {
     return cb.Run(Status::OK());
   }
-  const auto s = pool_->SubmitFunc(std::bind(&DnsResolver::DoResolutionCb,
-                                             this, hostport, addresses, cb));
+  const auto s = pool_->Submit([=]() {
+    this->DoResolutionCb(hostport, addresses, cb);
+  });
   if (!s.ok()) {
     cb.Run(s);
   }
diff --git a/src/kudu/util/net/dns_resolver.h b/src/kudu/util/net/dns_resolver.h
index b07384a..3b44943 100644
--- a/src/kudu/util/net/dns_resolver.h
+++ b/src/kudu/util/net/dns_resolver.h
@@ -17,10 +17,10 @@
 
 #pragma once
 
-#include <stddef.h>
-
+#include <cstddef>
 #include <memory>
 #include <string>
+#include <utility>
 #include <vector>
 
 #include "kudu/gutil/macros.h"
diff --git a/src/kudu/util/threadpool-test.cc b/src/kudu/util/threadpool-test.cc
index bf841d4..6ab39dc 100644
--- a/src/kudu/util/threadpool-test.cc
+++ b/src/kudu/util/threadpool-test.cc
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "kudu/util/threadpool.h"
+
 #include <unistd.h>
 
 #include <atomic>
@@ -29,16 +31,12 @@
 #include <utility>
 #include <vector>
 
-#include <boost/bind.hpp> // IWYU pragma: keep
 #include <boost/smart_ptr/shared_ptr.hpp>
 #include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
 #include "kudu/gutil/atomicops.h"
-#include "kudu/gutil/bind.h"
-#include "kudu/gutil/bind_helpers.h"
-#include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/sysinfo.h"
@@ -53,7 +51,6 @@
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
-#include "kudu/util/threadpool.h"
 #include "kudu/util/trace.h"
 
 using std::atomic;
@@ -99,42 +96,41 @@ TEST_F(ThreadPoolTest, TestNoTaskOpenClose) {
   pool_->Shutdown();
 }
 
-static void SimpleTaskMethod(int n, Atomic32 *counter) {
+static void SimpleTaskMethod(int n, Atomic32* counter) {
   while (n--) {
     base::subtle::NoBarrier_AtomicIncrement(counter, 1);
     boost::detail::yield(n);
   }
 }
 
-class SimpleTask : public Runnable {
+class SimpleTask {
  public:
-  SimpleTask(int n, Atomic32 *counter)
+  SimpleTask(int n, Atomic32* counter)
     : n_(n), counter_(counter) {
   }
 
-  void Run() OVERRIDE {
+  void Run() {
     SimpleTaskMethod(n_, counter_);
   }
 
  private:
   int n_;
-  Atomic32 *counter_;
+  Atomic32* counter_;
 };
 
 TEST_F(ThreadPoolTest, TestSimpleTasks) {
   ASSERT_OK(RebuildPoolWithMinMax(4, 4));
 
   Atomic32 counter(0);
-  std::shared_ptr<Runnable> task(new SimpleTask(15, &counter));
+  SimpleTask task(15, &counter);
 
-  ASSERT_OK(pool_->SubmitFunc(boost::bind(&SimpleTaskMethod, 10, &counter)));
-  ASSERT_OK(pool_->Submit(task));
-  ASSERT_OK(pool_->SubmitFunc(boost::bind(&SimpleTaskMethod, 20, &counter)));
-  ASSERT_OK(pool_->Submit(task));
-  ASSERT_OK(pool_->SubmitClosure(Bind(&SimpleTaskMethod, 123, &counter)));
+  ASSERT_OK(pool_->Submit([&counter]() { SimpleTaskMethod(10, &counter); }));
+  ASSERT_OK(pool_->Submit([&task]() { task.Run(); }));
+  ASSERT_OK(pool_->Submit([&counter]() { SimpleTaskMethod(20, &counter); }));
+  ASSERT_OK(pool_->Submit([&task]() { task.Run(); }));
+  ASSERT_OK(pool_->Submit([&counter]() { SimpleTaskMethod(123, &counter); }));
   pool_->Wait();
   ASSERT_EQ(10 + 15 + 20 + 15 + 123, base::subtle::NoBarrier_Load(&counter));
-  pool_->Shutdown();
 }
 
 static void IssueTraceStatement() {
@@ -149,7 +145,7 @@ TEST_F(ThreadPoolTest, TestTracePropagation) {
   scoped_refptr<Trace> t(new Trace);
   {
     ADOPT_TRACE(t.get());
-    ASSERT_OK(pool_->SubmitFunc(&IssueTraceStatement));
+    ASSERT_OK(pool_->Submit(&IssueTraceStatement));
   }
   pool_->Wait();
   ASSERT_STR_CONTAINS(t->DumpToString(), "hello from task");
@@ -158,29 +154,11 @@ TEST_F(ThreadPoolTest, TestTracePropagation) {
 TEST_F(ThreadPoolTest, TestSubmitAfterShutdown) {
   ASSERT_OK(RebuildPoolWithMinMax(1, 1));
   pool_->Shutdown();
-  Status s = pool_->SubmitFunc(&IssueTraceStatement);
+  Status s = pool_->Submit(&IssueTraceStatement);
   ASSERT_EQ("Service unavailable: The pool has been shut down.",
             s.ToString());
 }
 
-class SlowTask : public Runnable {
- public:
-  explicit SlowTask(CountDownLatch* latch)
-    : latch_(latch) {
-  }
-
-  void Run() OVERRIDE {
-    latch_->Wait();
-  }
-
-  static shared_ptr<Runnable> NewSlowTask(CountDownLatch* latch) {
-    return std::make_shared<SlowTask>(latch);
-  }
-
- private:
-  CountDownLatch* latch_;
-};
-
 TEST_F(ThreadPoolTest, TestThreadPoolWithNoMinimum) {
   ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
                                    .set_min_threads(0)
@@ -194,13 +172,13 @@ TEST_F(ThreadPoolTest, TestThreadPoolWithNoMinimum) {
   SCOPED_CLEANUP({
     latch.CountDown();
   });
-  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
-  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
+  ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
   ASSERT_EQ(2, pool_->num_threads());
-  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
   ASSERT_EQ(3, pool_->num_threads());
   // The 4th piece of work gets queued.
-  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
   ASSERT_EQ(3, pool_->num_threads());
   // Finish all work
   latch.CountDown();
@@ -226,7 +204,7 @@ TEST_F(ThreadPoolTest, TestThreadPoolWithNoMaxThreads) {
 
   // Submit tokenless tasks. Each should create a new thread.
   for (int i = 0; i < kNumCPUs * 2; i++) {
-    ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+    ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
   }
   ASSERT_EQ((kNumCPUs * 2), pool_->num_threads());
 
@@ -235,19 +213,18 @@ TEST_F(ThreadPoolTest, TestThreadPoolWithNoMaxThreads) {
   unique_ptr<ThreadPoolToken> t2 = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
   for (int i = 0; i < kNumCPUs * 2; i++) {
     ThreadPoolToken* t = (i % 2 == 0) ? t1.get() : t2.get();
-    ASSERT_OK(t->Submit(SlowTask::NewSlowTask(&latch)));
+    ASSERT_OK(t->Submit([&latch]() { latch.Wait(); }));
   }
   ASSERT_EQ((kNumCPUs * 2) + 2, pool_->num_threads());
 
   // Submit more tokenless tasks. Each should create a new thread.
   for (int i = 0; i < kNumCPUs; i++) {
-    ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+    ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
   }
   ASSERT_EQ((kNumCPUs * 3) + 2, pool_->num_threads());
 
   latch.CountDown();
   pool_->Wait();
-  pool_->Shutdown();
 }
 
 // Regression test for a bug where a task is submitted exactly
@@ -264,7 +241,7 @@ TEST_F(ThreadPoolTest, TestRace) {
 
   for (int i = 0; i < 500; i++) {
     CountDownLatch l(1);
-    ASSERT_OK(pool_->SubmitFunc(boost::bind(&CountDownLatch::CountDown, &l)));
+    ASSERT_OK(pool_->Submit([&l]() { l.CountDown(); }));
     l.Wait();
     // Sleeping a different amount in each iteration makes it more likely to hit
     // the bug.
@@ -282,16 +259,16 @@ TEST_F(ThreadPoolTest, TestVariableSizeThreadPool) {
   ASSERT_EQ(1, pool_->num_threads());
   // We get up to 4 threads when submitting work.
   CountDownLatch latch(1);
-  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
   ASSERT_EQ(1, pool_->num_threads());
-  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
   ASSERT_EQ(2, pool_->num_threads());
-  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
   ASSERT_EQ(3, pool_->num_threads());
-  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
   ASSERT_EQ(4, pool_->num_threads());
   // The 5th piece of work gets queued.
-  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
   ASSERT_EQ(4, pool_->num_threads());
   // Finish all work
   latch.CountDown();
@@ -310,13 +287,12 @@ TEST_F(ThreadPoolTest, TestMaxQueueSize) {
   CountDownLatch latch(1);
   // We will be able to submit two tasks: one for max_threads == 1 and one for
   // max_queue_size == 1.
-  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
-  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
-  Status s = pool_->Submit(SlowTask::NewSlowTask(&latch));
+  ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
+  ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
+  Status s = pool_->Submit([&latch]() { latch.Wait(); });
   CHECK(s.IsServiceUnavailable()) << "Expected failure due to queue blowout:" << s.ToString();
   latch.CountDown();
   pool_->Wait();
-  pool_->Shutdown();
 }
 
 // Test that when we specify a zero-sized queue, the maximum number of threads
@@ -329,14 +305,13 @@ TEST_F(ThreadPoolTest, TestZeroQueueSize) {
 
   CountDownLatch latch(1);
   for (int i = 0; i < kMaxThreads; i++) {
-    ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+    ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
   }
-  Status s = pool_->Submit(SlowTask::NewSlowTask(&latch));
+  Status s = pool_->Submit([&latch]() { latch.Wait(); });
   ASSERT_TRUE(s.IsServiceUnavailable()) << s.ToString();
   ASSERT_STR_CONTAINS(s.ToString(), "Thread pool is at capacity");
   latch.CountDown();
   pool_->Wait();
-  pool_->Shutdown();
 }
 
 // Regression test for KUDU-2187:
@@ -372,9 +347,9 @@ TEST_F(ThreadPoolTest, TestSlowThreadStart) {
   // thread on 'pool_'.
   std::atomic<int32_t> total_queue_time_ms(0);
   for (int i = 0; i < 10; i++) {
-    ASSERT_OK(submitter_pool->SubmitFunc([&]() {
+    ASSERT_OK(submitter_pool->Submit([&]() {
           auto submit_time = MonoTime::Now();
-          CHECK_OK(pool_->SubmitFunc([&,submit_time]() {
+          CHECK_OK(pool_->Submit([&,submit_time]() {
                 auto queue_time = MonoTime::Now() - submit_time;
                 total_queue_time_ms += queue_time.ToMilliseconds();
                 SleepFor(MonoDelta::FromMilliseconds(10));
@@ -404,10 +379,8 @@ TEST_F(ThreadPoolTest, TestPromises) {
                                    .set_max_queue_size(1)));
 
   Promise<int> my_promise;
-  ASSERT_OK(pool_->SubmitClosure(
-                     Bind(&Promise<int>::Set, Unretained(&my_promise), 5)));
+  ASSERT_OK(pool_->Submit([&my_promise]() { my_promise.Set(5); }));
   ASSERT_EQ(5, my_promise.Get());
-  pool_->Shutdown();
 }
 
 METRIC_DEFINE_entity(test_entity);
@@ -451,12 +424,12 @@ TEST_F(ThreadPoolTest, TestMetrics) {
       ThreadPool::ExecutionMode::SERIAL, all_metrics[2]);
 
   // Submit once to t1, twice to t2, and three times without a token.
-  ASSERT_OK(t1->SubmitFunc([](){}));
-  ASSERT_OK(t2->SubmitFunc([](){}));
-  ASSERT_OK(t2->SubmitFunc([](){}));
-  ASSERT_OK(pool_->SubmitFunc([](){}));
-  ASSERT_OK(pool_->SubmitFunc([](){}));
-  ASSERT_OK(pool_->SubmitFunc([](){}));
+  ASSERT_OK(t1->Submit([](){}));
+  ASSERT_OK(t2->Submit([](){}));
+  ASSERT_OK(t2->Submit([](){}));
+  ASSERT_OK(pool_->Submit([](){}));
+  ASSERT_OK(pool_->Submit([](){}));
+  ASSERT_OK(pool_->Submit([](){}));
   pool_->Wait();
 
   // The total counts should reflect the number of submissions to each token.
@@ -488,25 +461,23 @@ TEST_F(ThreadPoolTest, TestDeadlocks) {
   const char* death_msg = "called pool function that would result in deadlock";
   ASSERT_DEATH({
     ASSERT_OK(RebuildPoolWithMinMax(1, 1));
-    ASSERT_OK(pool_->SubmitClosure(
-        Bind(&ThreadPool::Shutdown, Unretained(pool_.get()))));
+    ASSERT_OK(pool_->Submit([this]() { this->pool_->Shutdown(); } ));
     pool_->Wait();
   }, death_msg);
 
   ASSERT_DEATH({
     ASSERT_OK(RebuildPoolWithMinMax(1, 1));
-    ASSERT_OK(pool_->SubmitClosure(
-        Bind(&ThreadPool::Wait, Unretained(pool_.get()))));
+    ASSERT_OK(pool_->Submit([this]() { this->pool_->Wait(); } ));
     pool_->Wait();
   }, death_msg);
 }
 #endif
 
-class SlowDestructorRunnable : public Runnable {
+class SlowDestructorRunnable {
  public:
-  void Run() override {}
+  void Run() {}
 
-  virtual ~SlowDestructorRunnable() {
+  ~SlowDestructorRunnable() {
     SleepFor(MonoDelta::FromMilliseconds(100));
   }
 };
@@ -517,8 +488,15 @@ TEST_F(ThreadPoolTest, TestSlowDestructor) {
   ASSERT_OK(RebuildPoolWithMinMax(1, 20));
   MonoTime start = MonoTime::Now();
   for (int i = 0; i < 100; i++) {
-    shared_ptr<Runnable> task(new SlowDestructorRunnable());
-    ASSERT_OK(pool_->Submit(std::move(task)));
+    // In this particular test, it's important that the task's destructor (and
+    // thus the last ref of 'task') be dropped by the threadpool worker thread
+    // itself, so that the delay is incurred by that thread and not the task
+    // submission thread. Without C++14 capture-by-move semantics we have to
+    // work a little bit harder to accomplish that.
+    shared_ptr<SlowDestructorRunnable> task(new SlowDestructorRunnable());
+    auto wrapper = [task]() { task->Run(); };
+    task.reset();
+    ASSERT_OK(pool_->Submit(std::move(wrapper)));
   }
   pool_->Wait();
   ASSERT_LT((MonoTime::Now() - start).ToSeconds(), 5);
@@ -536,7 +514,7 @@ INSTANTIATE_TEST_CASE_P(Tokens, ThreadPoolTestTokenTypes,
 TEST_P(ThreadPoolTestTokenTypes, TestTokenSubmitAndWait) {
   unique_ptr<ThreadPoolToken> t = pool_->NewToken(GetParam());
   int i = 0;
-  ASSERT_OK(t->SubmitFunc([&]() {
+  ASSERT_OK(t->Submit([&]() {
     SleepFor(MonoDelta::FromMilliseconds(1));
     i++;
   }));
@@ -551,8 +529,8 @@ TEST_F(ThreadPoolTest, TestTokenSubmitsProcessedSerially) {
   for (char c = 'a'; c < 'f'; c++) {
     // Sleep a little first so that there's a higher chance of out-of-order
     // appends if the submissions did execute in parallel.
-    int sleep_ms = r.Next() % 5;
-    ASSERT_OK(t->SubmitFunc([&result, c, sleep_ms]() {
+    int sleep_ms = static_cast<int>(r.Uniform(5));
+    ASSERT_OK(t->Submit([&result, c, sleep_ms]() {
       SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
       result += c;
     }));
@@ -576,7 +554,7 @@ TEST_P(ThreadPoolTestTokenTypes, TestTokenSubmitsProcessedConcurrently) {
   shared_ptr<Barrier> b = std::make_shared<Barrier>(kNumTokens + 1);
   for (int i = 0; i < kNumTokens; i++) {
     tokens.emplace_back(pool_->NewToken(GetParam()));
-    ASSERT_OK(tokens.back()->SubmitFunc([b]() {
+    ASSERT_OK(tokens.back()->Submit([b]() {
       b->Wait();
     }));
   }
@@ -599,7 +577,7 @@ TEST_F(ThreadPoolTest, TestTokenSubmitsNonSequential) {
   shared_ptr<Barrier> b = std::make_shared<Barrier>(kNumSubmissions + 1);
   unique_ptr<ThreadPoolToken> t = pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT);
   for (int i = 0; i < kNumSubmissions; i++) {
-    ASSERT_OK(t->SubmitFunc([b]() {
+    ASSERT_OK(t->Submit([b]() {
       b->Wait();
     }));
   }
@@ -625,12 +603,12 @@ TEST_P(ThreadPoolTestTokenTypes, TestTokenShutdown) {
   });
 
   for (int i = 0; i < 3; i++) {
-    ASSERT_OK(t1->SubmitFunc([&]() {
+    ASSERT_OK(t1->Submit([&]() {
       l1.Wait();
     }));
   }
   for (int i = 0; i < 3; i++) {
-    ASSERT_OK(t2->SubmitFunc([&]() {
+    ASSERT_OK(t2->Submit([&]() {
       l2.Wait();
     }));
   }
@@ -642,8 +620,8 @@ TEST_P(ThreadPoolTestTokenTypes, TestTokenShutdown) {
   t1->Shutdown();
 
   // We can no longer submit to t1 but we can still submit to t2.
-  ASSERT_TRUE(t1->SubmitFunc([](){}).IsServiceUnavailable());
-  ASSERT_OK(t2->SubmitFunc([](){}));
+  ASSERT_TRUE(t1->Submit([](){}).IsServiceUnavailable());
+  ASSERT_OK(t2->Submit([](){}));
 
   // Unblock t2's tasks.
   l2.CountDown();
@@ -663,7 +641,7 @@ TEST_P(ThreadPoolTestTokenTypes, TestTokenWaitForAll) {
   for (int i = 0; i < kNumSubmissions; i++) {
     // Sleep a little first to raise the likelihood of the test thread
     // reaching Wait() before the submissions finish.
-    int sleep_ms = r.Next() % 5;
+    int sleep_ms = static_cast<int>(r.Uniform(5));
 
     auto task = [&v, sleep_ms]() {
       SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
@@ -672,10 +650,10 @@ TEST_P(ThreadPoolTestTokenTypes, TestTokenWaitForAll) {
 
     // Half of the submissions will be token-less, and half will use a token.
     if (i % 2 == 0) {
-      ASSERT_OK(pool_->SubmitFunc(task));
+      ASSERT_OK(pool_->Submit(task));
     } else {
-      int token_idx = r.Next() % tokens.size();
-      ASSERT_OK(tokens[token_idx]->SubmitFunc(task));
+      int token_idx = static_cast<int>(r.Uniform(tokens.size()));
+      ASSERT_OK(tokens[token_idx]->Submit(task));
     }
   }
   pool_->Wait();
@@ -697,11 +675,11 @@ TEST_F(ThreadPoolTest, TestFuzz) {
     // - Shutdown a randomly selected token: 4%
     // - Deallocate a randomly selected token: 2%
     // - Wait for all submissions: 2%
-    int op = r.Next() % 100;
+    int op = static_cast<int>(r.Uniform(100));
     if (op < 40) {
       // Submit without a token.
-      int sleep_ms = r.Next() % 5;
-      ASSERT_OK(pool_->SubmitFunc([sleep_ms]() {
+      int sleep_ms = static_cast<int>(r.Uniform(5));
+      ASSERT_OK(pool_->Submit([sleep_ms]() {
         // Sleep a little first to increase task overlap.
         SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
       }));
@@ -710,16 +688,16 @@ TEST_F(ThreadPoolTest, TestFuzz) {
       if (tokens.empty()) {
         continue;
       }
-      int sleep_ms = r.Next() % 5;
-      int token_idx = r.Next() % tokens.size();
-      Status s = tokens[token_idx]->SubmitFunc([sleep_ms]() {
+      int sleep_ms = static_cast<int>(r.Uniform(5));
+      int token_idx = static_cast<int>(r.Uniform(tokens.size()));
+      Status s = tokens[token_idx]->Submit([sleep_ms]() {
         // Sleep a little first to increase task overlap.
         SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
       });
       ASSERT_TRUE(s.ok() || s.IsServiceUnavailable());
     } else if (op < 85) {
       // Allocate a token with a randomly selected policy.
-      ThreadPool::ExecutionMode mode = r.Next() % 2 ?
+      ThreadPool::ExecutionMode mode = r.OneIn(2) ?
           ThreadPool::ExecutionMode::SERIAL :
           ThreadPool::ExecutionMode::CONCURRENT;
       tokens.emplace_back(pool_->NewToken(mode));
@@ -728,14 +706,14 @@ TEST_F(ThreadPoolTest, TestFuzz) {
       if (tokens.empty()) {
         continue;
       }
-      int token_idx = r.Next() % tokens.size();
+      int token_idx = static_cast<int>(r.Uniform(tokens.size()));
       tokens[token_idx]->Wait();
     } else if (op < 96) {
       // Shutdown a randomly selected token.
       if (tokens.empty()) {
         continue;
       }
-      int token_idx = r.Next() % tokens.size();
+      int token_idx = static_cast<int>(r.Uniform(tokens.size()));
       tokens[token_idx]->Shutdown();
     } else if (op < 98) {
       // Deallocate a randomly selected token.
@@ -743,7 +721,7 @@ TEST_F(ThreadPoolTest, TestFuzz) {
         continue;
       }
       auto it = tokens.begin();
-      int token_idx = r.Next() % tokens.size();
+      int token_idx = static_cast<int>(r.Uniform(tokens.size()));
       std::advance(it, token_idx);
       tokens.erase(it);
     } else {
@@ -756,7 +734,7 @@ TEST_F(ThreadPoolTest, TestFuzz) {
 
   // Some test runs will shut down the pool before the tokens, and some won't.
   // Either way should be safe.
-  if (r.Next() % 2 == 0) {
+  if (r.OneIn(2)) {
     pool_->Shutdown();
   }
 }
@@ -774,9 +752,9 @@ TEST_P(ThreadPoolTestTokenTypes, TestTokenSubmissionsAdhereToMaxQueueSize) {
   });
   // We will be able to submit two tasks: one for max_threads == 1 and one for
   // max_queue_size == 1.
-  ASSERT_OK(t->Submit(SlowTask::NewSlowTask(&latch)));
-  ASSERT_OK(t->Submit(SlowTask::NewSlowTask(&latch)));
-  Status s = t->Submit(SlowTask::NewSlowTask(&latch));
+  ASSERT_OK(t->Submit([&latch]() { latch.Wait(); }));
+  ASSERT_OK(t->Submit([&latch]() { latch.Wait(); }));
+  Status s = t->Submit([&latch]() { latch.Wait(); });
   ASSERT_TRUE(s.IsServiceUnavailable());
 }
 
@@ -806,7 +784,7 @@ TEST_F(ThreadPoolTest, TestTokenConcurrency) {
     ThreadPool::ExecutionMode mode;
     {
       std::lock_guard<simple_spinlock> l(lock);
-      mode = rng.Next() % 2 ?
+      mode = rng.OneIn(2) ?
           ThreadPool::ExecutionMode::SERIAL :
           ThreadPool::ExecutionMode::CONCURRENT;
     }
@@ -832,7 +810,7 @@ TEST_F(ThreadPoolTest, TestTokenConcurrency) {
         {
           std::lock_guard<simple_spinlock> l(lock);
           int idx = rng.Uniform(kNumTokens);
-          ThreadPool::ExecutionMode mode = rng.Next() % 2 ?
+          ThreadPool::ExecutionMode mode = rng.OneIn(2) ?
               ThreadPool::ExecutionMode::SERIAL :
               ThreadPool::ExecutionMode::CONCURRENT;
           tokens[idx] = shared_ptr<ThreadPoolToken>(pool_->NewToken(mode).release());
@@ -878,8 +856,8 @@ TEST_F(ThreadPoolTest, TestTokenConcurrency) {
       int num_tokens_submitted = 0;
       Random rng(SeedRandom());
       while (latch.count()) {
-        int sleep_ms = rng.Next() % 5;
-        Status s = GetRandomToken()->SubmitFunc([sleep_ms]() {
+        int sleep_ms = static_cast<int>(rng.Uniform(5));
+        Status s = GetRandomToken()->Submit([sleep_ms]() {
           // Sleep a little first so that tasks are running during other events.
           SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
         });
@@ -920,7 +898,7 @@ TEST_F(ThreadPoolTest, TestLIFOThreadWakeUps) {
     latch.CountDown();
   });
   for (int i = 0; i < kNumThreads; i++) {
-    ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+    ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
   }
   ASSERT_EQ(kNumThreads, pool_->num_threads());
   latch.CountDown();
@@ -936,7 +914,7 @@ TEST_F(ThreadPoolTest, TestLIFOThreadWakeUps) {
   // If LIFO order is used, the same thread will be reused for each task and
   // the other threads will eventually time out.
   AssertEventually([&]() {
-    ASSERT_OK(pool_->SubmitFunc([](){}));
+    ASSERT_OK(pool_->Submit([](){}));
     SleepFor(MonoDelta::FromMilliseconds(10));
     ASSERT_EQ(1, pool_->num_threads());
   }, MonoDelta::FromSeconds(10), AssertBackoff::NONE);
diff --git a/src/kudu/util/threadpool.cc b/src/kudu/util/threadpool.cc
index e983c7c..5199086 100644
--- a/src/kudu/util/threadpool.cc
+++ b/src/kudu/util/threadpool.cc
@@ -19,16 +19,15 @@
 
 #include <cstdint>
 #include <deque>
+#include <functional>
 #include <limits>
 #include <memory>
 #include <ostream>
 #include <string>
 #include <utility>
 
-#include <boost/function.hpp> // IWYU pragma: keep
 #include <glog/logging.h>
 
-#include "kudu/gutil/callback.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -49,38 +48,6 @@ using std::unique_ptr;
 using strings::Substitute;
 
 ////////////////////////////////////////////////////////
-// FunctionRunnable
-////////////////////////////////////////////////////////
-
-class FunctionRunnable : public Runnable {
- public:
-  explicit FunctionRunnable(boost::function<void()> func) : func_(std::move(func)) {}
-
-  void Run() OVERRIDE {
-    func_();
-  }
-
- private:
-  boost::function<void()> func_;
-};
-
-////////////////////////////////////////////////////////
-// ClosureRunnable
-////////////////////////////////////////////////////////
-
-class ClosureRunnable : public Runnable {
- public:
-  explicit ClosureRunnable(Closure cl) : cl_(std::move(cl)) {}
-
-  void Run() OVERRIDE {
-    cl_.Run();
-  }
-
- private:
-  Closure cl_;
-};
-
-////////////////////////////////////////////////////////
 // ThreadPoolBuilder
 ////////////////////////////////////////////////////////
 
@@ -148,16 +115,8 @@ ThreadPoolToken::~ThreadPoolToken() {
   pool_->ReleaseToken(this);
 }
 
-Status ThreadPoolToken::SubmitClosure(Closure c) {
-  return Submit(std::make_shared<ClosureRunnable>(std::move(c)));
-}
-
-Status ThreadPoolToken::SubmitFunc(boost::function<void()> f) {
-  return Submit(std::make_shared<FunctionRunnable>(std::move(f)));
-}
-
-Status ThreadPoolToken::Submit(shared_ptr<Runnable> r) {
-  return pool_->DoSubmit(std::move(r), this);
+Status ThreadPoolToken::Submit(std::function<void()> f) {
+  return pool_->DoSubmit(std::move(f), this);
 }
 
 void ThreadPoolToken::Shutdown() {
@@ -445,19 +404,11 @@ void ThreadPool::ReleaseToken(ThreadPoolToken* t) {
   CHECK_EQ(1, tokens_.erase(t));
 }
 
-Status ThreadPool::SubmitClosure(Closure c) {
-  return Submit(std::make_shared<ClosureRunnable>(std::move(c)));
-}
-
-Status ThreadPool::SubmitFunc(boost::function<void()> f) {
-  return Submit(std::make_shared<FunctionRunnable>(std::move(f)));
-}
-
-Status ThreadPool::Submit(shared_ptr<Runnable> r) {
-  return DoSubmit(std::move(r), tokenless_.get());
+Status ThreadPool::Submit(std::function<void()> f) {
+  return DoSubmit(std::move(f), tokenless_.get());
 }
 
-Status ThreadPool::DoSubmit(shared_ptr<Runnable> r, ThreadPoolToken* token) {
+Status ThreadPool::DoSubmit(std::function<void()> f, ThreadPoolToken* token) {
   DCHECK(token);
   MonoTime submit_time = MonoTime::Now();
 
@@ -509,7 +460,7 @@ Status ThreadPool::DoSubmit(shared_ptr<Runnable> r, ThreadPoolToken* token) {
   }
 
   Task task;
-  task.runnable = std::move(r);
+  task.func = std::move(f);
   task.trace = Trace::CurrentTrace();
   // Need to AddRef, since the thread which submitted the task may go away,
   // and we don't want the trace to be destructed while waiting in the queue.
@@ -682,7 +633,7 @@ void ThreadPool::DispatchThread() {
       MicrosecondsInt64 start_wall_us = GetMonoTimeMicros();
       MicrosecondsInt64 start_cpu_us = GetThreadCpuTimeMicros();
 
-      task.runnable->Run();
+      task.func();
 
       int64_t wall_us = GetMonoTimeMicros() - start_wall_us;
       int64_t cpu_us = GetThreadCpuTimeMicros() - start_cpu_us;
@@ -702,7 +653,7 @@ void ThreadPool::DispatchThread() {
     // objects, and we don't want to block submission of the threadpool.
     // In the worst case, the destructor might even try to do something
     // with this threadpool, and produce a deadlock.
-    task.runnable.reset();
+    task.func = nullptr;
     unique_lock.Lock();
 
     // Possible states:
diff --git a/src/kudu/util/threadpool.h b/src/kudu/util/threadpool.h
index 789b132..a1037e8 100644
--- a/src/kudu/util/threadpool.h
+++ b/src/kudu/util/threadpool.h
@@ -17,6 +17,7 @@
 #pragma once
 
 #include <deque>
+#include <functional>
 #include <iosfwd>
 #include <memory>
 #include <string>
@@ -26,7 +27,6 @@
 #include <boost/intrusive/list_hook.hpp>
 #include <gtest/gtest_prod.h>
 
-#include "kudu/gutil/callback.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
@@ -36,11 +36,6 @@
 #include "kudu/util/mutex.h"
 #include "kudu/util/status.h"
 
-namespace boost {
-template <typename Signature>
-class function;
-} // namespace boost
-
 namespace kudu {
 
 class Thread;
@@ -48,12 +43,6 @@ class ThreadPool;
 class ThreadPoolToken;
 class Trace;
 
-class Runnable {
- public:
-  virtual void Run() = 0;
-  virtual ~Runnable() {}
-};
-
 // Interesting thread pool metrics. Can be applied to the entire pool (see
 // ThreadPoolBuilder) or to individual tokens.
 struct ThreadPoolMetrics {
@@ -164,7 +153,6 @@ class ThreadPoolBuilder {
 //
 // Usage Example:
 //    static void Func(int n) { ... }
-//    class Task : public Runnable { ... }
 //
 //    unique_ptr<ThreadPool> thread_pool;
 //    CHECK_OK(
@@ -174,8 +162,7 @@ class ThreadPoolBuilder {
 //            .set_max_queue_size(10)
 //            .set_idle_timeout(MonoDelta::FromMilliseconds(2000))
 //            .Build(&thread_pool));
-//    thread_pool->Submit(shared_ptr<Runnable>(new Task()));
-//    thread_pool->SubmitFunc(boost::bind(&Func, 10));
+//    thread_pool->Submit([](){ Func(10) });
 class ThreadPool {
  public:
   ~ThreadPool();
@@ -188,14 +175,8 @@ class ThreadPool {
   //       require an explicit "abort" notification to exit from the run loop.
   void Shutdown();
 
-  // Submits a function using the kudu Closure system.
-  Status SubmitClosure(Closure c) WARN_UNUSED_RESULT;
-
-  // Submits a function bound using boost::bind(&FuncName, args...).
-  Status SubmitFunc(boost::function<void()> f) WARN_UNUSED_RESULT;
-
-  // Submits a Runnable class.
-  Status Submit(std::shared_ptr<Runnable> r) WARN_UNUSED_RESULT;
+  // Submits a new task.
+  Status Submit(std::function<void()> f) WARN_UNUSED_RESULT;
 
   // Waits until all the tasks are completed.
   void Wait();
@@ -243,7 +224,7 @@ class ThreadPool {
 
   // Client-provided task to be executed by this pool.
   struct Task {
-    std::shared_ptr<Runnable> runnable;
+    std::function<void()> func;
     Trace* trace;
 
     // Time at which the entry was submitted to the pool.
@@ -269,7 +250,7 @@ class ThreadPool {
   void CheckNotPoolThreadUnlocked();
 
   // Submits a task to be run via token.
-  Status DoSubmit(std::shared_ptr<Runnable> r, ThreadPoolToken* token);
+  Status DoSubmit(std::function<void()> f, ThreadPoolToken* token);
 
   // Releases token 't' and invalidates it.
   void ReleaseToken(ThreadPoolToken* t);
@@ -383,14 +364,8 @@ class ThreadPoolToken {
   // called first to take care of them.
   ~ThreadPoolToken();
 
-  // Submits a function using the kudu Closure system.
-  Status SubmitClosure(Closure c) WARN_UNUSED_RESULT;
-
-  // Submits a function bound using boost::bind(&FuncName, args...).
-  Status SubmitFunc(boost::function<void()> f) WARN_UNUSED_RESULT;
-
-  // Submits a Runnable class.
-  Status Submit(std::shared_ptr<Runnable> r) WARN_UNUSED_RESULT;
+  // Submits a new task.
+  Status Submit(std::function<void()> f) WARN_UNUSED_RESULT;
 
   // Marks the token as unusable for future submissions. Any queued tasks not
   // yet running are destroyed. If tasks are in flight, Shutdown() will wait