You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2020/12/12 03:08:34 UTC

[kudu] 03/03: KUDU-2612 keep-alive txn heartbeating for C++ client

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit e75aad09a8dbe295a11cc2042903b74faf72623e
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Sun Nov 22 18:46:03 2020 -0800

    KUDU-2612 keep-alive txn heartbeating for C++ client
    
    This patch adds keep-alive txn heartbeating into the Kudu C++ client.
    
    The txn keepalive heartbeating is performed automatically by the client,
    and no API is exposed to send keep-alive messages for a transaction.
    The txn keepalive heartbeating continues until the original transaction
    handle (i.e. the handle created by KuduClient::NewTransaction()) goes
    out of scope.  In contrast, if the transaction handle is created by
    KuduTransaction::Deserialize(), the keepalive messages are or aren't
    sent depending on the KuduTransactionSerializer::enable_keepalive()
    setting when serializing the source handle.  By default, keepalive
    messages are not sent for deserialized transaction handles.  This is
    because the most common use case for multiple actors working in the
    context of the same transaction is supposed to be of the
    "star topology", where a transaction is started and committed by
    a top-level application which shares the context of the transaction with
    other applications which only submit their data, but don't manage the
    lifecycle of the transaction.
    
    This patch also contains a couple of test scenarios to cover the
    newly introduced functionality.
    
    Change-Id: I0283d8e16908f641388f7a30b513a672df27a186
    Reviewed-on: http://gerrit.cloudera.org:8080/16779
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/client/client-test.cc                     | 118 ++++++++++++-
 src/kudu/client/client.cc                          |  26 ++-
 src/kudu/client/client.h                           | 129 +++++++++++---
 src/kudu/client/transaction-internal.cc            | 194 +++++++++++++++++++--
 src/kudu/client/transaction-internal.h             |  45 ++++-
 src/kudu/client/txn_manager_proxy_rpc.cc           |  23 ++-
 src/kudu/client/txn_manager_proxy_rpc.h            |   3 +
 .../integration-tests/txn_status_manager-itest.cc  |  43 ++++-
 src/kudu/transactions/transactions.proto           |   4 +
 9 files changed, 514 insertions(+), 71 deletions(-)

diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 2047d03..c1087ce 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -169,6 +169,8 @@ DECLARE_int64(on_disk_size_for_testing);
 DECLARE_string(location_mapping_cmd);
 DECLARE_string(superuser_acl);
 DECLARE_string(user_acl);
+DECLARE_uint32(txn_keepalive_interval_ms);
+DECLARE_uint32(txn_staleness_tracker_interval_ms);
 DECLARE_uint32(txn_manager_status_table_num_replicas);
 DEFINE_int32(test_scan_num_rows, 1000, "Number of rows to insert and scan");
 
