You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2020/08/03 17:26:09 UTC

[kudu] branch master updated: KUDU-2612 p6: add coordination calls to system client

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new cb1c2ef  KUDU-2612 p6: add coordination calls to system client
cb1c2ef is described below

commit cb1c2efb59373453e734074a02021f14c403257d
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Thu Jul 9 16:48:30 2020 -0700

    KUDU-2612 p6: add coordination calls to system client
    
    This adds the following calls to the TxnSystemClient:
    - BeginTransaction()
    - RegisterParticipant()
    
    whose APIs roughly match those of the TxnStatusManager. For the sake of
    a more focused patch, I only added these couple to build out reusable
    pieces of the system client. Later patches will extend the client with
    BeginCommitTransaction() and AbortTransaction() calls.
    
    All of these calls share the same RPC-sending code, which draws loose
    inspiration from the client::Batcher. I considered templatizing/reusing
    the Batcher, but eventually opted not to for several reasons:
    - We don't need all the bells and whistles attached to the Batcher, e.g.
      timestamp tracking and consistency modes, session-based API, etc.
    - The complexity of abstracting the Batcher to the point of using it for
      other RPC types would have made the code very unwieldy and difficult
      to maintain and extend.
    - Not reusing the Batcher gives us an opportunity moving forward to
      consider other approaches to batching (e.g. batching by server rather
      than by tablet ID).
    
    Where the implementation draws inspiration from the Batcher is its usage
    of the MetaCache to attach callbacks to tablet lookups, its
    retry-handling, and its memory management -- namely, each request is
    encapsulated in such a way that the memory used to track the call
    automatically frees itself upon completion.
    
    Change-Id: I4126cb3dcf379b397f84578c2265dca3ece3d98c
    Reviewed-on: http://gerrit.cloudera.org:8080/16194
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Tested-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/client/client.h                           |   3 +
 src/kudu/client/meta_cache.cc                      |  10 +
 src/kudu/client/meta_cache.h                       |   3 +
 .../integration-tests/auth_token_expire-itest.cc   |  29 +++
 .../client-negotiation-failover-itest.cc           |  59 ++++-
 .../integration-tests/ts_tablet_manager-itest.cc   |   5 +-
 .../integration-tests/txn_status_table-itest.cc    | 256 +++++++++++++++++++--
 src/kudu/tablet/ops/op.h                           |   1 +
 src/kudu/tablet/txn_coordinator.h                  |  38 ++-
 src/kudu/transactions/CMakeLists.txt               |   1 +
 src/kudu/transactions/coordinator_rpc.cc           | 209 +++++++++++++++++
 src/kudu/transactions/coordinator_rpc.h            |  90 ++++++++
 src/kudu/transactions/txn_status_manager-test.cc   |  91 ++++----
 src/kudu/transactions/txn_status_manager.cc        |  27 ++-
 src/kudu/transactions/txn_status_manager.h         |  16 +-
 src/kudu/transactions/txn_status_tablet-test.cc    |  42 ++--
 src/kudu/transactions/txn_status_tablet.cc         |  27 ++-
 src/kudu/transactions/txn_status_tablet.h          |  26 ++-
 src/kudu/transactions/txn_system_client.cc         | 100 +++++++-
 src/kudu/transactions/txn_system_client.h          |  48 +++-
 src/kudu/tserver/tablet_service.cc                 |  26 ++-
 21 files changed, 972 insertions(+), 135 deletions(-)

diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index e665937..e845c8f 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -67,6 +67,7 @@ class KuduTable;
 } // namespace client
 
 namespace transactions {
+class CoordinatorRpc;
 class TxnSystemClient;
 } // namespace transactions
 
@@ -622,6 +623,8 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> {
   friend class internal::WriteRpc;
   friend class kudu::AuthzTokenTest;
   friend class kudu::SecurityUnknownTskTest;
+  friend class transactions::CoordinatorRpc;
+  friend class transactions::TxnSystemClient;
   friend class tools::LeaderMasterProxy;
   friend class tools::RemoteKsckCluster;
 
diff --git a/src/kudu/client/meta_cache.cc b/src/kudu/client/meta_cache.cc
index 4d24729..5e3d110 100644
--- a/src/kudu/client/meta_cache.cc
+++ b/src/kudu/client/meta_cache.cc
@@ -48,6 +48,7 @@
 #include "kudu/rpc/response_callback.h"
 #include "kudu/rpc/rpc.h"
 #include "kudu/rpc/rpc_controller.h"
+#include "kudu/tserver/tserver_admin.proxy.h"
 #include "kudu/tserver/tserver_service.proxy.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
@@ -66,6 +67,7 @@ using kudu::master::TabletLocationsPB;
 using kudu::master::TSInfoPB;
 using kudu::rpc::BackoffType;
 using kudu::rpc::CredentialsPolicy;
+using kudu::tserver::TabletServerAdminServiceProxy;
 using kudu::tserver::TabletServerServiceProxy;
 using std::set;
 using std::shared_ptr;
@@ -117,6 +119,8 @@ void RemoteTabletServer::DnsResolutionFinished(const HostPort& hp,
   {
     std::lock_guard<simple_spinlock> l(lock_);
     proxy_.reset(new TabletServerServiceProxy(client->data_->messenger_, (*addrs)[0], hp.host()));
+    admin_proxy_.reset(
+        new TabletServerAdminServiceProxy(client->data_->messenger_, (*addrs)[0], hp.host()));
     proxy_->set_user_credentials(client->data_->user_credentials_);
   }
   user_callback(s);
@@ -198,6 +202,12 @@ shared_ptr<TabletServerServiceProxy> RemoteTabletServer::proxy() const {
   return proxy_;
 }
 
+shared_ptr<TabletServerAdminServiceProxy> RemoteTabletServer::admin_proxy() {
+  std::lock_guard<simple_spinlock> l(lock_);
+  DCHECK(admin_proxy_);
+  return admin_proxy_;
+}
+
 string RemoteTabletServer::ToString() const {
   string ret = uuid_;
   std::lock_guard<simple_spinlock> l(lock_);
diff --git a/src/kudu/client/meta_cache.h b/src/kudu/client/meta_cache.h
index 41e608f..9daa8e5 100644
--- a/src/kudu/client/meta_cache.h
+++ b/src/kudu/client/meta_cache.h
@@ -51,6 +51,7 @@ class Sockaddr;
 
 namespace tserver {
 class TabletServerServiceProxy;
+class TabletServerAdminServiceProxy;
 } // namespace tserver
 
 namespace master {
@@ -100,6 +101,7 @@ class RemoteTabletServer {
   // Return the current proxy to this tablet server. Requires that InitProxy()
   // be called prior to this.
   std::shared_ptr<tserver::TabletServerServiceProxy> proxy() const;
+  std::shared_ptr<tserver::TabletServerAdminServiceProxy> admin_proxy();
 
   std::string ToString() const;
 
@@ -133,6 +135,7 @@ class RemoteTabletServer {
   boost::optional<std::string> unix_domain_socket_path_;
 
   std::shared_ptr<tserver::TabletServerServiceProxy> proxy_;
+  std::shared_ptr<tserver::TabletServerAdminServiceProxy> admin_proxy_;
 
   DISALLOW_COPY_AND_ASSIGN(RemoteTabletServer);
 };
diff --git a/src/kudu/integration-tests/auth_token_expire-itest.cc b/src/kudu/integration-tests/auth_token_expire-itest.cc
index a77fcfd..4f07bb9 100644
--- a/src/kudu/integration-tests/auth_token_expire-itest.cc
+++ b/src/kudu/integration-tests/auth_token_expire-itest.cc
@@ -41,6 +41,7 @@
 #include "kudu/mini-cluster/external_mini_cluster.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/tablet/key_value_test_schema.h"
+#include "kudu/transactions/txn_system_client.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
@@ -53,8 +54,10 @@ DECLARE_bool(rpc_reopen_outbound_connections);
 using kudu::client::sp::shared_ptr;
 using kudu::cluster::ExternalMiniCluster;
 using kudu::cluster::ExternalMiniClusterOptions;
+using kudu::transactions::TxnSystemClient;
 using std::string;
 using std::unique_ptr;
+using std::vector;
 using strings::Substitute;
 
 namespace kudu {
@@ -449,6 +452,32 @@ TEST_F(TokenBasedConnectionITest, ReacquireAuthnToken) {
   NO_FATALS(cluster_->AssertNoCrashes());
 }
 
+// Like the above test but testing the transaction system client and its access
+// of the transaction status table.
+TEST_F(TokenBasedConnectionITest, TxnSystemClientReacquireAuthnToken) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+  vector<string> master_addrs;
+  for (const auto& hp : cluster_->master_rpc_addrs()) {
+    master_addrs.emplace_back(hp.ToString());
+  }
+  unique_ptr<TxnSystemClient> txn_client;
+  ASSERT_OK(TxnSystemClient::Create(master_addrs, &txn_client));
+  ASSERT_OK(txn_client->CreateTxnStatusTable(10));
+  ASSERT_OK(txn_client->OpenTxnStatusTable());
+
+  // Reset all connections with the cluster. Since authn token validty is
+  // checked for new connections (but not for existing non-idle connections),
+  // this will ensure our token expiration is checked below.
+  cluster_->Shutdown();
+  ASSERT_OK(cluster_->Restart());
+
+  // Wait for the initial authn token to expire and try to access the cluster.
+  // Try making a connection to the tablet server for the first time. It should
+  // automatically fetch a new token and succeed.
+  SleepFor(MonoDelta::FromSeconds(authn_token_validity_seconds_ + 1));
+  ASSERT_OK(txn_client->BeginTransaction(1, "user"));
+}
+
 // Test for scenarios involving multiple masters where
 // client-to-non-leader-master connections are closed due to inactivity,
 // but the connection to the former leader master is kept open.
diff --git a/src/kudu/integration-tests/client-negotiation-failover-itest.cc b/src/kudu/integration-tests/client-negotiation-failover-itest.cc
index 5b0c893..68ed99d 100644
--- a/src/kudu/integration-tests/client-negotiation-failover-itest.cc
+++ b/src/kudu/integration-tests/client-negotiation-failover-itest.cc
@@ -34,7 +34,9 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/mini-cluster/external_mini_cluster.h"
 #include "kudu/tablet/key_value_test_schema.h"
+#include "kudu/transactions/txn_system_client.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/net/net_util.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
@@ -53,6 +55,7 @@ using kudu::cluster::ExternalMiniCluster;
 using kudu::cluster::ExternalMiniClusterOptions;
 using kudu::cluster::ExternalTabletServer;
 using kudu::cluster::ScopedResumeExternalDaemon;
+using kudu::transactions::TxnSystemClient;
 using std::string;
 using std::thread;
 using std::unique_ptr;
@@ -163,17 +166,17 @@ TEST_F(ClientFailoverOnNegotiationTimeoutITest, Kudu1580ConnectToTServer) {
     // Resume 2 out of 3 tablet servers (i.e. the majority), so the client
     // could eventially succeed with its write operations.
     thread resume_thread([&]() {
-        const int idx0 = rand() % kNumTabletServers;
-        unique_ptr<ScopedResumeExternalDaemon> r0(resumers[idx0].release());
-        const int idx1 = (idx0 + 1) % kNumTabletServers;
-        unique_ptr<ScopedResumeExternalDaemon> r1(resumers[idx1].release());
-        SleepFor(MonoDelta::FromSeconds(1));
-      });
+      const int idx0 = rand() % kNumTabletServers;
+      unique_ptr<ScopedResumeExternalDaemon> r0(resumers[idx0].release());
+      const int idx1 = (idx0 + 1) % kNumTabletServers;
+      unique_ptr<ScopedResumeExternalDaemon> r1(resumers[idx1].release());
+      SleepFor(MonoDelta::FromSeconds(1));
+    });
     // An automatic clean-up to handle both success and failure cases
     // in the code below.
     SCOPED_CLEANUP({
-        resume_thread.join();
-      });
+      resume_thread.join();
+    });
 
     // Since the table is hash-partitioned with kNumTabletServer partitions,
     // hopefully three sequential numbers would go into different partitions.
@@ -186,6 +189,46 @@ TEST_F(ClientFailoverOnNegotiationTimeoutITest, Kudu1580ConnectToTServer) {
   }
 }
 
+// Like the above test but testing the transaction system client.
+TEST_F(ClientFailoverOnNegotiationTimeoutITest, TestTxnSystemClientRetryOnPause) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+  static const int kNumTabletServers = 3;
+  cluster_opts_.num_tablet_servers = kNumTabletServers;
+  ASSERT_OK(CreateAndStartCluster());
+
+  vector<string> master_addrs;
+  for (const auto& hp : cluster_->master_rpc_addrs()) {
+    master_addrs.emplace_back(hp.ToString());
+  }
+  unique_ptr<TxnSystemClient> txn_client;
+  ASSERT_OK(TxnSystemClient::Create(master_addrs, &txn_client));
+  ASSERT_OK(txn_client->CreateTxnStatusTable(100, kNumTabletServers));
+  ASSERT_OK(txn_client->OpenTxnStatusTable());
+
+  vector<unique_ptr<ScopedResumeExternalDaemon>> resumers;
+  for (int i = 0; i < kNumTabletServers; i++) {
+    ExternalTabletServer* ts = cluster_->tablet_server(i);
+    ASSERT_OK(cluster_->tablet_server(i)->Pause());
+    resumers.emplace_back(new ScopedResumeExternalDaemon(ts));
+  }
+
+  // Resume a random majority so the system client can proceed.
+  thread resume_thread([&]() {
+    const int idx0 = rand() % kNumTabletServers;
+    unique_ptr<ScopedResumeExternalDaemon> r0(resumers[idx0].release());
+    const int idx1 = (idx0 + 1) % kNumTabletServers;
+    unique_ptr<ScopedResumeExternalDaemon> r1(resumers[idx1].release());
+    SleepFor(MonoDelta::FromSeconds(1));
+  });
+  SCOPED_CLEANUP({
+    resume_thread.join();
+  });
+
+  for (int i = 1; i < 10; i++) {
+    ASSERT_OK(txn_client->BeginTransaction(i, "bob"));
+  }
+}
+
 // Regression test for KUDU-2021: if client times out on establishing a
 // connection to the leader master, it should retry with other master in case of
 // a multi-master configuration.
