You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2020/03/12 23:40:47 UTC
[kudu] branch master updated: threadpool: simplify Submit API
This is an automated email from the ASF dual-hosted git repository.
adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new e133157 threadpool: simplify Submit API
e133157 is described below
commit e13315794e911556fc1702247d169c5aa29786c5
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Tue Mar 10 14:44:59 2020 -0700
threadpool: simplify Submit API
In an effort to modernize our codebase, I'm working to reduce the usage of
boost::bind and std::bind. They can be actively harmful[1], and by
comparison, lambdas offer the compiler more opportunities to inline. Also,
std::bind tends to be aggressive with using allocations for storage, though
boost::bind is better about this.
This patch modifies ThreadPool to expose just a single Submit API which
expects an std::function. Furthermore, all callers have been converted away
from bind-based function creation and towards lambdas, which implicitly cast
to std::function.
There are contortions in a couple of places to accommodate the lack of C++14
lambda support (e.g. we need capture-by-move semantics in RaftConsensus),
but these are minimal.
1. https://abseil.io/tips/108
Change-Id: I2bd2d59809225e4cde7e273e95428478a282aa2d
Reviewed-on: http://gerrit.cloudera.org:8080/15401
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
src/kudu/codegen/compilation_manager.cc | 17 +-
src/kudu/consensus/consensus-test-util.h | 10 +-
src/kudu/consensus/consensus_peers.cc | 4 +-
src/kudu/consensus/consensus_queue.cc | 66 +++----
src/kudu/consensus/log.cc | 7 +-
src/kudu/consensus/raft_consensus.cc | 38 ++--
src/kudu/fs/dir_manager.cc | 2 +-
src/kudu/master/catalog_manager.cc | 3 +-
src/kudu/master/master.cc | 4 +-
src/kudu/rpc/reactor.cc | 5 +-
src/kudu/tablet/tablet_replica.cc | 2 +-
src/kudu/tablet/transactions/transaction_driver.cc | 6 +-
src/kudu/thrift/client.h | 2 +-
src/kudu/tools/ksck.cc | 4 +-
src/kudu/tools/ksck_checksum.cc | 5 +-
src/kudu/tools/ksck_remote.cc | 7 +-
src/kudu/tools/table_scanner.cc | 13 +-
src/kudu/tserver/ts_tablet_manager.cc | 99 +++++-----
src/kudu/tserver/ts_tablet_manager.h | 2 +-
src/kudu/util/countdown_latch-test.cc | 4 +-
src/kudu/util/curl_util-test.cc | 2 +-
src/kudu/util/maintenance_manager.cc | 3 +-
src/kudu/util/net/dns_resolver.cc | 7 +-
src/kudu/util/net/dns_resolver.h | 4 +-
src/kudu/util/threadpool-test.cc | 200 +++++++++------------
src/kudu/util/threadpool.cc | 67 +------
src/kudu/util/threadpool.h | 41 +----
27 files changed, 264 insertions(+), 360 deletions(-)
diff --git a/src/kudu/codegen/compilation_manager.cc b/src/kudu/codegen/compilation_manager.cc
index 2e45be4..3a7e5af 100644
--- a/src/kudu/codegen/compilation_manager.cc
+++ b/src/kudu/codegen/compilation_manager.cc
@@ -44,6 +44,7 @@
#include "kudu/util/stopwatch.h"
#include "kudu/util/threadpool.h"
+using std::make_shared;
using std::shared_ptr;
using std::unique_ptr;
@@ -72,10 +73,10 @@ namespace codegen {
namespace {
-// A CompilationTask is a ThreadPool's Runnable which, given a
-// pair of schemas and a cache to refer to, will generate code pertaining
-// to the two schemas and store it in the cache when run.
-class CompilationTask : public Runnable {
+// A CompilationTask is a task which, given a pair of schemas and a cache to
+// refer to, will generate code pertaining to the two schemas and store it in
+// the cache when run.
+class CompilationTask {
public:
// Requires that the cache and generator are valid for the lifetime
// of this object.
@@ -87,7 +88,7 @@ class CompilationTask : public Runnable {
generator_(generator) {}
// Can only be run once.
- void Run() override {
+ void Run() {
// We need to fail softly because the user could have just given
// a malformed projection schema pair, but could be long gone by
// now so there's nowhere to return the status to.
@@ -188,9 +189,9 @@ bool CompilationManager::RequestRowProjector(const Schema* base_schema,
// If not cached, add a request to compilation pool
if (!cached) {
- shared_ptr<Runnable> task(
- new CompilationTask(*base_schema, *projection, &cache_, &generator_));
- WARN_NOT_OK(pool_->Submit(task),
+ shared_ptr<CompilationTask> task(make_shared<CompilationTask>(
+ *base_schema, *projection, &cache_, &generator_));
+ WARN_NOT_OK(pool_->Submit([task]() { task->Run(); }),
"RowProjector compilation request failed");
return false;
}
diff --git a/src/kudu/consensus/consensus-test-util.h b/src/kudu/consensus/consensus-test-util.h
index 65fd16e..b749611 100644
--- a/src/kudu/consensus/consensus-test-util.h
+++ b/src/kudu/consensus/consensus-test-util.h
@@ -168,7 +168,7 @@ class TestPeerProxy : public PeerProxy {
}
// If the peer has been closed while a response was in-flight, this can
// return a bad Status, but that's fine.
- ignore_result(pool_->SubmitFunc(callback));
+ ignore_result(pool_->Submit(std::move(callback)));
}
void RegisterCallbackAndRespond(Method method, const rpc::ResponseCallback& callback) {
@@ -478,8 +478,7 @@ class LocalTestPeerProxy : public TestPeerProxy {
rpc::RpcController* /*controller*/,
const rpc::ResponseCallback& callback) override {
RegisterCallback(kUpdate, callback);
- CHECK_OK(pool_->SubmitFunc(boost::bind(&LocalTestPeerProxy::SendUpdateRequest,
- this, request, response)));
+ CHECK_OK(pool_->Submit([=]() { this->SendUpdateRequest(request, response); }));
}
void StartElectionAsync(const RunLeaderElectionRequestPB& /*request*/,
@@ -494,8 +493,7 @@ class LocalTestPeerProxy : public TestPeerProxy {
rpc::RpcController* /*controller*/,
const rpc::ResponseCallback& callback) override {
RegisterCallback(kRequestVote, callback);
- CHECK_OK(pool_->SubmitFunc(boost::bind(&LocalTestPeerProxy::SendVoteRequest,
- this, request, response)));
+ CHECK_OK(pool_->Submit([=]() { this->SendVoteRequest(request, response); }));
}
template<class Response>
@@ -661,7 +659,7 @@ class TestDriver {
return;
}
CHECK_OK(status);
- CHECK_OK(pool_->SubmitFunc(boost::bind(&TestDriver::Apply, this)));
+ CHECK_OK(pool_->Submit([this]() { this->Apply(); }));
}
// Called in all modes to delete the transaction and, transitively, the consensus
diff --git a/src/kudu/consensus/consensus_peers.cc b/src/kudu/consensus/consensus_peers.cc
index d759ca1..8e34bec 100644
--- a/src/kudu/consensus/consensus_peers.cc
+++ b/src/kudu/consensus/consensus_peers.cc
@@ -177,7 +177,7 @@ Status Peer::SignalRequest(bool even_if_queue_empty) {
// Capture a weak_ptr reference into the submitted functor so that we can
// safely handle the functor outliving its peer.
weak_ptr<Peer> w_this = shared_from_this();
- RETURN_NOT_OK(raft_pool_token_->SubmitFunc([even_if_queue_empty, w_this]() {
+ RETURN_NOT_OK(raft_pool_token_->Submit([even_if_queue_empty, w_this]() {
if (auto p = w_this.lock()) {
p->SendNextRequest(even_if_queue_empty);
}
@@ -377,7 +377,7 @@ void Peer::ProcessResponse() {
// Capture a weak_ptr reference into the submitted functor so that we can
// safely handle the functor outliving its peer.
weak_ptr<Peer> w_this = shared_from_this();
- Status s = raft_pool_token_->SubmitFunc([w_this]() {
+ Status s = raft_pool_token_->Submit([w_this]() {
if (auto p = w_this.lock()) {
p->DoProcessResponse();
diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index 619bc18..c79d74a 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -1455,59 +1455,65 @@ bool PeerMessageQueue::IsOpInLog(const OpId& desired_op) const {
}
void PeerMessageQueue::NotifyObserversOfCommitIndexChange(int64_t new_commit_index) {
- WARN_NOT_OK(raft_pool_observers_token_->SubmitClosure(
- Bind(&PeerMessageQueue::NotifyObserversTask, Unretained(this),
- [=](PeerMessageQueueObserver* observer) {
- observer->NotifyCommitIndex(new_commit_index);
- })),
+ WARN_NOT_OK(raft_pool_observers_token_->Submit(
+ [=](){ this->NotifyObserversTask(
+ [=](PeerMessageQueueObserver* observer) {
+ observer->NotifyCommitIndex(new_commit_index);
+ });
+ }),
LogPrefixUnlocked() + "Unable to notify RaftConsensus of commit index change.");
}
void PeerMessageQueue::NotifyObserversOfTermChange(int64_t term) {
- WARN_NOT_OK(raft_pool_observers_token_->SubmitClosure(
- Bind(&PeerMessageQueue::NotifyObserversTask, Unretained(this),
- [=](PeerMessageQueueObserver* observer) {
- observer->NotifyTermChange(term);
- })),
+ WARN_NOT_OK(raft_pool_observers_token_->Submit(
+ [=](){ this->NotifyObserversTask(
+ [=](PeerMessageQueueObserver* observer) {
+ observer->NotifyTermChange(term);
+ });
+ }),
LogPrefixUnlocked() + "Unable to notify RaftConsensus of term change.");
}
void PeerMessageQueue::NotifyObserversOfFailedFollower(const string& uuid,
int64_t term,
const string& reason) {
- WARN_NOT_OK(raft_pool_observers_token_->SubmitClosure(
- Bind(&PeerMessageQueue::NotifyObserversTask, Unretained(this),
- [=](PeerMessageQueueObserver* observer) {
- observer->NotifyFailedFollower(uuid, term, reason);
- })),
+ WARN_NOT_OK(raft_pool_observers_token_->Submit(
+ [=](){ this->NotifyObserversTask(
+ [=](PeerMessageQueueObserver* observer) {
+ observer->NotifyFailedFollower(uuid, term, reason);
+ });
+ }),
LogPrefixUnlocked() + "Unable to notify RaftConsensus of abandoned follower.");
}
void PeerMessageQueue::NotifyObserversOfPeerToPromote(const string& peer_uuid) {
- WARN_NOT_OK(raft_pool_observers_token_->SubmitClosure(
- Bind(&PeerMessageQueue::NotifyObserversTask, Unretained(this),
- [=](PeerMessageQueueObserver* observer) {
- observer->NotifyPeerToPromote(peer_uuid);
- })),
+ WARN_NOT_OK(raft_pool_observers_token_->Submit(
+ [=](){ this->NotifyObserversTask(
+ [=](PeerMessageQueueObserver* observer) {
+ observer->NotifyPeerToPromote(peer_uuid);
+ });
+ }),
LogPrefixUnlocked() + "Unable to notify RaftConsensus of peer to promote.");
}
void PeerMessageQueue::NotifyObserversOfSuccessor(const string& peer_uuid) {
DCHECK(queue_lock_.is_locked());
- WARN_NOT_OK(raft_pool_observers_token_->SubmitClosure(
- Bind(&PeerMessageQueue::NotifyObserversTask, Unretained(this),
- [=](PeerMessageQueueObserver* observer) {
- observer->NotifyPeerToStartElection(peer_uuid);
- })),
+ WARN_NOT_OK(raft_pool_observers_token_->Submit(
+ [=](){ this->NotifyObserversTask(
+ [=](PeerMessageQueueObserver* observer) {
+ observer->NotifyPeerToStartElection(peer_uuid);
+ });
+ }),
LogPrefixUnlocked() + "Unable to notify RaftConsensus of available successor.");
}
void PeerMessageQueue::NotifyObserversOfPeerHealthChange() {
- WARN_NOT_OK(raft_pool_observers_token_->SubmitClosure(
- Bind(&PeerMessageQueue::NotifyObserversTask, Unretained(this),
- [](PeerMessageQueueObserver* observer) {
- observer->NotifyPeerHealthChange();
- })),
+ WARN_NOT_OK(raft_pool_observers_token_->Submit(
+ [=](){ this->NotifyObserversTask(
+ [](PeerMessageQueueObserver* observer) {
+ observer->NotifyPeerHealthChange();
+ });
+ }),
LogPrefixUnlocked() + "Unable to notify RaftConsensus peer health change.");
}
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index 48e4be6..7079003 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -34,8 +34,6 @@
#include "kudu/consensus/log_util.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/atomicops.h"
-#include "kudu/gutil/bind.h"
-#include "kudu/gutil/bind_helpers.h"
#include "kudu/gutil/dynamic_annotations.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
@@ -296,7 +294,7 @@ void Log::AppendThread::Wake() {
auto old_status = base::subtle::NoBarrier_CompareAndSwap(
&thread_state_, IDLE, ACTIVE);
if (old_status == IDLE) {
- CHECK_OK(append_pool_->SubmitClosure(Bind(&Log::AppendThread::ProcessQueue, Unretained(this))));
+ CHECK_OK(append_pool_->Submit([this]() { this->ProcessQueue(); }));
}
}
@@ -623,8 +621,7 @@ Status SegmentAllocator::AsyncAllocateSegmentUnlocked() {
DCHECK_EQ(kAllocationNotStarted, allocation_state_);
allocation_status_.Reset();
allocation_state_ = kAllocationInProgress;
- return allocation_pool_->SubmitClosure(
- Bind(&SegmentAllocator::AllocationTask, Unretained(this)));
+ return allocation_pool_->Submit([this]() { this->AllocationTask(); });
}
void SegmentAllocator::AllocationTask() {
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 2aeffb6..2e9eb97 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -667,8 +667,8 @@ void RaftConsensus::ReportFailureDetected() {
//
// There's no need to reenable the failure detector; if this fails, it's a
// sign that RaftConsensus has stopped and we no longer need failure detection.
- Status s = raft_pool_token_->SubmitFunc(std::bind(
- &RaftConsensus::ReportFailureDetectedTask, shared_from_this()));
+ auto self = shared_from_this();
+ Status s = raft_pool_token_->Submit([self]() { self->ReportFailureDetectedTask(); });
if (PREDICT_FALSE(!s.ok())) {
static const char* msg = "failed to submit failure detected task";
CHECK(s.IsServiceUnavailable()) << LogPrefixThreadSafe() << msg;
@@ -907,29 +907,29 @@ void RaftConsensus::NotifyFailedFollower(const string& uuid,
}
// Run config change on thread pool after dropping lock.
- WARN_NOT_OK(raft_pool_token_->SubmitFunc(std::bind(&RaftConsensus::TryRemoveFollowerTask,
- shared_from_this(),
- uuid,
- committed_config,
- reason)),
- LogPrefixThreadSafe() + "Unable to start TryRemoveFollowerTask");
+ auto self = shared_from_this();
+ WARN_NOT_OK(raft_pool_token_->Submit(
+ [self, uuid, committed_config, reason]() {
+ self->TryRemoveFollowerTask(uuid, committed_config, reason);
+ }),
+ LogPrefixThreadSafe() + "Unable to start TryRemoveFollowerTask");
}
void RaftConsensus::NotifyPeerToPromote(const string& peer_uuid) {
// Run the config change on the raft thread pool.
- WARN_NOT_OK(raft_pool_token_->SubmitFunc(std::bind(&RaftConsensus::TryPromoteNonVoterTask,
- shared_from_this(),
- peer_uuid)),
- LogPrefixThreadSafe() + "Unable to start TryPromoteNonVoterTask");
+ auto self = shared_from_this();
+ WARN_NOT_OK(raft_pool_token_->Submit(
+ [self, peer_uuid]() { self->TryPromoteNonVoterTask(peer_uuid); }),
+ LogPrefixThreadSafe() + "Unable to start TryPromoteNonVoterTask");
}
void RaftConsensus::NotifyPeerToStartElection(const string& peer_uuid) {
const auto& log_prefix = LogPrefixThreadSafe();
LOG(INFO) << log_prefix << ": Instructing follower " << peer_uuid << " to start an election";
- WARN_NOT_OK(raft_pool_token_->SubmitFunc(std::bind(&RaftConsensus::TryStartElectionOnPeerTask,
- shared_from_this(),
- peer_uuid)),
- log_prefix + "Unable to start TryStartElectionOnPeerTask");
+ auto self = shared_from_this();
+ WARN_NOT_OK(raft_pool_token_->Submit(
+ [self, peer_uuid]() { self->TryStartElectionOnPeerTask(peer_uuid); }),
+ log_prefix + "Unable to start TryStartElectionOnPeerTask");
}
void RaftConsensus::NotifyPeerHealthChange() {
@@ -2610,8 +2610,8 @@ void RaftConsensus::ElectionCallback(ElectionReason reason, const ElectionResult
//
// There's no need to reenable the failure detector; if this fails, it's a
// sign that RaftConsensus has stopped and we no longer need failure detection.
- Status s = raft_pool_token_->SubmitFunc(std::bind(
- &RaftConsensus::DoElectionCallback, shared_from_this(), reason, result));
+ auto self = shared_from_this();
+ Status s = raft_pool_token_->Submit([=]() { self->DoElectionCallback(reason, result); });
if (!s.ok()) {
static const char* msg = "unable to run election callback";
CHECK(s.IsServiceUnavailable()) << LogPrefixThreadSafe() << msg;
@@ -2777,7 +2777,7 @@ log::RetentionIndexes RaftConsensus::GetRetentionIndexes() {
}
void RaftConsensus::MarkDirty(const string& reason) {
- WARN_NOT_OK(raft_pool_token_->SubmitClosure(Bind(mark_dirty_clbk_, reason)),
+ WARN_NOT_OK(raft_pool_token_->Submit([=]() { this->mark_dirty_clbk_.Run(reason); }),
LogPrefixThreadSafe() + "Unable to run MarkDirty callback");
}
diff --git a/src/kudu/fs/dir_manager.cc b/src/kudu/fs/dir_manager.cc
index ea420c2..8f7de67 100644
--- a/src/kudu/fs/dir_manager.cc
+++ b/src/kudu/fs/dir_manager.cc
@@ -103,7 +103,7 @@ void Dir::Shutdown() {
}
void Dir::ExecClosure(const Closure& task) {
- Status s = pool_->SubmitClosure(task);
+ Status s = pool_->Submit([task]() { task.Run(); });
if (!s.ok()) {
WARN_NOT_OK(
s, "Could not submit task to thread pool, running it synchronously");
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 562516e..122d0b8 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -844,8 +844,7 @@ Status CatalogManager::Init(bool is_first_run) {
}
Status CatalogManager::ElectedAsLeaderCb() {
- return leader_election_pool_->SubmitClosure(
- Bind(&CatalogManager::PrepareForLeadershipTask, Unretained(this)));
+ return leader_election_pool_->Submit([this]() { this->PrepareForLeadershipTask(); });
}
Status CatalogManager::WaitUntilCaughtUpAsLeader(const MonoDelta& timeout) {
diff --git a/src/kudu/master/master.cc b/src/kudu/master/master.cc
index 23e2104..e612f84 100644
--- a/src/kudu/master/master.cc
+++ b/src/kudu/master/master.cc
@@ -35,7 +35,6 @@
#include "kudu/fs/error_manager.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/bind.h"
-#include "kudu/gutil/bind_helpers.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/master/catalog_manager.h"
@@ -190,8 +189,7 @@ Status Master::StartAsync() {
RETURN_NOT_OK(InitMasterRegistration());
// Start initializing the catalog manager.
- RETURN_NOT_OK(init_pool_->SubmitClosure(Bind(&Master::InitCatalogManagerTask,
- Unretained(this))));
+ RETURN_NOT_OK(init_pool_->Submit([this]() { this->InitCatalogManagerTask(); }));
state_ = kRunning;
return Status::OK();
diff --git a/src/kudu/rpc/reactor.cc b/src/kudu/rpc/reactor.cc
index 4ed2f5b..e9f807d 100644
--- a/src/kudu/rpc/reactor.cc
+++ b/src/kudu/rpc/reactor.cc
@@ -605,8 +605,9 @@ Status ReactorThread::StartConnectionNegotiation(const scoped_refptr<Connection>
auto encryption = reactor()->messenger()->encryption();
ThreadPool* negotiation_pool =
reactor()->messenger()->negotiation_pool(conn->direction());
- RETURN_NOT_OK(negotiation_pool->SubmitClosure(
- Bind(&Negotiation::RunNegotiation, conn, authentication, encryption, deadline)));
+ RETURN_NOT_OK(negotiation_pool->Submit([conn, authentication, encryption, deadline]() {
+ Negotiation::RunNegotiation(conn, authentication, encryption, deadline);
+ }));
return Status::OK();
}
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
index b02b137..a6ed025 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -693,7 +693,7 @@ void TabletReplica::FinishConsensusOnlyRound(ConsensusRound* round) {
// TabletReplica::Stop() stops RaftConsensus before it stops the prepare
// pool token and this callback is invoked while the RaftConsensus lock is
// held.
- CHECK_OK(prepare_pool_token_->SubmitFunc([this, ts] {
+ CHECK_OK(prepare_pool_token_->Submit([this, ts] {
std::lock_guard<simple_spinlock> l(lock_);
if (state_ == RUNNING || state_ == BOOTSTRAPPING) {
tablet_->mvcc_manager()->AdjustNewTransactionLowerBound(Timestamp(ts));
diff --git a/src/kudu/tablet/transactions/transaction_driver.cc b/src/kudu/tablet/transactions/transaction_driver.cc
index c9d26e5..2212356 100644
--- a/src/kudu/tablet/transactions/transaction_driver.cc
+++ b/src/kudu/tablet/transactions/transaction_driver.cc
@@ -34,7 +34,6 @@
#include "kudu/consensus/raft_consensus.h"
#include "kudu/consensus/time_manager.h"
#include "kudu/gutil/bind.h"
-#include "kudu/gutil/bind_helpers.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/strcat.h"
#include "kudu/gutil/strings/substitute.h"
@@ -226,8 +225,7 @@ Status TransactionDriver::ExecuteAsync() {
}
if (s.ok()) {
- s = prepare_pool_token_->SubmitClosure(
- Bind(&TransactionDriver::PrepareTask, Unretained(this)));
+ s = prepare_pool_token_->Submit([this]() { this->PrepareTask(); });
}
if (!s.ok()) {
@@ -493,7 +491,7 @@ Status TransactionDriver::ApplyAsync() {
}
TRACE_EVENT_FLOW_BEGIN0("txn", "ApplyTask", this);
- return apply_pool_->SubmitClosure(Bind(&TransactionDriver::ApplyTask, Unretained(this)));
+ return apply_pool_->Submit([this]() { this->ApplyTask(); });
}
void TransactionDriver::ApplyTask() {
diff --git a/src/kudu/thrift/client.h b/src/kudu/thrift/client.h
index 867cf2b..ed5f2ee 100644
--- a/src/kudu/thrift/client.h
+++ b/src/kudu/thrift/client.h
@@ -205,7 +205,7 @@ Status HaClient<Service>::Execute(std::function<Status(Service*)> task) {
// object. Note that the Thrift client classes already have LOG_IF_SLOW calls
// internally.
- RETURN_NOT_OK(threadpool_->SubmitFunc([=] {
+ RETURN_NOT_OK(threadpool_->Submit([=] {
// The main run routine of the threadpool thread. Runs the task with
// exclusive access to the Thrift service client. If the task fails, it will
// be retried, unless the failure type is non-retriable or the maximum
diff --git a/src/kudu/tools/ksck.cc b/src/kudu/tools/ksck.cc
index 31643ad..6025473 100644
--- a/src/kudu/tools/ksck.cc
+++ b/src/kudu/tools/ksck.cc
@@ -278,7 +278,7 @@ Status Ksck::CheckMasterHealth() {
RETURN_NOT_OK(StringToFlagsCategories(FLAGS_flags_categories_to_check,
&flags_categories_to_fetch));
for (const auto& master : cluster_->masters()) {
- RETURN_NOT_OK(pool_->SubmitFunc([&]() {
+ RETURN_NOT_OK(pool_->Submit([&]() {
ServerHealthSummary sh;
Status s = master->FetchInfo().AndThen([&]() {
return master->FetchConsensusState();
@@ -480,7 +480,7 @@ Status Ksck::FetchInfoFromTabletServers() {
&flags_categories_to_fetch));
for (const auto& entry : cluster_->tablet_servers()) {
const auto& ts = entry.second;
- RETURN_NOT_OK(pool_->SubmitFunc([&]() {
+ RETURN_NOT_OK(pool_->Submit([&]() {
VLOG(1) << "Going to connect to tablet server: " << ts->uuid();
ServerHealth health;
Status s = ts->FetchInfo(&health).AndThen([&ts, &health]() {
diff --git a/src/kudu/tools/ksck_checksum.cc b/src/kudu/tools/ksck_checksum.cc
index 55c6b59..cf9802a 100644
--- a/src/kudu/tools/ksck_checksum.cc
+++ b/src/kudu/tools/ksck_checksum.cc
@@ -19,7 +19,6 @@
#include <algorithm>
#include <cstdint>
-#include <functional>
#include <iostream>
#include <map>
#include <set>
@@ -280,8 +279,8 @@ void KsckChecksumManager::ReportResult(const string& tablet_id,
}
responses_.CountDown();
- WARN_NOT_OK(find_tablets_to_checksum_pool_->SubmitFunc(
- std::bind(&KsckChecksumManager::StartTabletChecksums, this)),
+ WARN_NOT_OK(find_tablets_to_checksum_pool_->Submit(
+ [this]() { this->StartTabletChecksums(); }),
"failed to submit task to start additional tablet checksums");
}
diff --git a/src/kudu/tools/ksck_remote.cc b/src/kudu/tools/ksck_remote.cc
index 2804eca..29083a8 100644
--- a/src/kudu/tools/ksck_remote.cc
+++ b/src/kudu/tools/ksck_remote.cc
@@ -19,7 +19,6 @@
#include <atomic>
#include <cstdint>
-#include <functional>
#include <map>
#include <mutex>
#include <ostream>
@@ -603,7 +602,7 @@ Status RemoteKsckCluster::RetrieveTablesList() {
continue;
}
tables_count++;
- RETURN_NOT_OK(pool_->SubmitFunc([&]() {
+ RETURN_NOT_OK(pool_->Submit([&]() {
client::sp::shared_ptr<KuduTable> t;
Status s = client_->OpenTable(table_name, &t);
if (!s.ok()) {
@@ -640,8 +639,8 @@ Status RemoteKsckCluster::RetrieveAllTablets() {
}
for (const auto& table : tables_) {
- RETURN_NOT_OK(pool_->SubmitFunc(
- std::bind(&KsckCluster::RetrieveTabletsList, this, table)));
+ RETURN_NOT_OK(pool_->Submit(
+ [this, table]() { this->RetrieveTabletsList(table); }));
}
pool_->Wait();
diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc
index af34e70..4242415 100644
--- a/src/kudu/tools/table_scanner.cc
+++ b/src/kudu/tools/table_scanner.cc
@@ -28,7 +28,6 @@
#include <set>
#include <boost/algorithm/string/predicate.hpp>
-#include <boost/bind.hpp>
#include <boost/optional/optional.hpp>
#include <gflags/gflags.h>
#include <glog/logging.h>
@@ -580,7 +579,7 @@ Status TableScanner::StartWork(WorkType type) {
const set<string>& tablet_id_filters = Split(FLAGS_tablets, ",", strings::SkipWhitespace());
map<int, vector<KuduScanToken*>> thread_tokens;
int i = 0;
- for (auto token : tokens) {
+ for (auto* token : tokens) {
if (tablet_id_filters.empty() || ContainsKey(tablet_id_filters, token->tablet().id())) {
thread_tokens[i++ % FLAGS_num_threads].emplace_back(token);
}
@@ -598,13 +597,15 @@ Status TableScanner::StartWork(WorkType type) {
Stopwatch sw(Stopwatch::THIS_THREAD);
sw.start();
for (i = 0; i < FLAGS_num_threads; ++i) {
+ auto* t_tokens = &thread_tokens[i];
+ auto* t_status = &thread_statuses[i];
if (type == WorkType::kScan) {
- RETURN_NOT_OK(thread_pool_->SubmitFunc(
- boost::bind(&TableScanner::ScanTask, this, thread_tokens[i], &thread_statuses[i])));
+ RETURN_NOT_OK(thread_pool_->Submit([this, t_tokens, t_status]()
+ { this->ScanTask(*t_tokens, t_status); }));
} else {
CHECK(type == WorkType::kCopy);
- RETURN_NOT_OK(thread_pool_->SubmitFunc(
- boost::bind(&TableScanner::CopyTask, this, thread_tokens[i], &thread_statuses[i])));
+ RETURN_NOT_OK(thread_pool_->Submit([this, t_tokens, t_status]()
+ { this->CopyTask(*t_tokens, t_status); }));
}
}
while (!thread_pool_->WaitFor(MonoDelta::FromSeconds(5))) {
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index 9497e36..09c33f0 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -197,6 +197,34 @@ METRIC_DEFINE_gauge_int32(server, tablets_num_shutdown,
DECLARE_int32(heartbeat_interval_ms);
+using kudu::consensus::ConsensusMetadata;
+using kudu::consensus::ConsensusMetadataCreateMode;
+using kudu::consensus::ConsensusMetadataManager;
+using kudu::consensus::ConsensusStatePB;
+using kudu::consensus::EXCLUDE_HEALTH_REPORT;
+using kudu::consensus::INCLUDE_HEALTH_REPORT;
+using kudu::consensus::OpId;
+using kudu::consensus::OpIdToString;
+using kudu::consensus::RECEIVED_OPID;
+using kudu::consensus::RaftConfigPB;
+using kudu::consensus::RaftConsensus;
+using kudu::consensus::StartTabletCopyRequestPB;
+using kudu::consensus::kMinimumTerm;
+using kudu::fs::DataDirManager;
+using kudu::log::Log;
+using kudu::master::ReportedTabletPB;
+using kudu::master::TabletReportPB;
+using kudu::tablet::ReportedTabletStatsPB;
+using kudu::tablet::Tablet;
+using kudu::tablet::TABLET_DATA_COPYING;
+using kudu::tablet::TABLET_DATA_DELETED;
+using kudu::tablet::TABLET_DATA_READY;
+using kudu::tablet::TABLET_DATA_TOMBSTONED;
+using kudu::tablet::TabletDataState;
+using kudu::tablet::TabletMetadata;
+using kudu::tablet::TabletReplica;
+using kudu::tserver::TabletCopyClient;
+using std::make_shared;
using std::set;
using std::shared_ptr;
using std::string;
@@ -205,34 +233,6 @@ using strings::Substitute;
namespace kudu {
-using consensus::ConsensusMetadata;
-using consensus::ConsensusMetadataCreateMode;
-using consensus::ConsensusMetadataManager;
-using consensus::ConsensusStatePB;
-using consensus::EXCLUDE_HEALTH_REPORT;
-using consensus::INCLUDE_HEALTH_REPORT;
-using consensus::OpId;
-using consensus::OpIdToString;
-using consensus::RECEIVED_OPID;
-using consensus::RaftConfigPB;
-using consensus::RaftConsensus;
-using consensus::StartTabletCopyRequestPB;
-using consensus::kMinimumTerm;
-using fs::DataDirManager;
-using log::Log;
-using master::ReportedTabletPB;
-using master::TabletReportPB;
-using tablet::ReportedTabletStatsPB;
-using tablet::Tablet;
-using tablet::TABLET_DATA_COPYING;
-using tablet::TABLET_DATA_DELETED;
-using tablet::TABLET_DATA_READY;
-using tablet::TABLET_DATA_TOMBSTONED;
-using tablet::TabletDataState;
-using tablet::TabletMetadata;
-using tablet::TabletReplica;
-using tserver::TabletCopyClient;
-
namespace tserver {
namespace {
@@ -302,9 +302,9 @@ TSTabletManager::TSTabletManager(TabletServer* server)
->AutoDetach(&metric_detacher_);
}
-// Base class for Runnables submitted against TSTabletManager threadpools whose
+// Base class for tasks submitted against TSTabletManager threadpools
// whose callback must fire, for example if the callback responds to an RPC.
-class TabletManagerRunnable : public Runnable {
+class TabletManagerRunnable {
public:
TabletManagerRunnable(TSTabletManager* ts_tablet_manager,
std::function<void(const Status&, TabletServerErrorPB::Code)> cb)
@@ -313,16 +313,19 @@ public:
}
virtual ~TabletManagerRunnable() {
- // If the Runnable is destroyed without the Run() method being invoked, we
+ // If the task is destroyed without the Run() method being invoked, we
// must invoke the user callback ourselves in order to free request
// resources. This may happen when the ThreadPool is shut down while the
- // Runnable is enqueued.
+ // task is enqueued.
if (!cb_invoked_) {
cb_(Status::ServiceUnavailable("Tablet server shutting down"),
TabletServerErrorPB::THROTTLED);
}
}
+ // Runs the task itself.
+ virtual void Run() = 0;
+
// Disable automatic invocation of the callback by the destructor.
// Does not disable invocation of the callback by Run().
void DisableCallback() {
@@ -412,8 +415,9 @@ Status TSTabletManager::Init() {
scoped_refptr<TabletReplica> replica;
RETURN_NOT_OK(CreateAndRegisterTabletReplica(meta, NEW_REPLICA, &replica));
- RETURN_NOT_OK(open_tablet_pool_->SubmitFunc(boost::bind(&TSTabletManager::OpenTablet,
- this, replica, deleter)));
+ RETURN_NOT_OK(open_tablet_pool_->Submit([this, replica, deleter]() {
+ this->OpenTablet(replica, deleter);
+ }));
registered_count++;
}
LOG(INFO) << Substitute("Registered $0 tablets", registered_count);
@@ -501,8 +505,9 @@ Status TSTabletManager::CreateNewTablet(const string& table_id,
RETURN_NOT_OK(CreateAndRegisterTabletReplica(meta, NEW_REPLICA, &new_replica));
// We can run this synchronously since there is nothing to bootstrap.
- RETURN_NOT_OK(open_tablet_pool_->SubmitFunc(boost::bind(&TSTabletManager::OpenTablet,
- this, new_replica, deleter)));
+ RETURN_NOT_OK(open_tablet_pool_->Submit([this, new_replica, deleter]() {
+ this->OpenTablet(new_replica, deleter);
+ }));
if (replica) {
*replica = new_replica;
@@ -555,8 +560,8 @@ void TSTabletManager::StartTabletCopy(
// immediately check whether the tablet is already being copied, and if so,
// return ALREADY_INPROGRESS.
string tablet_id = req->tablet_id();
- shared_ptr<TabletCopyRunnable> runnable(new TabletCopyRunnable(this, req, cb));
- Status s = tablet_copy_pool_->Submit(runnable);
+ auto runnable = make_shared<TabletCopyRunnable>(this, req, cb);
+ Status s = tablet_copy_pool_->Submit([runnable]() { runnable->Run(); });
if (PREDICT_TRUE(s.ok())) {
return;
}
@@ -879,7 +884,7 @@ Status TSTabletManager::BeginReplicaStateTransition(
class DeleteTabletRunnable : public TabletManagerRunnable {
public:
DeleteTabletRunnable(TSTabletManager* ts_tablet_manager,
- std::string tablet_id,
+ string tablet_id,
tablet::TabletDataState delete_type,
const boost::optional<int64_t>& cas_config_index, // NOLINT
std::function<void(const Status&, TabletServerErrorPB::Code)> cb)
@@ -905,13 +910,13 @@ private:
};
void TSTabletManager::DeleteTabletAsync(
- const std::string& tablet_id,
+ const string& tablet_id,
tablet::TabletDataState delete_type,
const boost::optional<int64_t>& cas_config_index,
- std::function<void(const Status&, TabletServerErrorPB::Code)> cb) {
- auto runnable = std::make_shared<DeleteTabletRunnable>(this, tablet_id, delete_type,
- cas_config_index, cb);
- Status s = delete_tablet_pool_->Submit(runnable);
+ const std::function<void(const Status&, TabletServerErrorPB::Code)>& cb) {
+ auto runnable = make_shared<DeleteTabletRunnable>(this, tablet_id, delete_type,
+ cas_config_index, cb);
+ Status s = delete_tablet_pool_->Submit([runnable]() { runnable->Run(); });
if (PREDICT_TRUE(s.ok())) {
return;
}
@@ -1237,7 +1242,7 @@ void TSTabletManager::Shutdown() {
}
}
-void TSTabletManager::RegisterTablet(const std::string& tablet_id,
+void TSTabletManager::RegisterTablet(const string& tablet_id,
const scoped_refptr<TabletReplica>& replica,
RegisterTabletReplicaMode mode) {
std::lock_guard<RWMutex> lock(lock_);
@@ -1555,7 +1560,7 @@ void TSTabletManager::FailTabletAndScheduleShutdown(const string& tablet_id) {
replica->MakeUnavailable(Status::IOError("failing tablet"));
// Submit a request to actually shut down the tablet asynchronously.
- CHECK_OK(open_tablet_pool_->SubmitFunc([tablet_id, this]() {
+ CHECK_OK(open_tablet_pool_->Submit([tablet_id, this]() {
scoped_refptr<TabletReplica> replica;
scoped_refptr<TransitionInProgressDeleter> deleter;
TabletServerErrorPB::Code error;
@@ -1575,7 +1580,7 @@ void TSTabletManager::FailTabletAndScheduleShutdown(const string& tablet_id) {
// Only proceed if there is no Tablet (e.g. a bootstrap terminated early
// due to error before creating the Tablet) or if the tablet has been
// stopped (e.g. due to the above call to MakeUnavailable).
- std::shared_ptr<Tablet> tablet = replica->shared_tablet();
+ auto tablet = replica->shared_tablet();
if (s.ok() && (!tablet || tablet->HasBeenStopped())) {
replica->Shutdown();
}
diff --git a/src/kudu/tserver/ts_tablet_manager.h b/src/kudu/tserver/ts_tablet_manager.h
index 21f94a1..042f418 100644
--- a/src/kudu/tserver/ts_tablet_manager.h
+++ b/src/kudu/tserver/ts_tablet_manager.h
@@ -140,7 +140,7 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
void DeleteTabletAsync(const std::string& tablet_id,
tablet::TabletDataState delete_type,
const boost::optional<int64_t>& cas_config_index,
- std::function<void(const Status&, TabletServerErrorPB::Code)> cb);
+ const std::function<void(const Status&, TabletServerErrorPB::Code)>& cb);
// Delete the specified tablet synchronously.
// See DeleteTabletAsync() for more information.
diff --git a/src/kudu/util/countdown_latch-test.cc b/src/kudu/util/countdown_latch-test.cc
index 32a673f..420248d 100644
--- a/src/kudu/util/countdown_latch-test.cc
+++ b/src/kudu/util/countdown_latch-test.cc
@@ -48,13 +48,13 @@ TEST(TestCountDownLatch, TestLatch) {
// Decrement the count by 1 in another thread, this should not fire the
// latch.
- ASSERT_OK(pool->SubmitFunc(boost::bind(DecrementLatch, &latch, 1)));
+ ASSERT_OK(pool->Submit([&latch]() { DecrementLatch(&latch, 1); }));
ASSERT_FALSE(latch.WaitFor(MonoDelta::FromMilliseconds(200)));
ASSERT_EQ(999, latch.count());
// Now decrement by 1000 this should decrement to 0 and fire the latch
// (even though 1000 is one more than the current count).
- ASSERT_OK(pool->SubmitFunc(boost::bind(DecrementLatch, &latch, 1000)));
+ ASSERT_OK(pool->Submit([&latch]() { DecrementLatch(&latch, 1000); }));
latch.Wait();
ASSERT_EQ(0, latch.count());
}
diff --git a/src/kudu/util/curl_util-test.cc b/src/kudu/util/curl_util-test.cc
index 7f750e7..59ed975 100644
--- a/src/kudu/util/curl_util-test.cc
+++ b/src/kudu/util/curl_util-test.cc
@@ -49,7 +49,7 @@ TEST(CurlUtilTest, NonSharedObjectsBetweenThreads) {
.Build(&pool);
for (int i = 0; i < kThreadCount; i++) {
- ASSERT_OK(pool->SubmitFunc([&]() {
+ ASSERT_OK(pool->Submit([&]() {
EasyCurl curl;
}));
}
diff --git a/src/kudu/util/maintenance_manager.cc b/src/kudu/util/maintenance_manager.cc
index 3ace96e..fb7043e 100644
--- a/src/kudu/util/maintenance_manager.cc
+++ b/src/kudu/util/maintenance_manager.cc
@@ -326,8 +326,7 @@ bool MaintenanceManager::FindAndLaunchOp(std::unique_lock<Mutex>* guard) {
LOG_AND_TRACE_WITH_PREFIX("maintenance", INFO)
<< Substitute("Scheduling $0: $1", op->name(), note);
// Run the maintenance operation.
- CHECK_OK(thread_pool_->SubmitFunc(boost::bind(&MaintenanceManager::LaunchOp, this, op)));
-
+ CHECK_OK(thread_pool_->Submit([this, op]() { this->LaunchOp(op); }));
return true;
}
diff --git a/src/kudu/util/net/dns_resolver.cc b/src/kudu/util/net/dns_resolver.cc
index 5038c2f..2e22073 100644
--- a/src/kudu/util/net/dns_resolver.cc
+++ b/src/kudu/util/net/dns_resolver.cc
@@ -17,7 +17,6 @@
#include "kudu/util/net/dns_resolver.h"
-#include <functional>
#include <memory>
#include <utility>
#include <vector>
@@ -25,7 +24,6 @@
#include <gflags/gflags.h>
#include <glog/logging.h>
-#include "kudu/gutil/callback.h"
#include "kudu/gutil/port.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/malloc.h"
@@ -86,8 +84,9 @@ void DnsResolver::ResolveAddressesAsync(const HostPort& hostport,
if (GetCachedAddresses(hostport, addresses)) {
return cb.Run(Status::OK());
}
- const auto s = pool_->SubmitFunc(std::bind(&DnsResolver::DoResolutionCb,
- this, hostport, addresses, cb));
+ const auto s = pool_->Submit([=]() {
+ this->DoResolutionCb(hostport, addresses, cb);
+ });
if (!s.ok()) {
cb.Run(s);
}
diff --git a/src/kudu/util/net/dns_resolver.h b/src/kudu/util/net/dns_resolver.h
index b07384a..3b44943 100644
--- a/src/kudu/util/net/dns_resolver.h
+++ b/src/kudu/util/net/dns_resolver.h
@@ -17,10 +17,10 @@
#pragma once
-#include <stddef.h>
-
+#include <cstddef>
#include <memory>
#include <string>
+#include <utility>
#include <vector>
#include "kudu/gutil/macros.h"
diff --git a/src/kudu/util/threadpool-test.cc b/src/kudu/util/threadpool-test.cc
index bf841d4..6ab39dc 100644
--- a/src/kudu/util/threadpool-test.cc
+++ b/src/kudu/util/threadpool-test.cc
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+#include "kudu/util/threadpool.h"
+
#include <unistd.h>
#include <atomic>
@@ -29,16 +31,12 @@
#include <utility>
#include <vector>
-#include <boost/bind.hpp> // IWYU pragma: keep
#include <boost/smart_ptr/shared_ptr.hpp>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/gutil/atomicops.h"
-#include "kudu/gutil/bind.h"
-#include "kudu/gutil/bind_helpers.h"
-#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/sysinfo.h"
@@ -53,7 +51,6 @@
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
-#include "kudu/util/threadpool.h"
#include "kudu/util/trace.h"
using std::atomic;
@@ -99,42 +96,41 @@ TEST_F(ThreadPoolTest, TestNoTaskOpenClose) {
pool_->Shutdown();
}
-static void SimpleTaskMethod(int n, Atomic32 *counter) {
+static void SimpleTaskMethod(int n, Atomic32* counter) {
while (n--) {
base::subtle::NoBarrier_AtomicIncrement(counter, 1);
boost::detail::yield(n);
}
}
-class SimpleTask : public Runnable {
+class SimpleTask {
public:
- SimpleTask(int n, Atomic32 *counter)
+ SimpleTask(int n, Atomic32* counter)
: n_(n), counter_(counter) {
}
- void Run() OVERRIDE {
+ void Run() {
SimpleTaskMethod(n_, counter_);
}
private:
int n_;
- Atomic32 *counter_;
+ Atomic32* counter_;
};
TEST_F(ThreadPoolTest, TestSimpleTasks) {
ASSERT_OK(RebuildPoolWithMinMax(4, 4));
Atomic32 counter(0);
- std::shared_ptr<Runnable> task(new SimpleTask(15, &counter));
+ SimpleTask task(15, &counter);
- ASSERT_OK(pool_->SubmitFunc(boost::bind(&SimpleTaskMethod, 10, &counter)));
- ASSERT_OK(pool_->Submit(task));
- ASSERT_OK(pool_->SubmitFunc(boost::bind(&SimpleTaskMethod, 20, &counter)));
- ASSERT_OK(pool_->Submit(task));
- ASSERT_OK(pool_->SubmitClosure(Bind(&SimpleTaskMethod, 123, &counter)));
+ ASSERT_OK(pool_->Submit([&counter]() { SimpleTaskMethod(10, &counter); }));
+ ASSERT_OK(pool_->Submit([&task]() { task.Run(); }));
+ ASSERT_OK(pool_->Submit([&counter]() { SimpleTaskMethod(20, &counter); }));
+ ASSERT_OK(pool_->Submit([&task]() { task.Run(); }));
+ ASSERT_OK(pool_->Submit([&counter]() { SimpleTaskMethod(123, &counter); }));
pool_->Wait();
ASSERT_EQ(10 + 15 + 20 + 15 + 123, base::subtle::NoBarrier_Load(&counter));
- pool_->Shutdown();
}
static void IssueTraceStatement() {
@@ -149,7 +145,7 @@ TEST_F(ThreadPoolTest, TestTracePropagation) {
scoped_refptr<Trace> t(new Trace);
{
ADOPT_TRACE(t.get());
- ASSERT_OK(pool_->SubmitFunc(&IssueTraceStatement));
+ ASSERT_OK(pool_->Submit(&IssueTraceStatement));
}
pool_->Wait();
ASSERT_STR_CONTAINS(t->DumpToString(), "hello from task");
@@ -158,29 +154,11 @@ TEST_F(ThreadPoolTest, TestTracePropagation) {
TEST_F(ThreadPoolTest, TestSubmitAfterShutdown) {
ASSERT_OK(RebuildPoolWithMinMax(1, 1));
pool_->Shutdown();
- Status s = pool_->SubmitFunc(&IssueTraceStatement);
+ Status s = pool_->Submit(&IssueTraceStatement);
ASSERT_EQ("Service unavailable: The pool has been shut down.",
s.ToString());
}
-class SlowTask : public Runnable {
- public:
- explicit SlowTask(CountDownLatch* latch)
- : latch_(latch) {
- }
-
- void Run() OVERRIDE {
- latch_->Wait();
- }
-
- static shared_ptr<Runnable> NewSlowTask(CountDownLatch* latch) {
- return std::make_shared<SlowTask>(latch);
- }
-
- private:
- CountDownLatch* latch_;
-};
-
TEST_F(ThreadPoolTest, TestThreadPoolWithNoMinimum) {
ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
.set_min_threads(0)
@@ -194,13 +172,13 @@ TEST_F(ThreadPoolTest, TestThreadPoolWithNoMinimum) {
SCOPED_CLEANUP({
latch.CountDown();
});
- ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
- ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+ ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
+ ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
ASSERT_EQ(2, pool_->num_threads());
- ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+ ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
ASSERT_EQ(3, pool_->num_threads());
// The 4th piece of work gets queued.
- ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+ ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
ASSERT_EQ(3, pool_->num_threads());
// Finish all work
latch.CountDown();
@@ -226,7 +204,7 @@ TEST_F(ThreadPoolTest, TestThreadPoolWithNoMaxThreads) {
// Submit tokenless tasks. Each should create a new thread.
for (int i = 0; i < kNumCPUs * 2; i++) {
- ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+ ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
}
ASSERT_EQ((kNumCPUs * 2), pool_->num_threads());
@@ -235,19 +213,18 @@ TEST_F(ThreadPoolTest, TestThreadPoolWithNoMaxThreads) {
unique_ptr<ThreadPoolToken> t2 = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
for (int i = 0; i < kNumCPUs * 2; i++) {
ThreadPoolToken* t = (i % 2 == 0) ? t1.get() : t2.get();
- ASSERT_OK(t->Submit(SlowTask::NewSlowTask(&latch)));
+ ASSERT_OK(t->Submit([&latch]() { latch.Wait(); }));
}
ASSERT_EQ((kNumCPUs * 2) + 2, pool_->num_threads());
// Submit more tokenless tasks. Each should create a new thread.
for (int i = 0; i < kNumCPUs; i++) {
- ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+ ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
}
ASSERT_EQ((kNumCPUs * 3) + 2, pool_->num_threads());
latch.CountDown();
pool_->Wait();
- pool_->Shutdown();
}
// Regression test for a bug where a task is submitted exactly
@@ -264,7 +241,7 @@ TEST_F(ThreadPoolTest, TestRace) {
for (int i = 0; i < 500; i++) {
CountDownLatch l(1);
- ASSERT_OK(pool_->SubmitFunc(boost::bind(&CountDownLatch::CountDown, &l)));
+ ASSERT_OK(pool_->Submit([&l]() { l.CountDown(); }));
l.Wait();
// Sleeping a different amount in each iteration makes it more likely to hit
// the bug.
@@ -282,16 +259,16 @@ TEST_F(ThreadPoolTest, TestVariableSizeThreadPool) {
ASSERT_EQ(1, pool_->num_threads());
// We get up to 4 threads when submitting work.
CountDownLatch latch(1);
- ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+ ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
ASSERT_EQ(1, pool_->num_threads());
- ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+ ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
ASSERT_EQ(2, pool_->num_threads());
- ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+ ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
ASSERT_EQ(3, pool_->num_threads());
- ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+ ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
ASSERT_EQ(4, pool_->num_threads());
// The 5th piece of work gets queued.
- ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+ ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
ASSERT_EQ(4, pool_->num_threads());
// Finish all work
latch.CountDown();
@@ -310,13 +287,12 @@ TEST_F(ThreadPoolTest, TestMaxQueueSize) {
CountDownLatch latch(1);
// We will be able to submit two tasks: one for max_threads == 1 and one for
// max_queue_size == 1.
- ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
- ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
- Status s = pool_->Submit(SlowTask::NewSlowTask(&latch));
+ ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
+ ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
+ Status s = pool_->Submit([&latch]() { latch.Wait(); });
CHECK(s.IsServiceUnavailable()) << "Expected failure due to queue blowout:" << s.ToString();
latch.CountDown();
pool_->Wait();
- pool_->Shutdown();
}
// Test that when we specify a zero-sized queue, the maximum number of threads
@@ -329,14 +305,13 @@ TEST_F(ThreadPoolTest, TestZeroQueueSize) {
CountDownLatch latch(1);
for (int i = 0; i < kMaxThreads; i++) {
- ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+ ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
}
- Status s = pool_->Submit(SlowTask::NewSlowTask(&latch));
+ Status s = pool_->Submit([&latch]() { latch.Wait(); });
ASSERT_TRUE(s.IsServiceUnavailable()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(), "Thread pool is at capacity");
latch.CountDown();
pool_->Wait();
- pool_->Shutdown();
}
// Regression test for KUDU-2187:
@@ -372,9 +347,9 @@ TEST_F(ThreadPoolTest, TestSlowThreadStart) {
// thread on 'pool_'.
std::atomic<int32_t> total_queue_time_ms(0);
for (int i = 0; i < 10; i++) {
- ASSERT_OK(submitter_pool->SubmitFunc([&]() {
+ ASSERT_OK(submitter_pool->Submit([&]() {
auto submit_time = MonoTime::Now();
- CHECK_OK(pool_->SubmitFunc([&,submit_time]() {
+ CHECK_OK(pool_->Submit([&,submit_time]() {
auto queue_time = MonoTime::Now() - submit_time;
total_queue_time_ms += queue_time.ToMilliseconds();
SleepFor(MonoDelta::FromMilliseconds(10));
@@ -404,10 +379,8 @@ TEST_F(ThreadPoolTest, TestPromises) {
.set_max_queue_size(1)));
Promise<int> my_promise;
- ASSERT_OK(pool_->SubmitClosure(
- Bind(&Promise<int>::Set, Unretained(&my_promise), 5)));
+ ASSERT_OK(pool_->Submit([&my_promise]() { my_promise.Set(5); }));
ASSERT_EQ(5, my_promise.Get());
- pool_->Shutdown();
}
METRIC_DEFINE_entity(test_entity);
@@ -451,12 +424,12 @@ TEST_F(ThreadPoolTest, TestMetrics) {
ThreadPool::ExecutionMode::SERIAL, all_metrics[2]);
// Submit once to t1, twice to t2, and three times without a token.
- ASSERT_OK(t1->SubmitFunc([](){}));
- ASSERT_OK(t2->SubmitFunc([](){}));
- ASSERT_OK(t2->SubmitFunc([](){}));
- ASSERT_OK(pool_->SubmitFunc([](){}));
- ASSERT_OK(pool_->SubmitFunc([](){}));
- ASSERT_OK(pool_->SubmitFunc([](){}));
+ ASSERT_OK(t1->Submit([](){}));
+ ASSERT_OK(t2->Submit([](){}));
+ ASSERT_OK(t2->Submit([](){}));
+ ASSERT_OK(pool_->Submit([](){}));
+ ASSERT_OK(pool_->Submit([](){}));
+ ASSERT_OK(pool_->Submit([](){}));
pool_->Wait();
// The total counts should reflect the number of submissions to each token.
@@ -488,25 +461,23 @@ TEST_F(ThreadPoolTest, TestDeadlocks) {
const char* death_msg = "called pool function that would result in deadlock";
ASSERT_DEATH({
ASSERT_OK(RebuildPoolWithMinMax(1, 1));
- ASSERT_OK(pool_->SubmitClosure(
- Bind(&ThreadPool::Shutdown, Unretained(pool_.get()))));
+ ASSERT_OK(pool_->Submit([this]() { this->pool_->Shutdown(); } ));
pool_->Wait();
}, death_msg);
ASSERT_DEATH({
ASSERT_OK(RebuildPoolWithMinMax(1, 1));
- ASSERT_OK(pool_->SubmitClosure(
- Bind(&ThreadPool::Wait, Unretained(pool_.get()))));
+ ASSERT_OK(pool_->Submit([this]() { this->pool_->Wait(); } ));
pool_->Wait();
}, death_msg);
}
#endif
-class SlowDestructorRunnable : public Runnable {
+class SlowDestructorRunnable {
public:
- void Run() override {}
+ void Run() {}
- virtual ~SlowDestructorRunnable() {
+ ~SlowDestructorRunnable() {
SleepFor(MonoDelta::FromMilliseconds(100));
}
};
@@ -517,8 +488,15 @@ TEST_F(ThreadPoolTest, TestSlowDestructor) {
ASSERT_OK(RebuildPoolWithMinMax(1, 20));
MonoTime start = MonoTime::Now();
for (int i = 0; i < 100; i++) {
- shared_ptr<Runnable> task(new SlowDestructorRunnable());
- ASSERT_OK(pool_->Submit(std::move(task)));
+ // In this particular test, it's important that the last ref of 'task' be
+ // dropped (and thus the task's destructor run) by the threadpool worker
+ // thread itself, so that the delay is incurred by that thread and not the
+ // task submission thread. Without C++14 capture-by-move semantics we have
+ // to work a little bit harder to accomplish that.
+ shared_ptr<SlowDestructorRunnable> task(new SlowDestructorRunnable());
+ auto wrapper = [task]() { task->Run(); };
+ task.reset();
+ ASSERT_OK(pool_->Submit(std::move(wrapper)));
}
pool_->Wait();
ASSERT_LT((MonoTime::Now() - start).ToSeconds(), 5);
@@ -536,7 +514,7 @@ INSTANTIATE_TEST_CASE_P(Tokens, ThreadPoolTestTokenTypes,
TEST_P(ThreadPoolTestTokenTypes, TestTokenSubmitAndWait) {
unique_ptr<ThreadPoolToken> t = pool_->NewToken(GetParam());
int i = 0;
- ASSERT_OK(t->SubmitFunc([&]() {
+ ASSERT_OK(t->Submit([&]() {
SleepFor(MonoDelta::FromMilliseconds(1));
i++;
}));
@@ -551,8 +529,8 @@ TEST_F(ThreadPoolTest, TestTokenSubmitsProcessedSerially) {
for (char c = 'a'; c < 'f'; c++) {
// Sleep a little first so that there's a higher chance of out-of-order
// appends if the submissions did execute in parallel.
- int sleep_ms = r.Next() % 5;
- ASSERT_OK(t->SubmitFunc([&result, c, sleep_ms]() {
+ int sleep_ms = static_cast<int>(r.Uniform(5));
+ ASSERT_OK(t->Submit([&result, c, sleep_ms]() {
SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
result += c;
}));
@@ -576,7 +554,7 @@ TEST_P(ThreadPoolTestTokenTypes, TestTokenSubmitsProcessedConcurrently) {
shared_ptr<Barrier> b = std::make_shared<Barrier>(kNumTokens + 1);
for (int i = 0; i < kNumTokens; i++) {
tokens.emplace_back(pool_->NewToken(GetParam()));
- ASSERT_OK(tokens.back()->SubmitFunc([b]() {
+ ASSERT_OK(tokens.back()->Submit([b]() {
b->Wait();
}));
}
@@ -599,7 +577,7 @@ TEST_F(ThreadPoolTest, TestTokenSubmitsNonSequential) {
shared_ptr<Barrier> b = std::make_shared<Barrier>(kNumSubmissions + 1);
unique_ptr<ThreadPoolToken> t = pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT);
for (int i = 0; i < kNumSubmissions; i++) {
- ASSERT_OK(t->SubmitFunc([b]() {
+ ASSERT_OK(t->Submit([b]() {
b->Wait();
}));
}
@@ -625,12 +603,12 @@ TEST_P(ThreadPoolTestTokenTypes, TestTokenShutdown) {
});
for (int i = 0; i < 3; i++) {
- ASSERT_OK(t1->SubmitFunc([&]() {
+ ASSERT_OK(t1->Submit([&]() {
l1.Wait();
}));
}
for (int i = 0; i < 3; i++) {
- ASSERT_OK(t2->SubmitFunc([&]() {
+ ASSERT_OK(t2->Submit([&]() {
l2.Wait();
}));
}
@@ -642,8 +620,8 @@ TEST_P(ThreadPoolTestTokenTypes, TestTokenShutdown) {
t1->Shutdown();
// We can no longer submit to t1 but we can still submit to t2.
- ASSERT_TRUE(t1->SubmitFunc([](){}).IsServiceUnavailable());
- ASSERT_OK(t2->SubmitFunc([](){}));
+ ASSERT_TRUE(t1->Submit([](){}).IsServiceUnavailable());
+ ASSERT_OK(t2->Submit([](){}));
// Unblock t2's tasks.
l2.CountDown();
@@ -663,7 +641,7 @@ TEST_P(ThreadPoolTestTokenTypes, TestTokenWaitForAll) {
for (int i = 0; i < kNumSubmissions; i++) {
// Sleep a little first to raise the likelihood of the test thread
// reaching Wait() before the submissions finish.
- int sleep_ms = r.Next() % 5;
+ int sleep_ms = static_cast<int>(r.Uniform(5));
auto task = [&v, sleep_ms]() {
SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
@@ -672,10 +650,10 @@ TEST_P(ThreadPoolTestTokenTypes, TestTokenWaitForAll) {
// Half of the submissions will be token-less, and half will use a token.
if (i % 2 == 0) {
- ASSERT_OK(pool_->SubmitFunc(task));
+ ASSERT_OK(pool_->Submit(task));
} else {
- int token_idx = r.Next() % tokens.size();
- ASSERT_OK(tokens[token_idx]->SubmitFunc(task));
+ int token_idx = static_cast<int>(r.Uniform(tokens.size()));
+ ASSERT_OK(tokens[token_idx]->Submit(task));
}
}
pool_->Wait();
@@ -697,11 +675,11 @@ TEST_F(ThreadPoolTest, TestFuzz) {
// - Shutdown a randomly selected token: 4%
// - Deallocate a randomly selected token: 2%
// - Wait for all submissions: 2%
- int op = r.Next() % 100;
+ int op = static_cast<int>(r.Uniform(100));
if (op < 40) {
// Submit without a token.
- int sleep_ms = r.Next() % 5;
- ASSERT_OK(pool_->SubmitFunc([sleep_ms]() {
+ int sleep_ms = static_cast<int>(r.Uniform(5));
+ ASSERT_OK(pool_->Submit([sleep_ms]() {
// Sleep a little first to increase task overlap.
SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
}));
@@ -710,16 +688,16 @@ TEST_F(ThreadPoolTest, TestFuzz) {
if (tokens.empty()) {
continue;
}
- int sleep_ms = r.Next() % 5;
- int token_idx = r.Next() % tokens.size();
- Status s = tokens[token_idx]->SubmitFunc([sleep_ms]() {
+ int sleep_ms = static_cast<int>(r.Uniform(5));
+ int token_idx = static_cast<int>(r.Uniform(tokens.size()));
+ Status s = tokens[token_idx]->Submit([sleep_ms]() {
// Sleep a little first to increase task overlap.
SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
});
ASSERT_TRUE(s.ok() || s.IsServiceUnavailable());
} else if (op < 85) {
// Allocate a token with a randomly selected policy.
- ThreadPool::ExecutionMode mode = r.Next() % 2 ?
+ ThreadPool::ExecutionMode mode = r.OneIn(2) ?
ThreadPool::ExecutionMode::SERIAL :
ThreadPool::ExecutionMode::CONCURRENT;
tokens.emplace_back(pool_->NewToken(mode));
@@ -728,14 +706,14 @@ TEST_F(ThreadPoolTest, TestFuzz) {
if (tokens.empty()) {
continue;
}
- int token_idx = r.Next() % tokens.size();
+ int token_idx = static_cast<int>(r.Uniform(tokens.size()));
tokens[token_idx]->Wait();
} else if (op < 96) {
// Shutdown a randomly selected token.
if (tokens.empty()) {
continue;
}
- int token_idx = r.Next() % tokens.size();
+ int token_idx = static_cast<int>(r.Uniform(tokens.size()));
tokens[token_idx]->Shutdown();
} else if (op < 98) {
// Deallocate a randomly selected token.
@@ -743,7 +721,7 @@ TEST_F(ThreadPoolTest, TestFuzz) {
continue;
}
auto it = tokens.begin();
- int token_idx = r.Next() % tokens.size();
+ int token_idx = static_cast<int>(r.Uniform(tokens.size()));
std::advance(it, token_idx);
tokens.erase(it);
} else {
@@ -756,7 +734,7 @@ TEST_F(ThreadPoolTest, TestFuzz) {
// Some test runs will shut down the pool before the tokens, and some won't.
// Either way should be safe.
- if (r.Next() % 2 == 0) {
+ if (r.OneIn(2)) {
pool_->Shutdown();
}
}
@@ -774,9 +752,9 @@ TEST_P(ThreadPoolTestTokenTypes, TestTokenSubmissionsAdhereToMaxQueueSize) {
});
// We will be able to submit two tasks: one for max_threads == 1 and one for
// max_queue_size == 1.
- ASSERT_OK(t->Submit(SlowTask::NewSlowTask(&latch)));
- ASSERT_OK(t->Submit(SlowTask::NewSlowTask(&latch)));
- Status s = t->Submit(SlowTask::NewSlowTask(&latch));
+ ASSERT_OK(t->Submit([&latch]() { latch.Wait(); }));
+ ASSERT_OK(t->Submit([&latch]() { latch.Wait(); }));
+ Status s = t->Submit([&latch]() { latch.Wait(); });
ASSERT_TRUE(s.IsServiceUnavailable());
}
@@ -806,7 +784,7 @@ TEST_F(ThreadPoolTest, TestTokenConcurrency) {
ThreadPool::ExecutionMode mode;
{
std::lock_guard<simple_spinlock> l(lock);
- mode = rng.Next() % 2 ?
+ mode = rng.OneIn(2) ?
ThreadPool::ExecutionMode::SERIAL :
ThreadPool::ExecutionMode::CONCURRENT;
}
@@ -832,7 +810,7 @@ TEST_F(ThreadPoolTest, TestTokenConcurrency) {
{
std::lock_guard<simple_spinlock> l(lock);
int idx = rng.Uniform(kNumTokens);
- ThreadPool::ExecutionMode mode = rng.Next() % 2 ?
+ ThreadPool::ExecutionMode mode = rng.OneIn(2) ?
ThreadPool::ExecutionMode::SERIAL :
ThreadPool::ExecutionMode::CONCURRENT;
tokens[idx] = shared_ptr<ThreadPoolToken>(pool_->NewToken(mode).release());
@@ -878,8 +856,8 @@ TEST_F(ThreadPoolTest, TestTokenConcurrency) {
int num_tokens_submitted = 0;
Random rng(SeedRandom());
while (latch.count()) {
- int sleep_ms = rng.Next() % 5;
- Status s = GetRandomToken()->SubmitFunc([sleep_ms]() {
+ int sleep_ms = static_cast<int>(rng.Uniform(5));
+ Status s = GetRandomToken()->Submit([sleep_ms]() {
// Sleep a little first so that tasks are running during other events.
SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
});
@@ -920,7 +898,7 @@ TEST_F(ThreadPoolTest, TestLIFOThreadWakeUps) {
latch.CountDown();
});
for (int i = 0; i < kNumThreads; i++) {
- ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+ ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
}
ASSERT_EQ(kNumThreads, pool_->num_threads());
latch.CountDown();
@@ -936,7 +914,7 @@ TEST_F(ThreadPoolTest, TestLIFOThreadWakeUps) {
// If LIFO order is used, the same thread will be reused for each task and
// the other threads will eventually time out.
AssertEventually([&]() {
- ASSERT_OK(pool_->SubmitFunc([](){}));
+ ASSERT_OK(pool_->Submit([](){}));
SleepFor(MonoDelta::FromMilliseconds(10));
ASSERT_EQ(1, pool_->num_threads());
}, MonoDelta::FromSeconds(10), AssertBackoff::NONE);
diff --git a/src/kudu/util/threadpool.cc b/src/kudu/util/threadpool.cc
index e983c7c..5199086 100644
--- a/src/kudu/util/threadpool.cc
+++ b/src/kudu/util/threadpool.cc
@@ -19,16 +19,15 @@
#include <cstdint>
#include <deque>
+#include <functional>
#include <limits>
#include <memory>
#include <ostream>
#include <string>
#include <utility>
-#include <boost/function.hpp> // IWYU pragma: keep
#include <glog/logging.h>
-#include "kudu/gutil/callback.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/substitute.h"
@@ -49,38 +48,6 @@ using std::unique_ptr;
using strings::Substitute;
////////////////////////////////////////////////////////
-// FunctionRunnable
-////////////////////////////////////////////////////////
-
-class FunctionRunnable : public Runnable {
- public:
- explicit FunctionRunnable(boost::function<void()> func) : func_(std::move(func)) {}
-
- void Run() OVERRIDE {
- func_();
- }
-
- private:
- boost::function<void()> func_;
-};
-
-////////////////////////////////////////////////////////
-// ClosureRunnable
-////////////////////////////////////////////////////////
-
-class ClosureRunnable : public Runnable {
- public:
- explicit ClosureRunnable(Closure cl) : cl_(std::move(cl)) {}
-
- void Run() OVERRIDE {
- cl_.Run();
- }
-
- private:
- Closure cl_;
-};
-
-////////////////////////////////////////////////////////
// ThreadPoolBuilder
////////////////////////////////////////////////////////
@@ -148,16 +115,8 @@ ThreadPoolToken::~ThreadPoolToken() {
pool_->ReleaseToken(this);
}
-Status ThreadPoolToken::SubmitClosure(Closure c) {
- return Submit(std::make_shared<ClosureRunnable>(std::move(c)));
-}
-
-Status ThreadPoolToken::SubmitFunc(boost::function<void()> f) {
- return Submit(std::make_shared<FunctionRunnable>(std::move(f)));
-}
-
-Status ThreadPoolToken::Submit(shared_ptr<Runnable> r) {
- return pool_->DoSubmit(std::move(r), this);
+Status ThreadPoolToken::Submit(std::function<void()> f) {
+ return pool_->DoSubmit(std::move(f), this);
}
void ThreadPoolToken::Shutdown() {
@@ -445,19 +404,11 @@ void ThreadPool::ReleaseToken(ThreadPoolToken* t) {
CHECK_EQ(1, tokens_.erase(t));
}
-Status ThreadPool::SubmitClosure(Closure c) {
- return Submit(std::make_shared<ClosureRunnable>(std::move(c)));
-}
-
-Status ThreadPool::SubmitFunc(boost::function<void()> f) {
- return Submit(std::make_shared<FunctionRunnable>(std::move(f)));
-}
-
-Status ThreadPool::Submit(shared_ptr<Runnable> r) {
- return DoSubmit(std::move(r), tokenless_.get());
+Status ThreadPool::Submit(std::function<void()> f) {
+ return DoSubmit(std::move(f), tokenless_.get());
}
-Status ThreadPool::DoSubmit(shared_ptr<Runnable> r, ThreadPoolToken* token) {
+Status ThreadPool::DoSubmit(std::function<void()> f, ThreadPoolToken* token) {
DCHECK(token);
MonoTime submit_time = MonoTime::Now();
@@ -509,7 +460,7 @@ Status ThreadPool::DoSubmit(shared_ptr<Runnable> r, ThreadPoolToken* token) {
}
Task task;
- task.runnable = std::move(r);
+ task.func = std::move(f);
task.trace = Trace::CurrentTrace();
// Need to AddRef, since the thread which submitted the task may go away,
// and we don't want the trace to be destructed while waiting in the queue.
@@ -682,7 +633,7 @@ void ThreadPool::DispatchThread() {
MicrosecondsInt64 start_wall_us = GetMonoTimeMicros();
MicrosecondsInt64 start_cpu_us = GetThreadCpuTimeMicros();
- task.runnable->Run();
+ task.func();
int64_t wall_us = GetMonoTimeMicros() - start_wall_us;
int64_t cpu_us = GetThreadCpuTimeMicros() - start_cpu_us;
@@ -702,7 +653,7 @@ void ThreadPool::DispatchThread() {
// objects, and we don't want to block submission of the threadpool.
// In the worst case, the destructor might even try to do something
// with this threadpool, and produce a deadlock.
- task.runnable.reset();
+ task.func = nullptr;
unique_lock.Lock();
// Possible states:
diff --git a/src/kudu/util/threadpool.h b/src/kudu/util/threadpool.h
index 789b132..a1037e8 100644
--- a/src/kudu/util/threadpool.h
+++ b/src/kudu/util/threadpool.h
@@ -17,6 +17,7 @@
#pragma once
#include <deque>
+#include <functional>
#include <iosfwd>
#include <memory>
#include <string>
@@ -26,7 +27,6 @@
#include <boost/intrusive/list_hook.hpp>
#include <gtest/gtest_prod.h>
-#include "kudu/gutil/callback.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
@@ -36,11 +36,6 @@
#include "kudu/util/mutex.h"
#include "kudu/util/status.h"
-namespace boost {
-template <typename Signature>
-class function;
-} // namespace boost
-
namespace kudu {
class Thread;
@@ -48,12 +43,6 @@ class ThreadPool;
class ThreadPoolToken;
class Trace;
-class Runnable {
- public:
- virtual void Run() = 0;
- virtual ~Runnable() {}
-};
-
// Interesting thread pool metrics. Can be applied to the entire pool (see
// ThreadPoolBuilder) or to individual tokens.
struct ThreadPoolMetrics {
@@ -164,7 +153,6 @@ class ThreadPoolBuilder {
//
// Usage Example:
// static void Func(int n) { ... }
-// class Task : public Runnable { ... }
//
// unique_ptr<ThreadPool> thread_pool;
// CHECK_OK(
@@ -174,8 +162,7 @@ class ThreadPoolBuilder {
// .set_max_queue_size(10)
// .set_idle_timeout(MonoDelta::FromMilliseconds(2000))
// .Build(&thread_pool));
-// thread_pool->Submit(shared_ptr<Runnable>(new Task()));
-// thread_pool->SubmitFunc(boost::bind(&Func, 10));
+// thread_pool->Submit([](){ Func(10); });
class ThreadPool {
public:
~ThreadPool();
@@ -188,14 +175,8 @@ class ThreadPool {
// require an explicit "abort" notification to exit from the run loop.
void Shutdown();
- // Submits a function using the kudu Closure system.
- Status SubmitClosure(Closure c) WARN_UNUSED_RESULT;
-
- // Submits a function bound using boost::bind(&FuncName, args...).
- Status SubmitFunc(boost::function<void()> f) WARN_UNUSED_RESULT;
-
- // Submits a Runnable class.
- Status Submit(std::shared_ptr<Runnable> r) WARN_UNUSED_RESULT;
+ // Submits a new task.
+ Status Submit(std::function<void()> f) WARN_UNUSED_RESULT;
// Waits until all the tasks are completed.
void Wait();
@@ -243,7 +224,7 @@ class ThreadPool {
// Client-provided task to be executed by this pool.
struct Task {
- std::shared_ptr<Runnable> runnable;
+ std::function<void()> func;
Trace* trace;
// Time at which the entry was submitted to the pool.
@@ -269,7 +250,7 @@ class ThreadPool {
void CheckNotPoolThreadUnlocked();
// Submits a task to be run via token.
- Status DoSubmit(std::shared_ptr<Runnable> r, ThreadPoolToken* token);
+ Status DoSubmit(std::function<void()> f, ThreadPoolToken* token);
// Releases token 't' and invalidates it.
void ReleaseToken(ThreadPoolToken* t);
@@ -383,14 +364,8 @@ class ThreadPoolToken {
// called first to take care of them.
~ThreadPoolToken();
- // Submits a function using the kudu Closure system.
- Status SubmitClosure(Closure c) WARN_UNUSED_RESULT;
-
- // Submits a function bound using boost::bind(&FuncName, args...).
- Status SubmitFunc(boost::function<void()> f) WARN_UNUSED_RESULT;
-
- // Submits a Runnable class.
- Status Submit(std::shared_ptr<Runnable> r) WARN_UNUSED_RESULT;
+ // Submits a new task.
+ Status Submit(std::function<void()> f) WARN_UNUSED_RESULT;
// Marks the token as unusable for future submissions. Any queued tasks not
// yet running are destroyed. If tasks are in flight, Shutdown() will wait