You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2021/02/01 08:37:20 UTC

[kudu] branch master updated: KUDU-2612: loosen restrictions for BEGIN_COMMIT ops

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c5e655  KUDU-2612: loosen restrictions for BEGIN_COMMIT ops
8c5e655 is described below

commit 8c5e655d2d29e7758f26a373ea240631722ea1cf
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Mon Jan 25 18:28:20 2021 -0800

    KUDU-2612: loosen restrictions for BEGIN_COMMIT ops
    
    This patch updates the state validations for BEGIN_COMMIT ops. Rather
    than only requiring an in-flight transaction to be kOpen (the common
    case) or kCommitInProgress (if re-attempting a BEGIN_COMMIT call), it's
    more robust to allow the call to be made even after the transaction has
    already been finalized. This will allow a to-be-merged implementation of
    a commit task to be retried with much less fuss.
    
    Here's some pseudo-code for such a commit task:
    1. persist COMMIT_IN_PROGRESS record on TxnStatusManager
    2. foreach participant as p: BEGIN_COMMIT(p)
    3. commit_ts = max timestamp used across BEGIN_COMMIT ops
    4. foreach participant as p: FINALIZE_COMMIT(p, commit_ts)
    5. persist COMMITTED record on TxnStatusManager
    
    If the commit task is interrupted (e.g. by a crash) between steps 3
    and 5, some participants may be left with fully finalized commits. In
    such cases, all other participants _must_ also finalize, and they must
    finalize with the same timestamp. To ensure this, it must be possible
    to re-run the commit task. However, without this patch, re-running it
    would fail: the BEGIN_COMMIT ops would yield errors complaining about an
    illegal state on participants that were already finalized.
    
    This patch allows a BEGIN_COMMIT op to return immediately, reporting
    TXN_OP_ALREADY_APPLIED (which the transaction system client treats as
    success), if a FINALIZE_COMMIT op has already completed. In that case,
    the finalized commit timestamp is sent back, making the above commit
    task repeatable.
    
    Change-Id: Ifa4c5314190c84648c1b1edea7aab776b4882f97
    Reviewed-on: http://gerrit.cloudera.org:8080/16992
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 .../integration-tests/txn_participant-itest.cc     | 65 ++++++++++++++++++----
 src/kudu/tablet/tablet_metadata.cc                 | 10 +++-
 src/kudu/tablet/tablet_metadata.h                  |  7 ++-
 src/kudu/tablet/txn_participant-test.cc            | 27 ++++++++-
 src/kudu/tablet/txn_participant.h                  | 33 ++++++++++-
 5 files changed, 123 insertions(+), 19 deletions(-)

diff --git a/src/kudu/integration-tests/txn_participant-itest.cc b/src/kudu/integration-tests/txn_participant-itest.cc
index fcbd083..b270bea 100644
--- a/src/kudu/integration-tests/txn_participant-itest.cc
+++ b/src/kudu/integration-tests/txn_participant-itest.cc
@@ -549,6 +549,49 @@ TEST_F(TxnParticipantITest, TestProxyBasicCalls) {
   }
 }
 
