You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2021/04/05 01:04:28 UTC

[kudu] 01/02: KUDU-2612: allow ABORT_TXN txn ops to succeed if txn hasn't started

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 6be9794f91a2eebb291e4db2daf63a0f12c3ae2a
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Tue Mar 23 23:42:36 2021 -0700

    KUDU-2612: allow ABORT_TXN txn ops to succeed if txn hasn't started
    
    This patch allows ABORT_TXN ops to proceed even if the participant
    hasn't yet initialized the transaction. This is desirable in the case
    where a participant has successfully registered with the
    TxnStatusManager, but hasn't yet replicated its BEGIN_TXN op. In these
    cases, the ops will unregister any pending TxnOpDispatchers, as well as
    mark the existence of the transaction on the participant, preventing
    further participant registrations for the transaction.
    
    Take the following as an example:
    1. a transactional write op is sent to a participant
    2. the participant successfully registers with the TxnStatusManager
    3. before the BEGIN_TXN op replicates, the node crashes
    4. no further retry of the write is sent
    5. the user attempts to commit the transaction
    6. the BEGIN_COMMIT op fails because the transaction hasn't started on
       the participant
    7. an ABORT_TXN op is scheduled, but that fails too because the
       transaction hasn't started on the participant
    
    With this patch, step 7 should succeed, and successfully replicate the
    ABORT_TXN op, even though the transaction doesn't exist. The test[1] for
    this isn't enabled yet -- some additional plumbing is required to ensure
    we return the correct error codes.
    
    Along the way, I hardened up an edge case for BEGIN_COMMIT so
    TXN_ILLEGAL_STATE is returned if the transaction doesn't exist, instead
    of returning a plain IllegalState, which would have resulted in the
    ParticipantRpc being retried.
    
    [1] see TxnOpDispatcherITest.CommitWithWriteOpPendingParticipantRegistered
    
    Change-Id: Ia4c864b4f14e42008d3aa8f4454c8b2abf9bb766
    Reviewed-on: http://gerrit.cloudera.org:8080/17233
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Tested-by: Andrew Wong <aw...@cloudera.com>
---
 .../integration-tests/txn_participant-itest.cc     | 44 ++++++++++++++
 src/kudu/tablet/tablet.cc                          |  3 +-
 src/kudu/tablet/tablet_metadata.cc                 |  5 +-
 src/kudu/tablet/txn_metadata.h                     |  2 +-
 src/kudu/tablet/txn_participant-test.cc            | 49 ++++++++-------
 src/kudu/tablet/txn_participant.h                  | 70 +++++++++++++---------
 6 files changed, 119 insertions(+), 54 deletions(-)

diff --git a/src/kudu/integration-tests/txn_participant-itest.cc b/src/kudu/integration-tests/txn_participant-itest.cc
index 6a7e21e..241b5e9 100644
--- a/src/kudu/integration-tests/txn_participant-itest.cc
+++ b/src/kudu/integration-tests/txn_participant-itest.cc
@@ -592,6 +592,27 @@ TEST_F(TxnParticipantITest, TestBeginCommitAfterFinalize) {
   }
 }
 
+TEST_F(TxnParticipantITest, TestProxyErrorWhenNotBegun) {
+  constexpr const int kLeaderIdx = 0;
+  auto txn_id = 0;
+  vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
+  ASSERT_OK(replicas[kLeaderIdx]->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+  auto admin_proxy = cluster_->tserver_admin_proxy(kLeaderIdx);
+  const auto tablet_id = replicas[kLeaderIdx]->tablet_id();
+  for (auto type : { ParticipantOpPB::BEGIN_COMMIT,
+                     ParticipantOpPB::FINALIZE_COMMIT }) {
+    TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+    Status s = ParticipateInTransactionCheckResp(
+        admin_proxy.get(), tablet_id, txn_id++, type, &code);
+    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+    ASSERT_EQ(TabletServerErrorPB::TXN_ILLEGAL_STATE, code);
+  }
+  TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+  ASSERT_OK(ParticipateInTransactionCheckResp(
+      admin_proxy.get(), tablet_id, txn_id++, ParticipantOpPB::ABORT_TXN, &code));
+  ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+}
+
 TEST_F(TxnParticipantITest, TestProxyIllegalStatesInCommitSequence) {
   constexpr const int kLeaderIdx = 0;
   constexpr const int kTxnId = 0;
@@ -932,6 +953,29 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientAbortSequence) {
       { { kTxnOne, kAborted, -1 }, { kTxnTwo, kAborted, -1 } }));
 }
 