@@ -246,6 +248,15 @@ class ClientTest : public KuduTest {
     // replica of the transaction status table.
     FLAGS_txn_manager_status_table_num_replicas = 1;
 
+    // Speed up test scenarios which are related to txn keepalive interval.
+#if defined(THREAD_SANITIZER) || defined(ADDRESS_SANITIZER)
+    FLAGS_txn_keepalive_interval_ms = 1500;
+    FLAGS_txn_staleness_tracker_interval_ms = 300;
+#else
+    FLAGS_txn_keepalive_interval_ms = 250;
+    FLAGS_txn_staleness_tracker_interval_ms = 50;
+#endif
+
     SetLocationMappingCmd();
 
     // Start minicluster and wait for tablet servers to connect to master.
@@ -427,7 +438,7 @@ class ClientTest : public KuduTest {
   //                once the transaction orchestration is implemented
   Status FinalizeCommitTransaction(const shared_ptr<KuduTransaction>& txn) {
     string txn_token;
-    RETURN_NOT_OK(txn->Serialize(&txn_token));
+    RETURN_NOT_OK(KuduTransactionSerializer(txn).Serialize(&txn_token));
     TxnTokenPB token;
     CHECK(token.ParseFromString(txn_token));
     CHECK(token.has_txn_id());
@@ -7126,7 +7137,7 @@ TEST_F(ClientTest, TxnCommit) {
       ASSERT_FALSE(is_complete);
       ASSERT_TRUE(cs.IsIncomplete()) << cs.ToString();
       ASSERT_STR_CONTAINS(cs.ToString(), "commit is still in progress");
-      ASSERT_OK(txn->Serialize(&txn_token));
+      ASSERT_OK(KuduTransactionSerializer(txn).Serialize(&txn_token));
     }
 
     // Make sure the transaction isn't aborted once its KuduTransaction handle
@@ -7250,12 +7261,12 @@ TEST_F(ClientTest, TxnToken) {
   ASSERT_GT(txn_keepalive_ms, 0);
 
   string txn_token;
-  ASSERT_OK(txn->Serialize(&txn_token));
+  ASSERT_OK(KuduTransactionSerializer(txn).Serialize(&txn_token));
 
   // Serializing the same transaction again produces the same result.
   {
     string token;
-    ASSERT_OK(txn->Serialize(&token));
+    ASSERT_OK(KuduTransactionSerializer(txn).Serialize(&token));
     ASSERT_EQ(txn_token, token);
   }
 
@@ -7278,7 +7289,7 @@ TEST_F(ClientTest, TxnToken) {
   // Make sure the KuduTransaction object deserialized from a token is fully
   // functional.
   string serdes_txn_token;
-  ASSERT_OK(serdes_txn->Serialize(&serdes_txn_token));
+  ASSERT_OK(KuduTransactionSerializer(serdes_txn).Serialize(&serdes_txn_token));
   ASSERT_EQ(txn_token, serdes_txn_token);
 
   // TODO(awong): remove once we register participants automatically before
@@ -7294,7 +7305,7 @@ TEST_F(ClientTest, TxnToken) {
     // The state of a transaction isn't stored in the token, so initiating
     // commit of the transaction doesn't change the result of the serialization.
     string token;
-    ASSERT_OK(serdes_txn->Serialize(&token));
+    ASSERT_OK(KuduTransactionSerializer(serdes_txn).Serialize(&token));
     ASSERT_EQ(serdes_txn_token, token);
   }
 
@@ -7302,14 +7313,14 @@ TEST_F(ClientTest, TxnToken) {
   shared_ptr<KuduTransaction> other_txn;
   ASSERT_OK(client_->NewTransaction(&other_txn));
   string other_txn_token;
-  ASSERT_OK(other_txn->Serialize(&other_txn_token));
+  ASSERT_OK(KuduTransactionSerializer(other_txn).Serialize(&other_txn_token));
   ASSERT_NE(txn_token, other_txn_token);
 
   // The state of a transaction isn't stored in the token, so aborting
   // the doesn't change the result of the serialization.
   string token;
   ASSERT_OK(other_txn->Rollback());
-  ASSERT_OK(other_txn->Serialize(&token));
+  ASSERT_OK(KuduTransactionSerializer(other_txn).Serialize(&token));
   ASSERT_EQ(other_txn_token, token);
 }
 
@@ -7322,7 +7333,7 @@ TEST_F(ClientTest, AttemptToControlTxnByOtherUser) {
   shared_ptr<KuduTransaction> txn;
   ASSERT_OK(client_->NewTransaction(&txn));
   string txn_token;
-  ASSERT_OK(txn->Serialize(&txn_token));
+  ASSERT_OK(KuduTransactionSerializer(txn).Serialize(&txn_token));
 
   // Transaction identifier is surfacing here only to build the reference error
   // message for Status::NotAuthorized() returned by attempts to perform
@@ -7414,6 +7425,95 @@ TEST_F(ClientTxnManagerProxyTest, RetryOnServiceUnavailable) {
   ASSERT_OK(client_->NewTransaction(&txn));
 }
 
+TEST_F(ClientTest, TxnKeepAlive) {
+  // Begin a transaction and wait for longer than the keepalive interval
+  // (with some margin). If there were no txn keepalive messages sent,
+  // the transaction would be automatically aborted. Since the client
+  // sends keepalive heartbeats under the hood, it's still possible to commit
+  // the transaction after some period of inactivity.
+  {
+    shared_ptr<KuduTransaction> txn;
+    ASSERT_OK(client_->NewTransaction(&txn));
+
+    SleepFor(MonoDelta::FromMilliseconds(2 * FLAGS_txn_keepalive_interval_ms));
+
+    ASSERT_OK(txn->Commit(false /* wait */));
+    ASSERT_OK(FinalizeCommitTransaction(txn));
+  }
+
+  // Begin a transaction and let its KuduTransaction object go out of
+  // scope. After the txn keepalive interval (with some margin) the system
+  // should automatically abort the transaction.
+  {
+    string txn_token;
+    {
+      shared_ptr<KuduTransaction> txn;
+      ASSERT_OK(client_->NewTransaction(&txn));
+      ASSERT_OK(KuduTransactionSerializer(txn).Serialize(&txn_token));
+    }
+
+    SleepFor(MonoDelta::FromMilliseconds(2 * FLAGS_txn_keepalive_interval_ms));
+
+    // The transaction should be automatically aborted since no keepalive
+    // requests were sent once the original KuduTransaction object went out
+    // of scope.
+    shared_ptr<KuduTransaction> serdes_txn;
+    ASSERT_OK(KuduTransaction::Deserialize(client_, txn_token, &serdes_txn));
+    auto s = serdes_txn->Commit(false /* wait */);
+    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "not open: state: ABORTED");
+  }
+
+  // Begin a new transaction and let its KuduTransaction object go out of
+  // scope. If the transaction handle is deserialized from a txn token that
+  // didn't have keepalive enabled when serialized, the transaction should be
+  // automatically aborted after the txn keepalive timeout.
+  {
+    string txn_token;
+    {
+      shared_ptr<KuduTransaction> txn;
+      ASSERT_OK(client_->NewTransaction(&txn));
+      ASSERT_OK(KuduTransactionSerializer(txn).Serialize(&txn_token));
+    }
+
+    shared_ptr<KuduTransaction> serdes_txn;
+    ASSERT_OK(KuduTransaction::Deserialize(client_, txn_token, &serdes_txn));
+    ASSERT_NE(nullptr, serdes_txn.get());
+
+    SleepFor(MonoDelta::FromMilliseconds(2 * FLAGS_txn_keepalive_interval_ms));
+
+    auto s = serdes_txn->Commit(false /* wait */);
+    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "not open: state: ABORTED");
+  }
+
+  // Begin a new transaction and let its KuduTransaction object go out of
+  // scope. If the transaction handle is deserialized from a txn token that
+  // had keepalive enabled when serialized, the transaction should stay alive
+  // even after the original KuduTransaction object goes out of scope.
+  // It's assumed that no more than the keepalive timeout interval passes
+  // between the moment the original transaction handle goes out of scope and
+  // the moment the new one is created from the token.
+  {
+    string txn_token;
+    {
+      shared_ptr<KuduTransaction> txn;
+      ASSERT_OK(client_->NewTransaction(&txn));
+      ASSERT_OK(KuduTransactionSerializer(txn)
+                .enable_keepalive(true)
+                .Serialize(&txn_token));
+    }
+
+    shared_ptr<KuduTransaction> serdes_txn;
+    ASSERT_OK(KuduTransaction::Deserialize(client_, txn_token, &serdes_txn));
+
+    SleepFor(MonoDelta::FromMilliseconds(2 * FLAGS_txn_keepalive_interval_ms));
+
+    ASSERT_OK(serdes_txn->Commit(false /* wait */));
+    ASSERT_OK(FinalizeCommitTransaction(serdes_txn));
+  }
+}
+
 // Client test that assigns locations to clients and tablet servers.
 // For now, assigns a uniform location to all clients and tablet servers.
 class ClientWithLocationTest : public ClientTest {
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 61e82ca..ed25587 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -29,7 +29,6 @@
 #include <vector>
 
 #include <boost/optional/optional.hpp>
-#include <boost/type_traits/decay.hpp>
 #include <glog/logging.h>
 #include <google/protobuf/stubs/common.h>
 
@@ -425,16 +424,31 @@ Status KuduTransaction::Rollback() {
   return data_->Rollback();
 }
 
-Status KuduTransaction::Serialize(string* serialized_txn) const {
-  return data_->Serialize(serialized_txn);
-}
-
 Status KuduTransaction::Deserialize(const sp::shared_ptr<KuduClient>& client,
                                     const string& serialized_txn,
                                     sp::shared_ptr<KuduTransaction>* txn) {
   return Data::Deserialize(client, serialized_txn, txn);
 }
 
+KuduTransactionSerializer::KuduTransactionSerializer(
+    const sp::shared_ptr<KuduTransaction>& txn)
+    : data_(new KuduTransactionSerializer::Data(txn)) {
+}
+
+KuduTransactionSerializer::~KuduTransactionSerializer() {
+  delete data_;  // 'data_' is owned: allocated in the constructor above.
+}
+
+KuduTransactionSerializer&
+KuduTransactionSerializer::enable_keepalive(bool enable) {
+  data_->enable_keepalive(enable);
+  return *this;  // Allows chaining: enable_keepalive(true).Serialize(...).
+}
+
+Status KuduTransactionSerializer::Serialize(string* serialized_txn) const {
+  return data_->Serialize(serialized_txn);  // Embeds the keepalive setting.
+}
+
 KuduClient::KuduClient()
   : data_(new KuduClient::Data()) {
   static ObjectIdGenerator oid_generator;
@@ -566,7 +580,7 @@ shared_ptr<KuduSession> KuduClient::NewSession() {
 
 Status KuduClient::NewTransaction(sp::shared_ptr<KuduTransaction>* txn) {
   shared_ptr<KuduTransaction> ret(new KuduTransaction(shared_from_this()));
-  const auto s = ret->data_->Begin();
+  const auto s = ret->data_->Begin(ret);
   if (s.ok()) {
     *txn = std::move(ret);
   }
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 66f7dbb..22d63c5 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -333,6 +333,11 @@ class KUDU_EXPORT KuduClientBuilder {
 /// @c KuduTransaction instance can be used to commit or rollback the underlying
 /// multi-row transaction and create a transactional session.
 ///
+/// @note The @c KuduTransaction should be kept in scope to maintain automatic
+///   keep-alive heartbeating for the corresponding transaction. Once this
+///   object goes out of scope, the heartbeating stops and the transaction may
+///   automatically be aborted soon if no other clients do the heartbeating.
+///
 /// @note There isn't any automation to rollback or commit the underlying
 ///   transaction upon destruction of an instance of this class.
 ///
@@ -409,30 +414,13 @@ class KUDU_EXPORT KuduTransaction :
   /// @return Operation result status.
   Status Rollback() WARN_UNUSED_RESULT;
 
-  /// Export the information on this transaction in a serialized form.
-  ///
-  /// The serialized information on a Kudu transaction can be passed among
-  /// different Kudu clients running at multiple nodes, so those separate
-  /// Kudu clients can perform operations to be a part of the same distributed
-  /// transaction. The resulting string is called "transaction token" and it
-  /// can be deserialized using the @c KuduTransaction::Deserialize() method.
-  /// The result of the latter is an instance of the @c KuduTransaction class
-  /// to work with in the run-time.
-  ///
-  /// This method doesn't perform any RPC under the hood.
-  ///
-  /// @note The representation of the data in the serialized form
-  ///   (i.e. the format of a Kudu transaction token) is an implementation
-  ///   detail, not a part of the public API.
-  ///
-  /// @param [out] serialized_txn
-  ///   Result string to output the serialized transaction information.
-  /// @return Operation result status.
-  Status Serialize(std::string* serialized_txn) const WARN_UNUSED_RESULT;
-
   /// Re-create KuduTransaction object given its serialized representation.
   ///
-  /// This method doesn't perform any RPC under the hood.
+  /// This method doesn't perform any RPC under the hood. The newly created
+  /// object automatically does or does not send keep-alive messages depending
+  /// on the @c KuduTransactionSerializer::enable_keepalive() setting when
+  /// the original @c KuduTransaction object was serialized using
+  /// @c KuduTransactionSerializer::Serialize().
   ///
   /// @param [in] client
   ///   Client instance to bound the result object to.
@@ -447,6 +435,7 @@ class KUDU_EXPORT KuduTransaction :
  private:
   friend class KuduClient;
   friend class KuduSession;
+  friend class KuduTransactionSerializer;
   FRIEND_TEST(ClientTest, TxnIdOfTransactionalSession);
   FRIEND_TEST(ClientTest, TxnToken);
 
@@ -456,6 +445,96 @@ class KUDU_EXPORT KuduTransaction :
   Data* data_; // Owned.
 };
 
+/// A utility class to help with serialization of @c KuduTransaction handles.
+///
+/// The purpose of this class is to control the keepalive behavior for the
+/// @c KuduTransaction handle once it's deserialized from the token. In the
+/// future, the list of configurable parameters might be extended (e.g., by
+/// adding commit and abort permissions, i.e. whether a handle can be used to
+/// commit and/or abort the transaction).
+class KUDU_EXPORT KuduTransactionSerializer {
+ public:
+  /// Create a serializer for the specified @c KuduTransaction object.
+  ///
+  /// @param [in] txn
+  ///   Smart pointer to the @c KuduTransaction object to be serialized
+  ///   by the @c Serialize() method.
+  explicit KuduTransactionSerializer(const sp::shared_ptr<KuduTransaction>& txn);
+
+  ~KuduTransactionSerializer();
+
+  /// Whether to enable sending keepalive messages for the @c KuduTransaction
+  /// handle when it's deserialized from the string produced by
+  /// @c KuduTransactionSerializer::Serialize().
+  ///
+  /// No keepalive heartbeat messages are sent from a transaction handle whose
+  /// source token was created with the default "keepalive disabled" setting.
+  /// The idea here is that the most common use case for using transaction
+  /// tokens is of the "star topology" (see below), so it's enough to have
+  /// just one top-level handle sending keepalive messages. Overall, having more
+  /// than one actor sending keepalive messages for a transaction is acceptable
+  /// but it puts needless load on a cluster.
+  ///
+  /// The most common use case for a transaction's handle
+  /// serialization/deserialization is of the "star topology": a transaction is
+  /// started by a top-level application which sends the transaction token
+  /// produced by serializing the original transaction handle to other worker
+  /// applications running concurrently, where the latter write their data
+  /// in the context of the same transaction and report back to the top-level
+  /// application, which in its turn initiates committing the transaction
+  /// as needed. The important point is that the top-level application keeps the
+  /// transaction handle around all the time from the start of the transaction
+  /// to the very point when the transaction is committed. Under the hood, the
+  /// original transaction handle sends keepalive messages as required until
+  /// commit phase is initiated, so the deserialized transaction handles which
+  /// are used by the worker applications don't need to send keepalive messages.
+  ///
+  /// The other (less common) use case is of the "ring topology": a chain of
+  /// applications work sequentially as a part of the same transaction, where
+  /// the very first application starts the transaction, writes its data, and
+  /// hands over the responsibility of managing the lifecycle of the transaction
+  /// to another application down the chain. After doing so it may exit, so now
+  /// only the next application has the active transaction handle, and so on it
+  /// goes until the transaction is committed by the application in the end
+  /// of the chain. In this scenario, every deserialized handle has to send
+  /// keepalive messages to avoid automatic rollback of the transaction,
+  /// and every application in the chain should call
+  /// @c KuduTransactionSerializer::enable_keepalive() when serializing its
+  /// transaction handle into a transaction token to pass to the application
+  /// next in the chain.
+  ///
+  /// @param [in] enable
+  ///   Whether the @c KuduTransaction handle deserialized from the token
+  ///   produced by @c Serialize() should send keepalive messages.
+  /// @return Reference to the updated object.
+  KuduTransactionSerializer& enable_keepalive(bool enable);
+
+  /// Export the information on this transaction in a serialized form.
+  ///
+  /// The serialized information on a Kudu transaction can be passed among
+  /// different Kudu clients running at multiple nodes, so those separate
+  /// Kudu clients can perform operations to be a part of the same distributed
+  /// transaction. The resulting string is referred to as "transaction token"
+  /// and it can be deserialized into an object of the @c KuduTransaction class
+  /// via the @c KuduTransaction::Deserialize() method.
+  ///
+  /// This method doesn't perform any RPC under the hood.
+  ///
+  /// @note The representation of the data in the serialized form
+  ///   (i.e. the format of a Kudu transaction token) is an implementation
+  ///   detail, not a part of the public API.
+  ///
+  /// @param [out] serialized_txn
+  ///   Result string to output the serialized transaction information.
+  /// @return Operation result status.
+  Status Serialize(std::string* serialized_txn) const WARN_UNUSED_RESULT;
+
+ private:
+  class KUDU_NO_EXPORT Data;
+  Data* data_; // Owned, hence copying is disallowed (would double-delete).
+  DISALLOW_COPY_AND_ASSIGN(KuduTransactionSerializer);
+};
+
 /// @brief A handle for a connection to a cluster.
 ///
 /// The KuduClient class represents a connection to a cluster. From the user
@@ -618,6 +697,11 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> {
   /// the newly created transaction, use @c KuduTransaction::Commit() and
   /// @c KuduTransaction::Rollback() methods correspondingly.
   ///
+  /// @note The newly created object starts sending keep-alive messages for
+  ///   the newly opened transaction as required by the keep-alive interval
+  ///   assigned to the transaction by the system. To keep the heartbeating,
+  ///   the newly created @c KuduTransaction should be kept in scope.
+  ///
   /// @warning This method is experimental and may change or disappear in future.
   ///
   /// @param txn [out]
@@ -789,6 +873,7 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> {
   friend class ConnectToClusterBaseTest;
   friend class KuduClientBuilder;
   friend class KuduPartitionerBuilder;
+  friend class KuduTransaction;
   friend class KuduScanToken;
   friend class KuduScanTokenBuilder;
   friend class KuduScanner;
diff --git a/src/kudu/client/transaction-internal.cc b/src/kudu/client/transaction-internal.cc
index af0fddc..31e09ce 100644
--- a/src/kudu/client/transaction-internal.cc
+++ b/src/kudu/client/transaction-internal.cc
@@ -17,11 +17,11 @@
 
 #include "kudu/client/transaction-internal.h"
 
+#include <algorithm>
 #include <functional>
 #include <memory>
 #include <ostream>
 #include <string>
-#include <utility>
 
 #include <glog/logging.h>
 
@@ -32,15 +32,18 @@
 #include "kudu/client/txn_manager_proxy_rpc.h"
 #include "kudu/common/txn_id.h"
 #include "kudu/common/wire_protocol.h"
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/master/txn_manager.pb.h"
 #include "kudu/master/txn_manager.proxy.h"
+#include "kudu/rpc/messenger.h"
 #include "kudu/rpc/response_callback.h"
 #include "kudu/rpc/rpc.h"
 #include "kudu/transactions/transactions.pb.h"
 #include "kudu/util/async_util.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
+#include "kudu/util/status_callback.h"
 
 using kudu::client::internal::AsyncRandomTxnManagerRpc;
 using kudu::rpc::BackoffType;
@@ -52,10 +55,13 @@ using kudu::transactions::CommitTransactionRequestPB;
 using kudu::transactions::CommitTransactionResponsePB;
 using kudu::transactions::GetTransactionStateRequestPB;
 using kudu::transactions::GetTransactionStateResponsePB;
+using kudu::transactions::KeepTransactionAliveRequestPB;
+using kudu::transactions::KeepTransactionAliveResponsePB;
 using kudu::transactions::TxnManagerServiceProxy;
 using kudu::transactions::TxnStatePB;
 using kudu::transactions::TxnTokenPB;
 using std::string;
+using std::unique_ptr;
 using strings::Substitute;
 
 namespace kudu {
@@ -91,7 +97,7 @@ Status KuduTransaction::Data::CreateSession(sp::shared_ptr<KuduSession>* session
   return Status::OK();
 }
 
-Status KuduTransaction::Data::Begin() {
+Status KuduTransaction::Data::Begin(const sp::shared_ptr<KuduTransaction>& txn) {
   auto c = weak_client_.lock();
   if (!c) {
     return Status::IllegalState("associated KuduClient is gone");
@@ -100,6 +106,7 @@ Status KuduTransaction::Data::Begin() {
   Synchronizer sync;
   BeginTransactionRequestPB req;
   BeginTransactionResponsePB resp;
+  // TODO(aserbin): should there be no retries for sending keepalive heartbeats?
   AsyncRandomTxnManagerRpc<
       BeginTransactionRequestPB, BeginTransactionResponsePB> rpc(
       deadline, c.get(), BackoffType::EXPONENTIAL, req, &resp,
@@ -111,18 +118,33 @@ Status KuduTransaction::Data::Begin() {
     return StatusFromPB(resp.error().status());
   }
 
-  // TODO(aserbin): start sending regular hearbeats for the started transaction
-  CHECK(resp.has_txn_id());
+  DCHECK(resp.has_txn_id());
   txn_id_ = resp.txn_id();
-  CHECK(resp.has_keepalive_millis());
+  DCHECK(txn_id_.IsValid());
+  DCHECK(resp.has_keepalive_millis());
   txn_keep_alive_ms_ = resp.keepalive_millis();
-  CHECK(txn_id_.IsValid());
+  DCHECK_GT(txn_keep_alive_ms_, 0);
+
+  // Start sending regular heartbeats for the new transaction.
+  auto next_run_after = MonoDelta::FromMilliseconds(
+      std::max<uint32_t>(1, txn_keep_alive_ms_ / 2));
+  auto m = c->data_->messenger_;
+  if (PREDICT_FALSE(!m)) {
+    return Status::IllegalState("null messenger in Kudu client");
+  }
+
+  sp::weak_ptr<KuduTransaction> weak_txn(txn);
+  m->ScheduleOnReactor(
+      [weak_txn](const Status& s) {
+        SendTxnKeepAliveTask(s, weak_txn);
+      },
+      next_run_after);
 
   return Status::OK();
 }
 
 Status KuduTransaction::Data::Commit(bool wait) {
-  CHECK(txn_id_.IsValid());
+  DCHECK(txn_id_.IsValid());
   auto c = weak_client_.lock();
   if (!c) {
     return Status::IllegalState("associated KuduClient is gone");
@@ -152,7 +174,7 @@ Status KuduTransaction::Data::IsCommitComplete(
     bool* is_complete, Status* completion_status) {
   DCHECK(is_complete);
   DCHECK(completion_status);
-  CHECK(txn_id_.IsValid());
+  DCHECK(txn_id_.IsValid());
   auto c = weak_client_.lock();
   if (!c) {
     return Status::IllegalState("associated KuduClient is gone");
@@ -186,14 +208,14 @@ Status KuduTransaction::Data::Rollback() {
   return Status::OK();
 }
 
-Status KuduTransaction::Data::Serialize(string* serialized_txn) const {
-  CHECK(txn_id_.IsValid());
+Status KuduTransaction::Data::Serialize(string* serialized_txn,
+                                        bool enable_keepalive) const {
   DCHECK(serialized_txn);
+  DCHECK(txn_id_.IsValid());
   TxnTokenPB token;
   token.set_txn_id(txn_id_);
-  if (txn_keep_alive_ms_ > 0) {
-    token.set_keepalive_millis(txn_keep_alive_ms_);
-  }
+  token.set_keepalive_millis(txn_keep_alive_ms_);
+  token.set_enable_keepalive(enable_keepalive);
   if (!token.SerializeToString(serialized_txn)) {
     return Status::Corruption("unable to serialize transaction information");
   }
@@ -204,6 +226,8 @@ Status KuduTransaction::Data::Deserialize(
     const sp::shared_ptr<KuduClient>& client,
     const string& serialized_txn,
     sp::shared_ptr<KuduTransaction>* txn) {
+  DCHECK(client);
+
   // TODO(aserbin): should the owner of the transaction be taken into account
   //                as well, i.e. not allow other than the user that created
   //                the transaction to deserialize its transaction token?
@@ -219,10 +243,28 @@ Status KuduTransaction::Data::Deserialize(
   }
 
   sp::shared_ptr<KuduTransaction> ret(new KuduTransaction(client));
-  ret->data_->txn_id_ = token.txn_id();
   ret->data_->txn_keep_alive_ms_ = token.keepalive_millis();
-  *txn = std::move(ret);
+  DCHECK_GT(ret->data_->txn_keep_alive_ms_, 0);
+  ret->data_->txn_id_ = token.txn_id();
+  DCHECK(ret->data_->txn_id_.IsValid());
+
+  // Start sending periodic txn keepalive requests for the deserialized
+  // transaction, as specified in the source txn token.
+  if (token.has_enable_keepalive() && token.enable_keepalive()) {
+    auto m = client->data_->messenger_;
+    if (PREDICT_TRUE(m)) {
+      sp::weak_ptr<KuduTransaction> weak_txn(ret);
+      auto next_run_after = MonoDelta::FromMilliseconds(
+          std::max<uint32_t>(1, ret->data_->txn_keep_alive_ms_ / 2));
+      m->ScheduleOnReactor(
+          [weak_txn](const Status& s) {
+            SendTxnKeepAliveTask(s, weak_txn);
+          },
+          next_run_after);
+    }
+  }
 
+  *txn = std::move(ret);
   return Status::OK();
 }
 
@@ -296,5 +338,127 @@ Status KuduTransaction::Data::WaitForTxnCommitToFinalize(
       });
 }
 
+// Context of an asynchronous KeepTransactionAlive() RPC. The callback of 'rpc'
+// holds a reference to this context, so 'rpc' must be reset to break the cycle.
+struct KeepaliveRpcCtx {
+  KeepTransactionAliveResponsePB resp;  // Populated by 'rpc'.
+  unique_ptr<AsyncRandomTxnManagerRpc<KeepTransactionAliveRequestPB,
+                                      KeepTransactionAliveResponsePB>> rpc;
+  sp::weak_ptr<KuduTransaction> weak_txn;  // Doesn't extend the txn's lifetime.
+};
+
+void KuduTransaction::Data::SendTxnKeepAliveTask(
+    const Status& status,
+    sp::weak_ptr<KuduTransaction> weak_txn) {
+  VLOG(2) << "SendTxnKeepAliveTask() is run";
+  if (PREDICT_FALSE(!status.ok())) {
+    // This means there was an error executing the task on reactor. As of now,
+    // this can only happen if the reactor is being shutdown and the task is
+    // de-scheduled from the queue, so the only possible error status here is
+    // Status::Aborted().
+    VLOG(1) << Substitute("SendTxnKeepAliveTask did not run: $0",
+                          status.ToString());
+    DCHECK(status.IsAborted());
+    return;
+  }
+
+  // Check if the transaction object is still around.
+  sp::shared_ptr<KuduTransaction> txn(weak_txn.lock());
+  if (PREDICT_FALSE(!txn)) {
+    return;
+  }
+
+  auto c = txn->data_->weak_client_.lock();
+  if (PREDICT_FALSE(!c)) {
+    return;
+  }
+
+  const auto& txn_id = txn->data_->txn_id_;
+  const auto next_run_after = MonoDelta::FromMilliseconds(
+      std::max<uint32_t>(1, txn->data_->txn_keep_alive_ms_ / 2));
+  auto deadline = MonoTime::Now() + next_run_after;
+  KeepTransactionAliveRequestPB req;
+  req.set_txn_id(txn_id);
+
+  sp::shared_ptr<KeepaliveRpcCtx> ctx(new KeepaliveRpcCtx);
+  ctx->weak_txn = weak_txn;
+
+  unique_ptr<AsyncRandomTxnManagerRpc<KeepTransactionAliveRequestPB,
+                                      KeepTransactionAliveResponsePB>> rpc(
+      new AsyncRandomTxnManagerRpc<KeepTransactionAliveRequestPB,
+                                   KeepTransactionAliveResponsePB>(
+          deadline, c.get(), BackoffType::LINEAR,
+          req, &ctx->resp,
+          &TxnManagerServiceProxy::KeepTransactionAliveAsync,
+          "KeepTransactionAlive",
+          [ctx](const Status& s) {
+            TxnKeepAliveCb(s, ctx);  // Copies 'ctx': the capture is const.
+          }
+  ));
+  ctx->rpc = std::move(rpc);
+
+  // Send the RPC and handle the response asynchronously.
+  ctx->rpc->SendRpc();
+}
+
+void KuduTransaction::Data::TxnKeepAliveCb(
+    const Status& s, sp::shared_ptr<KeepaliveRpcCtx> ctx) {
+  // Break the circular reference to 'ctx'. The circular reference is there
+  // because KeepaliveRpcCtx::rpc captures the 'ctx' for ResponseCallback.
+  ctx->rpc.reset();
+
+  // Check if the transaction object is still around.
+  sp::shared_ptr<KuduTransaction> txn(ctx->weak_txn.lock());
+  if (PREDICT_FALSE(!txn)) {
+    return;
+  }
+  // A successful RPC may still carry an application-level error in 'resp'.
+  const auto& resp = ctx->resp;
+  const Status status = (s.ok() && resp.has_error())
+      ? StatusFromPB(resp.error().status()) : s;
+  const auto& txn_id = txn->data_->txn_id_;
+  if (status.IsIllegalState() || status.IsAborted()) {
+    // Transaction's state changed to a terminal one: no need to send
+    // keepalive requests anymore.
+    VLOG(1) << Substitute("KeepTransactionAlive() returned $0: "
+                          "stopping keepalive requests for transaction ID $1",
+                          status.ToString(), txn_id.value());
+    return;
+  }
+
+  // Re-schedule the task, so it will send another keepalive heartbeat as
+  // necessary.
+  sp::shared_ptr<KuduClient> c(txn->data_->weak_client_.lock());
+  if (PREDICT_FALSE(!c)) {
+    return;
+  }
+  auto m = c->data_->messenger_;
+  if (PREDICT_TRUE(m)) {
+    auto weak_txn = ctx->weak_txn;
+    const auto next_run_after = MonoDelta::FromMilliseconds(
+        std::max<uint32_t>(1, txn->data_->txn_keep_alive_ms_ / 2));
+    m->ScheduleOnReactor(
+        [weak_txn](const Status& s) {
+          SendTxnKeepAliveTask(s, weak_txn);
+        },
+        next_run_after);
+  }
+}
+
+KuduTransactionSerializer::Data::Data(
+    const sp::shared_ptr<KuduTransaction>& txn)
+    : txn_(txn),
+      enable_keepalive_(false) {  // Keepalive is opt-in for deserialized txns.
+  DCHECK(txn_);
+}
+
+Status KuduTransactionSerializer::Data::Serialize(string* txn_token) const {
+  DCHECK(txn_token);
+  if (PREDICT_FALSE(!txn_)) {  // Release builds skip the ctor's DCHECK().
+    return Status::InvalidArgument("no transaction to serialize");
+  }
+  return txn_->data_->Serialize(txn_token, enable_keepalive_);
+}
+
 } // namespace client
 } // namespace kudu
diff --git a/src/kudu/client/transaction-internal.h b/src/kudu/client/transaction-internal.h
index 0780fd4..ce0db97 100644
--- a/src/kudu/client/transaction-internal.h
+++ b/src/kudu/client/transaction-internal.h
@@ -32,21 +32,21 @@ class MonoTime;
 
 namespace client {
 
-// This class contains the implementation the functionality behind
-// kudu::client::KuduTransaction.
+struct KeepaliveRpcCtx;
+
+// This class implements the functionality of kudu::client::KuduTransaction.
 class KuduTransaction::Data {
  public:
   explicit Data(const sp::shared_ptr<KuduClient>& client);
 
   Status CreateSession(sp::shared_ptr<KuduSession>* session);
 
-  Status Begin();
+  Status Begin(const sp::shared_ptr<KuduTransaction>& txn);
   Status Commit(bool wait);
   Status IsCommitComplete(bool* is_complete, Status* completion_status);
   Status Rollback();
 
-  Status Serialize(std::string* serialized_txn) const;
-
+  Status Serialize(std::string* serialized_txn, bool enable_keepalive) const;
   static Status Deserialize(const sp::shared_ptr<KuduClient>& client,
                             const std::string& serialized_txn,
                             sp::shared_ptr<KuduTransaction>* txn);
@@ -61,12 +61,45 @@ class KuduTransaction::Data {
   static Status WaitForTxnCommitToFinalize(
       KuduClient* client, const MonoTime& deadline, const TxnId& txn_id);
 
+  // The self-rescheduling task to send KeepTransactionAlive() RPC periodically
+  // for a transaction. The task re-schedules itself as needed.
+  // The 'status' parameter is to report on the status of scheduling the task
+  // on reactor.
+  static void SendTxnKeepAliveTask(const Status& status,
+                                   sp::weak_ptr<KuduTransaction> weak_txn);
+
+  // This helper member function is to analyze the status of the
+  // KeepTransactionAlive() RPC and re-schedule the SendTxnKeepAliveTask(),
+  // if necessary. The 'status' parameter contains the status of
+  // KeepTransactionAlive() RPC called by SendTxnKeepAliveTask(). As soon as
+  // TxnManager returns Status::IllegalState() or Status::Aborted(), the task
+  // stops rescheduling itself, so transactions in terminal states are no longer
+  // receiving keepalive heartbeats from the client.
+  static void TxnKeepAliveCb(const Status& status,
+                             sp::shared_ptr<KeepaliveRpcCtx> ctx);
+
   sp::weak_ptr<KuduClient> weak_client_;
-  TxnId txn_id_;
   uint32_t txn_keep_alive_ms_;
+  TxnId txn_id_;
 
   DISALLOW_COPY_AND_ASSIGN(Data);
 };
 
+// This class implements the functionality of
+// kudu::client::KuduTransactionSerializer.
+class KuduTransactionSerializer::Data {
+ public:
+  // 'txn' is the transaction handle to serialize; it must not be null.
+  explicit Data(const sp::shared_ptr<KuduTransaction>& txn);
+
+  // Set whether the transaction handle deserialized from the token produced
+  // by Serialize() should automatically send keepalive heartbeats.
+  void enable_keepalive(bool enable) {
+    enable_keepalive_ = enable;
+  }
+
+  // Serialize the transaction into 'txn_token', embedding the current
+  // 'enable_keepalive_' setting. Returns Status::InvalidArgument() if there
+  // is no transaction to serialize.
+  Status Serialize(std::string* txn_token) const;
+
+  // The transaction handle to serialize.
+  sp::shared_ptr<KuduTransaction> txn_;
+
+  // Whether keepalive heartbeating is enabled for the handle deserialized
+  // from the produced token. Disabled unless enable_keepalive(true) is called.
+  bool enable_keepalive_;
+};
+
 } // namespace client
 } // namespace kudu
diff --git a/src/kudu/client/txn_manager_proxy_rpc.cc b/src/kudu/client/txn_manager_proxy_rpc.cc
index 7ab55e8..12bf81c 100644
--- a/src/kudu/client/txn_manager_proxy_rpc.cc
+++ b/src/kudu/client/txn_manager_proxy_rpc.cc
@@ -53,6 +53,8 @@ using kudu::transactions::CommitTransactionRequestPB;
 using kudu::transactions::CommitTransactionResponsePB;
 using kudu::transactions::GetTransactionStateRequestPB;
 using kudu::transactions::GetTransactionStateResponsePB;
+using kudu::transactions::KeepTransactionAliveRequestPB;
+using kudu::transactions::KeepTransactionAliveResponsePB;
 using kudu::transactions::TxnManagerServiceProxy;
 using kudu::transactions::TxnManagerErrorPB;
 using std::string;
@@ -76,9 +78,13 @@ AsyncRandomTxnManagerRpc<ReqClass, RespClass>::AsyncRandomTxnManagerRpc(
     string rpc_name,
     StatusCallback user_cb)
     : Rpc(deadline, client->data_->messenger_, backoff),
-      client_(client), req_(&req), resp_(resp), func_(func),
+      client_(client),
+      req_(&req),
+      resp_(resp),
+      func_(func),
       rpc_name_(std::move(rpc_name)),
-      user_cb_(std::move(user_cb)) {
+      user_cb_(std::move(user_cb)),
+      multi_txn_manager_(client_->IsMultiMaster()) {
   DCHECK(deadline.Initialized());
 }
 
@@ -139,7 +145,6 @@ bool AsyncRandomTxnManagerRpc<ReqClass, RespClass>::RetryIfNecessary(
     // 'rpc_name_' and 'client_' inaccessible.
     KLOG_EVERY_N_SECS(WARNING, 1) << retry_warning;
   });
-  const bool multi_txn_manager = client_->IsMultiMaster();
   // Pull out the RPC status.
   Status s = *status;
   if (s.ok()) {
@@ -156,7 +161,7 @@ bool AsyncRandomTxnManagerRpc<ReqClass, RespClass>::RetryIfNecessary(
   // another TxnManager (if available) or re-send the RPC to the same TxnManager
   // a bit later.
   if (s.IsServiceUnavailable()) {
-    if (multi_txn_manager) {
+    if (multi_txn_manager_) {
       ResetTxnManagerAndRetry(s);
     } else {
       mutable_retrier()->DelayedRetry(this, s);
@@ -168,7 +173,7 @@ bool AsyncRandomTxnManagerRpc<ReqClass, RespClass>::RetryIfNecessary(
   // mean a TxnManager is down or doesn't exist. If there's another TxnManager
   // to connect to, connect to it. Otherwise, don't bother retrying.
   if (s.IsNetworkError()) {
-    if (multi_txn_manager) {
+    if (multi_txn_manager_) {
       ResetTxnManagerAndRetry(s);
       return true;
     }
@@ -178,7 +183,7 @@ bool AsyncRandomTxnManagerRpc<ReqClass, RespClass>::RetryIfNecessary(
     // deadline for RPC operation, retry the operation; if multiple TxnManagers
     // are available, try with another one.
     if (MonoTime::Now() < retrier().deadline()) {
-      if (multi_txn_manager) {
+      if (multi_txn_manager_) {
         ResetTxnManagerAndRetry(s);
       } else {
         mutable_retrier()->DelayedRetry(this, s);
@@ -200,7 +205,7 @@ bool AsyncRandomTxnManagerRpc<ReqClass, RespClass>::RetryIfNecessary(
          err->code() == ErrorStatusPB::ERROR_UNAVAILABLE)) {
       // If TxnManager is too busy, try another one if it's around or try again
       // later with the same TxnManager.
-      if (multi_txn_manager) {
+      if (multi_txn_manager_) {
         ResetTxnManagerAndRetry(s);
       } else {
         mutable_retrier()->DelayedRetry(this, s);
@@ -225,7 +230,7 @@ bool AsyncRandomTxnManagerRpc<ReqClass, RespClass>::RetryIfNecessary(
     const auto app_status = StatusFromPB(app_err.status());
     DCHECK(!app_status.ok());
     if (app_status.IsServiceUnavailable()) {
-      if (multi_txn_manager) {
+      if (multi_txn_manager_) {
         ResetTxnManagerAndRetry(app_status);
       } else {
         mutable_retrier()->DelayedRetry(this, app_status);
@@ -249,6 +254,8 @@ template class AsyncRandomTxnManagerRpc<CommitTransactionRequestPB,
                                         CommitTransactionResponsePB>;
 template class AsyncRandomTxnManagerRpc<GetTransactionStateRequestPB,
                                         GetTransactionStateResponsePB>;
+// Instantiation for KeepTransactionAlive() RPCs, used by the client-side
+// txn keepalive heartbeating.
+template class AsyncRandomTxnManagerRpc<KeepTransactionAliveRequestPB,
+                                        KeepTransactionAliveResponsePB>;
 
 } // namespace internal
 } // namespace client
diff --git a/src/kudu/client/txn_manager_proxy_rpc.h b/src/kudu/client/txn_manager_proxy_rpc.h
index 77b7d8d..501c9e7 100644
--- a/src/kudu/client/txn_manager_proxy_rpc.h
+++ b/src/kudu/client/txn_manager_proxy_rpc.h
@@ -125,6 +125,9 @@ class AsyncRandomTxnManagerRpc : public rpc::Rpc {
   // Callback to call upon completion of the operation (whether the RPC itself
   // was successful or not).
   const StatusCallback user_cb_;
+
+  // Whether this is a multi-master cluster to work with.
+  const bool multi_txn_manager_;
 };
 
 } // namespace internal
diff --git a/src/kudu/integration-tests/txn_status_manager-itest.cc b/src/kudu/integration-tests/txn_status_manager-itest.cc
index 00807b1..38e060f 100644
--- a/src/kudu/integration-tests/txn_status_manager-itest.cc
+++ b/src/kudu/integration-tests/txn_status_manager-itest.cc
@@ -28,6 +28,7 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
+#include "kudu/client/client.h"
 #include "kudu/common/wire_protocol.h"
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/gutil/map-util.h"
@@ -48,6 +49,9 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+using kudu::client::KuduClient;
+using kudu::client::KuduTransaction;
+using kudu::client::sp::shared_ptr;
 using kudu::cluster::ExternalMiniClusterOptions;
 using kudu::cluster::TabletIdAndTableName;
 using kudu::itest::TServerDetails;
@@ -64,10 +68,8 @@ using kudu::transactions::KeepTransactionAliveRequestPB;
 using kudu::transactions::KeepTransactionAliveResponsePB;
 using kudu::transactions::TxnManagerServiceProxy;
 using kudu::transactions::TxnStatePB;
-using std::shared_ptr;
 using std::string;
 using std::thread;
-using std::unique_ptr;
 using std::vector;
 using strings::Substitute;
 
@@ -230,8 +232,8 @@ class TxnStatusManagerITest : public ExternalMiniClusterITestBase {
   ExternalMiniClusterOptions cluster_opts_;
   string txn_status_tablet_id_;
 
-  shared_ptr<rpc::Messenger> messenger_;
-  unique_ptr<TxnManagerServiceProxy> txn_manager_proxy_;
+  std::shared_ptr<rpc::Messenger> messenger_;
+  std::unique_ptr<TxnManagerServiceProxy> txn_manager_proxy_;
 };
 
 const MonoDelta TxnStatusManagerITest::kTimeout = MonoDelta::FromSeconds(15);
@@ -356,7 +358,7 @@ TEST_F(TxnStatusManagerITest, DISABLED_TxnKeepAliveMultiTxnStatusManagerInstance
     const auto period = MonoDelta::FromMilliseconds(keepalive_interval_ms / 2);
     const auto timeout = MonoDelta::FromMilliseconds(keepalive_interval_ms / 4);
     // Keepalive thread uses its own messenger and proxy.
-    shared_ptr<rpc::Messenger> m;
+    std::shared_ptr<rpc::Messenger> m;
     rpc::MessengerBuilder b("txn-keepalive");
     ASSERT_OK(b.Build(&m));
     const auto rpc_addr = cluster_->master()->bound_rpc_addr();
@@ -466,4 +468,35 @@ TEST_F(TxnStatusManagerITest, DISABLED_TxnKeepAliveMultiTxnStatusManagerInstance
   NO_FATALS(cluster_->AssertNoCrashes());
 }
 
+// Check that the internal txn heartbeater in KuduClient keeps sending
+// KeepTransactionAlive requests even if no TxnStatusManager instance is
+// accessible for some time, and that the txn keepalive messages reach the
+// destination after TxnStatusManager is back online. So, the txn should not be
+// auto-aborted while its KuduTransaction object is kept in scope.
+TEST_F(TxnStatusManagerITest, DISABLED_TxnKeptAliveByClientIfStatusManagerRestarted) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+  shared_ptr<KuduClient> c;
+  ASSERT_OK(cluster_->CreateClient(nullptr, &c));
+
+  // The client sends keepalive heartbeats for as long as this original
+  // transaction handle stays in scope, so 'txn' must outlive the scenario.
+  shared_ptr<KuduTransaction> txn;
+  ASSERT_OK(c->NewTransaction(&txn));
+
+  // Shut down all tablet servers, so no TxnStatusManager instance is
+  // accessible while the client keeps trying to send keepalive heartbeats.
+  for (auto idx = 0; idx < cluster_->num_tablet_servers(); ++idx) {
+    auto* ts = cluster_->tablet_server(idx);
+    ts->Shutdown();
+  }
+
+  SleepFor(MonoDelta::FromMilliseconds(3 * kTxnKeepaliveIntervalMs));
+
+  // Bring TxnStatusManager back online.
+  for (auto idx = 0; idx < cluster_->num_tablet_servers(); ++idx) {
+    auto* ts = cluster_->tablet_server(idx);
+    ASSERT_OK(ts->Restart());
+  }
+
+  // Give the client several keepalive intervals to deliver heartbeats to the
+  // restarted TxnStatusManager.
+  SleepFor(MonoDelta::FromMilliseconds(5 * kTxnKeepaliveIntervalMs));
+
+  // NOTE(review): presumably Commit() would fail had the transaction been
+  // auto-aborted, so success here means it was kept alive -- confirm.
+  ASSERT_OK(txn->Commit(false /* wait */));
+  NO_FATALS(cluster_->AssertNoCrashes());
+}
+
 } // namespace kudu
diff --git a/src/kudu/transactions/transactions.proto b/src/kudu/transactions/transactions.proto
index 8d9d9a9..042ab9c 100644
--- a/src/kudu/transactions/transactions.proto
+++ b/src/kudu/transactions/transactions.proto
@@ -55,4 +55,8 @@ message TxnTokenPB {
   // keep-alive heartbeats spaced by intervals equal or shorter than
   // 'keepalive_millis' milliseconds in duration.
   optional uint32 keepalive_millis = 2;
+
+  // Whether the client should automatically send keepalive messages once
+  // this token is deserialized into a runtime transaction handle.
+  optional bool enable_keepalive = 3;
 }