You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2021/04/05 01:04:29 UTC

[kudu] 02/02: KUDU-2612: route txn op dispatching errors to write ops

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 78fb2047da74a1ebd051dbfa29c4d188056b47bc
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Wed Mar 24 00:57:14 2021 -0700

    KUDU-2612: route txn op dispatching errors to write ops
    
    This patch routes error statuses from the TxnOpDispatcher to the write ops
    that initiated participant registration, and adjusts the batcher code to
    treat such errors (TXN_ILLEGAL_STATE) as non-retriable.
    
    This also enables a couple of test cases that were written with this
    behavior in mind, and adjusts some expected errors, addressing several
    TODOs that anticipated this behavior.
    
    A follow-up patch will make a similar change to the Java client.
    
    Change-Id: Ibf85e0724ee579cb20dac642b82e3228faf90935
    Reviewed-on: http://gerrit.cloudera.org:8080/17217
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 src/kudu/client/batcher.cc                        |  6 +++
 src/kudu/integration-tests/txn_commit-itest.cc    |  2 +-
 src/kudu/integration-tests/txn_write_ops-itest.cc | 60 ++++++++++-------------
 src/kudu/tablet/tablet_metadata.cc                |  6 +--
 src/kudu/tablet/tablet_replica.cc                 | 44 +++++++++--------
 src/kudu/tablet/tablet_replica.h                  | 16 ++++--
 src/kudu/transactions/txn_status_manager.cc       | 24 +++------
 src/kudu/tserver/tablet_service.cc                |  5 +-
 src/kudu/tserver/ts_tablet_manager.cc             | 21 ++++----
 src/kudu/tserver/ts_tablet_manager.h              |  6 +--
 10 files changed, 94 insertions(+), 96 deletions(-)

diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc
index 93036b2..dc26aed 100644
--- a/src/kudu/client/batcher.cc
+++ b/src/kudu/client/batcher.cc
@@ -506,6 +506,12 @@ RetriableRpcStatus WriteRpc::AnalyzeResponse(const Status& rpc_cb_status) {
     return result;
   }
 
+  if (resp_.has_error() &&
+      resp_.error().code() == tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE) {
+    result.result = RetriableRpcStatus::NON_RETRIABLE_ERROR;
+    return result;
+  }
+
   // Alternatively, when we get a status code of IllegalState or Aborted, we
   // assume this means that the replica we attempted to write to is not the
   // current leader (maybe it got partitioned or slow and another node took
diff --git a/src/kudu/integration-tests/txn_commit-itest.cc b/src/kudu/integration-tests/txn_commit-itest.cc
index f125f54..4ddc4c0 100644
--- a/src/kudu/integration-tests/txn_commit-itest.cc
+++ b/src/kudu/integration-tests/txn_commit-itest.cc
@@ -679,7 +679,7 @@ TEST_F(TxnCommitITest, TestCommitAfterParticipantAbort) {
   Status completion_status;
   bool is_complete;
   ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
-  ASSERT_TRUE(completion_status.IsIncomplete()) << completion_status.ToString();
+  ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString();
 }
 
 // Try concurrently beginning to commit a bunch of different transactions.