+TEST_F(TxnParticipantITest, TestBeginCommitAfterFinalize) {
+  constexpr const int kLeaderIdx = 0;
+  constexpr const int kTxnId = 0;
+  vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
+  ASSERT_OK(replicas[kLeaderIdx]->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+  auto admin_proxy = cluster_->tserver_admin_proxy(kLeaderIdx);
+  const auto tablet_id = replicas[kLeaderIdx]->tablet_id();
+  {
+    TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+    ASSERT_OK(ParticipateInTransactionCheckResp(
+        admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::BEGIN_TXN, &code));
+    ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+  }
+  // Commit the transaction.
+  Timestamp begin_commit_ts;
+  {
+    TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+    ASSERT_OK(ParticipateInTransactionCheckResp(
+        admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::BEGIN_COMMIT,
+        &code, &begin_commit_ts));
+    ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+    ASSERT_NE(Timestamp::kInvalidTimestamp, begin_commit_ts);
+  }
+  {
+    TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+    ASSERT_OK(ParticipateInTransactionCheckResp(
+        admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::FINALIZE_COMMIT, &code));
+    ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+  }
+  // A call to BEGIN_COMMIT should yield the finalized commit timestamp.
+  Timestamp refetched_begin_commit_ts;
+  {
+    TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+    Status s = ParticipateInTransactionCheckResp(
+        admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::BEGIN_COMMIT, &code,
+        &refetched_begin_commit_ts);
+    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+    ASSERT_EQ(TabletServerErrorPB::TXN_OP_ALREADY_APPLIED, code);
+    ASSERT_NE(Timestamp::kInvalidTimestamp, refetched_begin_commit_ts);
+    ASSERT_EQ(Timestamp(kDummyCommitTimestamp), refetched_begin_commit_ts);
+  }
+}
+
 TEST_F(TxnParticipantITest, TestProxyIllegalStatesInCommitSequence) {
   constexpr const int kLeaderIdx = 0;
   constexpr const int kTxnId = 0;
@@ -581,7 +624,7 @@ TEST_F(TxnParticipantITest, TestProxyIllegalStatesInCommitSequence) {
     ASSERT_EQ(TabletServerErrorPB::TXN_ILLEGAL_STATE, code);
   }
 
-  // Start commititng and ensure we can't start another transaction.
+  // Start committing and ensure we can't start another transaction.
   Timestamp begin_commit_ts;
   {
     TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
@@ -609,22 +652,21 @@ TEST_F(TxnParticipantITest, TestProxyIllegalStatesInCommitSequence) {
     ASSERT_EQ(TabletServerErrorPB::TXN_ILLEGAL_STATE, code);
   }
 
-  // Finalize the commit and ensure we can do nothing else.
+  // Finalize the commit and ensure we can't begin or abort.
   {
     TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
     ASSERT_OK(ParticipateInTransactionCheckResp(
         admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::FINALIZE_COMMIT, &code));
     ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
   }
-  {
+  for (auto type : { ParticipantOpPB::BEGIN_COMMIT, ParticipantOpPB::FINALIZE_COMMIT }) {
     TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
     Status s = ParticipateInTransactionCheckResp(
-        admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::FINALIZE_COMMIT, &code);
+        admin_proxy.get(), tablet_id, kTxnId, type, &code);
     ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
     ASSERT_EQ(TabletServerErrorPB::TXN_OP_ALREADY_APPLIED, code);
   }
   for (auto type : { ParticipantOpPB::BEGIN_TXN,
-                     ParticipantOpPB::BEGIN_COMMIT,
                      ParticipantOpPB::ABORT_TXN }) {
     TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
     Status s = ParticipateInTransactionCheckResp(
@@ -816,21 +858,22 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientCommitSequence) {
   ASSERT_EQ(refetched_begin_commit_ts, begin_commit_ts);
   NO_FATALS(CheckReplicasMatchTxns(replicas, { { kTxnId, kCommitInProgress, -1 } }));
 
-  // Once we finish committing, we should be unable to do anything.
+  // Once we finish committing, we should be unable to begin or abort.
   ASSERT_OK(txn_client->ParticipateInTransaction(
       tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::FINALIZE_COMMIT, kDummyCommitTimestamp),
       kDefaultTimeout));
   NO_FATALS(CheckReplicasMatchTxns(replicas, {{kTxnId, kCommitted, kDummyCommitTimestamp}}));
-  for (const auto type : { ParticipantOpPB::BEGIN_TXN, ParticipantOpPB::BEGIN_COMMIT,
-                           ParticipantOpPB::ABORT_TXN }) {
+  for (const auto type : { ParticipantOpPB::BEGIN_TXN, ParticipantOpPB::ABORT_TXN }) {
     Status s = txn_client->ParticipateInTransaction(
         tablet_id, MakeParticipantOp(kTxnId, type), kDefaultTimeout);
     ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
   }
   NO_FATALS(CheckReplicasMatchTxns(replicas, {{kTxnId, kCommitted, kDummyCommitTimestamp}}));
-  ASSERT_OK(txn_client->ParticipateInTransaction(
-      tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::FINALIZE_COMMIT, kDummyCommitTimestamp),
-      kDefaultTimeout));
+  for (const auto type : { ParticipantOpPB::BEGIN_COMMIT, ParticipantOpPB::FINALIZE_COMMIT }) {
+    ASSERT_OK(txn_client->ParticipateInTransaction(
+        tablet_id, MakeParticipantOp(kTxnId, type, kDummyCommitTimestamp),
+        kDefaultTimeout));
+  }
   NO_FATALS(CheckReplicasMatchTxns(replicas, {{kTxnId, kCommitted, kDummyCommitTimestamp}}));
 }
 
diff --git a/src/kudu/tablet/tablet_metadata.cc b/src/kudu/tablet/tablet_metadata.cc
index ddd15a2..f572085 100644
--- a/src/kudu/tablet/tablet_metadata.cc
+++ b/src/kudu/tablet/tablet_metadata.cc
@@ -850,17 +850,25 @@ void TabletMetadata::AbortTransaction(int64_t txn_id, unique_ptr<MinLogIndexAnch
   anchors_needing_flush_.emplace_back(std::move(log_anchor));
 }
 
-bool TabletMetadata::HasTxnMetadata(int64_t txn_id, TxnState* state) {
+bool TabletMetadata::HasTxnMetadata(int64_t txn_id, TxnState* state, Timestamp* timestamp) {
   std::lock_guard<LockType> l(data_lock_);
   auto txn_meta = FindPtrOrNull(txn_metadata_by_txn_id_, txn_id);
   if (txn_meta) {
     if (!state) return true;
     if (txn_meta->commit_timestamp()) {
       *state = kCommitted;
+      if (timestamp) {
+        DCHECK(txn_meta->commit_timestamp());
+        *timestamp = *txn_meta->commit_timestamp();
+      }
     } else if (txn_meta->aborted()) {
       *state = kAborted;
     } else if (txn_meta->commit_mvcc_op_timestamp()) {
       *state = kCommitInProgress;
+      if (timestamp) {
+        DCHECK(txn_meta->commit_mvcc_op_timestamp());
+        *timestamp = *txn_meta->commit_mvcc_op_timestamp();
+      }
     } else {
       *state = kOpen;
     }
diff --git a/src/kudu/tablet/tablet_metadata.h b/src/kudu/tablet/tablet_metadata.h
index e4ea591..b9edeeb 100644
--- a/src/kudu/tablet/tablet_metadata.h
+++ b/src/kudu/tablet/tablet_metadata.h
@@ -272,8 +272,11 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> {
   void AbortTransaction(int64_t txn_id, std::unique_ptr<log::MinLogIndexAnchorer> log_anchor);
 
   // Returns whether a given transaction has metadata, and if requested, what
-  // state the transaction is in.
-  bool HasTxnMetadata(int64_t txn_id, TxnState* state = nullptr);
+  // state the transaction is in. If set, 'timestamp' is set to the begin
+  // commit timestamp or finalized commit timestamp, depending on whether
+  // 'state' is kCommitInProgress or kCommitted respectively.
+  bool HasTxnMetadata(int64_t txn_id, TxnState* state = nullptr,
+                      Timestamp* timestamp = nullptr);
 
   // Returns the transaction IDs that were persisted as being in-flight,
   // terminal (committed or aborted), and having un-flushed MRSs.
diff --git a/src/kudu/tablet/txn_participant-test.cc b/src/kudu/tablet/txn_participant-test.cc
index 6a8fd74..9373110 100644
--- a/src/kudu/tablet/txn_participant-test.cc
+++ b/src/kudu/tablet/txn_participant-test.cc
@@ -269,7 +269,8 @@ TEST_F(TxnParticipantTest, TestTransactionNotFound) {
 }
 
 TEST_F(TxnParticipantTest, TestIllegalTransitions) {
-  const auto check_valid_op = [&] (const ParticipantOpPB::ParticipantOpType& type, int64_t txn_id) {
+  const auto check_valid_op = [&] (const ParticipantOpPB::ParticipantOpType& type,
+                                   int64_t txn_id) {
     ParticipantResponsePB resp;
     ASSERT_OK(CallParticipantOp(
         tablet_replica_.get(), txn_id, type, kDummyCommitTimestamp, &resp));
@@ -278,7 +279,7 @@ TEST_F(TxnParticipantTest, TestIllegalTransitions) {
     ASSERT_TRUE(resp.has_timestamp());
   };
   const auto check_bad_ops = [&] (const vector<ParticipantOpPB::ParticipantOpType>& ops,
-                                       int64_t txn_id) {
+                                  int64_t txn_id) {
     for (const auto& type : ops) {
       ParticipantResponsePB resp;
       ASSERT_OK(CallParticipantOp(
@@ -291,6 +292,24 @@ TEST_F(TxnParticipantTest, TestIllegalTransitions) {
       ASSERT_FALSE(resp.has_timestamp());
     }
   };
+  const auto check_already_applied = [&] (const vector<ParticipantOpPB::ParticipantOpType>& ops,
+                                          int64_t txn_id, bool expect_timestamp) {
+    for (const auto& type : ops) {
+      ParticipantResponsePB resp;
+      ASSERT_OK(CallParticipantOp(
+          tablet_replica_.get(), txn_id, type, kDummyCommitTimestamp, &resp));
+      SCOPED_TRACE(SecureShortDebugString(resp));
+      ASSERT_TRUE(resp.has_error());
+      ASSERT_TRUE(resp.error().has_status());
+      ASSERT_EQ(AppStatusPB::ILLEGAL_STATE, resp.error().status().code());
+      ASSERT_EQ(TabletServerErrorPB::TXN_OP_ALREADY_APPLIED, resp.error().code());
+      if (expect_timestamp) {
+        ASSERT_TRUE(resp.has_timestamp());
+      } else {
+        ASSERT_FALSE(resp.has_timestamp());
+      }
+    }
+  };
   // Once we've begun the transaction, we can't finalize without beginning to
   // commit.
   NO_FATALS(check_valid_op(ParticipantOpPB::BEGIN_TXN, kTxnId));
@@ -301,6 +320,7 @@ TEST_F(TxnParticipantTest, TestIllegalTransitions) {
 
   // Once we begin committing, we can't start the transaction again.
   NO_FATALS(check_valid_op(ParticipantOpPB::BEGIN_COMMIT, kTxnId));
+  NO_FATALS(check_already_applied({ ParticipantOpPB::BEGIN_COMMIT }, kTxnId, true));
   NO_FATALS(check_bad_ops({ ParticipantOpPB::BEGIN_TXN }, kTxnId));
   ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
       { kTxnId, kCommitInProgress, -1 },
@@ -308,8 +328,9 @@ TEST_F(TxnParticipantTest, TestIllegalTransitions) {
 
   // Once we've begun finalizing, we can't do anything.
   NO_FATALS(check_valid_op(ParticipantOpPB::FINALIZE_COMMIT, kTxnId));
+  NO_FATALS(check_already_applied({ ParticipantOpPB::BEGIN_COMMIT }, kTxnId, true));
+  NO_FATALS(check_already_applied({ ParticipantOpPB::FINALIZE_COMMIT }, kTxnId, false));
   NO_FATALS(check_bad_ops({ ParticipantOpPB::BEGIN_TXN,
-                            ParticipantOpPB::BEGIN_COMMIT,
                             ParticipantOpPB::ABORT_TXN }, kTxnId));
   ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
       { kTxnId, kCommitted, kDummyCommitTimestamp },
diff --git a/src/kudu/tablet/txn_participant.h b/src/kudu/tablet/txn_participant.h
index bb0c356..d276840 100644
--- a/src/kudu/tablet/txn_participant.h
+++ b/src/kudu/tablet/txn_participant.h
@@ -25,6 +25,7 @@
 #include <vector>
 
 #include <glog/logging.h>
+#include <boost/optional/optional.hpp>
 
 #include "kudu/common/timestamp.h"
 #include "kudu/consensus/log_anchor_registry.h"
@@ -134,13 +135,41 @@ class Txn : public RefCountedThreadSafe<Txn> {
   Status ValidateBeginCommit(tserver::TabletServerErrorPB::Code* code,
                              Timestamp* begin_commit_ts) const {
     DCHECK(state_lock_.is_locked());
-    RETURN_NOT_OK(CheckFinishedInitializing(code));
+    boost::optional<Timestamp> already_applied_timestamp;
+    if (PREDICT_FALSE(state_ == kInitializing)) {
+      Timestamp timestamp;
+      TxnState meta_state;
+      if (!tablet_metadata_->HasTxnMetadata(txn_id_, &meta_state, &timestamp)) {
+        return Status::NotFound("Transaction hasn't been successfully started");
+      }
+      if (PREDICT_FALSE(meta_state != kCommitted && meta_state != kCommitInProgress)) {
+        *code = tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE;
+        return Status::IllegalState(
+            strings::Substitute("Cannot begin committing transaction in state: $0",
+                                TxnStateToString(state_)));
+      }
+      // There's no in-flight transaction, but we've already committed the
+      // transaction and persisted a commit timestamp. Return the commit
+      // timestamp.
+      already_applied_timestamp = timestamp;
+    }
+    // If we're in the process of committing, return the commit timestamp we
+    // have available to us.
     if (PREDICT_FALSE(state_ == kCommitInProgress)) {
-      *begin_commit_ts = DCHECK_NOTNULL(commit_op_)->timestamp();
+      already_applied_timestamp = DCHECK_NOTNULL(commit_op_)->timestamp();
+    }
+    if (PREDICT_FALSE(state_ == kCommitted)) {
+      DCHECK_NE(-1, commit_timestamp_);
+      already_applied_timestamp = Timestamp(commit_timestamp_);
+    }
+    if (already_applied_timestamp) {
+      DCHECK_NE(Timestamp::kInvalidTimestamp, *already_applied_timestamp);
+      *begin_commit_ts = *already_applied_timestamp;
       *code = tserver::TabletServerErrorPB::TXN_OP_ALREADY_APPLIED;
       return Status::IllegalState(
           strings::Substitute("Transaction $0 commit already in progress", txn_id_));
     }
+    // If the transaction is otherwise not open, return an error.
     if (PREDICT_FALSE(state_ != kOpen)) {
       *code = tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE;
       return Status::IllegalState(