diff --git a/src/kudu/integration-tests/ts_tablet_manager-itest.cc b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
index bf58916..f6cc980 100644
--- a/src/kudu/integration-tests/ts_tablet_manager-itest.cc
+++ b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
@@ -1080,11 +1080,12 @@ class TxnStatusTabletManagementTest : public TsTabletManagerITest {
   }
 
   static Status StartTransactions(const ParticipantIdsByTxnId& txns, TxnCoordinator* coordinator) {
+    TabletServerErrorPB ts_error;
     for (const auto& txn_id_and_prt_ids : txns) {
       const auto& txn_id = txn_id_and_prt_ids.first;
-      RETURN_NOT_OK(coordinator->BeginTransaction(txn_id, kOwner));
+      RETURN_NOT_OK(coordinator->BeginTransaction(txn_id, kOwner, &ts_error));
       for (const auto& prt_id : txn_id_and_prt_ids.second) {
-        RETURN_NOT_OK(coordinator->RegisterParticipant(txn_id, prt_id, kOwner));
+        RETURN_NOT_OK(coordinator->RegisterParticipant(txn_id, prt_id, kOwner, &ts_error));
       }
     }
     return Status::OK();
diff --git a/src/kudu/integration-tests/txn_status_table-itest.cc b/src/kudu/integration-tests/txn_status_table-itest.cc
index cc73101..8c5c2a3 100644
--- a/src/kudu/integration-tests/txn_status_table-itest.cc
+++ b/src/kudu/integration-tests/txn_status_table-itest.cc
@@ -15,11 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <algorithm>
+#include <atomic>
+#include <cstdint>
 #include <functional>
 #include <map>
 #include <memory>
 #include <string>
-#include <utility>
+#include <thread>
+#include <unordered_map>
 #include <vector>
 
 #include <boost/none_t.hpp>
@@ -32,13 +36,18 @@
 #include "kudu/client/client.pb.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/gutil/map-util.h"
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
 #include "kudu/integration-tests/test_workload.h"
 #include "kudu/master/master.h"
 #include "kudu/master/mini_master.h"
 #include "kudu/master/ts_manager.h"
 #include "kudu/mini-cluster/internal_mini_cluster.h"
 #include "kudu/mini-cluster/mini_cluster.h"
+#include "kudu/tablet/metadata.pb.h"
 #include "kudu/tablet/tablet_metadata.h"
 #include "kudu/tablet/tablet_replica.h"
 #include "kudu/transactions/txn_status_tablet.h"
@@ -46,30 +55,45 @@
 #include "kudu/tserver/mini_tablet_server.h"
 #include "kudu/tserver/tablet_server.h"
 #include "kudu/tserver/ts_tablet_manager.h"
+#include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
+#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+DECLARE_double(leader_failure_max_missed_heartbeat_periods);
 DECLARE_string(superuser_acl);
 DECLARE_string(user_acl);
 
+using kudu::client::sp::shared_ptr;
 using kudu::client::AuthenticationCredentialsPB;
 using kudu::client::KuduClient;
 using kudu::client::KuduClientBuilder;
 using kudu::client::KuduTable;
 using kudu::cluster::InternalMiniCluster;
+using kudu::cluster::InternalMiniClusterOptions;
+using kudu::itest::TabletServerMap;
+using kudu::itest::TServerDetails;
 using kudu::tablet::TabletReplica;
 using kudu::transactions::TxnSystemClient;
 using kudu::transactions::TxnStatusTablet;
 using std::map;
 using std::string;
+using std::thread;
 using std::unique_ptr;
 using std::vector;
+using strings::Substitute;
 
 namespace kudu {
 namespace itest {
 
+namespace {
+string ParticipantId(int i) {
+  return Substitute("strawhat-$0", i);
+}
+} // anonymous namespace
+
 class TxnStatusTableITest : public KuduTest {
  public:
   TxnStatusTableITest() {}
@@ -115,14 +139,14 @@ class TxnStatusTableITest : public KuduTest {
   }
 
   // Creates and returns a client as the given user.
-  Status CreateClientAs(const string& user, client::sp::shared_ptr<KuduClient>* client) {
+  Status CreateClientAs(const string& user, shared_ptr<KuduClient>* client) {
     KuduClientBuilder client_builder;
     string authn_creds;
     AuthenticationCredentialsPB pb;
     pb.set_real_user(user);
     CHECK(pb.SerializeToString(&authn_creds));
     client_builder.import_authentication_credentials(authn_creds);
-    client::sp::shared_ptr<KuduClient> c;
+    shared_ptr<KuduClient> c;
     RETURN_NOT_OK(cluster_->CreateClient(&client_builder, &c));
     *client = std::move(c);
     return Status::OK();
@@ -140,11 +164,12 @@ class TxnStatusTableITest : public KuduTest {
     });
   }
 
-  client::sp::shared_ptr<KuduClient> client_sp() {
+  shared_ptr<KuduClient> client_sp() {
     return txn_sys_client_->client_;
   }
 
  protected:
+  static constexpr const char* kUser = "user";
   unique_ptr<InternalMiniCluster> cluster_;
   unique_ptr<TxnSystemClient> txn_sys_client_;
 };
@@ -166,7 +191,7 @@ TEST_F(TxnStatusTableITest, TestTxnStatusTableNotListed) {
   ASSERT_OK(client->TableExists(kTableName, &exists));
   ASSERT_TRUE(exists);
 
-  client::sp::shared_ptr<KuduTable> table;
+  shared_ptr<KuduTable> table;
   ASSERT_OK(client->OpenTable(kTableName, &table));
   ASSERT_NE(nullptr, table);
 }
@@ -177,7 +202,7 @@ TEST_F(TxnStatusTableITest, TestProtectCreateAndAlter) {
   auto service_client = client_sp();
   // We're both a super user and a service user since the ACLs default to "*".
   // We should thus have access to the table.
-  ASSERT_OK(TxnSystemClient::CreateTxnStatusTableWithClient(100, service_client.get()));
+  ASSERT_OK(TxnSystemClient::CreateTxnStatusTableWithClient(100, 1, service_client.get()));
   ASSERT_OK(TxnSystemClient::AddTxnStatusTableRangeWithClient(100, 200, service_client.get()));
   const string& kTableName = TxnStatusTablet::kTxnStatusTableName;
   ASSERT_OK(service_client->DeleteTable(kTableName));
@@ -185,7 +210,7 @@ TEST_F(TxnStatusTableITest, TestProtectCreateAndAlter) {
   // The service user should be able to create and alter the transaction status
   // table.
   NO_FATALS(SetSuperuserAndUser("nobody", "nobody"));
-  ASSERT_OK(TxnSystemClient::CreateTxnStatusTableWithClient(100, service_client.get()));
+  ASSERT_OK(TxnSystemClient::CreateTxnStatusTableWithClient(100, 1, service_client.get()));
   ASSERT_OK(TxnSystemClient::AddTxnStatusTableRangeWithClient(100, 200, service_client.get()));
 
   // The service user doesn't have access to the delete table endpoint.
@@ -196,18 +221,18 @@ TEST_F(TxnStatusTableITest, TestProtectCreateAndAlter) {
   // The super user should be able to create, alter, and delete the transaction
   // status table.
   NO_FATALS(SetSuperuserAndUser("bob", "nobody"));
-  client::sp::shared_ptr<KuduClient> bob_client;
+  shared_ptr<KuduClient> bob_client;
   ASSERT_OK(CreateClientAs("bob", &bob_client));
   ASSERT_OK(bob_client->DeleteTable(kTableName));
-  ASSERT_OK(TxnSystemClient::CreateTxnStatusTableWithClient(100, bob_client.get()));
+  ASSERT_OK(TxnSystemClient::CreateTxnStatusTableWithClient(100, 1, bob_client.get()));
   ASSERT_OK(TxnSystemClient::AddTxnStatusTableRangeWithClient(100, 200, bob_client.get()));
   ASSERT_OK(bob_client->DeleteTable(kTableName));
 
   // Regular users shouldn't be able to do anything.
   NO_FATALS(SetSuperuserAndUser("nobody", "bob"));
-  s = TxnSystemClient::CreateTxnStatusTableWithClient(100, bob_client.get());
+  s = TxnSystemClient::CreateTxnStatusTableWithClient(100, 1, bob_client.get());
   ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
-  ASSERT_OK(TxnSystemClient::CreateTxnStatusTableWithClient(100, service_client.get()));
+  ASSERT_OK(TxnSystemClient::CreateTxnStatusTableWithClient(100, 1, service_client.get()));
   s = TxnSystemClient::AddTxnStatusTableRangeWithClient(100, 200, bob_client.get());
   ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
   s = service_client->DeleteTable(kTableName);
@@ -232,7 +257,7 @@ TEST_F(TxnStatusTableITest, TestProtectAccess) {
     vector<Status> results;
     bool unused;
     STORE_AND_PREPEND(client->TableExists(kTableName, &unused), "failed to check existence");
-    client::sp::shared_ptr<KuduTable> unused_table;
+    shared_ptr<KuduTable> unused_table;
     STORE_AND_PREPEND(client->OpenTable(kTableName, &unused_table), "failed to open table");
     client::KuduTableStatistics* unused_stats = nullptr;
     STORE_AND_PREPEND(client->GetTableStatistics(kTableName, &unused_stats), "failed to get stats");
@@ -267,7 +292,7 @@ TEST_F(TxnStatusTableITest, TestProtectAccess) {
   // table.
   NO_FATALS(SetSuperuserAndUser("bob", "nobody"));
   // Create a client as 'bob', who is not the service user.
-  client::sp::shared_ptr<KuduClient> bob_client;
+  shared_ptr<KuduClient> bob_client;
   ASSERT_OK(CreateClientAs("bob", &bob_client));
   results = attempt_accesses_and_delete(bob_client.get());
   for (const auto& r : results) {
@@ -322,5 +347,210 @@ TEST_F(TxnStatusTableITest, TestTxnStatusTableColocatedWithTables) {
   }));
 }
 
+TEST_F(TxnStatusTableITest, TestSystemClientFindTablets) {
+  ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100));
+  ASSERT_OK(txn_sys_client_->OpenTxnStatusTable());
+  ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser));
+
+  // If we write out of range, we should see an error.
+  Status s = txn_sys_client_->BeginTransaction(100, kUser);
+  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+
+  // Once we add a new range, we should be able to leverage it.
+  ASSERT_OK(txn_sys_client_->AddTxnStatusTableRange(100, 200));
+  ASSERT_OK(txn_sys_client_->BeginTransaction(100, kUser));
+}
+
+TEST_F(TxnStatusTableITest, TestSystemClientTServerDown) {
+  ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100));
+  ASSERT_OK(txn_sys_client_->OpenTxnStatusTable());
+
+  // When the only server is down, the system client should keep trying until
+  // it times out.
+  cluster_->mini_tablet_server(0)->Shutdown();
+  Status s = txn_sys_client_->BeginTransaction(1, kUser, MonoDelta::FromMilliseconds(100));
+  ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+
+  // Now try with a longer timeout and ensure that if the server comes back up,
+  // the system client will succeed.
+  thread t([&] {
+    // Wait a bit to give some time for the system client to make its request
+    // and retry some.
+    SleepFor(MonoDelta::FromMilliseconds(500));
+    ASSERT_OK(cluster_->mini_tablet_server(0)->Restart());
+  });
+  SCOPED_CLEANUP({
+    t.join();
+  });
+  ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser, MonoDelta::FromSeconds(3)));
+}
+
+TEST_F(TxnStatusTableITest, TestSystemClientBeginTransactionErrors) {
+  ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100));
+  ASSERT_OK(txn_sys_client_->OpenTxnStatusTable());
+  ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser));
+
+  // Trying to start another transaction with a used ID should yield an error.
+  Status s = txn_sys_client_->BeginTransaction(1, kUser);
+  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "not higher than the highest ID");
+
+  // The same should be true with a different user.
+  s = txn_sys_client_->BeginTransaction(1, "stranger");
+  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "not higher than the highest ID");
+}
+
+TEST_F(TxnStatusTableITest, TestSystemClientRegisterParticipantErrors) {
+  ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100));
+  ASSERT_OK(txn_sys_client_->OpenTxnStatusTable());
+  Status s = txn_sys_client_->RegisterParticipant(1, "participant", kUser);
+  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+  ASSERT_STR_MATCHES(s.ToString(), "transaction ID.*not found, current highest txn ID:.*");
+
+  ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser));
+  ASSERT_OK(txn_sys_client_->RegisterParticipant(1, ParticipantId(1), kUser));
+
+  // If a user other than the transaction owner is passed as an argument, the
+  // request should be rejected.
+  s = txn_sys_client_->RegisterParticipant(1, ParticipantId(2), "stranger");
+  ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+}
+
+// Test that a transaction system client can make concurrent calls to multiple
+// transaction status tablets.
+TEST_F(TxnStatusTableITest, TestSystemClientConcurrentCalls) {
+  int kPartitionWidth = 10;
+  ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(kPartitionWidth));
+  ASSERT_OK(txn_sys_client_->OpenTxnStatusTable());
+  ASSERT_OK(txn_sys_client_->AddTxnStatusTableRange(kPartitionWidth, 2 * kPartitionWidth));
+  vector<thread> threads;
+  Status statuses[2 * kPartitionWidth];
+  for (int t = 0; t < 2; t++) {
+    threads.emplace_back([&, t] {
+      // NOTE: we can "race" transaction IDs so they're not monotonically
+      // increasing, as long as within each range partition they are
+      // monotonically increasing.
+      for (int i = 0; i < kPartitionWidth; i++) {
+        int64_t txn_id = t * kPartitionWidth + i;
+        Status s = txn_sys_client_->BeginTransaction(txn_id, kUser).AndThen([&] {
+          return txn_sys_client_->RegisterParticipant(txn_id, ParticipantId(1), kUser);
+        });
+        if (PREDICT_FALSE(!s.ok())) {
+          statuses[txn_id] = s;
+        }
+      }
+    });
+  }
+  std::for_each(threads.begin(), threads.end(), [&] (thread& t) { t.join(); });
+  for (const auto& s : statuses) {
+    EXPECT_OK(s);
+  }
+}
+
+class MultiServerTxnStatusTableITest : public TxnStatusTableITest {
+ public:
+  void SetUp() override {
+    KuduTest::SetUp();
+    InternalMiniClusterOptions opts;
+    opts.num_tablet_servers = 4;
+    cluster_.reset(new InternalMiniCluster(env_, std::move(opts)));
+    ASSERT_OK(cluster_->Start());
+    vector<string> master_addrs;
+    for (const auto& hp : cluster_->master_rpc_addrs()) {
+      master_addrs.emplace_back(hp.ToString());
+    }
+    ASSERT_OK(TxnSystemClient::Create(master_addrs, &txn_sys_client_));
+
+    // Create the initial transaction status table partitions and start an
+    // initial transaction.
+    ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100, 3));
+    ASSERT_OK(txn_sys_client_->OpenTxnStatusTable());
+    ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser));
+  }
+
+  // Returns the tablet ID found in the cluster, expecting a single tablet.
+  string GetTabletId() const {
+    string tablet_id;
+    for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+      const auto& tablets = cluster_->mini_tablet_server(i)->ListTablets();
+      if (!tablets.empty()) {
+        if (!tablet_id.empty()) {
+          DCHECK_EQ(tablet_id, tablets[0]);
+          continue;
+        }
+        tablet_id = tablets[0];
+      }
+    }
+    DCHECK(!tablet_id.empty());
+    return tablet_id;
+  }
+
+  // Returns the UUID of the given tablet's leader replica.
+  Status FindLeaderId(const string& tablet_id, string* uuid) {
+    TabletServerMap ts_map;
+    RETURN_NOT_OK(CreateTabletServerMap(cluster_->master_proxy(), cluster_->messenger(), &ts_map));
+    ValueDeleter deleter(&ts_map);
+    TServerDetails* leader_ts;
+    RETURN_NOT_OK(FindTabletLeader(ts_map, tablet_id, MonoDelta::FromSeconds(10), &leader_ts));
+    *uuid = leader_ts->uuid();
+    return Status::OK();
+  }
+};
+
+TEST_F(MultiServerTxnStatusTableITest, TestSystemClientDeletedTablet) {
+  // Find the leader and force it to step down. The system client should be
+  // able to find the new leader.
+  const string& tablet_id = GetTabletId();
+  ASSERT_FALSE(tablet_id.empty());
+  string orig_leader_uuid;
+  ASSERT_OK(FindLeaderId(tablet_id, &orig_leader_uuid));
+  ASSERT_FALSE(orig_leader_uuid.empty());
+  ASSERT_OK(cluster_->mini_tablet_server_by_uuid(
+      orig_leader_uuid)->server()->tablet_manager()->DeleteTablet(
+          tablet_id, tablet::TABLET_DATA_TOMBSTONED, /*cas_config_index*/boost::none));
+
+  // The client should automatically try to get a new location for the tablet.
+  ASSERT_OK(txn_sys_client_->BeginTransaction(2, kUser));
+}
+
+TEST_F(MultiServerTxnStatusTableITest, TestSystemClientLeadershipChange) {
+  // Find the leader and force it to step down. The system client should be
+  // able to find the new leader.
+  const string& tablet_id = GetTabletId();
+  ASSERT_FALSE(tablet_id.empty());
+  string orig_leader_uuid;
+  ASSERT_OK(FindLeaderId(tablet_id, &orig_leader_uuid));
+  ASSERT_FALSE(orig_leader_uuid.empty());
+  cluster_->mini_tablet_server_by_uuid(
+      orig_leader_uuid)->server()->mutable_quiescing()->store(true);
+  ASSERT_EVENTUALLY([&] {
+    string new_leader_uuid;
+    ASSERT_OK(FindLeaderId(tablet_id, &new_leader_uuid));
+    ASSERT_NE(new_leader_uuid, orig_leader_uuid);
+  });
+  ASSERT_OK(txn_sys_client_->BeginTransaction(2, kUser));
+}
+
+TEST_F(MultiServerTxnStatusTableITest, TestSystemClientCrashedNodes) {
+  // Find the leader and shut it down. The system client should be able to
+  // find the new leader.
+  const auto& tablet_id = GetTabletId();
+  ASSERT_FALSE(tablet_id.empty());
+  string leader_uuid;
+  ASSERT_OK(FindLeaderId(tablet_id, &leader_uuid));
+  ASSERT_FALSE(leader_uuid.empty());
+  FLAGS_leader_failure_max_missed_heartbeat_periods = 1;
+  cluster_->mini_tablet_server_by_uuid(leader_uuid)->Shutdown();
+  // We have to wait for a leader to be elected. Until that happens, the system
+  // client may try to start transactions on followers, and in doing so use up
+  // transaction IDs. Have the system client try again with a higher
+  // transaction ID until a leader is elected.
+  int txn_id = 2;
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_OK(txn_sys_client_->BeginTransaction(++txn_id, kUser));
+  });
+}
+
 } // namespace itest
 } // namespace kudu
