You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2020/09/28 21:56:52 UTC

[kudu] branch master updated: KUDU-2612 p6 (c): add GetTransactionStatus() to TxnSystemClient

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 01a9a5a  KUDU-2612 p6 (c): add GetTransactionStatus() to TxnSystemClient
01a9a5a is described below

commit 01a9a5a9ca4000862fa92c7cbff42967a067c326
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Wed Sep 23 22:18:48 2020 -0700

    KUDU-2612 p6 (c): add GetTransactionStatus() to TxnSystemClient
    
    This patch adds GetTransactionStatus() method into the interface
    of the TxnSystemClient class.  Added corresponding test as well.
    
    This is a follow-up to cb1c2efb59373453e734074a02021f14c403257d
    and 0f9ff5ff043125be4a1150be0306373619b4ca89
    
    Change-Id: I7fac7158df307d03db6a48087e7c5a16269a3bc6
    Reviewed-on: http://gerrit.cloudera.org:8080/16501
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 .../integration-tests/txn_status_table-itest.cc    | 65 +++++++++++++++++++++-
 src/kudu/transactions/coordinator_rpc.cc           | 27 ++++++---
 src/kudu/transactions/coordinator_rpc.h            |  9 ++-
 src/kudu/transactions/txn_system_client.cc         | 38 ++++++++++++-
 src/kudu/transactions/txn_system_client.h          | 16 +++++-
 5 files changed, 136 insertions(+), 19 deletions(-)

diff --git a/src/kudu/integration-tests/txn_status_table-itest.cc b/src/kudu/integration-tests/txn_status_table-itest.cc
index d26a1a7..3d85b7e 100644
--- a/src/kudu/integration-tests/txn_status_table-itest.cc
+++ b/src/kudu/integration-tests/txn_status_table-itest.cc
@@ -50,6 +50,7 @@
 #include "kudu/tablet/metadata.pb.h"
 #include "kudu/tablet/tablet_metadata.h"
 #include "kudu/tablet/tablet_replica.h"
+#include "kudu/transactions/transactions.pb.h"
 #include "kudu/transactions/txn_status_tablet.h"
 #include "kudu/transactions/txn_system_client.h"
 #include "kudu/tserver/mini_tablet_server.h"
@@ -66,18 +67,20 @@ DECLARE_double(leader_failure_max_missed_heartbeat_periods);
 DECLARE_string(superuser_acl);
 DECLARE_string(user_acl);
 
-using kudu::client::sp::shared_ptr;
 using kudu::client::AuthenticationCredentialsPB;
 using kudu::client::KuduClient;
 using kudu::client::KuduClientBuilder;
 using kudu::client::KuduTable;
+using kudu::client::sp::shared_ptr;
 using kudu::cluster::InternalMiniCluster;
 using kudu::cluster::InternalMiniClusterOptions;
-using kudu::itest::TabletServerMap;
 using kudu::itest::TServerDetails;
+using kudu::itest::TabletServerMap;
 using kudu::tablet::TabletReplica;
-using kudu::transactions::TxnSystemClient;
+using kudu::transactions::TxnStatePB;
+using kudu::transactions::TxnStatusEntryPB;
 using kudu::transactions::TxnStatusTablet;
+using kudu::transactions::TxnSystemClient;
 using std::map;
 using std::string;
 using std::thread;
@@ -436,6 +439,7 @@ TEST_F(TxnStatusTableITest, SystemClientCommitAndAbortTransaction) {
 
   ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser));
   ASSERT_OK(txn_sys_client_->BeginCommitTransaction(1, kUser));
+
   // Calling BeginCommitTransaction() on already committing transaction is OK.
   ASSERT_OK(txn_sys_client_->BeginCommitTransaction(1, kUser));
   // It's completely legal to abort a transaction that has entered the commit
@@ -488,6 +492,61 @@ TEST_F(TxnStatusTableITest, SystemClientCommitAndAbortTransaction) {
   }
 }
 
