You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2020/03/28 06:32:32 UTC
[kudu] branch master updated: status_callback: replace Bind usage with lambdas
This is an automated email from the ASF dual-hosted git repository.
adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new be2d7c5 status_callback: replace Bind usage with lambdas
be2d7c5 is described below
commit be2d7c5882a9f4842e6387c4c4cef50b2e4bae75
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Thu Mar 26 00:10:25 2020 -0700
status_callback: replace Bind usage with lambdas
The old Bind-based StatusCallback was another significant user of kudu::Bind.
I've removed it in favor of StdStatusCallback, which has in turn been renamed
to StatusCallback. All affected Bind call sites were converted into lambdas.
Change-Id: I66e90b8144b2a6b03b1e102dd4ca93f8e232b1a9
Reviewed-on: http://gerrit.cloudera.org:8080/15562
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
src/kudu/client/authz_token_cache.cc | 33 ++++++++++-----------
src/kudu/client/batcher.cc | 10 ++++---
src/kudu/client/client-internal.cc | 8 ++---
src/kudu/client/master_proxy_rpc.cc | 7 ++---
src/kudu/client/master_rpc.cc | 6 ++--
src/kudu/client/meta_cache.cc | 34 ++++++++++------------
src/kudu/consensus/consensus-test-util.h | 5 ++--
src/kudu/consensus/consensus_queue.cc | 18 +++++-------
src/kudu/consensus/log-test-base.h | 4 +--
src/kudu/consensus/log.cc | 14 ++++-----
src/kudu/consensus/log_cache-test.cc | 4 +--
src/kudu/consensus/log_cache.cc | 13 ++++-----
src/kudu/consensus/mt-log-test.cc | 5 ++--
src/kudu/consensus/raft_consensus.cc | 20 ++++++-------
src/kudu/consensus/raft_consensus.h | 12 ++++----
src/kudu/consensus/raft_consensus_quorum-test.cc | 10 ++++---
src/kudu/master/hms_notification_log_listener.cc | 5 ++--
src/kudu/master/sentry_privileges_fetcher.cc | 3 +-
src/kudu/subprocess/server.cc | 6 ++--
src/kudu/subprocess/server.h | 6 ++--
src/kudu/tablet/tablet.cc | 3 +-
src/kudu/tablet/tablet_bootstrap.cc | 8 ++---
src/kudu/tablet/tablet_metadata.cc | 8 ++---
src/kudu/tablet/transactions/transaction_driver.cc | 8 ++---
src/kudu/thrift/client.h | 2 +-
src/kudu/tserver/tablet_service.cc | 2 +-
src/kudu/util/async_util-test.cc | 9 +++---
src/kudu/util/async_util.h | 9 ++----
src/kudu/util/net/dns_resolver.cc | 6 ++--
src/kudu/util/status_callback.h | 16 ++--------
30 files changed, 133 insertions(+), 161 deletions(-)
diff --git a/src/kudu/client/authz_token_cache.cc b/src/kudu/client/authz_token_cache.cc
index 67d82c0..a1f5f8c 100644
--- a/src/kudu/client/authz_token_cache.cc
+++ b/src/kudu/client/authz_token_cache.cc
@@ -25,16 +25,12 @@
#include <unordered_map>
#include <vector>
-#include <boost/bind.hpp> // IWYU pragma: keep
#include <glog/logging.h>
#include "kudu/client/client-internal.h"
#include "kudu/client/client.h"
#include "kudu/client/master_proxy_rpc.h"
#include "kudu/common/wire_protocol.h"
-#include "kudu/gutil/bind.h"
-#include "kudu/gutil/bind_helpers.h"
-#include "kudu/gutil/callback.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
@@ -47,28 +43,29 @@
#include "kudu/util/status.h"
#include "kudu/util/status_callback.h"
+using kudu::master::MasterFeatures;
+using kudu::master::MasterServiceProxy;
+using kudu::rpc::BackoffType;
+using kudu::rpc::ResponseCallback;
+using kudu::security::SignedTokenPB;
using std::string;
using std::vector;
using strings::Substitute;
namespace kudu {
-
-using master::MasterFeatures;
-using master::MasterServiceProxy;
-using rpc::BackoffType;
-using security::SignedTokenPB;
-
namespace client {
namespace internal {
RetrieveAuthzTokenRpc::RetrieveAuthzTokenRpc(const KuduTable* table,
MonoTime deadline)
- : AsyncLeaderMasterRpc(deadline, table->client(), BackoffType::LINEAR, req_, &resp_,
- &MasterServiceProxy::GetTableSchemaAsync, "RetrieveAuthzToken",
- Bind(&AuthzTokenCache::RetrievedNewAuthzTokenCb,
- Unretained(&table->client()->data_->authz_token_cache_),
- table->id()),
- { MasterFeatures::GENERATE_AUTHZ_TOKEN }),
+ : AsyncLeaderMasterRpc(
+ deadline, table->client(), BackoffType::LINEAR, req_, &resp_,
+ &MasterServiceProxy::GetTableSchemaAsync, "RetrieveAuthzToken",
+ [table](const Status& s) {
+ table->client()->data_->authz_token_cache_.RetrievedNewAuthzTokenCb(
+ table->id(), s);
+ },
+ { MasterFeatures::GENERATE_AUTHZ_TOKEN }),
table_(table) {
req_.mutable_table()->set_table_id(table_->id());
}
@@ -98,7 +95,7 @@ void RetrieveAuthzTokenRpc::SendRpcCb(const Status& status) {
client_->data_->authz_token_cache_.Put(table_->id(), resp_.authz_token());
}
}
- user_cb_.Run(new_status);
+ user_cb_(new_status);
}
void AuthzTokenCache::Put(const string& table_id, SignedTokenPB authz_token) {
@@ -154,7 +151,7 @@ void AuthzTokenCache::RetrievedNewAuthzTokenCb(const string& table_id,
}
DCHECK(!cbs.empty());
for (const auto& cb : cbs) {
- cb.Run(status);
+ cb(status);
}
}
diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc
index 2fe9e8a..e673ac8 100644
--- a/src/kudu/client/batcher.cc
+++ b/src/kudu/client/batcher.cc
@@ -18,6 +18,7 @@
#include "kudu/client/batcher.h"
#include <cstddef>
+#include <functional>
#include <mutex>
#include <ostream>
#include <string>
@@ -44,7 +45,6 @@
#include "kudu/common/wire_protocol.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/gutil/atomic_refcount.h"
-#include "kudu/gutil/bind.h"
#include "kudu/gutil/bind_helpers.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
@@ -544,7 +544,7 @@ bool WriteRpc::GetNewAuthnTokenAndRetry() {
KuduClient* c = batcher_->client_;
VLOG(1) << "Retrieving new authn token from master";
c->data_->ConnectToClusterAsync(c, retrier().deadline(),
- Bind(&WriteRpc::GotNewAuthnTokenRetryCb, Unretained(this)),
+ [this](const Status& s) { this->GotNewAuthnTokenRetryCb(s); },
CredentialsPolicy::PRIMARY_CREDENTIALS);
return true;
}
@@ -555,7 +555,7 @@ bool WriteRpc::GetNewAuthzTokenAndRetry() {
KuduClient* c = batcher_->client_;
VLOG(1) << "Retrieving new authz token from master";
c->data_->RetrieveAuthzTokenAsync(table(),
- Bind(&WriteRpc::GotNewAuthzTokenRetryCb, Unretained(this)),
+ [this](const Status& s) { this->GotNewAuthzTokenRetryCb(s); },
retrier().deadline());
return true;
}
@@ -722,15 +722,17 @@ Status Batcher::Add(KuduWriteOperation* write_op) {
//
// deadline_ is set in FlushAsync(), after all Add() calls are done, so
// here we're forced to create a new deadline.
+ auto op_raw = op.get();
MonoTime deadline = ComputeDeadlineUnlocked();
base::RefCountInc(&outstanding_lookups_);
+ scoped_refptr<Batcher> self(this);
client_->data_->meta_cache_->LookupTabletByKey(
op->write_op->table(),
std::move(partition_key),
deadline,
MetaCache::LookupType::kPoint,
&op->tablet,
- Bind(&Batcher::TabletLookupFinished, this, op.get()));
+ [self, op_raw](const Status& s) { self->TabletLookupFinished(op_raw, s); });
IgnoreResult(op.release());
buffer_bytes_used_.IncrementBy(write_op->SizeInBuffer());
diff --git a/src/kudu/client/client-internal.cc b/src/kudu/client/client-internal.cc
index 783bd4d..b8ba40f 100644
--- a/src/kudu/client/client-internal.cc
+++ b/src/kudu/client/client-internal.cc
@@ -646,7 +646,7 @@ void KuduClient::Data::ConnectedToClusterCb(
}
for (const StatusCallback& cb : cbs) {
- cb.Run(status);
+ cb(status);
}
}
@@ -676,12 +676,12 @@ void KuduClient::Data::ConnectToClusterAsync(KuduClient* client,
s = dns_resolver_->ResolveAddresses(hp, &addrs);
}
if (!s.ok()) {
- cb.Run(s);
+ cb(s);
return;
}
if (addrs.empty()) {
- cb.Run(Status::InvalidArgument(Substitute("No master address specified by '$0'",
- master_server_addr)));
+ cb(Status::InvalidArgument(Substitute("No master address specified by '$0'",
+ master_server_addr)));
return;
}
if (addrs.size() > 1) {
diff --git a/src/kudu/client/master_proxy_rpc.cc b/src/kudu/client/master_proxy_rpc.cc
index 412d22c..45e78d1 100644
--- a/src/kudu/client/master_proxy_rpc.cc
+++ b/src/kudu/client/master_proxy_rpc.cc
@@ -28,8 +28,6 @@
#include "kudu/client/client-internal.h"
#include "kudu/client/client.h"
#include "kudu/common/wire_protocol.h"
-#include "kudu/gutil/bind.h"
-#include "kudu/gutil/bind_helpers.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/master/master.pb.h"
#include "kudu/rpc/response_callback.h"
@@ -148,7 +146,7 @@ void AsyncLeaderMasterRpc<ReqClass, RespClass>::SendRpcCb(const Status& status)
if (s.ok() && resp_->has_error()) {
s = StatusFromPB(resp_->error().status());
}
- user_cb_.Run(s);
+ user_cb_(s);
}
template <class ReqClass, class RespClass>
@@ -178,8 +176,7 @@ void AsyncLeaderMasterRpc<ReqClass, RespClass>::ResetMasterLeaderAndRetry(
// FATAL_INVALID_AUTHENTICATION_TOKEN error as well.
client_->data_->ConnectToClusterAsync(
client_, retrier().deadline(),
- Bind(&AsyncLeaderMasterRpc<ReqClass, RespClass>::NewLeaderMasterDeterminedCb,
- Unretained(this), creds_policy),
+ [=](const Status& s) { this->NewLeaderMasterDeterminedCb(creds_policy, s); },
creds_policy);
}
diff --git a/src/kudu/client/master_rpc.cc b/src/kudu/client/master_rpc.cc
index e82cb20..b35bf9f 100644
--- a/src/kudu/client/master_rpc.cc
+++ b/src/kudu/client/master_rpc.cc
@@ -32,7 +32,6 @@
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/gutil/basictypes.h"
-#include "kudu/gutil/bind.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/master/master.proxy.h"
@@ -226,7 +225,7 @@ void ConnectToMasterRpc::SendRpcCb(const Status& status) {
new_status = StatusFromPB(out_->error().status());
}
}
- user_cb_.Run(new_status);
+ user_cb_(new_status);
}
} // anonymous namespace
@@ -277,8 +276,9 @@ void ConnectToClusterRpc::SendRpc() {
std::lock_guard<simple_spinlock> l(lock_);
for (int i = 0; i < addrs_with_names_.size(); i++) {
+ scoped_refptr<ConnectToClusterRpc> self(this);
ConnectToMasterRpc* rpc = new ConnectToMasterRpc(
- Bind(&ConnectToClusterRpc::SingleNodeCallback, this, i),
+ [self, i](const Status& s) { self->SingleNodeCallback(i, s); },
addrs_with_names_[i],
actual_deadline,
retrier().messenger(),
diff --git a/src/kudu/client/meta_cache.cc b/src/kudu/client/meta_cache.cc
index 1b7ff51..263659c 100644
--- a/src/kudu/client/meta_cache.cc
+++ b/src/kudu/client/meta_cache.cc
@@ -27,7 +27,6 @@
#include <utility>
#include <vector>
-#include <boost/bind.hpp> // IWYU pragma: keep
#include <glog/logging.h>
#include <google/protobuf/repeated_field.h> // IWYU pragma: keep
@@ -39,8 +38,6 @@
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/gutil/basictypes.h"
-#include "kudu/gutil/bind.h"
-#include "kudu/gutil/bind_helpers.h"
#include "kudu/gutil/callback.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
@@ -99,7 +96,7 @@ void RemoteTabletServer::DnsResolutionFinished(const HostPort& hp,
if (!s.ok()) {
s = s.CloneAndPrepend("Failed to resolve address for TS " + uuid_);
- user_callback.Run(s);
+ user_callback(s);
return;
}
@@ -111,7 +108,7 @@ void RemoteTabletServer::DnsResolutionFinished(const HostPort& hp,
proxy_.reset(new TabletServerServiceProxy(client->data_->messenger_, (*addrs)[0], hp.host()));
proxy_->set_user_credentials(client->data_->user_credentials_);
}
- user_callback.Run(s);
+ user_callback(s);
}
void RemoteTabletServer::InitProxy(KuduClient* client, const StatusCallback& cb) {
@@ -122,7 +119,7 @@ void RemoteTabletServer::InitProxy(KuduClient* client, const StatusCallback& cb)
if (proxy_) {
// Already have a proxy created.
l.unlock();
- cb.Run(Status::OK());
+ cb(Status::OK());
return;
}
@@ -134,8 +131,9 @@ void RemoteTabletServer::InitProxy(KuduClient* client, const StatusCallback& cb)
auto addrs = new vector<Sockaddr>();
client->data_->dns_resolver_->ResolveAddressesAsync(
- hp, addrs, Bind(&RemoteTabletServer::DnsResolutionFinished,
- Unretained(this), hp, addrs, client, cb));
+ hp, addrs, [=](const Status& s) {
+ this->DnsResolutionFinished(hp, addrs, client, cb, s);
+ });
}
void RemoteTabletServer::Update(const master::TSInfoPB& pb) {
@@ -457,17 +455,17 @@ void MetaCacheServerPicker::PickLeader(const ServerPickedCallback& callback,
deadline,
MetaCache::LookupType::kPoint,
nullptr,
- Bind(&MetaCacheServerPicker::LookUpTabletCb, Unretained(this), callback, deadline));
+ [this, callback, deadline](const Status& s) {
+ this->LookUpTabletCb(callback, deadline, s);
+ });
return;
}
// If we have a current TS initialize the proxy.
// Make sure we have a working proxy before sending out the RPC.
- leader->InitProxy(client_,
- Bind(&MetaCacheServerPicker::InitProxyCb,
- Unretained(this),
- callback,
- leader));
+ leader->InitProxy(client_, [this, callback, leader](const Status& s) {
+ this->InitProxyCb(callback, leader, s);
+ });
}
void MetaCacheServerPicker::MarkServerFailed(RemoteTabletServer* replica, const Status& status) {
@@ -672,7 +670,7 @@ void LookupRpc::SendRpc() {
Status fastpath_status = meta_cache_->DoFastPathLookup(
table_, &partition_key_, lookup_type_, remote_tablet_);
if (!fastpath_status.IsIncomplete()) {
- user_cb_.Run(fastpath_status);
+ user_cb_(fastpath_status);
delete this;
return;
}
@@ -720,7 +718,7 @@ void LookupRpc::ResetMasterLeaderAndRetry(CredentialsPolicy creds_policy) {
table_->client()->data_->ConnectToClusterAsync(
table_->client(),
retrier().deadline(),
- Bind(&LookupRpc::NewLeaderMasterDeterminedCb, Unretained(this), creds_policy),
+ [=](const Status& s) { this->NewLeaderMasterDeterminedCb(creds_policy, s); },
creds_policy);
}
@@ -764,7 +762,7 @@ void LookupRpc::SendRpcCb(const Status& status) {
new_status = new_status.CloneAndPrepend(Substitute("$0 failed", ToString()));
KLOG_EVERY_N_SECS(WARNING, 1) << new_status.ToString();
}
- user_cb_.Run(new_status);
+ user_cb_(new_status);
}
Status MetaCache::ProcessLookupResponse(const LookupRpc& rpc,
@@ -1016,7 +1014,7 @@ void MetaCache::LookupTabletByKey(const KuduTable* table,
Status fastpath_status = DoFastPathLookup(
table, &partition_key, lookup_type, remote_tablet);
if (!fastpath_status.IsIncomplete()) {
- callback.Run(fastpath_status);
+ callback(fastpath_status);
return;
}
diff --git a/src/kudu/consensus/consensus-test-util.h b/src/kudu/consensus/consensus-test-util.h
index 2b21cef..dec636e 100644
--- a/src/kudu/consensus/consensus-test-util.h
+++ b/src/kudu/consensus/consensus-test-util.h
@@ -36,7 +36,6 @@
#include "kudu/consensus/log.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/consensus/raft_consensus.h"
-#include "kudu/gutil/bind.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/messenger.h"
@@ -678,8 +677,8 @@ class TestDriver {
std::unique_ptr<CommitMsg> msg(new CommitMsg);
msg->set_op_type(round_->replicate_msg()->op_type());
msg->mutable_commited_op_id()->CopyFrom(round_->id());
- CHECK_OK(log_->AsyncAppendCommit(std::move(msg),
- Bind(&TestDriver::CommitCallback, Unretained(this))));
+ CHECK_OK(log_->AsyncAppendCommit(
+ std::move(msg), [this](const Status& s) { this->CommitCallback(s); }));
}
void CommitCallback(const Status& s) {
diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index c79d74a..676d99d 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -37,8 +37,6 @@
#include "kudu/consensus/opid_util.h"
#include "kudu/consensus/quorum_util.h"
#include "kudu/consensus/time_manager.h"
-#include "kudu/gutil/bind.h"
-#include "kudu/gutil/bind_helpers.h"
#include "kudu/gutil/dynamic_annotations.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/map-util.h"
@@ -377,12 +375,13 @@ void PeerMessageQueue::LocalPeerAppendFinished(const OpId& id,
}
ResponseFromPeer(local_peer_pb_.permanent_uuid(), fake_response);
- callback.Run(status);
+ callback(status);
}
Status PeerMessageQueue::AppendOperation(const ReplicateRefPtr& msg) {
- return AppendOperations({ msg }, Bind(CrashIfNotOkStatusCB,
- "Enqueued replicate operation failed to write to WAL"));
+ return AppendOperations({ msg }, [](const Status& s) {
+ CrashIfNotOkStatusCB("Enqueued replicate operation failed to write to WAL", s);
+ });
}
Status PeerMessageQueue::AppendOperations(const vector<ReplicateRefPtr>& msgs,
@@ -424,11 +423,10 @@ Status PeerMessageQueue::AppendOperations(const vector<ReplicateRefPtr>& msgs,
// for the log buffer to empty, it may need to call LocalPeerAppendFinished()
// which also needs queue_lock_.
lock.unlock();
- RETURN_NOT_OK(log_cache_.AppendOperations(msgs,
- Bind(&PeerMessageQueue::LocalPeerAppendFinished,
- Unretained(this),
- last_id,
- log_append_callback)));
+ RETURN_NOT_OK(log_cache_.AppendOperations(
+ msgs, [this, last_id, log_append_callback](const Status& s) {
+ this->LocalPeerAppendFinished(last_id, log_append_callback, s);
+ }));
lock.lock();
DCHECK(last_id.IsInitialized());
queue_state_.last_appended = last_id;
diff --git a/src/kudu/consensus/log-test-base.h b/src/kudu/consensus/log-test-base.h
index b7728bd..2041e81 100644
--- a/src/kudu/consensus/log-test-base.h
+++ b/src/kudu/consensus/log-test-base.h
@@ -252,7 +252,7 @@ class LogTestBase : public KuduTest {
// AsyncAppendReplicates does not free the ReplicateMsg on completion, so we
// need to pass it through to our callback.
return log_->AsyncAppendReplicates(
- { replicate }, Bind(&LogTestBase::CheckReplicateResult, replicate));
+ { replicate }, [replicate](const Status& s) { CheckReplicateResult(replicate, s); });
}
static void CheckCommitResult(const Status& s) {
@@ -321,7 +321,7 @@ class LogTestBase : public KuduTest {
return s.Wait();
}
return log_->AsyncAppendCommit(std::move(commit),
- Bind(&LogTestBase::CheckCommitResult));
+ [](const Status& s) { CheckCommitResult(s); });
}
// Appends 'count' ReplicateMsgs and the corresponding CommitMsgs to the log
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index a5db193..576b44e 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -382,9 +382,9 @@ void Log::AppendThread::HandleBatches(vector<LogEntryBatch*> entry_batches) {
// abort all subsequent transactions in this batch or allow
// them to be appended? What about transactions in future
// batches?
- if (!entry_batch->callback().is_null()) {
- entry_batch->callback().Run(s);
- entry_batch->callback_.Reset();
+ if (entry_batch->callback()) {
+ entry_batch->callback()(s);
+ entry_batch->callback_ = nullptr;
}
}
if (is_all_commits && entry_batch->type_ != COMMIT) {
@@ -399,8 +399,8 @@ void Log::AppendThread::HandleBatches(vector<LogEntryBatch*> entry_batches) {
if (PREDICT_FALSE(!s.ok())) {
LOG_WITH_PREFIX(ERROR) << "Error syncing log: " << s.ToString();
for (LogEntryBatch* entry_batch : entry_batches) {
- if (!entry_batch->callback().is_null()) {
- entry_batch->callback().Run(s);
+ if (entry_batch->callback()) {
+ entry_batch->callback()(s);
}
delete entry_batch;
}
@@ -409,8 +409,8 @@ void Log::AppendThread::HandleBatches(vector<LogEntryBatch*> entry_batches) {
VLOG_WITH_PREFIX(2) << "Synchronized " << entry_batches.size() << " entry batches";
SCOPED_WATCH_STACK(100);
for (LogEntryBatch* entry_batch : entry_batches) {
- if (PREDICT_TRUE(!entry_batch->callback().is_null())) {
- entry_batch->callback().Run(Status::OK());
+ if (PREDICT_TRUE(entry_batch->callback())) {
+ entry_batch->callback()(Status::OK());
}
// It's important to delete each batch as we see it, because
// deleting it may free up memory from memory trackers, and the
diff --git a/src/kudu/consensus/log_cache-test.cc b/src/kudu/consensus/log_cache-test.cc
index 3972696..423f02a 100644
--- a/src/kudu/consensus/log_cache-test.cc
+++ b/src/kudu/consensus/log_cache-test.cc
@@ -20,6 +20,7 @@
#include <atomic>
#include <cstddef>
#include <cstdint>
+#include <functional>
#include <initializer_list>
#include <memory>
#include <ostream>
@@ -43,7 +44,6 @@
#include "kudu/consensus/opid_util.h"
#include "kudu/consensus/ref_counted_replicate.h"
#include "kudu/fs/fs_manager.h"
-#include "kudu/gutil/bind.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
@@ -129,7 +129,7 @@ class LogCacheTest : public KuduTest {
vector<ReplicateRefPtr> msgs;
msgs.push_back(make_scoped_refptr_replicate(
CreateDummyReplicate(term, index, clock_->Now(), payload_size).release()));
- RETURN_NOT_OK(cache_->AppendOperations(msgs, Bind(&FatalOnError)));
+ RETURN_NOT_OK(cache_->AppendOperations(msgs, [](const Status& s) { FatalOnError(s); }));
}
return Status::OK();
}
diff --git a/src/kudu/consensus/log_cache.cc b/src/kudu/consensus/log_cache.cc
index a0dbb95..26e8a1c 100644
--- a/src/kudu/consensus/log_cache.cc
+++ b/src/kudu/consensus/log_cache.cc
@@ -17,6 +17,7 @@
#include "kudu/consensus/log_cache.h"
+#include <functional>
#include <map>
#include <mutex>
#include <ostream>
@@ -35,8 +36,6 @@
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/consensus/ref_counted_replicate.h"
-#include "kudu/gutil/bind.h"
-#include "kudu/gutil/bind_helpers.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/mathlimits.h"
#include "kudu/gutil/strings/human_readable.h"
@@ -213,11 +212,9 @@ Status LogCache::AppendOperations(const vector<ReplicateRefPtr>& msgs,
metrics_.log_cache_num_ops->IncrementBy(msgs.size());
Status log_status = log_->AsyncAppendReplicates(
- msgs, Bind(&LogCache::LogCallback,
- Unretained(this),
- last_idx_in_batch,
- borrowed_memory,
- callback));
+ msgs, [this, last_idx_in_batch, borrowed_memory, callback](const Status& s) {
+ this->LogCallback(last_idx_in_batch, borrowed_memory, callback, s);
+ });
if (!log_status.ok()) {
LOG_WITH_PREFIX_UNLOCKED(ERROR) << "Couldn't append to log: " << log_status.ToString();
@@ -249,7 +246,7 @@ void LogCache::LogCallback(int64_t last_idx_in_batch,
}
}
}
- user_callback.Run(log_status);
+ user_callback(log_status);
}
bool LogCache::HasOpBeenWritten(int64_t index) const {
diff --git a/src/kudu/consensus/mt-log-test.cc b/src/kudu/consensus/mt-log-test.cc
index 46bf9ab..2c74002 100644
--- a/src/kudu/consensus/mt-log-test.cc
+++ b/src/kudu/consensus/mt-log-test.cc
@@ -18,6 +18,7 @@
#include <algorithm>
#include <atomic>
#include <cstdint>
+#include <functional>
#include <map>
#include <memory>
#include <mutex>
@@ -41,7 +42,6 @@
#include "kudu/consensus/log_util.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/ref_counted_replicate.h"
-#include "kudu/gutil/bind.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
@@ -95,7 +95,8 @@ class CustomLatchCallback : public RefCountedThreadSafe<CustomLatchCallback> {
}
StatusCallback AsStatusCallback() {
- return Bind(&CustomLatchCallback::StatusCB, this);
+ scoped_refptr<CustomLatchCallback> self(this);
+ return [self](const Status& s) { self->StatusCB(s); };
}
private:
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index b7174f3..0c2e473 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -46,7 +46,6 @@
#include "kudu/consensus/pending_rounds.h"
#include "kudu/consensus/quorum_util.h"
#include "kudu/consensus/time_manager.h"
-#include "kudu/gutil/bind.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
@@ -1817,7 +1816,7 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request,
}
Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req,
- StdStatusCallback client_cb,
+ StatusCallback client_cb,
boost::optional<TabletServerErrorPB::Code>* error_code) {
TRACE_EVENT2("consensus", "RaftConsensus::ChangeConfig",
"peer", peer_uuid(),
@@ -1844,7 +1843,7 @@ Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req,
}
Status RaftConsensus::BulkChangeConfig(const BulkChangeConfigRequestPB& req,
- StdStatusCallback client_cb,
+ StatusCallback client_cb,
boost::optional<TabletServerErrorPB::Code>* error_code) {
TRACE_EVENT2("consensus", "RaftConsensus::BulkChangeConfig",
"peer", peer_uuid(),
@@ -2249,7 +2248,7 @@ Status RaftConsensus::StartConsensusOnlyRoundUnlocked(const ReplicateRefPtr& msg
VLOG_WITH_PREFIX_UNLOCKED(1) << "Starting consensus round: "
<< SecureShortDebugString(msg->get()->id());
scoped_refptr<ConsensusRound> round(new ConsensusRound(this, msg));
- StdStatusCallback client_cb = std::bind(&RaftConsensus::MarkDirtyOnSuccess,
+ StatusCallback client_cb = std::bind(&RaftConsensus::MarkDirtyOnSuccess,
this,
string("Replicated consensus-only round"),
&DoNothingStatusCB,
@@ -2487,7 +2486,7 @@ void RaftConsensus::SetLeaderUuidUnlocked(const string& uuid) {
Status RaftConsensus::ReplicateConfigChangeUnlocked(
RaftConfigPB old_config,
RaftConfigPB new_config,
- StdStatusCallback client_cb) {
+ StatusCallback client_cb) {
DCHECK(lock_.is_locked());
auto cc_replicate = new ReplicateMsg();
cc_replicate->set_op_type(CHANGE_CONFIG_OP);
@@ -2780,7 +2779,7 @@ void RaftConsensus::MarkDirty(const string& reason) {
}
void RaftConsensus::MarkDirtyOnSuccess(const string& reason,
- const StdStatusCallback& client_cb,
+ const StatusCallback& client_cb,
const Status& status) {
if (PREDICT_TRUE(status.ok())) {
MarkDirty(reason);
@@ -2789,7 +2788,7 @@ void RaftConsensus::MarkDirtyOnSuccess(const string& reason,
}
void RaftConsensus::NonTxRoundReplicationFinished(ConsensusRound* round,
- const StdStatusCallback& client_cb,
+ const StatusCallback& client_cb,
const Status& status) {
// NOTE: lock_ is held here because this is triggered by
// PendingRounds::AbortOpsAfter() and AdvanceCommittedIndex().
@@ -2819,9 +2818,10 @@ void RaftConsensus::NonTxRoundReplicationFinished(ConsensusRound* round,
commit_msg->set_op_type(round->replicate_msg()->op_type());
*commit_msg->mutable_commited_op_id() = round->id();
- CHECK_OK(log_->AsyncAppendCommit(std::move(commit_msg),
- Bind(CrashIfNotOkStatusCB,
- "Enqueued commit operation failed to write to WAL")));
+ CHECK_OK(log_->AsyncAppendCommit(
+ std::move(commit_msg), [](const Status& s) {
+ CrashIfNotOkStatusCB("Enqueued commit operation failed to write to WAL", s);
+ }));
client_cb(status);
}
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index 1a1cef3..912da5c 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -105,7 +105,7 @@ struct TabletVotingState {
};
typedef int64_t ConsensusTerm;
-typedef StdStatusCallback ConsensusReplicatedCallback;
+typedef StatusCallback ConsensusReplicatedCallback;
class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
public enable_make_shared<RaftConsensus>,
@@ -299,12 +299,12 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
// Implement a ChangeConfig() request.
Status ChangeConfig(const ChangeConfigRequestPB& req,
- StdStatusCallback client_cb,
+ StatusCallback client_cb,
boost::optional<tserver::TabletServerErrorPB::Code>* error_code);
// Implement a BulkChangeConfig() request.
Status BulkChangeConfig(const BulkChangeConfigRequestPB& req,
- StdStatusCallback client_cb,
+ StatusCallback client_cb,
boost::optional<tserver::TabletServerErrorPB::Code>* error_code);
// Implement an UnsafeChangeConfig() request.
@@ -494,7 +494,7 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
Status ReplicateConfigChangeUnlocked(
RaftConfigPB old_config,
RaftConfigPB new_config,
- StdStatusCallback client_cb);
+ StatusCallback client_cb);
// Update the peers and queue to be consistent with a new active configuration.
// Should only be called by the leader.
@@ -707,7 +707,7 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
// Calls MarkDirty() if 'status' == OK. Then, always calls 'client_cb' with
// 'status' as its argument.
void MarkDirtyOnSuccess(const std::string& reason,
- const StdStatusCallback& client_cb,
+ const StatusCallback& client_cb,
const Status& status);
// Attempt to remove the follower with the specified 'uuid' from the config,
@@ -752,7 +752,7 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
//
// NOTE: Must be called while holding 'lock_'.
void NonTxRoundReplicationFinished(ConsensusRound* round,
- const StdStatusCallback& client_cb,
+ const StatusCallback& client_cb,
const Status& status);
// As a leader, append a new ConsensusRound to the queue.
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index d00e226..ebdb8cc 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -17,6 +17,7 @@
#include <algorithm>
#include <cstdint>
+#include <functional>
#include <memory>
#include <ostream>
#include <string>
@@ -277,7 +278,7 @@ class RaftConsensusQuorumTest : public KuduTest {
// Use a latch in place of a Transaction callback.
unique_ptr<Synchronizer> sync(new Synchronizer());
- *round = peer->NewRound(std::move(msg), sync->AsStdStatusCallback());
+ *round = peer->NewRound(std::move(msg), sync->AsStatusCallback());
EmplaceOrDie(&syncs_, round->get(), std::move(sync));
RETURN_NOT_OK_PREPEND(peer->Replicate(round->get()),
Substitute("Unable to replicate to peer $0", peer_idx));
@@ -293,10 +294,11 @@ class RaftConsensusQuorumTest : public KuduTest {
shared_ptr<Synchronizer>* commit_sync = nullptr) {
StatusCallback commit_callback;
if (commit_sync != nullptr) {
- commit_sync->reset(new Synchronizer());
- commit_callback = Bind(&FireSharedSynchronizer, *commit_sync);
+ shared_ptr<Synchronizer> sync(std::make_shared<Synchronizer>());
+ commit_callback = [sync](const Status& s) { FireSharedSynchronizer(sync, s); };
+ *commit_sync = std::move(sync);
} else {
- commit_callback = Bind(&DoNothingStatusCB);
+ commit_callback = &DoNothingStatusCB;
}
unique_ptr<CommitMsg> msg(new CommitMsg());
diff --git a/src/kudu/master/hms_notification_log_listener.cc b/src/kudu/master/hms_notification_log_listener.cc
index c49b484..8acaade 100644
--- a/src/kudu/master/hms_notification_log_listener.cc
+++ b/src/kudu/master/hms_notification_log_listener.cc
@@ -31,7 +31,6 @@
#include <rapidjson/document.h>
#include <rapidjson/error/en.h>
-#include "kudu/gutil/callback.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/substitute.h"
@@ -132,7 +131,7 @@ void HmsNotificationLogListenerTask::RunLoop() {
// Wakeup all threads which enqueued before beginning the poll.
for (auto& cb : callback_batch) {
- cb.Run(s);
+ cb(s);
}
callback_batch.clear();
@@ -164,7 +163,7 @@ void HmsNotificationLogListenerTask::RunLoop() {
}
for (auto& cb : callback_batch) {
- cb.Run(Status::ServiceUnavailable(kShutdownMessage));
+ cb(Status::ServiceUnavailable(kShutdownMessage));
}
}
diff --git a/src/kudu/master/sentry_privileges_fetcher.cc b/src/kudu/master/sentry_privileges_fetcher.cc
index 74364db..b9d35b7 100644
--- a/src/kudu/master/sentry_privileges_fetcher.cc
+++ b/src/kudu/master/sentry_privileges_fetcher.cc
@@ -33,7 +33,6 @@
#include <glog/logging.h>
#include "kudu/common/table_util.h"
-#include "kudu/gutil/callback.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
@@ -757,7 +756,7 @@ Status SentryPrivilegesFetcher::GetSentryPrivileges(
}
CHECK_LE(1, info.callbacks.size());
for (auto& cb : info.callbacks) {
- cb.Run(s);
+ cb(s);
}
RETURN_NOT_OK(s);
*privileges = *fetched_privileges;
diff --git a/src/kudu/subprocess/server.cc b/src/kudu/subprocess/server.cc
index 1b76610..4f7b8ae 100644
--- a/src/kudu/subprocess/server.cc
+++ b/src/kudu/subprocess/server.cc
@@ -99,7 +99,7 @@ SubprocessServer::~SubprocessServer() {
Shutdown();
}
-void SubprocessServer::StartSubprocessThread(const StdStatusCallback& cb) {
+void SubprocessServer::StartSubprocessThread(const StatusCallback& cb) {
Status s = process_->Start();
cb(s);
if (PREDICT_TRUE(s.ok())) {
@@ -112,7 +112,7 @@ void SubprocessServer::StartSubprocessThread(const StdStatusCallback& cb) {
Status SubprocessServer::Init() {
VLOG(2) << "Starting the subprocess";
Synchronizer sync;
- auto cb = sync.AsStdStatusCallback();
+ auto cb = sync.AsStatusCallback();
RETURN_NOT_OK(Thread::Create("subprocess", "start",
[this, &cb]() { this->StartSubprocessThread(cb); },
&read_thread_));
@@ -147,7 +147,7 @@ Status SubprocessServer::Execute(SubprocessRequestPB* req,
DCHECK(!req->has_id());
req->set_id(next_id_++);
Synchronizer sync;
- auto cb = sync.AsStdStatusCallback();
+ auto cb = sync.AsStatusCallback();
// Before adding to the queue, record the size of the call queue.
metrics_.server_outbound_queue_size_bytes->Increment(outbound_call_queue_.size());
CallAndTimer call_and_timer = {
diff --git a/src/kudu/subprocess/server.h b/src/kudu/subprocess/server.h
index 3c33e5b..ce0a2df 100644
--- a/src/kudu/subprocess/server.h
+++ b/src/kudu/subprocess/server.h
@@ -87,7 +87,7 @@ class SubprocessCall {
public:
SubprocessCall(const SubprocessRequestPB* req,
SubprocessResponsePB* resp,
- StdStatusCallback* cb,
+ StatusCallback* cb,
MonoTime deadline)
: id_(req->id()),
deadline_(deadline),
@@ -168,7 +168,7 @@ class SubprocessCall {
// Callback to wake up the caller that enqueued this call. This is called
// exactly once per SubprocessCall.
- StdStatusCallback* cb_;
+ StatusCallback* cb_;
};
// Used by BlockingQueue to determine the size of messages.
@@ -230,7 +230,7 @@ class SubprocessServer {
private:
FRIEND_TEST(SubprocessServerTest, TestCallsReturnWhenShuttingDown);
- void StartSubprocessThread(const StdStatusCallback& cb);
+ void StartSubprocessThread(const StatusCallback& cb);
// Stop the subprocess and stop processing messages.
void Shutdown();
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index a05f1eb..7e1abf1 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -51,7 +51,6 @@
#include "kudu/consensus/opid.pb.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/fs/io_context.h"
-#include "kudu/gutil/bind.h"
#include "kudu/gutil/casts.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/human_readable.h"
@@ -379,7 +378,7 @@ void Tablet::Shutdown() {
// ShutDown(), and need to flush the metadata to indicate that the tablet is deleted.
// During that flush, we don't want metadata to call back into the Tablet, so we
// have to unregister the pre-flush callback.
- metadata_->SetPreFlushCallback(Bind(DoNothingStatusClosure));
+ metadata_->SetPreFlushCallback(&DoNothingStatusClosure);
}
Status Tablet::GetMappedReadProjection(const Schema& projection,
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc
index 32dedbe..7521d85 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -18,6 +18,7 @@
#include "kudu/tablet/tablet_bootstrap.h"
#include <cstdint>
+#include <functional>
#include <iterator>
#include <map>
#include <memory>
@@ -54,7 +55,6 @@
#include "kudu/fs/fs.pb.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/fs/io_context.h"
-#include "kudu/gutil/bind.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
@@ -529,10 +529,10 @@ Status TabletBootstrap::Bootstrap(shared_ptr<Tablet>* rebuilt_tablet,
CHECK((*rebuilt_tablet && *rebuilt_log) || !bootstrap_status.ok())
<< "Tablet and Log not initialized";
if (bootstrap_status.ok()) {
+ auto cb = make_scoped_refptr(new FlushInflightsToLogCallback(
+ rebuilt_tablet->get(), *rebuilt_log));
tablet_meta_->SetPreFlushCallback(
- Bind(&FlushInflightsToLogCallback::WaitForInflightsAndFlushLog,
- make_scoped_refptr(new FlushInflightsToLogCallback(
- rebuilt_tablet->get(), *rebuilt_log))));
+ [cb]() -> Status { return cb->WaitForInflightsAndFlushLog(); });
}
// This will cause any pending TabletMetadata flush to be executed.
diff --git a/src/kudu/tablet/tablet_metadata.cc b/src/kudu/tablet/tablet_metadata.cc
index 754f53c..297ff89 100644
--- a/src/kudu/tablet/tablet_metadata.cc
+++ b/src/kudu/tablet/tablet_metadata.cc
@@ -18,6 +18,7 @@
#include "kudu/tablet/tablet_metadata.h"
#include <algorithm>
+#include <functional>
#include <mutex>
#include <ostream>
#include <string>
@@ -36,7 +37,6 @@
#include "kudu/fs/fs.pb.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/atomicops.h"
-#include "kudu/gutil/bind.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/stl_util.h"
@@ -300,7 +300,7 @@ TabletMetadata::TabletMetadata(FsManager* fs_manager, string tablet_id,
num_flush_pins_(0),
needs_flush_(false),
flush_count_for_tests_(0),
- pre_flush_callback_(Bind(DoNothingStatusClosure)),
+ pre_flush_callback_(&DoNothingStatusClosure),
supports_live_row_count_(supports_live_row_count) {
CHECK(schema_->has_column_ids());
CHECK_GT(schema_->num_key_columns(), 0);
@@ -320,7 +320,7 @@ TabletMetadata::TabletMetadata(FsManager* fs_manager, string tablet_id)
num_flush_pins_(0),
needs_flush_(false),
flush_count_for_tests_(0),
- pre_flush_callback_(Bind(DoNothingStatusClosure)),
+ pre_flush_callback_(&DoNothingStatusClosure),
supports_live_row_count_(false) {}
Status TabletMetadata::LoadFromDisk() {
@@ -580,7 +580,7 @@ Status TabletMetadata::Flush() {
// is persisted. See KUDU-701 for details.
orphaned.assign(orphaned_blocks_.begin(), orphaned_blocks_.end());
}
- pre_flush_callback_.Run();
+ pre_flush_callback_();
RETURN_NOT_OK(ReplaceSuperBlockUnlocked(pb));
TRACE("Metadata flushed");
l_flush.Unlock();
diff --git a/src/kudu/tablet/transactions/transaction_driver.cc b/src/kudu/tablet/transactions/transaction_driver.cc
index 2212356..0391936 100644
--- a/src/kudu/tablet/transactions/transaction_driver.cc
+++ b/src/kudu/tablet/transactions/transaction_driver.cc
@@ -33,7 +33,6 @@
#include "kudu/consensus/log.h"
#include "kudu/consensus/raft_consensus.h"
#include "kudu/consensus/time_manager.h"
-#include "kudu/gutil/bind.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/strcat.h"
#include "kudu/gutil/strings/substitute.h"
@@ -527,9 +526,10 @@ void TransactionDriver::ApplyTask() {
{
TRACE_EVENT1("txn", "AsyncAppendCommit", "txn", this);
- CHECK_OK(log_->AsyncAppendCommit(std::move(commit_msg),
- Bind(CrashIfNotOkStatusCB,
- "Enqueued commit operation failed to write to WAL")));
+ CHECK_OK(log_->AsyncAppendCommit(
+ std::move(commit_msg), [](const Status& s) {
+ CrashIfNotOkStatusCB("Enqueued commit operation failed to write to WAL", s);
+ }));
}
// If the client requested COMMIT_WAIT as the external consistency mode
diff --git a/src/kudu/thrift/client.h b/src/kudu/thrift/client.h
index ed5f2ee..6dad0aa 100644
--- a/src/kudu/thrift/client.h
+++ b/src/kudu/thrift/client.h
@@ -196,7 +196,7 @@ template<typename Service>
Status HaClient<Service>::Execute(std::function<Status(Service*)> task) {
const MonoTime start_time(MonoTime::Now());
Synchronizer synchronizer;
- auto callback = synchronizer.AsStdStatusCallback();
+ auto callback = synchronizer.AsStatusCallback();
// TODO(todd): wrapping this in a TRACE_EVENT scope and a LOG_IF_SLOW and such
// would be helpful. Perhaps a TRACE message and/or a TRACE_COUNTER_INCREMENT
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 3015734..a51450f 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -399,7 +399,7 @@ void HandleResponse(const ReqType* req, RespType* resp,
}
template <class ReqType, class RespType>
-static StdStatusCallback BindHandleResponse(
+static StatusCallback BindHandleResponse(
const ReqType* req,
RespType* resp,
RpcContext* context) {
diff --git a/src/kudu/util/async_util-test.cc b/src/kudu/util/async_util-test.cc
index 91f2baa..3fcad25 100644
--- a/src/kudu/util/async_util-test.cc
+++ b/src/kudu/util/async_util-test.cc
@@ -26,7 +26,6 @@
#include <gtest/gtest.h>
#include "kudu/gutil/basictypes.h"
-#include "kudu/gutil/callback.h"
#include "kudu/util/monotime.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/status.h"
@@ -69,12 +68,12 @@ TEST_F(AsyncUtilTest, TestSynchronizerCompletion) {
ignore_result(sync.Wait());
});
SleepFor(MonoDelta::FromMilliseconds(5));
- cb.Run(Status::OK());
+ cb(Status::OK());
waiter.join();
}
sync.Reset();
{
- auto cb = sync.AsStdStatusCallback();
+ auto cb = sync.AsStatusCallback();
auto waiter = thread([sync] {
ignore_result(sync.Wait());
});
@@ -118,7 +117,7 @@ TEST_P(AsyncUtilTimedWaitTest, SynchronizerTimedWaitSuccess) {
auto cb = sync.AsStatusCallback();
auto waiter = thread([cb] {
SleepFor(MonoDelta::FromMilliseconds(5));
- cb.Run(Status::OK());
+ cb(Status::OK());
});
SCOPED_CLEANUP({
waiter.join();
@@ -144,7 +143,7 @@ TEST_P(AsyncUtilTimedWaitTest, SynchronizerTimedWaitTimeout) {
auto cb = sync.AsStatusCallback();
auto waiter = thread([cb] {
SleepFor(MonoDelta::FromMilliseconds(1000));
- cb.Run(Status::OK());
+ cb(Status::OK());
});
SCOPED_CLEANUP({
waiter.join();
diff --git a/src/kudu/util/async_util.h b/src/kudu/util/async_util.h
index 61621d6..56b5a04 100644
--- a/src/kudu/util/async_util.h
+++ b/src/kudu/util/async_util.h
@@ -16,13 +16,11 @@
// under the License.
//
// Utility functions which are handy when doing async/callback-based programming.
-
#pragma once
#include <functional>
#include <memory>
-#include "kudu/gutil/bind.h"
#include "kudu/gutil/macros.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/status.h"
@@ -52,11 +50,8 @@ class Synchronizer {
}
StatusCallback AsStatusCallback() {
- return Bind(Data::Callback, std::weak_ptr<Data>(data_));
- }
-
- StdStatusCallback AsStdStatusCallback() {
- return std::bind(Data::Callback, std::weak_ptr<Data>(data_), std::placeholders::_1);
+ std::weak_ptr<Data> w_data(data_);
+ return [w_data](const Status& s) { Data::Callback(w_data, s); };
}
Status Wait() const {
diff --git a/src/kudu/util/net/dns_resolver.cc b/src/kudu/util/net/dns_resolver.cc
index b3ba1ed..75a12b7 100644
--- a/src/kudu/util/net/dns_resolver.cc
+++ b/src/kudu/util/net/dns_resolver.cc
@@ -83,13 +83,13 @@ void DnsResolver::ResolveAddressesAsync(const HostPort& hostport,
vector<Sockaddr>* addresses,
const StatusCallback& cb) {
if (GetCachedAddresses(hostport, addresses)) {
- return cb.Run(Status::OK());
+ return cb(Status::OK());
}
const auto s = pool_->Submit([=]() {
this->DoResolutionCb(hostport, addresses, cb);
});
if (!s.ok()) {
- cb.Run(s);
+ cb(s);
}
}
@@ -123,7 +123,7 @@ Status DnsResolver::DoResolution(const HostPort& hostport,
void DnsResolver::DoResolutionCb(const HostPort& hostport,
vector<Sockaddr>* addresses,
const StatusCallback& cb) {
- cb.Run(DoResolution(hostport, addresses));
+ cb(DoResolution(hostport, addresses));
}
bool DnsResolver::GetCachedAddresses(const HostPort& hostport,
diff --git a/src/kudu/util/status_callback.h b/src/kudu/util/status_callback.h
index 70bbb97..c7fa141 100644
--- a/src/kudu/util/status_callback.h
+++ b/src/kudu/util/status_callback.h
@@ -14,26 +14,18 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-#ifndef KUDU_UTIL_STATUS_CALLBACK_H
-#define KUDU_UTIL_STATUS_CALLBACK_H
+#pragma once
#include <functional>
#include <string>
-#include "kudu/gutil/callback_forward.h"
-
namespace kudu {
class Status;
// A callback which takes a Status. This is typically used for functions which
// produce asynchronous results and may fail.
-typedef Callback<void(const Status& status)> StatusCallback;
-
-// Like StatusCallback but uses the STL function objects.
-//
-// TODO(adar): should eventually replace all StatusCallback usage with this.
-typedef std::function<void(const Status& status)> StdStatusCallback;
+typedef std::function<void(const Status& status)> StatusCallback;
// To be used when a function signature requires a StatusCallback but none
// is needed.
@@ -44,11 +36,9 @@ extern void CrashIfNotOkStatusCB(const std::string& message, const Status& statu
// A closure (callback without arguments) that returns a Status indicating
// whether it was successful or not.
-typedef Callback<Status(void)> StatusClosure;
+typedef std::function<Status(void)> StatusClosure;
// To be used when setting a StatusClosure is optional.
extern Status DoNothingStatusClosure();
} // namespace kudu
-
-#endif