You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2021/04/27 19:12:39 UTC

[kudu] branch master updated: KUDU-2612: propagate commit timestamp (C++ client)

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new e495d6b  KUDU-2612: propagate commit timestamp (C++ client)
e495d6b is described below

commit e495d6bb759fdae7cd001d86df3bae5c4f5f2b36
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Fri Apr 23 16:39:40 2021 -0700

    KUDU-2612: propagate commit timestamp (C++ client)
    
    With this patch, the commit timestamp for a non-empty transaction
    is propagated to a Kudu C++ client upon calling
    KuduTransaction::IsCommitComplete() (that means that committing
    a transaction synchronously via KuduTransaction::Commit() propagates
    the commit timestamp as well).
    
    Updating the last observed timestamp with the commit timestamp is
    necessary to achieve consistency in the READ_YOUR_WRITES mode when
    reading the data of a transaction which has just been committed.  The
    commit phase might take some time and may even be retried in some cases,
    so even if the client observed timestamps for all the write operations
    it sent in the context of this transaction, the maximum timestamp collected
    among the involved transaction participants might be far ahead of the
    last timestamp observed by the client so far.
    
    Change-Id: If3ff321895a25326b962a47d5fa868e45aefcb4d
    Reviewed-on: http://gerrit.cloudera.org:8080/17345
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Alexey Serbin <as...@cloudera.com>
---
 src/kudu/client/transaction-internal.cc           |  15 +++
 src/kudu/integration-tests/txn_write_ops-itest.cc | 113 +++++++++++++++++++++-
 src/kudu/master/txn_manager.proto                 |   6 +-
 src/kudu/master/txn_manager_service.cc            |   7 ++
 src/kudu/transactions/txn_status_manager.cc       |   3 +-
 src/kudu/transactions/txn_system_client.cc        |  10 +-
 6 files changed, 142 insertions(+), 12 deletions(-)

diff --git a/src/kudu/client/transaction-internal.cc b/src/kudu/client/transaction-internal.cc
index d639dac..0aed483 100644
--- a/src/kudu/client/transaction-internal.cc
+++ b/src/kudu/client/transaction-internal.cc
@@ -344,6 +344,21 @@ Status KuduTransaction::Data::IsCommitCompleteImpl(
         return Status::IllegalState(errmsg);
       }
   }
+  // Update last observed timestamp -- this is necessary to achieve consistency
+  // in the READ_YOUR_WRITES mode when trying to read back the data of a
+  // transaction that has just been committed. The commit phase might take some
+  // time and may even be retried when the involved participants are not
+  // available, so even if the client has observed timestamps for all the write
+  // operations sent in the context of this transaction, the maximum timestamp
+  // among all the transaction participants might be far ahead of the latest
+  // timestamp observed by the client while performing the transactional write
+  // operations.
+  // NOTE: an empty transaction doesn't have the commit timestamp assigned.
+  if (resp.has_commit_timestamp()) {
+    DCHECK(state == TxnStatePB::COMMITTED ||
+           state == TxnStatePB::FINALIZE_IN_PROGRESS);
+    client->data_->UpdateLatestObservedTimestamp(resp.commit_timestamp());
+  }
   return Status::OK();
 }
 
