Posted to commits@kudu.apache.org by gr...@apache.org on 2021/04/30 17:29:37 UTC

[kudu] branch master updated (28f2f35 -> c7ede99)

This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from 28f2f35  KUDU-2612: propagate commit timestamp (Java client)
     new 4c53ce8  KUDU-2612: fix race when adopting partition lock
     new 10d8e32  [client] return TimedOut if the deadline has passed in master RPCs
     new fcfe6db  [tests] enable using txns in TestWorkload
     new 00c1b67  KUDU-2754: Mark max_log_files as stable
     new c7ede99  [backup] Ensure listed backups are sorted by table

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/kudu/backup/KuduBackupLister.scala  |   3 +-
 src/kudu/client/master_proxy_rpc.cc                |   4 +-
 src/kudu/client/txn_manager_proxy_rpc.cc           |   4 +-
 src/kudu/integration-tests/test_workload.cc        |  57 +++++++-
 src/kudu/integration-tests/test_workload.h         |  56 +++++++-
 src/kudu/integration-tests/txn_write_ops-itest.cc  | 148 ++++++++++++++++++++-
 src/kudu/tablet/txn_participant-test.cc            |  20 +++
 src/kudu/tablet/txn_participant.cc                 |   1 +
 src/kudu/tablet/txn_participant.h                  |   2 +
 src/kudu/util/logging.cc                           |   2 +-
 10 files changed, 283 insertions(+), 14 deletions(-)

[kudu] 02/05: [client] return TimedOut if the deadline has passed in master RPCs

Posted by gr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 10d8e320bdc6902ae223038c8c406cc6774a6075
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Thu Apr 29 16:28:17 2021 -0700

    [client] return TimedOut if the deadline has passed in master RPCs
    
    There was a narrow window in which we could return a NetworkError to
    users of the master-bound RPCs instead of a TimedOut error. Without this
    change, I saw ClientTest.NoTxnManager fail in 1 of 300 runs and
    TxnStatusTableITest.TestSystemClientMasterDown fail in roughly 1 of 3
    runs when looped via dist-test.
    
    With this change, both tests passed 1000/1000 runs.
    
    This is a follow-up to 96047a8d861d61673b9e2589930eccd78a16e483.
    
    Change-Id: I309ba70d62a48b4dc8fb0e8f30f532593c254860
    Reviewed-on: http://gerrit.cloudera.org:8080/17367
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Tested-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/client/master_proxy_rpc.cc      | 4 +++-
 src/kudu/client/txn_manager_proxy_rpc.cc | 4 +++-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/src/kudu/client/master_proxy_rpc.cc b/src/kudu/client/master_proxy_rpc.cc
index 975a7af..da4bb1e 100644
--- a/src/kudu/client/master_proxy_rpc.cc
+++ b/src/kudu/client/master_proxy_rpc.cc
@@ -37,6 +37,7 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 #include "kudu/util/status_callback.h"
 
@@ -241,7 +242,8 @@ bool AsyncLeaderMasterRpc<ReqClass, RespClass>::RetryOrReconnectIfNecessary(
       return true;
     }
     // And if we've passed the overall deadline, we shouldn't retry.
-    s = s.CloneAndPrepend(Substitute("$0 timed out after deadline expired", rpc_name_));
+    s = Status::TimedOut(Substitute("$0 timed out after deadline expired: $1",
+                                    rpc_name_, s.message().ToString()));
   }
 
   // Next, parse RPC errors that happened after the connection succeeded.
diff --git a/src/kudu/client/txn_manager_proxy_rpc.cc b/src/kudu/client/txn_manager_proxy_rpc.cc
index cf72650..ac7c03f 100644
--- a/src/kudu/client/txn_manager_proxy_rpc.cc
+++ b/src/kudu/client/txn_manager_proxy_rpc.cc
@@ -37,6 +37,7 @@
 #include "kudu/util/logging.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 #include "kudu/util/status_callback.h"
 
@@ -184,7 +185,8 @@ bool AsyncRandomTxnManagerRpc<ReqClass, RespClass>::RetryIfNecessary(
       return true;
     }
     // And if we've passed the overall deadline, we shouldn't retry.
-    s = s.CloneAndPrepend(Substitute("$0 timed out after deadline expired", rpc_name_));
+    s = Status::TimedOut(Substitute("$0 timed out after deadline expired: $1",
+                                    rpc_name_, s.message().ToString()));
   }
 
   // Next, parse RPC errors that happened after the connection succeeded.
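The practical difference is the status code, not the message: CloneAndPrepend()
keeps the original NetworkError code, while the new code rebuilds the status as
TimedOut so callers of the master- and TxnManager-bound RPCs that check
IsTimedOut() after the deadline behave as expected. A minimal sketch of that
distinction using kudu::Status directly (the RPC name and error message below
are illustrative, not taken from the patch):

    #include <glog/logging.h>

    #include "kudu/gutil/strings/substitute.h"
    #include "kudu/util/status.h"

    int main() {
      using kudu::Status;
      using strings::Substitute;

      // Error observed while retrying a master-bound RPC (message illustrative).
      Status s = Status::NetworkError("connection refused");

      // Old behavior: the prepended message mentions the deadline, but the
      // status code is still NetworkError.
      Status before = s.CloneAndPrepend(
          Substitute("$0 timed out after deadline expired", "GetTableSchema"));
      CHECK(before.IsNetworkError());

      // New behavior: the status is rebuilt as TimedOut, keeping the original
      // message as context, so IsTimedOut() checks succeed.
      Status after = Status::TimedOut(
          Substitute("$0 timed out after deadline expired: $1",
                     "GetTableSchema", s.message().ToString()));
      CHECK(after.IsTimedOut());
      return 0;
    }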

[kudu] 01/05: KUDU-2612: fix race when adopting partition lock

Posted by gr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 4c53ce892a083ff660f4b7af2374fd6b273329f8
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Wed Apr 28 21:26:41 2021 -0700

    KUDU-2612: fix race when adopting partition lock
    
    The way we were transferring the partition lock to the Txn wasn't
    thread-safe. This patch wraps the critical section in a spinlock.
    
    It also includes a test that triggers a TSAN error without the fix.
    
    Change-Id: Ifc7ac7f474baf308860847298355b300c76d9ef5
    Reviewed-on: http://gerrit.cloudera.org:8080/17361
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Tested-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/tablet/txn_participant-test.cc | 20 ++++++++++++++++++++
 src/kudu/tablet/txn_participant.cc      |  1 +
 src/kudu/tablet/txn_participant.h       |  2 ++
 3 files changed, 23 insertions(+)

diff --git a/src/kudu/tablet/txn_participant-test.cc b/src/kudu/tablet/txn_participant-test.cc
index cedcebd..41ceb44 100644
--- a/src/kudu/tablet/txn_participant-test.cc
+++ b/src/kudu/tablet/txn_participant-test.cc
@@ -39,6 +39,7 @@
 #include "kudu/common/iterator.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/common/row_operations.h"
+#include "kudu/common/row_operations.pb.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/timestamp.h"
 #include "kudu/common/wire_protocol.h"
@@ -389,6 +390,25 @@ TEST_F(TxnParticipantTest, TestConcurrentTransactions) {
   }
 }
 
+TEST_F(TxnParticipantTest, TestConcurrentWrites) {
+  constexpr const auto kNumRows = 10;
+  constexpr const auto kTxnId = 0;
+  ParticipantResponsePB resp;
+  ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kTxnId,
+                              ParticipantOpPB::BEGIN_TXN, -1, &resp));
+  vector<thread> threads;
+  Status statuses[kNumRows];
+  for (int i = 0; i < kNumRows; i++) {
+    threads.emplace_back([&, i] {
+      statuses[i] = Write(i, kTxnId);
+    });
+  }
+  std::for_each(threads.begin(), threads.end(), [] (thread& t) { t.join(); });
+  for (const auto& s : statuses) {
+    EXPECT_OK(s);
+  }
+}
+
 // Concurrently try to apply every op and test, based on the results, that some
 // invariants are maintained.
 TEST_F(TxnParticipantTest, TestConcurrentOps) {
diff --git a/src/kudu/tablet/txn_participant.cc b/src/kudu/tablet/txn_participant.cc
index 4a63d53..1f72eb5 100644
--- a/src/kudu/tablet/txn_participant.cc
+++ b/src/kudu/tablet/txn_participant.cc
@@ -77,6 +77,7 @@ void Txn::AcquireReadLock(shared_lock<rw_semaphore>* txn_lock) {
 void Txn::AdoptPartitionLock(ScopedPartitionLock partition_lock) {
   if (PREDICT_TRUE(FLAGS_enable_txn_partition_lock)) {
     TabletServerErrorPB::Code code = tserver::TabletServerErrorPB::UNKNOWN_ERROR;
+    std::lock_guard<simple_spinlock> l(lock_);
 #ifndef NDEBUG
     CHECK(partition_lock.IsAcquired(&code)) << code;
     if (partition_lock_.IsAcquired(&code)) {
diff --git a/src/kudu/tablet/txn_participant.h b/src/kudu/tablet/txn_participant.h
index 92b7b00..43800ae 100644
--- a/src/kudu/tablet/txn_participant.h
+++ b/src/kudu/tablet/txn_participant.h
@@ -265,6 +265,7 @@ class Txn : public RefCountedThreadSafe<Txn> {
   }
 
   void ReleasePartitionLock() {
+    std::lock_guard<simple_spinlock> l(lock_);
     partition_lock_.Release();
   }
 
@@ -358,6 +359,7 @@ class Txn : public RefCountedThreadSafe<Txn> {
   std::unique_ptr<ScopedOp> commit_op_;
 
   // Holds the partition lock acquired for this transaction.
+  simple_spinlock lock_;
   ScopedPartitionLock partition_lock_;
 
   DISALLOW_COPY_AND_ASSIGN(Txn);
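
For reference, the handoff pattern being fixed: both the adopt path and the
release path must take the same lock before touching partition_lock_, otherwise
two threads transferring and releasing the lock concurrently race on that
member. A self-contained sketch of the pattern, using std::mutex in place of
Kudu's simple_spinlock (all names below are stand-ins, not Kudu classes):

    #include <mutex>
    #include <thread>
    #include <utility>

    // Stand-in for ScopedPartitionLock: a movable resource handed to the Txn.
    struct FakePartitionLock {
      bool held = false;
      void Release() { held = false; }
    };

    class TxnSketch {
     public:
      void AdoptPartitionLock(FakePartitionLock lock) {
        std::lock_guard<std::mutex> l(lock_);
        partition_lock_ = std::move(lock);  // ownership transfer, now guarded
      }
      void ReleasePartitionLock() {
        std::lock_guard<std::mutex> l(lock_);
        partition_lock_.Release();          // guarded by the same lock
      }

     private:
      std::mutex lock_;
      FakePartitionLock partition_lock_;
    };

    int main() {
      // Two threads touching the same member concurrently; without the shared
      // guard this is exactly the kind of data race TSAN reports.
      TxnSketch txn;
      std::thread adopter([&] { txn.AdoptPartitionLock(FakePartitionLock{true}); });
      std::thread releaser([&] { txn.ReleasePartitionLock(); });
      adopter.join();
      releaser.join();
      return 0;
    }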

[kudu] 03/05: [tests] enable using txns in TestWorkload

Posted by gr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit fcfe6dbb3466df0edf368f5d7257a7766f9a1dc5
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Mon Apr 19 13:12:45 2021 -0700

    [tests] enable using txns in TestWorkload
    
    I have an upcoming test in which it'd be convenient to have an easy means
    to generate a transactional workload. This patch introduces some options
    to the TestWorkload class that satisfy this need:
    - set_begin_txn()
    - set_commit_txn()
    - set_rollback_txn()
    - set_txn_id(int64_t txn_id)
    
    Change-Id: Ia81daac8fcfd552603a0302c3d9aa411ea082ab1
    Reviewed-on: http://gerrit.cloudera.org:8080/17326
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Tested-by: Alexey Serbin <as...@cloudera.com>
---
 src/kudu/integration-tests/test_workload.cc       |  57 ++++++++-
 src/kudu/integration-tests/test_workload.h        |  56 +++++++-
 src/kudu/integration-tests/txn_write_ops-itest.cc | 148 +++++++++++++++++++++-
 3 files changed, 251 insertions(+), 10 deletions(-)

diff --git a/src/kudu/integration-tests/test_workload.cc b/src/kudu/integration-tests/test_workload.cc
index f646a70..51e089a 100644
--- a/src/kudu/integration-tests/test_workload.cc
+++ b/src/kudu/integration-tests/test_workload.cc
@@ -19,6 +19,7 @@
 
 #include <memory>
 #include <ostream>
+#include <string>
 
 #include <glog/logging.h>
 
@@ -27,12 +28,14 @@
 #include "kudu/client/schema.h"
 #include "kudu/client/write_op.h"
 #include "kudu/common/partial_row.h"
+#include "kudu/common/txn_id.h"
 #include "kudu/common/wire_protocol-test-util.h"
 #include "kudu/gutil/mathlimits.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/integration-tests/data_gen_util.h"
 #include "kudu/mini-cluster/mini_cluster.h"
+#include "kudu/transactions/transactions.pb.h"
 #include "kudu/util/random.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_util.h"
@@ -48,9 +51,12 @@ using kudu::client::KuduSchema;
 using kudu::client::KuduSession;
 using kudu::client::KuduTable;
 using kudu::client::KuduTableCreator;
+using kudu::client::KuduTransaction;
 using kudu::client::KuduUpdate;
 using kudu::client::sp::shared_ptr;
 using kudu::cluster::MiniCluster;
+using kudu::transactions::TxnTokenPB;
+using std::string;
 using std::unique_ptr;
 using std::vector;
 
@@ -71,6 +77,10 @@ TestWorkload::TestWorkload(MiniCluster* cluster,
     write_batch_size_(50),
     write_interval_millis_(0),
     write_timeout_millis_(20000),
+    txn_id_(TxnId::kInvalidTxnId),
+    begin_txn_(false),
+    commit_txn_(false),
+    rollback_txn_(false),
     fault_tolerant_(true),
     verify_num_rows_(true),
     read_errors_allowed_(false),
@@ -141,7 +151,12 @@ void TestWorkload::WriteThread() {
   shared_ptr<KuduTable> table;
   OpenTable(&table);
 
-  shared_ptr<KuduSession> session = client_->NewSession();
+  shared_ptr<KuduSession> session;
+  if (txn_) {
+    CHECK_OK(txn_->CreateSession(&session));
+  } else {
+    session = client_->NewSession();
+  }
   session->SetTimeoutMillis(write_timeout_millis_);
   CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
 
@@ -247,7 +262,8 @@ void TestWorkload::ReadThread() {
     // Note: when INSERT_RANDOM_ROWS_WITH_DELETE is used, ReadThread doesn't really verify
     // anything except that a scan works.
     int64_t expected_min_rows = 0;
-    if (write_pattern_ != INSERT_RANDOM_ROWS_WITH_DELETE && verify_num_rows_) {
+    if (write_pattern_ != INSERT_RANDOM_ROWS_WITH_DELETE && verify_num_rows_ &&
+        !begin_txn_ && !txn_id_.IsValid()) {
       expected_min_rows = rows_inserted_.Load();
     }
     size_t row_count = 0;
@@ -290,6 +306,12 @@ shared_ptr<KuduClient> TestWorkload::CreateClient() {
 }
 
 void TestWorkload::Setup() {
+  if (begin_txn_) {
+    CHECK(!txn_id_.IsValid()) << "Cannot begin txn and supply txn ID at the same time";
+  }
+  if (commit_txn_ || rollback_txn_) {
+    CHECK(txn_id_.IsValid() || begin_txn_) << "Must participate in a txn to commit or abort";
+  }
   if (!client_) {
     CHECK_OK(cluster_->CreateClient(&client_builder_, &client_));
   }
@@ -396,6 +418,29 @@ void TestWorkload::Start() {
   CHECK(!should_run_.Load()) << "Already started";
   should_run_.Store(true);
   start_latch_.Reset(num_write_threads_ + num_read_threads_);
+  if (txn_id_.IsValid()) {
+    // TODO(awong): add an API to set the keepalive. For now just use an
+    // arbitrary, short default value.
+    CHECK(!txn_);
+    TxnTokenPB txn_token_pb;
+    txn_token_pb.set_txn_id(txn_id_.value());
+    txn_token_pb.set_enable_keepalive(true);
+    txn_token_pb.set_keepalive_millis(1000);
+    string txn_token_str;
+    CHECK(txn_token_pb.SerializeToString(&txn_token_str));
+    CHECK_OK(KuduTransaction::Deserialize(client_, txn_token_str, &txn_));
+  }
+  if (begin_txn_) {
+    CHECK(!txn_);
+    CHECK(!txn_id_.IsValid());
+    CHECK_OK(client_->NewTransaction(&txn_));
+    string txn_str;
+    CHECK_OK(txn_->Serialize(&txn_str));
+    TxnTokenPB txn_token_pb;
+    CHECK(txn_token_pb.ParseFromString(txn_str));
+    CHECK(txn_token_pb.has_txn_id());
+    txn_id_ = TxnId(txn_token_pb.txn_id());
+  }
   for (int i = 0; i < num_write_threads_; i++) {
     threads_.emplace_back(&TestWorkload::WriteThread, this);
   }
@@ -422,6 +467,14 @@ void TestWorkload::StopAndJoin() {
     t.join();
   }
   threads_.clear();
+  if (txn_) {
+    if (commit_txn_) {
+      CHECK_OK(txn_->Commit());
+    }
+    if (rollback_txn_) {
+      CHECK_OK(txn_->Rollback());
+    }
+  }
 }
 
 } // namespace kudu
diff --git a/src/kudu/integration-tests/test_workload.h b/src/kudu/integration-tests/test_workload.h
index fb86b76..57ef175 100644
--- a/src/kudu/integration-tests/test_workload.h
+++ b/src/kudu/integration-tests/test_workload.h
@@ -31,6 +31,7 @@
 #include "kudu/client/client.h"
 #include "kudu/client/schema.h"
 #include "kudu/client/shared_ptr.h" // IWYU pragma: keep
+#include "kudu/common/txn_id.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/util/atomic.h"
 #include "kudu/util/countdown_latch.h"
@@ -77,6 +78,44 @@ class TestWorkload {
       PartitioningType partitioning = PartitioningType::RANGE);
   ~TestWorkload();
 
+  // Ingest the workload as part of the given transaction. set_begin_txn() must
+  // not be called if this is set.
+  void set_txn_id(int64_t txn_id) {
+    txn_id_ = TxnId(txn_id);
+    CHECK(txn_id_.IsValid());
+  }
+
+  // Ingest the workload as a part of a new transaction. set_txn_id() must not
+  // be called if this is set.
+  void set_begin_txn() {
+    begin_txn_ = true;
+    CHECK(!txn_id_.IsValid());
+  }
+
+  // Commit the transaction that this workload is a part of upon calling
+  // StopAndJoin(). If set, either set_begin_txn() or set_txn_id() must be set
+  // as well. If not set, but either set_begin_txn() or set_txn_id() is set,
+  // the workload will ingest as a part of the transaction, but not call
+  // commit on completion.
+  //
+  // set_rollback_txn() must not be called if this is set.
+  void set_commit_txn() {
+    commit_txn_ = true;
+    CHECK(!rollback_txn_);
+  }
+
+  // Abort the transaction that this workload is a part of upon calling
+  // StopAndJoin(). If set, either set_begin_txn() or set_txn_id() must be set
+  // as well. If not set, but either set_begin_txn() or set_txn_id() is set,
+  // the workload will ingest as a part of the transaction, but not call abort
+  // on completion.
+  //
+  // set_commit_txn() must not be called if this is set.
+  void set_rollback_txn() {
+    rollback_txn_ = true;
+    CHECK(!commit_txn_);
+  }
+
   // Sets whether the read thread should crash if scanning to the cluster fails
   // for whatever reason. If set to true, errors will be populated in
   // 'read_errors_'.
@@ -152,6 +191,9 @@ class TestWorkload {
 
   // Set whether we should attempt to verify the number of rows when scanning.
   // An incorrect number of rows may be indicative of a stale read.
+  //
+  // If either set_begin_txn() or set_txn_id() has been called, does not verify
+  // the number of rows.
   void set_verify_num_rows(bool should_verify) {
     verify_num_rows_ = should_verify;
   }
@@ -250,8 +292,14 @@ class TestWorkload {
   // Delete created table, etc.
   Status Cleanup();
 
+  int64_t txn_id() const {
+    CHECK(txn_id_.IsValid());
+    return txn_id_.value();
+  }
+
   // Return the number of rows inserted so far. This may be called either
-  // during or after the write workload.
+  // during or after the write workload. If writing as a part of a transaction,
+  // these rows may have not been committed.
   int64_t rows_inserted() const {
     return rows_inserted_.Load();
   }
@@ -298,6 +346,10 @@ class TestWorkload {
   int write_batch_size_;
   int write_interval_millis_;
   int write_timeout_millis_;
+  TxnId txn_id_;
+  bool begin_txn_;
+  bool commit_txn_;
+  bool rollback_txn_;
   bool fault_tolerant_;
   bool verify_num_rows_;
   bool read_errors_allowed_;
@@ -321,6 +373,8 @@ class TestWorkload {
   AtomicInt<int64_t> batches_completed_;
   AtomicInt<int32_t> sequential_key_gen_;
 
+  client::sp::shared_ptr<client::KuduTransaction> txn_;
+
   std::vector<std::thread> threads_;
 
   mutable simple_spinlock read_error_lock_;
diff --git a/src/kudu/integration-tests/txn_write_ops-itest.cc b/src/kudu/integration-tests/txn_write_ops-itest.cc
index b53c796..1aabbc1 100644
--- a/src/kudu/integration-tests/txn_write_ops-itest.cc
+++ b/src/kudu/integration-tests/txn_write_ops-itest.cc
@@ -60,6 +60,7 @@
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/integration-tests/external_mini_cluster-itest-base.h"
+#include "kudu/integration-tests/test_workload.h"
 #include "kudu/mini-cluster/external_mini_cluster.h"
 #include "kudu/mini-cluster/internal_mini_cluster.h"
 #include "kudu/rpc/rpc_controller.h"
@@ -201,6 +202,12 @@ Status CountRows(KuduTable* table, size_t* num_rows) {
   return Status::OK();
 }
 
+Status CountRows(KuduClient* client, const string& table_name, size_t* num_rows) {
+  shared_ptr<KuduTable> table;
+  RETURN_NOT_OK(client->OpenTable(table_name, &table));
+  return CountRows(table.get(), num_rows);
+}
+
 Status GetSingleRowError(KuduSession* session) {
   vector<KuduError*> errors;
   ElementDeleter drop(&errors);
@@ -932,7 +939,7 @@ class TxnOpDispatcherITest : public KuduTest {
     CHECK_OK(BuildSchema(&schema_));
   }
 
-  void Prepare(int num_tservers, bool create_table = true, int num_replicas = 0) {
+  void SetupCluster(int num_tservers, int num_replicas = 0) {
     if (num_replicas == 0) {
       num_replicas = num_tservers;
     }
@@ -944,7 +951,13 @@ class TxnOpDispatcherITest : public KuduTest {
     opts.num_tablet_servers = num_tservers;
     cluster_.reset(new InternalMiniCluster(env_, std::move(opts)));
     ASSERT_OK(cluster_->StartSync());
+  }
 
+  void Prepare(int num_tservers, bool create_table = true, int num_replicas = 0) {
+    if (num_replicas == 0) {
+      num_replicas = num_tservers;
+    }
+    NO_FATALS(SetupCluster(num_tservers, num_replicas));
     KuduClientBuilder builder;
     builder.default_admin_operation_timeout(kTimeout);
     ASSERT_OK(cluster_->CreateClient(&builder, &client_));
@@ -992,14 +1005,15 @@ class TxnOpDispatcherITest : public KuduTest {
   }
 
   // Get all replicas of the test table.
-  vector<scoped_refptr<TabletReplica>> GetAllReplicas() const {
+  vector<scoped_refptr<TabletReplica>> GetAllReplicas(const string& table_name = "") const {
+    const string& target_table = table_name.empty() ? kTableName : table_name;
     vector<scoped_refptr<TabletReplica>> result;
     for (auto i = 0; i < cluster_->num_tablet_servers(); ++i) {
       auto* server = cluster_->mini_tablet_server(i)->server();
       vector<scoped_refptr<TabletReplica>> replicas;
       server->tablet_manager()->GetTabletReplicas(&replicas);
       for (auto& r : replicas) {
-        if (r->tablet()->metadata()->table_name() == kTableName) {
+        if (r->tablet()->metadata()->table_name() == target_table) {
           result.emplace_back(std::move(r));
         }
       }
@@ -1008,10 +1022,11 @@ class TxnOpDispatcherITest : public KuduTest {
   }
 
   size_t GetTxnOpDispatchersTotalCount(
-      vector<scoped_refptr<TabletReplica>> replicas = {}) {
+      vector<scoped_refptr<TabletReplica>> replicas = {},
+      const string& table_name = "") {
     if (replicas.empty()) {
       // No replicas were specified, get the list of all test table's replicas.
-      replicas = GetAllReplicas();
+      replicas = GetAllReplicas(table_name);
     }
     size_t elem_count = 0;
     for (auto& r : replicas) {
@@ -1041,8 +1056,8 @@ class TxnOpDispatcherITest : public KuduTest {
   typedef vector<std::shared_ptr<typename TabletReplica::TxnOpDispatcher>>
       OpDispatchers;
   typedef map<int64_t, OpDispatchers> OpDispatchersPerTxnId;
-  OpDispatchersPerTxnId GetTxnOpDispatchers() {
-    auto replicas = GetAllReplicas();
+  OpDispatchersPerTxnId GetTxnOpDispatchers(const string& table_name = "") {
+    auto replicas = GetAllReplicas(table_name);
     OpDispatchersPerTxnId result;
     for (auto& r : replicas) {
       std::lock_guard<simple_spinlock> guard(r->txn_op_dispatchers_lock_);
@@ -2191,4 +2206,123 @@ TEST_F(TxnOpDispatcherITest, DISABLED_TxnMultipleSingleRowsWithServerRestart) {
   }
 }
 
+// Test beginning and aborting a transaction from the same test workload.
+TEST_F(TxnOpDispatcherITest, TestBeginAbortTransactionalTestWorkload) {
+  NO_FATALS(SetupCluster(1));
+  TestWorkload w(cluster_.get(), TestWorkload::PartitioningType::HASH);
+  w.set_num_replicas(1);
+  w.set_num_tablets(3);
+  w.set_begin_txn();
+  w.set_rollback_txn();
+  w.Setup();
+  w.Start();
+  const auto& table_name = w.table_name();
+  while (w.rows_inserted() < 1000) {
+    SleepFor(MonoDelta::FromMilliseconds(5));
+  }
+  // Each participant should have a dispatcher.
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_EQ(3, GetTxnOpDispatchersTotalCount({}, table_name));
+  });
+  w.StopAndJoin();
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_EQ(0, GetTxnOpDispatchersTotalCount({}, table_name));
+  });
+  // By the end of it, we should have aborted the rows and they should not be
+  // visible to clients.
+  size_t num_rows;
+  ASSERT_OK(CountRows(w.client().get(), table_name, &num_rows));
+  ASSERT_EQ(0, num_rows);
+}
+
+// Test beginning and committing a transaction from the same test workload.
+TEST_F(TxnOpDispatcherITest, TestBeginCommitTransactionalTestWorkload) {
+  NO_FATALS(SetupCluster(1));
+  TestWorkload w(cluster_.get(), TestWorkload::PartitioningType::HASH);
+  w.set_num_replicas(1);
+  w.set_num_tablets(3);
+  w.set_begin_txn();
+  w.set_commit_txn();
+  w.Setup();
+  w.Start();
+  const auto& table_name = w.table_name();
+  while (w.rows_inserted() < 1000) {
+    SleepFor(MonoDelta::FromMilliseconds(5));
+  }
+  // Each participant should have a dispatcher.
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_EQ(3, GetTxnOpDispatchersTotalCount({}, table_name));
+  });
+  w.StopAndJoin();
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_EQ(0, GetTxnOpDispatchersTotalCount({}, table_name));
+  });
+  // By the end of it, we should have committed the rows and they should be
+  // visible to clients.
+  size_t num_rows;
+  ASSERT_OK(CountRows(w.client().get(), table_name, &num_rows));
+  ASSERT_EQ(w.rows_inserted(), num_rows);
+}
+
+// Test beginning and committing a transaction from separate test workloads.
+TEST_F(TxnOpDispatcherITest, TestSeparateBeginCommitTestWorkloads) {
+  NO_FATALS(SetupCluster(1));
+  int64_t txn_id;
+  string first_table_name;
+  size_t first_rows_inserted;
+  {
+    TestWorkload w(cluster_.get(), TestWorkload::PartitioningType::HASH);
+    w.set_begin_txn();
+    w.set_num_replicas(1);
+    w.set_num_tablets(3);
+    w.Setup();
+    w.Start();
+    while (w.rows_inserted() < 1000) {
+      SleepFor(MonoDelta::FromMilliseconds(5));
+    }
+    first_table_name = w.table_name();
+    ASSERT_EVENTUALLY([&] {
+      ASSERT_EQ(3, GetTxnOpDispatchersTotalCount({}, first_table_name));
+    });
+    w.StopAndJoin();
+    first_rows_inserted = w.rows_inserted();
+    txn_id = w.txn_id();
+    size_t num_rows;
+    ASSERT_OK(CountRows(w.client().get(), first_table_name, &num_rows));
+    ASSERT_EQ(0, num_rows);
+  }
+  // Create a new workload, and insert as a part of the same transaction.
+  {
+    TestWorkload w(cluster_.get(), TestWorkload::PartitioningType::HASH);
+    const auto& kSecondTableName = "default.second_table";
+    w.set_txn_id(txn_id);
+    w.set_commit_txn();
+    w.set_table_name(kSecondTableName);
+    w.set_num_replicas(1);
+    w.set_num_tablets(3);
+    w.Setup();
+    w.Start();
+    while (w.rows_inserted() < 1000) {
+      SleepFor(MonoDelta::FromMilliseconds(5));
+    }
+    // We should have dispatchers for both tables.
+    ASSERT_EVENTUALLY([&] {
+      ASSERT_EQ(3, GetTxnOpDispatchersTotalCount({}, first_table_name));
+      ASSERT_EQ(3, GetTxnOpDispatchersTotalCount({}, kSecondTableName));
+    });
+    w.StopAndJoin();
+    // Once committed, the dispatchers should be unregistered.
+    ASSERT_EVENTUALLY([&] {
+      ASSERT_EQ(0, GetTxnOpDispatchersTotalCount({}, first_table_name));
+      ASSERT_EQ(0, GetTxnOpDispatchersTotalCount({}, kSecondTableName));
+    });
+    size_t num_rows;
+    ASSERT_OK(CountRows(w.client().get(), first_table_name, &num_rows));
+    ASSERT_EQ(first_rows_inserted, num_rows);
+    ASSERT_OK(CountRows(w.client().get(), kSecondTableName, &num_rows));
+    ASSERT_EQ(w.rows_inserted(), num_rows);
+  }
+}
+
+
 } // namespace kudu
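
Condensed from the new tests above, the intended call pattern for the txn
setters (a fragment only; it assumes a running cluster and the surrounding
test fixture, as in TestSeparateBeginCommitTestWorkloads):

    // First workload begins a new transaction and records its ID.
    TestWorkload w1(cluster_.get(), TestWorkload::PartitioningType::HASH);
    w1.set_begin_txn();
    w1.Setup();
    w1.Start();
    // ... wait for some rows to be inserted ...
    w1.StopAndJoin();
    int64_t txn_id = w1.txn_id();

    // Second workload joins the same transaction by ID and commits it in
    // StopAndJoin(); use set_rollback_txn() to abort instead.
    TestWorkload w2(cluster_.get(), TestWorkload::PartitioningType::HASH);
    w2.set_txn_id(txn_id);
    w2.set_commit_txn();
    w2.set_table_name("default.second_table");
    w2.Setup();
    w2.Start();
    // ... wait for some rows to be inserted ...
    w2.StopAndJoin();  // rows from both workloads become visible on commit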

[kudu] 04/05: KUDU-2754: Mark max_log_files as stable

Posted by gr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 00c1b674545541a8a2a17aaca0738136540eb8b8
Author: Grant Henke <gr...@apache.org>
AuthorDate: Thu Apr 29 09:27:13 2021 -0500

    KUDU-2754: Mark max_log_files as stable
    
    The max_log_files flag has existed and been in use since 2016.
    This patch marks it as stable so it can be used without the need
    to unlock experimental flags.
    
    Change-Id: Icd8ca1ce314cc0d333c57a6582165da475224afb
    Reviewed-on: http://gerrit.cloudera.org:8080/17362
    Tested-by: Kudu Jenkins
    Reviewed-by: Grant Henke <gr...@apache.org>
---
 src/kudu/util/logging.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/kudu/util/logging.cc b/src/kudu/util/logging.cc
index 96b9a54..3ec98f2 100644
--- a/src/kudu/util/logging.cc
+++ b/src/kudu/util/logging.cc
@@ -68,7 +68,7 @@ DEFINE_int32(max_log_files, 10,
     "Maximum number of log files to retain per severity level. The most recent "
     "log files are retained. If set to 0, all log files are retained.");
 TAG_FLAG(max_log_files, runtime);
-TAG_FLAG(max_log_files, experimental);
+TAG_FLAG(max_log_files, stable);
 
 #define PROJ_NAME "kudu"
 
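In practice this means the flag can now be passed to the daemons on its own;
previously a process would refuse it unless experimental flags were unlocked
via --unlock_experimental_flags. An illustrative invocation (the value and the
rest of the command line are placeholders, not part of the patch):

    # Before: --max_log_files also required --unlock_experimental_flags.
    # After:  accepted directly; keep the 5 most recent log files per severity.
    kudu-tserver --max_log_files=5 <other required flags>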

[kudu] 05/05: [backup] Ensure listed backups are sorted by table

Posted by gr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit c7ede99d71ef9343f2ccdd57c147d0895223b1b7
Author: Grant Henke <gr...@apache.org>
AuthorDate: Fri Apr 30 09:45:50 2021 -0500

    [backup] Ensure listed backups are sorted by table
    
    We recently saw an odd failure in the tests that list all the
    backup graphs (rather than providing exact table names): the
    listing came back in descending rather than ascending order.
    This patch sorts the backup graphs by table name so the listing
    order stays consistent.
    
    Change-Id: Icad68f7a3d399c1a5fc302e6328143d4b01227a0
    Reviewed-on: http://gerrit.cloudera.org:8080/17375
    Tested-by: Grant Henke <gr...@apache.org>
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 .../src/main/scala/org/apache/kudu/backup/KuduBackupLister.scala       | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/java/kudu-backup-tools/src/main/scala/org/apache/kudu/backup/KuduBackupLister.scala b/java/kudu-backup-tools/src/main/scala/org/apache/kudu/backup/KuduBackupLister.scala
index 8303da9..0fe39d5 100644
--- a/java/kudu-backup-tools/src/main/scala/org/apache/kudu/backup/KuduBackupLister.scala
+++ b/java/kudu-backup-tools/src/main/scala/org/apache/kudu/backup/KuduBackupLister.scala
@@ -43,7 +43,8 @@ object KuduBackupLister {
     val io: BackupIO = new BackupIO(new Configuration(), options.rootPath)
     val backupGraphs =
       if (sortedTables.isEmpty)
-        io.readAllBackupGraphs()
+        // Sort by table name for a consistent ordering.
+        io.readAllBackupGraphs().sortBy(_.backupBase.metadata.getTableName)
       else
         io.readBackupGraphsByTableName(sortedTables)
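
The underlying idea is simply to impose an explicit sort key rather than rely
on whatever order the storage layer enumerates backup graphs in. A small C++
analogue of the same technique (names are stand-ins for the Scala types above):

    #include <algorithm>
    #include <string>
    #include <vector>

    // Stand-in for a backup graph keyed by the name of its base table.
    struct BackupGraphStub {
      std::string table_name;
    };

    // Directory enumeration order is not guaranteed, so sort by an explicit
    // key to keep listings deterministic across runs.
    void SortByTableName(std::vector<BackupGraphStub>* graphs) {
      std::sort(graphs->begin(), graphs->end(),
                [](const BackupGraphStub& a, const BackupGraphStub& b) {
                  return a.table_name < b.table_name;
                });
    }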