You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2021/02/11 02:17:12 UTC

[kudu] branch master updated: [transactions] add TxnSystemClient::CheckOpenTxnSystemTable()

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new fd6d1e1  [transactions] add TxnSystemClient::CheckOpenTxnSystemTable()
fd6d1e1 is described below

commit fd6d1e19f40e8cf67545cbe910b7e689eba73bde
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Mon Feb 8 21:55:02 2021 -0800

    [transactions] add TxnSystemClient::CheckOpenTxnSystemTable()
    
    This patch adds a new CheckOpenTxnStatusTable() method to
    TxnSystemClient.  The new method targets the use case in a
    follow-up patch where the txn system client is used to automatically
    register transaction participants while serving write requests
    arriving at tablet servers.
    
    NOTE: as of now, the transaction status table would need to be
          re-opened whenever a new partition is added (i.e. when txn IDs
          grow) for this approach with the newly introduced
          CheckOpenTxnStatusTable() method to make sense.
          I'm planning to address that in a follow-up patch.
    
    Change-Id: If2e9b3cfc1566ff8ddbe56be980fb7d360759c0f
    Reviewed-on: http://gerrit.cloudera.org:8080/17044
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 .../integration-tests/txn_status_table-itest.cc    | 88 ++++++++++++++++++++++
 src/kudu/transactions/txn_system_client.cc         | 23 ++++++
 src/kudu/transactions/txn_system_client.h          |  7 ++
 3 files changed, 118 insertions(+)

diff --git a/src/kudu/integration-tests/txn_status_table-itest.cc b/src/kudu/integration-tests/txn_status_table-itest.cc
index 9dd1cf7..0f38400 100644
--- a/src/kudu/integration-tests/txn_status_table-itest.cc
+++ b/src/kudu/integration-tests/txn_status_table-itest.cc
@@ -17,6 +17,7 @@
 
 #include <algorithm>
 #include <atomic>
+#include <cstddef>
 #include <cstdint>
 #include <functional>
 #include <map>
@@ -88,6 +89,7 @@ using kudu::transactions::TxnStatePB;
 using kudu::transactions::TxnStatusEntryPB;
 using kudu::transactions::TxnStatusTablet;
 using kudu::transactions::TxnSystemClient;
+using std::atomic;
 using std::map;
 using std::string;
 using std::thread;
@@ -660,6 +662,92 @@ TEST_F(TxnStatusTableITest, TestSystemClientConcurrentCalls) {
   }
 }
 
