You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2021/02/11 02:17:12 UTC
[kudu] branch master updated: [transactions] add
TxnSystemClient::CheckOpenTxnSystemTable()
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new fd6d1e1 [transactions] add TxnSystemClient::CheckOpenTxnSystemTable()
fd6d1e1 is described below
commit fd6d1e19f40e8cf67545cbe910b7e689eba73bde
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Mon Feb 8 21:55:02 2021 -0800
[transactions] add TxnSystemClient::CheckOpenTxnSystemTable()
This patch adds a new CheckOpenTxnStatusTable() method to
TxnSystemClient. The new method is targeting the use case in a
follow-up patch where the txn system client is used to automatically
register transaction participants while serving write requests
arriving at tablet servers.
NOTE: as of now, it is necessary to re-open the transaction
system table when a new partition is added (i.e. when txn IDs
grow) for this approach to be useful with the newly
introduced CheckOpenTxnStatusTable() method.
I'm planning to address that in a follow-up patch.
Change-Id: If2e9b3cfc1566ff8ddbe56be980fb7d360759c0f
Reviewed-on: http://gerrit.cloudera.org:8080/17044
Reviewed-by: Andrew Wong <aw...@cloudera.com>
Tested-by: Kudu Jenkins
---
.../integration-tests/txn_status_table-itest.cc | 88 ++++++++++++++++++++++
src/kudu/transactions/txn_system_client.cc | 23 ++++++
src/kudu/transactions/txn_system_client.h | 7 ++
3 files changed, 118 insertions(+)
diff --git a/src/kudu/integration-tests/txn_status_table-itest.cc b/src/kudu/integration-tests/txn_status_table-itest.cc
index 9dd1cf7..0f38400 100644
--- a/src/kudu/integration-tests/txn_status_table-itest.cc
+++ b/src/kudu/integration-tests/txn_status_table-itest.cc
@@ -17,6 +17,7 @@
#include <algorithm>
#include <atomic>
+#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
@@ -88,6 +89,7 @@ using kudu::transactions::TxnStatePB;
using kudu::transactions::TxnStatusEntryPB;
using kudu::transactions::TxnStatusTablet;
using kudu::transactions::TxnSystemClient;
+using std::atomic;
using std::map;
using std::string;
using std::thread;
@@ -660,6 +662,92 @@ TEST_F(TxnStatusTableITest, TestSystemClientConcurrentCalls) {
}
}
+// Check the operation of the CheckOpenTxnStatusTable() method; also verify
+// what happens when registering a txn participant with stale information
+// on the range partitions (currently NotFound is expected; see TODO below).
+TEST_F(TxnStatusTableITest, CheckOpenTxnStatusTable) {
+ static constexpr auto kPartitionWidth = 1000;
+ static constexpr const char* const kUser = "CheckOpen";
+
+ {
+ // At this point, the transaction status table doesn't exist yet.
+ auto s = txn_sys_client_->CheckOpenTxnStatusTable();
+ ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+ }
+
+ ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(kPartitionWidth));
+ ASSERT_OK(txn_sys_client_->CheckOpenTxnStatusTable());
+ const auto* sys_table_ptr_ref = txn_sys_client_->txn_status_table().get();
+
+ cluster_->mini_master()->Shutdown();
+ // Now that CheckOpenTxnStatusTable() has succeeded, there should be no RPC
+ // calls to the cluster upon subsequent invocations: CheckOpenTxnStatusTable()
+ // should return Status::OK(), keeping the original shared pointer.
+ ASSERT_OK(txn_sys_client_->CheckOpenTxnStatusTable());
+ const auto* sys_table_ptr = txn_sys_client_->txn_status_table().get();
+ ASSERT_EQ(sys_table_ptr_ref, sys_table_ptr);
+
+ ASSERT_OK(cluster_->mini_master()->Restart());
+ ASSERT_OK(txn_sys_client_->BeginTransaction(0, kUser));
+
+ static constexpr auto kNewTxnId = kPartitionWidth + 1;
+ {
+ // Behind the scenes, using a separate client, create tablets for the next
+ // range of transaction IDs and start a new transaction in that range.
+ unique_ptr<TxnSystemClient> tsc;
+ ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &tsc));
+ // Open the system table with the separate client.
+ ASSERT_OK(tsc->OpenTxnStatusTable());
+ ASSERT_OK(tsc->AddTxnStatusTableRange(kPartitionWidth, 2 * kPartitionWidth));
+ ASSERT_OK(tsc->BeginTransaction(kNewTxnId, kUser));
+ }
+
+ // It would be great if it were possible to use the same client which was not
+ // aware of a new range in the txn status table to register a participant
+ // in a transaction with ID in the new range. This assumes the client would
+ // automatically re-fetch metadata from masters upon getting a request
+ // for a non-covered range. However, this isn't implemented yet.
+ //
+ // TODO(aserbin): change this to expect Status::OK() after implementing that
+ auto s = txn_sys_client_->RegisterParticipant(
+ kNewTxnId, "txn_participant", kUser, MonoDelta::FromSeconds(10));
+ ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "No tablet covering the requested range partition");
+}
+
+TEST_F(TxnStatusTableITest, CheckOpenTxnStatusTableConcurrent) {
+ static constexpr auto kNumThreads = 8;
+ static constexpr const char* const kUser = "CheckOpenConcurrent";
+ static constexpr const char* const kParticipant = "txn_participant";
+
+ ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(2 * kNumThreads));
+ atomic<size_t> success_count_begin = 0;
+ atomic<size_t> success_count_register = 0;
+ vector<thread> threads;
+ threads.reserve(kNumThreads);
+ for (int idx = 0; idx < kNumThreads; ++idx) {
+ threads.emplace_back([&] {
+ CHECK_OK(txn_sys_client_->CheckOpenTxnStatusTable());
+ // The calls below are "best effort" (their failures are tolerated): the
+ // essence here is just to make concurrent requests to the txn system
+ // client once CheckOpenTxnStatusTable() has been called.
+ auto s = txn_sys_client_->BeginTransaction(0, kUser);
+ if (s.ok()) {
+ ++success_count_begin;
+ }
+ s = txn_sys_client_->RegisterParticipant(
+ 0, kParticipant, kUser, MonoDelta::FromSeconds(10));
+ if (s.ok()) {
+ ++success_count_register;
+ }
+ });
+ }
+ std::for_each(threads.begin(), threads.end(), [&] (thread& t) { t.join(); });
+ ASSERT_EQ(1, success_count_begin);
+ ASSERT_LE(1, success_count_register);
+}
+
class MultiServerTxnStatusTableITest : public TxnStatusTableITest {
public:
void SetUp() override {
diff --git a/src/kudu/transactions/txn_system_client.cc b/src/kudu/transactions/txn_system_client.cc
index ded9dfc..67279d8 100644
--- a/src/kudu/transactions/txn_system_client.cc
+++ b/src/kudu/transactions/txn_system_client.cc
@@ -157,6 +157,29 @@ Status TxnSystemClient::OpenTxnStatusTable() {
return Status::OK();
}
+Status TxnSystemClient::CheckOpenTxnStatusTable() {
+ {
+ std::lock_guard<simple_spinlock> l(table_lock_);
+ if (txn_status_table_) {
+ return Status::OK();
+ }
+ }
+
+ // TODO(aserbin): enqueue concurrent calls to the OpenTable() below, if any
+ client::sp::shared_ptr<KuduTable> table;
+ RETURN_NOT_OK(client_->OpenTable(TxnStatusTablet::kTxnStatusTableName, &table));
+
+ {
+ std::lock_guard<simple_spinlock> l(table_lock_);
+ // Re-check under the lock: a concurrent caller may have set the table.
+ if (!txn_status_table_) {
+ txn_status_table_ = std::move(table);
+ }
+ }
+
+ return Status::OK();
+}
+
Status TxnSystemClient::BeginTransaction(int64_t txn_id,
const string& user,
uint32_t* txn_keepalive_ms,
diff --git a/src/kudu/transactions/txn_system_client.h b/src/kudu/transactions/txn_system_client.h
index de2983d..0c5bc3e 100644
--- a/src/kudu/transactions/txn_system_client.h
+++ b/src/kudu/transactions/txn_system_client.h
@@ -45,6 +45,7 @@ class KuduTable;
namespace itest {
class TxnStatusTableITest;
class TxnStatusTableITest_TestProtectCreateAndAlter_Test;
+class TxnStatusTableITest_CheckOpenTxnStatusTable_Test;
} // namespace itest
namespace rpc {
@@ -137,6 +138,11 @@ class TxnSystemClient {
// masters.
Status OpenTxnStatusTable();
+ // Check if the transaction status table is already open, returning
+ // Status::OK() if so. Otherwise, open the transaction status table. In the
+ // latter case, the result status of opening the table is returned.
+ Status CheckOpenTxnStatusTable();
+
// Sends an RPC to the leader of the given tablet to participate in a
// transaction.
//
@@ -155,6 +161,7 @@ class TxnSystemClient {
friend class itest::TxnStatusTableITest;
FRIEND_TEST(itest::TxnStatusTableITest, TestProtectCreateAndAlter);
+ FRIEND_TEST(itest::TxnStatusTableITest, CheckOpenTxnStatusTable);
explicit TxnSystemClient(client::sp::shared_ptr<client::KuduClient> client)
: client_(std::move(client)) {}