diff --git a/src/kudu/integration-tests/txn_write_ops-itest.cc b/src/kudu/integration-tests/txn_write_ops-itest.cc
index cb7a8f0..2a37aa4 100644
--- a/src/kudu/integration-tests/txn_write_ops-itest.cc
+++ b/src/kudu/integration-tests/txn_write_ops-itest.cc
@@ -659,12 +659,14 @@ TEST_F(TxnWriteOpsITest, WriteOpForNonExistentTxn) {
 
 // Try to write an extra row in the context of a transaction which has already
 // been committed.
-//
-// TODO(aserbin): due to conversion of Status::IllegalState() into
-//                RetriableRpcStatus::REPLICA_NOT_LEADER result code,
-//                these sub-scenarios fail with Status::TimedOut() because
-//                they retry in vain until RPC timeout elapses
-TEST_F(TxnWriteOpsITest, DISABLED_TxnWriteAfterCommit) {
+TEST_F(TxnWriteOpsITest, TxnWriteAfterCommit) {
+  const vector<string> master_flags = {
+    // Enable TxnManager in Kudu masters.
+    // TODO(aserbin): remove this customization once the flag is 'on' by default
+    "--txn_manager_enabled=true",
+  };
+  NO_FATALS(StartCluster({}, master_flags, kNumTabletServers));
+  NO_FATALS(Prepare());
   int idx = 0;
   {
     shared_ptr<KuduTransaction> txn;
@@ -685,10 +687,10 @@ TEST_F(TxnWriteOpsITest, DISABLED_TxnWriteAfterCommit) {
       ASSERT_TRUE(s.IsIOError()) << s.ToString();
       ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
       const auto err_status = GetSingleRowError(session.get());
-      ASSERT_TRUE(err_status.IsInvalidArgument()) << err_status.ToString();
+      ASSERT_TRUE(err_status.IsIllegalState()) << err_status.ToString();
       ASSERT_STR_CONTAINS(err_status.ToString(),
                           "Failed to write batch of 1 ops to tablet");
-      ASSERT_STR_MATCHES(err_status.ToString(), "txn .* is not open");
+      ASSERT_STR_MATCHES(err_status.ToString(), "transaction .* not open");
     }
   }
   // A scenario similar to one above, but restart tablet servers before an
@@ -722,10 +724,10 @@ TEST_F(TxnWriteOpsITest, DISABLED_TxnWriteAfterCommit) {
       ASSERT_TRUE(s.IsIOError()) << s.ToString();
       ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
       const auto err_status = GetSingleRowError(session.get());
-      ASSERT_TRUE(err_status.IsNotFound()) << err_status.ToString();
+      ASSERT_TRUE(err_status.IsIllegalState()) << err_status.ToString();
       ASSERT_STR_CONTAINS(err_status.ToString(),
                           "Failed to write batch of 1 ops to tablet");
-      ASSERT_STR_MATCHES(err_status.ToString(), "txn .* is not open");
+      ASSERT_STR_MATCHES(err_status.ToString(), "transaction .* not open");
     }
   }
 }
@@ -1031,11 +1033,6 @@ TEST_F(TxnOpDispatcherITest, ErrorInParticipantRegistration) {
     // Here a custom client with shorter timeout is used to avoid making
     // too many pointless retries upon receiving Status::IllegalState()
     // from the tablet server.
-    //
-    // TODO(aserbin): stop using the custom client with shorter timeout as soon
-    //                as the issue in client/batcher.cc with the transformation
-    //                of both Status::IllegalState() and Status::Aborted() into
-    //                RetriableRpcStatus::REPLICA_NOT_LEADER result if fixed
     const MonoDelta kCustomTimeout = MonoDelta::FromSeconds(2);
     KuduClientBuilder builder;
     builder.default_admin_operation_timeout(kCustomTimeout);
@@ -1057,7 +1054,7 @@ TEST_F(TxnOpDispatcherITest, ErrorInParticipantRegistration) {
     auto s = InsertRows(txn.get(), 1, &key, &session);
     ASSERT_TRUE(s.IsIOError()) << s.ToString();
     auto row_status = GetSingleRowError(session.get());
-    ASSERT_TRUE(row_status.IsTimedOut()) << row_status.ToString();
+    ASSERT_TRUE(row_status.IsIllegalState()) << row_status.ToString();
     ASSERT_STR_CONTAINS(row_status.ToString(),
                         "Failed to write batch of 1 ops to tablet");
 
@@ -1370,11 +1367,7 @@ TEST_F(TxnOpDispatcherITest, PreliminaryTasksTimeout) {
     } else {
       // This is the case when tablets have been registered as participants.
       // In this case, the transaction should not be able to finalize.
-      //
-      // TODO(aserbin): this should result in IllegalState() after addressing
-      //                the issue with convertion of IllegalState() into
-      //                retriable error status in Kudu client
-      ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+      ASSERT_TRUE(s.IsAborted()) << s.ToString();
     }
 
     // No rows should be persisted.
@@ -1485,13 +1478,7 @@ TEST_F(TxnOpDispatcherITest, CommitWithWriteOpPendingParticipantNotYetRegistered
   const auto s = InsertRows(txn.get(), 1, &key, &session);
   ASSERT_TRUE(s.IsIOError()) << s.ToString();
   const auto row_status = GetSingleRowError(session.get());
-  // TODO(aserbin): due to conversion of Status::IllegalState() into
-  //                RetriableRpcStatus::REPLICA_NOT_LEADER result code,
-  //                the write RPC times out after many retries instead of
-  //                bailing out right away. Update the expected error code after
-  //                the issue is fixed.
-  //ASSERT_TRUE(row_status.IsIllegalState());
-  ASSERT_TRUE(row_status.IsTimedOut()) << s.ToString();
+  ASSERT_TRUE(row_status.IsIllegalState()) << s.ToString();
 
   committer.join();
   cleanup.cancel();
@@ -1521,7 +1508,7 @@ TEST_F(TxnOpDispatcherITest, CommitWithWriteOpPendingParticipantNotYetRegistered
 //
 // TODO(aserbin): enable the scenario after the follow-up for
 //                https://gerrit.cloudera.org/#/c/17127/ is merged
-TEST_F(TxnOpDispatcherITest, DISABLED_CommitWithWriteOpPendingParticipantRegistered) {
+TEST_F(TxnOpDispatcherITest, CommitWithWriteOpPendingParticipantRegistered) {
   SKIP_IF_SLOW_NOT_ALLOWED();
 
   constexpr auto kDelayMs = 1000;
@@ -1545,21 +1532,26 @@ TEST_F(TxnOpDispatcherITest, DISABLED_CommitWithWriteOpPendingParticipantRegiste
 
   shared_ptr<KuduSession> session;
   int64_t key = 0;
-  ASSERT_OK(InsertRows(txn.get(), 1, &key, &session));
+  Status s = InsertRows(txn.get(), 1, &key, &session);
+  ASSERT_TRUE(s.IsIOError()) << s.ToString();
 
+  // Since we tried to commit without allowing all participants to quiesce ops,
+  // the transaction should automatically fail.
   committer.join();
   cleanup.cancel();
   ASSERT_OK(commit_init_status);
 
   bool is_complete = false;
   Status completion_status;
-  ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
-  ASSERT_TRUE(is_complete);
-  ASSERT_OK(completion_status);
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+    ASSERT_TRUE(is_complete);
+    ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString();
+  });
 
   size_t num_rows = 0;
   ASSERT_OK(CountRows(table_.get(), &num_rows));
-  ASSERT_EQ(1, num_rows);
+  ASSERT_EQ(0, num_rows);
 
   // Since the commit has been successfully finalized, there should be no
   // TxnOpDispatcher for the transaction.
diff --git a/src/kudu/tablet/tablet_metadata.cc b/src/kudu/tablet/tablet_metadata.cc
index 0d83dad..69fcc84 100644
--- a/src/kudu/tablet/tablet_metadata.cc
+++ b/src/kudu/tablet/tablet_metadata.cc
@@ -822,8 +822,7 @@ void TabletMetadata::AddTxnMetadata(int64_t txn_id, unique_ptr<MinLogIndexAnchor
 void TabletMetadata::BeginCommitTransaction(int64_t txn_id, Timestamp mvcc_op_timestamp,
                                             unique_ptr<MinLogIndexAnchorer> log_anchor) {
   std::lock_guard<LockType> l(data_lock_);
-  auto txn_metadata = FindPtrOrNull(txn_metadata_by_txn_id_, txn_id);
-  CHECK(txn_metadata);
+  auto txn_metadata = FindOrDie(txn_metadata_by_txn_id_, txn_id);
   // NOTE: we may already have an MVCC op timestamp if we are bootstrapping and
   // the timestamp was persisted already, in which case, we don't need to
   // anchor the WAL to ensure the timestamp's persistence in metadata.
@@ -836,8 +835,7 @@ void TabletMetadata::BeginCommitTransaction(int64_t txn_id, Timestamp mvcc_op_ti
 void TabletMetadata::AddCommitTimestamp(int64_t txn_id, Timestamp commit_timestamp,
                                         unique_ptr<MinLogIndexAnchorer> log_anchor) {
   std::lock_guard<LockType> l(data_lock_);
-  auto txn_metadata = FindPtrOrNull(txn_metadata_by_txn_id_, txn_id);
-  CHECK(txn_metadata);
+  auto txn_metadata = FindOrDie(txn_metadata_by_txn_id_, txn_id);
   txn_metadata->set_commit_timestamp(commit_timestamp);
   anchors_needing_flush_.emplace_back(std::move(log_anchor));
 }
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
index f4995c5..a50bdb4 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -71,7 +71,6 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/scoped_cleanup.h"
-#include "kudu/util/status_callback.h"
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/threadpool.h"
 #include "kudu/util/trace.h"
@@ -538,7 +537,7 @@ Status TabletReplica::WaitUntilConsensusRunning(const MonoDelta& timeout) {
 
 Status TabletReplica::SubmitTxnWrite(
     std::unique_ptr<WriteOpState> op_state,
-    const std::function<Status(int64_t txn_id, StatusCallback cb)>& scheduler) {
+    const std::function<Status(int64_t txn_id, RegisteredTxnCallback cb)>& scheduler) {
   DCHECK(op_state);
   DCHECK(op_state->request()->has_txn_id());
 
@@ -576,7 +575,8 @@ Status TabletReplica::UnregisterTxnOpDispatcher(int64_t txn_id,
     auto& dispatcher = it->second;
     unregister_status = dispatcher->MarkUnregistered();
     if (abort_pending_ops) {
-      dispatcher->Cancel(Status::Aborted("operation has been aborted"));
+      dispatcher->Cancel(Status::Aborted("operation has been aborted"),
+                         TabletServerErrorPB::TXN_ILLEGAL_STATE);
       unregister_status = Status::OK();
     }
     if (unregister_status.ok()) {
@@ -1095,7 +1095,7 @@ void TabletReplica::DecreaseTxnCoordinatorTaskCounter() {
 
 class ParticipantBeginTxnCallback : public OpCompletionCallback {
  public:
-  ParticipantBeginTxnCallback(StatusCallback cb,
+  ParticipantBeginTxnCallback(RegisteredTxnCallback cb,
                               unique_ptr<ParticipantRequestPB> req)
       : cb_(std::move(cb)),
         req_(std::move(req)),
@@ -1116,21 +1116,21 @@ class ParticipantBeginTxnCallback : public OpCompletionCallback {
       // a transactional write request arrives to a tablet server which
       // hasn't yet served a write request in the context of the specified
       // transaction.
-      return cb_(Status::OK());
+      return cb_(Status::OK(), TabletServerErrorPB::UNKNOWN_ERROR);
     }
-    return cb_(status_);
+    return cb_(status_, code_);
   }
 
  private:
-  StatusCallback cb_;
+  RegisteredTxnCallback cb_;
   unique_ptr<ParticipantRequestPB> req_;
   const int64_t txn_id_;
 };
 
-void TabletReplica::BeginTxnParticipantOp(int64_t txn_id, StatusCallback cb) {
+void TabletReplica::BeginTxnParticipantOp(int64_t txn_id, RegisteredTxnCallback began_txn_cb) {
   auto s = CheckRunning();
   if (PREDICT_FALSE(!s.ok())) {
-    return cb(s);
+    return began_txn_cb(s, TabletServerErrorPB::UNKNOWN_ERROR);
   }
 
   unique_ptr<ParticipantRequestPB> req(new ParticipantRequestPB);
@@ -1144,7 +1144,7 @@ void TabletReplica::BeginTxnParticipantOp(int64_t txn_id, StatusCallback cb) {
   unique_ptr<ParticipantOpState> op_state(
       new ParticipantOpState(this, tablet()->txn_participant(), req.get()));
   op_state->set_completion_callback(unique_ptr<OpCompletionCallback>(
-      new ParticipantBeginTxnCallback(cb, std::move(req))));
+      new ParticipantBeginTxnCallback(began_txn_cb, std::move(req))));
   s = SubmitTxnParticipantOp(std::move(op_state));
   VLOG(3) << Substitute(
       "submitting BEGIN_TXN for participant $0 (txn ID $1): $2",
@@ -1152,7 +1152,7 @@ void TabletReplica::BeginTxnParticipantOp(int64_t txn_id, StatusCallback cb) {
   if (PREDICT_FALSE(!s.ok())) {
     // Now it's time to respond with appropriate error status to the RPCs
     // corresponding to the pending write operations.
-    return cb(s);
+    return began_txn_cb(s, TabletServerErrorPB::UNKNOWN_ERROR);
   }
 }
 
@@ -1174,7 +1174,7 @@ void TabletReplica::MakeUnavailable(const Status& error) {
 
 Status TabletReplica::TxnOpDispatcher::Dispatch(
     std::unique_ptr<WriteOpState> op,
-    const std::function<Status(int64_t txn_id, StatusCallback cb)>& scheduler) {
+    const std::function<Status(int64_t txn_id, RegisteredTxnCallback cb)>& scheduler) {
   const auto txn_id = op->request()->txn_id();
   std::lock_guard<simple_spinlock> guard(lock_);
   if (PREDICT_FALSE(unregistered_)) {
@@ -1204,7 +1204,7 @@ Status TabletReplica::TxnOpDispatcher::Dispatch(
   // tasks. In case of success, the callback is invoked after completion
   // of the preliminary tasks with Status::OK(). In case of any failure down
   // the road, the callback is called with corresponding non-OK status.
-  auto cb = [self = shared_from_this(), txn_id](const Status& s) {
+  auto cb = [self = shared_from_this(), txn_id](const Status& s, TabletServerErrorPB::Code code) {
     if (PREDICT_TRUE(s.ok())) {
       // The all-is-well case: it's time to submit all the write operations
       // accumulated in the queue.
@@ -1218,7 +1218,7 @@ Status TabletReplica::TxnOpDispatcher::Dispatch(
       }
     } else {
       // Something went wrong: cancel all the pending write operations
-      self->Cancel(s);
+      self->Cancel(s, code);
       CHECK_OK(self->replica_->UnregisterTxnOpDispatcher(
           txn_id, false/*abort_pending_ops*/));
     }
@@ -1279,13 +1279,16 @@ Status TabletReplica::TxnOpDispatcher::Submit() {
     std::swap(failed_ops, ops_queue_);
   }
 
-  return RespondWithStatus(failed_status, std::move(failed_ops));
+  return RespondWithStatus(failed_status,
+                           TabletServerErrorPB::UNKNOWN_ERROR,
+                           std::move(failed_ops));
 }
 
-void TabletReplica::TxnOpDispatcher::Cancel(const Status& status) {
+void TabletReplica::TxnOpDispatcher::Cancel(const Status& status,
+                                            TabletServerErrorPB::Code code) {
   CHECK(!status.ok());
-  LOG(WARNING) << Substitute("$0: cancelling pending write operations",
-                             status.ToString());
+  KLOG_EVERY_N_SECS(WARNING, 1) << Substitute("$0: cancelling pending write operations",
+                                              status.ToString());
   decltype(ops_queue_) ops;
   {
     std::lock_guard<simple_spinlock> guard(lock_);
@@ -1293,7 +1296,7 @@ void TabletReplica::TxnOpDispatcher::Cancel(const Status& status) {
     std::swap(ops, ops_queue_);
   }
 
-  RespondWithStatus(status, std::move(ops));
+  RespondWithStatus(status, code, std::move(ops));
 }
 
 Status TabletReplica::TxnOpDispatcher::MarkUnregistered() {
@@ -1329,12 +1332,13 @@ Status TabletReplica::TxnOpDispatcher::EnqueueUnlocked(unique_ptr<WriteOpState>
 
 Status TabletReplica::TxnOpDispatcher::RespondWithStatus(
     const Status& status,
+    TabletServerErrorPB::Code code,
     deque<unique_ptr<WriteOpState>> ops) {
   // Invoke the callback for every operation in the queue.
   for (auto& op : ops) {
     auto* cb = op->completion_callback();
     DCHECK(cb);
-    cb->set_error(status);
+    cb->set_error(status, code);
     cb->OpCompleted();
   }
   return status;
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index 711360d..bfcf7a4 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -42,10 +42,10 @@
 #include "kudu/tablet/ops/write_op.h"
 #include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet_metadata.h"
+#include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/status.h"
-#include "kudu/util/status_callback.h"
 
 namespace kudu {
 class AlterTableTest;
@@ -90,6 +90,11 @@ class TabletStatusPB;
 class TxnCoordinator;
 class TxnCoordinatorFactory;
 
+// Callback to run once the work to register a participant and start a
+// transaction on the participant has completed (whether successful or not).
+typedef std::function<void(const Status& status, tserver::TabletServerErrorPB::Code code)>
+    RegisteredTxnCallback;
+
 // A replica in a tablet consensus configuration, which coordinates writes to tablets.
 // Each time Write() is called this class appends a new entry to a replicated
 // state machine through a consensus algorithm, which makes sure that other
@@ -156,7 +161,7 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
   // mentioned above.
   Status SubmitTxnWrite(
       std::unique_ptr<WriteOpState> op_state,
-      const std::function<Status(int64_t txn_id, StatusCallback cb)>& scheduler);
+      const std::function<Status(int64_t txn_id, RegisteredTxnCallback cb)>& scheduler);
 
   // Unregister TxnWriteOpDispacher for the specified transaction identifier
   // 'txn_id'. If no pending write requests are accumulated by the dispatcher,
@@ -382,7 +387,7 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
   void DecreaseTxnCoordinatorTaskCounter();
 
   // Submit ParticipantOpPB::BEGIN_TXN operation for the specified transaction.
-  void BeginTxnParticipantOp(int64_t txn_id, StatusCallback cb);
+  void BeginTxnParticipantOp(int64_t txn_id, RegisteredTxnCallback began_txn_cb);
 
  private:
   friend class kudu::AlterTableTest;
@@ -425,7 +430,7 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
     // invoked to schedule preliminary tasks, if necessary.
     Status Dispatch(std::unique_ptr<WriteOpState> op,
                     const std::function<Status(int64_t txn_id,
-                                               StatusCallback cb)>& scheduler);
+                                               RegisteredTxnCallback cb)>& scheduler);
 
     // Submit all pending operations. Returns OK if all operations have been
     // submitted successfully, or 'inflight_status_' if any of those failed.
@@ -433,7 +438,7 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
 
     // Invoke callbacks for every buffered operation with the 'status';
     // the 'status' must be a non-OK one.
-    void Cancel(const Status& status);
+    void Cancel(const Status& status, tserver::TabletServerErrorPB::Code code);
 
     // Mark the dispatcher as not accepting any write operations: this is to
     // eventually unregister the dispatcher for the corresponding transaction
@@ -456,6 +461,7 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
     // Respond to the given write operations with the specified status.
     static Status RespondWithStatus(
         const Status& status,
+        tserver::TabletServerErrorPB::Code code,
         std::deque<std::unique_ptr<WriteOpState>> ops);
 
     // Pointer to the parent TabletReplica instance which keeps this
diff --git a/src/kudu/transactions/txn_status_manager.cc b/src/kudu/transactions/txn_status_manager.cc
index ce8e24d..837272c 100644
--- a/src/kudu/transactions/txn_status_manager.cc
+++ b/src/kudu/transactions/txn_status_manager.cc
@@ -231,24 +231,14 @@ void CommitTasks::BeginCommitAsyncTask(int participant_idx) {
       BeginCommitAsyncTask(participant_idx);
       return;
     }
-    if (PREDICT_FALSE(s.IsNotFound())) {
-      // If the participant has been deleted, treat it as though it's already
-      // been committed, rather than attempting to abort or something. This is
-      // important to ensure retries of the commit tasks reliably result in the
-      // same operations being performed.
-      LOG(INFO) << Substitute("Participant $0 was not found for BEGIN_COMMIT, aborting: $1",
-                              participant_ids_[participant_idx], s.ToString());
+    if (PREDICT_FALSE(!s.ok())) {
+      // We might see errors if the participant was deleted, or because the
+      // participant didn't successfully start the transaction. In any case,
+      // abort the transaction.
+      LOG(INFO) << Substitute("Participant $0 of txn $1 returned error for BEGIN_COMMIT op, "
+                              "aborting: $2", participant_ids_[participant_idx],
+                              txn_id_.ToString(), s.ToString());
       SetNeedsBeginAbort();
-    } else if (PREDICT_FALSE(!s.ok())) {
-      // For any other kind of error, just exit without completing.
-      // TODO(awong): we're presuming that such errors wouldn't benefit from
-      // just retrying.
-      // TODO(awong): we don't expect them, but if we ever somehow find
-      // ourselves with an aborted transaction on the participant, we should
-      // probably abort here.
-      LOG(WARNING) << Substitute("Participant $0 BEGIN_COMMIT op returned $1",
-                                 participant_ids_[participant_idx], s.ToString());
-      stop_task_ = true;
     }
 
     // If this was the last participant op for this task, we have some cleanup
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index c2d8c5e..4d1c3db 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -110,7 +110,6 @@
 #include "kudu/util/random_util.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
-#include "kudu/util/status_callback.h"
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/threadpool.h"
 #include "kudu/util/trace.h"
@@ -1631,9 +1630,9 @@ void TabletServiceImpl::Write(const WriteRequestPB* req,
     // This functor is to schedule preliminary tasks prior to submitting
     // the write operation via TabletReplica::SubmitWrite().
     const auto scheduler = [this, &username, replica, deadline](
-        int64_t txn_id, StatusCallback cb) {
+        int64_t txn_id, tablet::RegisteredTxnCallback began_txn_cb) {
       return server_->tablet_manager()->SchedulePreliminaryTasksForTxnWrite(
-          std::move(replica), txn_id, username, deadline, std::move(cb));
+          std::move(replica), txn_id, username, deadline, std::move(began_txn_cb));
     };
     s = replica->SubmitTxnWrite(std::move(op_state), scheduler);
     VLOG(2) << Substitute("submitting txn write op: $0", s.ToString());
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index 989831e..d9b9d0f 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -1125,7 +1125,7 @@ void TSTabletManager::RegisterAndBeginParticipantTxnTask(
     int64_t txn_id,
     const string& user,
     MonoTime deadline,
-    StatusCallback cb) {
+    tablet::RegisteredTxnCallback began_txn_cb) {
   DCHECK(txn_system_client);
   // TODO(aserbin): add and update metrics to track how long these calls take
   // TODO(aserbin): a future improvement to reduce overall latency is to use
@@ -1149,16 +1149,18 @@ void TSTabletManager::RegisterAndBeginParticipantTxnTask(
   {
     const auto now = MonoTime::Now();
     if (deadline <= now) {
-      return cb(Status::TimedOut(
-          Substitute("time out prior registering tablet $0 as participant (txn ID $1)",
-          replica->tablet_id(), txn_id)));
+      return began_txn_cb(
+          Status::TimedOut(
+              Substitute("time out prior registering tablet $0 as participant (txn ID $1)",
+                         replica->tablet_id(), txn_id)),
+          TabletServerErrorPB::UNKNOWN_ERROR);
     }
     auto s = txn_system_client->RegisterParticipant(
         txn_id, replica->tablet_id(), user, deadline - now);
     VLOG(2) << Substitute("RegisterParticipant() $0 for txn ID $1 returned $2",
                           replica->tablet_id(), txn_id, s.ToString());
     if (PREDICT_FALSE(!s.ok())) {
-      return cb(s);
+      return began_txn_cb(s, TabletServerErrorPB::TXN_ILLEGAL_STATE);
     }
   }
 
@@ -1167,11 +1169,12 @@ void TSTabletManager::RegisterAndBeginParticipantTxnTask(
   MAYBE_INJECT_FIXED_LATENCY(FLAGS_txn_participant_begin_op_inject_latency_ms);
 
   if (deadline <= MonoTime::Now()) {
-    return cb(Status::TimedOut(Substitute(
+    return began_txn_cb(Status::TimedOut(Substitute(
         "time out prior submitting BEGIN_TXN for participant $0 (txn ID $1)",
-        replica->tablet_id(), txn_id)));
+        replica->tablet_id(), txn_id)),
+        TabletServerErrorPB::UNKNOWN_ERROR);
   }
-  return replica->BeginTxnParticipantOp(txn_id, std::move(cb));
+  return replica->BeginTxnParticipantOp(txn_id, std::move(began_txn_cb));
 }
 
 Status TSTabletManager::StartTabletStateTransitionUnlocked(
@@ -1871,7 +1874,7 @@ Status TSTabletManager::SchedulePreliminaryTasksForTxnWrite(
     int64_t txn_id,
     const string& user,
     MonoTime deadline,
-    StatusCallback cb) {
+    tablet::RegisteredTxnCallback cb) {
   // An important pre-condition to running operations below: the availability
   // of the transaction system client.
   transactions::TxnSystemClient* tsc = nullptr;
diff --git a/src/kudu/tserver/ts_tablet_manager.h b/src/kudu/tserver/ts_tablet_manager.h
index f5e1f92..1088da7 100644
--- a/src/kudu/tserver/ts_tablet_manager.h
+++ b/src/kudu/tserver/ts_tablet_manager.h
@@ -44,7 +44,6 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/rw_mutex.h"
 #include "kudu/util/status.h"
-#include "kudu/util/status_callback.h"
 
 namespace boost {
 template <class T>
@@ -59,6 +58,7 @@ class Partition;
 class PartitionSchema;
 class Schema;
 class ThreadPool;
+
 namespace transactions {
 class TxnSystemClient;
 }  // namespace transactions
@@ -250,7 +250,7 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
       int64_t txn_id,
       const std::string& user,
       MonoTime deadline,
-      StatusCallback cb);
+      tablet::RegisteredTxnCallback cb);
 
  private:
   FRIEND_TEST(LeadershipChangeReportingTest, TestReportStatsDuringLeadershipChange);
@@ -276,7 +276,7 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
       int64_t txn_id,
       const std::string& user,
       MonoTime deadline,
-      StatusCallback cb);
+      tablet::RegisteredTxnCallback began_txn_cb);
 
   // Returns Status::OK() iff state_ == MANAGER_RUNNING.
   Status CheckRunningUnlocked(TabletServerErrorPB::Code* error_code) const;