You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2020/11/18 19:44:09 UTC

[kudu] branch master updated (0df4eaa -> 2d0631f)

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from 0df4eaa  [tool] allow customizing parallelism for local_replica copy_from_remote
     new 38aee4b  KUDU-2612 C++ client transaction API
     new 2d0631f  KUDU-2612 move keep-alive assignment to TxnStatusManager

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/kudu/client/CMakeLists.txt                     |   5 +-
 src/kudu/client/client-internal.cc                 |  21 +-
 src/kudu/client/client-internal.h                  |  12 +
 src/kudu/client/client-test.cc                     | 440 ++++++++++++++++++++-
 src/kudu/client/client.cc                          |  61 ++-
 src/kudu/client/client.h                           | 161 +++++++-
 src/kudu/client/transaction-internal.cc            | 300 ++++++++++++++
 src/kudu/client/transaction-internal.h             |  72 ++++
 src/kudu/client/txn_manager_proxy_rpc.cc           | 231 +++++++++++
 ...{master_proxy_rpc.h => txn_manager_proxy_rpc.h} |  90 ++---
 src/kudu/common/txn_id.h                           |   4 +-
 .../integration-tests/txn_status_table-itest.cc    |  35 +-
 src/kudu/master/txn_manager-test.cc                |   6 +-
 src/kudu/master/txn_manager.cc                     |  19 +-
 src/kudu/master/txn_manager.h                      |   2 +-
 src/kudu/master/txn_manager.proto                  |   3 +-
 src/kudu/master/txn_manager_service.cc             |   6 +-
 src/kudu/transactions/transactions.proto           |  14 +
 src/kudu/transactions/txn_status_manager.cc        |   9 +
 src/kudu/transactions/txn_system_client.cc         |  10 +
 src/kudu/transactions/txn_system_client.h          |   5 +-
 src/kudu/tserver/tablet_service.cc                 |   4 +
 src/kudu/tserver/tserver_admin.proto               |   8 +
 23 files changed, 1414 insertions(+), 104 deletions(-)
 create mode 100644 src/kudu/client/transaction-internal.cc
 create mode 100644 src/kudu/client/transaction-internal.h
 create mode 100644 src/kudu/client/txn_manager_proxy_rpc.cc
 copy src/kudu/client/{master_proxy_rpc.h => txn_manager_proxy_rpc.h} (54%)


[kudu] 02/02: KUDU-2612 move keep-alive assignment to TxnStatusManager

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 2d0631fe289968094c9dc6a7b16fcc6df9a269ba
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Mon Nov 16 22:52:04 2020 -0800

    KUDU-2612 move keep-alive assignment to TxnStatusManager
    
    To prepare for the processing of transactions' keep-alive heartbeats
    and tracking of stale/abandoned transactions, the assignment of the
    keep-alive setting for a newly started transaction has moved from
    TxnManager to TxnStatusManager.
    
    The rationale for this change is the fact that TxnStatusManager will
    have a periodic task to track the staleness of open transactions
    registered with it.  At this point we anticipate the keep-alive setting
    to be a system-wide parameter, not a per-transaction one.  With that,
    it's natural to make TxnStatusManager the single source of truth for
    the transaction keep-alive interval, so that the same value is used
    both to assign the corresponding property for newly created
    transactions and to spot stale/abandoned ones.
    
    An alternative would be keeping TxnManager as the source of truth
    for the transaction keep-alive setting, but that would entail
    persistently storing the value assigned by TxnManager on a
    per-transaction basis, which is hard to justify without a particular
    use case.
    
    This patch also brings in a few test scenarios and updates the relevant
    existing ones.
    
    Change-Id: Ie98688b2bbe4c673abbfc5801f89eb8182003c18
    Reviewed-on: http://gerrit.cloudera.org:8080/16737
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 .../integration-tests/txn_status_table-itest.cc    | 35 ++++++++++++++++------
 src/kudu/master/txn_manager-test.cc                |  6 ++--
 src/kudu/master/txn_manager.cc                     | 19 +++++-------
 src/kudu/master/txn_manager.h                      |  2 +-
 src/kudu/master/txn_manager.proto                  |  3 +-
 src/kudu/master/txn_manager_service.cc             |  6 ++--
 src/kudu/transactions/txn_status_manager.cc        |  9 ++++++
 src/kudu/transactions/txn_system_client.cc         | 10 +++++++
 src/kudu/transactions/txn_system_client.h          |  5 +++-
 src/kudu/tserver/tablet_service.cc                 |  4 +++
 src/kudu/tserver/tserver_admin.proto               |  8 +++++
 11 files changed, 78 insertions(+), 29 deletions(-)

diff --git a/src/kudu/integration-tests/txn_status_table-itest.cc b/src/kudu/integration-tests/txn_status_table-itest.cc
index 37ad5b8..edeb4bb 100644
--- a/src/kudu/integration-tests/txn_status_table-itest.cc
+++ b/src/kudu/integration-tests/txn_status_table-itest.cc
@@ -66,6 +66,7 @@
 DECLARE_double(leader_failure_max_missed_heartbeat_periods);
 DECLARE_string(superuser_acl);
 DECLARE_string(user_acl);
+DECLARE_uint32(transaction_keepalive_interval_ms);
 
 using kudu::client::AuthenticationCredentialsPB;
 using kudu::client::KuduClient;
@@ -353,16 +354,23 @@ TEST_F(TxnStatusTableITest, TestTxnStatusTableColocatedWithTables) {
 TEST_F(TxnStatusTableITest, TestSystemClientFindTablets) {
   ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100));
   ASSERT_OK(txn_sys_client_->OpenTxnStatusTable());
-  ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser));
+  uint32_t txn_keepalive_ms;
+  ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser, &txn_keepalive_ms));
+  ASSERT_EQ(FLAGS_transaction_keepalive_interval_ms, txn_keepalive_ms);
   ASSERT_OK(txn_sys_client_->AbortTransaction(1, kUser));
 
   // If we write out of range, we should see an error.
   {
     int64_t highest_seen_txn_id = -1;
-    auto s = txn_sys_client_->BeginTransaction(100, kUser, &highest_seen_txn_id);
+    uint32_t txn_keepalive_ms = 0;
+    auto s = txn_sys_client_->BeginTransaction(
+        100, kUser, &txn_keepalive_ms, &highest_seen_txn_id);
     ASSERT_TRUE(s.IsNotFound()) << s.ToString();
     // The 'highest_seen_txn_id' should be left untouched.
     ASSERT_EQ(-1, highest_seen_txn_id);
+    // txn_keepalive_ms isn't assigned in case of non-OK status.
+    ASSERT_EQ(0, txn_keepalive_ms);
+    ASSERT_NE(0, FLAGS_transaction_keepalive_interval_ms);
   }
   {
     auto s = txn_sys_client_->BeginCommitTransaction(100, kUser);
@@ -376,7 +384,8 @@ TEST_F(TxnStatusTableITest, TestSystemClientFindTablets) {
   // Once we add a new range, we should be able to leverage it.
   ASSERT_OK(txn_sys_client_->AddTxnStatusTableRange(100, 200));
   int64_t highest_seen_txn_id = -1;
-  ASSERT_OK(txn_sys_client_->BeginTransaction(100, kUser, &highest_seen_txn_id));
+  ASSERT_OK(txn_sys_client_->BeginTransaction(
+      100, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id));
   ASSERT_EQ(100, highest_seen_txn_id);
   ASSERT_OK(txn_sys_client_->BeginCommitTransaction(100, kUser));
   ASSERT_OK(txn_sys_client_->AbortTransaction(100, kUser));
@@ -393,7 +402,8 @@ TEST_F(TxnStatusTableITest, TestSystemClientTServerDown) {
   {
     int64_t highest_seen_txn_id = -1;
     auto s = txn_sys_client_->BeginTransaction(
-        1, kUser, &highest_seen_txn_id, MonoDelta::FromMilliseconds(100));
+        1, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id,
+        MonoDelta::FromMilliseconds(100));
     ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
     // The 'highest_seen_txn_id' should be left untouched.
     ASSERT_EQ(-1, highest_seen_txn_id);
@@ -413,7 +423,8 @@ TEST_F(TxnStatusTableITest, TestSystemClientTServerDown) {
 
   int64_t highest_seen_txn_id = -1;
   ASSERT_OK(txn_sys_client_->BeginTransaction(
-      1, kUser, &highest_seen_txn_id, MonoDelta::FromSeconds(3)));
+      1, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id,
+      MonoDelta::FromSeconds(3)));
   ASSERT_EQ(highest_seen_txn_id, 1);
 }
 
@@ -421,13 +432,17 @@ TEST_F(TxnStatusTableITest, TestSystemClientBeginTransactionErrors) {
   ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100));
   ASSERT_OK(txn_sys_client_->OpenTxnStatusTable());
   int64_t highest_seen_txn_id = -1;
-  ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser, &highest_seen_txn_id));
+  uint32_t txn_keepalive_ms;
+  ASSERT_OK(txn_sys_client_->BeginTransaction(
+      1, kUser, &txn_keepalive_ms, &highest_seen_txn_id));
   ASSERT_EQ(1, highest_seen_txn_id);