+TEST_F(TxnParticipantITest, TestTxnSystemClientErrorWhenNotBegun) {
+  constexpr const int kLeaderIdx = 0;
+  int txn_id = 0;
+  vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
+  auto* leader_replica = replicas[kLeaderIdx];
+  const auto tablet_id = leader_replica->tablet_id();
+  ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+  unique_ptr<TxnSystemClient> txn_client;
+  ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client));
+
+  for (auto type : { ParticipantOpPB::BEGIN_COMMIT,
+                     ParticipantOpPB::FINALIZE_COMMIT }) {
+    Status s = txn_client->ParticipateInTransaction(
+        tablet_id, MakeParticipantOp(txn_id++, type), kDefaultTimeout);
+    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+    NO_FATALS(CheckReplicasMatchTxns(replicas, {}));
+  }
+
+  ASSERT_OK(txn_client->ParticipateInTransaction(
+      tablet_id, MakeParticipantOp(txn_id++, ParticipantOpPB::ABORT_TXN), kDefaultTimeout));
+  NO_FATALS(CheckReplicasMatchTxns(replicas, { { 2, kAborted, -1 } }));
+}
+
 TEST_F(TxnParticipantITest, TestTxnSystemClientTimeoutWhenNoMajority) {
   constexpr const int kLeaderIdx = 0;
   constexpr const int kTxnId = 0;
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 6945594..1b1a27c 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -1220,8 +1220,7 @@ void Tablet::AbortTransaction(Txn* txn,  const OpId& op_id) {
   metadata_->AbortTransaction(txn_id, std::move(anchor));
   {
     std::lock_guard<rw_spinlock> lock(component_lock_);
-    auto txn_rowsets = EraseKeyReturnValuePtr(&uncommitted_rowsets_by_txn_id_, txn_id);
-    CHECK(txn_rowsets);
+    uncommitted_rowsets_by_txn_id_.erase(txn_id);
   }
   txn->AbortTransaction();
 }
diff --git a/src/kudu/tablet/tablet_metadata.cc b/src/kudu/tablet/tablet_metadata.cc
index f572085..0d83dad 100644
--- a/src/kudu/tablet/tablet_metadata.cc
+++ b/src/kudu/tablet/tablet_metadata.cc
@@ -844,7 +844,10 @@ void TabletMetadata::AddCommitTimestamp(int64_t txn_id, Timestamp commit_timesta
 
 void TabletMetadata::AbortTransaction(int64_t txn_id, unique_ptr<MinLogIndexAnchorer> log_anchor) {
   std::lock_guard<LockType> l(data_lock_);
-  auto txn_metadata = FindPtrOrNull(txn_metadata_by_txn_id_, txn_id);
+  // NOTE: we can't emplace with a raw pointer here; if the lookup succeeds, we
+  // wouldn't use it and we'd have a memory leak, so use scoped_refptr.
+  auto txn_metadata = LookupOrEmplace(&txn_metadata_by_txn_id_, txn_id,
+                                      scoped_refptr<TxnMetadata>(new TxnMetadata));
   CHECK(txn_metadata);
   txn_metadata->set_aborted();
   anchors_needing_flush_.emplace_back(std::move(log_anchor));
diff --git a/src/kudu/tablet/txn_metadata.h b/src/kudu/tablet/txn_metadata.h
index b250181..bcbeb0d 100644
--- a/src/kudu/tablet/txn_metadata.h
+++ b/src/kudu/tablet/txn_metadata.h
@@ -57,8 +57,8 @@ class TxnMetadata : public RefCountedThreadSafe<TxnMetadata> {
   void set_commit_timestamp(Timestamp commit_ts) {
     std::lock_guard<simple_spinlock> l(lock_);
     CHECK(boost::none == commit_timestamp_);
-    CHECK(boost::none != commit_mvcc_op_timestamp_);
     CHECK(!aborted_);
+    CHECK(boost::none != commit_mvcc_op_timestamp_);
     commit_timestamp_ = commit_ts;
   }
   void set_commit_mvcc_op_timestamp(Timestamp op_ts) {
diff --git a/src/kudu/tablet/txn_participant-test.cc b/src/kudu/tablet/txn_participant-test.cc
index c1e5437..4e54f31 100644
--- a/src/kudu/tablet/txn_participant-test.cc
+++ b/src/kudu/tablet/txn_participant-test.cc
@@ -19,6 +19,7 @@
 
 #include <algorithm>
 #include <cstdint>
+#include <initializer_list>
 #include <map>
 #include <memory>
 #include <ostream>
@@ -246,26 +247,30 @@ TEST_F(TxnParticipantTest, TestSuccessfulSequences) {
 }
 
 TEST_F(TxnParticipantTest, TestParticipantOpsWhenNotBegun) {
-  const auto check_bad_ops = [&] (const vector<ParticipantOpPB::ParticipantOpType>& ops,
-                                  int64_t txn_id) {
-    for (const auto& type : ops) {
-      ParticipantResponsePB resp;
-      ASSERT_OK(CallParticipantOp(
-          tablet_replica_.get(), txn_id, type, kDummyCommitTimestamp, &resp));
-      SCOPED_TRACE(SecureShortDebugString(resp));
-      ASSERT_TRUE(resp.has_error());
-      ASSERT_TRUE(resp.error().has_status());
-      ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, resp.error().code());
-      ASSERT_EQ(AppStatusPB::ILLEGAL_STATE, resp.error().status().code());
-      ASSERT_FALSE(resp.has_timestamp());
-    }
-  };
-  NO_FATALS(check_bad_ops({
-    ParticipantOpPB::BEGIN_COMMIT,
-    ParticipantOpPB::FINALIZE_COMMIT,
-    ParticipantOpPB::ABORT_TXN,
-  }, 1));
-  ASSERT_TRUE(txn_participant()->GetTxnsForTests().empty());
+  auto txn_id = 0;
+  for (auto type : { ParticipantOpPB::BEGIN_COMMIT,
+                     ParticipantOpPB::FINALIZE_COMMIT }) {
+    ParticipantResponsePB resp;
+    ASSERT_OK(CallParticipantOp(
+        tablet_replica_.get(), txn_id++, type, kDummyCommitTimestamp, &resp));
+    SCOPED_TRACE(SecureShortDebugString(resp));
+    ASSERT_TRUE(resp.has_error());
+    ASSERT_TRUE(resp.error().has_status());
+    ASSERT_EQ(TabletServerErrorPB::TXN_ILLEGAL_STATE, resp.error().code());
+    ASSERT_EQ(AppStatusPB::ILLEGAL_STATE, resp.error().status().code());
+    ASSERT_FALSE(resp.has_timestamp());
+    ASSERT_TRUE(txn_participant()->GetTxnsForTests().empty());
+  }
+  ParticipantResponsePB resp;
+  ASSERT_OK(CallParticipantOp(
+      tablet_replica_.get(), txn_id++, ParticipantOpPB::ABORT_TXN, kDummyCommitTimestamp, &resp));
+  SCOPED_TRACE(SecureShortDebugString(resp));
+  ASSERT_FALSE(resp.has_error());
+  ASSERT_FALSE(resp.error().has_status());
+  ASSERT_TRUE(resp.has_timestamp());
+  ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
+      { 2, kAborted, -1 },
+  }), txn_participant()->GetTxnsForTests());
 }
 
 TEST_F(TxnParticipantTest, TestIllegalTransitions) {
@@ -408,13 +413,11 @@ TEST_F(TxnParticipantTest, TestConcurrentOps) {
   // Regardless of order, we should have been able to begin the transaction.
   ASSERT_OK(status_for_op(ParticipantOpPB::BEGIN_TXN));
 
-  // If we finalized the commit, we should have begun committing, and we must
-  // not have been able to abort.
+  // If we finalized the commit, we must not have been able to abort.
   if (status_for_op(ParticipantOpPB::FINALIZE_COMMIT).ok()) {
     ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
         { kTxnId, kCommitted, kDummyCommitTimestamp },
     }), txn_participant()->GetTxnsForTests());
-    ASSERT_OK(statuses[FindOrDie(kIndexByOps, ParticipantOpPB::BEGIN_COMMIT)]);
     ASSERT_FALSE(statuses[FindOrDie(kIndexByOps, ParticipantOpPB::ABORT_TXN)].ok());
 
   // If we aborted the commit, we could not have finalized the commit.
diff --git a/src/kudu/tablet/txn_participant.h b/src/kudu/tablet/txn_participant.h
index c497f04..8539372 100644
--- a/src/kudu/tablet/txn_participant.h
+++ b/src/kudu/tablet/txn_participant.h
@@ -140,6 +140,7 @@ class Txn : public RefCountedThreadSafe<Txn> {
       Timestamp timestamp;
       TxnState meta_state;
       if (!tablet_metadata_->HasTxnMetadata(txn_id_, &meta_state, &timestamp)) {
+        *code = tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE;
         return Status::IllegalState("Transaction hasn't been successfully started");
       }
       if (PREDICT_FALSE(meta_state != kCommitted && meta_state != kCommitInProgress)) {
@@ -180,13 +181,13 @@ class Txn : public RefCountedThreadSafe<Txn> {
   }
   Status ValidateFinalize(tserver::TabletServerErrorPB::Code* code) const {
     DCHECK(state_lock_.is_locked());
-    RETURN_NOT_OK(CheckFinishedInitializing(code, kCommitted));
+    RETURN_NOT_OK(CheckPersistedMetadataState(code, kCommitted));
     if (PREDICT_FALSE(state_ == kCommitted)) {
       *code = tserver::TabletServerErrorPB::TXN_OP_ALREADY_APPLIED;
       return Status::IllegalState(
           strings::Substitute("Transaction $0 has already been committed", txn_id_));
     }
-    if (PREDICT_FALSE(state_ != kCommitInProgress)) {
+    if (PREDICT_FALSE(state_ != kInitializing && state_ != kCommitInProgress)) {
       *code = tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE;
       return Status::IllegalState(
           strings::Substitute("Cannot finalize transaction in state: $0",
@@ -196,14 +197,16 @@ class Txn : public RefCountedThreadSafe<Txn> {
   }
   Status ValidateAbort(tserver::TabletServerErrorPB::Code* code) const {
     DCHECK(state_lock_.is_locked());
-    RETURN_NOT_OK(CheckFinishedInitializing(code, kAborted));
+    // NOTE: it's allowed that we replicate an ABORT_TXN op even if the
+    // transaction doesn't exist. This allows us to ensure we remove any
+    // pending TxnOpDispatchers.
+    RETURN_NOT_OK(CheckPersistedMetadataState(code, kAborted));
     if (PREDICT_FALSE(state_ == kAborted)) {
       *code = tserver::TabletServerErrorPB::TXN_OP_ALREADY_APPLIED;
       return Status::IllegalState(
           strings::Substitute("Transaction $0 has already been aborted", txn_id_));
     }
-    if (PREDICT_FALSE(state_ != kOpen &&
-                      state_ != kCommitInProgress)) {
+    if (PREDICT_FALSE(state_ == kCommitted)) {
       *code = tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE;
       return Status::IllegalState(
           strings::Substitute("Cannot abort transaction in state: $0",
@@ -262,38 +265,51 @@ class Txn : public RefCountedThreadSafe<Txn> {
     state_ = s;
   }
 
-  // Returns an error if the transaction has not finished initializing. This
-  // may mean that an in-flight transaction didn't exist so we created a new
-  // one, in which case we need to check if there's existing metadata for it.
+  // If we're initializing a new transaction, checks for any persisted
+  // transaction metadata, ensuring it's compatible with our expected state,
+  // and returning an error otherwise.
   //
   // NOTE: we only need to check the expected metadata state when we're
   // aborting or finalizing a commit, since these end the in-flight
   // transaction. In other cases, we should be able to check the state of the
   // in-flight transaction.
-  Status CheckFinishedInitializing(
+  Status CheckPersistedMetadataState(
       tserver::TabletServerErrorPB::Code* code,
       TxnState expected_metadata_state = kNone) const {
-    DCHECK(expected_metadata_state == kNone ||
-           expected_metadata_state == kAborted ||
-           expected_metadata_state == kCommitted);
-    if (PREDICT_FALSE(state_ == kInitializing)) {
-      TxnState meta_state;
-      if (tablet_metadata_->HasTxnMetadata(txn_id_, &meta_state)) {
-        if (expected_metadata_state != kNone && expected_metadata_state == meta_state) {
-          *code = tserver::TabletServerErrorPB::TXN_OP_ALREADY_APPLIED;
-          return Status::IllegalState(
-              strings::Substitute("Transaction metadata for transaction $0 already set",
-                                  txn_id_));
-        }
-        // We created this transaction as a part of this op (i.e. it was not
-        // already in flight), and there is existing metadata for it.
-        *code = tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE;
+    DCHECK(expected_metadata_state == kAborted || expected_metadata_state == kCommitted);
+    // If we're not initializing a new transaction, there's an already
+    // in-flight, non-persisted transaction, and we'll check its state
+    // elsewhere.
+    if (PREDICT_TRUE(state_ != kInitializing)) {
+      return Status::OK();
+    }
+    TxnState meta_state;
+    if (tablet_metadata_->HasTxnMetadata(txn_id_, &meta_state)) {
+      if (expected_metadata_state == meta_state) {
+        *code = tserver::TabletServerErrorPB::TXN_OP_ALREADY_APPLIED;
         return Status::IllegalState(
-            strings::Substitute("Transaction metadata for transaction $0 already exists",
+            strings::Substitute("Transaction metadata for transaction $0 already set",
                                 txn_id_));
       }
-      // TODO(awong): add another code for this?
-      return Status::IllegalState("Transaction hasn't been successfully started");
+      // We created this transaction as a part of this op (i.e. it was not
+      // already in flight), and there is existing metadata for it that doesn't
+      // match the expected state.
+      *code = tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE;
+      return Status::IllegalState(
+          strings::Substitute("Transaction metadata for transaction $0 already exists",
+                              txn_id_));
+    }
+    // If we don't have metadata for the transaction, it depends on what the
+    // expected state is. E.g. ABORT_TXN ops can and should be replicated even
+    // if we haven't successfully initialized the transaction. This is possible
+    // if we've successfully registered the participant, but haven't yet
+    // replicated the BEGIN_TXN op. The same can't be said for FINALIZE_COMMIT
+    // ops, so return an error here in that case.
+    if (expected_metadata_state == kCommitted) {
+      *code = tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE;
+      return Status::IllegalState(
+          strings::Substitute("Transaction metadata for transaction $0 does not exist",
+                              txn_id_));
     }
     return Status::OK();
   }