You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2021/04/05 01:04:27 UTC

[kudu] branch master updated (ed3d916 -> 78fb204)

This is an automated email from the ASF dual-hosted git repository.

awong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from ed3d916  [master] Check for 'last_known_addr' field when adding master
     new 6be9794  KUDU-2612: allow ABORT_TXN txn ops to succeed if txn hasn't started
     new 78fb204  KUDU-2612: route txn op dispatching errors to write ops

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/kudu/client/batcher.cc                         |  6 ++
 src/kudu/integration-tests/txn_commit-itest.cc     |  2 +-
 .../integration-tests/txn_participant-itest.cc     | 44 ++++++++++++++
 src/kudu/integration-tests/txn_write_ops-itest.cc  | 60 ++++++++-----------
 src/kudu/tablet/tablet.cc                          |  3 +-
 src/kudu/tablet/tablet_metadata.cc                 | 11 ++--
 src/kudu/tablet/tablet_replica.cc                  | 44 +++++++-------
 src/kudu/tablet/tablet_replica.h                   | 16 +++--
 src/kudu/tablet/txn_metadata.h                     |  2 +-
 src/kudu/tablet/txn_participant-test.cc            | 49 ++++++++-------
 src/kudu/tablet/txn_participant.h                  | 70 +++++++++++++---------
 src/kudu/transactions/txn_status_manager.cc        | 24 +++-----
 src/kudu/tserver/tablet_service.cc                 |  5 +-
 src/kudu/tserver/ts_tablet_manager.cc              | 21 ++++---
 src/kudu/tserver/ts_tablet_manager.h               |  6 +-
 15 files changed, 213 insertions(+), 150 deletions(-)

[kudu] 01/02: KUDU-2612: allow ABORT_TXN txn ops to succeed if txn hasn't started

Posted by aw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 6be9794f91a2eebb291e4db2daf63a0f12c3ae2a
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Tue Mar 23 23:42:36 2021 -0700

    KUDU-2612: allow ABORT_TXN txn ops to succeed if txn hasn't started
    
    This patch allows ABORT_TXN ops to proceed even if the participant
    hasn't yet initialized the transaction. This is desirable in the case
    where a participant has successfully registered with the
    TxnStatusManager, but hasn't yet replicated its BEGIN_TXN op. In these
    cases, the ops will unregister any pending TxnOpDispatchers, as well as
    mark the existence of the transaction on the participant, preventing
    further participant registrations for the transaction.
    
    Take the following as an example:
    1. a transactional write op is sent to a participant
    2. the participant successfully registers with the TxnStatusManager
    3. before the BEGIN_TXN op replicates, the node crashes
    4. no further retry of the write is sent
    5. the user attempts to commit the transaction
    6. the BEGIN_COMMIT op fails because the transaction hasn't started on
       the participant
    7. an ABORT_TXN op is scheduled, but that fails too because the
       transaction hasn't started on the participant
    
    With this patch, step 7 should succeed, and successfully replicate the
    ABORT_TXN op, even though the transaction doesn't exist. The test[1] for
    this isn't enabled yet -- some additional plumbing is required to ensure
    we return the correct error codes.
    
    Along the way, I hardened up an edge case for BEGIN_COMMIT so
    TXN_ILLEGAL_STATE is returned if the transaction doesn't exist, instead
    of returning a plain IllegalState, which would have resulted in the
    ParticipantRpc being retried.
    
    [1] see TxnOpDispatcherITest.CommitWithWriteOpPendingParticipantRegistered
    
    Change-Id: Ia4c864b4f14e42008d3aa8f4454c8b2abf9bb766
    Reviewed-on: http://gerrit.cloudera.org:8080/17233
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Tested-by: Andrew Wong <aw...@cloudera.com>
---
 .../integration-tests/txn_participant-itest.cc     | 44 ++++++++++++++
 src/kudu/tablet/tablet.cc                          |  3 +-
 src/kudu/tablet/tablet_metadata.cc                 |  5 +-
 src/kudu/tablet/txn_metadata.h                     |  2 +-
 src/kudu/tablet/txn_participant-test.cc            | 49 ++++++++-------
 src/kudu/tablet/txn_participant.h                  | 70 +++++++++++++---------
 6 files changed, 119 insertions(+), 54 deletions(-)

diff --git a/src/kudu/integration-tests/txn_participant-itest.cc b/src/kudu/integration-tests/txn_participant-itest.cc
index 6a7e21e..241b5e9 100644
--- a/src/kudu/integration-tests/txn_participant-itest.cc
+++ b/src/kudu/integration-tests/txn_participant-itest.cc
@@ -592,6 +592,27 @@ TEST_F(TxnParticipantITest, TestBeginCommitAfterFinalize) {
   }
 }
 
+TEST_F(TxnParticipantITest, TestProxyErrorWhenNotBegun) {
+  constexpr const int kLeaderIdx = 0;
+  auto txn_id = 0;
+  vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
+  ASSERT_OK(replicas[kLeaderIdx]->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+  auto admin_proxy = cluster_->tserver_admin_proxy(kLeaderIdx);
+  const auto tablet_id = replicas[kLeaderIdx]->tablet_id();
+  for (auto type : { ParticipantOpPB::BEGIN_COMMIT,
+                     ParticipantOpPB::FINALIZE_COMMIT }) {
+    TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+    Status s = ParticipateInTransactionCheckResp(
+        admin_proxy.get(), tablet_id, txn_id++, type, &code);
+    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+    ASSERT_EQ(TabletServerErrorPB::TXN_ILLEGAL_STATE, code);
+  }
+  TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+  ASSERT_OK(ParticipateInTransactionCheckResp(
+      admin_proxy.get(), tablet_id, txn_id++, ParticipantOpPB::ABORT_TXN, &code));
+  ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+}
+
 TEST_F(TxnParticipantITest, TestProxyIllegalStatesInCommitSequence) {
   constexpr const int kLeaderIdx = 0;
   constexpr const int kTxnId = 0;
@@ -932,6 +953,29 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientAbortSequence) {
       { { kTxnOne, kAborted, -1 }, { kTxnTwo, kAborted, -1 } }));
 }
 