+// Check the operation of the CheckOpenTxnStatusTable() method. Also verify
+// that registering txn participants with stale information on the range
+// partitions isn't supported yet (see the TODO below).
+TEST_F(TxnStatusTableITest, CheckOpenTxnStatusTable) {
+  static constexpr auto kPartitionWidth = 1000;
+  static constexpr const char* const kUser = "CheckOpen";
+
+  {
+    // At this point, the transaction status table doesn't exist yet.
+    auto s = txn_sys_client_->CheckOpenTxnStatusTable();
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+  }
+
+  ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(kPartitionWidth));
+  ASSERT_OK(txn_sys_client_->CheckOpenTxnStatusTable());
+  const auto* sys_table_ptr_ref = txn_sys_client_->txn_status_table().get();
+
+  cluster_->mini_master()->Shutdown();
+  // Now that CheckOpenTxnStatusTable() has succeeded, subsequent invocations
+  // should not send any RPCs to the cluster (the master is down now), should
+  // return Status::OK(), and should keep the original shared pointer.
+  ASSERT_OK(txn_sys_client_->CheckOpenTxnStatusTable());
+  const auto* sys_table_ptr = txn_sys_client_->txn_status_table().get();
+  ASSERT_EQ(sys_table_ptr_ref, sys_table_ptr);
+
+  ASSERT_OK(cluster_->mini_master()->Restart());
+  ASSERT_OK(txn_sys_client_->BeginTransaction(0, kUser));
+
+  static constexpr auto kNewTxnId = kPartitionWidth + 1;
+  {
+    // Behind the scenes, using a separate client, add tablets for the next
+    // range of transaction IDs and start a new transaction in that range.
+    unique_ptr<TxnSystemClient> tsc;
+    ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &tsc));
+    // Open the system table with the separate client.
+    ASSERT_OK(tsc->OpenTxnStatusTable());
+    ASSERT_OK(tsc->AddTxnStatusTableRange(kPartitionWidth, 2 * kPartitionWidth));
+    ASSERT_OK(tsc->BeginTransaction(kNewTxnId, kUser));
+  }
+
+  // It would be great if it were possible to use the same client which was not
+  // aware of a new range in the txn status table to register a participant
+  // in a transaction with ID in the new range. That requires the client to
+  // automatically re-fetch metadata from masters upon getting a request
+  // for a non-covered range. However, this isn't implemented yet.
+  //
+  // TODO(aserbin): change this to expected Status::OK() after implementing that
+  auto s = txn_sys_client_->RegisterParticipant(
+      kNewTxnId, "txn_participant", kUser, MonoDelta::FromSeconds(10));
+  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(),
+                      "No tablet covering the requested range partition");
+}
+
+TEST_F(TxnStatusTableITest, CheckOpenTxnStatusTableConcurrent) {
+  static constexpr auto kNumThreads = 8;
+  static constexpr const char* const kUser = "CheckOpenConcurrent";
+  static constexpr const char* const kParticipant = "txn_participant";
+
+  ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(2 * kNumThreads));
+  atomic<size_t> success_count_begin = 0;
+  atomic<size_t> success_count_register = 0;
+  vector<thread> threads;
+  threads.reserve(kNumThreads);
+  for (int idx = 0; idx < kNumThreads; ++idx) {
+    threads.emplace_back([&] {
+      CHECK_OK(txn_sys_client_->CheckOpenTxnStatusTable());
+      // The calls below are "best effort": the point is just to make
+      // concurrent requests via the txn system client once
+      // CheckOpenTxnStatusTable() has been called.
+      auto s = txn_sys_client_->BeginTransaction(0, kUser);
+      if (s.ok()) {
+        ++success_count_begin;
+      }
+      s = txn_sys_client_->RegisterParticipant(
+          0, kParticipant, kUser, MonoDelta::FromSeconds(10));
+      if (s.ok()) {
+        ++success_count_register;
+      }
+    });
+  }
+  std::for_each(threads.begin(), threads.end(), [&] (thread& t) { t.join(); });
+  ASSERT_EQ(1, success_count_begin);
+  ASSERT_LE(1, success_count_register);
+}
+
 class MultiServerTxnStatusTableITest : public TxnStatusTableITest {
  public:
   void SetUp() override {
diff --git a/src/kudu/transactions/txn_system_client.cc b/src/kudu/transactions/txn_system_client.cc
index ded9dfc..67279d8 100644
--- a/src/kudu/transactions/txn_system_client.cc
+++ b/src/kudu/transactions/txn_system_client.cc
@@ -157,6 +157,29 @@ Status TxnSystemClient::OpenTxnStatusTable() {
   return Status::OK();
 }
 
+Status TxnSystemClient::CheckOpenTxnStatusTable() {
+  {
+    std::lock_guard<simple_spinlock> l(table_lock_);
+    if (txn_status_table_) {
+      return Status::OK();
+    }
+  }
+
+  // TODO(aserbin): enqueue concurrent calls to the OpenTable() below, if any
+  client::sp::shared_ptr<KuduTable> table;
+  RETURN_NOT_OK(client_->OpenTable(TxnStatusTablet::kTxnStatusTableName, &table));
+
+  {
+    std::lock_guard<simple_spinlock> l(table_lock_);
+    // Re-check: a concurrent caller may have set it while the lock was free.
+    if (!txn_status_table_) {
+      txn_status_table_ = std::move(table);
+    }
+  }
+
+  return Status::OK();
+}
+
 Status TxnSystemClient::BeginTransaction(int64_t txn_id,
                                          const string& user,
                                          uint32_t* txn_keepalive_ms,
diff --git a/src/kudu/transactions/txn_system_client.h b/src/kudu/transactions/txn_system_client.h
index de2983d..0c5bc3e 100644
--- a/src/kudu/transactions/txn_system_client.h
+++ b/src/kudu/transactions/txn_system_client.h
@@ -45,6 +45,7 @@ class KuduTable;
 namespace itest {
 class TxnStatusTableITest;
 class TxnStatusTableITest_TestProtectCreateAndAlter_Test;
+class TxnStatusTableITest_CheckOpenTxnStatusTable_Test;
 } // namespace itest
 
 namespace rpc {
@@ -137,6 +138,11 @@ class TxnSystemClient {
   // masters.
   Status OpenTxnStatusTable();
 
+  // Check if the transaction status table is already open, returning
+  // Status::OK() if so, without contacting masters. Otherwise, open the
+  // transaction status table and return the result status of opening it.
+  // NOTE: an already-open table isn't refreshed to pick up new ranges.
+  Status CheckOpenTxnStatusTable();
+
   // Sends an RPC to the leader of the given tablet to participate in a
   // transaction.
   //
@@ -155,6 +161,7 @@ class TxnSystemClient {
 
   friend class itest::TxnStatusTableITest;
   FRIEND_TEST(itest::TxnStatusTableITest, TestProtectCreateAndAlter);
+  FRIEND_TEST(itest::TxnStatusTableITest, CheckOpenTxnStatusTable);
 
   explicit TxnSystemClient(client::sp::shared_ptr<client::KuduClient> client)
       : client_(std::move(client)) {}