You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2020/09/28 21:56:52 UTC
[kudu] branch master updated: KUDU-2612 p6 (c): add
GetTransactionStatus() to TxnSystemClient
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 01a9a5a KUDU-2612 p6 (c): add GetTransactionStatus() to TxnSystemClient
01a9a5a is described below
commit 01a9a5a9ca4000862fa92c7cbff42967a067c326
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Wed Sep 23 22:18:48 2020 -0700
KUDU-2612 p6 (c): add GetTransactionStatus() to TxnSystemClient
This patch adds GetTransactionStatus() method into the interface
of the TxnSystemClient class. Added corresponding test as well.
This is a follow-up to cb1c2efb59373453e734074a02021f14c403257d
and 0f9ff5ff043125be4a1150be0306373619b4ca89
Change-Id: I7fac7158df307d03db6a48087e7c5a16269a3bc6
Reviewed-on: http://gerrit.cloudera.org:8080/16501
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
.../integration-tests/txn_status_table-itest.cc | 65 +++++++++++++++++++++-
src/kudu/transactions/coordinator_rpc.cc | 27 ++++++---
src/kudu/transactions/coordinator_rpc.h | 9 ++-
src/kudu/transactions/txn_system_client.cc | 38 ++++++++++++-
src/kudu/transactions/txn_system_client.h | 16 +++++-
5 files changed, 136 insertions(+), 19 deletions(-)
diff --git a/src/kudu/integration-tests/txn_status_table-itest.cc b/src/kudu/integration-tests/txn_status_table-itest.cc
index d26a1a7..3d85b7e 100644
--- a/src/kudu/integration-tests/txn_status_table-itest.cc
+++ b/src/kudu/integration-tests/txn_status_table-itest.cc
@@ -50,6 +50,7 @@
#include "kudu/tablet/metadata.pb.h"
#include "kudu/tablet/tablet_metadata.h"
#include "kudu/tablet/tablet_replica.h"
+#include "kudu/transactions/transactions.pb.h"
#include "kudu/transactions/txn_status_tablet.h"
#include "kudu/transactions/txn_system_client.h"
#include "kudu/tserver/mini_tablet_server.h"
@@ -66,18 +67,20 @@ DECLARE_double(leader_failure_max_missed_heartbeat_periods);
DECLARE_string(superuser_acl);
DECLARE_string(user_acl);
-using kudu::client::sp::shared_ptr;
using kudu::client::AuthenticationCredentialsPB;
using kudu::client::KuduClient;
using kudu::client::KuduClientBuilder;
using kudu::client::KuduTable;
+using kudu::client::sp::shared_ptr;
using kudu::cluster::InternalMiniCluster;
using kudu::cluster::InternalMiniClusterOptions;
-using kudu::itest::TabletServerMap;
using kudu::itest::TServerDetails;
+using kudu::itest::TabletServerMap;
using kudu::tablet::TabletReplica;
-using kudu::transactions::TxnSystemClient;
+using kudu::transactions::TxnStatePB;
+using kudu::transactions::TxnStatusEntryPB;
using kudu::transactions::TxnStatusTablet;
+using kudu::transactions::TxnSystemClient;
using std::map;
using std::string;
using std::thread;
@@ -436,6 +439,7 @@ TEST_F(TxnStatusTableITest, SystemClientCommitAndAbortTransaction) {
ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser));
ASSERT_OK(txn_sys_client_->BeginCommitTransaction(1, kUser));
+
// Calling BeginCommitTransaction() on already committing transaction is OK.
ASSERT_OK(txn_sys_client_->BeginCommitTransaction(1, kUser));
// It's completely legal to abort a transaction that has entered the commit
@@ -488,6 +492,61 @@ TEST_F(TxnStatusTableITest, SystemClientCommitAndAbortTransaction) {
}
}
+TEST_F(TxnStatusTableITest, GetTransactionStatus) {
+ const auto verify_state = [&](TxnStatePB state) {
+ TxnStatusEntryPB txn_status;
+ ASSERT_OK(txn_sys_client_->GetTransactionStatus(1, kUser, &txn_status));
+ ASSERT_TRUE(txn_status.has_user());
+ ASSERT_STREQ(kUser, txn_status.user().c_str());
+ ASSERT_TRUE(txn_status.has_state());
+ ASSERT_EQ(state, txn_status.state());
+ };
+
+ ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100));
+ ASSERT_OK(txn_sys_client_->OpenTxnStatusTable());
+
+ ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser));
+ NO_FATALS(verify_state(TxnStatePB::OPEN));
+
+ ASSERT_OK(txn_sys_client_->BeginCommitTransaction(1, kUser));
+ NO_FATALS(verify_state(TxnStatePB::COMMIT_IN_PROGRESS));
+
+ ASSERT_OK(txn_sys_client_->AbortTransaction(1, kUser));
+ NO_FATALS(verify_state(TxnStatePB::ABORTED));
+
+ {
+ auto s = txn_sys_client_->BeginCommitTransaction(1, kUser);
+ ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+ NO_FATALS(verify_state(TxnStatePB::ABORTED));
+ }
+
+ // In the negative scenarios below, check for the expected status code
+ // and make sure nothing is set in the output argument.
+ {
+ TxnStatusEntryPB empty;
+ auto s = txn_sys_client_->GetTransactionStatus(1, "interloper", &empty);
+ ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+ ASSERT_FALSE(empty.has_user());
+ ASSERT_FALSE(empty.has_state());
+ }
+
+ {
+ TxnStatusEntryPB empty;
+ auto s = txn_sys_client_->GetTransactionStatus(2, kUser, &empty);
+ ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+ ASSERT_FALSE(empty.has_user());
+ ASSERT_FALSE(empty.has_state());
+ }
+
+ {
+ TxnStatusEntryPB empty;
+ auto s = txn_sys_client_->GetTransactionStatus(2, "", &empty);
+ ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+ ASSERT_FALSE(empty.has_user());
+ ASSERT_FALSE(empty.has_state());
+ }
+}
+
// Test that a transaction system client can make concurrent calls to multiple
// transaction status tablets.
TEST_F(TxnStatusTableITest, TestSystemClientConcurrentCalls) {
diff --git a/src/kudu/transactions/coordinator_rpc.cc b/src/kudu/transactions/coordinator_rpc.cc
index e0870c8..d8fa3d6 100644
--- a/src/kudu/transactions/coordinator_rpc.cc
+++ b/src/kudu/transactions/coordinator_rpc.cc
@@ -49,6 +49,7 @@ using kudu::rpc::ErrorStatusPB;
using kudu::rpc::ResponseCallback;
using kudu::rpc::RetriableRpc;
using kudu::rpc::RetriableRpcStatus;
+using kudu::tserver::CoordinatorOpResultPB;
using std::string;
using std::unique_ptr;
using strings::Substitute;
@@ -57,9 +58,11 @@ namespace kudu {
class MonoTime;
namespace transactions {
-CoordinatorRpc* CoordinatorRpc::NewRpc(unique_ptr<TxnStatusTabletContext> ctx,
- const MonoTime& deadline,
- StatusCallback cb) {
+CoordinatorRpc* CoordinatorRpc::NewRpc(
+ unique_ptr<TxnStatusTabletContext> ctx,
+ const MonoTime& deadline,
+ StatusCallback cb,
+ CoordinatorOpResultPB* op_result) {
KuduClient* client = ctx->table->client();
scoped_refptr<MetaCacheServerPicker> server_picker(
new MetaCacheServerPicker(client,
@@ -69,7 +72,8 @@ CoordinatorRpc* CoordinatorRpc::NewRpc(unique_ptr<TxnStatusTabletContext> ctx,
CoordinatorRpc* rpc = new CoordinatorRpc(std::move(ctx),
server_picker,
deadline,
- std::move(cb));
+ std::move(cb),
+ op_result);
return rpc;
}
@@ -85,6 +89,9 @@ void CoordinatorRpc::Finish(const Status& status) {
resp_.has_op_result() && resp_.op_result().has_op_error()) {
final_status = StatusFromPB(resp_.op_result().op_error());
}
+ if (resp_.has_op_result() && op_result_) {
+ *op_result_ = resp_.op_result();
+ }
cb_(final_status);
}
@@ -99,7 +106,8 @@ bool CoordinatorRpc::GetNewAuthnTokenAndRetry() {
CoordinatorRpc::CoordinatorRpc(unique_ptr<TxnStatusTabletContext> ctx,
const scoped_refptr<MetaCacheServerPicker>& replica_picker,
const MonoTime& deadline,
- StatusCallback cb)
+ StatusCallback cb,
+ CoordinatorOpResultPB* op_result)
: RetriableRpc(replica_picker,
DCHECK_NOTNULL(ctx->table)->client()->data_->request_tracker_,
deadline,
@@ -107,15 +115,16 @@ CoordinatorRpc::CoordinatorRpc(unique_ptr<TxnStatusTabletContext> ctx,
client_(ctx->table->client()),
table_(std::move(ctx->table)),
tablet_(std::move(ctx->tablet)),
- cb_(std::move(cb)) {
+ cb_(std::move(cb)),
+ op_result_(op_result) {
req_.set_txn_status_tablet_id(tablet_->tablet_id());
*req_.mutable_op() = std::move(ctx->coordinate_txn_op);
}
-void CoordinatorRpc::Try(RemoteTabletServer* replica, const ResponseCallback& callback) {
+void CoordinatorRpc::Try(RemoteTabletServer* replica,
+ const ResponseCallback& callback) {
replica->admin_proxy()->CoordinateTransactionAsync(
- req_, &resp_, mutable_retrier()->mutable_controller(),
- callback);
+ req_, &resp_, mutable_retrier()->mutable_controller(), callback);
}
// TODO(awong): much of this is borrowed from WriteRpc::AnalyzeResponse(). It'd
diff --git a/src/kudu/transactions/coordinator_rpc.h b/src/kudu/transactions/coordinator_rpc.h
index 00c56a6..2af39ff 100644
--- a/src/kudu/transactions/coordinator_rpc.h
+++ b/src/kudu/transactions/coordinator_rpc.h
@@ -55,9 +55,12 @@ class CoordinatorRpc final : public rpc::RetriableRpc<client::internal::RemoteTa
tserver::CoordinateTransactionRequestPB,
tserver::CoordinateTransactionResponsePB> {
public:
+ // NOTE: if 'op_result' is non-null, the memory it points to should stay valid
+ // until the RPC completes (i.e. until callback 'cb' is invoked).
static CoordinatorRpc* NewRpc(std::unique_ptr<TxnStatusTabletContext> ctx,
const MonoTime& deadline,
- StatusCallback cb);
+ StatusCallback cb,
+ tserver::CoordinatorOpResultPB* op_result = nullptr);
~CoordinatorRpc() {}
@@ -78,12 +81,14 @@ class CoordinatorRpc final : public rpc::RetriableRpc<client::internal::RemoteTa
CoordinatorRpc(std::unique_ptr<TxnStatusTabletContext> ctx,
const scoped_refptr<client::internal::MetaCacheServerPicker>& replica_picker,
const MonoTime& deadline,
- StatusCallback cb);
+ StatusCallback cb,
+ tserver::CoordinatorOpResultPB* op_result);
client::KuduClient* client_;
client::sp::shared_ptr<client::KuduTable> table_;
scoped_refptr<client::internal::RemoteTablet> tablet_;
const StatusCallback cb_;
+ tserver::CoordinatorOpResultPB* op_result_;
};
} // namespace transactions
diff --git a/src/kudu/transactions/txn_system_client.cc b/src/kudu/transactions/txn_system_client.cc
index 3cb1d4e..27a909a 100644
--- a/src/kudu/transactions/txn_system_client.cc
+++ b/src/kudu/transactions/txn_system_client.cc
@@ -37,6 +37,7 @@
#include "kudu/gutil/ref_counted.h"
#include "kudu/transactions/coordinator_rpc.h"
#include "kudu/transactions/txn_status_tablet.h"
+#include "kudu/transactions/transactions.pb.h"
#include "kudu/tserver/tserver_admin.pb.h"
#include "kudu/util/async_util.h"
@@ -48,6 +49,7 @@ using kudu::client::KuduTableAlterer;
using kudu::client::KuduTableCreator;
using kudu::client::internal::MetaCache;
using kudu::tserver::CoordinatorOpPB;
+using kudu::tserver::CoordinatorOpResultPB;
using std::string;
using std::unique_ptr;
using std::vector;
@@ -174,9 +176,38 @@ Status TxnSystemClient::AbortTransaction(int64_t txn_id,
return s.Wait();
}
+Status TxnSystemClient::GetTransactionStatus(int64_t txn_id,
+ const string& user,
+ TxnStatusEntryPB* txn_status,
+ MonoDelta timeout) {
+ DCHECK(txn_status);
+ CoordinatorOpPB coordinate_txn_op;
+ coordinate_txn_op.set_type(CoordinatorOpPB::GET_TXN_STATUS);
+ coordinate_txn_op.set_txn_id(txn_id);
+ coordinate_txn_op.set_user(user);
+ Synchronizer s;
+ CoordinatorOpResultPB result;
+ RETURN_NOT_OK(CoordinateTransactionAsync(std::move(coordinate_txn_op),
+ timeout,
+ s.AsStatusCallback(),
+ &result));
+ const auto ret = s.Wait();
+ if (ret.ok()) {
+ // Retrieve the response and set corresponding output parameters.
+ DCHECK(!result.has_op_error());
+ DCHECK(result.has_txn_status());
+ TxnStatusEntryPB ret;
+ ret.set_state(result.txn_status().state());
+ ret.set_allocated_user(result.mutable_txn_status()->release_user());
+ *txn_status = std::move(ret);
+ }
+ return ret;
+}
+
Status TxnSystemClient::CoordinateTransactionAsync(CoordinatorOpPB coordinate_txn_op,
const MonoDelta& timeout,
- const StatusCallback& cb) {
+ const StatusCallback& cb,
+ CoordinatorOpResultPB* result) {
const MonoTime deadline = MonoTime::Now() + timeout;
unique_ptr<TxnStatusTabletContext> ctx(
new TxnStatusTabletContext({
@@ -200,7 +231,7 @@ Status TxnSystemClient::CoordinateTransactionAsync(CoordinatorOpPB coordinate_tx
&ctx_raw->tablet,
// TODO(awong): when we start using C++14, stack-allocate 'ctx' and
// move capture it.
- [cb, deadline, ctx_raw] (const Status& s) {
+ [cb, deadline, ctx_raw, result] (const Status& s) {
// First, take ownership of the context.
unique_ptr<TxnStatusTabletContext> ctx(ctx_raw);
@@ -213,7 +244,8 @@ Status TxnSystemClient::CoordinateTransactionAsync(CoordinatorOpPB coordinate_tx
CoordinatorRpc* rpc = CoordinatorRpc::NewRpc(
std::move(ctx),
deadline,
- cb);
+ cb,
+ result);
rpc->SendRpc();
});
return Status::OK();
diff --git a/src/kudu/transactions/txn_system_client.h b/src/kudu/transactions/txn_system_client.h
index b2b74fd..f3c8568 100644
--- a/src/kudu/transactions/txn_system_client.h
+++ b/src/kudu/transactions/txn_system_client.h
@@ -44,10 +44,13 @@ class TxnStatusTableITest_TestProtectCreateAndAlter_Test;
namespace tserver {
class CoordinatorOpPB;
+class CoordinatorOpResultPB;
} // namespace tserver
namespace transactions {
+class TxnStatusEntryPB;
+
// Wrapper around a KuduClient used by Kudu for making transaction-related
// calls to various servers.
class TxnSystemClient {
@@ -97,11 +100,20 @@ class TxnSystemClient {
const std::string& user,
MonoDelta timeout = MonoDelta::FromSeconds(10));
+ // Retrieves transactions status. On success, returns Status::OK() and stores
+ // the result status in the 'txn_status' output parameter. On failure,
+ // returns corresponding Status.
+ Status GetTransactionStatus(int64_t txn_id,
+ const std::string& user,
+ TxnStatusEntryPB* txn_status,
+ MonoDelta timeout = MonoDelta::FromSeconds(10));
+
// Opens the transaction status table, refreshing metadata with that from the
// masters.
Status OpenTxnStatusTable();
private:
+
friend class itest::TxnStatusTableITest;
FRIEND_TEST(itest::TxnStatusTableITest, TestProtectCreateAndAlter);
@@ -115,7 +127,8 @@ class TxnSystemClient {
Status CoordinateTransactionAsync(tserver::CoordinatorOpPB coordinate_txn_op,
const MonoDelta& timeout,
- const StatusCallback& cb);
+ const StatusCallback& cb,
+ tserver::CoordinatorOpResultPB* result = nullptr);
client::sp::shared_ptr<client::KuduTable> txn_status_table() {
std::lock_guard<simple_spinlock> l(table_lock_);
@@ -130,4 +143,3 @@ class TxnSystemClient {
} // namespace transactions
} // namespace kudu
-