You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2020/12/15 02:30:40 UTC

[kudu] branch master updated: [client] small cleanup on AsyncRandomTxnManagerRpc

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 49c922b  [client] small cleanup on AsyncRandomTxnManagerRpc
49c922b is described below

commit 49c922b5cc3e5d7ee885c6f355eec973a5334441
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Fri Dec 11 20:47:53 2020 -0800

    [client] small cleanup on AsyncRandomTxnManagerRpc
    
    This patch changes AsyncRandomTxnManagerRpc so it owns the request
    protobuf message.  The rationale: there is no need to keep a pointer
    because the request message is no longer needed at the call sites
    once it's populated and passed to the AsyncRandomTxnManagerRpc's
    constructor.  I also modified the call sites accordingly to use
    move semantics for the corresponding argument while calling the
    constructor.
    
    This patch does not contain any functional changes.
    
    Change-Id: Ide5b96644832bbcd2c3b08e72aee49f993ddb95f
    Reviewed-on: http://gerrit.cloudera.org:8080/16867
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/client/transaction-internal.cc  | 122 ++++++++++++++++---------------
 src/kudu/client/txn_manager_proxy_rpc.cc |   6 +-
 src/kudu/client/txn_manager_proxy_rpc.h  |   4 +-
 3 files changed, 70 insertions(+), 62 deletions(-)

diff --git a/src/kudu/client/transaction-internal.cc b/src/kudu/client/transaction-internal.cc
index 31e09ce..85c732b 100644
--- a/src/kudu/client/transaction-internal.cc
+++ b/src/kudu/client/transaction-internal.cc
@@ -102,18 +102,19 @@ Status KuduTransaction::Data::Begin(const sp::shared_ptr<KuduTransaction>& txn)
   if (!c) {
     return Status::IllegalState("associated KuduClient is gone");
   }
-  const auto deadline = GetRpcDeadline(c.get());
-  Synchronizer sync;
-  BeginTransactionRequestPB req;
+
   BeginTransactionResponsePB resp;
-  // TODO(aserbin): should there be no retries for sending keepalive heartbeats?
-  AsyncRandomTxnManagerRpc<
-      BeginTransactionRequestPB, BeginTransactionResponsePB> rpc(
-      deadline, c.get(), BackoffType::EXPONENTIAL, req, &resp,
-      &TxnManagerServiceProxy::BeginTransactionAsync, "BeginTransaction",
-      sync.AsStatusCallback());
-  rpc.SendRpc();
-  RETURN_NOT_OK(sync.Wait());
+  {
+    Synchronizer sync;
+    BeginTransactionRequestPB req;
+    AsyncRandomTxnManagerRpc<BeginTransactionRequestPB,
+                             BeginTransactionResponsePB> rpc(
+        GetRpcDeadline(c.get()), c.get(), BackoffType::EXPONENTIAL,
+        std::move(req), &resp, &TxnManagerServiceProxy::BeginTransactionAsync,
+        "BeginTransaction", sync.AsStatusCallback());
+    rpc.SendRpc();
+    RETURN_NOT_OK(sync.Wait());
+  }
   if (resp.has_error()) {
     return StatusFromPB(resp.error().status());
   }