+  ASSERT_EQ(FLAGS_transaction_keepalive_interval_ms, txn_keepalive_ms);
 
   // Trying to start another transaction with a used ID should yield an error.
   {
     int64_t highest_seen_txn_id = -1;
-    auto s = txn_sys_client_->BeginTransaction(1, kUser, &highest_seen_txn_id);
+    auto s = txn_sys_client_->BeginTransaction(
+        1, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id);
     ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
     ASSERT_EQ(1, highest_seen_txn_id);
     ASSERT_STR_CONTAINS(s.ToString(), "not higher than the highest ID");
@@ -436,7 +451,8 @@ TEST_F(TxnStatusTableITest, TestSystemClientBeginTransactionErrors) {
   // The same should be true with a different user.
   {
     int64_t highest_seen_txn_id = -1;
-    auto s = txn_sys_client_->BeginTransaction(1, "stranger", &highest_seen_txn_id);
+    auto s = txn_sys_client_->BeginTransaction(
+        1, "stranger", nullptr /* txn_keepalive_ms */, &highest_seen_txn_id);
     ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
     ASSERT_EQ(1, highest_seen_txn_id);
     ASSERT_STR_CONTAINS(s.ToString(), "not higher than the highest ID");
@@ -478,7 +494,8 @@ TEST_F(TxnStatusTableITest, SystemClientCommitAndAbortTransaction) {
   // with already used ID should yield an error.
   {
     int64_t highest_seen_txn_id = -1;
-    auto s = txn_sys_client_->BeginTransaction(1, kUser, &highest_seen_txn_id);
+    auto s = txn_sys_client_->BeginTransaction(
+        1, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id);
     ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
     ASSERT_EQ(1, highest_seen_txn_id);
     ASSERT_STR_CONTAINS(s.ToString(), "not higher than the highest ID");
diff --git a/src/kudu/master/txn_manager-test.cc b/src/kudu/master/txn_manager-test.cc
index 47d5fce..034ae89 100644
--- a/src/kudu/master/txn_manager-test.cc
+++ b/src/kudu/master/txn_manager-test.cc
@@ -61,7 +61,7 @@ DECLARE_bool(txn_manager_enabled);
 DECLARE_bool(txn_manager_lazily_initialized);
 DECLARE_int32(rpc_service_queue_length);
 DECLARE_int64(txn_manager_status_table_range_partition_span);
-DECLARE_uint32(transaction_keep_alive_interval_ms);
+DECLARE_uint32(transaction_keepalive_interval_ms);
 
 namespace kudu {
 namespace transactions {
@@ -320,7 +320,7 @@ TEST_F(TxnManagerTest, AbortedTransactionLifecycle) {
     txn_id = resp.txn_id();
     ASSERT_LE(0, txn_id);
     ASSERT_TRUE(resp.has_keepalive_millis());
-    ASSERT_EQ(FLAGS_transaction_keep_alive_interval_ms, resp.keepalive_millis());
+    ASSERT_EQ(FLAGS_transaction_keepalive_interval_ms, resp.keepalive_millis());
     TxnStatePB txn_state;
     NO_FATALS(fetch_txn_status(txn_id, &txn_state));
     ASSERT_EQ(TxnStatePB::OPEN, txn_state);
@@ -400,7 +400,7 @@ TEST_F(TxnManagerTest, BeginManyTransactions) {
         txn_ids->emplace_back(txn_id);
       }
       CHECK(resp.has_keepalive_millis());
-      CHECK_EQ(FLAGS_transaction_keep_alive_interval_ms, resp.keepalive_millis());
+      CHECK_EQ(FLAGS_transaction_keepalive_interval_ms, resp.keepalive_millis());
     }
   };
 
diff --git a/src/kudu/master/txn_manager.cc b/src/kudu/master/txn_manager.cc
index aee7ce6..f67d942 100644
--- a/src/kudu/master/txn_manager.cc
+++ b/src/kudu/master/txn_manager.cc
@@ -73,13 +73,6 @@ DEFINE_int64(txn_manager_status_table_range_partition_span, 1000000,
 TAG_FLAG(txn_manager_status_table_range_partition_span, advanced);
 TAG_FLAG(txn_manager_status_table_range_partition_span, experimental);
 
-DEFINE_uint32(transaction_keep_alive_interval_ms, 3000,
-              "Maximum interval (in milliseconds) between subsequent "
-              "keep-alive heartbeats from client to TxnManager to let it know "
-              "that a transaction is alive");
-TAG_FLAG(transaction_keep_alive_interval_ms, runtime);
-TAG_FLAG(transaction_keep_alive_interval_ms, experimental);
-
 namespace kudu {
 namespace transactions {
 
@@ -124,9 +117,9 @@ TxnManager::~TxnManager() {
 Status TxnManager::BeginTransaction(const string& username,
                                     const MonoTime& deadline,
                                     int64_t* txn_id,
-                                    int32_t* keep_alive_interval_ms) {
+                                    uint32_t* txn_keepalive_ms) {
   DCHECK(txn_id);
-  DCHECK(keep_alive_interval_ms);
+  DCHECK(txn_keepalive_ms);
   RETURN_NOT_OK(CheckInitialized(deadline));
 
   // TxnManager uses next_txn_id_ as a hint for next transaction identifier.
@@ -142,11 +135,13 @@ Status TxnManager::BeginTransaction(const string& username,
   //                the response and passed back to be used here as a hint for
   //                txn ID on the next request.
   int64_t try_txn_id = next_txn_id_++;
+  uint32_t keepalive_ms = 0;
   auto s = Status::TimedOut("timed out while trying to find txn_id");
   while (MonoTime::Now() < deadline) {
     int64_t highest_seen_txn_id = -1;
     s = txn_sys_client_->BeginTransaction(
-        try_txn_id, username, &highest_seen_txn_id, ToDelta(deadline));
+        try_txn_id, username, &keepalive_ms,
+        &highest_seen_txn_id, ToDelta(deadline));
     if (s.ok()) {
       DCHECK_GE(highest_seen_txn_id, 0);
       // The idea is to make the thread that has gotten a transaction reserved
@@ -197,8 +192,10 @@ Status TxnManager::BeginTransaction(const string& username,
     break;
   }
   if (s.ok()) {
+    DCHECK_GT(try_txn_id, -1);
+    DCHECK_GT(keepalive_ms, 0);
     *txn_id = try_txn_id;
-    *keep_alive_interval_ms = FLAGS_transaction_keep_alive_interval_ms;
+    *txn_keepalive_ms = keepalive_ms;
   }
   return s;
 }
diff --git a/src/kudu/master/txn_manager.h b/src/kudu/master/txn_manager.h
index f852799..01bd082 100644
--- a/src/kudu/master/txn_manager.h
+++ b/src/kudu/master/txn_manager.h
@@ -55,7 +55,7 @@ class TxnManager final {
   Status BeginTransaction(const std::string& username,
                           const MonoTime& deadline,
                           int64_t* txn_id,
-                          int32_t* keep_alive_interval_ms);
+                          uint32_t* txn_keepalive_ms);
 
   // Initiate the commit phase for the transaction. The control is returned
   // right after initiating the commit phase: the caller can check for the
diff --git a/src/kudu/master/txn_manager.proto b/src/kudu/master/txn_manager.proto
index cec41a7..03efa52 100644
--- a/src/kudu/master/txn_manager.proto
+++ b/src/kudu/master/txn_manager.proto
@@ -61,7 +61,8 @@ message BeginTransactionResponsePB {
 
   // The keep-alive interval (in milliseconds) to keep the transaction alive.
   // TxnManager expects the client to send keep-alive heartbeats spaced by
-  // keepalive_millis interval.
+  // keepalive_millis interval or shorter, otherwise the transaction may be
+  // automatically aborted as a stale/abandoned one.
   optional uint32 keepalive_millis = 3;
 }
 
diff --git a/src/kudu/master/txn_manager_service.cc b/src/kudu/master/txn_manager_service.cc
index a86b965..de26065 100644
--- a/src/kudu/master/txn_manager_service.cc
+++ b/src/kudu/master/txn_manager_service.cc
@@ -71,15 +71,15 @@ void TxnManagerServiceImpl::BeginTransaction(
     BeginTransactionResponsePB* resp,
     RpcContext* ctx) {
   int64_t txn_id;
-  int32_t keep_alive_interval_ms;
+  uint32_t txn_keepalive_ms;
   const auto s = server_->txn_manager()->BeginTransaction(
       ctx->remote_user().username(),
       ctx->GetClientDeadline(),
       &txn_id,
-      &keep_alive_interval_ms);
+      &txn_keepalive_ms);
   if (PREDICT_TRUE(s.ok())) {
     resp->set_txn_id(txn_id);
-    resp->set_keepalive_millis(keep_alive_interval_ms);
+    resp->set_keepalive_millis(txn_keepalive_ms);
   }
   CheckRespErrorOrSetUnknown(s, resp);
   return ctx->RespondSuccess();
diff --git a/src/kudu/transactions/txn_status_manager.cc b/src/kudu/transactions/txn_status_manager.cc
index e6bacb3..74508a8 100644
--- a/src/kudu/transactions/txn_status_manager.cc
+++ b/src/kudu/transactions/txn_status_manager.cc
@@ -24,18 +24,27 @@
 #include <vector>
 
 #include <boost/optional/optional.hpp>
+#include <gflags/gflags.h>
 #include <glog/logging.h>
 
+#include "kudu/gutil/macros.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/transactions/transactions.pb.h"
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/cow_object.h"
+#include "kudu/util/flag_tags.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
 
+DEFINE_uint32(transaction_keepalive_interval_ms, 5000,
+              "Maximum interval (in milliseconds) between subsequent "
+              "keep-alive heartbeats to let the transaction status manager "
+              "know that a transaction is not abandoned");
+TAG_FLAG(transaction_keepalive_interval_ms, experimental);
+
 using kudu::pb_util::SecureShortDebugString;
 using kudu::tablet::ParticipantIdsByTxnId;
 using kudu::tserver::TabletServerErrorPB;
diff --git a/src/kudu/transactions/txn_system_client.cc b/src/kudu/transactions/txn_system_client.cc
index 092eb5d..cfefae4 100644
--- a/src/kudu/transactions/txn_system_client.cc
+++ b/src/kudu/transactions/txn_system_client.cc
@@ -125,6 +125,7 @@ Status TxnSystemClient::OpenTxnStatusTable() {
 
 Status TxnSystemClient::BeginTransaction(int64_t txn_id,
                                          const string& user,
+                                         uint32_t* txn_keepalive_ms,
                                          int64_t* highest_seen_txn_id,
                                          MonoDelta timeout) {
   CoordinatorOpPB coordinate_txn_op;
@@ -138,6 +139,15 @@ Status TxnSystemClient::BeginTransaction(int64_t txn_id,
                                            s.AsStatusCallback(),
                                            &result));
   const auto ret = s.Wait();
+  if (ret.ok()) {
+    DCHECK(result.has_highest_seen_txn_id());
+    DCHECK(result.has_keepalive_millis());
+    if (txn_keepalive_ms) {
+      *txn_keepalive_ms = result.keepalive_millis();
+    }
+  }
+  // The 'highest_seen_txn_id' field in the 'result' can be set in some
+  // non-OK cases as well.
   if (result.has_highest_seen_txn_id() && highest_seen_txn_id) {
     *highest_seen_txn_id = result.highest_seen_txn_id();
   }
diff --git a/src/kudu/transactions/txn_system_client.h b/src/kudu/transactions/txn_system_client.h
index c5269b7..f5b3a21 100644
--- a/src/kudu/transactions/txn_system_client.h
+++ b/src/kudu/transactions/txn_system_client.h
@@ -84,9 +84,12 @@ class TxnSystemClient {
   // parameter (if not null) is set to the highest transaction identifier
   // observed by corresponding TxnStatusManager. Otherwise, the
   // 'highest_seen_txn_id' parameter is unset (e.g., in case of the requeset
-  // to TxnStatusManager timed out).
+  // to TxnStatusManager timed out). The 'txn_keepalive_ms' output parameter
+  // (if not null) is populated with the transaction's keep-alive interval
+  // in milliseconds in case of success, otherwise it is not set.
   Status BeginTransaction(int64_t txn_id, const
                           std::string& user,
+                          uint32_t* txn_keepalive_ms = nullptr,
                           int64_t* highest_seen_txn_id = nullptr,
                           MonoDelta timeout = MonoDelta::FromSeconds(10));
 
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index a73c5a0..11c1f08 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -173,6 +173,7 @@ TAG_FLAG(tserver_inject_invalid_authz_token_ratio, hidden);
 DECLARE_bool(raft_prepare_replacement_before_eviction);
 DECLARE_int32(memory_limit_warn_threshold_percentage);
 DECLARE_int32(tablet_history_max_age_sec);
+DECLARE_uint32(transaction_keepalive_interval_ms);
 
 METRIC_DEFINE_counter(
     server,
@@ -1283,6 +1284,9 @@ void TabletServiceAdminImpl::CoordinateTransaction(const CoordinateTransactionRe
   } else if (op.type() == CoordinatorOpPB::GET_TXN_STATUS) {
     // Populate corresponding field in the response.
     *(resp->mutable_op_result()->mutable_txn_status()) = std::move(txn_status);
+  } else if (op.type() == CoordinatorOpPB::BEGIN_TXN) {
+    resp->mutable_op_result()->set_keepalive_millis(
+        FLAGS_transaction_keepalive_interval_ms);
   }
   if (op.type() == CoordinatorOpPB::BEGIN_TXN && !s.IsServiceUnavailable()) {
     DCHECK_GE(highest_seen_txn_id, 0);
diff --git a/src/kudu/tserver/tserver_admin.proto b/src/kudu/tserver/tserver_admin.proto
index bd1f881..e95a12a 100644
--- a/src/kudu/tserver/tserver_admin.proto
+++ b/src/kudu/tserver/tserver_admin.proto
@@ -35,6 +35,7 @@ message CoordinatorOpPB {
     BEGIN_COMMIT_TXN = 3;
     ABORT_TXN = 4;
     GET_TXN_STATUS = 5;
+    KEEP_TXN_ALIVE = 6;
   }
   optional CoordinatorOpType type = 1;
   optional int64 txn_id = 2;
@@ -62,6 +63,13 @@ message CoordinatorOpResultPB {
   // to a request of the BEGIN_TXN type (in success and error cases).
   optional int64 highest_seen_txn_id = 3;
 
+  // The keep-alive interval for the transaction to begin. This field
+  // is set only if responding to requests of the BEGIN_TXN type.
+  //
+  // TODO(aserbin): it might make sense to populate this field also for
+  //                responses of the GET_TXN_STATUS and KEEP_TXN_ALIVE types.
+  optional uint32 keepalive_millis = 4;
+
   // TODO(awong): populate this with some application-level results, like the
   // actual transaction ID assigned, the next highest transaction ID available,
   // etc.


[kudu] 01/02: KUDU-2612 C++ client transaction API

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 38aee4b1407b04b8c299768be0ae40d84a8e2a3a
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Wed Oct 28 18:26:39 2020 -0700

    KUDU-2612 C++ client transaction API
    
    This patch adds a C++ client API to interact with TxnManager.
    Corresponding tests are added as well.  True end-to-end tests are not
    available yet because they require transaction orchestration
    functionality to be present, but it's not there yet.  I'm planning
    to add more tests as soon as the orchestration starts to appear.
    
    This patch is mostly focused on the client-side transactional API
    and contains several TODOs.  The scope of those is limited to the
    related client-side-only functionality.  I'm planning to address
    those in follow-up patches.  I think that way it's easier to scope,
    implement, and review compared with lumping them all in a single patch.
    
    Change-Id: Ic48233f72873012ea36ff4a05d16c58a0ba9b407
    Reviewed-on: http://gerrit.cloudera.org:8080/16710
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/client/CMakeLists.txt           |   5 +-
 src/kudu/client/client-internal.cc       |  21 +-
 src/kudu/client/client-internal.h        |  12 +
 src/kudu/client/client-test.cc           | 440 ++++++++++++++++++++++++++++++-
 src/kudu/client/client.cc                |  61 ++++-
 src/kudu/client/client.h                 | 161 ++++++++++-
 src/kudu/client/transaction-internal.cc  | 300 +++++++++++++++++++++
 src/kudu/client/transaction-internal.h   |  72 +++++
 src/kudu/client/txn_manager_proxy_rpc.cc | 231 ++++++++++++++++
 src/kudu/client/txn_manager_proxy_rpc.h  | 132 ++++++++++
 src/kudu/common/txn_id.h                 |   4 +-
 src/kudu/transactions/transactions.proto |  14 +
 12 files changed, 1430 insertions(+), 23 deletions(-)

diff --git a/src/kudu/client/CMakeLists.txt b/src/kudu/client/CMakeLists.txt
index acce7f0..a9b0ed3 100644
--- a/src/kudu/client/CMakeLists.txt
+++ b/src/kudu/client/CMakeLists.txt
@@ -58,6 +58,8 @@ set(CLIENT_SRCS
   table_creator-internal.cc
   tablet-internal.cc
   tablet_server-internal.cc
+  transaction-internal.cc
+  txn_manager_proxy_rpc.cc
   value.cc
   write_op.cc
 )
@@ -70,7 +72,8 @@ set(CLIENT_LIBS
   kudu_util
   master_proto
   tserver_proto
-  tserver_service_proto)
+  tserver_service_proto
+  txn_manager_proto)
 
 # Make sure we exclude tcmalloc from the exported library; we want the library
 # code to use the linking application's malloc implementation.
diff --git a/src/kudu/client/client-internal.cc b/src/kudu/client/client-internal.cc
index a55e5aa..04069ac 100644
--- a/src/kudu/client/client-internal.cc
+++ b/src/kudu/client/client-internal.cc
@@ -53,6 +53,7 @@
 #include "kudu/master/master.h"
 #include "kudu/master/master.pb.h"
 #include "kudu/master/master.proxy.h"
+#include "kudu/master/txn_manager.proxy.h"
 #include "kudu/rpc/connection.h"
 #include "kudu/rpc/messenger.h"
 #include "kudu/rpc/request_tracker.h"
@@ -99,8 +100,8 @@ using kudu::master::MasterServiceProxy;
 using kudu::master::TableIdentifierPB;
 using kudu::rpc::BackoffType;
 using kudu::rpc::CredentialsPolicy;
-using kudu::rpc::MessengerBuilder;
 using kudu::security::SignedTokenPB;
+using kudu::transactions::TxnManagerServiceProxy;
 using std::map;
 using std::pair;
 using std::set;
@@ -483,15 +484,15 @@ Status KuduClient::Data::ListTabletServers(KuduClient* client,
   return Status::OK();
 }
 
+bool KuduClient::Data::FetchCachedAuthzToken(const string& table_id, SignedTokenPB* token) {
+  return authz_token_cache_.Fetch(table_id, token);
+}
+
 void KuduClient::Data::StoreAuthzToken(const string& table_id,
                                        const SignedTokenPB& token) {
   authz_token_cache_.Put(table_id, token);
 }
 
-bool KuduClient::Data::FetchCachedAuthzToken(const string& table_id, SignedTokenPB* token) {
-  return authz_token_cache_.Fetch(table_id, token);
-}
-
 Status KuduClient::Data::OpenTable(KuduClient* client,
                                    const TableIdentifierPB& table_identifier,
                                    client::sp::shared_ptr<KuduTable>* table) {
@@ -665,6 +666,10 @@ void KuduClient::Data::ConnectedToClusterCb(
       master_proxy_ = std::make_shared<MasterServiceProxy>(
           messenger_, leader_addr, leader_hostname);
       master_proxy_->set_user_credentials(user_credentials_);
+
+      txn_manager_proxy_ = std::make_shared<TxnManagerServiceProxy>(
+          messenger_, leader_addr, leader_hostname);
+      txn_manager_proxy_->set_user_credentials(user_credentials_);
     }
   }
 
@@ -826,6 +831,12 @@ shared_ptr<master::MasterServiceProxy> KuduClient::Data::master_proxy() const {
   return master_proxy_;
 }
 
+std::shared_ptr<transactions::TxnManagerServiceProxy>
+KuduClient::Data::txn_manager_proxy() const {
+  std::lock_guard<simple_spinlock> l(leader_master_lock_);
+  return txn_manager_proxy_;
+}
+
 uint64_t KuduClient::Data::GetLatestObservedTimestamp() const {
   return latest_observed_timestamp_.Load();
 }
diff --git a/src/kudu/client/client-internal.h b/src/kudu/client/client-internal.h
index dc7dade..4f8153a 100644
--- a/src/kudu/client/client-internal.h
+++ b/src/kudu/client/client-internal.h
@@ -66,6 +66,10 @@ class Messenger;
 class RequestTracker;
 } // namespace rpc
 
+namespace transactions {
+class TxnManagerServiceProxy;
+} // namespace transactions
+
 namespace client {
 
 class KuduSchema;
@@ -231,6 +235,7 @@ class KuduClient::Data {
                        const security::SignedTokenPB& token);
 
   std::shared_ptr<master::MasterServiceProxy> master_proxy() const;
+  std::shared_ptr<transactions::TxnManagerServiceProxy> txn_manager_proxy() const;
 
   HostPort leader_master_hostport() const;
 
@@ -300,6 +305,13 @@ class KuduClient::Data {
   // Proxy to the leader master.
   std::shared_ptr<master::MasterServiceProxy> master_proxy_;
 
+  // A proxy to TxnManagerService instance. As of now, every master hosts
+  // a TxnManagerService, so it's easy to find one.
+  //
+  // TODO(aserbin): switch to multiple (per-master) proxies from TxnManager
+  //                and distribute transaction-related requests among those
+  std::shared_ptr<transactions::TxnManagerServiceProxy> txn_manager_proxy_;
+
   // Ref-counted RPC instance: since 'ConnectToClusterAsync' call
   // is asynchronous, we need to hold a reference in this class
   // itself, as to avoid a "use-after-free" scenario.
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 6849c6a..4d2a2d7 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -62,6 +62,7 @@
 #include "kudu/client/schema.h"
 #include "kudu/client/session-internal.h"
 #include "kudu/client/shared_ptr.h" // IWYU pragma: keep
+#include "kudu/client/transaction-internal.h"
 #include "kudu/client/value.h"
 #include "kudu/client/write_op.h"
 #include "kudu/clock/clock.h"
@@ -100,6 +101,8 @@
 #include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet_metadata.h"
 #include "kudu/tablet/tablet_replica.h"
+#include "kudu/tablet/txn_coordinator.h"
+#include "kudu/transactions/transactions.pb.h"
 #include "kudu/tserver/mini_tablet_server.h"
 #include "kudu/tserver/scanners.h"
 #include "kudu/tserver/tablet_server.h"
@@ -137,6 +140,7 @@ DECLARE_bool(mock_table_metrics_for_testing);
 DECLARE_bool(rpc_listen_on_unix_domain_socket);
 DECLARE_bool(rpc_trace_negotiation);
 DECLARE_bool(scanner_inject_service_unavailable_on_continue_scan);
+DECLARE_bool(txn_manager_enabled);
 DECLARE_int32(flush_threshold_mb);
 DECLARE_int32(flush_threshold_secs);
 DECLARE_int32(heartbeat_interval_ms);
@@ -187,6 +191,7 @@ using kudu::rpc::MessengerBuilder;
 using kudu::security::SignedTokenPB;
 using kudu::client::sp::shared_ptr;
 using kudu::tablet::TabletReplica;
+using kudu::transactions::TxnTokenPB;
 using kudu::tserver::MiniTabletServer;
 using std::function;
 using std::map;
@@ -224,6 +229,9 @@ class ClientTest : public KuduTest {
     FLAGS_heartbeat_interval_ms = 10;
     FLAGS_scanner_gc_check_interval_us = 50 * 1000; // 50 milliseconds.
 
+    // Enable TxnManager in Kudu master.
+    FLAGS_txn_manager_enabled = true;
+
     SetLocationMappingCmd();
 
     // Start minicluster and wait for tablet servers to connect to master.
@@ -343,6 +351,43 @@ class ClientTest : public KuduTest {
     }
   }
 
+  // TODO(aserbin): consider removing this method and updating the scenarios
+  //                it's used in once transaction orchestration is implemented
+  Status FinalizeCommitTransaction(int64_t txn_id) {
+    for (auto i = 0; i < cluster_->num_tablet_servers(); ++i) {
+      auto* tm = cluster_->mini_tablet_server(i)->server()->tablet_manager();
+      vector<scoped_refptr<TabletReplica>> replicas;
+      tm->GetTabletReplicas(&replicas);
+      for (auto& r : replicas) {
+        auto* c = r->txn_coordinator();
+        if (!c) {
+          continue;
+        }
+        auto highest_txn_id = c->highest_txn_id();
+        if (txn_id > highest_txn_id) {
+          continue;
+        }
+        auto s = c->FinalizeCommitTransaction(txn_id);
+        if (s.IsNotFound()) {
+          continue;
+        }
+        return s;
+      }
+    }
+    return Status::NotFound(Substitute("transaction $0 not found", txn_id));
+  }
+
+  // TODO(aserbin): remove this method and update the scenarios it's used in
+  //                once the transaction orchestration is implemented
+  Status FinalizeCommitTransaction(const shared_ptr<KuduTransaction>& txn) {
+    string txn_token;
+    RETURN_NOT_OK(txn->Serialize(&txn_token));
+    TxnTokenPB token;
+    CHECK(token.ParseFromString(txn_token));
+    CHECK(token.has_txn_id());
+    return FinalizeCommitTransaction(token.txn_id());
+  }
+
   // Return the number of lookup-related RPCs which have been serviced by the master.
   int CountMasterLookupRPCs() const {
     auto ent = cluster_->mini_master()->master()->metric_entity();
@@ -6585,12 +6630,15 @@ TEST_F(ClientTest, TxnIdOfTransactionalSession) {
   // Check how relevant member fields are populated in case of
   // transactional session.
   {
+    shared_ptr<KuduTransaction> txn;
+    ASSERT_OK(client_->NewTransaction(&txn));
     const TxnId kTxnId(0);
     KuduSession s(client_, kTxnId);
 
     const auto& session_data_txn_id = s.data_->txn_id_;
     ASSERT_TRUE(session_data_txn_id.IsValid());
-    ASSERT_EQ(kTxnId.value(), session_data_txn_id.value());
+    const auto& txnId = txn->data_->txn_id_;
+    ASSERT_EQ(txnId.value(), session_data_txn_id.value());
 
     NO_FATALS(apply_single_insert(&s));
 
@@ -6598,7 +6646,7 @@ TEST_F(ClientTest, TxnIdOfTransactionalSession) {
     ASSERT_NE(nullptr, s.data_->batcher_.get());
     const auto& batcher_txn_id = s.data_->batcher_->txn_id();
     ASSERT_TRUE(batcher_txn_id.IsValid());
-    ASSERT_EQ(kTxnId.value(), batcher_txn_id.value());
+    ASSERT_EQ(txnId.value(), batcher_txn_id.value());
   }
 }
 
@@ -6693,6 +6741,394 @@ TEST_F(ClientTest, TestClientLocationNoLocationMappingCmd) {
   ASSERT_TRUE(client_->location().empty());
 }
 
+// Check basic operations of the transaction-related API.
+// TODO(aserbin): add more scenarios and update existing ones to remove explicit
+//                FinalizeCommitTransaction() call when transaction
+//                orchestration is ready (i.e. FinalizeCommitTransaction() is
+//                called for all registered participants by the backend itself).
+TEST_F(ClientTest, TxnBasicOperations) {
+  // KuduClient::NewTransaction() populates the output parameter on success.
+  {
+    shared_ptr<KuduTransaction> txn;
+    ASSERT_OK(client_->NewTransaction(&txn));
+    ASSERT_NE(nullptr, txn.get());
+
+    shared_ptr<KuduSession> session;
+    ASSERT_OK(txn->CreateSession(&session));
+    ASSERT_NE(nullptr, session.get());
+    ASSERT_EQ(0, session->CountPendingErrors());
+    ASSERT_OK(session->Close());
+  }
+
+  // Multiple Rollback() calls are OK.
+  {
+    shared_ptr<KuduTransaction> txn;
+    ASSERT_OK(client_->NewTransaction(&txn));
+    ASSERT_OK(txn->Rollback());
+    ASSERT_OK(txn->Rollback());
+  }
+
+  // It's possible to rollback a transaction that hasn't yet finalized
+  // its commit phase.
+  {
+    shared_ptr<KuduTransaction> txn;
+    ASSERT_OK(client_->NewTransaction(&txn));
+    ASSERT_OK(txn->Commit(false /* wait */));
+    ASSERT_OK(txn->Rollback());
+  }
+
+  // It's impossible to rollback a transaction that has finalized
+  // its commit phase.
+  {
+    shared_ptr<KuduTransaction> txn;
+    ASSERT_OK(client_->NewTransaction(&txn));
+    ASSERT_OK(txn->Commit(false /* wait */));
+    ASSERT_OK(FinalizeCommitTransaction(txn));
+    auto cs = Status::Incomplete("other than Status::OK() initial status");
+    bool is_complete = false;
+    ASSERT_OK(txn->IsCommitComplete(&is_complete, &cs));
+    ASSERT_OK(cs);
+    ASSERT_TRUE(is_complete);
+    auto s = txn->Rollback();
+    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+  }
+
+  // It's impossible to commit a transaction that's being rolled back.
+  {
+    shared_ptr<KuduTransaction> txn;
+    ASSERT_OK(client_->NewTransaction(&txn));
+    ASSERT_OK(txn->Rollback());
+    auto s = txn->Commit(false /* wait */);
+    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "is not open: state: ABORTED");
+  }
+
+  // TODO(aserbin): uncomment this when other parts of transaction lifecycle
+  //                are properly implemented
+#if 0
+  // Insert rows in a transactional session, then rollback the transaction
+  // and make sure the rows are gone.
+  {
+    shared_ptr<KuduTransaction> txn;
+    ASSERT_OK(client_->NewTransaction(&txn));
+    shared_ptr<KuduSession> session;
+    ASSERT_OK(txn->CreateSession(&session));
+    NO_FATALS(InsertTestRows(client_table_.get(), session.get(), 10));
+    ASSERT_OK(txn->Rollback());
+    ASSERT_EQ(0, CountRowsFromClient(client_table_.get()));
+  }
+
+  // Insert rows in a transactional session, then commit the transaction
+  // and make sure the expected rows are there.
+  {
+    constexpr auto kRowsNum = 123;
+    shared_ptr<KuduTransaction> txn;
+    ASSERT_OK(client_->NewTransaction(&txn));
+    shared_ptr<KuduSession> session;
+    ASSERT_OK(txn->CreateSession(&session));
+    NO_FATALS(InsertTestRows(client_table_.get(), session.get(), kRowsNum));
+    ASSERT_OK(txn->Commit());
+    ASSERT_EQ(kRowsNum, CountRowsFromClient(
+        client_table_.get(), KuduScanner::READ_YOUR_WRITES, kNoBound, kNoBound));
+    ASSERT_EQ(0, session->CountPendingErrors());
+  }
+#endif
+}
+
+// Verify the basic functionality of the KuduTransaction::Commit() and
+// KuduTransaction::IsCommitComplete() methods.
+TEST_F(ClientTest, TxnCommit) {
+  {
+    shared_ptr<KuduTransaction> txn;
+    ASSERT_OK(client_->NewTransaction(&txn));
+    bool is_complete = true;
+    Status cs;
+    ASSERT_OK(txn->IsCommitComplete(&is_complete, &cs));
+    ASSERT_FALSE(is_complete);
+    ASSERT_TRUE(cs.IsIllegalState()) << cs.ToString();
+    ASSERT_STR_CONTAINS(cs.ToString(), "transaction is still open");
+
+    ASSERT_OK(txn->Rollback());
+    ASSERT_OK(txn->IsCommitComplete(&is_complete, &cs));
+    ASSERT_TRUE(is_complete);
+    ASSERT_TRUE(cs.IsAborted()) << cs.ToString();
+    ASSERT_STR_CONTAINS(cs.ToString(), "transaction has been aborted");
+  }
+
+  {
+    string txn_token;
+    {
+      shared_ptr<KuduTransaction> txn;
+      ASSERT_OK(client_->NewTransaction(&txn));
+      ASSERT_OK(txn->Commit(false /* wait */));
+      // TODO(aserbin): when txn lifecycle is properly implemented, inject a
+      //                delay into the txn finalizing code to make sure
+      //                the transaction stays in the COMMIT_IN_PROGRESS state
+      //                for a while
+      bool is_complete = true;
+      Status cs;
+      ASSERT_OK(txn->IsCommitComplete(&is_complete, &cs));
+      ASSERT_FALSE(is_complete);
+      ASSERT_TRUE(cs.IsIncomplete()) << cs.ToString();
+      ASSERT_STR_CONTAINS(cs.ToString(), "commit is still in progress");
+      ASSERT_OK(txn->Serialize(&txn_token));
+    }
+
+    // Make sure the transaction isn't aborted once its KuduTransaction handle
+    // goes out of scope.
+    shared_ptr<KuduTransaction> serdes_txn;
+    ASSERT_OK(KuduTransaction::Deserialize(client_, txn_token, &serdes_txn));
+    bool is_complete = true;
+    Status cs;
+    ASSERT_OK(serdes_txn->IsCommitComplete(&is_complete, &cs));
+    ASSERT_FALSE(is_complete);
+    ASSERT_TRUE(cs.IsIncomplete()) << cs.ToString();
+    ASSERT_STR_CONTAINS(cs.ToString(), "commit is still in progress");
+  }
+
+  {
+    shared_ptr<KuduTransaction> txn;
+    ASSERT_OK(client_->NewTransaction(&txn));
+    ASSERT_OK(txn->Commit(false /* wait */));
+    bool is_complete = true;
+    Status cs;
+    ASSERT_OK(txn->IsCommitComplete(&is_complete, &cs));
+    ASSERT_FALSE(is_complete);
+    ASSERT_TRUE(cs.IsIncomplete()) << cs.ToString();
+    ASSERT_OK(FinalizeCommitTransaction(txn));
+    {
+      bool is_complete = false;
+      auto cs = Status::Incomplete("other than Status::OK() initial status");
+      ASSERT_OK(txn->IsCommitComplete(&is_complete, &cs));
+      ASSERT_TRUE(is_complete);
+      ASSERT_OK(cs);
+    }
+  }
+
+  // TODO(aserbin): uncomment this when txn lifecycle is properly implemented
+#if 0
+  {
+    shared_ptr<KuduTransaction> txn;
+    ASSERT_OK(client_->NewTransaction(&txn));
+    ASSERT_OK(txn->Commit(false /* wait */));
+    ASSERT_EVENTUALLY([&] {
+      bool is_complete = false;
+      Status cs;
+      ASSERT_OK(txn->IsCommitComplete(&is_complete, &cs));
+      ASSERT_TRUE(is_complete);
+      ASSERT_OK(cs);
+    });
+  }
+
+  {
+    shared_ptr<KuduTransaction> txn;
+    ASSERT_OK(client_->NewTransaction(&txn));
+    ASSERT_OK(txn->Commit());
+    bool is_complete = false;
+    Status cs = Status::Incomplete("other than Status::OK() initial status");
+    ASSERT_OK(txn->IsCommitComplete(&is_complete, &cs));
+    ASSERT_TRUE(is_complete);
+    ASSERT_OK(cs);
+  }
+#endif
+}
+
+// This test verifies the behavior of KuduTransaction instance when the bound
+// KuduClient instance gets out of scope.
+TEST_F(ClientTest, TxnHandleLifecycle) {
+  shared_ptr<KuduTransaction> txn;
+  {
+    const auto master_addr = cluster_->mini_master()->bound_rpc_addr().ToString();
+    KuduClientBuilder b;
+    b.add_master_server_addr(master_addr);
+    shared_ptr<KuduClient> c;
+    ASSERT_OK(b.Build(&c));
+    ASSERT_OK(c->NewTransaction(&txn));
+  }
+  auto s = txn->Rollback();
+  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "associated KuduClient is gone");
+}
+
+TEST_F(ClientTest, TxnCorruptedToken) {
+  vector<pair<string, string>> token_test_cases;
+  token_test_cases.emplace_back("an empty string", "");
+  {
+    TxnTokenPB token;
+    string buf;
+    ASSERT_TRUE(token.SerializeToString(&buf));
+    token_test_cases.emplace_back("empty token", std::move(buf));
+  }
+  {
+    TxnTokenPB token;
+    token.set_txn_id(0);
+    string buf;
+    ASSERT_TRUE(token.SerializeToString(&buf));
+    token_test_cases.emplace_back("missing keepalive_millis field",
+                                  std::move(buf));
+  }
+  {
+    TxnTokenPB token;
+    token.set_keepalive_millis(1000);
+    string buf;
+    ASSERT_TRUE(token.SerializeToString(&buf));
+    token_test_cases.emplace_back("missing txn_id field", std::move(buf));
+  }
+
+  for (const auto& description_and_token : token_test_cases) {
+    SCOPED_TRACE(description_and_token.first);
+    const auto& token = description_and_token.second;
+    shared_ptr<KuduTransaction> serdes_txn;
+    auto s = KuduTransaction::Deserialize(client_, token, &serdes_txn);
+    ASSERT_TRUE(s.IsCorruption()) << s.ToString();
+  }
+}
+
+// Test scenario to verify serialization/deserialization of transaction tokens.
+TEST_F(ClientTest, TxnToken) {
+  shared_ptr<KuduTransaction> txn;
+  ASSERT_OK(client_->NewTransaction(&txn));
+
+  const TxnId txn_id = txn->data_->txn_id_;
+  ASSERT_TRUE(txn_id.IsValid());
+  const uint32_t txn_keepalive_ms = txn->data_->txn_keep_alive_ms_;
+  ASSERT_GT(txn_keepalive_ms, 0);
+
+  string txn_token;
+  ASSERT_OK(txn->Serialize(&txn_token));
+
+  // Serializing the same transaction again produces the same result.
+  {
+    string token;
+    ASSERT_OK(txn->Serialize(&token));
+    ASSERT_EQ(txn_token, token);
+  }
+
+  // Check the token for consistency.
+  {
+    TxnTokenPB token;
+    ASSERT_TRUE(token.ParseFromString(txn_token));
+    ASSERT_TRUE(token.has_txn_id());
+    ASSERT_EQ(txn_id.value(), token.txn_id());
+    ASSERT_TRUE(token.has_keepalive_millis());
+    ASSERT_EQ(txn_keepalive_ms, token.keepalive_millis());
+  }
+
+  shared_ptr<KuduTransaction> serdes_txn;
+  ASSERT_OK(KuduTransaction::Deserialize(client_, txn_token, &serdes_txn));
+  ASSERT_NE(nullptr, serdes_txn.get());
+  ASSERT_EQ(txn_id, serdes_txn->data_->txn_id_);
+  ASSERT_EQ(txn_keepalive_ms, serdes_txn->data_->txn_keep_alive_ms_);
+
+  // Make sure the KuduTransaction object deserialized from a token is fully
+  // functional.
+  string serdes_txn_token;
+  ASSERT_OK(serdes_txn->Serialize(&serdes_txn_token));
+  ASSERT_EQ(txn_token, serdes_txn_token);
+
+  {
+    static constexpr auto kNumRows = 10;
+    shared_ptr<KuduSession> session;
+    ASSERT_OK(serdes_txn->CreateSession(&session));
+    NO_FATALS(InsertTestRows(client_table_.get(), session.get(), kNumRows));
+    ASSERT_OK(serdes_txn->Commit(false /* wait */));
+
+    // The state of a transaction isn't stored in the token, so initiating
+    // commit of the transaction doesn't change the result of the serialization.
+    string token;
+    ASSERT_OK(serdes_txn->Serialize(&token));
+    ASSERT_EQ(serdes_txn_token, token);
+  }
+
+  // A new transaction should produce a new, different token.
+  shared_ptr<KuduTransaction> other_txn;
+  ASSERT_OK(client_->NewTransaction(&other_txn));
+  string other_txn_token;
+  ASSERT_OK(other_txn->Serialize(&other_txn_token));
+  ASSERT_NE(txn_token, other_txn_token);
+
+  // The state of a transaction isn't stored in the token, so aborting
+  // the transaction doesn't change the result of the serialization.
+  string token;
+  ASSERT_OK(other_txn->Rollback());
+  ASSERT_OK(other_txn->Serialize(&token));
+  ASSERT_EQ(other_txn_token, token);
+}
+
+// Begin a transaction under one user, and then try to commit/rollback the
+// transaction under different user. The latter should result in
+// Status::NotAuthorized() status.
+TEST_F(ClientTest, AttemptToControlTxnByOtherUser) {
+  static constexpr const char* const kOtherTxnUser = "other-txn-user";
+
+  shared_ptr<KuduTransaction> txn;
+  ASSERT_OK(client_->NewTransaction(&txn));
+  string txn_token;
+  ASSERT_OK(txn->Serialize(&txn_token));
+
+  // Transaction identifier is surfacing here only to build the reference error
+  // message for Status::NotAuthorized() returned by attempts to perform
+  // commit/rollback operations below.
+  TxnId txn_id;
+  {
+    TxnTokenPB token;
+    ASSERT_TRUE(token.ParseFromString(txn_token));
+    ASSERT_TRUE(token.has_txn_id());
+    txn_id = token.txn_id();
+  }
+  ASSERT_TRUE(txn_id.IsValid());
+  const auto ref_msg = Substitute(
+      "transaction ID $0 not owned by $1", txn_id.value(), kOtherTxnUser);
+
+  KuduClientBuilder client_builder;
+  string authn_creds;
+  AuthenticationCredentialsPB pb;
+  pb.set_real_user(kOtherTxnUser);
+  ASSERT_TRUE(pb.SerializeToString(&authn_creds));
+  client_builder.import_authentication_credentials(authn_creds);
+  shared_ptr<KuduClient> client;
+  ASSERT_OK(cluster_->CreateClient(&client_builder, &client));
+  shared_ptr<KuduTransaction> serdes_txn;
+  ASSERT_OK(KuduTransaction::Deserialize(client, txn_token, &serdes_txn));
+  const vector<pair<string, Status>> txn_ctl_results = {
+    { "rollback", serdes_txn->Rollback() },
+    { "commit", serdes_txn->Commit(false /* wait */) },
+  };
+  for (const auto& op_and_status : txn_ctl_results) {
+    SCOPED_TRACE(op_and_status.first);
+    const auto& s = op_and_status.second;
+    ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), ref_msg);
+  }
+}
+
+TEST_F(ClientTest, NoTxnManager) {
+  shared_ptr<KuduTransaction> txn;
+  ASSERT_OK(client_->NewTransaction(&txn));
+
+  // Shutdown all masters: a TxnManager is a part of master, so after shutting
+  // down all masters in the cluster there will be no single TxnManager running.
+  cluster_->ShutdownNodes(cluster::ClusterNodes::MASTERS_ONLY);
+
+  {
+    shared_ptr<KuduTransaction> txn;
+    auto s = client_->NewTransaction(&txn);
+    ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "Client connection negotiation failed");
+  }
+
+  const vector<pair<string, Status>> txn_ctl_results = {
+    { "rollback", txn->Rollback() },
+    { "commit", txn->Commit(false /* wait */) },
+  };
+  for (const auto& op_and_status : txn_ctl_results) {
+    SCOPED_TRACE(op_and_status.first);
+    const auto& s = op_and_status.second;
+    ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "Client connection negotiation failed");
+  }
+}
+
 // Client test that assigns locations to clients and tablet servers.
 // For now, assigns a uniform location to all clients and tablet servers.
 class ClientWithLocationTest : public ClientTest {
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index ebc4fc1..61e82ca 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -57,6 +57,7 @@
 #include "kudu/client/table_statistics-internal.h"
 #include "kudu/client/tablet-internal.h"
 #include "kudu/client/tablet_server-internal.h"
+#include "kudu/client/transaction-internal.h"
 #include "kudu/client/value.h"
 #include "kudu/client/write_op.h"
 #include "kudu/common/common.pb.h"
@@ -66,6 +67,7 @@
 #include "kudu/common/row_operations.h"
 #include "kudu/common/scan_spec.h"
 #include "kudu/common/schema.h"
+#include "kudu/common/txn_id.h"
 #include "kudu/common/wire_protocol.h"
 #include "kudu/common/wire_protocol.pb.h"
 #include "kudu/consensus/metadata.pb.h"
@@ -100,10 +102,9 @@
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/version_info.h"
 
-namespace kudu {
-class BlockBloomFilter;
-}  // namespace kudu
-
+using kudu::client::internal::AsyncLeaderMasterRpc;
+using kudu::client::internal::MetaCache;
+using kudu::client::sp::shared_ptr;
 using kudu::consensus::RaftPeerPB;
 using kudu::master::AlterTableRequestPB;
 using kudu::master::AlterTableResponsePB;
@@ -159,16 +160,13 @@ struct tm;
 
 namespace kudu {
 
+class BlockBloomFilter;
 class simple_spinlock;
 
 namespace client {
 
 class ResourceMetrics;
 
-using internal::AsyncLeaderMasterRpc;
-using internal::MetaCache;
-using sp::shared_ptr;
-
 const char* kVerboseEnvVar = "KUDU_CLIENT_VERBOSE";
 
 #if defined(kudu_client_exported_EXPORTS)
@@ -402,6 +400,41 @@ Status KuduClientBuilder::Build(shared_ptr<KuduClient>* client) {
   return Status::OK();
 }
 
+KuduTransaction::KuduTransaction(const sp::shared_ptr<KuduClient>& client)
+    : data_(new KuduTransaction::Data(client)) {
+}
+
+KuduTransaction::~KuduTransaction() {
+  delete data_;
+}
+
+Status KuduTransaction::CreateSession(sp::shared_ptr<KuduSession>* session) {
+  return data_->CreateSession(session);
+}
+
+Status KuduTransaction::Commit(bool wait) {
+  return data_->Commit(wait);
+}
+
+Status KuduTransaction::IsCommitComplete(
+    bool* is_complete, Status* completion_status) {
+  return data_->IsCommitComplete(is_complete, completion_status);
+}
+
+Status KuduTransaction::Rollback() {
+  return data_->Rollback();
+}
+
+Status KuduTransaction::Serialize(string* serialized_txn) const {
+  return data_->Serialize(serialized_txn);
+}
+
+Status KuduTransaction::Deserialize(const sp::shared_ptr<KuduClient>& client,
+                                    const string& serialized_txn,
+                                    sp::shared_ptr<KuduTransaction>* txn) {
+  return Data::Deserialize(client, serialized_txn, txn);
+}
+
 KuduClient::KuduClient()
   : data_(new KuduClient::Data()) {
   static ObjectIdGenerator oid_generator;
@@ -531,6 +564,15 @@ shared_ptr<KuduSession> KuduClient::NewSession() {
   return ret;
 }
 
+Status KuduClient::NewTransaction(sp::shared_ptr<KuduTransaction>* txn) {
+  shared_ptr<KuduTransaction> ret(new KuduTransaction(shared_from_this()));
+  const auto s = ret->data_->Begin();
+  if (s.ok()) {
+    *txn = std::move(ret);
+  }
+  return s;
+}
+
 Status KuduClient::GetTablet(const string& tablet_id, KuduTablet** tablet) {
   GetTabletLocationsRequestPB req;
   GetTabletLocationsResponsePB resp;
@@ -1181,8 +1223,7 @@ KuduSession::KuduSession(const shared_ptr<KuduClient>& client)
     : data_(new KuduSession::Data(client, client->data_->messenger_)) {
 }
 
-KuduSession::KuduSession(const shared_ptr<KuduClient>& client,
-                         const TxnId& txn_id)
+KuduSession::KuduSession(const shared_ptr<KuduClient>& client, const TxnId& txn_id)
     : data_(new KuduSession::Data(client, client->data_->messenger_, txn_id)) {
 }
 
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 5001543..95a2a1d 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -115,6 +115,8 @@ class ScanBatchDataInterface;
 class WriteRpc;
 template <class ReqClass, class RespClass>
 class AsyncLeaderMasterRpc; // IWYU pragma: keep
+template <class ReqClass, class RespClass>
+class AsyncRandomTxnManagerRpc; // IWYU pragma: keep
 } // namespace internal
 
 /// Install a callback for internal client logging.
@@ -322,6 +324,134 @@ class KUDU_EXPORT KuduClientBuilder {
   DISALLOW_COPY_AND_ASSIGN(KuduClientBuilder);
 };
 
+/// A class representing a multi-row transaction in Kudu. Once created using
+/// @c KuduClient::NewTransaction() or @c KuduTransaction::Deserialize method,
+/// @c KuduTransaction instance can be used to commit or rollback the underlying
+/// multi-row transaction and create a transactional session.
+///
+/// @note There isn't any automation to rollback or commit the underlying
+///   transaction upon destruction of an instance of this class.
+///
+/// @warning The set of methods in this class, their behavior, and signatures
+///          are experimental and may change or disappear in future. The class
+///          itself is experimental and may change its lineage, API status,
+///          or disappear in future.
+class KUDU_EXPORT KuduTransaction :
+    public sp::enable_shared_from_this<KuduTransaction> {
+ public:
+  ~KuduTransaction();
+
+  /// Create a new @c KuduSession with "transactional" semantics.
+  ///
+  /// Every write operation performed in the context of the newly created
+  /// "transactional" session becomes a part of the corresponding multi-row
+  /// transaction represented by an instance of this class. Multiple sessions
+  /// can be created in the context of the same multi-row distributed
+  /// transaction by the same or different Kudu clients residing on a single
+  /// or multiple nodes.
+  ///
+  /// @param [out] session
+  ///   The result session object.
+  /// @return Operation result status.
+  Status CreateSession(sp::shared_ptr<KuduSession>* session) WARN_UNUSED_RESULT;
+
+  /// Commit the transaction.
+  ///
+  /// This method initiates committing the transaction, and, depending on the
+  /// @c wait parameter, either returns right after that or awaits
+  /// for the commit to finalize.
+  ///
+  /// @param [in] wait
+  ///   This parameter controls the way how this method operates:
+  ///     @li @c true means synchronous operation mode
+  ///     @li @c false means asynchronous operation mode
+  ///   In case of asynchronous mode, @c KuduTransaction::IsCommitComplete()
+  ///   can be used to detect whether the commit has successfully finalized.
+  /// @return Operation result status.
+  Status Commit(bool wait = true) WARN_UNUSED_RESULT;
+
+  /// Whether the commit has completed i.e. no longer in progress of finalizing.
+  ///
+  /// This method checks for the transaction's commit status. If the method
+  /// returns @c Status::OK(), the @c is_complete out parameter tells whether
+  /// the commit is no longer in progress, and the @c completion_status
+  /// parameter is set to the status of the commit. The happy case is when
+  /// the method returns @c Status::OK(), @c is_complete is set to @c true and
+  /// @c completion_status is set to @c Status::OK() -- that means the
+  /// transaction has successfully finalized its commit phase.
+  ///
+  /// @param [out] is_complete
+  ///   Whether the process of finalizing the commit of the transaction has
+  ///   ended, including both success and failure outcomes. In other words,
+  ///   the value of this out parameter indicates whether the finalization
+  ///   of the transaction's commit phase is no longer in progress: it already
+  ///   succeeded or failed by the time of processing the request.
+  ///   This parameter is assigned a meaningful value iff the method returns
+  ///   @c Status::OK().
+  /// @param [out] completion_status
+  ///   The status of finalization of the transaction's commit phase:
+  ///     @li Status::OK() if the commit phase successfully finalized
+  ///     @li non-OK status if the commit phase failed to finalize
+  ///   This parameter is assigned a meaningful value iff the method returns
+  ///   @c Status::OK().
+  /// @return The result status of querying the transaction's commit status.
+  ///   Both @c is_complete and @c completion_status are set iff the method
+  ///   returns @c Status::OK().
+  Status IsCommitComplete(bool* is_complete,
+                          Status* completion_status) WARN_UNUSED_RESULT;
+
+  /// Rollback/abort the transaction.
+  ///
+  /// @return Operation result status.
+  Status Rollback() WARN_UNUSED_RESULT;
+
+  /// Export the information on this transaction in a serialized form.
+  ///
+  /// The serialized information on a Kudu transaction can be passed among
+  /// different Kudu clients running at multiple nodes, so those separate
+  /// Kudu clients can perform operations to be a part of the same distributed
+  /// transaction. The resulting string is called "transaction token" and it
+  /// can be deserialized using the @c KuduTransaction::Deserialize() method.
+  /// The result of the latter is an instance of the @c KuduTransaction class
+  /// to work with in the run-time.
+  ///
+  /// This method doesn't perform any RPC under the hood.
+  ///
+  /// @note The representation of the data in the serialized form
+  ///   (i.e. the format of a Kudu transaction token) is an implementation
+  ///   detail, not a part of the public API.
+  ///
+  /// @param [out] serialized_txn
+  ///   Result string to output the serialized transaction information.
+  /// @return Operation result status.
+  Status Serialize(std::string* serialized_txn) const WARN_UNUSED_RESULT;
+
+  /// Re-create KuduTransaction object given its serialized representation.
+  ///
+  /// This method doesn't perform any RPC under the hood.
+  ///
+  /// @param [in] client
+  ///   Client instance to bound the result object to.
+  /// @param [in] serialized_txn
+  ///   String containing serialized representation of KuduTransaction object.
+  /// @param [out] txn
+  ///   The result KuduTransaction object, wrapped into a smart pointer.
+  /// @return Operation result status.
+  static Status Deserialize(const sp::shared_ptr<KuduClient>& client,
+                            const std::string& serialized_txn,
+                            sp::shared_ptr<KuduTransaction>* txn) WARN_UNUSED_RESULT;
+ private:
+  friend class KuduClient;
+  friend class KuduSession;
+  FRIEND_TEST(ClientTest, TxnIdOfTransactionalSession);
+  FRIEND_TEST(ClientTest, TxnToken);
+
+  class KUDU_NO_EXPORT Data;
+
+  explicit KuduTransaction(const sp::shared_ptr<KuduClient>& client);
+  Data* data_; // Owned.
+};
+
 /// @brief A handle for a connection to a cluster.
 ///
 /// The KuduClient class represents a connection to a cluster. From the user
@@ -473,6 +603,26 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> {
   /// @return A new session object; caller is responsible for destroying it.
   sp::shared_ptr<KuduSession> NewSession();
 
+  /// Start a multi-row transaction.
+  ///
+  /// This method results in an RPC sent to a Kudu cluster to begin a multi-row
+  /// distributed transaction. In case of success, the resulting transaction
+  /// handle is output into the 'txn' parameter. That handle can be used
+  /// to create a new transactional @c KuduSession using the
+  /// @c KuduTransaction::CreateSession() method. To commit or
+  /// rollback all single-row write operations performed in the context of
+  /// the newly created transaction, use @c KuduTransaction::Commit() and
+  /// @c KuduTransaction::Rollback() methods correspondingly.
+  ///
+  /// @warning This method is experimental and may change or disappear in future.
+  ///
+  /// @param [out] txn
+  ///   The resulting @c KuduTransaction object wrapped into a smart pointer.
+  ///   This 'out' parameter is populated iff the operation to begin
+  ///   a transaction was successful.
+  /// @return The status of underlying "begin transaction" operation.
+  Status NewTransaction(sp::shared_ptr<KuduTransaction>* txn);
+
   /// @cond PRIVATE_API
 
   /// Get tablet information for a tablet by ID.
@@ -628,6 +778,8 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> {
 
   template <class ReqClass, class RespClass>
   friend class internal::AsyncLeaderMasterRpc;
+  template <class ReqClass, class RespClass>
+  friend class internal::AsyncRandomTxnManagerRpc;
 
   friend class ClientTest;
   friend class ConnectToClusterBaseTest;
@@ -2067,11 +2219,13 @@ class KUDU_EXPORT KuduSession : public sp::enable_shared_from_this<KuduSession>
  private:
   class KUDU_NO_EXPORT Data;
 
+  friend class ClientTest;
   friend class KuduClient;
+  friend class KuduTransaction;
   friend class internal::Batcher;
-  friend class ClientTest;
-  FRIEND_TEST(ClientTest, TestAutoFlushBackgroundApplyBlocks);
+
   FRIEND_TEST(ClientTest, TestAutoFlushBackgroundAndErrorCollector);
+  FRIEND_TEST(ClientTest, TestAutoFlushBackgroundApplyBlocks);
   FRIEND_TEST(ClientTest, TxnIdOfTransactionalSession);
 
   explicit KuduSession(const sp::shared_ptr<KuduClient>& client);
@@ -2606,7 +2760,8 @@ class KUDU_EXPORT KuduScanToken {
 
   /// Serialize the token into a string.
   ///
-  /// Deserialize with KuduScanToken::DeserializeIntoScanner().
+  /// The resulting string can be deserialized with
+  /// @c KuduScanToken::DeserializeIntoScanner().
   ///
   /// @param [out] buf
   ///   Result string to output the serialized token.
diff --git a/src/kudu/client/transaction-internal.cc b/src/kudu/client/transaction-internal.cc
new file mode 100644
index 0000000..af0fddc
--- /dev/null
+++ b/src/kudu/client/transaction-internal.cc
@@ -0,0 +1,300 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/client/transaction-internal.h"
+
+#include <functional>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <utility>
+
+#include <glog/logging.h>
+
+#include "kudu/client/client-internal.h"
+#include "kudu/client/client.h"
+#include "kudu/client/session-internal.h"
+#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
+#include "kudu/client/txn_manager_proxy_rpc.h"
+#include "kudu/common/txn_id.h"
+#include "kudu/common/wire_protocol.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/master/txn_manager.pb.h"
+#include "kudu/master/txn_manager.proxy.h"
+#include "kudu/rpc/response_callback.h"
+#include "kudu/rpc/rpc.h"
+#include "kudu/transactions/transactions.pb.h"
+#include "kudu/util/async_util.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+
+using kudu::client::internal::AsyncRandomTxnManagerRpc;
+using kudu::rpc::BackoffType;
+using kudu::transactions::AbortTransactionRequestPB;
+using kudu::transactions::AbortTransactionResponsePB;
+using kudu::transactions::BeginTransactionRequestPB;
+using kudu::transactions::BeginTransactionResponsePB;
+using kudu::transactions::CommitTransactionRequestPB;
+using kudu::transactions::CommitTransactionResponsePB;
+using kudu::transactions::GetTransactionStateRequestPB;
+using kudu::transactions::GetTransactionStateResponsePB;
+using kudu::transactions::TxnManagerServiceProxy;
+using kudu::transactions::TxnStatePB;
+using kudu::transactions::TxnTokenPB;
+using std::string;
+using strings::Substitute;
+
+namespace kudu {
+namespace client {
+
+namespace {
+MonoTime GetRpcDeadline(const KuduClient* c) {  // now + default admin operation timeout of 'c'
+  return MonoTime::Now() + c->default_admin_operation_timeout();
+}
+} // anonymous namespace
+
+KuduTransaction::Data::Data(const sp::shared_ptr<KuduClient>& client)
+    : weak_client_(client),  // only a weak reference to the client is kept
+      txn_keep_alive_ms_(0) {  // stays 0 until Begin() or Deserialize() assigns it
+  CHECK(client);  // a null 'client' is fatal
+}
+
+Status KuduTransaction::Data::CreateSession(sp::shared_ptr<KuduSession>* session) {
+  auto c = weak_client_.lock();
+  if (!c) {
+    return Status::IllegalState("associated KuduClient is gone");
+  }
+  // We could check for the transaction status here before trying to return
+  // a session for a transaction that has been committed or aborted already.
+  // However, that would incur an extra RPC to TxnManager, which isn't
+  // a good idea when thinking about this at scale. So, since tablet servers
+  // should perform the same kind of verification while processing write
+  // operations issued from the context of this session anyway,
+  // there isn't much sense in duplicating that at the client side.
+  sp::shared_ptr<KuduSession> ret(new KuduSession(c, txn_id_));
+  ret->data_->Init(ret);
+  *session = std::move(ret);  // 'session' is written only on success
+  return Status::OK();
+}
+
+Status KuduTransaction::Data::Begin() {
+  auto c = weak_client_.lock();
+  if (!c) {
+    return Status::IllegalState("associated KuduClient is gone");
+  }
+  const auto deadline = GetRpcDeadline(c.get());
+  Synchronizer sync;
+  BeginTransactionRequestPB req;  // sent as is: no request fields are populated here
+  BeginTransactionResponsePB resp;
+  AsyncRandomTxnManagerRpc<
+      BeginTransactionRequestPB, BeginTransactionResponsePB> rpc(
+      deadline, c.get(), BackoffType::EXPONENTIAL, req, &resp,
+      &TxnManagerServiceProxy::BeginTransactionAsync, "BeginTransaction",
+      sync.AsStatusCallback());
+  rpc.SendRpc();
+  RETURN_NOT_OK(sync.Wait());
+  if (resp.has_error()) {
+    return StatusFromPB(resp.error().status());
+  }
+
+  // TODO(aserbin): start sending regular heartbeats for the started transaction
+  CHECK(resp.has_txn_id());  // a successful response without an id is fatal
+  txn_id_ = resp.txn_id();
+  CHECK(resp.has_keepalive_millis());  // likewise for the keep-alive interval
+  txn_keep_alive_ms_ = resp.keepalive_millis();
+  CHECK(txn_id_.IsValid());
+
+  return Status::OK();
+}
+
+Status KuduTransaction::Data::Commit(bool wait) {
+  CHECK(txn_id_.IsValid());  // fatal if no valid transaction id has been assigned
+  auto c = weak_client_.lock();
+  if (!c) {
+    return Status::IllegalState("associated KuduClient is gone");
+  }
+  const auto deadline = GetRpcDeadline(c.get());
+  Synchronizer sync;
+  CommitTransactionRequestPB req;
+  req.set_txn_id(txn_id_);
+  CommitTransactionResponsePB resp;
+  AsyncRandomTxnManagerRpc<CommitTransactionRequestPB,
+                           CommitTransactionResponsePB> rpc(
+      deadline, c.get(), BackoffType::EXPONENTIAL, req, &resp,
+      &TxnManagerServiceProxy::CommitTransactionAsync, "CommitTransaction",
+      sync.AsStatusCallback());
+  rpc.SendRpc();
+  RETURN_NOT_OK(sync.Wait());
+  if (resp.has_error()) {
+    return StatusFromPB(resp.error().status());
+  }
+  if (wait) {  // the wait shares 'deadline' with the commit RPC above
+    RETURN_NOT_OK(WaitForTxnCommitToFinalize(c.get(), deadline, txn_id_));
+  }
+  return Status::OK();
+}
+
+Status KuduTransaction::Data::IsCommitComplete(
+    bool* is_complete, Status* completion_status) {
+  DCHECK(is_complete);
+  DCHECK(completion_status);
+  CHECK(txn_id_.IsValid());  // fatal if no valid transaction id has been assigned
+  auto c = weak_client_.lock();
+  if (!c) {
+    return Status::IllegalState("associated KuduClient is gone");
+  }
+  const auto deadline = GetRpcDeadline(c.get());
+  return IsCommitCompleteImpl(  // performs a single state query; no polling here
+      c.get(), deadline, txn_id_, is_complete, completion_status);
+}
+
+Status KuduTransaction::Data::Rollback() {
+  CHECK(txn_id_.IsValid());  // same hard check as Commit(): never send an invalid id
+  auto c = weak_client_.lock();
+  if (!c) {
+    return Status::IllegalState("associated KuduClient is gone");
+  }
+  const auto deadline = GetRpcDeadline(c.get());
+  Synchronizer sync;
+  AbortTransactionRequestPB req;
+  req.set_txn_id(txn_id_);
+  AbortTransactionResponsePB resp;
+  AsyncRandomTxnManagerRpc<AbortTransactionRequestPB,
+                           AbortTransactionResponsePB> rpc(
+      deadline, c.get(), BackoffType::EXPONENTIAL, req, &resp,
+      &TxnManagerServiceProxy::AbortTransactionAsync, "AbortTransaction",
+      sync.AsStatusCallback());
+  rpc.SendRpc();
+  RETURN_NOT_OK(sync.Wait());
+  if (resp.has_error()) {  // application-level error reported by TxnManager
+    return StatusFromPB(resp.error().status());
+  }
+  return Status::OK();
+}
+
+Status KuduTransaction::Data::Serialize(string* serialized_txn) const {
+  CHECK(txn_id_.IsValid());
+  DCHECK(serialized_txn);
+  TxnTokenPB token;
+  token.set_txn_id(txn_id_);
+  // Always record the keep-alive interval: Deserialize() rejects tokens that
+  // lack this field, so omitting it (even when 0) would break the round trip.
+  token.set_keepalive_millis(txn_keep_alive_ms_);
+  if (!token.SerializeToString(serialized_txn)) {
+    return Status::Corruption("unable to serialize transaction information");
+  }
+  return Status::OK();
+}
+
+Status KuduTransaction::Data::Deserialize(
+    const sp::shared_ptr<KuduClient>& client,
+    const string& serialized_txn,
+    sp::shared_ptr<KuduTransaction>* txn) {
+  // TODO(aserbin): should the owner of the transaction be taken into account
+  //                as well, i.e. should only the user that created the
+  //                transaction be allowed to deserialize its token?
+  TxnTokenPB token;
+  if (!token.ParseFromString(serialized_txn)) {
+    return Status::Corruption("unable to deserialize transaction information");
+  }
+  if (!token.has_txn_id()) {
+    return Status::Corruption("transaction identifier is missing");
+  }
+  if (!token.has_keepalive_millis()) {
+    return Status::Corruption("keepalive information is missing");
+  }
+
+  sp::shared_ptr<KuduTransaction> ret(new KuduTransaction(client));
+  ret->data_->txn_id_ = token.txn_id();  // NOTE(review): value is not range-checked here
+  ret->data_->txn_keep_alive_ms_ = token.keepalive_millis();
+  *txn = std::move(ret);  // 'txn' is written only on success
+
+  return Status::OK();
+}
+
+Status KuduTransaction::Data::IsCommitCompleteImpl(
+    KuduClient* client,
+    const MonoTime& deadline,
+    const TxnId& txn_id,
+    bool* is_complete,
+    Status* completion_status) {
+  DCHECK(client);
+  Synchronizer sync;
+  GetTransactionStateRequestPB req;
+  req.set_txn_id(txn_id);
+  GetTransactionStateResponsePB resp;
+  AsyncRandomTxnManagerRpc<GetTransactionStateRequestPB,
+                           GetTransactionStateResponsePB> rpc(
+      deadline, client, BackoffType::EXPONENTIAL, req, &resp,
+      &TxnManagerServiceProxy::GetTransactionStateAsync, "GetTransactionState",
+      sync.AsStatusCallback());
+  rpc.SendRpc();
+  RETURN_NOT_OK(sync.Wait());  // on failure the out-parameters are left untouched
+  if (resp.has_error()) {
+    return StatusFromPB(resp.error().status());
+  }
+
+  DCHECK(resp.has_state());
+  const auto state = resp.state();
+  switch (state) {  // map the reported state onto the two out-parameters
+    case TxnStatePB::OPEN:
+      *is_complete = false;
+      *completion_status = Status::IllegalState("transaction is still open");
+      break;
+    case TxnStatePB::ABORTED:
+      *is_complete = true;  // an aborted transaction counts as complete
+      *completion_status = Status::Aborted("transaction has been aborted");
+      break;
+    case TxnStatePB::COMMIT_IN_PROGRESS:
+      *is_complete = false;
+      *completion_status = Status::Incomplete("commit is still in progress");
+      break;
+    case TxnStatePB::COMMITTED:
+      *is_complete = true;
+      *completion_status = Status::OK();
+      break;
+    default: {  // any state value not handled above
+        auto errmsg = Substitute("$0: unknown transaction state", state);
+        LOG(DFATAL) << errmsg;
+        return Status::IllegalState(errmsg);
+      }
+  }
+  return Status::OK();
+}
+
+Status KuduTransaction::Data::WaitForTxnCommitToFinalize(
+    KuduClient* client, const MonoTime& deadline, const TxnId& txn_id) {
+  return RetryFunc(
+      deadline,
+      "waiting for transaction commit to be completed",
+      "timed out waiting for transaction commit to finalize",
+      [&](const MonoTime& deadline, bool* retry) {  // NOTE: shadows the outer 'deadline'
+        bool is_complete = false;
+        Status status;
+        const auto s = KuduTransaction::Data::IsCommitCompleteImpl(
+            client, deadline, txn_id, &is_complete, &status);
+        if (!s.ok()) {
+          *retry = false;  // the state query itself failed: stop polling
+          return s;
+        }
+        *retry = !is_complete;  // poll again unless reported committed or aborted
+        return status;
+      });
+}
+
+} // namespace client
+} // namespace kudu
diff --git a/src/kudu/client/transaction-internal.h b/src/kudu/client/transaction-internal.h
new file mode 100644
index 0000000..0780fd4
--- /dev/null
+++ b/src/kudu/client/transaction-internal.h
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "kudu/client/client.h"
+#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
+#include "kudu/common/txn_id.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class MonoTime;
+
+namespace client {
+
+// This class contains the implementation of the functionality behind
+// kudu::client::KuduTransaction.
+class KuduTransaction::Data {
+ public:
+  explicit Data(const sp::shared_ptr<KuduClient>& client);
+
+  Status CreateSession(sp::shared_ptr<KuduSession>* session);
+
+  Status Begin();
+  Status Commit(bool wait);
+  Status IsCommitComplete(bool* is_complete, Status* completion_status);
+  Status Rollback();
+
+  Status Serialize(std::string* serialized_txn) const;
+
+  static Status Deserialize(const sp::shared_ptr<KuduClient>& client,
+                            const std::string& serialized_txn,
+                            sp::shared_ptr<KuduTransaction>* txn);
+
+  static Status IsCommitCompleteImpl(
+      KuduClient* client,
+      const MonoTime& deadline,
+      const TxnId& txn_id,
+      bool* is_complete,
+      Status* completion_status);
+
+  static Status WaitForTxnCommitToFinalize(
+      KuduClient* client, const MonoTime& deadline, const TxnId& txn_id);
+
+  sp::weak_ptr<KuduClient> weak_client_;  // weak reference to the associated client
+  TxnId txn_id_;  // identifier of the transaction
+  uint32_t txn_keep_alive_ms_;  // keep-alive interval, in milliseconds
+
+  DISALLOW_COPY_AND_ASSIGN(Data);
+};
+
+} // namespace client
+} // namespace kudu
diff --git a/src/kudu/client/txn_manager_proxy_rpc.cc b/src/kudu/client/txn_manager_proxy_rpc.cc
new file mode 100644
index 0000000..7ad38e0
--- /dev/null
+++ b/src/kudu/client/txn_manager_proxy_rpc.cc
@@ -0,0 +1,231 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/client/txn_manager_proxy_rpc.h"
+
+#include <algorithm>
+#include <functional>
+#include <memory>
+#include <ostream>
+#include <string>
+
+#include <glog/logging.h>
+
+#include "kudu/client/client-internal.h"
+#include "kudu/client/client.h"
+#include "kudu/common/wire_protocol.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/master/txn_manager.pb.h"
+#include "kudu/rpc/response_callback.h"
+#include "kudu/rpc/rpc.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+#include "kudu/util/status_callback.h"
+
+using kudu::rpc::BackoffType;
+using kudu::rpc::ErrorStatusPB;
+using kudu::rpc::ResponseCallback;
+using kudu::rpc::Rpc;
+using kudu::rpc::RpcController;
+using kudu::transactions::AbortTransactionRequestPB;
+using kudu::transactions::AbortTransactionResponsePB;
+using kudu::transactions::BeginTransactionRequestPB;
+using kudu::transactions::BeginTransactionResponsePB;
+using kudu::transactions::CommitTransactionRequestPB;
+using kudu::transactions::CommitTransactionResponsePB;
+using kudu::transactions::GetTransactionStateRequestPB;
+using kudu::transactions::GetTransactionStateResponsePB;
+using kudu::transactions::TxnManagerServiceProxy;
+using std::string;
+using strings::Substitute;
+
+namespace kudu {
+namespace client {
+namespace internal {
+
+template <class ReqClass, class RespClass>
+AsyncRandomTxnManagerRpc<ReqClass, RespClass>::AsyncRandomTxnManagerRpc(
+    const MonoTime& deadline,
+    KuduClient* client,
+    BackoffType backoff,
+    const ReqClass& req,
+    RespClass* resp,
+    const std::function<void(TxnManagerServiceProxy*,
+                             const ReqClass&, RespClass*,
+                             RpcController*,
+                             const ResponseCallback&)>& func,
+    string rpc_name,
+    StatusCallback user_cb)
+    : Rpc(deadline, client->data_->messenger_, backoff),
+      client_(client), req_(&req), resp_(resp), func_(func),  // 'req' is kept by address
+      rpc_name_(std::move(rpc_name)),
+      user_cb_(std::move(user_cb)) {
+  DCHECK(deadline.Initialized());
+}
+
+template <class ReqClass, class RespClass>
+void AsyncRandomTxnManagerRpc<ReqClass, RespClass>::SendRpc() {
+  // Compute the deadline for this single attempt: the earlier of the overall
+  // deadline and now plus the client's default RPC timeout. If that point is
+  // already in the past, report a timeout without sending anything.
+  MonoTime now = MonoTime::Now();
+  MonoTime deadline = std::min(retrier().deadline(),
+                               now + client_->default_rpc_timeout());
+  if (deadline < now) {
+    SendRpcCb(Status::TimedOut("timed out after deadline expired"));
+    return;
+  }
+  RpcController* controller = mutable_retrier()->mutable_controller();
+  controller->Reset();
+  controller->set_deadline(deadline);
+  func_(client_->data_->txn_manager_proxy().get(), *req_, resp_, controller,
+        [this]() { this->SendRpcCb(Status::OK()); });  // captures 'this'
+}
+
+template <class ReqClass, class RespClass>
+string AsyncRandomTxnManagerRpc<ReqClass, RespClass>::ToString() const {
+  return rpc_name_;  // the descriptor supplied at construction
+}
+
+template <class ReqClass, class RespClass>
+void AsyncRandomTxnManagerRpc<ReqClass, RespClass>::SendRpcCb(const Status& status) {
+  Status s = status;
+  if (RetryIfNecessary(&s)) {  // a retry has been scheduled: do not call 'user_cb_' yet
+    return;
+  }
+  // Pull out any application-level errors and return them to the user.
+  if (s.ok() && resp_->has_error()) {
+    s = StatusFromPB(resp_->error().status());
+  }
+  user_cb_(s);
+}
+
+template <class ReqClass, class RespClass>
+void AsyncRandomTxnManagerRpc<ReqClass, RespClass>::ResetTxnManagerAndRetry(
+    const Status& status) {
+  // TODO(aserbin): implement switching to another TxnManager when
+  //                several are available.
+  mutable_retrier()->DelayedRetry(this, status);  // for now: just a delayed retry
+}
+
+template <class ReqClass, class RespClass>
+bool AsyncRandomTxnManagerRpc<ReqClass, RespClass>::RetryIfNecessary(
+    Status* status) {
+  const auto retry_warning =
+      Substitute("re-attempting $0 request to one of TxnManagers", rpc_name_);
+  auto warn_on_retry = MakeScopedCleanup([&retry_warning] {
+    // NOTE: we pass in a ref to 'retry_warning' rather than evaluating it here
+    // because any of the below retries may end up completing and calling
+    // 'user_cb_', which may very well destroy this Rpc object and render
+    // 'rpc_name_' and 'client_' inaccessible.
+    KLOG_EVERY_N_SECS(WARNING, 1) << retry_warning;
+  });
+  const bool multi_txn_manager = client_->IsMultiMaster();
+  // Pull out the RPC status.
+  Status s = *status;
+  if (s.ok()) {
+    s = retrier().controller().status();
+  }
+
+  // First, parse any RPC errors that may have occurred during the connection
+  // negotiation.
+
+  // TODO(aserbin): address the case of expired authn token
+
+  // Service unavailable errors during negotiation may indicate that the current
+  // TxnManager could not verify authn token. Simply retry the operation with
+  // another TxnManager (if available) or re-send the RPC to the same TxnManager
+  // a bit later.
+  if (s.IsServiceUnavailable()) {
+    if (multi_txn_manager) {
+      ResetTxnManagerAndRetry(s);
+    } else {
+      mutable_retrier()->DelayedRetry(this, s);
+    }
+    return true;
+  }
+
+  // Network errors may be caused by errors in connecting sockets, which could
+  // mean a TxnManager is down or doesn't exist. If there's another TxnManager
+  // to connect to, connect to it. Otherwise, don't bother retrying.
+  if (s.IsNetworkError()) {
+    if (multi_txn_manager) {
+      ResetTxnManagerAndRetry(s);
+      return true;
+    }
+  }
+  if (s.IsTimedOut()) {
+    // If establishing a connection failed with a timeout error before the
+    // overall deadline for the RPC operation, retry the operation; if multiple
+    // TxnManagers are available, try with another one.
+    if (MonoTime::Now() < retrier().deadline()) {
+      if (multi_txn_manager) {
+        ResetTxnManagerAndRetry(s);
+      } else {
+        mutable_retrier()->DelayedRetry(this, s);
+      }
+      return true;
+    }
+    // And if we've passed the overall deadline, we shouldn't retry.
+    s = s.CloneAndPrepend(Substitute("$0 timed out after deadline expired", rpc_name_));
+  }
+
+  // Next, parse RPC errors that happened after the connection succeeded.
+  // Note: RemoteErrors from the controller are guaranteed to also return error
+  // responses, per RpcController's contract (see rpc_controller.h).
+  const ErrorStatusPB* err = retrier().controller().error_response();
+  if (s.IsRemoteError()) {
+    CHECK(err);
+    if (err->has_code() &&
+        (err->code() == ErrorStatusPB::ERROR_SERVER_TOO_BUSY ||
+         err->code() == ErrorStatusPB::ERROR_UNAVAILABLE)) {
+      // If the TxnManager is too busy, try another one if it's around or try
+      // again later with the same TxnManager.
+      if (multi_txn_manager) {
+        ResetTxnManagerAndRetry(s);
+      } else {
+        mutable_retrier()->DelayedRetry(this, s);
+      }
+      return true;
+    }
+    if (err->unsupported_feature_flags_size() > 0) {
+      s = Status::NotSupported(Substitute("cluster does not support $0",
+                                          rpc_name_));
+    }
+  }
+
+  warn_on_retry.cancel();  // no retry was scheduled, so suppress the warning
+  *status = s;
+  return false;
+}
+
+template class AsyncRandomTxnManagerRpc<AbortTransactionRequestPB,  // explicit instantiations
+                                        AbortTransactionResponsePB>;
+template class AsyncRandomTxnManagerRpc<BeginTransactionRequestPB,
+                                        BeginTransactionResponsePB>;
+template class AsyncRandomTxnManagerRpc<CommitTransactionRequestPB,
+                                        CommitTransactionResponsePB>;
+template class AsyncRandomTxnManagerRpc<GetTransactionStateRequestPB,
+                                        GetTransactionStateResponsePB>;
+
+} // namespace internal
+} // namespace client
+} // namespace kudu
diff --git a/src/kudu/client/txn_manager_proxy_rpc.h b/src/kudu/client/txn_manager_proxy_rpc.h
new file mode 100644
index 0000000..77b7d8d
--- /dev/null
+++ b/src/kudu/client/txn_manager_proxy_rpc.h
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <functional>
+#include <string>
+
+#include "kudu/rpc/response_callback.h"
+#include "kudu/rpc/rpc.h"
+#include "kudu/util/status_callback.h"
+
+namespace kudu {
+
+class MonoTime;
+class Status;
+namespace rpc {
+class RpcController;
+}  // namespace rpc
+
+namespace transactions {
+class TxnManagerServiceProxy;
+} // namespace transactions
+
+namespace client {
+
+class KuduClient;
+
+namespace internal {
+
+// Encapsulates RPCs that target a randomly chosen TxnManager, handling retries
+// and reconnections.
+//
+// TODO(aserbin): implement the re-connection to TxnManagers
+template <class ReqClass, class RespClass>
+class AsyncRandomTxnManagerRpc : public rpc::Rpc {
+ public:
+  // The input 'client' will be used to call the asynchronous TxnManager's proxy
+  // function 'func' on the currently used random TxnManager, sending over
+  // 'req'. Upon successful completion of the RPC, 'resp' is populated with the
+  // RPC response. Various errors (e.g. from the RPC layer or from the
+  // application layer) will direct the RPC to be retried until 'deadline'
+  // is reached. If the final result is an error, 'resp' may not be set,
+  // or may have an application error set.
+  //
+  // 'user_cb' will be called on the final result of the RPC (either OK,
+  // TimedOut, or some other non-retriable error).
+  //
+  // 'rpc_name' is a descriptor for the RPC used to add more context to logs
+  // and error messages.
+  //
+  // Retries will be done according to the backoff type specified by 'backoff'.
+  AsyncRandomTxnManagerRpc(
+      const MonoTime& deadline,
+      KuduClient* client,
+      rpc::BackoffType backoff,
+      const ReqClass& req,
+      RespClass* resp,
+      const std::function<void(transactions::TxnManagerServiceProxy*,
+                               const ReqClass&, RespClass*,
+                               rpc::RpcController*,
+                               const rpc::ResponseCallback&)>& func,
+      std::string rpc_name,
+      StatusCallback user_cb);
+
+  // Send the RPC using the TxnManagerService proxy's asynchronous API, ensuring
+  // that neither the per-RPC deadline nor the overall deadline has passed.
+  void SendRpc() override;
+
+  std::string ToString() const override;
+
+ protected:
+  // Handles 'status', retrying if necessary, and calling the user-provided
+  // callback as appropriate.
+  void SendRpcCb(const Status& status) override;
+
+  void ResetTxnManagerAndRetry(const Status& status);
+
+  // Uses 'status' and the contents of the RPC controller and RPC response to
+  // determine whether reconnections or retries should be performed, and if so,
+  // performs them. Additionally, updates 'status' to include more information
+  // based on the state of the RPC.
+  //
+  // Retries take the following kinds of errors into account:
+  // - TimedOut errors that indicate the operation retrier has passed its
+  //   deadline (distinct from TimedOut errors that surface in the RPC layer)
+  // - RPC errors that come from a failed connection, in which case the
+  //   controller status will be non-OK
+  // - generic RPC errors, in which case the controller status will be a
+  //   RemoteError and the controller will have an error response
+  //
+  // Returns true if a reconnection and/or retry was required and has been
+  // scheduled, in which case callers should ensure that this object remains
+  // alive.
+  bool RetryIfNecessary(Status* status);
+
+  KuduClient* client_;
+  const ReqClass* req_;
+  RespClass* resp_;
+
+  // Asynchronous function that sends an RPC to the current TxnManager.
+  const std::function<void(transactions::TxnManagerServiceProxy*,
+                           const ReqClass&, RespClass*,
+                           rpc::RpcController*,
+                           const rpc::ResponseCallback&)> func_;
+
+  // Name of the RPC being sent. Since multiple template instantiations may
+  // exist for the same proxy function, this need not be exactly the proxy
+  // function name.
+  const std::string rpc_name_;
+
+  // Callback to call upon completion of the operation (whether the RPC itself
+  // was successful or not).
+  const StatusCallback user_cb_;
+};
+
+} // namespace internal
+} // namespace client
+} // namespace kudu
diff --git a/src/kudu/common/txn_id.h b/src/kudu/common/txn_id.h
index c7cd50e..5bbd63a 100644
--- a/src/kudu/common/txn_id.h
+++ b/src/kudu/common/txn_id.h
@@ -30,7 +30,7 @@ class TxnId {
   static const TxnId kInvalidTxnId;
 
   TxnId() noexcept : id_(-1) {}
-  explicit TxnId(int64_t id) noexcept;
+  TxnId(int64_t id) noexcept; // NOLINT
 
   int64_t value() const;
   bool IsValid() const { return id_ >= 0; }
@@ -48,7 +48,7 @@ class TxnId {
   friend bool operator>(const TxnId& lhs, const TxnId& rhs);
   friend bool operator>=(const TxnId& lhs, const TxnId& rhs);
 
-  const int64_t id_;
+  int64_t id_;
 };
 
 std::ostream& operator<<(std::ostream& o, const TxnId& txn_id);
diff --git a/src/kudu/transactions/transactions.proto b/src/kudu/transactions/transactions.proto
index 7013935..8d9d9a9 100644
--- a/src/kudu/transactions/transactions.proto
+++ b/src/kudu/transactions/transactions.proto
@@ -42,3 +42,17 @@ message TxnParticipantEntryPB {
   optional TxnStatePB state = 1;
 }
 
+// High-level metadata about a transaction. Serialized messages of this type
+// are used to pass information on an already opened transaction among multiple
+// Kudu clients (even if residing on different nodes), so they can issue write
+// operations in the context of the same multi-row distributed transaction.
+message TxnTokenPB {
+  // Transaction identifier assigned to the transaction.
+  optional int64 txn_id = 1;
+
+  // The keep-alive interval (in milliseconds) to keep the transaction alive.
+  // To avoid auto-aborting the transaction, TxnManager should receive
+  // keep-alive heartbeats spaced by intervals equal to or shorter than
+  // 'keepalive_millis' milliseconds in duration.
+  optional uint32 keepalive_millis = 2;
+}