You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2021/04/30 17:29:40 UTC

[kudu] 03/05: [tests] enable using txns in TestWorkload

This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit fcfe6dbb3466df0edf368f5d7257a7766f9a1dc5
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Mon Apr 19 13:12:45 2021 -0700

    [tests] enable using txns in TestWorkload
    
    I have an upcoming test in which it'd be convenient to have an easy means
    to generate a transactional workload. This patch introduces some options
    to the TestWorkload class that satisfy this need:
    - set_begin_txn()
    - set_commit_txn()
    - set_rollback_txn()
    - set_txn_id(int64_t txn_id)
    
    Change-Id: Ia81daac8fcfd552603a0302c3d9aa411ea082ab1
    Reviewed-on: http://gerrit.cloudera.org:8080/17326
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Tested-by: Alexey Serbin <as...@cloudera.com>
---
 src/kudu/integration-tests/test_workload.cc       |  57 ++++++++-
 src/kudu/integration-tests/test_workload.h        |  56 +++++++-
 src/kudu/integration-tests/txn_write_ops-itest.cc | 148 +++++++++++++++++++++-
 3 files changed, 251 insertions(+), 10 deletions(-)

diff --git a/src/kudu/integration-tests/test_workload.cc b/src/kudu/integration-tests/test_workload.cc
index f646a70..51e089a 100644
--- a/src/kudu/integration-tests/test_workload.cc
+++ b/src/kudu/integration-tests/test_workload.cc
@@ -19,6 +19,7 @@
 
 #include <memory>
 #include <ostream>
+#include <string>
 
 #include <glog/logging.h>
 
@@ -27,12 +28,14 @@
 #include "kudu/client/schema.h"
 #include "kudu/client/write_op.h"
 #include "kudu/common/partial_row.h"
+#include "kudu/common/txn_id.h"
 #include "kudu/common/wire_protocol-test-util.h"
 #include "kudu/gutil/mathlimits.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/integration-tests/data_gen_util.h"
 #include "kudu/mini-cluster/mini_cluster.h"
+#include "kudu/transactions/transactions.pb.h"
 #include "kudu/util/random.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_util.h"
@@ -48,9 +51,12 @@ using kudu::client::KuduSchema;
 using kudu::client::KuduSession;
 using kudu::client::KuduTable;
 using kudu::client::KuduTableCreator;
+using kudu::client::KuduTransaction;
 using kudu::client::KuduUpdate;
 using kudu::client::sp::shared_ptr;
 using kudu::cluster::MiniCluster;
+using kudu::transactions::TxnTokenPB;
+using std::string;
 using std::unique_ptr;
 using std::vector;
 