@@ -149,18 +150,21 @@ Status KuduTransaction::Data::Commit(bool wait) {
   if (!c) {
     return Status::IllegalState("associated KuduClient is gone");
   }
+
   const auto deadline = GetRpcDeadline(c.get());
-  Synchronizer sync;
-  CommitTransactionRequestPB req;
-  req.set_txn_id(txn_id_);
   CommitTransactionResponsePB resp;
-  AsyncRandomTxnManagerRpc<CommitTransactionRequestPB,
-                           CommitTransactionResponsePB> rpc(
-      deadline, c.get(), BackoffType::EXPONENTIAL, req, &resp,
-      &TxnManagerServiceProxy::CommitTransactionAsync, "CommitTransaction",
-      sync.AsStatusCallback());
-  rpc.SendRpc();
-  RETURN_NOT_OK(sync.Wait());
+  {
+    Synchronizer sync;
+    CommitTransactionRequestPB req;
+    req.set_txn_id(txn_id_);
+    AsyncRandomTxnManagerRpc<CommitTransactionRequestPB,
+                             CommitTransactionResponsePB> rpc(
+        deadline, c.get(), BackoffType::EXPONENTIAL,
+        std::move(req), &resp, &TxnManagerServiceProxy::CommitTransactionAsync,
+        "CommitTransaction", sync.AsStatusCallback());
+    rpc.SendRpc();
+    RETURN_NOT_OK(sync.Wait());
+  }
   if (resp.has_error()) {
     return StatusFromPB(resp.error().status());
   }
@@ -190,18 +194,20 @@ Status KuduTransaction::Data::Rollback() {
   if (!c) {
     return Status::IllegalState("associated KuduClient is gone");
   }
-  const auto deadline = GetRpcDeadline(c.get());
-  Synchronizer sync;
-  AbortTransactionRequestPB req;
-  req.set_txn_id(txn_id_);
+
   AbortTransactionResponsePB resp;
-  AsyncRandomTxnManagerRpc<AbortTransactionRequestPB,
-                           AbortTransactionResponsePB> rpc(
-      deadline, c.get(), BackoffType::EXPONENTIAL, req, &resp,
-      &TxnManagerServiceProxy::AbortTransactionAsync, "AbortTransaction",
-      sync.AsStatusCallback());
-  rpc.SendRpc();
-  RETURN_NOT_OK(sync.Wait());
+  {
+    Synchronizer sync;
+    AbortTransactionRequestPB req;
+    req.set_txn_id(txn_id_);
+    AsyncRandomTxnManagerRpc<AbortTransactionRequestPB,
+                             AbortTransactionResponsePB> rpc(
+        GetRpcDeadline(c.get()), c.get(), BackoffType::EXPONENTIAL,
+        std::move(req), &resp, &TxnManagerServiceProxy::AbortTransactionAsync,
+        "AbortTransaction", sync.AsStatusCallback());
+    rpc.SendRpc();
+    RETURN_NOT_OK(sync.Wait());
+  }
   if (resp.has_error()) {
     return StatusFromPB(resp.error().status());
   }
@@ -275,17 +281,19 @@ Status KuduTransaction::Data::IsCommitCompleteImpl(
     bool* is_complete,
     Status* completion_status) {
   DCHECK(client);
-  Synchronizer sync;
-  GetTransactionStateRequestPB req;
-  req.set_txn_id(txn_id);
   GetTransactionStateResponsePB resp;
-  AsyncRandomTxnManagerRpc<GetTransactionStateRequestPB,
-                           GetTransactionStateResponsePB> rpc(
-      deadline, client, BackoffType::EXPONENTIAL, req, &resp,
-      &TxnManagerServiceProxy::GetTransactionStateAsync, "GetTransactionState",
-      sync.AsStatusCallback());
-  rpc.SendRpc();
-  RETURN_NOT_OK(sync.Wait());
+  {
+    Synchronizer sync;
+    GetTransactionStateRequestPB req;
+    req.set_txn_id(txn_id);
+    AsyncRandomTxnManagerRpc<GetTransactionStateRequestPB,
+                             GetTransactionStateResponsePB> rpc(
+        deadline, client, BackoffType::EXPONENTIAL, std::move(req), &resp,
+        &TxnManagerServiceProxy::GetTransactionStateAsync, "GetTransactionState",
+        sync.AsStatusCallback());
+    rpc.SendRpc();
+    RETURN_NOT_OK(sync.Wait());
+  }
   if (resp.has_error()) {
     return StatusFromPB(resp.error().status());
   }
@@ -377,25 +385,25 @@ void KuduTransaction::Data::SendTxnKeepAliveTask(
   const auto next_run_after = MonoDelta::FromMilliseconds(
       std::max<uint32_t>(1, txn->data_->txn_keep_alive_ms_ / 2));
   auto deadline = MonoTime::Now() + next_run_after;
-  KeepTransactionAliveRequestPB req;
-  req.set_txn_id(txn_id);
 
   sp::shared_ptr<KeepaliveRpcCtx> ctx(new KeepaliveRpcCtx);
   ctx->weak_txn = weak_txn;
 
-  unique_ptr<AsyncRandomTxnManagerRpc<KeepTransactionAliveRequestPB,
-                                      KeepTransactionAliveResponsePB>> rpc(
-      new AsyncRandomTxnManagerRpc<KeepTransactionAliveRequestPB,
-                                   KeepTransactionAliveResponsePB>(
-          deadline, c.get(), BackoffType::LINEAR,
-          req, &ctx->resp,
-          &TxnManagerServiceProxy::KeepTransactionAliveAsync,
-          "KeepTransactionAlive",
-          [ctx](const Status& s) {
-            TxnKeepAliveCb(s, std::move(ctx));
-          }
-  ));
-  ctx->rpc = std::move(rpc);
+  {
+    KeepTransactionAliveRequestPB req;
+    req.set_txn_id(txn_id);
+    unique_ptr<AsyncRandomTxnManagerRpc<KeepTransactionAliveRequestPB,
+                                        KeepTransactionAliveResponsePB>> rpc(
+        new AsyncRandomTxnManagerRpc<KeepTransactionAliveRequestPB,
+                                     KeepTransactionAliveResponsePB>(
+            deadline, c.get(), BackoffType::LINEAR, std::move(req), &ctx->resp,
+            &TxnManagerServiceProxy::KeepTransactionAliveAsync,
+            "KeepTransactionAlive",
+            [ctx](const Status& s) {
+              TxnKeepAliveCb(s, std::move(ctx));
+            }));
+    ctx->rpc = std::move(rpc);
+  }
 
   // Send the RPC and handle the response asynchronously.
   ctx->rpc->SendRpc();
diff --git a/src/kudu/client/txn_manager_proxy_rpc.cc b/src/kudu/client/txn_manager_proxy_rpc.cc
index 12bf81c..c58d2df 100644
--- a/src/kudu/client/txn_manager_proxy_rpc.cc
+++ b/src/kudu/client/txn_manager_proxy_rpc.cc
@@ -69,7 +69,7 @@ AsyncRandomTxnManagerRpc<ReqClass, RespClass>::AsyncRandomTxnManagerRpc(
     const MonoTime& deadline,
     KuduClient* client,
     BackoffType backoff,
-    const ReqClass& req,
+    ReqClass req,
     RespClass* resp,
     const std::function<void(TxnManagerServiceProxy*,
                              const ReqClass&, RespClass*,
@@ -79,7 +79,7 @@ AsyncRandomTxnManagerRpc<ReqClass, RespClass>::AsyncRandomTxnManagerRpc(
     StatusCallback user_cb)
     : Rpc(deadline, client->data_->messenger_, backoff),
       client_(client),
-      req_(&req),
+      req_(std::move(req)),
       resp_(resp),
       func_(func),
       rpc_name_(std::move(rpc_name)),
@@ -103,7 +103,7 @@ void AsyncRandomTxnManagerRpc<ReqClass, RespClass>::SendRpc() {
   RpcController* controller = mutable_retrier()->mutable_controller();
   controller->Reset();
   controller->set_deadline(deadline);
-  func_(client_->data_->txn_manager_proxy().get(), *req_, resp_, controller,
+  func_(client_->data_->txn_manager_proxy().get(), req_, resp_, controller,
         [this]() { this->SendRpcCb(Status::OK()); });
 }
 
diff --git a/src/kudu/client/txn_manager_proxy_rpc.h b/src/kudu/client/txn_manager_proxy_rpc.h
index 501c9e7..00b0bc0 100644
--- a/src/kudu/client/txn_manager_proxy_rpc.h
+++ b/src/kudu/client/txn_manager_proxy_rpc.h
@@ -67,7 +67,7 @@ class AsyncRandomTxnManagerRpc : public rpc::Rpc {
       const MonoTime& deadline,
       KuduClient* client,
       rpc::BackoffType backoff,
-      const ReqClass& req,
+      ReqClass req,
       RespClass* resp,
       const std::function<void(transactions::TxnManagerServiceProxy*,
                                const ReqClass&, RespClass*,
@@ -108,7 +108,7 @@ class AsyncRandomTxnManagerRpc : public rpc::Rpc {
   bool RetryIfNecessary(Status* status);
 
   KuduClient* client_;
-  const ReqClass* req_;
+  const ReqClass req_;
   RespClass* resp_;
 
   // Asynchronous function that sends an RPC to current TxnManager.