diff --git a/src/kudu/integration-tests/txn_write_ops-itest.cc b/src/kudu/integration-tests/txn_write_ops-itest.cc
index 2bab38d..7554b78 100644
--- a/src/kudu/integration-tests/txn_write_ops-itest.cc
+++ b/src/kudu/integration-tests/txn_write_ops-itest.cc
@@ -186,6 +186,7 @@ int64_t GetTxnId(const shared_ptr<KuduTransaction>& txn) {
 
 Status CountRows(KuduTable* table, size_t* num_rows) {
   KuduScanner scanner(table);
+  RETURN_NOT_OK(scanner.SetReadMode(KuduScanner::READ_YOUR_WRITES));
   RETURN_NOT_OK(scanner.SetSelection(KuduClient::LEADER_ONLY));
   RETURN_NOT_OK(scanner.Open());
   size_t count = 0;
@@ -295,9 +296,113 @@ class TxnWriteOpsITest : public ExternalMiniClusterITestBase {
   string tablet_uuid_;
 };
 
+// Make sure the txn commit timestamp is propagated to the client by a call
+// to KuduTransaction::IsCommitComplete(). This includes committing a
+// transaction synchronously by calling KuduTransaction::Commit().
+TEST_F(TxnWriteOpsITest, CommitTimestampPropagation) {
+  static constexpr int kRowsNum = 1000;
+
+  const vector<string> master_flags = {
+    // Enable TxnManager in Kudu masters.
+    // TODO(aserbin): remove this customization once the flag is 'on' by default
+    "--txn_manager_enabled=true",
+
+    // Scenarios based on this test fixture assume the txn status table
+    // is created at start, not on first transaction-related operation.
+    "--txn_manager_lazily_initialized=false",
+  };
+  NO_FATALS(StartCluster({}, master_flags, kNumTabletServers));
+  NO_FATALS(Prepare());
+
+  // Start a transaction, write a bunch of rows into the test table, and then
+  // commit the transaction asynchronously. Poll the transaction status and
+  // make sure the latest observed timestamp changes accordingly once the
+  // transaction is committed.
+  {
+    shared_ptr<KuduTransaction> txn;
+    ASSERT_OK(client_->NewTransaction(&txn));
+    shared_ptr<KuduSession> session;
+    ASSERT_OK(txn->CreateSession(&session));
+    ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+    NO_FATALS(InsertRows(table_.get(), session.get(), kRowsNum));
+    ASSERT_OK(session->Flush());
+    ASSERT_EQ(0, session->CountPendingErrors());
+
+    const auto ts_before_commit = client_->GetLatestObservedTimestamp();
+    ASSERT_OK(txn->Commit(false));  // Initiate the commit asynchronously.
+    const auto ts_after_commit_async = client_->GetLatestObservedTimestamp();
+    ASSERT_EQ(ts_before_commit, ts_after_commit_async);
+
+    uint64_t ts_after_committed = 0;
+    ASSERT_EVENTUALLY([&] {
+      bool is_complete = false;
+      Status completion_status;
+      ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+      ASSERT_TRUE(is_complete);
+      ts_after_committed = client_->GetLatestObservedTimestamp();
+    });
+    ASSERT_GT(ts_after_committed, ts_before_commit);
+
+    // A sanity check: calling IsCommitComplete() again after the commit
+    // timestamp has been propagated doesn't change the timestamp observed
+    // by the client.
+    for (auto i = 0; i < 10; ++i) {
+      bool is_complete = false;
+      Status completion_status;
+      ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+      ASSERT_TRUE(is_complete);
+      ASSERT_EQ(ts_after_committed, client_->GetLatestObservedTimestamp());
+      SleepFor(MonoDelta::FromMilliseconds(10));
+    }
+
+    size_t count;
+    ASSERT_OK(CountRows(table_.get(), &count));
+    ASSERT_EQ(kRowsNum, count);
+  }
+
+  // Start a transaction, write a bunch of rows into the test table, and then
+  // commit the transaction synchronously. Make sure the latest observed
+  // timestamp changes accordingly once the transaction is committed.
+  {
+    shared_ptr<KuduTransaction> txn;
+    ASSERT_OK(client_->NewTransaction(&txn));
+    shared_ptr<KuduSession> session;
+    ASSERT_OK(txn->CreateSession(&session));
+    ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+    NO_FATALS(InsertRows(table_.get(), session.get(), kRowsNum, kRowsNum));
+    ASSERT_OK(session->Flush());
+    ASSERT_EQ(0, session->CountPendingErrors());
+
+    const auto ts_before_commit = client_->GetLatestObservedTimestamp();
+    ASSERT_OK(txn->Commit());
+    const auto ts_after_sync_commit = client_->GetLatestObservedTimestamp();
+    ASSERT_GT(ts_after_sync_commit, ts_before_commit);
+
+    size_t count;
+    ASSERT_OK(CountRows(table_.get(), &count));
+    ASSERT_EQ(2 * kRowsNum, count);
+  }
+
+  // An empty transaction has no commit timestamp, so there is nothing to
+  // propagate back to the client when an empty transaction is committed.
+  {
+    shared_ptr<KuduTransaction> txn;
+    ASSERT_OK(client_->NewTransaction(&txn));
+
+    const auto ts_before_commit = client_->GetLatestObservedTimestamp();
+    ASSERT_OK(txn->Commit());
+    const auto ts_after_sync_commit = client_->GetLatestObservedTimestamp();
+    ASSERT_EQ(ts_before_commit, ts_after_sync_commit);
+
+    size_t count;
+    ASSERT_OK(CountRows(table_.get(), &count));
+    ASSERT_EQ(2 * kRowsNum, count);
+  }
+}
+
 // Test that our deadlock prevention mechanisms work by writing across
 // different tablets concurrently from multiple transactions.
-TEST_F(TxnWriteOpsITest, TestDeadlockPrevention) {
+TEST_F(TxnWriteOpsITest, DeadlockPrevention) {
   constexpr const int kNumTxns = 8;
   const vector<string> master_flags = {
     "--txn_manager_enabled=true",
@@ -307,8 +412,7 @@ TEST_F(TxnWriteOpsITest, TestDeadlockPrevention) {
     "--txn_manager_lazily_initialized=false",
   };
   NO_FATALS(StartCluster({}, master_flags, kNumTabletServers));
-  vector<string> tablets_uuids;
-  NO_FATALS(Prepare(&tablets_uuids));
+  NO_FATALS(Prepare());
   vector<thread> threads;
   threads.reserve(kNumTxns);
   vector<int> random_keys(kNumTxns * 2);
@@ -391,8 +495,7 @@ TEST_F(TxnWriteOpsITest, TxnMultipleSingleRowWritesCommit) {
   };
   NO_FATALS(StartCluster({}, master_flags, kNumTabletServers));
 
-  vector<string> tablets_uuids;
-  NO_FATALS(Prepare(&tablets_uuids));
+  NO_FATALS(Prepare());
   shared_ptr<KuduTransaction> txn;
   ASSERT_OK(client_->NewTransaction(&txn));
   shared_ptr<KuduSession> session;
diff --git a/src/kudu/master/txn_manager.proto b/src/kudu/master/txn_manager.proto
index 1915c85..5b68161 100644
--- a/src/kudu/master/txn_manager.proto
+++ b/src/kudu/master/txn_manager.proto
@@ -94,8 +94,12 @@ message GetTransactionStateResponsePB {
 
   // The transaction state at the time of processing the request.
   optional TxnStatePB state = 2;
-}
 
+  // Commit timestamp associated with the transaction if it is a non-empty one
+  // (i.e. has at least one row written) and has already been in the COMMITTED
+  // or FINALIZE_IN_PROGRESS state at the time of processing the request.
+  optional fixed64 commit_timestamp = 3;
+}
 
 message KeepTransactionAliveRequestPB {
   optional int64 txn_id = 1;
diff --git a/src/kudu/master/txn_manager_service.cc b/src/kudu/master/txn_manager_service.cc
index de26065..01d4a2c 100644
--- a/src/kudu/master/txn_manager_service.cc
+++ b/src/kudu/master/txn_manager_service.cc
@@ -120,6 +120,13 @@ void TxnManagerServiceImpl::GetTransactionState(
   if (PREDICT_TRUE(s.ok())) {
     DCHECK(txn_status.has_state());
     resp->set_state(txn_status.state());
+    // An empty transaction doesn't have a commit timestamp; only non-empty ones
+    // have their commit timestamps assigned and persisted.
+    if (txn_status.has_commit_timestamp()) {
+      DCHECK(txn_status.state() == TxnStatePB::COMMITTED ||
+             txn_status.state() == TxnStatePB::FINALIZE_IN_PROGRESS);
+      resp->set_commit_timestamp(txn_status.commit_timestamp());
+    }
   }
   CheckRespErrorOrSetUnknown(s, resp);
   return ctx->RespondSuccess();
diff --git a/src/kudu/transactions/txn_status_manager.cc b/src/kudu/transactions/txn_status_manager.cc
index 3a7bef3..e130d85 100644
--- a/src/kudu/transactions/txn_status_manager.cc
+++ b/src/kudu/transactions/txn_status_manager.cc
@@ -1120,7 +1120,8 @@ Status TxnStatusManager::CompleteCommitTransaction(int64_t txn_id) {
 
   TransactionEntryLock txn_lock(txn.get(), LockMode::WRITE);
   const auto& pb = txn_lock.data().pb;
-  const auto& state = pb.state();
+  DCHECK(pb.has_state());
+  const auto state = pb.state();
   if (state == TxnStatePB::COMMITTED) {
     return Status::OK();
   }
diff --git a/src/kudu/transactions/txn_system_client.cc b/src/kudu/transactions/txn_system_client.cc
index 67279d8..927b1ed 100644
--- a/src/kudu/transactions/txn_system_client.cc
+++ b/src/kudu/transactions/txn_system_client.cc
@@ -268,17 +268,17 @@ Status TxnSystemClient::GetTransactionStatus(int64_t txn_id,
                                            timeout,
                                            s.AsStatusCallback(),
                                            &result));
-  const auto ret = s.Wait();
-  if (ret.ok()) {
+  const auto rs = s.Wait();
+  if (rs.ok()) {
     // Retrieve the response and set corresponding output parameters.
     DCHECK(!result.has_op_error());
     DCHECK(result.has_txn_status());
+    DCHECK(result.txn_status().has_state());
     TxnStatusEntryPB ret;
-    ret.set_state(result.txn_status().state());
-    ret.set_allocated_user(result.mutable_txn_status()->release_user());
+    ret.Swap(result.mutable_txn_status());
     *txn_status = std::move(ret);
   }
-  return ret;
+  return rs;
 }
 
 Status TxnSystemClient::KeepTransactionAlive(int64_t txn_id,