diff --git a/src/kudu/tablet/ops/op.h b/src/kudu/tablet/ops/op.h
index 995104a..258dd05 100644
--- a/src/kudu/tablet/ops/op.h
+++ b/src/kudu/tablet/ops/op.h
@@ -365,6 +365,7 @@ class LatchOpCompletionCallback : public OpCompletionCallback {
   virtual void OpCompleted() OVERRIDE {
     if (!status_.ok()) {
       StatusToPB(status_, response_->mutable_error()->mutable_status());
+      response_->mutable_error()->set_code(code_);
     }
     latch_->CountDown();
   }
diff --git a/src/kudu/tablet/txn_coordinator.h b/src/kudu/tablet/txn_coordinator.h
index 861d941..dd84415 100644
--- a/src/kudu/tablet/txn_coordinator.h
+++ b/src/kudu/tablet/txn_coordinator.h
@@ -24,6 +24,10 @@
 #include "kudu/util/status.h"
 
 namespace kudu {
+namespace tserver {
+class TabletServerErrorPB;
+} // namespace tserver
+
 namespace tablet {
 
 class TabletReplica;
@@ -40,22 +44,48 @@ class TxnCoordinator {
   virtual Status LoadFromTablet() = 0;
 
   // Starts a transaction with the given ID as the given user.
-  virtual Status BeginTransaction(int64_t txn_id, const std::string& user) = 0;
+  //
+  // Returns any replication-layer errors (e.g. not-the-leader errors) in
+  // 'ts_error'. If there was otherwise a logical error with the request (e.g.
+  // transaction already exists), returns an error without populating
+  // 'ts_error'.
+  virtual Status BeginTransaction(int64_t txn_id, const std::string& user,
+                                  tserver::TabletServerErrorPB* ts_error) = 0;
 
   // Begins committing the given transaction as the given user.
-  virtual Status BeginCommitTransaction(int64_t txn_id, const std::string& user) = 0;
+  //
+  // Returns any replication-layer errors (e.g. not-the-leader errors) in
+  // 'ts_error'. If there was otherwise a logical error with the request (e.g.
+  // no such transaction), returns an error without populating 'ts_error'.
+  virtual Status BeginCommitTransaction(int64_t txn_id, const std::string& user,
+                                        tserver::TabletServerErrorPB* ts_error) = 0;
 
   // Finalizes the commit of the transaction.
+  //
+  // Returns any replication-layer errors (e.g. not-the-leader errors) in
+  // 'ts_error'. If there was otherwise a logical error with the request (e.g.
+  // no such transaction), returns an error without populating 'ts_error'.
+  //
   // TODO(awong): add a commit timestamp.
   virtual Status FinalizeCommitTransaction(int64_t txn_id) = 0;
 
   // Aborts the given transaction as the given user.
-  virtual Status AbortTransaction(int64_t txn_id, const std::string& user) = 0;
+  //
+  // Returns any replication-layer errors (e.g. not-the-leader errors) in
+  // 'ts_error'. If there was otherwise a logical error with the request (e.g.
+  // no such transaction), returns an error without populating 'ts_error'.
+  virtual Status AbortTransaction(int64_t txn_id, const std::string& user,
+                                  tserver::TabletServerErrorPB* ts_error) = 0;
 
   // Registers a participant tablet ID to the given transaction ID as the given
   // user.
+  //
+  // Returns any replication-layer errors (e.g. not-the-leader errors) in
+  // 'ts_error'. If there was otherwise a logical error with the request (e.g.
+  // no such transaction), returns an error without populating 'ts_error'.
   virtual Status RegisterParticipant(int64_t txn_id, const std::string& tablet_id,
-                                     const std::string& user) = 0;
+                                     const std::string& user,
+                                     tserver::TabletServerErrorPB* ts_error) = 0;
 
   // Populates a map from transaction ID to the list of participants associated
   // with that transaction ID.
diff --git a/src/kudu/transactions/CMakeLists.txt b/src/kudu/transactions/CMakeLists.txt
index c86c25b..0d679cb 100644
--- a/src/kudu/transactions/CMakeLists.txt
+++ b/src/kudu/transactions/CMakeLists.txt
@@ -30,6 +30,7 @@ target_link_libraries(transactions_proto
 )
 
 set(TRANSACTIONS_SRCS
+  coordinator_rpc.cc
   txn_status_entry.cc
   txn_status_manager.cc
   txn_status_tablet.cc
diff --git a/src/kudu/transactions/coordinator_rpc.cc b/src/kudu/transactions/coordinator_rpc.cc
new file mode 100644
index 0000000..e0870c8
--- /dev/null
+++ b/src/kudu/transactions/coordinator_rpc.cc
@@ -0,0 +1,209 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/transactions/coordinator_rpc.h"
+
+#include <memory>
+#include <string>
+#include <utility>
+
+#include <glog/logging.h>
+
+#include "kudu/client/client-internal.h"
+#include "kudu/client/client.h"
+#include "kudu/client/meta_cache.h"
+#include "kudu/common/wire_protocol.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/connection.h"
+#include "kudu/rpc/retriable_rpc.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/tserver/tserver.pb.h"
+#include "kudu/tserver/tserver_admin.pb.h"
+#include "kudu/tserver/tserver_admin.proxy.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/status.h"
+#include "kudu/util/status_callback.h"
+
+using kudu::client::internal::MetaCacheServerPicker;
+using kudu::client::internal::RemoteTabletServer;
+using kudu::client::KuduClient;
+using kudu::pb_util::SecureShortDebugString;
+using kudu::rpc::CredentialsPolicy;
+using kudu::rpc::ErrorStatusPB;
+using kudu::rpc::ResponseCallback;
+using kudu::rpc::RetriableRpc;
+using kudu::rpc::RetriableRpcStatus;
+using std::string;
+using std::unique_ptr;
+using strings::Substitute;
+
+namespace kudu {
+class MonoTime;
+namespace transactions {
+
+CoordinatorRpc* CoordinatorRpc::NewRpc(unique_ptr<TxnStatusTabletContext> ctx,
+                                       const MonoTime& deadline,
+                                       StatusCallback cb) {
+  KuduClient* client = ctx->table->client();
+  scoped_refptr<MetaCacheServerPicker> server_picker(
+      new MetaCacheServerPicker(client,
+                                client->data_->meta_cache_,
+                                ctx->table.get(),
+                                ctx->tablet.get()));
+  CoordinatorRpc* rpc = new CoordinatorRpc(std::move(ctx),
+                                           server_picker,
+                                           deadline,
+                                           std::move(cb));
+  return rpc;
+}
+
+string CoordinatorRpc::ToString() const {
+  return Substitute("CoordinateTransaction($0)", SecureShortDebugString(req_));
+}
+
+void CoordinatorRpc::Finish(const Status& status) {
+  // Free memory upon completion.
+  unique_ptr<CoordinatorRpc> this_instance(this);
+  Status final_status = status;
+  if (final_status.ok() &&
+      resp_.has_op_result() && resp_.op_result().has_op_error()) {
+    final_status = StatusFromPB(resp_.op_result().op_error());
+  }
+  cb_(final_status);
+}
+
+bool CoordinatorRpc::GetNewAuthnTokenAndRetry() {
+  resp_.Clear();
+  client_->data_->ConnectToClusterAsync(client_, retrier().deadline(),
+      [this] (const Status& s) { this->GotNewAuthnTokenRetryCb(s); },
+      CredentialsPolicy::PRIMARY_CREDENTIALS);
+  return true;
+}
+
+CoordinatorRpc::CoordinatorRpc(unique_ptr<TxnStatusTabletContext> ctx,
+                               const scoped_refptr<MetaCacheServerPicker>& replica_picker,
+                               const MonoTime& deadline,
+                               StatusCallback cb)
+    : RetriableRpc(replica_picker,
+                   DCHECK_NOTNULL(ctx->table)->client()->data_->request_tracker_,
+                   deadline,
+                   DCHECK_NOTNULL(ctx->table)->client()->data_->messenger_),
+      client_(ctx->table->client()),
+      table_(std::move(ctx->table)),
+      tablet_(std::move(ctx->tablet)),
+      cb_(std::move(cb)) {
+  req_.set_txn_status_tablet_id(tablet_->tablet_id());
+  *req_.mutable_op() = std::move(ctx->coordinate_txn_op);
+}
+
+void CoordinatorRpc::Try(RemoteTabletServer* replica, const ResponseCallback& callback) {
+  replica->admin_proxy()->CoordinateTransactionAsync(
+      req_, &resp_, mutable_retrier()->mutable_controller(),
+      callback);
+}
+
+// TODO(awong): much of this is borrowed from WriteRpc::AnalyzeResponse(). It'd
+// be nice to share some code.
+RetriableRpcStatus CoordinatorRpc::AnalyzeResponse(const Status& rpc_cb_status) {
+  // We only analyze OK statuses if we succeeded to do the tablet lookup. In
+  // either case, let's examine whatever errors exist.
+  RetriableRpcStatus result;
+  result.status = rpc_cb_status.ok() ? retrier().controller().status() : rpc_cb_status;
+
+  // Check for specific RPC errors.
+  if (result.status.IsRemoteError()) {
+    const ErrorStatusPB* err = mutable_retrier()->controller().error_response();
+    if (err && err->has_code()) {
+      switch (err->code()) {
+        case ErrorStatusPB::ERROR_SERVER_TOO_BUSY:
+        case ErrorStatusPB::ERROR_UNAVAILABLE:
+          result.result = RetriableRpcStatus::SERVICE_UNAVAILABLE;
+          return result;
+        default:
+          break;
+      }
+    }
+  }
+
+  // TODO(awong): it might be easier to understand if the resulting expected
+  // action were encoded in these status enums, e.g. RETRY_SAME_SERVER.
+  if (result.status.IsServiceUnavailable()) {
+    result.result = RetriableRpcStatus::SERVICE_UNAVAILABLE;
+    return result;
+  }
+
+  // Check whether we need to get a new authentication token.
+  if (result.status.IsNotAuthorized()) {
+    const ErrorStatusPB* err = mutable_retrier()->controller().error_response();
+    if (err && err->has_code() &&
+        err->code() == ErrorStatusPB::FATAL_INVALID_AUTHENTICATION_TOKEN) {
+      result.result = RetriableRpcStatus::INVALID_AUTHENTICATION_TOKEN;
+      return result;
+    }
+  }
+
+  // If we couldn't connect to the server, e.g. it was down, failover to a
+  // different replica.
+  if (result.status.IsNetworkError()) {
+    result.result = RetriableRpcStatus::SERVER_NOT_ACCESSIBLE;
+    return result;
+  }
+
+  // We're done parsing the RPC controller errors. Unwrap the tserver response
+  // errors -- from here on out, the result status will be the response error.
+  if (result.status.ok() && resp_.has_error()) {
+    result.status = StatusFromPB(resp_.error().status());
+  }
+
+  // If we get TABLET_NOT_FOUND, the replica we thought was leader has been
+  // deleted.
+  if (resp_.has_error() &&
+      resp_.error().code() == tserver::TabletServerErrorPB::TABLET_NOT_FOUND) {
+    result.result = RetriableRpcStatus::RESOURCE_NOT_FOUND;
+    return result;
+  }
+
+  if (result.status.IsIllegalState() || result.status.IsAborted()) {
+    result.result = RetriableRpcStatus::REPLICA_NOT_LEADER;
+    return result;
+  }
+
+  // Handle the connection negotiation failure case if overall RPC's timeout
+  // hasn't expired yet: if the connection negotiation returned non-OK status,
+  // mark the server as not accessible and rely on the RetriableRpc's logic
+  // to switch to an alternative tablet replica.
+  //
+  // NOTE: Connection negotiation errors related to security are handled in the
+  //       code above: see the handlers for IsNotAuthorized(), IsRemoteError().
+  if (!rpc_cb_status.IsTimedOut() && !result.status.ok() &&
+      mutable_retrier()->controller().negotiation_failed()) {
+    result.result = RetriableRpcStatus::SERVER_NOT_ACCESSIBLE;
+    return result;
+  }
+
+  if (result.status.ok()) {
+    result.result = RetriableRpcStatus::OK;
+  } else {
+    result.result = RetriableRpcStatus::NON_RETRIABLE_ERROR;
+  }
+  return result;
+}
+
+} // namespace transactions
+} // namespace kudu
diff --git a/src/kudu/transactions/coordinator_rpc.h b/src/kudu/transactions/coordinator_rpc.h
new file mode 100644
index 0000000..00c56a6
--- /dev/null
+++ b/src/kudu/transactions/coordinator_rpc.h
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <functional>
+#include <memory>
+#include <string>
+
+#include "kudu/client/meta_cache.h"
+#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/rpc/response_callback.h"
+#include "kudu/rpc/retriable_rpc.h"
+#include "kudu/rpc/rpc.h"
+#include "kudu/tserver/tserver_admin.pb.h"
+#include "kudu/util/status_callback.h"
+
+namespace kudu {
+class MonoTime;
+class Status;
+
+namespace client {
+class KuduClient;
+class KuduTable;
+} // namespace client
+
+namespace transactions {
+
+// Context to be used when sending RPCs to specific tablets.
+struct TxnStatusTabletContext {
+  client::sp::shared_ptr<client::KuduTable> table;
+  tserver::CoordinatorOpPB coordinate_txn_op;
+
+  // NOTE: this gets set after the tablet lookup completes.
+  scoped_refptr<client::internal::RemoteTablet> tablet;
+};
+
+// Encapsulates an RPC being sent to a partition of the transaction status
+// table.
+class CoordinatorRpc final : public rpc::RetriableRpc<client::internal::RemoteTabletServer,
+                                                      tserver::CoordinateTransactionRequestPB,
+                                                      tserver::CoordinateTransactionResponsePB> {
+ public:
+  static CoordinatorRpc* NewRpc(std::unique_ptr<TxnStatusTabletContext> ctx,
+                                const MonoTime& deadline,
+                                StatusCallback cb);
+
+  ~CoordinatorRpc() {}
+
+  std::string ToString() const override;
+
+ protected:
+  void Try(client::internal::RemoteTabletServer* replica,
+           const rpc::ResponseCallback& callback) override;
+
+  rpc::RetriableRpcStatus AnalyzeResponse(const Status& rpc_cb_status) override;
+
+  // Deletes itself upon completion.
+  void Finish(const Status& status) override;
+
+  bool GetNewAuthnTokenAndRetry() override;
+
+ private:
+  CoordinatorRpc(std::unique_ptr<TxnStatusTabletContext> ctx,
+                 const scoped_refptr<client::internal::MetaCacheServerPicker>& replica_picker,
+                 const MonoTime& deadline,
+                 StatusCallback cb);
+
+  client::KuduClient* client_;
+  client::sp::shared_ptr<client::KuduTable> table_;
+  scoped_refptr<client::internal::RemoteTablet> tablet_;
+  const StatusCallback cb_;
+};
+
+} // namespace transactions
+} // namespace kudu
diff --git a/src/kudu/transactions/txn_status_manager-test.cc b/src/kudu/transactions/txn_status_manager-test.cc
index 9e1e8c7..e4dba88 100644
--- a/src/kudu/transactions/txn_status_manager-test.cc
+++ b/src/kudu/transactions/txn_status_manager-test.cc
@@ -40,6 +40,7 @@
 #include "kudu/tablet/tablet-test-util.h"
 #include "kudu/tablet/tablet_replica-test-base.h"
 #include "kudu/tablet/txn_coordinator.h"
+#include "kudu/tserver/tserver.pb.h"
 #include "kudu/transactions/transactions.pb.h"
 #include "kudu/transactions/txn_status_tablet.h"
 #include "kudu/util/barrier.h"
@@ -55,6 +56,7 @@
 using kudu::consensus::ConsensusBootstrapInfo;
 using kudu::tablet::ParticipantIdsByTxnId;
 using kudu::tablet::TabletReplicaTestBase;
+using kudu::tserver::TabletServerErrorPB;
 using std::string;
 using std::thread;
 using std::unique_ptr;
@@ -101,27 +103,28 @@ TEST_F(TxnStatusManagerTest, TestStartTransactions) {
 
   ASSERT_TRUE(txn_manager_->GetParticipantsByTxnIdForTests().empty());
 
+  TabletServerErrorPB ts_error;
   for (const auto& txn_id_and_prts : expected_prts_by_txn_id) {
     const auto& txn_id = txn_id_and_prts.first;
-    ASSERT_OK(txn_manager_->BeginTransaction(txn_id, kOwner));
+    ASSERT_OK(txn_manager_->BeginTransaction(txn_id, kOwner, &ts_error));
     for (const auto& prt : txn_id_and_prts.second) {
-      ASSERT_OK(txn_manager_->RegisterParticipant(txn_id, prt, kOwner));
+      ASSERT_OK(txn_manager_->RegisterParticipant(txn_id, prt, kOwner, &ts_error));
     }
   }
   // Registering a participant that's already open is harmless, presuming the
   // participant is still open.
-  ASSERT_OK(txn_manager_->RegisterParticipant(3, kParticipant1, kOwner));
+  ASSERT_OK(txn_manager_->RegisterParticipant(3, kParticipant1, kOwner, &ts_error));
 
   // Starting a transaction that's already been started should result in an
   // error, even if it's not currently in flight.
-  Status s = txn_manager_->BeginTransaction(1, kOwner);
+  Status s = txn_manager_->BeginTransaction(1, kOwner, &ts_error);
   ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
-  s = txn_manager_->BeginTransaction(2, kOwner);
+  s = txn_manager_->BeginTransaction(2, kOwner, &ts_error);
   ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
 
   // Registering participants to transactions that don't exist should also
   // result in errors.
-  s = txn_manager_->RegisterParticipant(2, kParticipant1, kOwner);
+  s = txn_manager_->RegisterParticipant(2, kParticipant1, kOwner, &ts_error);
   ASSERT_TRUE(s.IsNotFound()) << s.ToString();
 
   // The underlying participants map should only reflect the successful
@@ -183,7 +186,8 @@ TEST_F(TxnStatusManagerTest, TestStartTransactionsConcurrently) {
         // time.
         barriers[b]->Wait();
         auto txn_id = txns_to_insert[b][i];
-        Status s = txn_manager_->BeginTransaction(txn_id, kOwner);
+        TabletServerErrorPB ts_error;
+        Status s = txn_manager_->BeginTransaction(txn_id, kOwner, &ts_error);
         if (s.ok()) {
           std::lock_guard<simple_spinlock> l(lock);
           successful_txn_ids.emplace_back(txn_id);
@@ -220,7 +224,8 @@ TEST_F(TxnStatusManagerTest, TestRegisterParticipantsConcurrently) {
   CountDownLatch begun_txn(1);
   threads.reserve(1 + kParticipantsInParallel);
   threads.emplace_back([&] {
-    CHECK_OK(txn_manager_->BeginTransaction(kTxnId, kOwner));
+    TabletServerErrorPB ts_error;
+    CHECK_OK(txn_manager_->BeginTransaction(kTxnId, kOwner, &ts_error));
     begun_txn.CountDown();
   });
 
@@ -234,7 +239,8 @@ TEST_F(TxnStatusManagerTest, TestRegisterParticipantsConcurrently) {
         begun_txn.Wait();
       }
       string prt = ParticipantId(i % kUniqueParticipantIds);
-      Status s = txn_manager_->RegisterParticipant(kTxnId, prt, kOwner);
+      TabletServerErrorPB ts_error;
+      Status s = txn_manager_->RegisterParticipant(kTxnId, prt, kOwner, &ts_error);
       if (s.ok()) {
         std::lock_guard<simple_spinlock> l(lock);
         successful_participants.emplace_back(std::move(prt));
@@ -267,7 +273,8 @@ TEST_F(TxnStatusManagerTest, TestUpdateStateConcurrently) {
   const int kNumTransactions = 10;
   const int kNumUpdatesInParallel = 20;
   for (int i = 0; i < kNumTransactions; i++) {
-    ASSERT_OK(txn_manager_->BeginTransaction(i, kOwner));
+    TabletServerErrorPB ts_error;
+    ASSERT_OK(txn_manager_->BeginTransaction(i, kOwner, &ts_error));
   }
   typedef std::pair<int64_t, TxnStatePB> IdAndUpdate;
   vector<IdAndUpdate> all_updates;
@@ -286,12 +293,13 @@ TEST_F(TxnStatusManagerTest, TestUpdateStateConcurrently) {
   for (int i = 0; i < kNumUpdatesInParallel; i++) {
     threads.emplace_back([&, i] {
       const auto& txn_id = updates[i].first;
+      TabletServerErrorPB ts_error;
       switch (updates[i].second) {
         case TxnStatePB::ABORTED:
-          statuses[i] = txn_manager_->AbortTransaction(txn_id, kOwner);
+          statuses[i] = txn_manager_->AbortTransaction(txn_id, kOwner, &ts_error);
           break;
         case TxnStatePB::COMMIT_IN_PROGRESS:
-          statuses[i] = txn_manager_->BeginCommitTransaction(txn_id, kOwner);
+          statuses[i] = txn_manager_->BeginCommitTransaction(txn_id, kOwner, &ts_error);
           break;
         case TxnStatePB::COMMITTED:
           statuses[i] = txn_manager_->FinalizeCommitTransaction(txn_id);
@@ -345,22 +353,23 @@ TEST_F(TxnStatusManagerTest, TestUpdateStateConcurrently) {
 // Test that performing actions as the wrong user will return errors.
 TEST_F(TxnStatusManagerTest, TestWrongUser) {
   const string kWrongUser = "stranger";
-  ASSERT_OK(txn_manager_->BeginTransaction(1, kOwner));
-  ASSERT_OK(txn_manager_->RegisterParticipant(1, ParticipantId(1), kOwner));
+  TabletServerErrorPB ts_error;
+  ASSERT_OK(txn_manager_->BeginTransaction(1, kOwner, &ts_error));
+  ASSERT_OK(txn_manager_->RegisterParticipant(1, ParticipantId(1), kOwner, &ts_error));
 
   // First, any other call to begin the transaction should be rejected,
   // regardless of user.
-  Status s = txn_manager_->BeginTransaction(1, kWrongUser);
+  Status s = txn_manager_->BeginTransaction(1, kWrongUser, &ts_error);
   ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
 
   // All actions should be rejected if performed by the wrong user.
-  s = txn_manager_->RegisterParticipant(1, ParticipantId(1), kWrongUser);
+  s = txn_manager_->RegisterParticipant(1, ParticipantId(1), kWrongUser, &ts_error);
   ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
-  s = txn_manager_->RegisterParticipant(1, ParticipantId(2), kWrongUser);
+  s = txn_manager_->RegisterParticipant(1, ParticipantId(2), kWrongUser, &ts_error);
   ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
-  s = txn_manager_->BeginCommitTransaction(1, kWrongUser);
+  s = txn_manager_->BeginCommitTransaction(1, kWrongUser, &ts_error);
   ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
-  s = txn_manager_->AbortTransaction(1, kWrongUser);
+  s = txn_manager_->AbortTransaction(1, kWrongUser, &ts_error);
   ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
   ParticipantIdsByTxnId prts_by_txn_id = txn_manager_->GetParticipantsByTxnIdForTests();
   ParticipantIdsByTxnId kExpectedPrtsByTxnId = { { 1, { ParticipantId(1) } } };
@@ -371,30 +380,31 @@ TEST_F(TxnStatusManagerTest, TestWrongUser) {
 // appropriate state.
 TEST_F(TxnStatusManagerTest, TestUpdateTransactionState) {
   const int64_t kTxnId1 = 1;
-  ASSERT_OK(txn_manager_->BeginTransaction(kTxnId1, kOwner));
+  TabletServerErrorPB ts_error;
+  ASSERT_OK(txn_manager_->BeginTransaction(kTxnId1, kOwner, &ts_error));
 
   // Redundant calls are benign.
-  ASSERT_OK(txn_manager_->BeginCommitTransaction(kTxnId1, kOwner));
-  ASSERT_OK(txn_manager_->BeginCommitTransaction(kTxnId1, kOwner));
-  ASSERT_OK(txn_manager_->AbortTransaction(kTxnId1, kOwner));
-  ASSERT_OK(txn_manager_->AbortTransaction(kTxnId1, kOwner));
+  ASSERT_OK(txn_manager_->BeginCommitTransaction(kTxnId1, kOwner, &ts_error));
+  ASSERT_OK(txn_manager_->BeginCommitTransaction(kTxnId1, kOwner, &ts_error));
+  ASSERT_OK(txn_manager_->AbortTransaction(kTxnId1, kOwner, &ts_error));
+  ASSERT_OK(txn_manager_->AbortTransaction(kTxnId1, kOwner, &ts_error));
 
   // We can't begin or finalize a commit if we've aborted.
-  Status s = txn_manager_->BeginCommitTransaction(kTxnId1, kOwner);
+  Status s = txn_manager_->BeginCommitTransaction(kTxnId1, kOwner, &ts_error);
   ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
   s = txn_manager_->FinalizeCommitTransaction(kTxnId1);
   ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
 
   // We can't finalize a commit that hasn't begun committing.
   const int64_t kTxnId2 = 2;
-  ASSERT_OK(txn_manager_->BeginTransaction(kTxnId2, kOwner));
+  ASSERT_OK(txn_manager_->BeginTransaction(kTxnId2, kOwner, &ts_error));
   s = txn_manager_->FinalizeCommitTransaction(kTxnId2);
   ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
 
   // We can't abort a transaction that has finished committing.
-  ASSERT_OK(txn_manager_->BeginCommitTransaction(kTxnId2, kOwner));
+  ASSERT_OK(txn_manager_->BeginCommitTransaction(kTxnId2, kOwner, &ts_error));
   ASSERT_OK(txn_manager_->FinalizeCommitTransaction(kTxnId2));
-  s = txn_manager_->AbortTransaction(kTxnId2, kOwner);
+  s = txn_manager_->AbortTransaction(kTxnId2, kOwner, &ts_error);
   ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
 
   // Redundant finalize calls are also benign.
@@ -402,41 +412,42 @@ TEST_F(TxnStatusManagerTest, TestUpdateTransactionState) {
 
   // Calls to begin committing should return an error if we've already
   // finalized the commit.
-  s = txn_manager_->BeginCommitTransaction(kTxnId2, kOwner);
+  s = txn_manager_->BeginCommitTransaction(kTxnId2, kOwner, &ts_error);
   ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
 }
 
 // Test that we can only add participants to a transaction when it's in an
 // appropriate state.
 TEST_F(TxnStatusManagerTest, TestRegisterParticipantsWithStates) {
+  TabletServerErrorPB ts_error;
   const int64_t kTxnId1 = 1;
 
   // We can't register a participant to a transaction that hasn't started.
-  Status s = txn_manager_->RegisterParticipant(kTxnId1, ParticipantId(1), kOwner);
+  Status s = txn_manager_->RegisterParticipant(kTxnId1, ParticipantId(1), kOwner, &ts_error);
   ASSERT_TRUE(s.IsNotFound()) << s.ToString();
 
-  ASSERT_OK(txn_manager_->BeginTransaction(kTxnId1, kOwner));
-  ASSERT_OK(txn_manager_->RegisterParticipant(kTxnId1, ParticipantId(1), kOwner));
+  ASSERT_OK(txn_manager_->BeginTransaction(kTxnId1, kOwner, &ts_error));
+  ASSERT_OK(txn_manager_->RegisterParticipant(kTxnId1, ParticipantId(1), kOwner, &ts_error));
 
   // Registering the same participant is idempotent and benign.
-  ASSERT_OK(txn_manager_->RegisterParticipant(kTxnId1, ParticipantId(1), kOwner));
+  ASSERT_OK(txn_manager_->RegisterParticipant(kTxnId1, ParticipantId(1), kOwner, &ts_error));
 
   // We can't register participants when we've already begun committing.
-  ASSERT_OK(txn_manager_->BeginCommitTransaction(kTxnId1, kOwner));
-  s = txn_manager_->RegisterParticipant(kTxnId1, ParticipantId(2), kOwner);
+  ASSERT_OK(txn_manager_->BeginCommitTransaction(kTxnId1, kOwner, &ts_error));
+  s = txn_manager_->RegisterParticipant(kTxnId1, ParticipantId(2), kOwner, &ts_error);
   ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
 
   // We can't register participants when we've finished committnig.
   ASSERT_OK(txn_manager_->FinalizeCommitTransaction(kTxnId1));
-  s = txn_manager_->RegisterParticipant(kTxnId1, ParticipantId(2), kOwner);
+  s = txn_manager_->RegisterParticipant(kTxnId1, ParticipantId(2), kOwner, &ts_error);
   ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
 
   // We can't register participants when we've aborted the transaction.
   const int64_t kTxnId2 = 2;
-  ASSERT_OK(txn_manager_->BeginTransaction(kTxnId2, kOwner));
-  ASSERT_OK(txn_manager_->RegisterParticipant(kTxnId2, ParticipantId(1), kOwner));
-  ASSERT_OK(txn_manager_->AbortTransaction(kTxnId2, kOwner));
-  s = txn_manager_->RegisterParticipant(kTxnId2, ParticipantId(2), kOwner);
+  ASSERT_OK(txn_manager_->BeginTransaction(kTxnId2, kOwner, &ts_error));
+  ASSERT_OK(txn_manager_->RegisterParticipant(kTxnId2, ParticipantId(1), kOwner, &ts_error));
+  ASSERT_OK(txn_manager_->AbortTransaction(kTxnId2, kOwner, &ts_error));
+  s = txn_manager_->RegisterParticipant(kTxnId2, ParticipantId(2), kOwner, &ts_error);
   ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
 }
 
diff --git a/src/kudu/transactions/txn_status_manager.cc b/src/kudu/transactions/txn_status_manager.cc
index 737ed47..070298f 100644
--- a/src/kudu/transactions/txn_status_manager.cc
+++ b/src/kudu/transactions/txn_status_manager.cc
@@ -29,12 +29,14 @@
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/transactions/transactions.pb.h"
+#include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/cow_object.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/status.h"
 
 using kudu::pb_util::SecureShortDebugString;
 using kudu::tablet::ParticipantIdsByTxnId;
+using kudu::tserver::TabletServerErrorPB;
 using std::string;
 using std::vector;
 using strings::Substitute;
@@ -108,7 +110,8 @@ Status TxnStatusManager::GetTransaction(int64_t txn_id,
   return Status::OK();
 }
 
-Status TxnStatusManager::BeginTransaction(int64_t txn_id, const string& user) {
+Status TxnStatusManager::BeginTransaction(int64_t txn_id, const string& user,
+                                          TabletServerErrorPB* ts_error) {
   {
     // First, make sure the requested ID is viable.
     std::lock_guard<simple_spinlock> l(lock_);
@@ -117,6 +120,9 @@ Status TxnStatusManager::BeginTransaction(int64_t txn_id, const string& user) {
           Substitute("transaction ID $0 is not higher than the highest ID so far: $1",
                      txn_id, highest_txn_id_));
     }
+    // TODO(awong): reduce the "damage" from followers getting requests by
+    // checking for leadership before doing anything. As is, if this replica
+    // isn't the leader, we may aggressively burn through transaction IDs.
     highest_txn_id_ = txn_id;
   }
 
@@ -125,7 +131,7 @@ Status TxnStatusManager::BeginTransaction(int64_t txn_id, const string& user) {
   // that at most one call to start a given transaction ID can succeed.
 
   // Write an entry to the status tablet for this transaction.
-  RETURN_NOT_OK(status_tablet_.AddNewTransaction(txn_id, user));
+  RETURN_NOT_OK(status_tablet_.AddNewTransaction(txn_id, user, ts_error));
 
   // Now that we've successfully persisted the new transaction ID, initialize
   // the in-memory state and make it visible to clients.
@@ -141,7 +147,8 @@ Status TxnStatusManager::BeginTransaction(int64_t txn_id, const string& user) {
   return Status::OK();
 }
 
-Status TxnStatusManager::BeginCommitTransaction(int64_t txn_id, const string& user) {
+Status TxnStatusManager::BeginCommitTransaction(int64_t txn_id, const string& user,
+                                                TabletServerErrorPB* ts_error) {
   scoped_refptr<TransactionEntry> txn;
   RETURN_NOT_OK(GetTransaction(txn_id, user, &txn));
 
@@ -158,7 +165,7 @@ Status TxnStatusManager::BeginCommitTransaction(int64_t txn_id, const string& us
   }
   auto* mutable_data = txn_lock.mutable_data();
   mutable_data->pb.set_state(TxnStatePB::COMMIT_IN_PROGRESS);
-  RETURN_NOT_OK(status_tablet_.UpdateTransaction(txn_id, mutable_data->pb));
+  RETURN_NOT_OK(status_tablet_.UpdateTransaction(txn_id, mutable_data->pb, ts_error));
   txn_lock.Commit();
   return Status::OK();
 }
@@ -180,12 +187,14 @@ Status TxnStatusManager::FinalizeCommitTransaction(int64_t txn_id) {
   }
   auto* mutable_data = txn_lock.mutable_data();
   mutable_data->pb.set_state(TxnStatePB::COMMITTED);
-  RETURN_NOT_OK(status_tablet_.UpdateTransaction(txn_id, mutable_data->pb));
+  TabletServerErrorPB ts_error;
+  RETURN_NOT_OK(status_tablet_.UpdateTransaction(txn_id, mutable_data->pb, &ts_error));
   txn_lock.Commit();
   return Status::OK();
 }
 
-Status TxnStatusManager::AbortTransaction(int64_t txn_id, const std::string& user) {
+Status TxnStatusManager::AbortTransaction(int64_t txn_id, const std::string& user,
+                                          TabletServerErrorPB* ts_error) {
   scoped_refptr<TransactionEntry> txn;
   RETURN_NOT_OK(GetTransaction(txn_id, user, &txn));
 
@@ -203,13 +212,13 @@ Status TxnStatusManager::AbortTransaction(int64_t txn_id, const std::string& use
   }
   auto* mutable_data = txn_lock.mutable_data();
   mutable_data->pb.set_state(TxnStatePB::ABORTED);
-  RETURN_NOT_OK(status_tablet_.UpdateTransaction(txn_id, mutable_data->pb));
+  RETURN_NOT_OK(status_tablet_.UpdateTransaction(txn_id, mutable_data->pb, ts_error));
   txn_lock.Commit();
   return Status::OK();
 }
 
 Status TxnStatusManager::RegisterParticipant(int64_t txn_id, const string& tablet_id,
-                                             const string& user) {
+                                             const string& user, TabletServerErrorPB* ts_error) {
   scoped_refptr<TransactionEntry> txn;
   RETURN_NOT_OK(GetTransaction(txn_id, user, &txn));
 
@@ -239,7 +248,7 @@ Status TxnStatusManager::RegisterParticipant(int64_t txn_id, const string& table
   prt_lock.mutable_data()->pb.set_state(TxnStatePB::OPEN);
 
   // Write the new participant entry.
-  RETURN_NOT_OK(status_tablet_.AddNewParticipant(txn_id, tablet_id));
+  RETURN_NOT_OK(status_tablet_.AddNewParticipant(txn_id, tablet_id, ts_error));
 
   // Now that we've persisted the new participant to disk, update the in-memory
   // state to denote the participant is open.
diff --git a/src/kudu/transactions/txn_status_manager.h b/src/kudu/transactions/txn_status_manager.h
index e5d8e7b..4036bb2 100644
--- a/src/kudu/transactions/txn_status_manager.h
+++ b/src/kudu/transactions/txn_status_manager.h
@@ -38,6 +38,10 @@ namespace tablet {
 class TabletReplica;
 } // namespace tablet
 
+namespace tserver {
+class TabletServerErrorPB;
+} // namespace tserver
+
 namespace transactions {
 
 class TxnStatusEntryPB;
@@ -83,11 +87,13 @@ class TxnStatusManager : public tablet::TxnCoordinator {
   // TODO(awong): consider computing the next available transaction ID in this
   // partition and using it in case this transaction is already used, or having
   // callers forward a request for the next-highest transaction ID.
-  Status BeginTransaction(int64_t txn_id, const std::string& user) override;
+  Status BeginTransaction(int64_t txn_id, const std::string& user,
+                          tserver::TabletServerErrorPB* ts_error) override;
 
   // Begins committing the given transaction, returning an error if the
   // transaction doesn't exist, isn't open, or isn't owned by the given user.
-  Status BeginCommitTransaction(int64_t txn_id, const std::string& user) override;
+  Status BeginCommitTransaction(int64_t txn_id, const std::string& user,
+                                tserver::TabletServerErrorPB* ts_error) override;
 
   // Finalizes the commit of the transaction, returning an error if the
   // transaction isn't in an appropraite state.
@@ -101,7 +107,8 @@ class TxnStatusManager : public tablet::TxnCoordinator {
   // Aborts the given transaction, returning an error if the transaction
   // doesn't exist, is committed or not yet opened, or isn't owned by the given
   // user.
-  Status AbortTransaction(int64_t txn_id, const std::string& user) override;
+  Status AbortTransaction(int64_t txn_id, const std::string& user,
+                          tserver::TabletServerErrorPB* ts_error) override;
 
   // Creates an in-memory participant, writes an entry to the status table, and
   // attaches the in-memory participant to the transaction.
@@ -109,7 +116,8 @@ class TxnStatusManager : public tablet::TxnCoordinator {
   // If the transaction is open, it is ensured to be active for the duration of
   // this call. Returns an error if the given transaction isn't open.
   Status RegisterParticipant(int64_t txn_id, const std::string& tablet_id,
-                             const std::string& user) override;
+                             const std::string& user,
+                             tserver::TabletServerErrorPB* ts_error) override;
 
   // Populates a map from transaction ID to the sorted list of participants
   // associated with that transaction ID.
diff --git a/src/kudu/transactions/txn_status_tablet-test.cc b/src/kudu/transactions/txn_status_tablet-test.cc
index e0378b8..79498ac 100644
--- a/src/kudu/transactions/txn_status_tablet-test.cc
+++ b/src/kudu/transactions/txn_status_tablet-test.cc
@@ -36,6 +36,7 @@
 #include "kudu/tablet/tablet-test-util.h"
 #include "kudu/tablet/tablet_replica-test-base.h"
 #include "kudu/transactions/transactions.pb.h"
+#include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
@@ -43,6 +44,7 @@
 using kudu::consensus::ConsensusBootstrapInfo;
 using kudu::pb_util::SecureShortDebugString;
 using kudu::tablet::TabletReplicaTestBase;
+using kudu::tserver::TabletServerErrorPB;
 using std::ostream;
 using std::string;
 using std::thread;
@@ -135,23 +137,24 @@ class TxnStatusTabletTest : public TabletReplicaTestBase {
 };
 
 TEST_F(TxnStatusTabletTest, TestWriteTransactions) {
+  TabletServerErrorPB ts_error;
   // We can make multiple calls to add a single transaction. This will only
   // insert a single row to the table.
-  ASSERT_OK(status_tablet_->AddNewTransaction(1, kOwner));
-  ASSERT_OK(status_tablet_->AddNewTransaction(1, kOwner));
+  ASSERT_OK(status_tablet_->AddNewTransaction(1, kOwner, &ts_error));
+  ASSERT_OK(status_tablet_->AddNewTransaction(1, kOwner, &ts_error));
 
   // The storage abstraction doesn't prevent us from writing a new transaction
   // entry for a lower transaction ID.
-  ASSERT_OK(status_tablet_->AddNewTransaction(5, kOwner));
-  ASSERT_OK(status_tablet_->AddNewTransaction(2, kOwner));
+  ASSERT_OK(status_tablet_->AddNewTransaction(5, kOwner, &ts_error));
+  ASSERT_OK(status_tablet_->AddNewTransaction(2, kOwner, &ts_error));
 
   // Also try updating the status of one of our transaction entries.
   TxnStatusEntryPB status_entry_pb;
   status_entry_pb.set_user(kOwner);
   status_entry_pb.set_state(TxnStatePB::ABORTED);
-  ASSERT_OK(status_tablet_->UpdateTransaction(2, status_entry_pb));
+  ASSERT_OK(status_tablet_->UpdateTransaction(2, status_entry_pb, &ts_error));
   status_entry_pb.set_state(TxnStatePB::COMMITTED);
-  ASSERT_OK(status_tablet_->UpdateTransaction(2, status_entry_pb));
+  ASSERT_OK(status_tablet_->UpdateTransaction(2, status_entry_pb, &ts_error));
 
   // The stored entries should be sorted, de-duplicated, and have the latest
   // values.
@@ -169,22 +172,23 @@ TEST_F(TxnStatusTabletTest, TestWriteTransactions) {
 }
 
 TEST_F(TxnStatusTabletTest, TestWriteParticipants) {
-  ASSERT_OK(status_tablet_->AddNewTransaction(1, kOwner));
+  TabletServerErrorPB ts_error;
+  ASSERT_OK(status_tablet_->AddNewTransaction(1, kOwner, &ts_error));
 
   // Participants will be de-duplicated.
-  ASSERT_OK(status_tablet_->AddNewParticipant(1, ParticipantId(1)));
-  ASSERT_OK(status_tablet_->AddNewParticipant(1, ParticipantId(1)));
+  ASSERT_OK(status_tablet_->AddNewParticipant(1, ParticipantId(1), &ts_error));
+  ASSERT_OK(status_tablet_->AddNewParticipant(1, ParticipantId(1), &ts_error));
 
   // There aren't ordering constraints for registering participant IDs.
-  ASSERT_OK(status_tablet_->AddNewParticipant(1, ParticipantId(5)));
-  ASSERT_OK(status_tablet_->AddNewParticipant(1, ParticipantId(2)));
+  ASSERT_OK(status_tablet_->AddNewParticipant(1, ParticipantId(5), &ts_error));
+  ASSERT_OK(status_tablet_->AddNewParticipant(1, ParticipantId(2), &ts_error));
 
   // Try updating the status of one of our participant entries.
   TxnParticipantEntryPB prt_entry_pb;
   prt_entry_pb.set_state(TxnStatePB::ABORTED);
-  ASSERT_OK(status_tablet_->UpdateParticipant(1, ParticipantId(2), prt_entry_pb));
+  ASSERT_OK(status_tablet_->UpdateParticipant(1, ParticipantId(2), prt_entry_pb, &ts_error));
   prt_entry_pb.set_state(TxnStatePB::COMMITTED);
-  ASSERT_OK(status_tablet_->UpdateParticipant(1, ParticipantId(2), prt_entry_pb));
+  ASSERT_OK(status_tablet_->UpdateParticipant(1, ParticipantId(2), prt_entry_pb, &ts_error));
 
   const vector<SimpleEntry> kExpectedEntries({
       SimpleEntry::Create(1, kOwner, TxnStatePB::OPEN, {
@@ -202,18 +206,19 @@ TEST_F(TxnStatusTabletTest, TestWriteParticipants) {
 // Test that a participant entry can't be visited without a corresponding
 // status entry.
 TEST_F(TxnStatusTabletTest, TestFailedVisitor) {
-  ASSERT_OK(status_tablet_->AddNewParticipant(1, ParticipantId(1)));
+  TabletServerErrorPB ts_error;
+  ASSERT_OK(status_tablet_->AddNewParticipant(1, ParticipantId(1), &ts_error));
   SimpleTransactionsVisitor visitor;
   Status s = status_tablet_->VisitTransactions(&visitor);
   ASSERT_TRUE(s.IsCorruption()) << s.ToString();
   ASSERT_STR_CONTAINS(s.ToString(), "missing transaction status entry");
 
   // Now try again but with the transaction ID written.
-  ASSERT_OK(status_tablet_->AddNewTransaction(1, kOwner));
+  ASSERT_OK(status_tablet_->AddNewTransaction(1, kOwner, &ts_error));
   ASSERT_OK(status_tablet_->VisitTransactions(&visitor));
 
   // And again with a new transaction ID.
-  ASSERT_OK(status_tablet_->AddNewParticipant(2, ParticipantId(2)));
+  ASSERT_OK(status_tablet_->AddNewParticipant(2, ParticipantId(2), &ts_error));
   s = status_tablet_->VisitTransactions(&visitor);
   ASSERT_TRUE(s.IsCorruption()) << s.ToString();
   ASSERT_STR_CONTAINS(s.ToString(), "missing transaction status entry");
@@ -238,9 +243,10 @@ TEST_F(TxnStatusTabletTest, TestMultithreadedAccess) {
   // storing any errors we see.
   for (int i = 0; i < kNumThreads; i++) {
     threads.emplace_back([&, i] {
-      RET_IF_NOT_OK(status_tablet_->AddNewTransaction(i, kOwner));
+      TabletServerErrorPB ts_error;
+      RET_IF_NOT_OK(status_tablet_->AddNewTransaction(i, kOwner, &ts_error));
       for (int p = 0; p < kNumParticipantsPerTransaction; p++) {
-        RET_IF_NOT_OK(status_tablet_->AddNewParticipant(i, Substitute("prt-$0", p)));
+        RET_IF_NOT_OK(status_tablet_->AddNewParticipant(i, Substitute("prt-$0", p), &ts_error));
       }
     });
   }
diff --git a/src/kudu/transactions/txn_status_tablet.cc b/src/kudu/transactions/txn_status_tablet.cc
index 877a814..cf3c76e 100644
--- a/src/kudu/transactions/txn_status_tablet.cc
+++ b/src/kudu/transactions/txn_status_tablet.cc
@@ -55,6 +55,7 @@
 using kudu::tablet::LatchOpCompletionCallback;
 using kudu::tablet::OpCompletionCallback;
 using kudu::tablet::WriteOpState;
+using kudu::tserver::TabletServerErrorPB;
 using kudu::tserver::WriteRequestPB;
 using kudu::tserver::WriteResponsePB;;
 using std::string;
@@ -295,7 +296,8 @@ Status TxnStatusTablet::VisitTransactions(TransactionsVisitor* visitor) {
   return Status::OK();
 }
 
-Status TxnStatusTablet::AddNewTransaction(int64_t txn_id, const string& user) {
+Status TxnStatusTablet::AddNewTransaction(int64_t txn_id, const string& user,
+                                          TabletServerErrorPB* ts_error) {
   WriteRequestPB req = BuildWriteReqPB(tablet_replica_->tablet_id());
 
   TxnStatusEntryPB entry;
@@ -308,10 +310,11 @@ Status TxnStatusTablet::AddNewTransaction(int64_t txn_id, const string& user) {
   RETURN_NOT_OK(PopulateTransactionEntryRow(txn_id, metadata_buf, &row));
   RowOperationsPBEncoder enc(req.mutable_row_operations());
   enc.Add(RowOperationsPB::INSERT_IGNORE, row);
-  return SyncWrite(req);
+  return SyncWrite(req, ts_error);
 }
 
-Status TxnStatusTablet::UpdateTransaction(int64_t txn_id, const TxnStatusEntryPB& pb) {
+Status TxnStatusTablet::UpdateTransaction(int64_t txn_id, const TxnStatusEntryPB& pb,
+                                          TabletServerErrorPB* ts_error) {
   WriteRequestPB req = BuildWriteReqPB(tablet_replica_->tablet_id());
 
   faststring metadata_buf;
@@ -321,10 +324,11 @@ Status TxnStatusTablet::UpdateTransaction(int64_t txn_id, const TxnStatusEntryPB
   RETURN_NOT_OK(PopulateTransactionEntryRow(txn_id, metadata_buf, &row));
   RowOperationsPBEncoder enc(req.mutable_row_operations());
   enc.Add(RowOperationsPB::UPDATE, row);
-  return SyncWrite(req);
+  return SyncWrite(req, ts_error);
 }
 
-Status TxnStatusTablet::AddNewParticipant(int64_t txn_id, const string& tablet_id) {
+Status TxnStatusTablet::AddNewParticipant(int64_t txn_id, const string& tablet_id,
+                                          TabletServerErrorPB* ts_error) {
   WriteRequestPB req = BuildWriteReqPB(tablet_replica_->tablet_id());
 
   TxnParticipantEntryPB entry;
@@ -336,11 +340,12 @@ Status TxnStatusTablet::AddNewParticipant(int64_t txn_id, const string& tablet_i
   PopulateParticipantEntryRow(txn_id, tablet_id, metadata_buf, &row);
   RowOperationsPBEncoder enc(req.mutable_row_operations());
   enc.Add(RowOperationsPB::INSERT_IGNORE, row);
-  return SyncWrite(req);
+  return SyncWrite(req, ts_error);
 }
 
 Status TxnStatusTablet::UpdateParticipant(int64_t txn_id, const string& tablet_id,
-                                          const TxnParticipantEntryPB& pb) {
+                                          const TxnParticipantEntryPB& pb,
+                                          TabletServerErrorPB* ts_error) {
   WriteRequestPB req = BuildWriteReqPB(tablet_replica_->tablet_id());
 
   faststring metadata_buf;
@@ -350,10 +355,10 @@ Status TxnStatusTablet::UpdateParticipant(int64_t txn_id, const string& tablet_i
   RETURN_NOT_OK(PopulateParticipantEntryRow(txn_id, tablet_id, metadata_buf, &row));
   RowOperationsPBEncoder enc(req.mutable_row_operations());
   enc.Add(RowOperationsPB::UPDATE, row);
-  return SyncWrite(req);
+  return SyncWrite(req, ts_error);
 }
 
-Status TxnStatusTablet::SyncWrite(const WriteRequestPB& req) {
+Status TxnStatusTablet::SyncWrite(const WriteRequestPB& req, TabletServerErrorPB* ts_error) {
   DCHECK(req.has_tablet_id());
   DCHECK(req.has_schema());
   CountDownLatch latch(1);
@@ -369,7 +374,9 @@ Status TxnStatusTablet::SyncWrite(const WriteRequestPB& req) {
   RETURN_NOT_OK(tablet_replica_->SubmitWrite(std::move(op_state)));
   latch.Wait();
   if (resp.has_error()) {
-    return StatusFromPB(resp.error().status());
+    DCHECK(ts_error);
+    *ts_error = std::move(resp.error());
+    return StatusFromPB(ts_error->status());
   }
   if (resp.per_row_errors_size() > 0) {
     for (const auto& error : resp.per_row_errors()) {
diff --git a/src/kudu/transactions/txn_status_tablet.h b/src/kudu/transactions/txn_status_tablet.h
index 0d9fb4f..6ead705 100644
--- a/src/kudu/transactions/txn_status_tablet.h
+++ b/src/kudu/transactions/txn_status_tablet.h
@@ -32,6 +32,7 @@ class TabletReplica;
 }  // namespace tablet
 
 namespace tserver {
+class TabletServerErrorPB;
 class WriteRequestPB;
 } // namespace tserver
 
@@ -102,13 +103,19 @@ class TxnStatusTablet {
   // contents of the tablet into a more usable memory representation.
   Status VisitTransactions(TransactionsVisitor* visitor);
 
-  // Writes to the underlying storage. Returns an error if there was an error
-  // writing the new entry.
-  Status AddNewTransaction(int64_t txn_id, const std::string& user);
-  Status UpdateTransaction(int64_t txn_id, const TxnStatusEntryPB& pb);
-  Status AddNewParticipant(int64_t txn_id, const std::string& tablet_id);
+  // Writes to the underlying storage. If there was an error replicating the
+  // request, returns the specific error via 'ts_error' and returns a non-OK
+  // error. If there was otherwise a logical error with the request (e.g. row
+  // already exists), returns an error without populating 'ts_error'.
+  Status AddNewTransaction(int64_t txn_id, const std::string& user,
+                           tserver::TabletServerErrorPB* ts_error);
+  Status UpdateTransaction(int64_t txn_id, const TxnStatusEntryPB& pb,
+                           tserver::TabletServerErrorPB* ts_error);
+  Status AddNewParticipant(int64_t txn_id, const std::string& tablet_id,
+                           tserver::TabletServerErrorPB* ts_error);
   Status UpdateParticipant(int64_t txn_id, const std::string& tablet_id,
-                           const TxnParticipantEntryPB& pb);
+                           const TxnParticipantEntryPB& pb,
+                           tserver::TabletServerErrorPB* ts_error);
 
  private:
   friend class TxnStatusManager;
@@ -116,9 +123,10 @@ class TxnStatusTablet {
     return tablet_replica_;
   }
 
-  // Writes 'req' to the underlying tablet replica, returning an error if there
-  // was a problem replicating the request, or if there were any row errors.
-  Status SyncWrite(const tserver::WriteRequestPB& req);
+  // Writes 'req' to the underlying tablet replica, populating 'ts_error' and
+  // returning non-OK if there was a problem replicating the request, or simply
+  // returning a non-OK error if there were any row errors.
+  Status SyncWrite(const tserver::WriteRequestPB& req, tserver::TabletServerErrorPB* ts_error);
 
   // The tablet replica that backs this transaction status tablet.
   tablet::TabletReplica* tablet_replica_;
diff --git a/src/kudu/transactions/txn_system_client.cc b/src/kudu/transactions/txn_system_client.cc
index dbed9f3..fa6bbc6 100644
--- a/src/kudu/transactions/txn_system_client.cc
+++ b/src/kudu/transactions/txn_system_client.cc
@@ -17,24 +17,37 @@
 
 #include "kudu/transactions/txn_system_client.h"
 
+#include <functional>
 #include <memory>
+#include <mutex>
 #include <string>
 
 #include <boost/optional/optional.hpp>
+#include <glog/logging.h>
 
+#include "kudu/client/client-internal.h"
 #include "kudu/client/client.h"
+#include "kudu/client/meta_cache.h"
 #include "kudu/client/schema.h"
 #include "kudu/client/table_creator-internal.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/common/partial_row.h"
+#include "kudu/common/partition.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/transactions/coordinator_rpc.h"
 #include "kudu/transactions/txn_status_tablet.h"
+#include "kudu/tserver/tserver_admin.pb.h"
+#include "kudu/util/async_util.h"
 
 using kudu::client::KuduClient;
 using kudu::client::KuduSchema;
 using kudu::client::KuduClientBuilder;
+using kudu::client::KuduTable;
 using kudu::client::KuduTableAlterer;
 using kudu::client::KuduTableCreator;
-using kudu::client::sp::shared_ptr;
+using kudu::client::internal::MetaCache;
+using kudu::tserver::CoordinatorOpPB;
 using std::string;
 using std::unique_ptr;
 using std::vector;
@@ -46,13 +59,14 @@ Status TxnSystemClient::Create(const vector<string>& master_addrs,
                                unique_ptr<TxnSystemClient>* sys_client) {
   KuduClientBuilder builder;
   builder.master_server_addrs(master_addrs);
-  shared_ptr<KuduClient> client;
+  client::sp::shared_ptr<KuduClient> client;
   RETURN_NOT_OK(builder.Build(&client));
   sys_client->reset(new TxnSystemClient(std::move(client)));
   return Status::OK();
 }
 
 Status TxnSystemClient::CreateTxnStatusTableWithClient(int64_t initial_upper_bound,
+                                                       int num_replicas,
                                                        KuduClient* client) {
 
   const auto& schema = TxnStatusTablet::GetSchema();
@@ -77,7 +91,7 @@ Status TxnSystemClient::CreateTxnStatusTableWithClient(int64_t initial_upper_bou
       .set_range_partition_columns({ TxnStatusTablet::kTxnIdColName })
       .add_range_partition(lb.release(), ub.release())
       .table_name(TxnStatusTablet::kTxnStatusTableName)
-      .num_replicas(1)
+      .num_replicas(num_replicas)
       .wait(true)
       .Create();
 }
@@ -97,5 +111,85 @@ Status TxnSystemClient::AddTxnStatusTableRangeWithClient(int64_t lower_bound, in
       ->Alter();
 }
 
+Status TxnSystemClient::OpenTxnStatusTable() {
+  client::sp::shared_ptr<KuduTable> table;
+  RETURN_NOT_OK(client_->OpenTable(TxnStatusTablet::kTxnStatusTableName, &table));
+
+  std::lock_guard<simple_spinlock> l(table_lock_);
+  txn_status_table_ = std::move(table);
+  return Status::OK();
+}
+
+Status TxnSystemClient::BeginTransaction(int64_t txn_id, const string& user, MonoDelta timeout) {
+  CoordinatorOpPB coordinate_txn_op;
+  coordinate_txn_op.set_type(CoordinatorOpPB::BEGIN_TXN);
+  coordinate_txn_op.set_txn_id(txn_id);
+  coordinate_txn_op.set_user(user);
+  Synchronizer s;
+  RETURN_NOT_OK(CoordinateTransactionAsync(std::move(coordinate_txn_op),
+                                           timeout,
+                                           s.AsStatusCallback()));
+  return s.Wait();
+}
+
+Status TxnSystemClient::RegisterParticipant(int64_t txn_id, const string& participant_id,
+                                            const string& user, MonoDelta timeout) {
+  CoordinatorOpPB coordinate_txn_op;
+  coordinate_txn_op.set_type(CoordinatorOpPB::REGISTER_PARTICIPANT);
+  coordinate_txn_op.set_txn_id(txn_id);
+  coordinate_txn_op.set_txn_participant_id(participant_id);
+  coordinate_txn_op.set_user(user);
+  Synchronizer s;
+  RETURN_NOT_OK(CoordinateTransactionAsync(std::move(coordinate_txn_op),
+                                           timeout,
+                                           s.AsStatusCallback()));
+  return s.Wait();
+}
+
+Status TxnSystemClient::CoordinateTransactionAsync(CoordinatorOpPB coordinate_txn_op,
+                                                   const MonoDelta& timeout,
+                                                   const StatusCallback& cb) {
+  const MonoTime deadline = MonoTime::Now() + timeout;
+  unique_ptr<TxnStatusTabletContext> ctx(
+      new TxnStatusTabletContext({
+          txn_status_table(),
+          std::move(coordinate_txn_op),
+          /*tablet=*/nullptr
+      }));
+
+  string partition_key;
+  KuduPartialRow row(&TxnStatusTablet::GetSchema());
+  DCHECK(ctx->coordinate_txn_op.has_txn_id());
+  RETURN_NOT_OK(row.SetInt64(TxnStatusTablet::kTxnIdColName, ctx->coordinate_txn_op.txn_id()));
+  RETURN_NOT_OK(ctx->table->partition_schema().EncodeKey(row, &partition_key));
+
+  TxnStatusTabletContext* ctx_raw = ctx.release();
+  client_->data_->meta_cache_->LookupTabletByKey(
+      ctx_raw->table.get(),
+      std::move(partition_key),
+      deadline,
+      MetaCache::LookupType::kPoint,
+      &ctx_raw->tablet,
+      // TODO(awong): when we start using C++14, stack-allocate 'ctx' and
+      // move capture it.
+      [cb, deadline, ctx_raw] (const Status& s) {
+        // First, take ownership of the context.
+        unique_ptr<TxnStatusTabletContext> ctx(ctx_raw);
+
+        // If the lookup failed, run the callback with the error.
+        if (PREDICT_FALSE(!s.ok())) {
+          cb(s);
+          return;
+        }
+        // NOTE: the CoordinatorRpc frees its own memory upon completion.
+        CoordinatorRpc* rpc = CoordinatorRpc::NewRpc(
+            std::move(ctx),
+            deadline,
+            cb);
+        rpc->SendRpc();
+      });
+  return Status::OK();
+}
+
 } // namespace transactions
 } // namespace kudu
diff --git a/src/kudu/transactions/txn_system_client.h b/src/kudu/transactions/txn_system_client.h
index 4737d8b..2477feb 100644
--- a/src/kudu/transactions/txn_system_client.h
+++ b/src/kudu/transactions/txn_system_client.h
@@ -18,6 +18,7 @@
 
 #include <cstdint>
 #include <memory>
+#include <mutex>
 #include <string>
 #include <utility>
 #include <vector>
@@ -25,11 +26,15 @@
 #include <gtest/gtest_prod.h>
 
 #include "kudu/client/shared_ptr.h" // IWYU pragma: keep
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
+#include "kudu/util/status_callback.h"
 
 namespace kudu {
 namespace client {
 class KuduClient;
+class KuduTable;
 } // namespace client
 
 namespace itest {
@@ -37,6 +42,10 @@ class TxnStatusTableITest;
 class TxnStatusTableITest_TestProtectCreateAndAlter_Test;
 } // namespace itest
 
+namespace tserver {
+class CoordinatorOpPB;
+} // namespace tserver
+
 namespace transactions {
 
 // Wrapper around a KuduClient used by Kudu for making transaction-related
@@ -48,8 +57,8 @@ class TxnSystemClient {
 
   // Creates the transaction status table with a single range partition of the
   // given upper bound.
-  Status CreateTxnStatusTable(int64_t initial_upper_bound) {
-    return CreateTxnStatusTableWithClient(initial_upper_bound, client_.get());
+  Status CreateTxnStatusTable(int64_t initial_upper_bound, int num_replicas = 1) {
+    return CreateTxnStatusTableWithClient(initial_upper_bound, num_replicas, client_.get());
   }
 
   // Adds a new range to the transaction status table with the given bounds.
@@ -61,6 +70,27 @@ class TxnSystemClient {
     return AddTxnStatusTableRangeWithClient(lower_bound, upper_bound, client_.get());
   }
 
+  // Attempts to create a transaction with the given 'txn_id'.
+  // Returns an error if the transaction ID has already been taken, or if there
+  // was an error writing to the transaction status table.
+  // TODO(awong): pass a deadline instead of a timeout so we can more easily
+  // associate it with potential user-specified deadlines.
+  Status BeginTransaction(int64_t txn_id, const std::string& user,
+                          MonoDelta timeout = MonoDelta::FromSeconds(10));
+
+  // Attempts to register the given participant with the given transaction.
+  // Returns an error if the transaction hasn't yet been started, or if the
+  // 'user' isn't permitted to modify the transaction.
+  // TODO(awong): pass a deadline instead of a timeout so we can more easily
+  // associate it with potential user-specified deadlines.
+  Status RegisterParticipant(int64_t txn_id, const std::string& participant_id,
+                             const std::string& user,
+                             MonoDelta timeout = MonoDelta::FromSeconds(10));
+
+  // Opens the transaction status table, refreshing metadata with that from the
+  // masters.
+  Status OpenTxnStatusTable();
+
  private:
   friend class itest::TxnStatusTableITest;
   FRIEND_TEST(itest::TxnStatusTableITest, TestProtectCreateAndAlter);
@@ -68,12 +98,24 @@ class TxnSystemClient {
   explicit TxnSystemClient(client::sp::shared_ptr<client::KuduClient> client)
       : client_(std::move(client)) {}
 
-  static Status CreateTxnStatusTableWithClient(int64_t initial_upper_bound,
+  static Status CreateTxnStatusTableWithClient(int64_t initial_upper_bound, int num_replicas,
                                                client::KuduClient* client);
   static Status AddTxnStatusTableRangeWithClient(int64_t lower_bound, int64_t upper_bound,
                                                  client::KuduClient* client);
 
+  Status CoordinateTransactionAsync(tserver::CoordinatorOpPB coordinate_txn_op,
+                                    const MonoDelta& timeout,
+                                    const StatusCallback& cb);
+
+  client::sp::shared_ptr<client::KuduTable> txn_status_table() {
+    std::lock_guard<simple_spinlock> l(table_lock_);
+    return txn_status_table_;
+  }
+
   client::sp::shared_ptr<client::KuduClient> client_;
+
+  simple_spinlock table_lock_;
+  client::sp::shared_ptr<client::KuduTable> txn_status_table_;
 };
 
 } // namespace transactions
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 4276feb..9fe419d 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -103,7 +103,6 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/process_memory.h"
-#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 #include "kudu/util/stopwatch.h"
@@ -675,7 +674,6 @@ class RpcOpCompletionCallback : public OpCompletionCallback {
 
   rpc::RpcContext* context_;
   Response* response_;
-  tablet::OpState* state_;
 };
 
 // Generic interface to handle scan results.
@@ -1229,33 +1227,37 @@ void TabletServiceAdminImpl::CoordinateTransaction(const CoordinateTransactionRe
                          context);
     return;
   }
-  // From here on out, errors are considered application errors.
-  SCOPED_CLEANUP({
-    context->RespondSuccess();
-  });
+  // Catch any replication errors in this 'ts_error' so we can return an
+  // appropriate error to the caller if need be.
+  TabletServerErrorPB ts_error;
   const auto& user = op.user();
   const auto& txn_id = op.txn_id();
-  // TODO(awong): pass a TabletServerErrorPB pointer down in case these
-  // actually resulted in a failure to write, rather than an application error.
   switch (op.type()) {
     case CoordinatorOpPB::BEGIN_TXN:
-      s = txn_coordinator->BeginTransaction(txn_id, user);
+      s = txn_coordinator->BeginTransaction(txn_id, user, &ts_error);
       break;
     case CoordinatorOpPB::REGISTER_PARTICIPANT:
-      s = txn_coordinator->RegisterParticipant(txn_id, op.txn_participant_id(), user);
+      s = txn_coordinator->RegisterParticipant(txn_id, op.txn_participant_id(), user, &ts_error);
       break;
     case CoordinatorOpPB::BEGIN_COMMIT_TXN:
-      s = txn_coordinator->BeginCommitTransaction(txn_id, user);
+      s = txn_coordinator->BeginCommitTransaction(txn_id, user, &ts_error);
       break;
     case CoordinatorOpPB::ABORT_TXN:
-      s = txn_coordinator->AbortTransaction(txn_id, user);
+      s = txn_coordinator->AbortTransaction(txn_id, user, &ts_error);
       break;
     default:
       s = Status::InvalidArgument(Substitute("Unknown op type: $0", op.type()));
   }
+  if (ts_error.has_status() && ts_error.status().code() != AppStatusPB::OK) {
+    *resp->mutable_error() = std::move(ts_error);
+    context->RespondNoCache();
+    return;
+  }
+  // From here on out, errors are considered application errors.
   if (PREDICT_FALSE(!s.ok())) {
     StatusToPB(s, resp->mutable_op_result()->mutable_op_error());
   }
+  context->RespondSuccess();
 }
 
 bool TabletServiceAdminImpl::SupportsFeature(uint32_t feature) const {