+TEST_F(TxnStatusTableITest, GetTransactionStatus) {
+  const auto verify_state = [&](TxnStatePB state) {
+    TxnStatusEntryPB txn_status;
+    ASSERT_OK(txn_sys_client_->GetTransactionStatus(1, kUser, &txn_status));
+    ASSERT_TRUE(txn_status.has_user());
+    ASSERT_STREQ(kUser, txn_status.user().c_str());
+    ASSERT_TRUE(txn_status.has_state());
+    ASSERT_EQ(state, txn_status.state());
+  };
+
+  ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100));
+  ASSERT_OK(txn_sys_client_->OpenTxnStatusTable());
+
+  ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser));
+  NO_FATALS(verify_state(TxnStatePB::OPEN));
+
+  ASSERT_OK(txn_sys_client_->BeginCommitTransaction(1, kUser));
+  NO_FATALS(verify_state(TxnStatePB::COMMIT_IN_PROGRESS));
+
+  ASSERT_OK(txn_sys_client_->AbortTransaction(1, kUser));
+  NO_FATALS(verify_state(TxnStatePB::ABORTED));
+
+  {
+    auto s = txn_sys_client_->BeginCommitTransaction(1, kUser);
+    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+    NO_FATALS(verify_state(TxnStatePB::ABORTED));
+  }
+
+  // In the negative scenarios below, check for the expected status code
+  // and make sure nothing is set in the output argument.
+  {
+    TxnStatusEntryPB empty;
+    auto s = txn_sys_client_->GetTransactionStatus(1, "interloper", &empty);
+    ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+    ASSERT_FALSE(empty.has_user());
+    ASSERT_FALSE(empty.has_state());
+  }
+
+  {
+    TxnStatusEntryPB empty;
+    auto s = txn_sys_client_->GetTransactionStatus(2, kUser, &empty);
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+    ASSERT_FALSE(empty.has_user());
+    ASSERT_FALSE(empty.has_state());
+  }
+
+  {
+    TxnStatusEntryPB empty;
+    auto s = txn_sys_client_->GetTransactionStatus(2, "", &empty);
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+    ASSERT_FALSE(empty.has_user());
+    ASSERT_FALSE(empty.has_state());
+  }
+}
+
 // Test that a transaction system client can make concurrent calls to multiple
 // transaction status tablets.
 TEST_F(TxnStatusTableITest, TestSystemClientConcurrentCalls) {
diff --git a/src/kudu/transactions/coordinator_rpc.cc b/src/kudu/transactions/coordinator_rpc.cc
index e0870c8..d8fa3d6 100644
--- a/src/kudu/transactions/coordinator_rpc.cc
+++ b/src/kudu/transactions/coordinator_rpc.cc
@@ -49,6 +49,7 @@ using kudu::rpc::ErrorStatusPB;
 using kudu::rpc::ResponseCallback;
 using kudu::rpc::RetriableRpc;
 using kudu::rpc::RetriableRpcStatus;
+using kudu::tserver::CoordinatorOpResultPB;
 using std::string;
 using std::unique_ptr;
 using strings::Substitute;
@@ -57,9 +58,11 @@ namespace kudu {
 class MonoTime;
 namespace transactions {
 
-CoordinatorRpc* CoordinatorRpc::NewRpc(unique_ptr<TxnStatusTabletContext> ctx,
-                                       const MonoTime& deadline,
-                                       StatusCallback cb) {
+CoordinatorRpc* CoordinatorRpc::NewRpc(
+    unique_ptr<TxnStatusTabletContext> ctx,
+    const MonoTime& deadline,
+    StatusCallback cb,
+    CoordinatorOpResultPB* op_result) {
   KuduClient* client = ctx->table->client();
   scoped_refptr<MetaCacheServerPicker> server_picker(
       new MetaCacheServerPicker(client,
@@ -69,7 +72,8 @@ CoordinatorRpc* CoordinatorRpc::NewRpc(unique_ptr<TxnStatusTabletContext> ctx,
   CoordinatorRpc* rpc = new CoordinatorRpc(std::move(ctx),
                                            server_picker,
                                            deadline,
-                                           std::move(cb));
+                                           std::move(cb),
+                                           op_result);
   return rpc;
 }
 
@@ -85,6 +89,9 @@ void CoordinatorRpc::Finish(const Status& status) {
       resp_.has_op_result() && resp_.op_result().has_op_error()) {
     final_status = StatusFromPB(resp_.op_result().op_error());
   }
+  if (resp_.has_op_result() && op_result_) {
+    *op_result_ = resp_.op_result();
+  }
   cb_(final_status);
 }
 
@@ -99,7 +106,8 @@ bool CoordinatorRpc::GetNewAuthnTokenAndRetry() {
 CoordinatorRpc::CoordinatorRpc(unique_ptr<TxnStatusTabletContext> ctx,
                                const scoped_refptr<MetaCacheServerPicker>& replica_picker,
                                const MonoTime& deadline,
-                               StatusCallback cb)
+                               StatusCallback cb,
+                               CoordinatorOpResultPB* op_result)
     : RetriableRpc(replica_picker,
                    DCHECK_NOTNULL(ctx->table)->client()->data_->request_tracker_,
                    deadline,
@@ -107,15 +115,16 @@ CoordinatorRpc::CoordinatorRpc(unique_ptr<TxnStatusTabletContext> ctx,
       client_(ctx->table->client()),
       table_(std::move(ctx->table)),
       tablet_(std::move(ctx->tablet)),
-      cb_(std::move(cb)) {
+      cb_(std::move(cb)),
+      op_result_(op_result) {
   req_.set_txn_status_tablet_id(tablet_->tablet_id());
   *req_.mutable_op() = std::move(ctx->coordinate_txn_op);
 }
 
-void CoordinatorRpc::Try(RemoteTabletServer* replica, const ResponseCallback& callback) {
+void CoordinatorRpc::Try(RemoteTabletServer* replica,
+                         const ResponseCallback& callback) {
   replica->admin_proxy()->CoordinateTransactionAsync(
-      req_, &resp_, mutable_retrier()->mutable_controller(),
-      callback);
+      req_, &resp_, mutable_retrier()->mutable_controller(), callback);
 }
 
 // TODO(awong): much of this is borrowed from WriteRpc::AnalyzeResponse(). It'd
diff --git a/src/kudu/transactions/coordinator_rpc.h b/src/kudu/transactions/coordinator_rpc.h
index 00c56a6..2af39ff 100644
--- a/src/kudu/transactions/coordinator_rpc.h
+++ b/src/kudu/transactions/coordinator_rpc.h
@@ -55,9 +55,12 @@ class CoordinatorRpc final : public rpc::RetriableRpc<client::internal::RemoteTa
                                                       tserver::CoordinateTransactionRequestPB,
                                                       tserver::CoordinateTransactionResponsePB> {
  public:
+  // NOTE: if 'op_result' is non-null, the memory it points to should stay valid
+  //       until the RPC completes (i.e. until callback 'cb' is invoked).
   static CoordinatorRpc* NewRpc(std::unique_ptr<TxnStatusTabletContext> ctx,
                                 const MonoTime& deadline,
-                                StatusCallback cb);
+                                StatusCallback cb,
+                                tserver::CoordinatorOpResultPB* op_result = nullptr);
 
   ~CoordinatorRpc() {}
 
@@ -78,12 +81,14 @@ class CoordinatorRpc final : public rpc::RetriableRpc<client::internal::RemoteTa
   CoordinatorRpc(std::unique_ptr<TxnStatusTabletContext> ctx,
                  const scoped_refptr<client::internal::MetaCacheServerPicker>& replica_picker,
                  const MonoTime& deadline,
-                 StatusCallback cb);
+                 StatusCallback cb,
+                 tserver::CoordinatorOpResultPB* op_result);
 
   client::KuduClient* client_;
   client::sp::shared_ptr<client::KuduTable> table_;
   scoped_refptr<client::internal::RemoteTablet> tablet_;
   const StatusCallback cb_;
+  tserver::CoordinatorOpResultPB* op_result_;
 };
 
 } // namespace transactions
diff --git a/src/kudu/transactions/txn_system_client.cc b/src/kudu/transactions/txn_system_client.cc
index 3cb1d4e..27a909a 100644
--- a/src/kudu/transactions/txn_system_client.cc
+++ b/src/kudu/transactions/txn_system_client.cc
@@ -37,6 +37,7 @@
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/transactions/coordinator_rpc.h"
 #include "kudu/transactions/txn_status_tablet.h"
+#include "kudu/transactions/transactions.pb.h"
 #include "kudu/tserver/tserver_admin.pb.h"
 #include "kudu/util/async_util.h"
 
@@ -48,6 +49,7 @@ using kudu::client::KuduTableAlterer;
 using kudu::client::KuduTableCreator;
 using kudu::client::internal::MetaCache;
 using kudu::tserver::CoordinatorOpPB;
+using kudu::tserver::CoordinatorOpResultPB;
 using std::string;
 using std::unique_ptr;
 using std::vector;
@@ -174,9 +176,38 @@ Status TxnSystemClient::AbortTransaction(int64_t txn_id,
   return s.Wait();
 }
 
+Status TxnSystemClient::GetTransactionStatus(int64_t txn_id,
+                                             const string& user,
+                                             TxnStatusEntryPB* txn_status,
+                                             MonoDelta timeout) {
+  DCHECK(txn_status);
+  CoordinatorOpPB coordinate_txn_op;
+  coordinate_txn_op.set_type(CoordinatorOpPB::GET_TXN_STATUS);
+  coordinate_txn_op.set_txn_id(txn_id);
+  coordinate_txn_op.set_user(user);
+  Synchronizer s;
+  CoordinatorOpResultPB result;
+  RETURN_NOT_OK(CoordinateTransactionAsync(std::move(coordinate_txn_op),
+                                           timeout,
+                                           s.AsStatusCallback(),
+                                           &result));
+  const auto ret = s.Wait();
+  if (ret.ok()) {
+    // Retrieve the response and set corresponding output parameters.
+    DCHECK(!result.has_op_error());
+    DCHECK(result.has_txn_status());
+    TxnStatusEntryPB ret;
+    ret.set_state(result.txn_status().state());
+    ret.set_allocated_user(result.mutable_txn_status()->release_user());
+    *txn_status = std::move(ret);
+  }
+  return ret;
+}
+
 Status TxnSystemClient::CoordinateTransactionAsync(CoordinatorOpPB coordinate_txn_op,
                                                    const MonoDelta& timeout,
-                                                   const StatusCallback& cb) {
+                                                   const StatusCallback& cb,
+                                                   CoordinatorOpResultPB* result) {
   const MonoTime deadline = MonoTime::Now() + timeout;
   unique_ptr<TxnStatusTabletContext> ctx(
       new TxnStatusTabletContext({
@@ -200,7 +231,7 @@ Status TxnSystemClient::CoordinateTransactionAsync(CoordinatorOpPB coordinate_tx
       &ctx_raw->tablet,
       // TODO(awong): when we start using C++14, stack-allocate 'ctx' and
       // move capture it.
-      [cb, deadline, ctx_raw] (const Status& s) {
+      [cb, deadline, ctx_raw, result] (const Status& s) {
         // First, take ownership of the context.
         unique_ptr<TxnStatusTabletContext> ctx(ctx_raw);
 
@@ -213,7 +244,8 @@ Status TxnSystemClient::CoordinateTransactionAsync(CoordinatorOpPB coordinate_tx
         CoordinatorRpc* rpc = CoordinatorRpc::NewRpc(
             std::move(ctx),
             deadline,
-            cb);
+            cb,
+            result);
         rpc->SendRpc();
       });
   return Status::OK();
diff --git a/src/kudu/transactions/txn_system_client.h b/src/kudu/transactions/txn_system_client.h
index b2b74fd..f3c8568 100644
--- a/src/kudu/transactions/txn_system_client.h
+++ b/src/kudu/transactions/txn_system_client.h
@@ -44,10 +44,13 @@ class TxnStatusTableITest_TestProtectCreateAndAlter_Test;
 
 namespace tserver {
 class CoordinatorOpPB;
+class CoordinatorOpResultPB;
 } // namespace tserver
 
 namespace transactions {
 
+class TxnStatusEntryPB;
+
 // Wrapper around a KuduClient used by Kudu for making transaction-related
 // calls to various servers.
 class TxnSystemClient {
@@ -97,11 +100,20 @@ class TxnSystemClient {
                           const std::string& user,
                           MonoDelta timeout = MonoDelta::FromSeconds(10));
 
+  // Retrieves transactions status. On success, returns Status::OK() and stores
+  // the result status in the 'txn_status' output parameter. On failure,
+  // returns corresponding Status.
+  Status GetTransactionStatus(int64_t txn_id,
+                              const std::string& user,
+                              TxnStatusEntryPB* txn_status,
+                              MonoDelta timeout = MonoDelta::FromSeconds(10));
+
   // Opens the transaction status table, refreshing metadata with that from the
   // masters.
   Status OpenTxnStatusTable();
 
  private:
+
   friend class itest::TxnStatusTableITest;
   FRIEND_TEST(itest::TxnStatusTableITest, TestProtectCreateAndAlter);
 
@@ -115,7 +127,8 @@ class TxnSystemClient {
 
   Status CoordinateTransactionAsync(tserver::CoordinatorOpPB coordinate_txn_op,
                                     const MonoDelta& timeout,
-                                    const StatusCallback& cb);
+                                    const StatusCallback& cb,
+                                    tserver::CoordinatorOpResultPB* result = nullptr);
 
   client::sp::shared_ptr<client::KuduTable> txn_status_table() {
     std::lock_guard<simple_spinlock> l(table_lock_);
@@ -130,4 +143,3 @@ class TxnSystemClient {
 
 } // namespace transactions
 } // namespace kudu
-