Posted to commits@kudu.apache.org by al...@apache.org on 2021/03/02 23:14:33 UTC

[kudu] branch master updated: KUDU-2612 tablet servers automatically register txn participants

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new ceca6ab  KUDU-2612 tablet servers automatically register txn participants
ceca6ab is described below

commit ceca6abc506fc4c1012ced6e8a6564897b3af4ad
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Sat Jan 23 09:37:35 2021 -0800

    KUDU-2612 tablet servers automatically register txn participants
    
    With this patch, tablet servers automatically register tablets as
    transaction participants and issue appropriate BEGIN_TXN operations
    upon receiving write operations targeting tablet replicas which they
    host.
    
    Internally, the newly introduced logic is mostly embedded into the
    TxnOpDispatcher class.  Every TxnOpDispatcher object is allowed to
    accumulate up to a certain number of pending write requests in its queue
    while awaiting the completion of the preliminary tasks mentioned
    above.  Once the TxnOpDispatcher's queue is at capacity, a tablet server
    starts responding with the ErrorStatusPB::ERROR_SERVER_TOO_BUSY error
    code to incoming write requests in the context of the corresponding
    transaction, and Kudu clients automatically retry requests, as expected.
    The capacity of the TxnOpDispatcher's queue is controlled by a newly
    introduced --tablet_max_pending_txn_write_ops flag.  By default, the
    flag is set to 2.  Buffering a few write requests should help to avoid
    needless retries when a client packs many operations into one
    write request: that's exactly the behavior for Kudu client sessions
    using the AUTO_FLUSH_BACKGROUND mode.  If such buffering isn't desired
    for some reason, set --tablet_max_pending_txn_write_ops=0: in that case
    a client will retry the very first operation sent to a tablet server
    in the context of a transaction until the tablet server completes the
    preliminary tasks mentioned above.  The flag has runtime semantics,
    so no restart of a tablet server is required upon modification of the
    flag's setting.
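
The buffering behavior described above can be illustrated with a minimal sketch. This is not Kudu's TxnOpDispatcher: the names PendingOpQueue, PendingWrite, SubmitResult, and MarkReady are hypothetical, and the preliminary tasks (participant registration and BEGIN_TXN) are assumed to be driven elsewhere and signalled through MarkReady(). It only shows the capacity rule: requests are queued up to a limit while the preliminary tasks are pending, and anything beyond that limit is rejected so the caller can retry.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a pending write request.
struct PendingWrite {
  std::string payload;
};

enum class SubmitResult { kApplied, kQueued, kServerTooBusy };

// Illustrative only: a bounded queue that holds writes until the
// preliminary tasks are reported as complete via MarkReady().
class PendingOpQueue {
 public:
  // 'max_pending' plays the role of --tablet_max_pending_txn_write_ops.
  explicit PendingOpQueue(size_t max_pending) : max_pending_(max_pending) {}

  SubmitResult Submit(PendingWrite op) {
    if (ready_) {
      // Preliminary tasks are done: the write goes straight through.
      applied_.push_back(std::move(op));
      return SubmitResult::kApplied;
    }
    if (pending_.size() >= max_pending_) {
      // Queue is at capacity: the caller is expected to retry later.
      return SubmitResult::kServerTooBusy;
    }
    pending_.push_back(std::move(op));
    return SubmitResult::kQueued;
  }

  // Called once the preliminary tasks complete: drain the queue in order.
  void MarkReady() {
    ready_ = true;
    while (!pending_.empty()) {
      applied_.push_back(std::move(pending_.front()));
      pending_.pop_front();
    }
  }

  size_t num_applied() const { return applied_.size(); }
  size_t num_pending() const { return pending_.size(); }

 private:
  const size_t max_pending_;
  bool ready_ = false;
  std::deque<PendingWrite> pending_;
  std::vector<PendingWrite> applied_;
};
```

With a capacity of 0 the sketch rejects every request until MarkReady() is called, which mirrors the retry-until-ready behavior the commit message describes for --tablet_max_pending_txn_write_ops=0.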
    
    Each tablet replica maintains a txn_id --> TxnOpDispatcher map,
    with an entry's lifecycle as below:
      * An entry is added upon receiving a write request in the context
        of a multi-row transaction.
      * An entry is removed upon applying either ParticipantOpPB::ABORT_TXN
        or ParticipantOpPB::FINALIZE_COMMIT operation.
      * If a write request is received after the transaction has been
        committed or aborted, the entry is automatically removed upon
        receiving the corresponding error response from any of the
        following components:
          ** from TxnStatusManager in an attempt to register a participant
             in the context of a committed/aborted transaction
          ** from the replica itself in an attempt to add a
             ParticipantOpPB::BEGIN_COMMIT operation
    In other words, the system automatically gets rid of no-longer-needed
    TxnOpDispatcher entries.
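
The entry lifecycle listed above might be sketched roughly as follows. The types DispatcherRegistry and Dispatcher are hypothetical and far simpler than the real per-replica map; the sketch assumes that every removal trigger named in the list (ABORT_TXN applied, FINALIZE_COMMIT applied, or an error response for an already committed/aborted transaction) funnels into a single Remove() call.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <unordered_map>

// Hypothetical placeholder for the per-transaction dispatcher state.
struct Dispatcher {
  int64_t txn_id;
};

// Illustrative only: a txn_id --> dispatcher map guarded by a mutex.
class DispatcherRegistry {
 public:
  // Invoked when a write request arrives in the context of a transaction:
  // returns the existing entry or creates a new one.
  std::shared_ptr<Dispatcher> GetOrCreate(int64_t txn_id) {
    std::lock_guard<std::mutex> l(lock_);
    auto& slot = map_[txn_id];
    if (!slot) {
      slot = std::make_shared<Dispatcher>(Dispatcher{txn_id});
    }
    return slot;
  }

  // Invoked from any removal trigger. Returns true if an entry was
  // removed, so repeated triggers for the same transaction are harmless.
  bool Remove(int64_t txn_id) {
    std::lock_guard<std::mutex> l(lock_);
    return map_.erase(txn_id) > 0;
  }

  size_t size() const {
    std::lock_guard<std::mutex> l(lock_);
    return map_.size();
  }

 private:
  mutable std::mutex lock_;
  std::unordered_map<int64_t, std::shared_ptr<Dispatcher>> map_;
};
```

Handing out shared_ptr values is a design choice of this sketch: a caller that already holds a dispatcher can finish using it even if the entry is removed from the map concurrently.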
    
    This patch also contains several test scenarios to cover the newly
    introduced functionality.  I also updated other related tests to remove
    the artificial registration of corresponding transaction participants,
    so those tests now rely on the newly introduced functionality.
    
    Change-Id: Ia383f7afd208c44695c57aab82e3818fa1712ce6
    Reviewed-on: http://gerrit.cloudera.org:8080/17037
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/client/batcher.cc                        |    6 +
 src/kudu/client/client-test.cc                    |   32 +-
 src/kudu/integration-tests/CMakeLists.txt         |    1 +
 src/kudu/integration-tests/fuzz-itest.cc          |    4 +
 src/kudu/integration-tests/txn_commit-itest.cc    |   88 +-
 src/kudu/integration-tests/txn_write_ops-itest.cc | 1614 +++++++++++++++++++++
 src/kudu/tablet/ops/participant_op.cc             |   36 +-
 src/kudu/tablet/tablet_replica.cc                 |  324 ++++-
 src/kudu/tablet/tablet_replica.h                  |  158 +-
 src/kudu/tserver/tablet_service.cc                |   51 +-
 src/kudu/tserver/ts_tablet_manager.cc             |  104 ++
 src/kudu/tserver/ts_tablet_manager.h              |   27 +-
 12 files changed, 2318 insertions(+), 127 deletions(-)

diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc
index 6fbb78d..93036b2 100644
--- a/src/kudu/client/batcher.cc
+++ b/src/kudu/client/batcher.cc
@@ -516,6 +516,12 @@ RetriableRpcStatus WriteRpc::AnalyzeResponse(const Status& rpc_cb_status) {
   // require some server-side changes). For example, IllegalState is
   // obviously way too broad an error category for this case.
   if (result.status.IsIllegalState() || result.status.IsAborted()) {
+    // TODO(aserbin): this very broad transformation of Status::IllegalState()
+    //                becomes a real issue when handling responses to write
+    //                operations in the context of multi-row transactions.
+    //                For example, Status::IllegalState() originated from
+    //                TabletServerErrorPB::TXN_ILLEGAL_STATE responses are
+    //                needlessly retried.
     result.result = RetriableRpcStatus::REPLICA_NOT_LEADER;
     return result;
   }
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index b9cde36..9f8392a 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -104,7 +104,6 @@
 #include "kudu/tablet/tablet_metadata.h"
 #include "kudu/tablet/tablet_replica.h"
 #include "kudu/tablet/txn_coordinator.h"
-#include "kudu/tablet/txn_participant-test-util.h"
 #include "kudu/transactions/transactions.pb.h"
 #include "kudu/transactions/txn_status_manager.h"
 #include "kudu/tserver/mini_tablet_server.h"
@@ -113,7 +112,6 @@
 #include "kudu/tserver/tablet_server_options.h"
 #include "kudu/tserver/ts_tablet_manager.h"
 #include "kudu/tserver/tserver.pb.h"
-#include "kudu/tserver/tserver_admin.pb.h"
 #include "kudu/util/array_view.h"
 #include "kudu/util/async_util.h"
 #include "kudu/util/barrier.h"
@@ -210,9 +208,6 @@ using kudu::tablet::TabletReplica;
 using kudu::transactions::TxnStatusManager;
 using kudu::transactions::TxnTokenPB;
 using kudu::tserver::MiniTabletServer;
-using kudu::tserver::ParticipantOpPB;
-using kudu::tserver::ParticipantRequestPB;
-using kudu::tserver::ParticipantResponsePB;
 using std::function;
 using std::map;
 using std::pair;
@@ -391,24 +386,6 @@ class ClientTest : public KuduTest {
     }
   }
 
-  // TODO(awong): automatically begin transactions when trying to write to a
-  //              transaction for the first time.
-  void BeginTxnOnParticipants(int64_t txn_id) {
-    for (auto i = 0; i < cluster_->num_tablet_servers(); ++i) {
-      auto* tm = cluster_->mini_tablet_server(i)->server()->tablet_manager();
-      vector<scoped_refptr<TabletReplica>> replicas;
-      tm->GetTabletReplicas(&replicas);
-      for (auto& r : replicas) {
-        // Skip partitions of the transaction status manager.
-        if (r->txn_coordinator()) continue;
-        ParticipantResponsePB resp;
-        WARN_NOT_OK(CallParticipantOp(
-            r.get(), txn_id, ParticipantOpPB::BEGIN_TXN, -1, &resp),
-            "failed to start transaction on participant");;
-      }
-    }
-  }
-
   // TODO(aserbin): consider removing this method and update the scenarios it
   //                was used once the transaction orchestration is implemented
   Status FinalizeCommitTransaction(int64_t txn_id) {
@@ -464,7 +441,7 @@ class ClientTest : public KuduTest {
   // Inserts given number of tests rows into the specified table
   // in the context of the session.
   static void InsertTestRows(KuduTable* table, KuduSession* session,
-                      int num_rows, int first_row = 0) {
+                             int num_rows, int first_row = 0) {
     for (int i = first_row; i < num_rows + first_row; ++i) {
       unique_ptr<KuduInsert> insert(BuildTestInsert(table, i));
       ASSERT_OK(session->Apply(insert.release()));
@@ -7153,9 +7130,6 @@ TEST_F(ClientTest, TxnBasicOperations) {
     ASSERT_STR_CONTAINS(s.ToString(), "is not open: state: ABORT");
   }
 
-  // TODO(aserbin): uncomment this when other parts of transaction lifecycle
-  //                are properly implemented
-#if 0
   // Insert rows in a transactional session, then rollback the transaction
   // and make sure the rows are gone.
   {
@@ -7182,7 +7156,6 @@ TEST_F(ClientTest, TxnBasicOperations) {
         client_table_.get(), KuduScanner::READ_YOUR_WRITES));
     ASSERT_EQ(0, session->CountPendingErrors());
   }
-#endif
 }
 
 // Verify the basic functionality of the KuduTransaction::Commit() and
@@ -7354,9 +7327,6 @@ TEST_F(ClientTest, TxnToken) {
   ASSERT_OK(serdes_txn->Serialize(&serdes_txn_token));
   ASSERT_EQ(txn_token, serdes_txn_token);
 
-  // TODO(awong): remove once we register participants automatically before
-  // inserting.
-  BeginTxnOnParticipants(txn_id);
   {
     static constexpr auto kNumRows = 10;
     shared_ptr<KuduSession> session;
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index c4c8902..3b4c184 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -136,6 +136,7 @@ ADD_KUDU_TEST(txn_commit-itest)
 ADD_KUDU_TEST(txn_participant-itest)
 ADD_KUDU_TEST(txn_status_table-itest)
 ADD_KUDU_TEST(txn_status_manager-itest)
+ADD_KUDU_TEST(txn_write_ops-itest)
 ADD_KUDU_TEST(location_assignment-itest
   DATA_FILES ../scripts/assign-location.py)
 ADD_KUDU_TEST(ts_authz-itest NUM_SHARDS 2)
diff --git a/src/kudu/integration-tests/fuzz-itest.cc b/src/kudu/integration-tests/fuzz-itest.cc
index 499454f..86646a5 100644
--- a/src/kudu/integration-tests/fuzz-itest.cc
+++ b/src/kudu/integration-tests/fuzz-itest.cc
@@ -80,6 +80,7 @@ DEFINE_int32(keyspace_size, 5,  "number of distinct primary keys to test with");
 DEFINE_int32(max_open_txns, 5,  "maximum number of open transactions to test with");
 DECLARE_bool(enable_maintenance_manager);
 DECLARE_bool(scanner_allow_snapshot_scans_with_logical_timestamps);
+DECLARE_bool(tserver_txn_write_op_handling_enabled);
 DECLARE_bool(use_hybrid_clock);
 
 using boost::optional;
@@ -314,6 +315,9 @@ class FuzzTest : public KuduTest {
     FLAGS_enable_maintenance_manager = false;
     FLAGS_use_hybrid_clock = false;
     FLAGS_scanner_allow_snapshot_scans_with_logical_timestamps = true;
+    // The scenarios of this test do not assume using the standard control path
+    // for txn-enabled write operations.
+    FLAGS_tserver_txn_write_op_handling_enabled = false;
   }
 
   void CreateTabletAndStartClusterWithSchema(const Schema& schema) {
diff --git a/src/kudu/integration-tests/txn_commit-itest.cc b/src/kudu/integration-tests/txn_commit-itest.cc
index 17f2078..4f2ea2b 100644
--- a/src/kudu/integration-tests/txn_commit-itest.cc
+++ b/src/kudu/integration-tests/txn_commit-itest.cc
@@ -50,7 +50,6 @@
 #include "kudu/tablet/tablet_metadata.h"
 #include "kudu/tablet/tablet_replica.h"
 #include "kudu/tablet/txn_coordinator.h"
-#include "kudu/tablet/txn_participant-test-util.h"
 #include "kudu/tablet/txn_participant.h"
 #include "kudu/transactions/transactions.pb.h"
 #include "kudu/transactions/txn_system_client.h"
@@ -160,8 +159,10 @@ class TxnCommitITest : public KuduTest {
     table_name_ = w.table_name();
     initial_row_count_ = w.rows_inserted();
 
-    // TODO(awong): until we start registering participants automatically, we
-    // need to manually register them, so keep track of what tablets exist.
+    // Since the test table uses the hash partitioning scheme, every tablet gets
+    // at least one write operation when inserting several rows into the test
+    // table. So, for every transaction inserting several rows into the test
+    // table, it's easy to build the list of transaction participants.
     unordered_set<string> tablet_ids;
     for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
       auto* ts = cluster_->mini_tablet_server(i);
@@ -176,36 +177,14 @@ class TxnCommitITest : public KuduTest {
     }
   }
 
-  // TODO(awong): register participants automatically as a part of writing to
-  // tablets for the first time.
-  Status RegisterParticipants(const TxnId& txn_id, vector<string> tablet_ids) {
-    for (const auto& prt_id : tablet_ids) {
-      RETURN_NOT_OK(txn_client_->RegisterParticipant(txn_id.value(), prt_id, client_user_));
-      RETURN_NOT_OK(txn_client_->ParticipateInTransaction(
-          prt_id,
-          tablet::MakeParticipantOp(txn_id.value(), tserver::ParticipantOpPB::BEGIN_TXN),
-          kTimeout));
-    }
-    return Status::OK();
-  }
-
   // Start a transaction, manually registering the given participants, and
   // returning the associated transaction and session handles.
-  Status BeginTransaction(const vector<string>& participant_ids,
-                          shared_ptr<KuduTransaction>* txn,
+  Status BeginTransaction(shared_ptr<KuduTransaction>* txn,
                           shared_ptr<KuduSession>* session) {
     shared_ptr<KuduTransaction> txn_local;
     RETURN_NOT_OK(client_->NewTransaction(&txn_local));
     shared_ptr<KuduSession> txn_session_local;
     RETURN_NOT_OK(txn_local->CreateSession(&txn_session_local));
-
-    string txn_token;
-    RETURN_NOT_OK(txn_local->Serialize(&txn_token));
-    TxnTokenPB token;
-    CHECK(token.ParseFromString(txn_token));
-    CHECK(token.has_txn_id());
-
-    RETURN_NOT_OK(RegisterParticipants(token.txn_id(), participant_ids_));
     *txn = std::move(txn_local);
     *session = std::move(txn_session_local);
     return Status::OK();
@@ -280,17 +259,15 @@ class TxnCommitITest : public KuduTest {
   string table_name_;
   int initial_row_count_;
 
-  // TODO(awong): Only needed until we start registering participants
-  // automatically.
+  // Needed for checking on internals of txn participants.
   string tsm_id_;
   vector<string> participant_ids_;
-  int cur_txn_id_ = 0;
 };
 
 TEST_F(TxnCommitITest, TestBasicCommits) {
   shared_ptr<KuduTransaction> txn;
   shared_ptr<KuduSession> txn_session;
-  ASSERT_OK(BeginTransaction(participant_ids_, &txn, &txn_session));
+  ASSERT_OK(BeginTransaction(&txn, &txn_session));
   ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
 
   // Even though we've inserted, we shouldn't be able to see any new rows until
@@ -315,7 +292,7 @@ TEST_F(TxnCommitITest, TestBasicCommits) {
 TEST_F(TxnCommitITest, TestBasicAborts) {
   shared_ptr<KuduTransaction> txn;
   shared_ptr<KuduSession> txn_session;
-  ASSERT_OK(BeginTransaction(participant_ids_, &txn, &txn_session));
+  ASSERT_OK(BeginTransaction(&txn, &txn_session));
   ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
 
   int num_rows = 0;
@@ -351,7 +328,7 @@ TEST_F(TxnCommitITest, TestAbortInProgress) {
   FLAGS_txn_schedule_background_tasks = false;
   shared_ptr<KuduTransaction> txn;
   shared_ptr<KuduSession> txn_session;
-  ASSERT_OK(BeginTransaction({}, &txn, &txn_session));
+  ASSERT_OK(BeginTransaction(&txn, &txn_session));
   ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
   ASSERT_OK(txn->Rollback());
 
@@ -384,7 +361,7 @@ TEST_F(TxnCommitITest, TestBackgroundAborts) {
   {
     shared_ptr<KuduTransaction> txn;
     shared_ptr<KuduSession> txn_session;
-    ASSERT_OK(BeginTransaction(participant_ids_, &txn, &txn_session));
+    ASSERT_OK(BeginTransaction(&txn, &txn_session));
     ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
 
     int num_rows = 0;
@@ -420,7 +397,7 @@ TEST_F(TxnCommitITest, TestCommitWhileDeletingTxnStatusManager) {
   SKIP_IF_SLOW_NOT_ALLOWED();
   shared_ptr<KuduTransaction> txn;
   shared_ptr<KuduSession> txn_session;
-  ASSERT_OK(BeginTransaction(participant_ids_, &txn, &txn_session));
+  ASSERT_OK(BeginTransaction(&txn, &txn_session));
   ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
 
   ASSERT_OK(txn->Commit(/*wait*/false));
@@ -436,7 +413,7 @@ TEST_F(TxnCommitITest, TestCommitWhileDeletingTxnStatusManager) {
 TEST_F(TxnCommitITest, TestCommitAfterDeletingParticipant) {
   shared_ptr<KuduTransaction> txn;
   shared_ptr<KuduSession> txn_session;
-  ASSERT_OK(BeginTransaction(participant_ids_, &txn, &txn_session));
+  ASSERT_OK(BeginTransaction(&txn, &txn_session));
   ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
   ASSERT_OK(client_->DeleteTable(table_name_));
   ASSERT_OK(txn->Commit(/*wait*/false));
@@ -455,7 +432,7 @@ TEST_F(TxnCommitITest, TestCommitAfterDeletingParticipant) {
 TEST_F(TxnCommitITest, TestCommitAfterDroppingRangeParticipant) {
   shared_ptr<KuduTransaction> txn;
   shared_ptr<KuduSession> txn_session;
-  ASSERT_OK(BeginTransaction(participant_ids_, &txn, &txn_session));
+  ASSERT_OK(BeginTransaction(&txn, &txn_session));
   ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
   ASSERT_OK(client_->DeleteTable(table_name_));
   const auto& schema = client::KuduSchema::FromSchema(GetSimpleTestSchema());
@@ -479,7 +456,7 @@ TEST_F(TxnCommitITest, TestCommitAfterDroppingRangeParticipant) {
 TEST_F(TxnCommitITest, TestRestartingWhileCommitting) {
   shared_ptr<KuduTransaction> txn;
   shared_ptr<KuduSession> txn_session;
-  ASSERT_OK(BeginTransaction(participant_ids_, &txn, &txn_session));
+  ASSERT_OK(BeginTransaction(&txn, &txn_session));
   FLAGS_txn_status_manager_inject_latency_finalize_commit_ms = 2000;
   ASSERT_OK(txn->Commit(/*wait*/false));
   // Stop the tserver without allowing the finalize commit to complete.
@@ -526,18 +503,10 @@ TEST_F(TxnCommitITest, TestRestartingWhileCommittingAndDeleting) {
     SleepFor(MonoDelta::FromMilliseconds(50));
   }
   w.StopAndJoin();
-  unordered_set<string> participant_ids;
-  auto* mts = cluster_->mini_tablet_server(0);
-  for (const auto& tablet_id : mts->ListTablets()) {
-    if (tablet_id != tsm_id_) {
-      participant_ids.emplace(tablet_id);
-    }
-  }
-  vector<string> both_tables_participant_ids(participant_ids.begin(), participant_ids.end());
 
   shared_ptr<KuduTransaction> txn;
   shared_ptr<KuduSession> txn_session;
-  ASSERT_OK(BeginTransaction(both_tables_participant_ids, &txn, &txn_session));
+  ASSERT_OK(BeginTransaction(&txn, &txn_session));
   ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
   FLAGS_txn_status_manager_inject_latency_finalize_commit_ms = 2000;
   ASSERT_OK(txn->Commit(/*wait*/false));
@@ -546,6 +515,7 @@ TEST_F(TxnCommitITest, TestRestartingWhileCommittingAndDeleting) {
   SleepFor(MonoDelta::FromMilliseconds(1000));
 
   // Shut down without giving time for the commit to complete.
+  auto* mts = cluster_->mini_tablet_server(0);
   mts->Shutdown();
   ASSERT_OK(mts->Restart());
   ASSERT_OK(mts->server()->tablet_manager()->WaitForAllBootstrapsToFinish());
@@ -587,20 +557,20 @@ TEST_F(TxnCommitITest, TestRestartingWhileCommittingAndDeleting) {
 TEST_F(TxnCommitITest, TestLoadTxnStatusManagerWhenNoMasters) {
   shared_ptr<KuduTransaction> txn;
   shared_ptr<KuduSession> txn_session;
-  ASSERT_OK(BeginTransaction(participant_ids_, &txn, &txn_session));
+  ASSERT_OK(BeginTransaction(&txn, &txn_session));
 
   cluster_->mini_master()->Shutdown();
   cluster_->mini_tablet_server(0)->Shutdown();
   ASSERT_OK(cluster_->mini_tablet_server(0)->Restart());
 
   // While the master is down, we can't contact the TxnManager.
-  Status s = BeginTransaction(participant_ids_, &txn, &txn_session);
+  Status s = BeginTransaction(&txn, &txn_session);
   ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
 
   // Once restarted, it should be business as usual.
   ASSERT_OK(cluster_->mini_master()->Restart());
   ASSERT_EVENTUALLY([&] {
-    ASSERT_OK(BeginTransaction(participant_ids_, &txn, &txn_session));
+    ASSERT_OK(BeginTransaction(&txn, &txn_session));
   });
   scoped_refptr<tablet::TabletReplica> tsm_replica;
   auto* tablet_manager = cluster_->mini_tablet_server(0)->server()->tablet_manager();
@@ -618,7 +588,7 @@ TEST_F(TxnCommitITest, TestCommitAfterParticipantAbort) {
   SKIP_IF_SLOW_NOT_ALLOWED();
   shared_ptr<KuduTransaction> txn;
   shared_ptr<KuduSession> txn_session;
-  ASSERT_OK(BeginTransaction(participant_ids_, &txn, &txn_session));
+  ASSERT_OK(BeginTransaction(&txn, &txn_session));
   ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
 
   // Send an ABORT_TXN op to the participant.
@@ -645,7 +615,7 @@ TEST_F(TxnCommitITest, TestConcurrentCommitCalls) {
   int row_start = initial_row_count_;
   for (int i = 0; i < kNumTxns; i++) {
     shared_ptr<KuduSession> txn_session;
-    ASSERT_OK(BeginTransaction(participant_ids_, &txns[i], &txn_session));
+    ASSERT_OK(BeginTransaction(&txns[i], &txn_session));
     ASSERT_OK(InsertToSession(txn_session, row_start, kNumRowsPerTxn));
     row_start += kNumRowsPerTxn;
   }
@@ -684,7 +654,7 @@ TEST_F(TxnCommitITest, TestConcurrentCommitCalls) {
 TEST_F(TxnCommitITest, TestConcurrentRepeatedCommitCalls) {
   shared_ptr<KuduTransaction> txn;
   shared_ptr<KuduSession> txn_session;
-  ASSERT_OK(BeginTransaction(participant_ids_, &txn, &txn_session));
+  ASSERT_OK(BeginTransaction(&txn, &txn_session));
   ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
   int num_rows = 0;
   ASSERT_OK(CountRows(&num_rows));
@@ -721,7 +691,7 @@ TEST_F(TxnCommitITest, TestDontAbortIfCommitInProgress) {
   {
     shared_ptr<KuduTransaction> txn;
     shared_ptr<KuduSession> txn_session;
-    ASSERT_OK(BeginTransaction(participant_ids_, &txn, &txn_session));
+    ASSERT_OK(BeginTransaction(&txn, &txn_session));
     ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
     ASSERT_OK(txn->Commit(/*wait*/false));
     ASSERT_OK(txn->Serialize(&serialized_txn));
@@ -788,7 +758,7 @@ TEST_F(TwoNodeTxnCommitITest, TestCommitWhenParticipantsAreDown) {
   SKIP_IF_SLOW_NOT_ALLOWED();
   shared_ptr<KuduTransaction> txn;
   shared_ptr<KuduSession> txn_session;
-  ASSERT_OK(BeginTransaction(participant_ids_, &txn, &txn_session));
+  ASSERT_OK(BeginTransaction(&txn, &txn_session));
   ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
   prt_ts_->Shutdown();
   ASSERT_OK(txn->Commit(/*wait*/false));
@@ -822,13 +792,13 @@ TEST_F(TwoNodeTxnCommitITest, TestStartTasksDuringStartup) {
   shared_ptr<KuduTransaction> committed_txn;
   {
     shared_ptr<KuduSession> txn_session;
-    ASSERT_OK(BeginTransaction(participant_ids_, &committed_txn, &txn_session));
+    ASSERT_OK(BeginTransaction(&committed_txn, &txn_session));
     ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
   }
   shared_ptr<KuduTransaction> aborted_txn;
   {
     shared_ptr<KuduSession> txn_session;
-    ASSERT_OK(BeginTransaction(participant_ids_, &aborted_txn, &txn_session));
+    ASSERT_OK(BeginTransaction(&aborted_txn, &txn_session));
     ASSERT_OK(InsertToSession(txn_session, initial_row_count_ + kNumRowsPerTxn, kNumRowsPerTxn));
   }
 
@@ -879,7 +849,7 @@ TEST_F(TwoNodeTxnCommitITest, TestStartTasksDuringStartup) {
 TEST_F(TwoNodeTxnCommitITest, TestCommitWhileShuttingDownTxnStatusManager) {
   shared_ptr<KuduTransaction> txn;
   shared_ptr<KuduSession> txn_session;
-  ASSERT_OK(BeginTransaction({}, &txn, &txn_session));
+  ASSERT_OK(BeginTransaction(&txn, &txn_session));
 
   ASSERT_OK(txn->Commit(/*wait*/false));
   tsm_ts_->Shutdown();
@@ -922,12 +892,12 @@ TEST_F(ThreeNodeTxnCommitITest, TestCommitTasksReloadOnLeadershipChange) {
   shared_ptr<KuduTransaction> aborted_txn;
   {
     shared_ptr<KuduSession> txn_session;
-    ASSERT_OK(BeginTransaction(participant_ids_, &committed_txn, &txn_session));
+    ASSERT_OK(BeginTransaction(&committed_txn, &txn_session));
     ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
   }
   {
     shared_ptr<KuduSession> txn_session;
-    ASSERT_OK(BeginTransaction(participant_ids_, &aborted_txn, &txn_session));
+    ASSERT_OK(BeginTransaction(&aborted_txn, &txn_session));
     ASSERT_OK(InsertToSession(txn_session, initial_row_count_ + kNumRowsPerTxn, kNumRowsPerTxn));
   }
   ASSERT_OK(committed_txn->Commit(/*wait*/ false));
diff --git a/src/kudu/integration-tests/txn_write_ops-itest.cc b/src/kudu/integration-tests/txn_write_ops-itest.cc
new file mode 100644
index 0000000..0dcf553
--- /dev/null
+++ b/src/kudu/integration-tests/txn_write_ops-itest.cc
@@ -0,0 +1,1614 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <cstdlib>
+#include <deque>
+#include <functional>
+#include <iterator>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <ostream>
+#include <set>
+#include <string>
+#include <thread>
+#include <tuple>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/client/client.h"
+#include "kudu/client/client.pb.h"
+#include "kudu/client/scan_batch.h"
+#include "kudu/client/schema.h"
+#include "kudu/client/write_op.h"
+#include "kudu/common/partial_row.h"
+#include "kudu/common/row_operations.h"
+#include "kudu/common/schema.h"
+#include "kudu/common/wire_protocol.pb.h"
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/consensus.proxy.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/external_mini_cluster-itest-base.h"
+#include "kudu/mini-cluster/external_mini_cluster.h"
+#include "kudu/mini-cluster/internal_mini_cluster.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/tablet/metadata.pb.h"
+#include "kudu/tablet/tablet.h"
+#include "kudu/tablet/tablet_metadata.h"
+#include "kudu/tablet/tablet_replica.h"
+#include "kudu/transactions/transactions.pb.h"
+#include "kudu/tserver/mini_tablet_server.h"
+#include "kudu/tserver/tablet_server.h"
+#include "kudu/tserver/ts_tablet_manager.h"
+#include "kudu/tserver/tserver.pb.h"
+#include "kudu/tserver/tserver_service.proxy.h"
+#include "kudu/util/barrier.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using kudu::KuduPartialRow;
+using kudu::client::KuduClient;
+using kudu::client::KuduClientBuilder;
+using kudu::client::KuduColumnSchema;
+using kudu::client::KuduError;
+using kudu::client::KuduInsert;
+using kudu::client::KuduScanBatch;
+using kudu::client::KuduScanner;
+using kudu::client::KuduSchema;
+using kudu::client::KuduSchemaBuilder;
+using kudu::client::KuduSession;
+using kudu::client::KuduTable;
+using kudu::client::KuduTableCreator;
+using kudu::client::KuduTransaction;
+using kudu::client::KuduUpdate;
+using kudu::client::sp::shared_ptr;
+using kudu::cluster::InternalMiniCluster;
+using kudu::cluster::InternalMiniClusterOptions;
+using kudu::cluster::TabletIdAndTableName;
+using kudu::rpc::ErrorStatusPB;
+using kudu::rpc::RpcController;
+using kudu::tablet::TabletReplica;
+using kudu::transactions::TxnTokenPB;
+using kudu::tserver::WriteRequestPB;
+using kudu::tserver::WriteResponsePB;
+using std::atomic;
+using std::deque;
+using std::map;
+using std::set;
+using std::string;
+using std::thread;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+DECLARE_bool(tserver_txn_write_op_handling_enabled);
+DECLARE_bool(txn_manager_enabled);
+DECLARE_bool(txn_manager_lazily_initialized);
+DECLARE_int32(txn_participant_begin_op_inject_latency_ms);
+DECLARE_int32(txn_participant_registration_inject_latency_ms);
+DECLARE_uint32(tablet_max_pending_txn_write_ops);
+DECLARE_uint32(txn_manager_status_table_num_replicas);
+
+namespace kudu {
+
+namespace {
+
+Status BuildSchema(KuduSchema* schema) {
+  KuduSchemaBuilder b;
+  b.AddColumn("key")->Type(KuduColumnSchema::INT64)->NotNull()->PrimaryKey();
+  b.AddColumn("int_val")->Type(KuduColumnSchema::INT32);
+  return b.Build(schema);
+}
+
+unique_ptr<KuduInsert> BuildInsert(KuduTable* table, int64_t key) {
+  unique_ptr<KuduInsert> op(table->NewInsert());
+  KuduPartialRow* row = op->mutable_row();
+  CHECK_OK(row->SetInt64(0, key));
+  return op;
+}
+
+int64_t GetTxnId(const shared_ptr<KuduTransaction>& txn) {
+  string txn_token;
+  CHECK_OK(txn->Serialize(&txn_token));
+  TxnTokenPB token;
+  CHECK(token.ParseFromString(txn_token));
+  CHECK(token.has_txn_id());
+  return token.txn_id();
+}
+
+Status CountRows(KuduTable* table, size_t* num_rows) {
+  KuduScanner scanner(table);
+  RETURN_NOT_OK(scanner.SetSelection(KuduClient::LEADER_ONLY));
+  RETURN_NOT_OK(scanner.Open());
+  size_t count = 0;
+  while (scanner.HasMoreRows()) {
+    KuduScanBatch batch;
+    RETURN_NOT_OK(scanner.NextBatch(&batch));
+    count += batch.NumRows();
+  }
+  *num_rows = count;
+  return Status::OK();
+}
+
+Status GetSingleRowError(KuduSession* session) {
+  vector<KuduError*> errors;
+  ElementDeleter drop(&errors);
+  bool overflowed;
+  session->GetPendingErrors(&errors, &overflowed);
+  CHECK(!overflowed);
+  CHECK_EQ(1, errors.size());
+  return errors.front()->status();
+}
+
+void InsertRows(KuduTable* table, KuduSession* session,
+                int64_t count, int64_t start_key = 0) {
+  for (int64_t key = start_key; key < start_key + count; ++key) {
+    unique_ptr<KuduInsert> insert(BuildInsert(table, key));
+    ASSERT_OK(session->Apply(insert.release()));
+  }
+}
+
+} // anonymous namespace
+
+class TxnWriteOpsITest : public ExternalMiniClusterITestBase {
+ protected:
+  TxnWriteOpsITest()
+#if defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER)
+      : hb_interval_ms_(64),
+        run_time_seconds_(3)
+#else
+      : hb_interval_ms_(16),
+        run_time_seconds_(AllowSlowTests() ? 60 : 3)
+#endif
+  {
+    CHECK_OK(BuildSchema(&schema_));
+  }
+
+  Status CreateTable() {
+    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+    RETURN_NOT_OK(table_creator->table_name(kTableName)
+                  .schema(&schema_)
+                  .add_hash_partitions({ "key" }, kNumPartitions)
+                  .num_replicas(kNumTabletServers)
+                  .Create());
+    return client_->OpenTable(kTableName, &table_);
+  }
+
+  // Create a test table and wait all replicas of its tablets running.
+  // Output the UUIDs of tablets into the 'tablet_uuids', if it's non-null.
+  void Prepare(vector<string>* tablet_uuids = nullptr) {
+    ASSERT_OK(CreateTable());
+    // In this test, replication factor is set to kNumTabletServers.
+    const size_t total_replicas_num = kNumPartitions * kNumTabletServers;
+    NO_FATALS(WaitForAllTabletsRunning(total_replicas_num, tablet_uuids));
+  }
+
+  // Assuming there is only one table in the system, this method waits for
+  // all replicas of that test table to be up and running.
+  void WaitForAllTabletsRunning(size_t expected_total_replicas_num,
+                                vector<string>* tablet_uuids = nullptr) {
+    set<string> uuids;
+    ASSERT_EVENTUALLY([&] {
+      uuids.clear();
+      size_t total_tablet_replicas_num = 0;
+      for (auto i = 0; i < cluster_->num_tablet_servers(); ++i) {
+        vector<TabletIdAndTableName> tablets_info;
+        ASSERT_OK(cluster_->WaitForTabletsRunning(
+            cluster_->tablet_server(i), 0, kTimeout, &tablets_info));
+        for (auto& info : tablets_info) {
+          if (info.table_name != kTableName) {
+            // There might be txn status tablets as well: skip those.
+            continue;
+          }
+          ++total_tablet_replicas_num;
+          EmplaceIfNotPresent(&uuids, info.tablet_id);
+        }
+      }
+      ASSERT_EQ(expected_total_replicas_num, total_tablet_replicas_num);
+      ASSERT_EQ(kNumPartitions, uuids.size());
+    });
+    if (tablet_uuids) {
+      tablet_uuids->reserve(uuids.size());
+      std::move(uuids.begin(), uuids.end(), std::back_inserter(*tablet_uuids));
+    }
+  }
+
+ protected:
+  static constexpr auto kNumRowsPerTxn = 8;
+  static constexpr auto kNumTabletServers = 3;
+  static constexpr auto kNumPartitions = 2;
+  static constexpr const char* const kTableName = "txn_write_ops_test";
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+
+  const int hb_interval_ms_;
+  const int64_t run_time_seconds_;
+  KuduSchema schema_;
+  shared_ptr<KuduTable> table_;
+  string tablet_uuid_;
+};
+
+// Send multiple one-row write operations to a tablet server in the context of a
+// multi-row transaction, and commit the transaction. This scenario verifies
+// that tablet servers are able to accept a high number of write requests
+// from a client while automatically registering corresponding tablets as
+// transaction participants.
+TEST_F(TxnWriteOpsITest, TxnMultipleSingleRowWritesCommit) {
+  static constexpr int kRowsNum = 1000;
+  const vector<string> master_flags = {
+    // Enable TxnManager in Kudu masters.
+    // TODO(aserbin): remove this customization once the flag is 'on' by default
+    "--txn_manager_enabled=true",
+
+    // Scenarios based on this test fixture assume the txn status table
+    // is created at start, not on first transaction-related operation.
+    "--txn_manager_lazily_initialized=false",
+  };
+  NO_FATALS(StartCluster({}, master_flags, kNumTabletServers));
+
+  vector<string> tablets_uuids;
+  NO_FATALS(Prepare(&tablets_uuids));
+  shared_ptr<KuduTransaction> txn;
+  ASSERT_OK(client_->NewTransaction(&txn));
+  shared_ptr<KuduSession> session;
+  ASSERT_OK(txn->CreateSession(&session));
+  ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+  NO_FATALS(InsertRows(table_.get(), session.get(), kRowsNum));
+  ASSERT_OK(txn->Commit());
+  size_t count;
+  ASSERT_OK(CountRows(table_.get(), &count));
+  ASSERT_EQ(kRowsNum, count);
+  ASSERT_EQ(0, session->CountPendingErrors());
+}
+
+// This scenario induces a high rate of leader elections while starting many
+// multi-row transactions, writing a few rows per transaction. The essence of
+// this scenario is to make sure that tablet servers are able to automatically
+// register corresponding tablets as transaction participants even if a
+// leadership transfer happens while a tablet is trying to push the BEGIN_TXN
+// operation as a part of preparing to apply an incoming write request from a
+// client.
+TEST_F(TxnWriteOpsITest, FrequentElections) {
+  static constexpr auto kNumThreads = 8;
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  const vector<string> ts_flags = {
+    // Disable pre-elections so that manual election requests take effect.
+    "--raft_enable_pre_election=false",
+
+    // A custom heartbeat interval helps to complete Raft election rounds
+    // faster than with the default settings.
+    Substitute("--heartbeat_interval_ms=$0", hb_interval_ms_),
+  };
+  const vector<string> master_flags = {
+    // Enable TxnManager in Kudu masters.
+    // TODO(aserbin): remove this customization once the flag is 'on' by default
+    "--txn_manager_enabled=true",
+
+    // Scenarios based on this test fixture assume the txn status table
+    // is created at start, not on first transaction-related operation.
+    "--txn_manager_lazily_initialized=false",
+  };
+  NO_FATALS(StartCluster(ts_flags, master_flags, kNumTabletServers));
+
+  vector<string> tablets_uuids;
+  NO_FATALS(Prepare(&tablets_uuids));
+
+  // Using deque instead of vector to avoid too many reallocations.
+  deque<shared_ptr<KuduTransaction>> transactions;
+  simple_spinlock transactions_lock;
+
+  atomic<bool> done = false;
+  atomic<size_t> row_count = 0;
+  vector<thread> writers;
+  writers.reserve(kNumThreads);
+  for (auto thread_idx = 0; thread_idx < kNumThreads; ++thread_idx) {
+    writers.emplace_back([&, thread_idx] {
+      for (int64_t iter = 0; !done; ++iter) {
+        shared_ptr<KuduTransaction> txn;
+        CHECK_OK(client_->NewTransaction(&txn));
+        shared_ptr<KuduSession> session;
+        CHECK_OK(txn->CreateSession(&session));
+        CHECK_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+        int64_t start_key = kNumRowsPerTxn * (kNumThreads * iter + thread_idx);
+        for (auto i = 0; i < kNumRowsPerTxn; ++i) {
+          unique_ptr<KuduInsert> op(BuildInsert(table_.get(), start_key + i));
+          CHECK_OK(session->Apply(op.release()));
+        }
+        if (iter % 8 == 0) {
+          CHECK_OK(txn->Commit(false/*wait*/));
+          row_count += kNumRowsPerTxn;
+        } else {
+          CHECK_OK(txn->Rollback());
+        }
+        {
+          std::lock_guard<simple_spinlock> guard(transactions_lock);
+          transactions.emplace_back(std::move(txn));
+        }
+      }
+    });
+  }
+  auto cleanup = MakeScopedCleanup([&]() {
+    done = true;
+    std::for_each(writers.begin(), writers.end(), [](thread& t) { t.join(); });
+  });
+
+  // The main thread induces elections by requesting replicas to run them.
+  const auto run_until =
+      MonoTime::Now() + MonoDelta::FromSeconds(run_time_seconds_);
+  double max_sleep_ms = 1;
+  while (!done) {
+    for (auto i = 0; i < cluster_->num_tablet_servers(); ++i) {
+      LOG(INFO) << "attempting to promote replicas at tserver " << i;
+      consensus::ConsensusServiceProxy proxy(
+          cluster_->messenger(),
+          cluster_->tablet_server(i)->bound_rpc_addr(),
+          "tserver");
+      for (const auto& uuid : tablets_uuids) {
+        consensus::RunLeaderElectionRequestPB req;
+        consensus::RunLeaderElectionResponsePB resp;
+        RpcController rpc;
+        req.set_tablet_id(uuid);
+        req.set_dest_uuid(cluster_->tablet_server(i)->uuid());
+        rpc.set_timeout(MonoDelta::FromSeconds(5));
+        // A best effort call: the replica might already be a leader or electing
+        // a new leader might fail, etc.
+        proxy.RunLeaderElection(req, &resp, &rpc);
+      }
+      int sleep_time = rand() % static_cast<int>(max_sleep_ms);
+      if (MonoTime::Now() > run_until) {
+        done = true;
+        break;
+      }
+      SleepFor(MonoDelta::FromMilliseconds(sleep_time));
+      max_sleep_ms = std::min(max_sleep_ms * 1.1, 1000.0);
+    }
+  }
+  std::for_each(writers.begin(), writers.end(), [](thread& t) { t.join(); });
+  cleanup.cancel();
+
+  NO_FATALS(cluster_->AssertNoCrashes());
+  for (auto& txn : transactions) {
+    ASSERT_EVENTUALLY([&txn] {
+      bool is_complete = false;
+      Status completion_status;
+      ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+      ASSERT_TRUE(is_complete);
+    });
+  }
+
+  // Check for the number of inserted rows: all write operations successfully
+  // passed through many TxnOpDispatcher instances should be persisted.
+  size_t count;
+  ASSERT_OK(CountRows(table_.get(), &count));
+  ASSERT_EQ(row_count, count);
+}
+
+// Send a write operation to a tablet server in the context of a non-existent
+// transaction. The server should respond with an appropriate error status.
+TEST_F(TxnWriteOpsITest, WriteOpForNonExistentTxn) {
+  const vector<string> master_flags = {
+    // Enable TxnManager in Kudu masters.
+    // TODO(aserbin): remove this customization once the flag is 'on' by default
+    "--txn_manager_enabled=true",
+  };
+  NO_FATALS(StartCluster({}, master_flags, kNumTabletServers));
+  NO_FATALS(Prepare());
+
+  shared_ptr<KuduTransaction> txn;
+  ASSERT_OK(client_->NewTransaction(&txn));
+
+  string txn_token;
+  ASSERT_OK(txn->Serialize(&txn_token));
+  TxnTokenPB token;
+  ASSERT_TRUE(token.ParseFromString(txn_token));
+  ASSERT_TRUE(token.has_txn_id());
+
+  const auto fake_txn_id = token.txn_id() + 100;
+  token.set_txn_id(fake_txn_id);
+  string fake_token;
+  ASSERT_TRUE(token.SerializeToString(&fake_token));
+
+  shared_ptr<KuduTransaction> fake_txn;
+  ASSERT_OK(KuduTransaction::Deserialize(client_, fake_token, &fake_txn));
+  shared_ptr<KuduSession> session;
+  ASSERT_OK(fake_txn->CreateSession(&session));
+
+  {
+    ASSERT_FALSE(session->HasPendingOperations());
+    ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+
+    unique_ptr<KuduInsert> insert(table_->NewInsert());
+    ASSERT_OK(insert->mutable_row()->SetInt64("key", 12345));
+    ASSERT_OK(insert->mutable_row()->SetInt32("int_val", 67890));
+
+    const auto s = session->Apply(insert.release());
+    ASSERT_TRUE(s.IsIOError()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
+    const auto err_status = GetSingleRowError(session.get());
+    ASSERT_TRUE(err_status.IsNotFound()) << err_status.ToString();
+    ASSERT_STR_CONTAINS(err_status.ToString(),
+                        "Failed to write batch of 1 ops to tablet");
+    ASSERT_STR_CONTAINS(err_status.ToString(),
+                        Substitute("transaction ID $0 not found", fake_txn_id));
+  }
+}
+
+// Try to write an extra row in the context of a transaction which has already
+// been committed.
+//
+// TODO(aserbin): due to conversion of Status::IllegalState() into
+//                RetriableRpcStatus::REPLICA_NOT_LEADER result code,
+//                these sub-scenarios fail with Status::TimedOut() because
+//                they retry in vain until RPC timeout elapses
+TEST_F(TxnWriteOpsITest, DISABLED_TxnWriteAfterCommit) {
+  int idx = 0;
+  {
+    shared_ptr<KuduTransaction> txn;
+    ASSERT_OK(client_->NewTransaction(&txn));
+    shared_ptr<KuduSession> session;
+    ASSERT_OK(txn->CreateSession(&session));
+    ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+
+    {
+      unique_ptr<KuduInsert> insert(BuildInsert(table_.get(), idx++));
+      ASSERT_OK(session->Apply(insert.release()));
+    }
+    ASSERT_OK(txn->Commit());
+
+    {
+      unique_ptr<KuduInsert> insert(BuildInsert(table_.get(), idx++));
+      auto s = session->Apply(insert.release());
+      ASSERT_TRUE(s.IsIOError()) << s.ToString();
+      ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
+      const auto err_status = GetSingleRowError(session.get());
+      ASSERT_TRUE(err_status.IsInvalidArgument()) << err_status.ToString();
+      ASSERT_STR_CONTAINS(err_status.ToString(),
+                          "Failed to write batch of 1 ops to tablet");
+      ASSERT_STR_MATCHES(err_status.ToString(), "txn .* is not open");
+    }
+  }
+  // A scenario similar to the one above, but restart tablet servers before an
+  // attempt to write an extra row for the transaction which has already been
+  // committed.
+  {
+    shared_ptr<KuduTransaction> txn;
+    ASSERT_OK(client_->NewTransaction(&txn));
+    shared_ptr<KuduSession> session;
+    ASSERT_OK(txn->CreateSession(&session));
+    ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+
+    {
+      unique_ptr<KuduInsert> insert(BuildInsert(table_.get(), idx++));
+      ASSERT_OK(session->Apply(insert.release()));
+    }
+    ASSERT_OK(txn->Commit());
+
+    // Restart all tablet servers. This is to clear run-time information
+    // in tablet servers which is used to serve write operations in the context
+    // of a multi-row transaction.
+    for (auto i = 0; i < cluster_->num_tablet_servers(); ++i) {
+      auto* ts = cluster_->tablet_server(i);
+      ts->Shutdown();
+      ASSERT_OK(ts->Restart());
+    }
+
+    {
+      unique_ptr<KuduInsert> insert(BuildInsert(table_.get(), idx++));
+      auto s = session->Apply(insert.release());
+      ASSERT_TRUE(s.IsIOError()) << s.ToString();
+      ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
+      const auto err_status = GetSingleRowError(session.get());
+      ASSERT_TRUE(err_status.IsNotFound()) << err_status.ToString();
+      ASSERT_STR_CONTAINS(err_status.ToString(),
+                          "Failed to write batch of 1 ops to tablet");
+      ASSERT_STR_MATCHES(err_status.ToString(), "txn .* is not open");
+    }
+  }
+}
+
+// Test to peek into TxnOpDispatcher's internals.
+class TxnOpDispatcherITest : public KuduTest {
+ public:
+  TxnOpDispatcherITest() {
+    CHECK_OK(BuildSchema(&schema_));
+  }
+
+  void Prepare(int num_tservers) {
+    FLAGS_txn_manager_enabled = true;
+    FLAGS_txn_manager_lazily_initialized = false;
+    FLAGS_txn_manager_status_table_num_replicas = num_tservers;
+
+    InternalMiniClusterOptions opts;
+    opts.num_tablet_servers = num_tservers;
+    cluster_.reset(new InternalMiniCluster(env_, std::move(opts)));
+    ASSERT_OK(cluster_->StartSync());
+
+    KuduClientBuilder builder;
+    builder.default_admin_operation_timeout(kTimeout);
+    ASSERT_OK(cluster_->CreateClient(&builder, &client_));
+    ASSERT_OK(CreateTable(num_tservers));
+    for (auto i = 0; i < cluster_->num_tablet_servers(); ++i) {
+      auto* ts = cluster_->mini_tablet_server(i);
+      ASSERT_OK(ts->WaitStarted());
+    }
+  }
+
+  Status CreateTable(int num_replicas) {
+    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+    RETURN_NOT_OK(table_creator->table_name(kTableName)
+                  .schema(&schema_)
+                  .add_hash_partitions({ "key" }, kNumPartitions)
+                  .num_replicas(num_replicas)
+                  .Create());
+    return client_->OpenTable(kTableName, &table_);
+  }
+
+  // Insert rows in the context of the specified transaction; if 'txn' is
+  // nullptr, use a non-transactional session for the inserts. The resulting
+  // session is output into the 'session_out' parameter if it's non-null.
+  Status InsertRows(KuduTransaction* txn,
+                    int num_rows,
+                    int64_t* key,
+                    shared_ptr<KuduSession>* session_out = nullptr) {
+    shared_ptr<KuduSession> session;
+    if (txn) {
+      RETURN_NOT_OK(txn->CreateSession(&session));
+    } else {
+      session = client_->NewSession();
+    }
+    if (session_out) {
+      *session_out = session;
+    }
+    RETURN_NOT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+    for (auto i = 0; i < num_rows; ++i) {
+      unique_ptr<KuduInsert> ins = BuildInsert(table_.get(), (*key)++);
+      RETURN_NOT_OK(session->Apply(ins.release()));
+    }
+    return Status::OK();
+  }
+
+  // Get all replicas of the test table.
+  vector<scoped_refptr<TabletReplica>> GetAllReplicas() const {
+    vector<scoped_refptr<TabletReplica>> result;
+    for (auto i = 0; i < cluster_->num_tablet_servers(); ++i) {
+      auto* server = cluster_->mini_tablet_server(i)->server();
+      vector<scoped_refptr<TabletReplica>> replicas;
+      server->tablet_manager()->GetTabletReplicas(&replicas);
+      for (auto& r : replicas) {
+        if (r->tablet()->metadata()->table_name() == kTableName) {
+          result.emplace_back(std::move(r));
+        }
+      }
+    }
+    return result;
+  }
+
+  size_t GetTxnOpDispatchersTotalCount(
+      vector<scoped_refptr<TabletReplica>> replicas = {}) {
+    if (replicas.empty()) {
+      // No replicas were specified, get the list of all test table's replicas.
+      replicas = GetAllReplicas();
+    }
+    size_t elem_count = 0;
+    for (auto& r : replicas) {
+      std::lock_guard<simple_spinlock> guard(r->txn_op_dispatchers_lock_);
+      elem_count += r->txn_op_dispatchers_.size();
+    }
+    return elem_count;
+  }
+
+  std::shared_ptr<typename TabletReplica::TxnOpDispatcher>
+      GetSingleTxnOpDispatcher() {
+    auto replicas = GetAllReplicas();
+    std::shared_ptr<typename TabletReplica::TxnOpDispatcher> d;
+    size_t count = 0;
+    for (auto& r : replicas) {
+      std::lock_guard<simple_spinlock> guard(r->txn_op_dispatchers_lock_);
+      auto& dispatchers = r->txn_op_dispatchers_;
+      if (!dispatchers.empty()) {
+        d = dispatchers.begin()->second;
+        ++count;
+      }
+    }
+    CHECK_EQ(1, count);
+    return CHECK_NOTNULL(d);
+  }
+
+  typedef vector<std::shared_ptr<typename TabletReplica::TxnOpDispatcher>>
+      OpDispatchers;
+  typedef map<int64_t, OpDispatchers> OpDispatchersPerTxnId;
+  OpDispatchersPerTxnId GetTxnOpDispatchers() {
+    auto replicas = GetAllReplicas();
+    OpDispatchersPerTxnId result;
+    for (auto& r : replicas) {
+      std::lock_guard<simple_spinlock> guard(r->txn_op_dispatchers_lock_);
+      auto& dispatchers = r->txn_op_dispatchers_;
+      for (auto& [txn_id, d] : dispatchers) {
+        auto& txn_dispatchers =
+            LookupOrEmplace(&result, txn_id, OpDispatchers());
+        txn_dispatchers.emplace_back(d);
+      }
+    }
+    return result;
+  }
+
+ protected:
+  static constexpr const char* const kTableName = "txn_op_dispatcher_test";
+  static constexpr const int kNumPartitions = 2;
+  static const MonoDelta kTimeout;
+
+  KuduSchema schema_;
+  unique_ptr<InternalMiniCluster> cluster_;
+  shared_ptr<KuduClient> client_;
+  shared_ptr<KuduTable> table_;
+};
+const MonoDelta TxnOpDispatcherITest::kTimeout = MonoDelta::FromSeconds(10);
+
+// A scenario to verify basic pre- and post-conditions of the TxnOpDispatcher's
+// lifecycle.
+TEST_F(TxnOpDispatcherITest, LifecycleBasic) {
+  NO_FATALS(Prepare(1));
+
+  // Next value for the primary key column in the test table.
+  int64_t key = 0;
+
+  vector<scoped_refptr<TabletReplica>> replicas = GetAllReplicas();
+  ASSERT_EQ(kNumPartitions, replicas.size());
+
+  // At first, there should be no TxnOpDispatchers across all tablet replicas.
+  ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
+
+  // Start and commit an empty transaction.
+  {
+    shared_ptr<KuduTransaction> txn;
+    ASSERT_OK(client_->NewTransaction(&txn));
+    ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
+    ASSERT_OK(txn->Commit());
+    ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
+  }
+
+  // Start and rollback an empty transaction.
+  {
+    shared_ptr<KuduTransaction> txn;
+    ASSERT_OK(client_->NewTransaction(&txn));
+    ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
+    ASSERT_OK(txn->Rollback());
+    ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
+  }
+
+  // Start a single transaction and commit it after inserting a few rows.
+  {
+    shared_ptr<KuduTransaction> txn;
+    ASSERT_OK(client_->NewTransaction(&txn));
+
+    // There should be no TxnOpDispatchers yet because not a single write
+    // operation has been sent to tablet servers yet.
+    ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
+
+    // Insert a single row.
+    ASSERT_OK(InsertRows(txn.get(), 1, &key));
+
+    // Only one tablet replica should get the txn write request and register
+    // a TxnOpDispatcher for the transaction.
+    ASSERT_EQ(1, GetTxnOpDispatchersTotalCount());
+
+    // Write some more rows ensuring all hash buckets of the table's partition
+    // will get at least one element.
+    ASSERT_OK(InsertRows(txn.get(), 5, &key));
+
+    // Now all tablet replicas should get one TxnOpDispatcher.
+    for (auto& r : replicas) {
+      ASSERT_EQ(1, r->txn_op_dispatchers_.size());
+    }
+
+    const auto ref_txn_id = GetTxnId(txn);
+
+    // Since all inserts were successfully processed, no TxnOpDispatcher
+    // should be buffering any write operations.
+    for (auto& r : replicas) {
+      for (const auto& [txn_id, dispatcher] : r->txn_op_dispatchers_) {
+        ASSERT_EQ(ref_txn_id, txn_id);
+        {
+          std::lock_guard<simple_spinlock> guard(dispatcher->lock_);
+          ASSERT_TRUE(dispatcher->preliminary_tasks_completed_);
+          ASSERT_TRUE(dispatcher->ops_queue_.empty());
+          ASSERT_FALSE(dispatcher->unregistered_);
+          ASSERT_OK(dispatcher->inflight_status_);
+        }
+      }
+    }
+
+    // Now, commit the transaction.
+    ASSERT_OK(txn->Commit());
+
+    // All dispatchers should be unregistered once the transaction is committed.
+    ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
+  }
+
+  // Start a single transaction and roll it back after inserting a few rows.
+  {
+    shared_ptr<KuduTransaction> txn;
+    ASSERT_OK(client_->NewTransaction(&txn));
+    ASSERT_OK(InsertRows(txn.get(), 8, &key));
+    ASSERT_EQ(kNumPartitions, GetTxnOpDispatchersTotalCount());
+    ASSERT_OK(txn->Rollback());
+
+    // Since KuduTransaction::Rollback() just schedules the transaction abort,
+    // wait for the rollback to finalize.
+    ASSERT_EVENTUALLY([&] {
+      Status status;
+      bool complete = false;
+      ASSERT_OK(txn->IsCommitComplete(&complete, &status));
+      ASSERT_TRUE(complete);
+      ASSERT_TRUE(status.IsAborted()) << status.ToString();
+    });
+    // No dispatchers should be registered once the transaction is rolled back.
+    ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
+  }
+}
+
+// A scenario to verify TxnOpDispatcher lifecycle when there is an error
+// while trying to register a tablet as a participant in a transaction.
+TEST_F(TxnOpDispatcherITest, ErrorInParticipantRegistration) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  NO_FATALS(Prepare(1));
+
+  // It's a clean slate: no TxnOpDispatchers should be around yet.
+  ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
+
+  // Next value for the primary key column in the test table.
+  int64_t key = 0;
+
+  // This sub-scenario tries to submit a write operation as a part
+  // of a nonexistent transaction.
+  {
+    shared_ptr<KuduTransaction> txn;
+    ASSERT_OK(client_->NewTransaction(&txn));
+
+    string txn_token_str;
+    ASSERT_OK(txn->Serialize(&txn_token_str));
+    TxnTokenPB txn_token;
+    ASSERT_TRUE(txn_token.ParseFromString(txn_token_str));
+    ASSERT_TRUE(txn_token.has_txn_id());
+    const int64_t txn_id = txn_token.txn_id();
+    const int64_t fake_txn_id = txn_id + 10;
+    txn_token.set_txn_id(fake_txn_id);
+
+    string fake_txn_token_str;
+    ASSERT_TRUE(txn_token.SerializeToString(&fake_txn_token_str));
+
+    shared_ptr<KuduTransaction> fake_txn;
+    ASSERT_OK(KuduTransaction::Deserialize(client_, fake_txn_token_str, &fake_txn));
+
+    shared_ptr<KuduSession> session;
+    auto s = InsertRows(fake_txn.get(), 1, &key, &session);
+    ASSERT_TRUE(s.IsIOError()) << s.ToString();
+    auto row_status = GetSingleRowError(session.get());
+    ASSERT_TRUE(row_status.IsNotFound()) << row_status.ToString();
+    ASSERT_STR_CONTAINS(
+        row_status.ToString(),
+        Substitute("transaction ID $0 not found, current highest txn ID",
+                   fake_txn_id));
+
+    // There should be no TxnOpDispatchers: they should be cleaned up when
+    // getting an error upon registering a tablet as a participant of a
+    // non-existent transaction.
+    ASSERT_EVENTUALLY([&] {
+      ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
+    });
+
+    // Make sure nothing unexpected happens when committing the original empty
+    // transaction.
+    ASSERT_OK(txn->Commit());
+    ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
+  }
+
+  // This sub-scenario tries to submit a write operation as a part
+  // of an already committed transaction.
+  {
+    // Here a custom client with a shorter timeout is used to avoid making
+    // too many pointless retries upon receiving Status::IllegalState()
+    // from the tablet server.
+    //
+    // TODO(aserbin): stop using the custom client with a shorter timeout as
+    //                soon as the issue in client/batcher.cc with the
+    //                transformation of both Status::IllegalState() and
+    //                Status::Aborted() into
+    //                RetriableRpcStatus::REPLICA_NOT_LEADER result is fixed
+    const MonoDelta kCustomTimeout = MonoDelta::FromSeconds(2);
+    KuduClientBuilder builder;
+    builder.default_admin_operation_timeout(kCustomTimeout);
+    builder.default_rpc_timeout(kCustomTimeout);
+    shared_ptr<KuduClient> client;
+    ASSERT_OK(cluster_->CreateClient(&builder, &client));
+    shared_ptr<KuduTransaction> txn_orig_handle;
+    ASSERT_OK(client->NewTransaction(&txn_orig_handle));
+
+    string txn_token;
+    ASSERT_OK(txn_orig_handle->Serialize(&txn_token));
+
+    shared_ptr<KuduTransaction> txn;
+    ASSERT_OK(KuduTransaction::Deserialize(client_, txn_token, &txn));
+    ASSERT_OK(txn_orig_handle->Commit());
+
+    int64_t key = 0;
+    shared_ptr<KuduSession> session;
+    auto s = InsertRows(txn.get(), 1, &key, &session);
+    ASSERT_TRUE(s.IsIOError()) << s.ToString();
+    auto row_status = GetSingleRowError(session.get());
+    ASSERT_TRUE(row_status.IsTimedOut()) << row_status.ToString();
+    ASSERT_STR_CONTAINS(row_status.ToString(),
+                        "Failed to write batch of 1 ops to tablet");
+
+    // There should be no TxnOpDispatchers: they should be cleaned up upon getting
+    // Status::IllegalState() error when registering a tablet as a participant
+    // of a no-longer-open transaction.
+    ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
+  }
+}
+
+TEST_F(TxnOpDispatcherITest, ErrorInProcessingWriteOp) {
+  NO_FATALS(Prepare(1));
+
+  // It's a clean slate: no TxnOpDispatchers should be around yet.
+  ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
+
+  // Next value for the primary key column in the test table.
+  int64_t key = 0;
+
+  // Try to submit an update (not an insert): it's not yet possible to have
+  // an update as a part of a multi-row transaction in Kudu.
+  shared_ptr<KuduTransaction> txn;
+  ASSERT_OK(client_->NewTransaction(&txn));
+
+  shared_ptr<KuduSession> session;
+  ASSERT_OK(txn->CreateSession(&session));
+  ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+
+  unique_ptr<KuduUpdate> op(table_->NewUpdate());
+  KuduPartialRow* row = op->mutable_row();
+  ASSERT_OK(row->SetInt64(0, key));
+  ASSERT_OK(row->SetInt32(1, 1));
+  auto s = session->Apply(op.release());
+  ASSERT_TRUE(s.IsIOError()) << s.ToString();
+  auto row_status = GetSingleRowError(session.get());
+  ASSERT_TRUE(row_status.IsNotSupported()) << row_status.ToString();
+  ASSERT_STR_CONTAINS(row_status.ToString(), "transactions may only insert");
+
+  // The corresponding TxnOpDispatcher instance should still be registered. It
+  // will be gone after committing or rolling back the transaction (see below).
+  ASSERT_EQ(1, GetTxnOpDispatchersTotalCount());
+
+  // Find and examine the TxnOpDispatcher instance.
+  auto dispatcher = GetSingleTxnOpDispatcher();
+  ASSERT_EVENTUALLY([&] {
+    std::lock_guard<simple_spinlock> guard(dispatcher->lock_);
+    // Due to scheduling anomalies, the operation above might have been
+    // responded to while the TxnOpDispatcher::preliminary_tasks_completed_
+    // field has not been updated yet. Hence ASSERT_EVENTUALLY here.
+    ASSERT_TRUE(dispatcher->preliminary_tasks_completed_);
+    ASSERT_FALSE(dispatcher->unregistered_);
+    ASSERT_OK(dispatcher->inflight_status_);
+    ASSERT_TRUE(dispatcher->ops_queue_.empty());
+  });
+
+  // It should still be possible to successfully insert rows in the context of
+  // the current transaction.
+  static constexpr const size_t kNumRows = 8;
+  ASSERT_OK(InsertRows(txn.get(), kNumRows, &key));
+
+  // Every tablet should get at least one write operation, so there should be
+  // a total of two TxnOpDispatchers.
+  ASSERT_EQ(2, GetTxnOpDispatchersTotalCount());
+
+  // Try to insert rows with duplicate keys.
+  int64_t duplicate_key = 0;
+  s = InsertRows(txn.get(), kNumRows, &duplicate_key);
+  ASSERT_TRUE(s.IsIOError()) << s.ToString();
+
+  // The same TxnOpDispatchers should be handling all the write operations.
+  ASSERT_EQ(2, GetTxnOpDispatchersTotalCount());
+
+  const auto dispatchers_per_txn_id = GetTxnOpDispatchers();
+  ASSERT_EQ(1, dispatchers_per_txn_id.size());
+  const OpDispatchers& dispatchers = dispatchers_per_txn_id.begin()->second;
+  ASSERT_EQ(2, dispatchers.size());
+
+  for (auto& d : dispatchers) {
+    std::lock_guard<simple_spinlock> guard(d->lock_);
+    ASSERT_TRUE(d->preliminary_tasks_completed_);
+    ASSERT_FALSE(d->unregistered_);
+    ASSERT_OK(d->inflight_status_);
+    ASSERT_TRUE(d->ops_queue_.empty());
+  }
+
+  // Now commit the transaction.
+  ASSERT_OK(txn->Commit());
+  // Upon committing, the TxnOpDispatcher should be automatically unregistered.
+  ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
+
+  // Make sure all rows which were successfully written through TxnOpDispatcher
+  // are persisted.
+  size_t row_count;
+  ASSERT_OK(CountRows(table_.get(), &row_count));
+  ASSERT_EQ(kNumRows, row_count);
+
+  // TODO(aserbin): stop the replica and try to submit a write operation.
+  // Check for the number of inserted rows.
+}
+
+// Make sure TxnOpDispatcher's logic works as expected when the maximum number
+// of buffered/pending write operations is set to 0. In this case, all
+// transactional write requests from a client are responded to with
+// Status::ServiceUnavailable() until all the preliminary work of registering
+// the tablet as a participant and issuing the BEGIN_TXN operation is complete.
+TEST_F(TxnOpDispatcherITest, NoPendingWriteOps) {
+  FLAGS_tablet_max_pending_txn_write_ops = 0;
+  NO_FATALS(Prepare(1));
+
+  // It's a clean slate: no TxnOpDispatchers should be around yet.
+  ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
+
+  shared_ptr<KuduTransaction> txn;
+  ASSERT_OK(client_->NewTransaction(&txn));
+  const auto txn_id = GetTxnId(txn);
+  string authn_creds;
+  ASSERT_OK(client_->ExportAuthenticationCredentials(&authn_creds));
+  client::AuthenticationCredentialsPB pb;
+  ASSERT_TRUE(pb.ParseFromString(authn_creds));
+  ASSERT_TRUE(pb.has_real_user());
+  string client_user = pb.real_user();
+
+  auto proxy = cluster_->tserver_proxy(0);
+  ASSERT_NE(nullptr, proxy.get());
+
+  const auto replicas = GetAllReplicas();
+  ASSERT_EQ(kNumPartitions, replicas.size());
+  for (auto i = 0; i < kNumPartitions; ++i) {
+    const auto& tablet_id = replicas[i]->tablet_id();
+    const auto schema = KuduSchema::ToSchema(schema_);
+    KuduPartialRow row(&schema);
+    ASSERT_OK(row.SetInt64("key", 0));
+    ASSERT_OK(row.SetInt32("int_val", 1));
+
+    // This isn't a well-formed write request, but it shouldn't be submitted
+    // into the tablet server's prepare queue anyway since
+    // FLAGS_tablet_max_pending_txn_write_ops is set to 0 and a not-yet-ready
+    // TxnOpDispatcher should immediately respond back with ServiceUnavailable.
+    WriteRequestPB req;
+    req.set_tablet_id(tablet_id);
+    req.set_txn_id(txn_id);
+
+    RowOperationsPBEncoder enc(req.mutable_row_operations());
+    enc.Add(RowOperationsPB::INSERT, row);
+    RpcController ctl;
+    ctl.set_timeout(kTimeout);
+
+    WriteResponsePB resp;
+    auto s = proxy->Write(req, &resp, &ctl);
+    ASSERT_TRUE(s.IsRemoteError()) << s.ToString();
+    ASSERT_FALSE(resp.has_error());
+    const auto* err_status_pb = ctl.error_response();
+    ASSERT_NE(nullptr, err_status_pb);
+    ASSERT_TRUE(err_status_pb->has_code());
+    ASSERT_EQ(ErrorStatusPB::ERROR_SERVER_TOO_BUSY, err_status_pb->code());
+    ASSERT_TRUE(err_status_pb->has_message());
+    ASSERT_STR_CONTAINS(err_status_pb->message(),
+                        "pending operations queue is at capacity");
+  }
+
+  const auto dmap = GetTxnOpDispatchers();
+  ASSERT_EQ(1, dmap.size());
+  auto& [dmap_txn_id, dispatchers] = *(dmap.begin());
+  ASSERT_EQ(txn_id, dmap_txn_id);
+  for (auto& d : dispatchers) {
+    ASSERT_EVENTUALLY([d] {
+      // Eventually, every involved tablet should be registered as a transaction
+      // participant and BEGIN_TXN should be issued.
+      std::lock_guard<simple_spinlock> guard(d->lock_);
+      ASSERT_TRUE(d->ops_queue_.empty());
+      ASSERT_FALSE(d->unregistered_);
+      ASSERT_OK(d->inflight_status_);
+      ASSERT_TRUE(d->preliminary_tasks_completed_);
+    });
+  }
+
+  // Now, insert several rows into the table.
+  int64_t key = 0;
+  constexpr const auto kNumRows = 8;
+  ASSERT_OK(InsertRows(txn.get(), kNumRows, &key));
+
+  // Now commit the transaction and make sure the rows are persisted.
+  ASSERT_OK(txn->Commit());
+  size_t num_rows = 0;
+  ASSERT_OK(CountRows(table_.get(), &num_rows));
+  ASSERT_EQ(kNumRows, num_rows);
+
+  // No dispatchers should be there after the transaction is committed.
+  ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
+}
+
+// Make sure TxnOpDispatchers are not engaged when a tablet server processes
+// non-transactional write requests.
+TEST_F(TxnOpDispatcherITest, NonTransactionalWrites) {
+  NO_FATALS(Prepare(1));
+
+  // It's a clean slate: no TxnOpDispatchers should be around.
+  ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
+
+  int64_t key = 0;
+  ASSERT_OK(InsertRows(nullptr, 2, &key));
+
+  // No dispatchers should be around: those were non-transactional write ops.
+  ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
+}
+
+// Make sure TxnOpDispatcher responds back with Status::TimedOut() status for
+// all pending operations if it takes too long to perform the preliminary tasks
+// of registering a tablet as a participant and issuing the BEGIN_TXN operation
+// for the target tablet replica.
+TEST_F(TxnOpDispatcherITest, PreliminaryTasksTimeout) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  constexpr auto kNumThreads = 8;
+  constexpr auto kNumRowsPerThread = 1;
+  const auto kShortTimeout = MonoDelta::FromMilliseconds(1000);
+  const auto kInjectedDelayMs = 2 * kShortTimeout.ToMilliseconds();
+
+  // This should be enough to accommodate all write operations below, even with
+  // some margin since there are two tablets in a hash-partitioned test table.
+  FLAGS_tablet_max_pending_txn_write_ops = kNumThreads * kNumRowsPerThread;
+
+  NO_FATALS(Prepare(1));
+
+  for (auto iteration = 0; iteration < 2; ++iteration) {
+    FLAGS_txn_participant_registration_inject_latency_ms =
+        iteration == 0 ? kInjectedDelayMs : 0;
+    FLAGS_txn_participant_begin_op_inject_latency_ms =
+        iteration == 1 ? kInjectedDelayMs : 0;
+
+    shared_ptr<KuduTransaction> txn_orig_client;
+    ASSERT_OK(client_->NewTransaction(&txn_orig_client));
+
+    // Create an instance of client with shorter timeouts to work with
+    // transactional sessions.
+    shared_ptr<KuduClient> c;
+    {
+      KuduClientBuilder builder;
+      builder.default_admin_operation_timeout(kShortTimeout);
+      builder.default_rpc_timeout(kShortTimeout);
+      ASSERT_OK(cluster_->CreateClient(&builder, &c));
+    }
+    ASSERT_NE(nullptr, c.get());
+
+    // To switch to the txn operations bound to the client with short timeout
+    // for operations, serialize/deserialize the txn handle.
+    string txn_token;
+    ASSERT_OK(txn_orig_client->Serialize(&txn_token));
+    shared_ptr<KuduTransaction> txn;
+    ASSERT_OK(KuduTransaction::Deserialize(c, txn_token, &txn));
+
+    // Now, make writer threads issue single-row write operations using the
+    // handle bound to the client with shorter RPC timeout.
+    Barrier barrier(kNumThreads + 1);
+    vector<thread> writers;
+    writers.reserve(kNumThreads);
+    vector<shared_ptr<KuduSession>> sessions(kNumThreads);
+    vector<Status> statuses(kNumThreads);
+    for (auto thread_idx = 0; thread_idx < kNumThreads; ++thread_idx) {
+      writers.emplace_back([&, thread_idx] {
+        shared_ptr<KuduSession> session;
+        CHECK_OK(txn->CreateSession(&session));
+        CHECK_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+        int64_t key = thread_idx;
+        unique_ptr<KuduInsert> ins = BuildInsert(table_.get(), key);
+        barrier.Wait();
+        auto s = session->Apply(ins.release());
+        sessions[thread_idx] = std::move(session);
+        statuses[thread_idx] = std::move(s);
+      });
+    }
+    // Signal writer threads to send their operations.
+    barrier.Wait();
+
+    // Wait for all write operations to receive responses.
+    std::for_each(writers.begin(), writers.end(), [](thread& t) { t.join(); });
+
+    // Check for the statuses reported by each of the writer threads.
+    for (auto i = 0; i < kNumThreads; ++i) {
+      SCOPED_TRACE(Substitute("writer thread idx $0", i));
+
+      const auto& s = statuses[i];
+      ASSERT_TRUE(s.IsIOError()) << s.ToString();
+
+      auto& session = sessions[i];
+      const auto row_status = GetSingleRowError(session.get());
+      ASSERT_TRUE(row_status.IsTimedOut()) << row_status.ToString();
+      ASSERT_STR_CONTAINS(row_status.ToString(),
+                          "Failed to write batch of 1 ops to tablet");
+    }
+
+    // Eventually, all TxnOpDispatchers should be gone: after about
+    // FLAGS_txn_participant_registration_inject_latency_ms milliseconds
+    // corresponding callbacks should be called and TxnOpDispatchers should be
+    // unregistered.
+    ASSERT_EVENTUALLY([&] {
+      ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
+    });
+
+    // Just in case, try to commit the transaction to make sure the operations
+    // didn't get through. There are a couple of expected outcomes, depending
+    // on whether the tablet has or hasn't been registered as a participant in
+    // the transaction.
+    const auto s = txn->Commit();
+    if (iteration == 0) {
+      // This is the case when not a single tablet was registered as a
+      // participant in the transaction. In this case, the transaction should
+      // be able to commit with no issues.
+      ASSERT_OK(s);
+    } else {
+      // This is the case when tablets have been registered as participants.
+      // In this case, the transaction should not be able to finalize.
+      //
+      // TODO(aserbin): this should result in IllegalState() after addressing
+      //                the issue with conversion of IllegalState() into
+      //                retriable error status in Kudu client
+      ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+    }
+
+    // No rows should be persisted.
+    size_t row_count;
+    ASSERT_OK(CountRows(table_.get(), &row_count));
+    ASSERT_EQ(0, row_count);
+  }
+}
+
+// This scenario makes sure that TxnOpDispatcher's logic behaves as expected
+// when txn participant registration takes too long for the first few write
+// operations, which time out. However, after the 'unfreeze', future operations
+// sent in the context of the same transaction should be successful.
+TEST_F(TxnOpDispatcherITest, DuplicateTxnParticipantRegistration) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  const auto kShortTimeout = MonoDelta::FromMilliseconds(1000);
+  FLAGS_txn_participant_begin_op_inject_latency_ms =
+      2 * kShortTimeout.ToMilliseconds();
+
+  NO_FATALS(Prepare(1));
+
+  // Create a custom instance of client with shorter timeouts to work with
+  // transactional sessions, and create a txn handle bound to the custom client.
+  shared_ptr<KuduTransaction> txn_orig_client;
+  ASSERT_OK(client_->NewTransaction(&txn_orig_client));
+  shared_ptr<KuduClient> c;
+  {
+    KuduClientBuilder builder;
+    builder.default_admin_operation_timeout(kShortTimeout);
+    builder.default_rpc_timeout(kShortTimeout);
+    ASSERT_OK(cluster_->CreateClient(&builder, &c));
+  }
+  string txn_token;
+  ASSERT_OK(txn_orig_client->Serialize(&txn_token));
+  shared_ptr<KuduTransaction> txn;
+  ASSERT_OK(KuduTransaction::Deserialize(c, txn_token, &txn));
+
+  shared_ptr<KuduSession> session;
+  int64_t key = 0;
+  auto s = InsertRows(txn.get(), 1, &key, &session);
+  ASSERT_TRUE(s.IsIOError()) << s.ToString();
+  auto row_status = GetSingleRowError(session.get());
+  ASSERT_TRUE(row_status.IsTimedOut()) << row_status.ToString();
+  ASSERT_STR_CONTAINS(row_status.ToString(),
+                      "Failed to write batch of 1 ops to tablet");
+
+  // Now remove the injected latency.
+  FLAGS_txn_participant_begin_op_inject_latency_ms = 0;
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
+  });
+
+  constexpr auto kRowsNum = 8;
+  ASSERT_OK(InsertRows(txn.get(), kRowsNum, &key));
+
+  {
+    const auto dmap = GetTxnOpDispatchers();
+    ASSERT_EQ(1, dmap.size());
+    auto& [dmap_txn_id, dispatchers] = *(dmap.begin());
+    ASSERT_EQ(GetTxnId(txn), dmap_txn_id);
+    for (auto& d : dispatchers) {
+      std::lock_guard<simple_spinlock> guard(d->lock_);
+      ASSERT_TRUE(d->ops_queue_.empty());
+      ASSERT_FALSE(d->unregistered_);
+      ASSERT_OK(d->inflight_status_);
+      ASSERT_TRUE(d->preliminary_tasks_completed_);
+    }
+  }
+
+  // Commit the transaction and verify the row count.
+  ASSERT_OK(txn_orig_client->Commit());
+  size_t row_count;
+  ASSERT_OK(CountRows(table_.get(), &row_count));
+  ASSERT_EQ(kRowsNum, row_count);
+
+  ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
+}
+
+// This scenario exercises the case when a request to commit a transaction
+// arrives while TxnOpDispatcher still has pending write requests in its queue.
+// The registration of the txn participant is not yet complete and BEGIN_TXN
+// has not been sent when the client issues the commit request.
+TEST_F(TxnOpDispatcherITest, CommitWithWriteOpPendingParticipantNotYetRegistered) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  constexpr auto kDelayMs = 1000;
+  FLAGS_txn_participant_registration_inject_latency_ms = 2 * kDelayMs;
+
+  NO_FATALS(Prepare(1));
+
+  shared_ptr<KuduTransaction> txn;
+  ASSERT_OK(client_->NewTransaction(&txn));
+
+  Status commit_init_status;
+  thread committer([&txn, &commit_init_status]{
+    SleepFor(MonoDelta::FromMilliseconds(kDelayMs));
+    // Initiate committing the transaction after the delay, but don't wait
+    // for the commit to finalize.
+    commit_init_status = txn->Commit(false/*wait*/);
+  });
+  auto cleanup = MakeScopedCleanup([&]() {
+    committer.join();
+  });
+
+  shared_ptr<KuduSession> session;
+  int64_t key = 0;
+  const auto s = InsertRows(txn.get(), 1, &key, &session);
+  ASSERT_TRUE(s.IsIOError()) << s.ToString();
+  const auto row_status = GetSingleRowError(session.get());
+  // TODO(aserbin): due to conversion of Status::IllegalState() into
+  //                RetriableRpcStatus::REPLICA_NOT_LEADER result code,
+  //                the write RPC times out after many retries instead of
+  //                bailing out right away. Update the expected error code after
+  //                the issue is fixed.
+  //ASSERT_TRUE(row_status.IsIllegalState());
+  ASSERT_TRUE(row_status.IsTimedOut()) << row_status.ToString();
+
+  committer.join();
+  cleanup.cancel();
+  ASSERT_OK(commit_init_status);
+
+  bool is_complete = false;
+  Status completion_status;
+  ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+  ASSERT_TRUE(is_complete);
+  ASSERT_OK(completion_status);
+
+  size_t num_rows;
+  ASSERT_OK(CountRows(table_.get(), &num_rows));
+  ASSERT_EQ(0, num_rows);
+
+  // Since the write operation has been responded to with an error, proper
+  // cleanup should have run and there should be no TxnOpDispatcher registered
+  // for the transaction.
+  ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
+}
+
+// This scenario exercises the case when a request to commit a transaction
+// arrives while TxnOpDispatcher still has pending write requests in its queue
+// but it has already completed the registration of the txn participant.
+// BEGIN_TXN hasn't yet been sent for the participant tablet when the commit
+// request is issued by the client.
+//
+// TODO(aserbin): enable the scenario after the follow-up for
+//                https://gerrit.cloudera.org/#/c/17127/ is merged
+TEST_F(TxnOpDispatcherITest, DISABLED_CommitWithWriteOpPendingParticipantRegistered) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  constexpr auto kDelayMs = 1000;
+  FLAGS_txn_participant_begin_op_inject_latency_ms = 2 * kDelayMs;
+
+  NO_FATALS(Prepare(1));
+
+  shared_ptr<KuduTransaction> txn;
+  ASSERT_OK(client_->NewTransaction(&txn));
+
+  Status commit_init_status;
+  thread committer([&txn, &commit_init_status]{
+    SleepFor(MonoDelta::FromMilliseconds(kDelayMs));
+    // Initiate committing the transaction after the delay, but don't wait
+    // for the commit to finalize.
+    commit_init_status = txn->Commit(false/*wait*/);
+  });
+  auto cleanup = MakeScopedCleanup([&]() {
+    committer.join();
+  });
+
+  shared_ptr<KuduSession> session;
+  int64_t key = 0;
+  ASSERT_OK(InsertRows(txn.get(), 1, &key, &session));
+
+  committer.join();
+  cleanup.cancel();
+  ASSERT_OK(commit_init_status);
+
+  bool is_complete = false;
+  Status completion_status;
+  ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+  ASSERT_TRUE(is_complete);
+  ASSERT_OK(completion_status);
+
+  size_t num_rows = 0;
+  ASSERT_OK(CountRows(table_.get(), &num_rows));
+  ASSERT_EQ(1, num_rows);
+
+  // Since the commit has been successfully finalized, there should be no
+  // TxnOpDispatcher for the transaction.
+  ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
+}
+
+TEST_F(TxnOpDispatcherITest, TxnWriteWhileReplicaDeleted) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  constexpr auto kDelayMs = 1000;
+  FLAGS_txn_participant_begin_op_inject_latency_ms = 2 * kDelayMs;
+  constexpr auto kMaxPendingOps = 8;
+  FLAGS_tablet_max_pending_txn_write_ops = kMaxPendingOps;
+
+  NO_FATALS(Prepare(1));
+  auto replicas = GetAllReplicas();
+  ASSERT_FALSE(replicas.empty());
+  const auto tablet_id = replicas.front()->tablet_id();
+
+  shared_ptr<KuduTransaction> txn;
+  ASSERT_OK(client_->NewTransaction(&txn));
+
+  Status tablet_delete_status;
+  thread tablet_deleter([&]{
+    SleepFor(MonoDelta::FromMilliseconds(kDelayMs));
+    tablet_delete_status = cluster_->mini_tablet_server(0)->server()->
+        tablet_manager()->DeleteTablet(tablet_id,
+                                       tablet::TABLET_DATA_TOMBSTONED,
+                                       boost::none);
+  });
+  auto cleanup = MakeScopedCleanup([&]() {
+    tablet_deleter.join();
+  });
+
+  shared_ptr<KuduSession> session;
+  int64_t key = 0;
+  // Send multiple rows (up to the capacity of the queue in TxnOpDispatcher),
+  // so at least one row is sent to the deleted replica.
+  auto s = InsertRows(txn.get(), kMaxPendingOps, &key, &session);
+  ASSERT_TRUE(s.IsIOError()) << s.ToString();
+  auto op_status = GetSingleRowError(session.get());
+  ASSERT_TRUE(op_status.IsTimedOut()) << op_status.ToString();
+  ASSERT_STR_CONTAINS(op_status.ToString(), "STOPPED");
+
+  // The leader replica of the other tablet might still have its
+  // TxnOpDispatcher registered.
+  ASSERT_GE(1, GetTxnOpDispatchersTotalCount(replicas));
+
+  cleanup.cancel();
+  tablet_deleter.join();
+  ASSERT_OK(tablet_delete_status);
+}
+
+// This is similar to TxnWriteWhileReplicaDeleted, but this more synthetic
+// scenario with multiple tablet servers is to verify a couple of edge cases:
+//   * TxnOpDispatchers are properly unregistered when there is an error while
+//     submitting the accumulated write operations from the queue
+//   * it's possible to roll back such a transaction with write operations
+//     failed due to non-running tablet replicas
+TEST_F(TxnOpDispatcherITest, TxnWriteWhenReplicaIsShutdown) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  constexpr auto kDelayMs = 1000;
+  FLAGS_txn_participant_begin_op_inject_latency_ms = 2 * kDelayMs;
+
+  constexpr auto kMaxPendingOps = 16;
+  FLAGS_tablet_max_pending_txn_write_ops = kMaxPendingOps;
+
+  constexpr auto kTServers = 3;
+  NO_FATALS(Prepare(kTServers));
+  auto replicas = GetAllReplicas();
+  ASSERT_FALSE(replicas.empty());
+
+  // No dispatchers should be registered at this point.
+  ASSERT_GE(0, GetTxnOpDispatchersTotalCount(replicas));
+
+  shared_ptr<KuduTransaction> txn;
+  ASSERT_OK(client_->NewTransaction(&txn));
+
+  atomic<size_t> txn_dispatchers_count = 0;
+  vector<thread> stoppers;
+  stoppers.reserve(replicas.size());
+  for (auto r : replicas) {
+    stoppers.emplace_back([r, &txn_dispatchers_count, this]{
+      SleepFor(MonoDelta::FromMilliseconds(kDelayMs));
+      txn_dispatchers_count += GetTxnOpDispatchersTotalCount({ r });
+      r->Shutdown();
+    });
+  }
+  auto cleanup = MakeScopedCleanup([&]() {
+    std::for_each(stoppers.begin(), stoppers.end(), [](thread& t) { t.join(); });
+  });
+
+  // Send multiple rows up to the capacity of the queue in TxnOpDispatcher.
+  int64_t key = 0;
+  shared_ptr<KuduSession> session;
+  auto s = InsertRows(txn.get(), kMaxPendingOps, &key, &session);
+  ASSERT_TRUE(s.IsIOError()) << s.ToString();
+  auto op_status = GetSingleRowError(session.get());
+  ASSERT_TRUE(op_status.IsTimedOut()) << op_status.ToString();
+  ASSERT_STR_CONTAINS(op_status.ToString(), "SHUTDOWN");
+
+  // Make sure there was at least one dispatcher spotted while accumulating
+  // write operations. All write requests were sent to leader replicas of two
+  // tablets, and there might be a change in a replica's leadership.
+  ASSERT_GT(txn_dispatchers_count, 0);
+  // No TxnOpDispatchers should be left at leader replicas after write
+  // operations failed.
+  ASSERT_LE(GetTxnOpDispatchersTotalCount(replicas),
+            kNumPartitions * (kTServers - 1));
+
+  // It should be possible to roll back the transaction.
+  ASSERT_OK(txn->Rollback());
+
+  // There should be no TxnOpDispatchers registered after rolling back
+  // the transaction.
+  ASSERT_EQ(0, GetTxnOpDispatchersTotalCount(replicas));
+}
+
+// Verify the functionality of the --tserver_txn_write_op_handling_enabled flag
+// (the flag is for testing purposes only, though).
+TEST_F(TxnOpDispatcherITest, TxnWriteOpHandlingEnabledFlag) {
+  FLAGS_tserver_txn_write_op_handling_enabled = false;
+  NO_FATALS(Prepare(1));
+
+  // It's a clean slate: no TxnOpDispatchers should be around.
+  ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
+
+  shared_ptr<KuduTransaction> txn;
+  ASSERT_OK(client_->NewTransaction(&txn));
+  int64_t key = 0;
+  shared_ptr<KuduSession> session;
+  auto s = InsertRows(txn.get(), 2, &key, &session);
+  ASSERT_TRUE(s.IsIOError()) << s.ToString();
+  auto row_status = GetSingleRowError(session.get());
+  ASSERT_TRUE(row_status.IsNotFound());
+  ASSERT_STR_CONTAINS(row_status.ToString(),
+                      Substitute("txn $0 not found on tablet", GetTxnId(txn)));
+
+  // No dispatchers should be around: --tserver_txn_write_op_handling_enabled
+  // is set to false.
+  ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
+
+  // It should be possible to commit an empty transaction after that.
+  ASSERT_OK(txn->Commit());
+  size_t num_rows = 0;
+  ASSERT_OK(CountRows(table_.get(), &num_rows));
+  ASSERT_EQ(0, num_rows);
+}
+
+// This scenario verifies that tablet servers are able to accept transactional
+// write requests upon restarting while automatically registering corresponding
+// tablets as transaction participants.
+//
+// TODO(aserbin): clarify why sometimes both the first and the second
+//                sub-scenarios fail with a timeout error even if the timeout
+//                is set to an ample 300 seconds
+//
+// TODO(aserbin): clarify why sometimes the second sub-scenario below fails with
+//                an error like below:
+//
+//  src/kudu/integration-tests/txn_write_ops-itest.cc:1210: Failure
+//  Expected equality of these values:
+//    16
+//    row_count
+//      Which is: 13
+//
+TEST_F(TxnOpDispatcherITest, DISABLED_TxnMultipleSingleRowsWithServerRestart) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  NO_FATALS(Prepare(3));
+
+  // The test scenario below might require multiple retries from the client if
+  // running on a slow or overloaded machine, so the timeout for RPC operations
+  // is set higher than the default setting to avoid false positives.
+  shared_ptr<KuduClient> c;
+  {
+    KuduClientBuilder builder;
+    builder.default_admin_operation_timeout(MonoDelta::FromSeconds(300));
+    builder.default_rpc_timeout(MonoDelta::FromSeconds(300));
+    ASSERT_OK(cluster_->CreateClient(&builder, &c));
+  }
+
+  int64_t key = 0;
+  // Restart all tablet servers between every row written, waiting for each
+  // tablet server to be up and running before trying to write the next row.
+  {
+    shared_ptr<KuduTransaction> txn;
+    ASSERT_OK(c->NewTransaction(&txn));
+    shared_ptr<KuduSession> session;
+    ASSERT_OK(txn->CreateSession(&session));
+    ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+    shared_ptr<KuduTable> table;
+    ASSERT_OK(c->OpenTable(kTableName, &table));
+
+    for (auto row_idx = 0; row_idx < 8; ++row_idx) {
+      unique_ptr<KuduInsert> ins = BuildInsert(table.get(), key++);
+      ASSERT_OK(session->Apply(ins.release()));
+      for (auto i = 0; i < cluster_->num_tablet_servers(); ++i) {
+        auto* ts = cluster_->mini_tablet_server(i);
+        ts->Shutdown();
+        ASSERT_OK(ts->Restart());
+        ASSERT_OK(ts->WaitStarted());
+      }
+    }
+
+    ASSERT_OK(txn->Commit());
+    size_t row_count = 0;
+    ASSERT_OK(CountRows(table_.get(), &row_count));
+    ASSERT_EQ(8, row_count);
+  }
+
+  // Restart one tablet server in a round-robin fashion with every row written,
+  // not waiting for the tablet server to be up and running before trying
+  // to write the next row.
+  {
+    shared_ptr<KuduTransaction> txn;
+    ASSERT_OK(c->NewTransaction(&txn));
+    shared_ptr<KuduSession> session;
+    ASSERT_OK(txn->CreateSession(&session));
+    ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+    shared_ptr<KuduTable> table;
+    ASSERT_OK(c->OpenTable(kTableName, &table));
+
+    const auto num_servers = cluster_->num_tablet_servers();
+    for (auto row_idx = 8; row_idx < 16; ++row_idx) {
+      unique_ptr<KuduInsert> ins = BuildInsert(table.get(), key++);
+      ASSERT_OK(session->Apply(ins.release()));
+      auto* ts = cluster_->mini_tablet_server(row_idx % num_servers);
+      ts->Shutdown();
+      ASSERT_OK(ts->Restart());
+    }
+
+    ASSERT_OK(txn->Commit());
+    size_t row_count = 0;
+    ASSERT_OK(CountRows(table_.get(), &row_count));
+    ASSERT_EQ(16, row_count);
+  }
+}
+
+} // namespace kudu
diff --git a/src/kudu/tablet/ops/participant_op.cc b/src/kudu/tablet/ops/participant_op.cc
index 4764b41..9d79a8f 100644
--- a/src/kudu/tablet/ops/participant_op.cc
+++ b/src/kudu/tablet/ops/participant_op.cc
@@ -156,13 +156,37 @@ Status ParticipantOp::Prepare() {
   state_->AcquireTxnAndLock();
   RETURN_NOT_OK(state_->ValidateOp());
 
+  const auto& op = state_->request()->op();
+  auto* replica = state_->tablet_replica();
+
   // Before we assign a timestamp, bump the clock so further ops get assigned
   // higher timestamps (including this one).
-  if (state_->request()->op().type() == ParticipantOpPB::FINALIZE_COMMIT &&
-      type() == consensus::LEADER) {
-    DCHECK(!state_->consensus_round()->replicate_msg()->has_timestamp());
-    RETURN_NOT_OK(state_->tablet_replica()->time_manager()->UpdateClockAndLastAssignedTimestamp(
-        state_->commit_timestamp()));
+  switch (op.type()) {
+    case ParticipantOpPB::BEGIN_COMMIT:
+      // To avoid inconsistencies, TxnOpDispatcher should not contain any
+      // pending write requests at this point. Those pending requests must be
+      // submitted and replied accordingly before BEGIN_COMMIT can be processed.
+      // Even if UnregisterTxnOpDispatcher() returns non-OK, the TxnOpDispatcher
+      // is marked for removal, so no write requests are accepted by the replica
+      // in the context of the specified transaction after a call to
+      // TabletReplica::UnregisterTxnOpDispatcher().
+      RETURN_NOT_OK(replica->UnregisterTxnOpDispatcher(
+          op.txn_id(), false/*abort_pending_ops*/));
+      break;
+    case ParticipantOpPB::FINALIZE_COMMIT:
+      if (type() == consensus::LEADER) {
+        DCHECK(!state_->consensus_round()->replicate_msg()->has_timestamp());
+        RETURN_NOT_OK(state_->tablet_replica()->time_manager()->
+            UpdateClockAndLastAssignedTimestamp(state_->commit_timestamp()));
+      }
+      break;
+    case ParticipantOpPB::ABORT_TXN:
+      RETURN_NOT_OK(replica->UnregisterTxnOpDispatcher(
+          op.txn_id(), true/*abort_pending_ops*/));
+      break;
+    default:
+      // Nothing to do in all other cases.
+      break;
   }
   TRACE("PREPARE: Finished.");
   return Status::OK();
@@ -183,7 +207,7 @@ Status ParticipantOp::Start() {
 
 Status ParticipantOpState::PerformOp(const consensus::OpId& op_id, Tablet* tablet) {
   const auto& op = request()->op();
-  const auto& op_type = request()->op().type();
+  const auto& op_type = op.type();
   Status s;
   switch (op_type) {
     // NOTE: these can currently never fail because we are only updating
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
index d1086a5..cd61d16 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -18,16 +18,18 @@
 #include "kudu/tablet/tablet_replica.h"
 
 #include <algorithm>
+#include <deque>
 #include <functional>
 #include <memory>
 #include <mutex>
 #include <ostream>
 #include <string>
 #include <type_traits>
+#include <utility>
 #include <vector>
 
 #include <boost/optional/optional.hpp>
-#include <gflags/gflags_declare.h>
+#include <gflags/gflags.h>
 #include <glog/logging.h>
 
 #include "kudu/common/common.pb.h"
@@ -45,10 +47,12 @@
 #include "kudu/consensus/time_manager.h"
 #include "kudu/fs/data_dirs.h"
 #include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/result_tracker.h"
+#include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/tablet/mvcc.h"
 #include "kudu/tablet/ops/alter_schema_op.h"
 #include "kudu/tablet/ops/op_driver.h"
@@ -57,16 +61,28 @@
 #include "kudu/tablet/tablet.pb.h"
 #include "kudu/tablet/tablet_replica_mm_ops.h"
 #include "kudu/tablet/txn_coordinator.h"
+#include "kudu/tserver/tserver.pb.h"
+#include "kudu/tserver/tserver_admin.pb.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/locks.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/maintenance_manager.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status_callback.h"
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/threadpool.h"
 #include "kudu/util/trace.h"
 
+DEFINE_uint32(tablet_max_pending_txn_write_ops, 2,
+              "Maximum number of write operations to be pending at leader "
+              "tablet replica per transaction prior to registering the tablet "
+              "as a participant in the corresponding transaction");
+TAG_FLAG(tablet_max_pending_txn_write_ops, experimental);
+TAG_FLAG(tablet_max_pending_txn_write_ops, runtime);
+
 METRIC_DEFINE_histogram(tablet, op_prepare_queue_length, "Operation Prepare Queue Length",
                         kudu::MetricUnit::kTasks,
                         "Number of operations waiting to be prepared within this tablet. "
@@ -129,6 +145,10 @@ using kudu::pb_util::SecureDebugString;
 using kudu::pb_util::SecureShortDebugString;
 using kudu::rpc::Messenger;
 using kudu::rpc::ResultTracker;
+using kudu::tserver::ParticipantOpPB;
+using kudu::tserver::ParticipantRequestPB;
+using kudu::tserver::TabletServerErrorPB;
+using std::deque;
 using std::map;
 using std::shared_ptr;
 using std::string;
@@ -150,7 +170,7 @@ TabletReplica::TabletReplica(
     : meta_(DCHECK_NOTNULL(std::move(meta))),
       cmeta_manager_(DCHECK_NOTNULL(std::move(cmeta_manager))),
       local_peer_pb_(std::move(local_peer_pb)),
-      log_anchor_registry_(new LogAnchorRegistry()),
+      log_anchor_registry_(new LogAnchorRegistry),
       apply_pool_(apply_pool),
       reload_txn_status_tablet_pool_(reload_txn_status_tablet_pool),
       txn_coordinator_(meta_->table_type() &&
@@ -192,14 +212,15 @@ Status TabletReplica::Init(ServerContext server_ctx) {
   return Status::OK();
 }
 
-Status TabletReplica::Start(const ConsensusBootstrapInfo& bootstrap_info,
-                            shared_ptr<Tablet> tablet,
-                            clock::Clock* clock,
-                            shared_ptr<Messenger> messenger,
-                            scoped_refptr<ResultTracker> result_tracker,
-                            scoped_refptr<Log> log,
-                            ThreadPool* prepare_pool,
-                            DnsResolver* resolver) {
+Status TabletReplica::Start(
+    const ConsensusBootstrapInfo& bootstrap_info,
+    shared_ptr<Tablet> tablet,
+    clock::Clock* clock,
+    shared_ptr<Messenger> messenger,
+    scoped_refptr<ResultTracker> result_tracker,
+    scoped_refptr<Log> log,
+    ThreadPool* prepare_pool,
+    DnsResolver* resolver) {
   DCHECK(tablet) << "A TabletReplica must be provided with a Tablet";
   DCHECK(log) << "A TabletReplica must be provided with a Log";
 
@@ -515,6 +536,58 @@ Status TabletReplica::WaitUntilConsensusRunning(const MonoDelta& timeout) {
   return Status::OK();
 }
 
+Status TabletReplica::SubmitTxnWrite(
+    std::unique_ptr<WriteOpState> op_state,
+    const std::function<Status(int64_t txn_id, StatusCallback cb)>& scheduler) {
+  DCHECK(op_state);
+  DCHECK(op_state->request()->has_txn_id());
+
+  RETURN_NOT_OK(CheckRunning());
+  op_state->SetResultTracker(result_tracker_);
+
+  const int64_t txn_id = op_state->request()->txn_id();
+  shared_ptr<TxnOpDispatcher> dispatcher;
+  {
+    std::lock_guard<simple_spinlock> guard(txn_op_dispatchers_lock_);
+    dispatcher = LookupOrEmplace(
+        &txn_op_dispatchers_,
+        txn_id,
+        std::make_shared<TxnOpDispatcher>(
+            this, FLAGS_tablet_max_pending_txn_write_ops));
+  }
+  return dispatcher->Dispatch(std::move(op_state), scheduler);
+}
+
+Status TabletReplica::UnregisterTxnOpDispatcher(int64_t txn_id,
+                                                bool abort_pending_ops) {
+  Status unregister_status;
+  VLOG(3) << Substitute(
+      "received request to unregister TxnOpDispatcher (txn ID $0)", txn_id);
+  std::lock_guard<simple_spinlock> guard(txn_op_dispatchers_lock_);
+  auto it = txn_op_dispatchers_.find(txn_id);
+  // It might happen that TxnOpDispatcher isn't there, and that's completely
+  // fine. One possible scenario that might lead to such a condition is:
+  //   * There was a change in replica leadership, and the former leader replica
+  //     received all the write requests in the scope of the transaction, while
+  //     this new leader replica hasn't received any since its start. Now this
+  //     replica is receiving BEGIN_COMMIT transaction coordination request
+  //     for the participant (i.e. for the tablet) from TxnStatusManager.
+  if (it != txn_op_dispatchers_.end()) {
+    auto& dispatcher = it->second;
+    unregister_status = dispatcher->MarkUnregistered();
+    if (abort_pending_ops) {
+      dispatcher->Cancel(Status::Aborted("operation has been aborted"));
+      unregister_status = Status::OK();
+    }
+    if (unregister_status.ok()) {
+      txn_op_dispatchers_.erase(it);
+    }
+    VLOG(1) << Substitute("TxnOpDispatcher (txn ID $0) unregistration: $1",
+                          txn_id, unregister_status.ToString());
+  }
+  return unregister_status;
+}
+
 Status TabletReplica::SubmitWrite(unique_ptr<WriteOpState> op_state) {
   RETURN_NOT_OK(CheckRunning());
 
@@ -1020,6 +1093,69 @@ void TabletReplica::DecreaseTxnCoordinatorTaskCounter() {
   DCHECK_GE(txn_coordinator_task_counter_, 0);
 }
 
+class ParticipantBeginTxnCallback : public OpCompletionCallback {
+ public:
+  ParticipantBeginTxnCallback(StatusCallback cb,
+                              unique_ptr<ParticipantRequestPB> req)
+      : cb_(std::move(cb)),
+        req_(std::move(req)),
+        txn_id_(req_->op().txn_id()) {
+    DCHECK(req_->has_tablet_id());
+    DCHECK(req_->has_op());
+    DCHECK(req_->op().has_txn_id());
+    DCHECK(req_->op().has_type());
+    DCHECK_EQ(ParticipantOpPB::BEGIN_TXN, req_->op().type());
+    DCHECK_LE(0, txn_id_);
+  }
+
+  void OpCompleted() override {
+    if (status_.IsIllegalState() &&
+        code_ == TabletServerErrorPB::TXN_OP_ALREADY_APPLIED) {
+      // This is the case of duplicate calls to TxnStatusManager to begin
+      // transaction for a participant tablet. Those calls may happen if
+      // a transactional write request arrives at a tablet server which
+      // hasn't yet served a write request in the context of the specified
+      // transaction.
+      return cb_(Status::OK());
+    }
+    return cb_(status_);
+  }
+
+ private:
+  StatusCallback cb_;
+  unique_ptr<ParticipantRequestPB> req_;
+  const int64_t txn_id_;
+};
+
+void TabletReplica::BeginTxnParticipantOp(int64_t txn_id, StatusCallback cb) {
+  auto s = CheckRunning();
+  if (PREDICT_FALSE(!s.ok())) {
+    return cb(s);
+  }
+
+  unique_ptr<ParticipantRequestPB> req(new ParticipantRequestPB);
+  req->set_tablet_id(tablet_id());
+  {
+    ParticipantOpPB op_pb;
+    op_pb.set_txn_id(txn_id);
+    op_pb.set_type(ParticipantOpPB::BEGIN_TXN);
+    *req->mutable_op() = std::move(op_pb);
+  }
+  unique_ptr<ParticipantOpState> op_state(
+      new ParticipantOpState(this, tablet()->txn_participant(), req.get()));
+  op_state->set_completion_callback(unique_ptr<OpCompletionCallback>(
+      new ParticipantBeginTxnCallback(cb, std::move(req))));
+  s = SubmitTxnParticipantOp(std::move(op_state));
+  VLOG(3) << Substitute(
+      "submitting BEGIN_TXN for participant $0 (txn ID $1): $2",
+      tablet_id(), txn_id, s.ToString());
+  if (PREDICT_FALSE(!s.ok())) {
+    // Now it's time to respond with appropriate error status to the RPCs
+    // corresponding to the pending write operations.
+    return cb(s);
+  }
+}
+
 void TabletReplica::MakeUnavailable(const Status& error) {
   std::shared_ptr<Tablet> tablet;
   {
@@ -1036,6 +1172,174 @@ void TabletReplica::MakeUnavailable(const Status& error) {
   SetError(error);
 }
 
+Status TabletReplica::TxnOpDispatcher::Dispatch(
+    std::unique_ptr<WriteOpState> op,
+    const std::function<Status(int64_t txn_id, StatusCallback cb)>& scheduler) {
+  const auto txn_id = op->request()->txn_id();
+  std::lock_guard<simple_spinlock> guard(lock_);
+  if (PREDICT_FALSE(unregistered_)) {
+    KLOG_EVERY_N_SECS(WARNING, 10) << Substitute(
+        "received request for unregistered TxnOpDispatcher (txn ID $0)", txn_id);
+    // TODO(aserbin): Status::ServiceUnavailable() is more appropriate here?
+    return Status::IllegalState(
+        "tablet replica could not accept txn write operation");
+  }
+
+  if (preliminary_tasks_completed_) {
+    // All preparatory work is done: submit the write operation directly.
+    DCHECK(ops_queue_.empty());
+    return replica_->SubmitWrite(std::move(op));
+  }
+
+  DCHECK(!preliminary_tasks_completed_);
+  if (!inflight_status_.ok()) {
+    // Still in the process of cleaning up from a prior error condition: the
+    // request can be retried later.
+    return Status::ServiceUnavailable(Substitute(
+        "cleaning up from failure of prior write operation: $0",
+        inflight_status_.ToString()));
+  }
+
+  // This lambda is used as a status callback by the scheduler of "preliminary"
+  // tasks. In case of success, the callback is invoked after completion
+  // of the preliminary tasks with Status::OK(). In case of any failure down
+  // the road, the callback is called with corresponding non-OK status.
+  auto cb = [self = shared_from_this(), txn_id](const Status& s) {
+    if (PREDICT_TRUE(s.ok())) {
+      // The all-is-well case: it's time to submit all the write operations
+      // accumulated in the queue.
+      auto submit_status = self->Submit();
+      if (PREDICT_FALSE(!submit_status.ok())) {
+        // If submitting the accumulated write operations fails, it's necessary
+        // to remove this TxnOpDispatcher entry from the registry.
+        WARN_NOT_OK(submit_status, "failed to submit pending txn write requests");
+        CHECK_OK(self->replica_->UnregisterTxnOpDispatcher(
+            txn_id, false/*abort_pending_ops*/));
+      }
+    } else {
+      // Something went wrong: cancel all the pending write operations
+      self->Cancel(s);
+      CHECK_OK(self->replica_->UnregisterTxnOpDispatcher(
+          txn_id, false/*abort_pending_ops*/));
+    }
+  };
+
+  // If nothing is in the queue yet after checking for the state of all other
+  // related fields above, this is the very first request received by this
+  // dispatcher and it's time to schedule the preliminary tasks.
+  if (ops_queue_.empty()) {
+    RETURN_NOT_OK(scheduler(txn_id, std::move(cb)));
+  }
+  return EnqueueUnlocked(std::move(op));
+}
+
+Status TabletReplica::TxnOpDispatcher::Submit() {
+  decltype(ops_queue_) failed_ops;
+  Status failed_status;
+  {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    DCHECK(!preliminary_tasks_completed_);
+    while (!ops_queue_.empty()) {
+      auto op = std::move(ops_queue_.front());
+      ops_queue_.pop_front();
+      // Store the information necessary to recreate WriteOp instance: this is
+      // useful if TabletReplica::SubmitWrite() returns non-OK status.
+      // The pointers to the replica, request, and response are kept valid until
+      // the corresponding RPC is responded to, and the RPC response is sent
+      // upon invoking completion callback for the 'op'. Receiving non-OK status
+      // from TabletReplica::SubmitWrite() is a guarantee that the completion
+      // callback hasn't been called yet, so these pointers stay valid.
+      TabletReplica* replica = op->tablet_replica();
+      const tserver::WriteRequestPB* request = op->request();
+      const bool has_request_id = op->has_request_id();
+      rpc::RequestIdPB request_id;
+      if (has_request_id) {
+        request_id = op->request_id();
+      }
+      tserver::WriteResponsePB* response = op->response();
+      auto s = replica_->SubmitWrite(std::move(op));
+      if (PREDICT_FALSE(!s.ok())) {
+        // Put the operation back into the queue if SubmitWrite() fails: this
+        // is necessary to respond back to the corresponding RPC, as needed.
+        ops_queue_.emplace_front(new WriteOpState(
+            replica, request, has_request_id ? &request_id : nullptr, response));
+        inflight_status_ = s;
+        break;
+      }
+    }
+    if (PREDICT_TRUE(inflight_status_.ok())) {
+      DCHECK(ops_queue_.empty());
+      preliminary_tasks_completed_ = true;
+      return Status::OK();
+    }
+
+    DCHECK(!inflight_status_.ok());
+    DCHECK(!ops_queue_.empty());
+    failed_status = inflight_status_;
+    std::swap(failed_ops, ops_queue_);
+  }
+
+  return RespondWithStatus(failed_status, std::move(failed_ops));
+}
+
+void TabletReplica::TxnOpDispatcher::Cancel(const Status& status) {
+  CHECK(!status.ok());
+  LOG(WARNING) << Substitute("$0: cancelling pending write operations",
+                             status.ToString());
+  decltype(ops_queue_) ops;
+  {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    inflight_status_ = status;
+    std::swap(ops, ops_queue_);
+  }
+
+  RespondWithStatus(status, std::move(ops));
+}
+
+Status TabletReplica::TxnOpDispatcher::MarkUnregistered() {
+  std::lock_guard<simple_spinlock> guard(lock_);
+  unregistered_ = true;
+
+  // If there are still pending write operations, return ServiceUnavailable()
+  // to let the caller retry later, if needed: there is a chance that pending
+  // write operations will complete when the call is retried.
+  if (PREDICT_FALSE(!ops_queue_.empty())) {
+    // It might be Status::IllegalState() instead, but ServiceUnavailable()
+    // refers to the transient nature of this state.
+    return Status::ServiceUnavailable("there are pending txn write operations");
+  }
+
+  return Status::OK();
+}
+
+Status TabletReplica::TxnOpDispatcher::EnqueueUnlocked(unique_ptr<WriteOpState> op) {
+  // TODO(aserbin): do we need to track username coming with write operations
+  //                to make sure there is no way to slip in write operations for
+  //                transactions of other users?
+  DCHECK(lock_.is_locked());
+  if (PREDICT_FALSE(ops_queue_.size() >= max_queue_size_)) {
+    return Status::ServiceUnavailable("pending operations queue is at capacity");
+  }
+  if (PREDICT_FALSE(!inflight_status_.ok())) {
+    return inflight_status_;
+  }
+  ops_queue_.emplace_back(std::move(op));
+  return Status::OK();
+}
+
+Status TabletReplica::TxnOpDispatcher::RespondWithStatus(
+    const Status& status,
+    deque<unique_ptr<WriteOpState>> ops) {
+  // Invoke the callback for every operation in the queue.
+  for (auto& op : ops) {
+    auto* cb = op->completion_callback();
+    DCHECK(cb);
+    cb->set_error(status);
+    cb->OpCompleted();
+  }
+  return status;
+}
+
 Status FlushInflightsToLogCallback::WaitForInflightsAndFlushLog() {
   // This callback is triggered prior to any TabletMetadata flush.
   // The guarantee that we are trying to enforce is this:
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index 7121c48..711360d 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -18,10 +18,13 @@
 
 #include <cstddef>
 #include <cstdint>
+#include <deque>
+#include <functional>
 #include <map>
 #include <memory>
 #include <mutex>
 #include <string>
+#include <unordered_map>
 #include <vector>
 
 #include <gtest/gtest_prod.h>
@@ -36,11 +39,13 @@
 #include "kudu/tablet/op_order_verifier.h"
 #include "kudu/tablet/ops/op.h"
 #include "kudu/tablet/ops/op_tracker.h"
+#include "kudu/tablet/ops/write_op.h"
 #include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet_metadata.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/status.h"
+#include "kudu/util/status_callback.h"
 
 namespace kudu {
 class AlterTableTest;
@@ -50,6 +55,13 @@ class MaintenanceOp;
 class MonoDelta;
 class ThreadPool;
 class ThreadPoolToken;
+class TxnOpDispatcherITest;
+class TxnOpDispatcherITest_DispatcherLifecycleMultipleReplicas_Test;
+class TxnOpDispatcherITest_DuplicateTxnParticipantRegistration_Test;
+class TxnOpDispatcherITest_ErrorInProcessingWriteOp_Test;
+class TxnOpDispatcherITest_LifecycleBasic_Test;
+class TxnOpDispatcherITest_NoPendingWriteOps_Test;
+class TxnOpDispatcherITest_PreliminaryTasksTimeout_Test;
 
 namespace consensus {
 class ConsensusMetadataManager;
@@ -77,7 +89,6 @@ class ParticipantOpState;
 class TabletStatusPB;
 class TxnCoordinator;
 class TxnCoordinatorFactory;
-class WriteOpState;
 
 // A replica in a tablet consensus configuration, which coordinates writes to tablets.
 // Each time Write() is called this class appends a new entry to a replicated
@@ -133,13 +144,33 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
   bool IsShuttingDown() const;
 
   // Wait until the tablet is in a RUNNING state or if there's a timeout.
-  // TODO have a way to wait for any state?
+  // TODO(jdcryans): have a way to wait for any state?
   Status WaitUntilConsensusRunning(const MonoDelta& timeout);
 
+  // Submit a write operation 'op_state' attributed to a multi-row transaction.
+  // This is similar to SubmitWrite(), but it's more complicated because it is
+  // stateful. The necessary preliminary steps to complete before submitting the
+  // operation via SubmitWrite() are: (1) register the tablet as a participant
+  // in the corresponding transaction (2) issue BEGIN_TXN operation to the
+  // replica. The 'scheduler' functor is used to schedule the activities
+  // mentioned above.
+  Status SubmitTxnWrite(
+      std::unique_ptr<WriteOpState> op_state,
+      const std::function<Status(int64_t txn_id, StatusCallback cb)>& scheduler);
+
+  // Unregister TxnOpDispatcher for the specified transaction identifier
+  // 'txn_id'. If no pending write requests are accumulated by the dispatcher,
+  // the dispatcher is unregistered immediately and this method returns
+  // Status::OK. If any write request is pending, the dispatcher is marked to be
+  // unregistered and this method returns Status::ServiceUnavailable(),
+  // prompting the caller to try again later, unless 'abort_pending_ops' is set
+  // to 'true'. If 'abort_pending_ops' is set to true, all pending requests are
+  // responded to with Status::Aborted() and the entry is removed.
+  Status UnregisterTxnOpDispatcher(int64_t txn_id, bool abort_pending_ops);
+
   // Submits a write to a tablet and executes it asynchronously.
   // The caller is expected to build and pass a WriteOpState that points to the
-  // RPC WriteRequest, WriteResponse, RpcContext and to the tablet's
-  // MvccManager.
+  // RPC's WriteRequest and WriteResponse.
   Status SubmitWrite(std::unique_ptr<WriteOpState> op_state);
 
   // Submits an op to update transaction participant state, executing it
@@ -350,14 +381,116 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
   // Decrease the task counter of the transaction status manager.
   void DecreaseTxnCoordinatorTaskCounter();
 
+  // Submit ParticipantOpPB::BEGIN_TXN operation for the specified transaction.
+  void BeginTxnParticipantOp(int64_t txn_id, StatusCallback cb);
+
  private:
   friend class kudu::AlterTableTest;
   friend class RefCountedThreadSafe<TabletReplica>;
   friend class TabletReplicaTest;
   friend class TabletReplicaTestBase;
-  FRIEND_TEST(TabletReplicaTest, TestMRSAnchorPreventsLogGC);
-  FRIEND_TEST(TabletReplicaTest, TestDMSAnchorPreventsLogGC);
+  friend class kudu::TxnOpDispatcherITest;
   FRIEND_TEST(TabletReplicaTest, TestActiveOpPreventsLogGC);
+  FRIEND_TEST(TabletReplicaTest, TestDMSAnchorPreventsLogGC);
+  FRIEND_TEST(TabletReplicaTest, TestMRSAnchorPreventsLogGC);
+  FRIEND_TEST(kudu::TxnOpDispatcherITest, LifecycleBasic);
+
+  // A class to properly dispatch transactional write operations arriving
+  // with TabletServerService::Write() RPC for the specified tablet replica.
+  // Before submitting the operations via TabletReplica::SubmitWrite(), it's
+  // necessary to register the tablet as a participant in the transaction and
+  // issue BEGIN_TXN operation for the target tablet. This class implements
+  // the logic to schedule those preliminary tasks while accumulating incoming
+  // write operations for the interval between the time when the very first
+  // write operation for a transaction arrives and the time when the preliminary
+  // tasks finish.
+  class TxnOpDispatcher: public std::enable_shared_from_this<TxnOpDispatcher> {
+   public:
+    // The 'max_queue_size' parameter sets the limit of how many operations
+    // this TxnOpDispatcher instance is allowed to buffer before starting to
+    // reject new ones with Status::ServiceUnavailable error.
+    TxnOpDispatcher(TabletReplica* replica,
+                    size_t max_queue_size)
+        : replica_(replica),
+          max_queue_size_(max_queue_size),
+          preliminary_tasks_completed_(false),
+          unregistered_(false),
+          inflight_status_(Status::OK()) {
+    }
+
+    // Dispatch specified write operation: either put it into the queue,
+    // or submit it immediately via TabletReplica::SubmitWrite(), or reject the
+    // operation. In the two former cases, returns Status::OK(); in the latter
+    // case returns non-OK status correspondingly. The 'scheduler' function is
+    // invoked to schedule preliminary tasks, if necessary.
+    Status Dispatch(std::unique_ptr<WriteOpState> op,
+                    const std::function<Status(int64_t txn_id,
+                                               StatusCallback cb)>& scheduler);
+
+    // Submit all pending operations. Returns OK if all operations have been
+    // submitted successfully, or 'inflight_status_' if any of those failed.
+    Status Submit();
+
+    // Invoke callbacks for every buffered operation with the 'status';
+    // the 'status' must be a non-OK one.
+    void Cancel(const Status& status);
+
+    // Mark the dispatcher as not accepting any write operations: this is to
+    // eventually unregister the dispatcher for the corresponding transaction
+    // (i.e. remove the element from the map of available dispatchers). In the
+    // unlikely event of the presence of pending write operations, this method
+    // returns Status::ServiceUnavailable().
+    Status MarkUnregistered();
+
+   private:
+    FRIEND_TEST(kudu::TxnOpDispatcherITest, DispatcherLifecycleMultipleReplicas);
+    FRIEND_TEST(kudu::TxnOpDispatcherITest, DuplicateTxnParticipantRegistration);
+    FRIEND_TEST(kudu::TxnOpDispatcherITest, ErrorInProcessingWriteOp);
+    FRIEND_TEST(kudu::TxnOpDispatcherITest, LifecycleBasic);
+    FRIEND_TEST(kudu::TxnOpDispatcherITest, NoPendingWriteOps);
+    FRIEND_TEST(kudu::TxnOpDispatcherITest, PreliminaryTasksTimeout);
+
+    // Add the specified operation into the queue.
+    Status EnqueueUnlocked(std::unique_ptr<WriteOpState> op);
+
+    // Respond to the given write operations with the specified status.
+    static Status RespondWithStatus(
+        const Status& status,
+        std::deque<std::unique_ptr<WriteOpState>> ops);
+
+    // Pointer to the parent TabletReplica instance which keeps this
+    // TxnOpDispatcher instance in its 'txn_op_dispatchers_' map.
+    TabletReplica* const replica_;
+
+    // Maximum number of transactional write operations to buffer in the
+    // 'ops_queue_' before completing all the preliminary tasks which are
+    // required to start processing transactional write operations for the
+    // tablet.
+    const size_t max_queue_size_;
+
+    // Protects the members below: preliminary_tasks_completed_, unregistered_,
+    // inflight_status_, ops_queue_.
+    mutable simple_spinlock lock_;
+
+    // Whether the preliminary tasks are completed and this instance is
+    // ready to submit incoming txn write operations directly via
+    // TabletReplica::SubmitWrite().
+    bool preliminary_tasks_completed_;
+
+    // Whether this instance has been marked as unregistered and pending
+    // destruction.
+    bool unregistered_;
+
+    // This field stores the first non-OK status (if any) returned by
+    // TabletReplica::SubmitWrite() when submitting pending operations from
+    // 'ops_queue_' upon completion of preliminary tasks.
+    Status inflight_status_;
+
+    // Queue to buffer txn write operations while the preliminary work of
+    // registering the tablet as a participant in the transaction, etc. is
+    // in progress.
+    std::deque<std::unique_ptr<WriteOpState>> ops_queue_;
+  };
 
   ~TabletReplica();
 
@@ -403,6 +536,19 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
   // Track the number of on-going tasks of the transaction status manager.
   int txn_coordinator_task_counter_;
 
+  // Maps txn_id --> txn write op dispatcher. This map stores ref-counted
+  // pointers instead of instances/references because an element can be removed
+  // from the map (unregistered) while concurrently still receiving requests
+  // (the latter will be rejected with Status::IllegalState). An alternative to
+  // this scheme with ref-counted pointers would be
+  //   (a) process _all_ transactional requests under the same giant lock
+  //       (even for different transactions)
+  //   (b) change the lifecycle of an entry in the txn_op_dispatchers_ map,
+  //       introducing an alternative approach to clean-up obsolete elements
+  //       from the map
+  std::unordered_map<int64_t, std::shared_ptr<TxnOpDispatcher>> txn_op_dispatchers_;
+  simple_spinlock txn_op_dispatchers_lock_; // protects 'txn_op_dispatchers_'
+
   // Function to mark this TabletReplica's tablet as dirty in the TSTabletManager.
   //
   // Must be called whenever cluster membership or leadership changes, or when
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 0a1834f..9385608 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -110,6 +110,7 @@
 #include "kudu/util/random_util.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
+#include "kudu/util/status_callback.h"
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/threadpool.h"
 #include "kudu/util/trace.h"
@@ -172,6 +173,11 @@ DEFINE_double(tserver_inject_invalid_authz_token_ratio, 0.0,
               "Fraction of the time that authz token validation will fail. Used for tests.");
 TAG_FLAG(tserver_inject_invalid_authz_token_ratio, hidden);
 
+DEFINE_bool(tserver_txn_write_op_handling_enabled, true,
+            "Whether to enable appropriate handling of write operations "
+            "in the context of multi-row transactions");
+TAG_FLAG(tserver_txn_write_op_handling_enabled, hidden);
+
 DECLARE_bool(raft_prepare_replacement_before_eviction);
 DECLARE_int32(memory_limit_warn_threshold_percentage);
 DECLARE_int32(tablet_history_max_age_sec);
@@ -1581,11 +1587,10 @@ void TabletServiceImpl::Write(const WriteRequestPB* req,
       static const Status kStatus = Status::ServiceUnavailable(
           "op apply queue is overloaded");
       num_op_apply_queue_rejections_->Increment();
-      SetupErrorAndRespond(resp->mutable_error(),
-                           kStatus,
-                           TabletServerErrorPB::THROTTLED,
-                           context);
-      return;
+      return SetupErrorAndRespond(resp->mutable_error(),
+                                  kStatus,
+                                  TabletServerErrorPB::THROTTLED,
+                                  context);
     }
   }
 
@@ -1603,23 +1608,41 @@ void TabletServiceImpl::Write(const WriteRequestPB* req,
     s = server_->clock()->Update(ts);
   }
   if (PREDICT_FALSE(!s.ok())) {
-    SetupErrorAndRespond(resp->mutable_error(), s,
-                         TabletServerErrorPB::UNKNOWN_ERROR,
-                         context);
-    return;
+    return SetupErrorAndRespond(
+        resp->mutable_error(), s, TabletServerErrorPB::UNKNOWN_ERROR, context);
   }
 
   op_state->set_completion_callback(unique_ptr<OpCompletionCallback>(
       new RpcOpCompletionCallback<WriteResponsePB>(context, resp)));
 
-  // Submit the write. The RPC will be responded to asynchronously.
-  s = replica->SubmitWrite(std::move(op_state));
+  const auto deadline = context->GetClientDeadline();
+  const auto& username = context->remote_user().username();
+
+  if (!req->has_txn_id() ||
+      PREDICT_FALSE(!FLAGS_tserver_txn_write_op_handling_enabled)) {
+    // Submit the write operation. The RPC will be responded to asynchronously.
+    s = replica->SubmitWrite(std::move(op_state));
+  } else {
+    // If it's a write operation in the context of a multi-row transaction,
+    // schedule running preliminary tasks if necessary: register the tablet as
+    // a participant in the transaction and begin transaction for the
+    // participating tablet.
+    //
+    // This functor is to schedule preliminary tasks prior to submitting
+    // the write operation via TabletReplica::SubmitWrite().
+    const auto scheduler = [this, &username, replica, deadline](
+        int64_t txn_id, StatusCallback cb) {
+      return server_->tablet_manager()->SchedulePreliminaryTasksForTxnWrite(
+          std::move(replica), txn_id, username, deadline, std::move(cb));
+    };
+    s = replica->SubmitTxnWrite(std::move(op_state), scheduler);
+    VLOG(2) << Substitute("submitting txn write op: $0", s.ToString());
+  }
 
   // Check that we could submit the write
   if (PREDICT_FALSE(!s.ok())) {
-    SetupErrorAndRespond(resp->mutable_error(), s,
-                         TabletServerErrorPB::UNKNOWN_ERROR,
-                         context);
+    return SetupErrorAndRespond(
+        resp->mutable_error(), s, TabletServerErrorPB::UNKNOWN_ERROR, context);
   }
 }
 
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index 9e25e7c..989831e 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -18,6 +18,7 @@
 #include "kudu/tserver/ts_tablet_manager.h"
 
 #include <cstdint>
+#include <functional>
 #include <memory>
 #include <mutex>
 #include <ostream>
@@ -57,6 +58,7 @@
 #include "kudu/tablet/tablet_replica.h"
 #include "kudu/tablet/txn_coordinator.h"
 #include "kudu/transactions/txn_status_manager.h"
+#include "kudu/transactions/txn_system_client.h"
 #include "kudu/tserver/heartbeater.h"
 #include "kudu/tserver/tablet_server.h"
 #include "kudu/util/debug/trace_event.h"
@@ -103,6 +105,13 @@ DEFINE_int32(txn_commit_pool_num_threads, 10,
              "Number of threads available for transaction commit tasks.");
 TAG_FLAG(txn_commit_pool_num_threads, advanced);
 
+DEFINE_int32(txn_participant_registration_pool_num_threads, 10,
+             "Number of threads available for tasks to register tablets as "
+             "transaction participants upon receiving write operations "
+             "in the context of multi-row transactions");
+TAG_FLAG(txn_participant_registration_pool_num_threads, advanced);
+TAG_FLAG(txn_participant_registration_pool_num_threads, experimental);
+
 DEFINE_int32(tablet_start_warn_threshold_ms, 500,
              "If a tablet takes more than this number of millis to start, issue "
              "a warning with a trace.");
@@ -162,6 +171,18 @@ DEFINE_uint32(txn_staleness_tracker_disabled_interval_ms, 60000,
               "This is made configurable only for testing purposes.");
 TAG_FLAG(txn_staleness_tracker_disabled_interval_ms, hidden);
 
+DEFINE_int32(txn_participant_begin_op_inject_latency_ms, 0,
+             "Amount of delay in milliseconds to inject before issuing "
+             "BEGIN_TXN operation for a participating tablet");
+TAG_FLAG(txn_participant_begin_op_inject_latency_ms, runtime);
+TAG_FLAG(txn_participant_begin_op_inject_latency_ms, unsafe);
+
+DEFINE_int32(txn_participant_registration_inject_latency_ms, 0,
+             "Amount of delay in milliseconds to inject before registering "
+             "tablet as a participant in a multi-row transaction");
+TAG_FLAG(txn_participant_registration_inject_latency_ms, runtime);
+TAG_FLAG(txn_participant_registration_inject_latency_ms, unsafe);
+
 DECLARE_bool(raft_prepare_replacement_before_eviction);
 DECLARE_uint32(txn_staleness_tracker_interval_ms);
 
@@ -408,6 +429,11 @@ Status TSTabletManager::Init() {
                 .set_max_threads(max_reload_threads)
                 .Build(&reload_txn_status_tablet_pool_));
 
+  RETURN_NOT_OK(
+      ThreadPoolBuilder("txn-participant-registration")
+      .set_max_threads(FLAGS_txn_participant_registration_pool_num_threads)
+      .Build(&txn_participant_registration_pool_));
+
   // TODO(aserbin): if better parallelism is needed to serve higher txn volume,
   //                consider using multiple threads in this pool and schedule
   //                per-tablet-replica clean-up tasks via threadpool serial
@@ -1093,6 +1119,61 @@ Status TSTabletManager::CheckRunningUnlocked(
                                                TSTabletManagerStatePB_Name(state_)));
 }
 
+void TSTabletManager::RegisterAndBeginParticipantTxnTask(
+    transactions::TxnSystemClient* txn_system_client,
+    scoped_refptr<TabletReplica> replica,
+    int64_t txn_id,
+    const string& user,
+    MonoTime deadline,
+    StatusCallback cb) {
+  DCHECK(txn_system_client);
+  // TODO(aserbin): add and update metrics to track how long these calls take
+  // TODO(aserbin): a future improvement to reduce overall latency is to use
+  //                the async variant of RegisterParticipant RPC that shares
+  //                a callback with the BEGIN_TXN op scheduled below. The
+  //                callback's code could check whether both the registration
+  //                and the BEGIN_TXN op are complete, and if so, submit the
+  //                pending write operations received so far in the context of
+  //                this transaction. That said, it seems like it would
+  //                complicate the error handling and would require a special
+  //                cleanup procedure for the orphaned BEGIN_TXN operations
+  //                when RegisterParticipant could not be completed even after
+  //                multiple retries.
+  VLOG(3) << Substitute("registering participant $0 for txn ID $1",
+                        replica->tablet_id(), txn_id);
+
+  // This is to simulate scheduling anomalies when the thread that runs this
+  // task has been sitting aside for too long.
+  MAYBE_INJECT_FIXED_LATENCY(FLAGS_txn_participant_registration_inject_latency_ms);
+
+  {
+    const auto now = MonoTime::Now();
+    if (deadline <= now) {
+      return cb(Status::TimedOut(
+          Substitute("timed out prior to registering tablet $0 as participant (txn ID $1)",
+          replica->tablet_id(), txn_id)));
+    }
+    auto s = txn_system_client->RegisterParticipant(
+        txn_id, replica->tablet_id(), user, deadline - now);
+    VLOG(2) << Substitute("RegisterParticipant() $0 for txn ID $1 returned $2",
+                          replica->tablet_id(), txn_id, s.ToString());
+    if (PREDICT_FALSE(!s.ok())) {
+      return cb(s);
+    }
+  }
+
+  // This is to simulate a situation when txn participant registration above
+  // takes too long.
+  MAYBE_INJECT_FIXED_LATENCY(FLAGS_txn_participant_begin_op_inject_latency_ms);
+
+  if (deadline <= MonoTime::Now()) {
+    return cb(Status::TimedOut(Substitute(
+        "timed out prior to submitting BEGIN_TXN for participant $0 (txn ID $1)",
+        replica->tablet_id(), txn_id)));
+  }
+  return replica->BeginTxnParticipantOp(txn_id, std::move(cb));
+}
+
 Status TSTabletManager::StartTabletStateTransitionUnlocked(
     const string& tablet_id,
     const string& reason,
@@ -1272,6 +1353,9 @@ void TSTabletManager::Shutdown() {
   // Shut down the delete pool, so no new tablets are deleted after this point.
   delete_tablet_pool_->Shutdown();
 
+  // Shut down the transaction participant registration pool.
+  txn_participant_registration_pool_->Shutdown();
+
   // Signal the only task running on the txn_status_manager_pool_ to wrap up.
   shutdown_latch_.CountDown();
   // Shut down the pool running the dedicated TxnStatusManager-related task.
@@ -1782,6 +1866,26 @@ void TSTabletManager::UpdateTabletStatsIfNecessary() {
   }
 }
 
+Status TSTabletManager::SchedulePreliminaryTasksForTxnWrite(
+    scoped_refptr<TabletReplica> replica,
+    int64_t txn_id,
+    const string& user,
+    MonoTime deadline,
+    StatusCallback cb) {
+  // An important pre-condition to running operations below: the availability
+  // of the transaction system client.
+  transactions::TxnSystemClient* tsc = nullptr;
+  RETURN_NOT_OK(server_->txn_client_initializer()->GetClient(&tsc));
+  DCHECK(tsc);
+  RETURN_NOT_OK(tsc->CheckOpenTxnStatusTable());
+  return txn_participant_registration_pool_->Submit(
+      [this, replica = std::move(replica), txn_id, tsc, user,
+          deadline, cb = std::move(cb)]() {
+    this->RegisterAndBeginParticipantTxnTask(
+        tsc, std::move(replica), txn_id, user, deadline, std::move(cb));
+  });
+}
+
 void TSTabletManager::SetNextUpdateTimeForTests() {
   std::lock_guard<rw_spinlock> l(lock_update_);
   next_update_time_ = MonoTime::Now();
diff --git a/src/kudu/tserver/ts_tablet_manager.h b/src/kudu/tserver/ts_tablet_manager.h
index fbbbf14..f5e1f92 100644
--- a/src/kudu/tserver/ts_tablet_manager.h
+++ b/src/kudu/tserver/ts_tablet_manager.h
@@ -44,6 +44,7 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/rw_mutex.h"
 #include "kudu/util/status.h"
+#include "kudu/util/status_callback.h"
 
 namespace boost {
 template <class T>
@@ -58,6 +59,9 @@ class Partition;
 class PartitionSchema;
 class Schema;
 class ThreadPool;
+namespace transactions {
+class TxnSystemClient;
+}  // namespace transactions
 
 namespace consensus {
 class ConsensusMetadataManager;
@@ -240,6 +244,14 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
   // Update the tablet statistics if necessary.
   void UpdateTabletStatsIfNecessary();
 
+  // Schedule preliminary tasks to process a write op in the context of a txn.
+  Status SchedulePreliminaryTasksForTxnWrite(
+      scoped_refptr<tablet::TabletReplica> replica,
+      int64_t txn_id,
+      const std::string& user,
+      MonoTime deadline,
+      StatusCallback cb);
+
  private:
   FRIEND_TEST(LeadershipChangeReportingTest, TestReportStatsDuringLeadershipChange);
   FRIEND_TEST(TsTabletManagerTest, TestPersistBlocks);
@@ -258,6 +270,14 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
     return LogPrefix(tablet_id, fs_manager_);
   }
 
+  static void RegisterAndBeginParticipantTxnTask(
+      transactions::TxnSystemClient* txn_system_client,
+      scoped_refptr<tablet::TabletReplica> replica,
+      int64_t txn_id,
+      const std::string& user,
+      MonoTime deadline,
+      StatusCallback cb);
+
   // Returns Status::OK() iff state_ == MANAGER_RUNNING.
   Status CheckRunningUnlocked(TabletServerErrorPB::Code* error_code) const;
 
@@ -359,7 +379,7 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
 
   const scoped_refptr<consensus::ConsensusMetadataManager> cmeta_manager_;
 
-  TabletServer* server_;
+  TabletServer* const server_;
 
   consensus::RaftPeerPB local_peer_pb_;
 
@@ -418,6 +438,11 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
   // Thread pool used to perform background tasks on transactions, e.g. to commit.
   std::unique_ptr<ThreadPool> txn_commit_pool_;
 
+  // Thread pool to perform preliminary tasks when processing write operations
+  // in the context of a multi-row transaction. Such tasks include registering
+  // a tablet as a participant in the corresponding transaction, etc.
+  std::unique_ptr<ThreadPool> txn_participant_registration_pool_;
+
   // Thread pool to run TxnStatusManager tasks. As of now, this pool is
   // to run a long-running single periodic task to abort stale transactions
   // registered with corresponding transaction status tablets.