You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2020/12/15 02:30:40 UTC
[kudu] branch master updated: [client] small cleanup on AsyncRandomTxnManagerRpc
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 49c922b [client] small cleanup on AsyncRandomTxnManagerRpc
49c922b is described below
commit 49c922b5cc3e5d7ee885c6f355eec973a5334441
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Fri Dec 11 20:47:53 2020 -0800
[client] small cleanup on AsyncRandomTxnManagerRpc
This patch changes AsyncRandomTxnManagerRpc so it owns the request
protobuf message. The rationale: there is no need to keep a pointer
because the request message is no longer needed at the call sites
once it's populated and passed to the AsyncRandomTxnManagerRpc's
constructor. I also modified the call sites accordingly to use
move semantics for the corresponding argument while calling the
constructor.
This patch does not contain any functional changes.
Change-Id: Ide5b96644832bbcd2c3b08e72aee49f993ddb95f
Reviewed-on: http://gerrit.cloudera.org:8080/16867
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
src/kudu/client/transaction-internal.cc | 122 ++++++++++++++++---------------
src/kudu/client/txn_manager_proxy_rpc.cc | 6 +-
src/kudu/client/txn_manager_proxy_rpc.h | 4 +-
3 files changed, 70 insertions(+), 62 deletions(-)
diff --git a/src/kudu/client/transaction-internal.cc b/src/kudu/client/transaction-internal.cc
index 31e09ce..85c732b 100644
--- a/src/kudu/client/transaction-internal.cc
+++ b/src/kudu/client/transaction-internal.cc
@@ -102,18 +102,19 @@ Status KuduTransaction::Data::Begin(const sp::shared_ptr<KuduTransaction>& txn)
if (!c) {
return Status::IllegalState("associated KuduClient is gone");
}
- const auto deadline = GetRpcDeadline(c.get());
- Synchronizer sync;
- BeginTransactionRequestPB req;
+
BeginTransactionResponsePB resp;
- // TODO(aserbin): should there be no retries for sending keepalive heartbeats?
- AsyncRandomTxnManagerRpc<
- BeginTransactionRequestPB, BeginTransactionResponsePB> rpc(
- deadline, c.get(), BackoffType::EXPONENTIAL, req, &resp,
- &TxnManagerServiceProxy::BeginTransactionAsync, "BeginTransaction",
- sync.AsStatusCallback());
- rpc.SendRpc();
- RETURN_NOT_OK(sync.Wait());
+ {
+ Synchronizer sync;
+ BeginTransactionRequestPB req;
+ AsyncRandomTxnManagerRpc<BeginTransactionRequestPB,
+ BeginTransactionResponsePB> rpc(
+ GetRpcDeadline(c.get()), c.get(), BackoffType::EXPONENTIAL,
+ std::move(req), &resp, &TxnManagerServiceProxy::BeginTransactionAsync,
+ "BeginTransaction", sync.AsStatusCallback());
+ rpc.SendRpc();
+ RETURN_NOT_OK(sync.Wait());
+ }
if (resp.has_error()) {
return StatusFromPB(resp.error().status());
}
@@ -149,18 +150,21 @@ Status KuduTransaction::Data::Commit(bool wait) {
if (!c) {
return Status::IllegalState("associated KuduClient is gone");
}
+
const auto deadline = GetRpcDeadline(c.get());
- Synchronizer sync;
- CommitTransactionRequestPB req;
- req.set_txn_id(txn_id_);
CommitTransactionResponsePB resp;
- AsyncRandomTxnManagerRpc<CommitTransactionRequestPB,
- CommitTransactionResponsePB> rpc(
- deadline, c.get(), BackoffType::EXPONENTIAL, req, &resp,
- &TxnManagerServiceProxy::CommitTransactionAsync, "CommitTransaction",
- sync.AsStatusCallback());
- rpc.SendRpc();
- RETURN_NOT_OK(sync.Wait());
+ {
+ Synchronizer sync;
+ CommitTransactionRequestPB req;
+ req.set_txn_id(txn_id_);
+ AsyncRandomTxnManagerRpc<CommitTransactionRequestPB,
+ CommitTransactionResponsePB> rpc(
+ deadline, c.get(), BackoffType::EXPONENTIAL,
+ std::move(req), &resp, &TxnManagerServiceProxy::CommitTransactionAsync,
+ "CommitTransaction", sync.AsStatusCallback());
+ rpc.SendRpc();
+ RETURN_NOT_OK(sync.Wait());
+ }
if (resp.has_error()) {
return StatusFromPB(resp.error().status());
}
@@ -190,18 +194,20 @@ Status KuduTransaction::Data::Rollback() {
if (!c) {
return Status::IllegalState("associated KuduClient is gone");
}
- const auto deadline = GetRpcDeadline(c.get());
- Synchronizer sync;
- AbortTransactionRequestPB req;
- req.set_txn_id(txn_id_);
+
AbortTransactionResponsePB resp;
- AsyncRandomTxnManagerRpc<AbortTransactionRequestPB,
- AbortTransactionResponsePB> rpc(
- deadline, c.get(), BackoffType::EXPONENTIAL, req, &resp,
- &TxnManagerServiceProxy::AbortTransactionAsync, "AbortTransaction",
- sync.AsStatusCallback());
- rpc.SendRpc();
- RETURN_NOT_OK(sync.Wait());
+ {
+ Synchronizer sync;
+ AbortTransactionRequestPB req;
+ req.set_txn_id(txn_id_);
+ AsyncRandomTxnManagerRpc<AbortTransactionRequestPB,
+ AbortTransactionResponsePB> rpc(
+ GetRpcDeadline(c.get()), c.get(), BackoffType::EXPONENTIAL,
+ std::move(req), &resp, &TxnManagerServiceProxy::AbortTransactionAsync,
+ "AbortTransaction", sync.AsStatusCallback());
+ rpc.SendRpc();
+ RETURN_NOT_OK(sync.Wait());
+ }
if (resp.has_error()) {
return StatusFromPB(resp.error().status());
}
@@ -275,17 +281,19 @@ Status KuduTransaction::Data::IsCommitCompleteImpl(
bool* is_complete,
Status* completion_status) {
DCHECK(client);
- Synchronizer sync;
- GetTransactionStateRequestPB req;
- req.set_txn_id(txn_id);
GetTransactionStateResponsePB resp;
- AsyncRandomTxnManagerRpc<GetTransactionStateRequestPB,
- GetTransactionStateResponsePB> rpc(
- deadline, client, BackoffType::EXPONENTIAL, req, &resp,
- &TxnManagerServiceProxy::GetTransactionStateAsync, "GetTransactionState",
- sync.AsStatusCallback());
- rpc.SendRpc();
- RETURN_NOT_OK(sync.Wait());
+ {
+ Synchronizer sync;
+ GetTransactionStateRequestPB req;
+ req.set_txn_id(txn_id);
+ AsyncRandomTxnManagerRpc<GetTransactionStateRequestPB,
+ GetTransactionStateResponsePB> rpc(
+ deadline, client, BackoffType::EXPONENTIAL, std::move(req), &resp,
+ &TxnManagerServiceProxy::GetTransactionStateAsync, "GetTransactionState",
+ sync.AsStatusCallback());
+ rpc.SendRpc();
+ RETURN_NOT_OK(sync.Wait());
+ }
if (resp.has_error()) {
return StatusFromPB(resp.error().status());
}
@@ -377,25 +385,25 @@ void KuduTransaction::Data::SendTxnKeepAliveTask(
const auto next_run_after = MonoDelta::FromMilliseconds(
std::max<uint32_t>(1, txn->data_->txn_keep_alive_ms_ / 2));
auto deadline = MonoTime::Now() + next_run_after;
- KeepTransactionAliveRequestPB req;
- req.set_txn_id(txn_id);
sp::shared_ptr<KeepaliveRpcCtx> ctx(new KeepaliveRpcCtx);
ctx->weak_txn = weak_txn;
- unique_ptr<AsyncRandomTxnManagerRpc<KeepTransactionAliveRequestPB,
- KeepTransactionAliveResponsePB>> rpc(
- new AsyncRandomTxnManagerRpc<KeepTransactionAliveRequestPB,
- KeepTransactionAliveResponsePB>(
- deadline, c.get(), BackoffType::LINEAR,
- req, &ctx->resp,
- &TxnManagerServiceProxy::KeepTransactionAliveAsync,
- "KeepTransactionAlive",
- [ctx](const Status& s) {
- TxnKeepAliveCb(s, std::move(ctx));
- }
- ));
- ctx->rpc = std::move(rpc);
+ {
+ KeepTransactionAliveRequestPB req;
+ req.set_txn_id(txn_id);
+ unique_ptr<AsyncRandomTxnManagerRpc<KeepTransactionAliveRequestPB,
+ KeepTransactionAliveResponsePB>> rpc(
+ new AsyncRandomTxnManagerRpc<KeepTransactionAliveRequestPB,
+ KeepTransactionAliveResponsePB>(
+ deadline, c.get(), BackoffType::LINEAR, std::move(req), &ctx->resp,
+ &TxnManagerServiceProxy::KeepTransactionAliveAsync,
+ "KeepTransactionAlive",
+ [ctx](const Status& s) {
+ TxnKeepAliveCb(s, std::move(ctx));
+ }));
+ ctx->rpc = std::move(rpc);
+ }
// Send the RPC and handle the response asynchronously.
ctx->rpc->SendRpc();
diff --git a/src/kudu/client/txn_manager_proxy_rpc.cc b/src/kudu/client/txn_manager_proxy_rpc.cc
index 12bf81c..c58d2df 100644
--- a/src/kudu/client/txn_manager_proxy_rpc.cc
+++ b/src/kudu/client/txn_manager_proxy_rpc.cc
@@ -69,7 +69,7 @@ AsyncRandomTxnManagerRpc<ReqClass, RespClass>::AsyncRandomTxnManagerRpc(
const MonoTime& deadline,
KuduClient* client,
BackoffType backoff,
- const ReqClass& req,
+ ReqClass req,
RespClass* resp,
const std::function<void(TxnManagerServiceProxy*,
const ReqClass&, RespClass*,
@@ -79,7 +79,7 @@ AsyncRandomTxnManagerRpc<ReqClass, RespClass>::AsyncRandomTxnManagerRpc(
StatusCallback user_cb)
: Rpc(deadline, client->data_->messenger_, backoff),
client_(client),
- req_(&req),
+ req_(std::move(req)),
resp_(resp),
func_(func),
rpc_name_(std::move(rpc_name)),
@@ -103,7 +103,7 @@ void AsyncRandomTxnManagerRpc<ReqClass, RespClass>::SendRpc() {
RpcController* controller = mutable_retrier()->mutable_controller();
controller->Reset();
controller->set_deadline(deadline);
- func_(client_->data_->txn_manager_proxy().get(), *req_, resp_, controller,
+ func_(client_->data_->txn_manager_proxy().get(), req_, resp_, controller,
[this]() { this->SendRpcCb(Status::OK()); });
}
diff --git a/src/kudu/client/txn_manager_proxy_rpc.h b/src/kudu/client/txn_manager_proxy_rpc.h
index 501c9e7..00b0bc0 100644
--- a/src/kudu/client/txn_manager_proxy_rpc.h
+++ b/src/kudu/client/txn_manager_proxy_rpc.h
@@ -67,7 +67,7 @@ class AsyncRandomTxnManagerRpc : public rpc::Rpc {
const MonoTime& deadline,
KuduClient* client,
rpc::BackoffType backoff,
- const ReqClass& req,
+ ReqClass req,
RespClass* resp,
const std::function<void(transactions::TxnManagerServiceProxy*,
const ReqClass&, RespClass*,
@@ -108,7 +108,7 @@ class AsyncRandomTxnManagerRpc : public rpc::Rpc {
bool RetryIfNecessary(Status* status);
KuduClient* client_;
- const ReqClass* req_;
+ const ReqClass req_;
RespClass* resp_;
// Asynchronous function that sends an RPC to current TxnManager.