@@ -71,6 +77,10 @@ TestWorkload::TestWorkload(MiniCluster* cluster,
     write_batch_size_(50),
     write_interval_millis_(0),
     write_timeout_millis_(20000),
+    txn_id_(TxnId::kInvalidTxnId),
+    begin_txn_(false),
+    commit_txn_(false),
+    rollback_txn_(false),
     fault_tolerant_(true),
     verify_num_rows_(true),
     read_errors_allowed_(false),
@@ -141,7 +151,12 @@ void TestWorkload::WriteThread() {
   shared_ptr<KuduTable> table;
   OpenTable(&table);
 
-  shared_ptr<KuduSession> session = client_->NewSession();
+  shared_ptr<KuduSession> session;
+  if (txn_) {
+    CHECK_OK(txn_->CreateSession(&session));
+  } else {
+    session = client_->NewSession();
+  }
   session->SetTimeoutMillis(write_timeout_millis_);
   CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
 
@@ -247,7 +262,8 @@ void TestWorkload::ReadThread() {
     // Note: when INSERT_RANDOM_ROWS_WITH_DELETE is used, ReadThread doesn't really verify
     // anything except that a scan works.
     int64_t expected_min_rows = 0;
-    if (write_pattern_ != INSERT_RANDOM_ROWS_WITH_DELETE && verify_num_rows_) {
+    if (write_pattern_ != INSERT_RANDOM_ROWS_WITH_DELETE && verify_num_rows_ &&
+        !begin_txn_ && !txn_id_.IsValid()) {
       expected_min_rows = rows_inserted_.Load();
     }
     size_t row_count = 0;
@@ -290,6 +306,12 @@ shared_ptr<KuduClient> TestWorkload::CreateClient() {
 }
 
 void TestWorkload::Setup() {
+  if (begin_txn_) {
+    CHECK(!txn_id_.IsValid()) << "Cannot begin txn and supply txn ID at the same time";
+  }
+  if (commit_txn_ || rollback_txn_) {
+    CHECK(txn_id_.IsValid() || begin_txn_) << "Must participate in a txn to commit or abort";
+  }
   if (!client_) {
     CHECK_OK(cluster_->CreateClient(&client_builder_, &client_));
   }
@@ -396,6 +418,29 @@ void TestWorkload::Start() {
   CHECK(!should_run_.Load()) << "Already started";
   should_run_.Store(true);
   start_latch_.Reset(num_write_threads_ + num_read_threads_);
+  if (txn_id_.IsValid()) {
+    // TODO(awong): add an API to set the keepalive. For now just use an
+    // arbitrary, short default value.
+    CHECK(!txn_);
+    TxnTokenPB txn_token_pb;
+    txn_token_pb.set_txn_id(txn_id_.value());
+    txn_token_pb.set_enable_keepalive(true);
+    txn_token_pb.set_keepalive_millis(1000);
+    string txn_token_str;
+    CHECK(txn_token_pb.SerializeToString(&txn_token_str));
+    CHECK_OK(KuduTransaction::Deserialize(client_, txn_token_str, &txn_));
+  }
+  if (begin_txn_) {
+    CHECK(!txn_);
+    CHECK(!txn_id_.IsValid());
+    CHECK_OK(client_->NewTransaction(&txn_));
+    string txn_str;
+    CHECK_OK(txn_->Serialize(&txn_str));
+    TxnTokenPB txn_token_pb;
+    CHECK(txn_token_pb.ParseFromString(txn_str));
+    CHECK(txn_token_pb.has_txn_id());
+    txn_id_ = TxnId(txn_token_pb.txn_id());
+  }
   for (int i = 0; i < num_write_threads_; i++) {
     threads_.emplace_back(&TestWorkload::WriteThread, this);
   }
@@ -422,6 +467,14 @@ void TestWorkload::StopAndJoin() {
     t.join();
   }
   threads_.clear();
+  if (txn_) {
+    if (commit_txn_) {
+      CHECK_OK(txn_->Commit());
+    }
+    if (rollback_txn_) {
+      CHECK_OK(txn_->Rollback());
+    }
+  }
 }
 
 } // namespace kudu
diff --git a/src/kudu/integration-tests/test_workload.h b/src/kudu/integration-tests/test_workload.h
index fb86b76..57ef175 100644
--- a/src/kudu/integration-tests/test_workload.h
+++ b/src/kudu/integration-tests/test_workload.h
@@ -31,6 +31,7 @@
 #include "kudu/client/client.h"
 #include "kudu/client/schema.h"
 #include "kudu/client/shared_ptr.h" // IWYU pragma: keep
+#include "kudu/common/txn_id.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/util/atomic.h"
 #include "kudu/util/countdown_latch.h"
@@ -77,6 +78,44 @@ class TestWorkload {
       PartitioningType partitioning = PartitioningType::RANGE);
   ~TestWorkload();
 
+  // Ingest the workload as part of the given transaction. set_begin_txn() must
+  // not be called if this is set.
+  void set_txn_id(int64_t txn_id) {
+    txn_id_ = TxnId(txn_id);
+    CHECK(txn_id_.IsValid());
+  }
+
+  // Ingest the workload as a part of a new transaction. set_txn_id() must not
+  // be called if this is set.
+  void set_begin_txn() {
+    begin_txn_ = true;
+    CHECK(!txn_id_.IsValid());
+  }
+
+  // Commit the transaction that this workload is a part of upon calling
+  // StopAndJoin(). If set, either set_begin_txn() or set_txn_id() must be set
+  // as well. If not set, but either set_begin_txn() or set_txn_id() is set,
+  // the workload will ingest as a part of the transaction, but not call
+  // commit on completion.
+  //
+  // set_rollback_txn() must not be called if this is set.
+  void set_commit_txn() {
+    commit_txn_ = true;
+    CHECK(!rollback_txn_);
+  }
+
+  // Abort the transaction that this workload is a part of upon calling
+  // StopAndJoin(). If set, either set_begin_txn() or set_txn_id() must be set
+  // as well. If not set, but either set_begin_txn() or set_txn_id() is set,
+  // the workload will ingest as a part of the transaction, but not call abort
+  // on completion.
+  //
+  // set_commit_txn() must not be called if this is set.
+  void set_rollback_txn() {
+    rollback_txn_ = true;
+    CHECK(!commit_txn_);
+  }
+
   // Sets whether the read thread should crash if scanning to the cluster fails
   // for whatever reason. If set to true, errors will be populated in
   // 'read_errors_'.
@@ -152,6 +191,9 @@ class TestWorkload {
 
   // Set whether we should attempt to verify the number of rows when scanning.
   // An incorrect number of rows may be indicative of a stale read.
+  //
+  // If either set_begin_txn() or set_txn_id() has been called, does not verify
+  // the number of rows.
   void set_verify_num_rows(bool should_verify) {
     verify_num_rows_ = should_verify;
   }
@@ -250,8 +292,14 @@ class TestWorkload {
   // Delete created table, etc.
   Status Cleanup();
 
+  int64_t txn_id() const {
+    CHECK(txn_id_.IsValid());
+    return txn_id_.value();
+  }
+
   // Return the number of rows inserted so far. This may be called either
-  // during or after the write workload.
+  // during or after the write workload. If writing as a part of a transaction,
+  // these rows may not have been committed.
   int64_t rows_inserted() const {
     return rows_inserted_.Load();
   }
@@ -298,6 +346,10 @@ class TestWorkload {
   int write_batch_size_;
   int write_interval_millis_;
   int write_timeout_millis_;
+  TxnId txn_id_;
+  bool begin_txn_;
+  bool commit_txn_;
+  bool rollback_txn_;
   bool fault_tolerant_;
   bool verify_num_rows_;
   bool read_errors_allowed_;
@@ -321,6 +373,8 @@ class TestWorkload {
   AtomicInt<int64_t> batches_completed_;
   AtomicInt<int32_t> sequential_key_gen_;
 
+  client::sp::shared_ptr<client::KuduTransaction> txn_;
+
   std::vector<std::thread> threads_;
 
   mutable simple_spinlock read_error_lock_;
diff --git a/src/kudu/integration-tests/txn_write_ops-itest.cc b/src/kudu/integration-tests/txn_write_ops-itest.cc
index b53c796..1aabbc1 100644
--- a/src/kudu/integration-tests/txn_write_ops-itest.cc
+++ b/src/kudu/integration-tests/txn_write_ops-itest.cc
@@ -60,6 +60,7 @@
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/integration-tests/external_mini_cluster-itest-base.h"
+#include "kudu/integration-tests/test_workload.h"
 #include "kudu/mini-cluster/external_mini_cluster.h"
 #include "kudu/mini-cluster/internal_mini_cluster.h"
 #include "kudu/rpc/rpc_controller.h"
@@ -201,6 +202,12 @@ Status CountRows(KuduTable* table, size_t* num_rows) {
   return Status::OK();
 }
 
+Status CountRows(KuduClient* client, const string& table_name, size_t* num_rows) {
+  shared_ptr<KuduTable> table;
+  RETURN_NOT_OK(client->OpenTable(table_name, &table));
+  return CountRows(table.get(), num_rows);
+}
+
 Status GetSingleRowError(KuduSession* session) {
   vector<KuduError*> errors;
   ElementDeleter drop(&errors);
@@ -932,7 +939,7 @@ class TxnOpDispatcherITest : public KuduTest {
     CHECK_OK(BuildSchema(&schema_));
   }
 
-  void Prepare(int num_tservers, bool create_table = true, int num_replicas = 0) {
+  void SetupCluster(int num_tservers, int num_replicas = 0) {
     if (num_replicas == 0) {
       num_replicas = num_tservers;
     }
@@ -944,7 +951,13 @@ class TxnOpDispatcherITest : public KuduTest {
     opts.num_tablet_servers = num_tservers;
     cluster_.reset(new InternalMiniCluster(env_, std::move(opts)));
     ASSERT_OK(cluster_->StartSync());
+  }
 
+  void Prepare(int num_tservers, bool create_table = true, int num_replicas = 0) {
+    if (num_replicas == 0) {
+      num_replicas = num_tservers;
+    }
+    NO_FATALS(SetupCluster(num_tservers, num_replicas));
     KuduClientBuilder builder;
     builder.default_admin_operation_timeout(kTimeout);
     ASSERT_OK(cluster_->CreateClient(&builder, &client_));
@@ -992,14 +1005,15 @@ class TxnOpDispatcherITest : public KuduTest {
   }
 
   // Get all replicas of the test table.
-  vector<scoped_refptr<TabletReplica>> GetAllReplicas() const {
+  vector<scoped_refptr<TabletReplica>> GetAllReplicas(const string& table_name = "") const {
+    const string& target_table = table_name.empty() ? kTableName : table_name;
     vector<scoped_refptr<TabletReplica>> result;
     for (auto i = 0; i < cluster_->num_tablet_servers(); ++i) {
       auto* server = cluster_->mini_tablet_server(i)->server();
       vector<scoped_refptr<TabletReplica>> replicas;
       server->tablet_manager()->GetTabletReplicas(&replicas);
       for (auto& r : replicas) {
-        if (r->tablet()->metadata()->table_name() == kTableName) {
+        if (r->tablet()->metadata()->table_name() == target_table) {
           result.emplace_back(std::move(r));
         }
       }
@@ -1008,10 +1022,11 @@ class TxnOpDispatcherITest : public KuduTest {
   }
 
   size_t GetTxnOpDispatchersTotalCount(
-      vector<scoped_refptr<TabletReplica>> replicas = {}) {
+      vector<scoped_refptr<TabletReplica>> replicas = {},
+      const string& table_name = "") {
     if (replicas.empty()) {
       // No replicas were specified, get the list of all test table's replicas.
-      replicas = GetAllReplicas();
+      replicas = GetAllReplicas(table_name);
     }
     size_t elem_count = 0;
     for (auto& r : replicas) {
@@ -1041,8 +1056,8 @@ class TxnOpDispatcherITest : public KuduTest {
   typedef vector<std::shared_ptr<typename TabletReplica::TxnOpDispatcher>>
       OpDispatchers;
   typedef map<int64_t, OpDispatchers> OpDispatchersPerTxnId;
-  OpDispatchersPerTxnId GetTxnOpDispatchers() {
-    auto replicas = GetAllReplicas();
+  OpDispatchersPerTxnId GetTxnOpDispatchers(const string& table_name = "") {
+    auto replicas = GetAllReplicas(table_name);
     OpDispatchersPerTxnId result;
     for (auto& r : replicas) {
       std::lock_guard<simple_spinlock> guard(r->txn_op_dispatchers_lock_);
@@ -2191,4 +2206,123 @@ TEST_F(TxnOpDispatcherITest, DISABLED_TxnMultipleSingleRowsWithServerRestart) {
   }
 }
 
+// Test beginning and aborting a transaction from the same test workload.
+TEST_F(TxnOpDispatcherITest, TestBeginAbortTransactionalTestWorkload) {
+  NO_FATALS(SetupCluster(1));
+  TestWorkload w(cluster_.get(), TestWorkload::PartitioningType::HASH);
+  w.set_num_replicas(1);
+  w.set_num_tablets(3);
+  w.set_begin_txn();
+  w.set_rollback_txn();
+  w.Setup();
+  w.Start();
+  const auto& table_name = w.table_name();
+  while (w.rows_inserted() < 1000) {
+    SleepFor(MonoDelta::FromMilliseconds(5));
+  }
+  // Each participant should have a dispatcher.
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_EQ(3, GetTxnOpDispatchersTotalCount({}, table_name));
+  });
+  w.StopAndJoin();
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_EQ(0, GetTxnOpDispatchersTotalCount({}, table_name));
+  });
+  // By the end of it, we should have aborted the rows and they should not be
+  // visible to clients.
+  size_t num_rows;
+  ASSERT_OK(CountRows(w.client().get(), table_name, &num_rows));
+  ASSERT_EQ(0, num_rows);
+}
+
+// Test beginning and committing a transaction from the same test workload.
+TEST_F(TxnOpDispatcherITest, TestBeginCommitTransactionalTestWorkload) {
+  NO_FATALS(SetupCluster(1));
+  TestWorkload w(cluster_.get(), TestWorkload::PartitioningType::HASH);
+  w.set_num_replicas(1);
+  w.set_num_tablets(3);
+  w.set_begin_txn();
+  w.set_commit_txn();
+  w.Setup();
+  w.Start();
+  const auto& table_name = w.table_name();
+  while (w.rows_inserted() < 1000) {
+    SleepFor(MonoDelta::FromMilliseconds(5));
+  }
+  // Each participant should have a dispatcher.
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_EQ(3, GetTxnOpDispatchersTotalCount({}, table_name));
+  });
+  w.StopAndJoin();
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_EQ(0, GetTxnOpDispatchersTotalCount({}, table_name));
+  });
+  // By the end of it, we should have committed the rows and they should be
+  // visible to clients.
+  size_t num_rows;
+  ASSERT_OK(CountRows(w.client().get(), table_name, &num_rows));
+  ASSERT_EQ(w.rows_inserted(), num_rows);
+}
+
+// Test beginning and committing a transaction from separate test workloads.
+TEST_F(TxnOpDispatcherITest, TestSeparateBeginCommitTestWorkloads) {
+  NO_FATALS(SetupCluster(1));
+  int64_t txn_id;
+  string first_table_name;
+  size_t first_rows_inserted;
+  {
+    TestWorkload w(cluster_.get(), TestWorkload::PartitioningType::HASH);
+    w.set_begin_txn();
+    w.set_num_replicas(1);
+    w.set_num_tablets(3);
+    w.Setup();
+    w.Start();
+    while (w.rows_inserted() < 1000) {
+      SleepFor(MonoDelta::FromMilliseconds(5));
+    }
+    first_table_name = w.table_name();
+    ASSERT_EVENTUALLY([&] {
+      ASSERT_EQ(3, GetTxnOpDispatchersTotalCount({}, first_table_name));
+    });
+    w.StopAndJoin();
+    first_rows_inserted = w.rows_inserted();
+    txn_id = w.txn_id();
+    size_t num_rows;
+    ASSERT_OK(CountRows(w.client().get(), first_table_name, &num_rows));
+    ASSERT_EQ(0, num_rows);
+  }
+  // Create a new workload, and insert as a part of the same transaction.
+  {
+    TestWorkload w(cluster_.get(), TestWorkload::PartitioningType::HASH);
+    const auto& kSecondTableName = "default.second_table";
+    w.set_txn_id(txn_id);
+    w.set_commit_txn();
+    w.set_table_name(kSecondTableName);
+    w.set_num_replicas(1);
+    w.set_num_tablets(3);
+    w.Setup();
+    w.Start();
+    while (w.rows_inserted() < 1000) {
+      SleepFor(MonoDelta::FromMilliseconds(5));
+    }
+    // We should have dispatchers for both tables.
+    ASSERT_EVENTUALLY([&] {
+      ASSERT_EQ(3, GetTxnOpDispatchersTotalCount({}, first_table_name));
+      ASSERT_EQ(3, GetTxnOpDispatchersTotalCount({}, kSecondTableName));
+    });
+    w.StopAndJoin();
+    // Once committed, the dispatchers should be unregistered.
+    ASSERT_EVENTUALLY([&] {
+      ASSERT_EQ(0, GetTxnOpDispatchersTotalCount({}, first_table_name));
+      ASSERT_EQ(0, GetTxnOpDispatchersTotalCount({}, kSecondTableName));
+    });
+    size_t num_rows;
+    ASSERT_OK(CountRows(w.client().get(), first_table_name, &num_rows));
+    ASSERT_EQ(first_rows_inserted, num_rows);
+    ASSERT_OK(CountRows(w.client().get(), kSecondTableName, &num_rows));
+    ASSERT_EQ(w.rows_inserted(), num_rows);
+  }
+}
+
+
 } // namespace kudu