+TEST_F(TxnParticipantITest, TestTxnSystemClientErrorWhenNotBegun) {
+  constexpr const int kLeaderIdx = 0;
+  int txn_id = 0;
+  vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
+  auto* leader_replica = replicas[kLeaderIdx];
+  const auto tablet_id = leader_replica->tablet_id();
+  ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+  unique_ptr<TxnSystemClient> txn_client;
+  ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client));
+
+  for (auto type : { ParticipantOpPB::BEGIN_COMMIT,
+                     ParticipantOpPB::FINALIZE_COMMIT }) {
+    Status s = txn_client->ParticipateInTransaction(
+        tablet_id, MakeParticipantOp(txn_id++, type), kDefaultTimeout);
+    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+    NO_FATALS(CheckReplicasMatchTxns(replicas, {}));
+  }
+
+  ASSERT_OK(txn_client->ParticipateInTransaction(
+      tablet_id, MakeParticipantOp(txn_id++, ParticipantOpPB::ABORT_TXN), kDefaultTimeout));
+  NO_FATALS(CheckReplicasMatchTxns(replicas, { { 2, kAborted, -1 } }));
+}
+
 TEST_F(TxnParticipantITest, TestTxnSystemClientTimeoutWhenNoMajority) {
   constexpr const int kLeaderIdx = 0;
   constexpr const int kTxnId = 0;
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 6945594..1b1a27c 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -1220,8 +1220,7 @@ void Tablet::AbortTransaction(Txn* txn,  const OpId& op_id) {
   metadata_->AbortTransaction(txn_id, std::move(anchor));
   {
     std::lock_guard<rw_spinlock> lock(component_lock_);
-    auto txn_rowsets = EraseKeyReturnValuePtr(&uncommitted_rowsets_by_txn_id_, txn_id);
-    CHECK(txn_rowsets);
+    uncommitted_rowsets_by_txn_id_.erase(txn_id);
   }
   txn->AbortTransaction();
 }
diff --git a/src/kudu/tablet/tablet_metadata.cc b/src/kudu/tablet/tablet_metadata.cc
index f572085..0d83dad 100644
--- a/src/kudu/tablet/tablet_metadata.cc
+++ b/src/kudu/tablet/tablet_metadata.cc
@@ -844,7 +844,10 @@ void TabletMetadata::AddCommitTimestamp(int64_t txn_id, Timestamp commit_timesta
 
 void TabletMetadata::AbortTransaction(int64_t txn_id, unique_ptr<MinLogIndexAnchorer> log_anchor) {
   std::lock_guard<LockType> l(data_lock_);
-  auto txn_metadata = FindPtrOrNull(txn_metadata_by_txn_id_, txn_id);
+  // NOTE: we can't emplace with a raw pointer here; if the lookup succeeds, we
+  // wouldn't use it and we'd have a memory leak, so use scoped_refptr.
+  auto txn_metadata = LookupOrEmplace(&txn_metadata_by_txn_id_, txn_id,
+                                      scoped_refptr<TxnMetadata>(new TxnMetadata));
   CHECK(txn_metadata);
   txn_metadata->set_aborted();
   anchors_needing_flush_.emplace_back(std::move(log_anchor));
diff --git a/src/kudu/tablet/txn_metadata.h b/src/kudu/tablet/txn_metadata.h
index b250181..bcbeb0d 100644
--- a/src/kudu/tablet/txn_metadata.h
+++ b/src/kudu/tablet/txn_metadata.h
@@ -57,8 +57,8 @@ class TxnMetadata : public RefCountedThreadSafe<TxnMetadata> {
   void set_commit_timestamp(Timestamp commit_ts) {
     std::lock_guard<simple_spinlock> l(lock_);
     CHECK(boost::none == commit_timestamp_);
-    CHECK(boost::none != commit_mvcc_op_timestamp_);
     CHECK(!aborted_);
+    CHECK(boost::none != commit_mvcc_op_timestamp_);
     commit_timestamp_ = commit_ts;
   }
   void set_commit_mvcc_op_timestamp(Timestamp op_ts) {
diff --git a/src/kudu/tablet/txn_participant-test.cc b/src/kudu/tablet/txn_participant-test.cc
index c1e5437..4e54f31 100644
--- a/src/kudu/tablet/txn_participant-test.cc
+++ b/src/kudu/tablet/txn_participant-test.cc
@@ -19,6 +19,7 @@
 
 #include <algorithm>
 #include <cstdint>
+#include <initializer_list>
 #include <map>
 #include <memory>
 #include <ostream>
@@ -246,26 +247,30 @@ TEST_F(TxnParticipantTest, TestSuccessfulSequences) {
 }
 
 TEST_F(TxnParticipantTest, TestParticipantOpsWhenNotBegun) {
-  const auto check_bad_ops = [&] (const vector<ParticipantOpPB::ParticipantOpType>& ops,
-                                  int64_t txn_id) {
-    for (const auto& type : ops) {
-      ParticipantResponsePB resp;
-      ASSERT_OK(CallParticipantOp(
-          tablet_replica_.get(), txn_id, type, kDummyCommitTimestamp, &resp));
-      SCOPED_TRACE(SecureShortDebugString(resp));
-      ASSERT_TRUE(resp.has_error());
-      ASSERT_TRUE(resp.error().has_status());
-      ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, resp.error().code());
-      ASSERT_EQ(AppStatusPB::ILLEGAL_STATE, resp.error().status().code());
-      ASSERT_FALSE(resp.has_timestamp());
-    }
-  };
-  NO_FATALS(check_bad_ops({
-    ParticipantOpPB::BEGIN_COMMIT,
-    ParticipantOpPB::FINALIZE_COMMIT,
-    ParticipantOpPB::ABORT_TXN,
-  }, 1));
-  ASSERT_TRUE(txn_participant()->GetTxnsForTests().empty());
+  auto txn_id = 0;
+  for (auto type : { ParticipantOpPB::BEGIN_COMMIT,
+                     ParticipantOpPB::FINALIZE_COMMIT }) {
+    ParticipantResponsePB resp;
+    ASSERT_OK(CallParticipantOp(
+        tablet_replica_.get(), txn_id++, type, kDummyCommitTimestamp, &resp));
+    SCOPED_TRACE(SecureShortDebugString(resp));
+    ASSERT_TRUE(resp.has_error());
+    ASSERT_TRUE(resp.error().has_status());
+    ASSERT_EQ(TabletServerErrorPB::TXN_ILLEGAL_STATE, resp.error().code());
+    ASSERT_EQ(AppStatusPB::ILLEGAL_STATE, resp.error().status().code());
+    ASSERT_FALSE(resp.has_timestamp());
+    ASSERT_TRUE(txn_participant()->GetTxnsForTests().empty());
+  }
+  ParticipantResponsePB resp;
+  ASSERT_OK(CallParticipantOp(
+      tablet_replica_.get(), txn_id++, ParticipantOpPB::ABORT_TXN, kDummyCommitTimestamp, &resp));
+  SCOPED_TRACE(SecureShortDebugString(resp));
+  ASSERT_FALSE(resp.has_error());
+  ASSERT_FALSE(resp.error().has_status());
+  ASSERT_TRUE(resp.has_timestamp());
+  ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
+      { 2, kAborted, -1 },
+  }), txn_participant()->GetTxnsForTests());
 }
 
 TEST_F(TxnParticipantTest, TestIllegalTransitions) {
@@ -408,13 +413,11 @@ TEST_F(TxnParticipantTest, TestConcurrentOps) {
   // Regardless of order, we should have been able to begin the transaction.
   ASSERT_OK(status_for_op(ParticipantOpPB::BEGIN_TXN));
 
-  // If we finalized the commit, we should have begun committing, and we must
-  // not have been able to abort.
+  // If we finalized the commit, we must not have been able to abort.
   if (status_for_op(ParticipantOpPB::FINALIZE_COMMIT).ok()) {
     ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
         { kTxnId, kCommitted, kDummyCommitTimestamp },
     }), txn_participant()->GetTxnsForTests());
-    ASSERT_OK(statuses[FindOrDie(kIndexByOps, ParticipantOpPB::BEGIN_COMMIT)]);
     ASSERT_FALSE(statuses[FindOrDie(kIndexByOps, ParticipantOpPB::ABORT_TXN)].ok());
 
   // If we aborted the commit, we could not have finalized the commit.
diff --git a/src/kudu/tablet/txn_participant.h b/src/kudu/tablet/txn_participant.h
index c497f04..8539372 100644
--- a/src/kudu/tablet/txn_participant.h
+++ b/src/kudu/tablet/txn_participant.h
@@ -140,6 +140,7 @@ class Txn : public RefCountedThreadSafe<Txn> {
       Timestamp timestamp;
       TxnState meta_state;
       if (!tablet_metadata_->HasTxnMetadata(txn_id_, &meta_state, &timestamp)) {
+        *code = tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE;
         return Status::IllegalState("Transaction hasn't been successfully started");
       }
       if (PREDICT_FALSE(meta_state != kCommitted && meta_state != kCommitInProgress)) {
@@ -180,13 +181,13 @@ class Txn : public RefCountedThreadSafe<Txn> {
   }
   Status ValidateFinalize(tserver::TabletServerErrorPB::Code* code) const {
     DCHECK(state_lock_.is_locked());
-    RETURN_NOT_OK(CheckFinishedInitializing(code, kCommitted));
+    RETURN_NOT_OK(CheckPersistedMetadataState(code, kCommitted));
     if (PREDICT_FALSE(state_ == kCommitted)) {
       *code = tserver::TabletServerErrorPB::TXN_OP_ALREADY_APPLIED;
       return Status::IllegalState(
           strings::Substitute("Transaction $0 has already been committed", txn_id_));
     }
-    if (PREDICT_FALSE(state_ != kCommitInProgress)) {
+    if (PREDICT_FALSE(state_ != kInitializing && state_ != kCommitInProgress)) {
       *code = tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE;
       return Status::IllegalState(
           strings::Substitute("Cannot finalize transaction in state: $0",
@@ -196,14 +197,16 @@ class Txn : public RefCountedThreadSafe<Txn> {
   }
   Status ValidateAbort(tserver::TabletServerErrorPB::Code* code) const {
     DCHECK(state_lock_.is_locked());
-    RETURN_NOT_OK(CheckFinishedInitializing(code, kAborted));
+    // NOTE: it's allowed that we replicate an ABORT_TXN op even if the
+    // transaction doesn't exist. This allows us to ensure we remove any
+    // pending TxnOpDispatchers.
+    RETURN_NOT_OK(CheckPersistedMetadataState(code, kAborted));
     if (PREDICT_FALSE(state_ == kAborted)) {
       *code = tserver::TabletServerErrorPB::TXN_OP_ALREADY_APPLIED;
       return Status::IllegalState(
           strings::Substitute("Transaction $0 has already been aborted", txn_id_));
     }
-    if (PREDICT_FALSE(state_ != kOpen &&
-                      state_ != kCommitInProgress)) {
+    if (PREDICT_FALSE(state_ == kCommitted)) {
       *code = tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE;
       return Status::IllegalState(
           strings::Substitute("Cannot abort transaction in state: $0",
@@ -262,38 +265,51 @@ class Txn : public RefCountedThreadSafe<Txn> {
     state_ = s;
   }
 
-  // Returns an error if the transaction has not finished initializing. This
-  // may mean that an in-flight transaction didn't exist so we created a new
-  // one, in which case we need to check if there's existing metadata for it.
+  // If we're initializing a new transaction, checks for any persisted
+  // transaction metadata, ensuring it's compatible with our expected state,
+  // and returning an error otherwise.
   //
   // NOTE: we only need to check the expected metadata state when we're
   // aborting or finalizing a commit, since these end the in-flight
   // transaction. In other cases, we should be able to check the state of the
   // in-flight transaction.
-  Status CheckFinishedInitializing(
+  Status CheckPersistedMetadataState(
       tserver::TabletServerErrorPB::Code* code,
       TxnState expected_metadata_state = kNone) const {
-    DCHECK(expected_metadata_state == kNone ||
-           expected_metadata_state == kAborted ||
-           expected_metadata_state == kCommitted);
-    if (PREDICT_FALSE(state_ == kInitializing)) {
-      TxnState meta_state;
-      if (tablet_metadata_->HasTxnMetadata(txn_id_, &meta_state)) {
-        if (expected_metadata_state != kNone && expected_metadata_state == meta_state) {
-          *code = tserver::TabletServerErrorPB::TXN_OP_ALREADY_APPLIED;
-          return Status::IllegalState(
-              strings::Substitute("Transaction metadata for transaction $0 already set",
-                                  txn_id_));
-        }
-        // We created this transaction as a part of this op (i.e. it was not
-        // already in flight), and there is existing metadata for it.
-        *code = tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE;
+    DCHECK(expected_metadata_state == kAborted || expected_metadata_state == kCommitted);
+    // If we're not initializing a new transaction, there's an already
+    // in-flight, non-persisted transaction, and we'll check its state
+    // elsewhere.
+    if (PREDICT_TRUE(state_ != kInitializing)) {
+      return Status::OK();
+    }
+    TxnState meta_state;
+    if (tablet_metadata_->HasTxnMetadata(txn_id_, &meta_state)) {
+      if (expected_metadata_state == meta_state) {
+        *code = tserver::TabletServerErrorPB::TXN_OP_ALREADY_APPLIED;
         return Status::IllegalState(
-            strings::Substitute("Transaction metadata for transaction $0 already exists",
+            strings::Substitute("Transaction metadata for transaction $0 already set",
                                 txn_id_));
       }
-      // TODO(awong): add another code for this?
-      return Status::IllegalState("Transaction hasn't been successfully started");
+      // We created this transaction as a part of this op (i.e. it was not
+      // already in flight), and there is existing metadata for it that doesn't
+      // match the expected state.
+      *code = tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE;
+      return Status::IllegalState(
+          strings::Substitute("Transaction metadata for transaction $0 already exists",
+                              txn_id_));
+    }
+    // If we don't have metadata for the transaction, it depends on what the
+    // expected state is. E.g. ABORT_TXN ops can and should be replicated even
+    // if we haven't successfully initialized the transaction. This is possible
+    // if we've successfully registered the participant, but haven't yet
+    // replicated the BEGIN_TXN op. The same can't be said for FINALIZE_COMMIT
+    // ops, so return an error here in that case.
+    if (expected_metadata_state == kCommitted) {
+      *code = tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE;
+      return Status::IllegalState(
+          strings::Substitute("Transaction metadata for transaction $0 does not exist",
+                              txn_id_));
     }
     return Status::OK();
   }

[kudu] 02/02: KUDU-2612: route txn op dispatching errors to write ops

Posted by aw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 78fb2047da74a1ebd051dbfa29c4d188056b47bc
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Wed Mar 24 00:57:14 2021 -0700

    KUDU-2612: route txn op dispatching errors to write ops
    
    This patch routes bad statuses from the TxnOpDispatcher to the write ops
    that initiated the registration, and adjusts the batcher code to handle
    such errors.
    
    This also enables a couple of test cases that were written with this
    behavior in mind, and adjusts some expected errors, addressing some
    TODOs that anticipated this behavior.
    
    A follow-up patch will make a similar change to the Java client.
    
    Change-Id: Ibf85e0724ee579cb20dac642b82e3228faf90935
    Reviewed-on: http://gerrit.cloudera.org:8080/17217
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 src/kudu/client/batcher.cc                        |  6 +++
 src/kudu/integration-tests/txn_commit-itest.cc    |  2 +-
 src/kudu/integration-tests/txn_write_ops-itest.cc | 60 ++++++++++-------------
 src/kudu/tablet/tablet_metadata.cc                |  6 +--
 src/kudu/tablet/tablet_replica.cc                 | 44 +++++++++--------
 src/kudu/tablet/tablet_replica.h                  | 16 ++++--
 src/kudu/transactions/txn_status_manager.cc       | 24 +++------
 src/kudu/tserver/tablet_service.cc                |  5 +-
 src/kudu/tserver/ts_tablet_manager.cc             | 21 ++++----
 src/kudu/tserver/ts_tablet_manager.h              |  6 +--
 10 files changed, 94 insertions(+), 96 deletions(-)

diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc
index 93036b2..dc26aed 100644
--- a/src/kudu/client/batcher.cc
+++ b/src/kudu/client/batcher.cc
@@ -506,6 +506,12 @@ RetriableRpcStatus WriteRpc::AnalyzeResponse(const Status& rpc_cb_status) {
     return result;
   }
 
+  if (resp_.has_error() &&
+      resp_.error().code() == tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE) {
+    result.result = RetriableRpcStatus::NON_RETRIABLE_ERROR;
+    return result;
+  }
+
   // Alternatively, when we get a status code of IllegalState or Aborted, we
   // assume this means that the replica we attempted to write to is not the
   // current leader (maybe it got partitioned or slow and another node took
diff --git a/src/kudu/integration-tests/txn_commit-itest.cc b/src/kudu/integration-tests/txn_commit-itest.cc
index f125f54..4ddc4c0 100644
--- a/src/kudu/integration-tests/txn_commit-itest.cc
+++ b/src/kudu/integration-tests/txn_commit-itest.cc
@@ -679,7 +679,7 @@ TEST_F(TxnCommitITest, TestCommitAfterParticipantAbort) {
   Status completion_status;
   bool is_complete;
   ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
-  ASSERT_TRUE(completion_status.IsIncomplete()) << completion_status.ToString();
+  ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString();
 }
 
 // Try concurrently beginning to commit a bunch of different transactions.
diff --git a/src/kudu/integration-tests/txn_write_ops-itest.cc b/src/kudu/integration-tests/txn_write_ops-itest.cc
index cb7a8f0..2a37aa4 100644
--- a/src/kudu/integration-tests/txn_write_ops-itest.cc
+++ b/src/kudu/integration-tests/txn_write_ops-itest.cc
@@ -659,12 +659,14 @@ TEST_F(TxnWriteOpsITest, WriteOpForNonExistentTxn) {
 
 // Try to write an extra row in the context of a transaction which has already
 // been committed.
-//
-// TODO(aserbin): due to conversion of Status::IllegalState() into
-//                RetriableRpcStatus::REPLICA_NOT_LEADER result code,
-//                these sub-scenarios fail with Status::TimedOut() because
-//                they retry in vain until RPC timeout elapses
-TEST_F(TxnWriteOpsITest, DISABLED_TxnWriteAfterCommit) {
+TEST_F(TxnWriteOpsITest, TxnWriteAfterCommit) {
+  const vector<string> master_flags = {
+    // Enable TxnManager in Kudu masters.
+    // TODO(aserbin): remove this customization once the flag is 'on' by default
+    "--txn_manager_enabled=true",
+  };
+  NO_FATALS(StartCluster({}, master_flags, kNumTabletServers));
+  NO_FATALS(Prepare());
   int idx = 0;
   {
     shared_ptr<KuduTransaction> txn;
@@ -685,10 +687,10 @@ TEST_F(TxnWriteOpsITest, DISABLED_TxnWriteAfterCommit) {
       ASSERT_TRUE(s.IsIOError()) << s.ToString();
       ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
       const auto err_status = GetSingleRowError(session.get());
-      ASSERT_TRUE(err_status.IsInvalidArgument()) << err_status.ToString();
+      ASSERT_TRUE(err_status.IsIllegalState()) << err_status.ToString();
       ASSERT_STR_CONTAINS(err_status.ToString(),
                           "Failed to write batch of 1 ops to tablet");
-      ASSERT_STR_MATCHES(err_status.ToString(), "txn .* is not open");
+      ASSERT_STR_MATCHES(err_status.ToString(), "transaction .* not open");
     }
   }
   // A scenario similar to one above, but restart tablet servers before an
@@ -722,10 +724,10 @@ TEST_F(TxnWriteOpsITest, DISABLED_TxnWriteAfterCommit) {
       ASSERT_TRUE(s.IsIOError()) << s.ToString();
       ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
       const auto err_status = GetSingleRowError(session.get());
-      ASSERT_TRUE(err_status.IsNotFound()) << err_status.ToString();
+      ASSERT_TRUE(err_status.IsIllegalState()) << err_status.ToString();
       ASSERT_STR_CONTAINS(err_status.ToString(),
                           "Failed to write batch of 1 ops to tablet");
-      ASSERT_STR_MATCHES(err_status.ToString(), "txn .* is not open");
+      ASSERT_STR_MATCHES(err_status.ToString(), "transaction .* not open");
     }
   }
 }
@@ -1031,11 +1033,6 @@ TEST_F(TxnOpDispatcherITest, ErrorInParticipantRegistration) {
     // Here a custom client with shorter timeout is used to avoid making
     // too many pointless retries upon receiving Status::IllegalState()
     // from the tablet server.
-    //
-    // TODO(aserbin): stop using the custom client with shorter timeout as soon
-    //                as the issue in client/batcher.cc with the transformation
-    //                of both Status::IllegalState() and Status::Aborted() into
-    //                RetriableRpcStatus::REPLICA_NOT_LEADER result if fixed
     const MonoDelta kCustomTimeout = MonoDelta::FromSeconds(2);
     KuduClientBuilder builder;
     builder.default_admin_operation_timeout(kCustomTimeout);
@@ -1057,7 +1054,7 @@ TEST_F(TxnOpDispatcherITest, ErrorInParticipantRegistration) {
     auto s = InsertRows(txn.get(), 1, &key, &session);
     ASSERT_TRUE(s.IsIOError()) << s.ToString();
     auto row_status = GetSingleRowError(session.get());
-    ASSERT_TRUE(row_status.IsTimedOut()) << row_status.ToString();
+    ASSERT_TRUE(row_status.IsIllegalState()) << row_status.ToString();
     ASSERT_STR_CONTAINS(row_status.ToString(),
                         "Failed to write batch of 1 ops to tablet");
 
@@ -1370,11 +1367,7 @@ TEST_F(TxnOpDispatcherITest, PreliminaryTasksTimeout) {
     } else {
       // This is the case when tablets have been registered as participants.
       // In this case, the transaction should not be able to finalize.
-      //
-      // TODO(aserbin): this should result in IllegalState() after addressing
-      //                the issue with convertion of IllegalState() into
-      //                retriable error status in Kudu client
-      ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+      ASSERT_TRUE(s.IsAborted()) << s.ToString();
     }
 
     // No rows should be persisted.
@@ -1485,13 +1478,7 @@ TEST_F(TxnOpDispatcherITest, CommitWithWriteOpPendingParticipantNotYetRegistered
   const auto s = InsertRows(txn.get(), 1, &key, &session);
   ASSERT_TRUE(s.IsIOError()) << s.ToString();
   const auto row_status = GetSingleRowError(session.get());
-  // TODO(aserbin): due to conversion of Status::IllegalState() into
-  //                RetriableRpcStatus::REPLICA_NOT_LEADER result code,
-  //                the write RPC times out after many retries instead of
-  //                bailing out right away. Update the expected error code after
-  //                the issue is fixed.
-  //ASSERT_TRUE(row_status.IsIllegalState());
-  ASSERT_TRUE(row_status.IsTimedOut()) << s.ToString();
+  ASSERT_TRUE(row_status.IsIllegalState()) << s.ToString();
 
   committer.join();
   cleanup.cancel();
@@ -1521,7 +1508,7 @@ TEST_F(TxnOpDispatcherITest, CommitWithWriteOpPendingParticipantNotYetRegistered
 //
 // TODO(aserbin): enable the scenario after the follow-up for
 //                https://gerrit.cloudera.org/#/c/17127/ is merged
-TEST_F(TxnOpDispatcherITest, DISABLED_CommitWithWriteOpPendingParticipantRegistered) {
+TEST_F(TxnOpDispatcherITest, CommitWithWriteOpPendingParticipantRegistered) {
   SKIP_IF_SLOW_NOT_ALLOWED();
 
   constexpr auto kDelayMs = 1000;
@@ -1545,21 +1532,26 @@ TEST_F(TxnOpDispatcherITest, DISABLED_CommitWithWriteOpPendingParticipantRegiste
 
   shared_ptr<KuduSession> session;
   int64_t key = 0;
-  ASSERT_OK(InsertRows(txn.get(), 1, &key, &session));
+  Status s = InsertRows(txn.get(), 1, &key, &session);
+  ASSERT_TRUE(s.IsIOError()) << s.ToString();
 
+  // Since we tried to commit without allowing all participants to quiesce ops,
+  // the transaction should automatically fail.
   committer.join();
   cleanup.cancel();
   ASSERT_OK(commit_init_status);
 
   bool is_complete = false;
   Status completion_status;
-  ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
-  ASSERT_TRUE(is_complete);
-  ASSERT_OK(completion_status);
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+    ASSERT_TRUE(is_complete);
+    ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString();
+  });
 
   size_t num_rows = 0;
   ASSERT_OK(CountRows(table_.get(), &num_rows));
-  ASSERT_EQ(1, num_rows);
+  ASSERT_EQ(0, num_rows);
 
   // Since the commit has been successfully finalized, there should be no
   // TxnOpDispatcher for the transaction.
diff --git a/src/kudu/tablet/tablet_metadata.cc b/src/kudu/tablet/tablet_metadata.cc
index 0d83dad..69fcc84 100644
--- a/src/kudu/tablet/tablet_metadata.cc
+++ b/src/kudu/tablet/tablet_metadata.cc
@@ -822,8 +822,7 @@ void TabletMetadata::AddTxnMetadata(int64_t txn_id, unique_ptr<MinLogIndexAnchor
 void TabletMetadata::BeginCommitTransaction(int64_t txn_id, Timestamp mvcc_op_timestamp,
                                             unique_ptr<MinLogIndexAnchorer> log_anchor) {
   std::lock_guard<LockType> l(data_lock_);
-  auto txn_metadata = FindPtrOrNull(txn_metadata_by_txn_id_, txn_id);
-  CHECK(txn_metadata);
+  auto txn_metadata = FindOrDie(txn_metadata_by_txn_id_, txn_id);
   // NOTE: we may already have an MVCC op timestamp if we are bootstrapping and
   // the timestamp was persisted already, in which case, we don't need to
   // anchor the WAL to ensure the timestamp's persistence in metadata.
@@ -836,8 +835,7 @@ void TabletMetadata::BeginCommitTransaction(int64_t txn_id, Timestamp mvcc_op_ti
 void TabletMetadata::AddCommitTimestamp(int64_t txn_id, Timestamp commit_timestamp,
                                         unique_ptr<MinLogIndexAnchorer> log_anchor) {
   std::lock_guard<LockType> l(data_lock_);
-  auto txn_metadata = FindPtrOrNull(txn_metadata_by_txn_id_, txn_id);
-  CHECK(txn_metadata);
+  auto txn_metadata = FindOrDie(txn_metadata_by_txn_id_, txn_id);
   txn_metadata->set_commit_timestamp(commit_timestamp);
   anchors_needing_flush_.emplace_back(std::move(log_anchor));
 }
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
index f4995c5..a50bdb4 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -71,7 +71,6 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/scoped_cleanup.h"
-#include "kudu/util/status_callback.h"
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/threadpool.h"
 #include "kudu/util/trace.h"
@@ -538,7 +537,7 @@ Status TabletReplica::WaitUntilConsensusRunning(const MonoDelta& timeout) {
 
 Status TabletReplica::SubmitTxnWrite(
     std::unique_ptr<WriteOpState> op_state,
-    const std::function<Status(int64_t txn_id, StatusCallback cb)>& scheduler) {
+    const std::function<Status(int64_t txn_id, RegisteredTxnCallback cb)>& scheduler) {
   DCHECK(op_state);
   DCHECK(op_state->request()->has_txn_id());
 
@@ -576,7 +575,8 @@ Status TabletReplica::UnregisterTxnOpDispatcher(int64_t txn_id,
     auto& dispatcher = it->second;
     unregister_status = dispatcher->MarkUnregistered();
     if (abort_pending_ops) {
-      dispatcher->Cancel(Status::Aborted("operation has been aborted"));
+      dispatcher->Cancel(Status::Aborted("operation has been aborted"),
+                         TabletServerErrorPB::TXN_ILLEGAL_STATE);
       unregister_status = Status::OK();
     }
     if (unregister_status.ok()) {
@@ -1095,7 +1095,7 @@ void TabletReplica::DecreaseTxnCoordinatorTaskCounter() {
 
 class ParticipantBeginTxnCallback : public OpCompletionCallback {
  public:
-  ParticipantBeginTxnCallback(StatusCallback cb,
+  ParticipantBeginTxnCallback(RegisteredTxnCallback cb,
                               unique_ptr<ParticipantRequestPB> req)
       : cb_(std::move(cb)),
         req_(std::move(req)),
@@ -1116,21 +1116,21 @@ class ParticipantBeginTxnCallback : public OpCompletionCallback {
       // a transactional write request arrives to a tablet server which
       // hasn't yet served a write request in the context of the specified
       // transaction.
-      return cb_(Status::OK());
+      return cb_(Status::OK(), TabletServerErrorPB::UNKNOWN_ERROR);
     }
-    return cb_(status_);
+    return cb_(status_, code_);
   }
 
  private:
-  StatusCallback cb_;
+  RegisteredTxnCallback cb_;
   unique_ptr<ParticipantRequestPB> req_;
   const int64_t txn_id_;
 };
 
-void TabletReplica::BeginTxnParticipantOp(int64_t txn_id, StatusCallback cb) {
+void TabletReplica::BeginTxnParticipantOp(int64_t txn_id, RegisteredTxnCallback began_txn_cb) {
   auto s = CheckRunning();
   if (PREDICT_FALSE(!s.ok())) {
-    return cb(s);
+    return began_txn_cb(s, TabletServerErrorPB::UNKNOWN_ERROR);
   }
 
   unique_ptr<ParticipantRequestPB> req(new ParticipantRequestPB);
@@ -1144,7 +1144,7 @@ void TabletReplica::BeginTxnParticipantOp(int64_t txn_id, StatusCallback cb) {
   unique_ptr<ParticipantOpState> op_state(
       new ParticipantOpState(this, tablet()->txn_participant(), req.get()));
   op_state->set_completion_callback(unique_ptr<OpCompletionCallback>(
-      new ParticipantBeginTxnCallback(cb, std::move(req))));
+      new ParticipantBeginTxnCallback(began_txn_cb, std::move(req))));
   s = SubmitTxnParticipantOp(std::move(op_state));
   VLOG(3) << Substitute(
       "submitting BEGIN_TXN for participant $0 (txn ID $1): $2",
@@ -1152,7 +1152,7 @@ void TabletReplica::BeginTxnParticipantOp(int64_t txn_id, StatusCallback cb) {
   if (PREDICT_FALSE(!s.ok())) {
     // Now it's time to respond with appropriate error status to the RPCs
     // corresponding to the pending write operations.
-    return cb(s);
+    return began_txn_cb(s, TabletServerErrorPB::UNKNOWN_ERROR);
   }
 }
 
@@ -1174,7 +1174,7 @@ void TabletReplica::MakeUnavailable(const Status& error) {
 
 Status TabletReplica::TxnOpDispatcher::Dispatch(
     std::unique_ptr<WriteOpState> op,
-    const std::function<Status(int64_t txn_id, StatusCallback cb)>& scheduler) {
+    const std::function<Status(int64_t txn_id, RegisteredTxnCallback cb)>& scheduler) {
   const auto txn_id = op->request()->txn_id();
   std::lock_guard<simple_spinlock> guard(lock_);
   if (PREDICT_FALSE(unregistered_)) {
@@ -1204,7 +1204,7 @@ Status TabletReplica::TxnOpDispatcher::Dispatch(
   // tasks. In case of success, the callback is invoked after completion
   // of the preliminary tasks with Status::OK(). In case of any failure down
   // the road, the callback is called with corresponding non-OK status.
-  auto cb = [self = shared_from_this(), txn_id](const Status& s) {
+  auto cb = [self = shared_from_this(), txn_id](const Status& s, TabletServerErrorPB::Code code) {
     if (PREDICT_TRUE(s.ok())) {
       // The all-is-well case: it's time to submit all the write operations
       // accumulated in the queue.
@@ -1218,7 +1218,7 @@ Status TabletReplica::TxnOpDispatcher::Dispatch(
       }
     } else {
       // Something went wrong: cancel all the pending write operations
-      self->Cancel(s);
+      self->Cancel(s, code);
       CHECK_OK(self->replica_->UnregisterTxnOpDispatcher(
           txn_id, false/*abort_pending_ops*/));
     }
@@ -1279,13 +1279,16 @@ Status TabletReplica::TxnOpDispatcher::Submit() {
     std::swap(failed_ops, ops_queue_);
   }
 
-  return RespondWithStatus(failed_status, std::move(failed_ops));
+  return RespondWithStatus(failed_status,
+                           TabletServerErrorPB::UNKNOWN_ERROR,
+                           std::move(failed_ops));
 }
 
-void TabletReplica::TxnOpDispatcher::Cancel(const Status& status) {
+void TabletReplica::TxnOpDispatcher::Cancel(const Status& status,
+                                            TabletServerErrorPB::Code code) {
   CHECK(!status.ok());
-  LOG(WARNING) << Substitute("$0: cancelling pending write operations",
-                             status.ToString());
+  KLOG_EVERY_N_SECS(WARNING, 1) << Substitute("$0: cancelling pending write operations",
+                                              status.ToString());
   decltype(ops_queue_) ops;
   {
     std::lock_guard<simple_spinlock> guard(lock_);
@@ -1293,7 +1296,7 @@ void TabletReplica::TxnOpDispatcher::Cancel(const Status& status) {
     std::swap(ops, ops_queue_);
   }
 
-  RespondWithStatus(status, std::move(ops));
+  RespondWithStatus(status, code, std::move(ops));
 }
 
 Status TabletReplica::TxnOpDispatcher::MarkUnregistered() {
@@ -1329,12 +1332,13 @@ Status TabletReplica::TxnOpDispatcher::EnqueueUnlocked(unique_ptr<WriteOpState>
 
 Status TabletReplica::TxnOpDispatcher::RespondWithStatus(
     const Status& status,
+    TabletServerErrorPB::Code code,
     deque<unique_ptr<WriteOpState>> ops) {
   // Invoke the callback for every operation in the queue.
   for (auto& op : ops) {
     auto* cb = op->completion_callback();
     DCHECK(cb);
-    cb->set_error(status);
+    cb->set_error(status, code);
     cb->OpCompleted();
   }
   return status;
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index 711360d..bfcf7a4 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -42,10 +42,10 @@
 #include "kudu/tablet/ops/write_op.h"
 #include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet_metadata.h"
+#include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/status.h"
-#include "kudu/util/status_callback.h"
 
 namespace kudu {
 class AlterTableTest;
@@ -90,6 +90,11 @@ class TabletStatusPB;
 class TxnCoordinator;
 class TxnCoordinatorFactory;
 
+// Callback to run once the work to register a participant and start a transaction
+// on it has completed (successfully or not); 'code' matters only if 'status' isn't OK.
+typedef std::function<void(const Status& status, tserver::TabletServerErrorPB::Code code)>
+    RegisteredTxnCallback;
+
 // A replica in a tablet consensus configuration, which coordinates writes to tablets.
 // Each time Write() is called this class appends a new entry to a replicated
 // state machine through a consensus algorithm, which makes sure that other
@@ -156,7 +161,7 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
   // mentioned above.
   Status SubmitTxnWrite(
       std::unique_ptr<WriteOpState> op_state,
-      const std::function<Status(int64_t txn_id, StatusCallback cb)>& scheduler);
+      const std::function<Status(int64_t txn_id, RegisteredTxnCallback cb)>& scheduler);
 
   // Unregister TxnWriteOpDispacher for the specified transaction identifier
   // 'txn_id'. If no pending write requests are accumulated by the dispatcher,
@@ -382,7 +387,7 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
   void DecreaseTxnCoordinatorTaskCounter();
 
   // Submit ParticipantOpPB::BEGIN_TXN operation for the specified transaction.
-  void BeginTxnParticipantOp(int64_t txn_id, StatusCallback cb);
+  void BeginTxnParticipantOp(int64_t txn_id, RegisteredTxnCallback began_txn_cb);
 
  private:
   friend class kudu::AlterTableTest;
@@ -425,7 +430,7 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
     // invoked to schedule preliminary tasks, if necessary.
     Status Dispatch(std::unique_ptr<WriteOpState> op,
                     const std::function<Status(int64_t txn_id,
-                                               StatusCallback cb)>& scheduler);
+                                               RegisteredTxnCallback cb)>& scheduler);
 
     // Submit all pending operations. Returns OK if all operations have been
     // submitted successfully, or 'inflight_status_' if any of those failed.
@@ -433,7 +438,7 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
 
-    // Invoke callbacks for every buffered operation with the 'status';
-    // the 'status' must be a non-OK one.
-    void Cancel(const Status& status);
+    // Invoke callbacks for every buffered operation with the 'status' and the
+    // tserver error 'code'; the 'status' must be a non-OK one.
+    void Cancel(const Status& status, tserver::TabletServerErrorPB::Code code);
 
     // Mark the dispatcher as not accepting any write operations: this is to
     // eventually unregister the dispatcher for the corresponding transaction
@@ -456,6 +461,7 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
     // Respond to the given write operations with the specified status.
     static Status RespondWithStatus(
         const Status& status,
+        tserver::TabletServerErrorPB::Code code,
         std::deque<std::unique_ptr<WriteOpState>> ops);
 
     // Pointer to the parent TabletReplica instance which keeps this
diff --git a/src/kudu/transactions/txn_status_manager.cc b/src/kudu/transactions/txn_status_manager.cc
index ce8e24d..837272c 100644
--- a/src/kudu/transactions/txn_status_manager.cc
+++ b/src/kudu/transactions/txn_status_manager.cc
@@ -231,24 +231,14 @@ void CommitTasks::BeginCommitAsyncTask(int participant_idx) {
       BeginCommitAsyncTask(participant_idx);
       return;
     }
-    if (PREDICT_FALSE(s.IsNotFound())) {
-      // If the participant has been deleted, treat it as though it's already
-      // been committed, rather than attempting to abort or something. This is
-      // important to ensure retries of the commit tasks reliably result in the
-      // same operations being performed.
-      LOG(INFO) << Substitute("Participant $0 was not found for BEGIN_COMMIT, aborting: $1",
-                              participant_ids_[participant_idx], s.ToString());
+    if (PREDICT_FALSE(!s.ok())) {
+      // We might see errors if the participant was deleted, or because the
+      // participant didn't successfully start the transaction. In any case,
+      // abort the transaction.
+      LOG(INFO) << Substitute("Participant $0 of txn $1 returned error for BEGIN_COMMIT op, "
+                              "aborting: $2", participant_ids_[participant_idx],
+                              txn_id_.ToString(), s.ToString());
       SetNeedsBeginAbort();
-    } else if (PREDICT_FALSE(!s.ok())) {
-      // For any other kind of error, just exit without completing.
-      // TODO(awong): we're presuming that such errors wouldn't benefit from
-      // just retrying.
-      // TODO(awong): we don't expect them, but if we ever somehow find
-      // ourselves with an aborted transaction on the participant, we should
-      // probably abort here.
-      LOG(WARNING) << Substitute("Participant $0 BEGIN_COMMIT op returned $1",
-                                 participant_ids_[participant_idx], s.ToString());
-      stop_task_ = true;
     }
 
     // If this was the last participant op for this task, we have some cleanup
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index c2d8c5e..4d1c3db 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -110,7 +110,6 @@
 #include "kudu/util/random_util.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
-#include "kudu/util/status_callback.h"
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/threadpool.h"
 #include "kudu/util/trace.h"
@@ -1631,9 +1630,9 @@ void TabletServiceImpl::Write(const WriteRequestPB* req,
     // This functor is to schedule preliminary tasks prior to submitting
     // the write operation via TabletReplica::SubmitWrite().
     const auto scheduler = [this, &username, replica, deadline](
-        int64_t txn_id, StatusCallback cb) {
+        int64_t txn_id, tablet::RegisteredTxnCallback began_txn_cb) {
       return server_->tablet_manager()->SchedulePreliminaryTasksForTxnWrite(
-          std::move(replica), txn_id, username, deadline, std::move(cb));
+          std::move(replica), txn_id, username, deadline, std::move(began_txn_cb));
     };
     s = replica->SubmitTxnWrite(std::move(op_state), scheduler);
     VLOG(2) << Substitute("submitting txn write op: $0", s.ToString());
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index 989831e..d9b9d0f 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -1125,7 +1125,7 @@ void TSTabletManager::RegisterAndBeginParticipantTxnTask(
     int64_t txn_id,
     const string& user,
     MonoTime deadline,
-    StatusCallback cb) {
+    tablet::RegisteredTxnCallback began_txn_cb) {
   DCHECK(txn_system_client);
   // TODO(aserbin): add and update metrics to track how long these calls take
   // TODO(aserbin): a future improvement to reduce overall latency is to use
@@ -1149,16 +1149,18 @@ void TSTabletManager::RegisterAndBeginParticipantTxnTask(
   {
     const auto now = MonoTime::Now();
     if (deadline <= now) {
-      return cb(Status::TimedOut(
-          Substitute("time out prior registering tablet $0 as participant (txn ID $1)",
-          replica->tablet_id(), txn_id)));
+      return began_txn_cb(
+          Status::TimedOut(
+              Substitute("time out prior registering tablet $0 as participant (txn ID $1)",
+                         replica->tablet_id(), txn_id)),
+          TabletServerErrorPB::UNKNOWN_ERROR);
     }
     auto s = txn_system_client->RegisterParticipant(
         txn_id, replica->tablet_id(), user, deadline - now);
     VLOG(2) << Substitute("RegisterParticipant() $0 for txn ID $1 returned $2",
                           replica->tablet_id(), txn_id, s.ToString());
     if (PREDICT_FALSE(!s.ok())) {
-      return cb(s);
+      return began_txn_cb(s, TabletServerErrorPB::TXN_ILLEGAL_STATE);
     }
   }
 
@@ -1167,11 +1169,12 @@ void TSTabletManager::RegisterAndBeginParticipantTxnTask(
   MAYBE_INJECT_FIXED_LATENCY(FLAGS_txn_participant_begin_op_inject_latency_ms);
 
   if (deadline <= MonoTime::Now()) {
-    return cb(Status::TimedOut(Substitute(
+    return began_txn_cb(Status::TimedOut(Substitute(
         "time out prior submitting BEGIN_TXN for participant $0 (txn ID $1)",
-        replica->tablet_id(), txn_id)));
+        replica->tablet_id(), txn_id)),
+        TabletServerErrorPB::UNKNOWN_ERROR);
   }
-  return replica->BeginTxnParticipantOp(txn_id, std::move(cb));
+  return replica->BeginTxnParticipantOp(txn_id, std::move(began_txn_cb));
 }
 
 Status TSTabletManager::StartTabletStateTransitionUnlocked(
@@ -1871,7 +1874,7 @@ Status TSTabletManager::SchedulePreliminaryTasksForTxnWrite(
     int64_t txn_id,
     const string& user,
     MonoTime deadline,
-    StatusCallback cb) {
+    tablet::RegisteredTxnCallback cb) {
   // An important pre-condition to running operations below: the availability
   // of the transaction system client.
   transactions::TxnSystemClient* tsc = nullptr;
diff --git a/src/kudu/tserver/ts_tablet_manager.h b/src/kudu/tserver/ts_tablet_manager.h
index f5e1f92..1088da7 100644
--- a/src/kudu/tserver/ts_tablet_manager.h
+++ b/src/kudu/tserver/ts_tablet_manager.h
@@ -44,7 +44,6 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/rw_mutex.h"
 #include "kudu/util/status.h"
-#include "kudu/util/status_callback.h"
 
 namespace boost {
 template <class T>
@@ -59,6 +58,7 @@ class Partition;
 class PartitionSchema;
 class Schema;
 class ThreadPool;
+
 namespace transactions {
 class TxnSystemClient;
 }  // namespace transactions
@@ -250,7 +250,7 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
       int64_t txn_id,
       const std::string& user,
       MonoTime deadline,
-      StatusCallback cb);
+      tablet::RegisteredTxnCallback cb);
 
  private:
   FRIEND_TEST(LeadershipChangeReportingTest, TestReportStatsDuringLeadershipChange);
@@ -276,7 +276,7 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
       int64_t txn_id,
       const std::string& user,
       MonoTime deadline,
-      StatusCallback cb);
+      tablet::RegisteredTxnCallback began_txn_cb);
 
   // Returns Status::OK() iff state_ == MANAGER_RUNNING.
   Status CheckRunningUnlocked(TabletServerErrorPB::Code* error_code) const;