You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2020/12/12 03:08:31 UTC

[kudu] branch master updated (0ddb503 -> e75aad0)

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from 0ddb503  [build] Add the option to use the protoc from thirdparty in the java build
     new 9197e2d  KUDU-2612: fuzz transactional inserts
     new c1d1b9c  [master-test] fix race in ConcurrentGetTableSchemaTest
     new e75aad0  KUDU-2612 keep-alive txn heartbeating for C++ client

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/kudu/client/client-test.cc                     | 118 +++-
 src/kudu/client/client.cc                          |  26 +-
 src/kudu/client/client.h                           | 135 +++-
 src/kudu/client/transaction-internal.cc            | 194 +++++-
 src/kudu/client/transaction-internal.h             |  45 +-
 src/kudu/client/txn_manager_proxy_rpc.cc           |  23 +-
 src/kudu/client/txn_manager_proxy_rpc.h            |   3 +
 src/kudu/integration-tests/fuzz-itest.cc           | 690 +++++++++++++++++----
 .../integration-tests/txn_status_manager-itest.cc  |  43 +-
 src/kudu/master/master-test.cc                     |   8 +-
 src/kudu/transactions/transactions.proto           |   4 +
 11 files changed, 1099 insertions(+), 190 deletions(-)


[kudu] 03/03: KUDU-2612 keep-alive txn heartbeating for C++ client

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit e75aad09a8dbe295a11cc2042903b74faf72623e
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Sun Nov 22 18:46:03 2020 -0800

    KUDU-2612 keep-alive txn heartbeating for C++ client
    
    This patch adds keep-alive txn heartbeating into the Kudu C++ client.
    
    The txn keepalive heartbeating is performed automatically by the client,
    and no API is exposed to send keep-alive messages for a transaction.
    The txn keepalive heartbeating continues until the original transaction
    handle (i.e. the handle created by KuduClient::NewTransaction()) goes
    out of scope.  In contrast, if the transaction handle is created by
    KuduTransaction::Deserialize(), the keepalive messages are or aren't
    sent depending on the KuduTransactionSerializer::enable_keepalive()
    setting when serializing the source handle.  By default, keepalive
    messages are not sent for deserialized transaction handles.  This is
    because the most common use case for multiple actors working in the
    context of the same transaction is supposed to be of the
    "star topology", when a transaction is started and committed by
    a top-level application which shares the context of the transaction with
    other applications which only submit their data, but don't manage the
    lifecycle of the transaction.
    
    This patch also contains a couple of test scenarios to cover the
    newly introduced functionality.
    
    Change-Id: I0283d8e16908f641388f7a30b513a672df27a186
    Reviewed-on: http://gerrit.cloudera.org:8080/16779
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/client/client-test.cc                     | 118 ++++++++++++-
 src/kudu/client/client.cc                          |  26 ++-
 src/kudu/client/client.h                           | 129 +++++++++++---
 src/kudu/client/transaction-internal.cc            | 194 +++++++++++++++++++--
 src/kudu/client/transaction-internal.h             |  45 ++++-
 src/kudu/client/txn_manager_proxy_rpc.cc           |  23 ++-
 src/kudu/client/txn_manager_proxy_rpc.h            |   3 +
 .../integration-tests/txn_status_manager-itest.cc  |  43 ++++-
 src/kudu/transactions/transactions.proto           |   4 +
 9 files changed, 514 insertions(+), 71 deletions(-)

diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 2047d03..c1087ce 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -169,6 +169,8 @@ DECLARE_int64(on_disk_size_for_testing);
 DECLARE_string(location_mapping_cmd);
 DECLARE_string(superuser_acl);
 DECLARE_string(user_acl);
+DECLARE_uint32(txn_keepalive_interval_ms);
+DECLARE_uint32(txn_staleness_tracker_interval_ms);
 DECLARE_uint32(txn_manager_status_table_num_replicas);
 DEFINE_int32(test_scan_num_rows, 1000, "Number of rows to insert and scan");
 
@@ -246,6 +248,15 @@ class ClientTest : public KuduTest {
     // replica of the transaction status table.
     FLAGS_txn_manager_status_table_num_replicas = 1;
 
+    // Speed up test scenarios which are related to txn keepalive interval.
+#if defined(THREAD_SANITIZER) || defined(ADDRESS_SANITIZER)
+    FLAGS_txn_keepalive_interval_ms = 1500;
+    FLAGS_txn_staleness_tracker_interval_ms = 300;
+#else
+    FLAGS_txn_keepalive_interval_ms = 250;
+    FLAGS_txn_staleness_tracker_interval_ms = 50;
+#endif
+
     SetLocationMappingCmd();
 
     // Start minicluster and wait for tablet servers to connect to master.
@@ -427,7 +438,7 @@ class ClientTest : public KuduTest {
   //                once the transaction orchestration is implemented
   Status FinalizeCommitTransaction(const shared_ptr<KuduTransaction>& txn) {
     string txn_token;
-    RETURN_NOT_OK(txn->Serialize(&txn_token));
+    RETURN_NOT_OK(KuduTransactionSerializer(txn).Serialize(&txn_token));
     TxnTokenPB token;
     CHECK(token.ParseFromString(txn_token));
     CHECK(token.has_txn_id());
@@ -7126,7 +7137,7 @@ TEST_F(ClientTest, TxnCommit) {
       ASSERT_FALSE(is_complete);
       ASSERT_TRUE(cs.IsIncomplete()) << cs.ToString();
       ASSERT_STR_CONTAINS(cs.ToString(), "commit is still in progress");
-      ASSERT_OK(txn->Serialize(&txn_token));
+      ASSERT_OK(KuduTransactionSerializer(txn).Serialize(&txn_token));
     }
 
     // Make sure the transaction isn't aborted once its KuduTransaction handle
@@ -7250,12 +7261,12 @@ TEST_F(ClientTest, TxnToken) {
   ASSERT_GT(txn_keepalive_ms, 0);
 
   string txn_token;
-  ASSERT_OK(txn->Serialize(&txn_token));
+  ASSERT_OK(KuduTransactionSerializer(txn).Serialize(&txn_token));
 
   // Serializing the same transaction again produces the same result.
   {
     string token;
-    ASSERT_OK(txn->Serialize(&token));
+    ASSERT_OK(KuduTransactionSerializer(txn).Serialize(&token));
     ASSERT_EQ(txn_token, token);
   }
 
@@ -7278,7 +7289,7 @@ TEST_F(ClientTest, TxnToken) {
   // Make sure the KuduTransaction object deserialized from a token is fully
   // functional.
   string serdes_txn_token;
-  ASSERT_OK(serdes_txn->Serialize(&serdes_txn_token));
+  ASSERT_OK(KuduTransactionSerializer(serdes_txn).Serialize(&serdes_txn_token));
   ASSERT_EQ(txn_token, serdes_txn_token);
 
   // TODO(awong): remove once we register participants automatically before
@@ -7294,7 +7305,7 @@ TEST_F(ClientTest, TxnToken) {
     // The state of a transaction isn't stored in the token, so initiating
     // commit of the transaction doesn't change the result of the serialization.
     string token;
-    ASSERT_OK(serdes_txn->Serialize(&token));
+    ASSERT_OK(KuduTransactionSerializer(serdes_txn).Serialize(&token));
     ASSERT_EQ(serdes_txn_token, token);
   }
 
@@ -7302,14 +7313,14 @@ TEST_F(ClientTest, TxnToken) {
   shared_ptr<KuduTransaction> other_txn;
   ASSERT_OK(client_->NewTransaction(&other_txn));
   string other_txn_token;
-  ASSERT_OK(other_txn->Serialize(&other_txn_token));
+  ASSERT_OK(KuduTransactionSerializer(other_txn).Serialize(&other_txn_token));
   ASSERT_NE(txn_token, other_txn_token);
 
   // The state of a transaction isn't stored in the token, so aborting
   // the doesn't change the result of the serialization.
   string token;
   ASSERT_OK(other_txn->Rollback());
-  ASSERT_OK(other_txn->Serialize(&token));
+  ASSERT_OK(KuduTransactionSerializer(other_txn).Serialize(&token));
   ASSERT_EQ(other_txn_token, token);
 }
 
@@ -7322,7 +7333,7 @@ TEST_F(ClientTest, AttemptToControlTxnByOtherUser) {
   shared_ptr<KuduTransaction> txn;
   ASSERT_OK(client_->NewTransaction(&txn));
   string txn_token;
-  ASSERT_OK(txn->Serialize(&txn_token));
+  ASSERT_OK(KuduTransactionSerializer(txn).Serialize(&txn_token));
 
   // Transaction identifier is surfacing here only to build the reference error
   // message for Status::NotAuthorized() returned by attempts to perform
@@ -7414,6 +7425,95 @@ TEST_F(ClientTxnManagerProxyTest, RetryOnServiceUnavailable) {
   ASSERT_OK(client_->NewTransaction(&txn));
 }
 
+TEST_F(ClientTest, TxnKeepAlive) {
+  // Begin a transaction and wait for longer than the keepalive interval
+  // (with some margin). If there were no txn keepalive messages sent,
+  // the transaction would be automatically aborted. Since the client
+  // sends keepalive heartbeats under the hood, it's still possible to commit
+  // the transaction after some period of inactivity.
+  {
+    shared_ptr<KuduTransaction> txn;
+    ASSERT_OK(client_->NewTransaction(&txn));
+
+    SleepFor(MonoDelta::FromMilliseconds(2 * FLAGS_txn_keepalive_interval_ms));
+
+    ASSERT_OK(txn->Commit(false /* wait */));
+    ASSERT_OK(FinalizeCommitTransaction(txn));
+  }
+
+  // Begin a transaction and move its KuduTransaction object out of the
+  // scope. After txn keepalive interval (with some margin) the system should
+  // automatically abort the transaction.
+  {
+    string txn_token;
+    {
+      shared_ptr<KuduTransaction> txn;
+      ASSERT_OK(client_->NewTransaction(&txn));
+      ASSERT_OK(KuduTransactionSerializer(txn).Serialize(&txn_token));
+    }
+
+    SleepFor(MonoDelta::FromMilliseconds(2 * FLAGS_txn_keepalive_interval_ms));
+
+    // The transaction should be automatically aborted since no keepalive
+    // requests were sent once the original KuduTransaction object went out
+    // of the scope.
+    shared_ptr<KuduTransaction> serdes_txn;
+    ASSERT_OK(KuduTransaction::Deserialize(client_, txn_token, &serdes_txn));
+    auto s = serdes_txn->Commit(false /* wait */);
+    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "not open: state: ABORTED");
+  }
+
+  // Begin a new transaction and move the KuduTransaction object out of the
+  // scope. If the transaction handle is deserialized from a txn token that
+  // didn't have keepalive enabled when serialized, the transaction should be
+  // automatically aborted after txn keepalive timeout.
+  {
+    string txn_token;
+    {
+      shared_ptr<KuduTransaction> txn;
+      ASSERT_OK(client_->NewTransaction(&txn));
+      ASSERT_OK(KuduTransactionSerializer(txn).Serialize(&txn_token));
+    }
+
+    shared_ptr<KuduTransaction> serdes_txn;
+    ASSERT_OK(KuduTransaction::Deserialize(client_, txn_token, &serdes_txn));
+    ASSERT_NE(nullptr, serdes_txn.get());
+
+    SleepFor(MonoDelta::FromMilliseconds(2 * FLAGS_txn_keepalive_interval_ms));
+
+    auto s = serdes_txn->Commit(false /* wait */);
+    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "not open: state: ABORTED");
+  }
+
+  // Begin a new transaction and move the KuduTransaction object out of the
+  // scope. If the transaction handle is deserialized from a txn token that
+  // had keepalive enabled when serialized, the transaction should stay alive
+  // even if the original KuduTransaction object went out of the scope.
+  // It's assumed that no more than the keepalive timeout interval has passed
+  // between the original transaction handle going out of scope and the new
+  // one being created from the token.
+  {
+    string txn_token;
+    {
+      shared_ptr<KuduTransaction> txn;
+      ASSERT_OK(client_->NewTransaction(&txn));
+      ASSERT_OK(KuduTransactionSerializer(txn)
+                .enable_keepalive(true)
+                .Serialize(&txn_token));
+    }
+
+    shared_ptr<KuduTransaction> serdes_txn;
+    ASSERT_OK(KuduTransaction::Deserialize(client_, txn_token, &serdes_txn));
+
+    SleepFor(MonoDelta::FromMilliseconds(2 * FLAGS_txn_keepalive_interval_ms));
+
+    ASSERT_OK(serdes_txn->Commit(false /* wait */));
+    ASSERT_OK(FinalizeCommitTransaction(serdes_txn));
+  }
+}
+
 // Client test that assigns locations to clients and tablet servers.
 // For now, assigns a uniform location to all clients and tablet servers.
 class ClientWithLocationTest : public ClientTest {
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 61e82ca..ed25587 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -29,7 +29,6 @@
 #include <vector>
 
 #include <boost/optional/optional.hpp>
-#include <boost/type_traits/decay.hpp>
 #include <glog/logging.h>
 #include <google/protobuf/stubs/common.h>
 
@@ -425,16 +424,31 @@ Status KuduTransaction::Rollback() {
   return data_->Rollback();
 }
 
-Status KuduTransaction::Serialize(string* serialized_txn) const {
-  return data_->Serialize(serialized_txn);
-}
-
 Status KuduTransaction::Deserialize(const sp::shared_ptr<KuduClient>& client,
                                     const string& serialized_txn,
                                     sp::shared_ptr<KuduTransaction>* txn) {
   return Data::Deserialize(client, serialized_txn, txn);
 }
 
+KuduTransactionSerializer::KuduTransactionSerializer(
+    const sp::shared_ptr<KuduTransaction>& txn)
+    : data_(new KuduTransactionSerializer::Data(txn)) {
+}
+
+KuduTransactionSerializer::~KuduTransactionSerializer() {
+  delete data_;
+}
+
+KuduTransactionSerializer&
+KuduTransactionSerializer::enable_keepalive(bool enable) {
+  data_->enable_keepalive(enable);
+  return *this;
+}
+
+Status KuduTransactionSerializer::Serialize(string* serialized_txn) const {
+  return data_->Serialize(serialized_txn);
+}
+
 KuduClient::KuduClient()
   : data_(new KuduClient::Data()) {
   static ObjectIdGenerator oid_generator;
@@ -566,7 +580,7 @@ shared_ptr<KuduSession> KuduClient::NewSession() {
 
 Status KuduClient::NewTransaction(sp::shared_ptr<KuduTransaction>* txn) {
   shared_ptr<KuduTransaction> ret(new KuduTransaction(shared_from_this()));
-  const auto s = ret->data_->Begin();
+  const auto s = ret->data_->Begin(ret);
   if (s.ok()) {
     *txn = std::move(ret);
   }
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 66f7dbb..22d63c5 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -333,6 +333,11 @@ class KUDU_EXPORT KuduClientBuilder {
 /// @c KuduTransaction instance can be used to commit or rollback the underlying
 /// multi-row transaction and create a transactional session.
 ///
+/// @note The @c KuduTransaction should be kept in scope to maintain automatic
+///   keep-alive heartbeating for the corresponding transaction. Once this
+///   object goes out of scope, the heartbeating stops and the transaction may
+///   automatically be aborted soon if no other clients do the heartbeating.
+///
 /// @note There isn't any automation to rollback or commit the underlying
 ///   transaction upon destruction of an instance of this class.
 ///
@@ -409,30 +414,13 @@ class KUDU_EXPORT KuduTransaction :
   /// @return Operation result status.
   Status Rollback() WARN_UNUSED_RESULT;
 
-  /// Export the information on this transaction in a serialized form.
-  ///
-  /// The serialized information on a Kudu transaction can be passed among
-  /// different Kudu clients running at multiple nodes, so those separate
-  /// Kudu clients can perform operations to be a part of the same distributed
-  /// transaction. The resulting string is called "transaction token" and it
-  /// can be deserialized using the @c KuduTransaction::Deserialize() method.
-  /// The result of the latter is an instance of the @c KuduTransaction class
-  /// to work with in the run-time.
-  ///
-  /// This method doesn't perform any RPC under the hood.
-  ///
-  /// @note The representation of the data in the serialized form
-  ///   (i.e. the format of a Kudu transaction token) is an implementation
-  ///   detail, not a part of the public API.
-  ///
-  /// @param [out] serialized_txn
-  ///   Result string to output the serialized transaction information.
-  /// @return Operation result status.
-  Status Serialize(std::string* serialized_txn) const WARN_UNUSED_RESULT;
-
   /// Re-create KuduTransaction object given its serialized representation.
   ///
-  /// This method doesn't perform any RPC under the hood.
+  /// This method doesn't perform any RPC under the hood. The newly created
+  /// object automatically does or does not send keep-alive messages depending
+  /// on the @c KuduTransactionSerializer::enable_keepalive() setting when
+  /// the original @c KuduTransaction object was serialized using
+  /// @c KuduTransactionSerializer::Serialize().
   ///
   /// @param [in] client
   ///   Client instance to bound the result object to.
@@ -447,6 +435,7 @@ class KUDU_EXPORT KuduTransaction :
  private:
   friend class KuduClient;
   friend class KuduSession;
+  friend class KuduTransactionSerializer;
   FRIEND_TEST(ClientTest, TxnIdOfTransactionalSession);
   FRIEND_TEST(ClientTest, TxnToken);
 
@@ -456,6 +445,96 @@ class KUDU_EXPORT KuduTransaction :
   Data* data_; // Owned.
 };
 
+/// A utility class to help with serialization of @c KuduTransaction handles.
+///
+/// The purpose of this class is to control the keepalive behavior for the
+/// @c KuduTransaction handle once it's deserialized from the token. In future,
+/// the list of configurable parameters might be extended (e.g., by adding
+/// commit and abort permissions, i.e. whether a handle can be used to
+/// commit and/or abort the transaction).
+class KUDU_EXPORT KuduTransactionSerializer {
+ public:
+  /// Create a serializer for the specified @c KuduTransaction object.
+  ///
+  /// @param [in] txn
+  ///   Smart pointer to the @c KuduTransaction object to be serialized
+  ///   by the @c Serialize() method.
+  explicit KuduTransactionSerializer(const sp::shared_ptr<KuduTransaction>& txn);
+
+  ~KuduTransactionSerializer();
+
+  /// Whether to enable sending keepalive messages for the @c KuduTransaction
+  /// handle when it's deserialized from the string produced by
+  /// @c KuduTransactionSerializer::Serialize().
+  ///
+  /// No keepalive heartbeat messages are sent from a transaction handle whose
+  /// source token was created with the default "keepalive disabled" setting.
+  /// The idea here is that the most common use case for using transaction
+  /// tokens is of the "star topology" (see below), so it's enough to have
+  /// just one top-level handle sending keepalive messages. Overall, having more
+  /// than one actor sending keepalive messages for a transaction is acceptable
+  /// but it puts needless load on a cluster.
+  ///
+  /// The most common use case for a transaction's handle
+  /// serialization/deserialization is of the "star topology": a transaction is
+  /// started by a top-level application which sends the transaction token
+  /// produced by serializing the original transaction handle to other worker
+  /// applications running concurrently, where the latter write their data
+  /// in the context of the same transaction and report back to the top-level
+  /// application, which in its turn initiates committing the transaction
+  /// as needed. The important point is that the top-level application keeps the
+  /// transaction handle around all the time from the start of the transaction
+  /// to the very point when transaction is committed. Under the hood, the
+  /// original transaction handle sends keepalive messages as required until
+  /// commit phase is initiated, so the deserialized transaction handles which
+  /// are used by the worker applications don't need to send keepalive messages.
+  ///
+  /// The other (less common) use case is of the "ring topology": a chain of
+  /// applications work sequentially as a part of the same transaction, where
+  /// the very first application starts the transaction, writes its data, and
+  /// hands over the responsibility of managing the lifecycle of the transaction
+  /// to another application down the chain. After doing so it may exit, so now
+  /// only the next application has the active transaction handle, and so on it
+  /// goes until the transaction is committed by the application in the end
+  /// of the chain. In this scenario, every deserialized handle has to send
+  /// keepalive messages to avoid automatic rollback of the transaction,
+  /// and every application in the chain should call
+  /// @c KuduTransactionSerializer::enable_keepalive() when serializing its
+  /// transaction handle into a transaction token to pass to the application
+  /// next in the chain.
+  ///
+  /// @param [in] enable
+  ///   Whether to enable sending keepalive messages for the @c KuduTransaction
+  ///   object once it's deserialized from the string to be produced by
+  ///   @c KuduTransactionSerializer::Serialize().
+  /// @return Reference to the updated object.
+  KuduTransactionSerializer& enable_keepalive(bool enable);
+
+  /// Export the information on this transaction in a serialized form.
+  ///
+  /// The serialized information on a Kudu transaction can be passed among
+  /// different Kudu clients running at multiple nodes, so those separate
+  /// Kudu clients can perform operations to be a part of the same distributed
+  /// transaction. The resulting string is referred to as "transaction token" and
+  /// it can be deserialized into an object of the @c KuduTransaction class via
+  /// the @c KuduTransaction::Deserialize() method.
+  ///
+  /// This method doesn't perform any RPC under the hood.
+  ///
+  /// @note The representation of the data in the serialized form
+  ///   (i.e. the format of a Kudu transaction token) is an implementation
+  ///   detail, not a part of the public API.
+  ///
+  /// @param [out] serialized_txn
+  ///   Result string to output the serialized transaction information.
+  /// @return Operation result status.
+  Status Serialize(std::string* serialized_txn) const WARN_UNUSED_RESULT;
+
+ private:
+  class KUDU_NO_EXPORT Data;
+  Data* data_; // Owned.
+};
+
 /// @brief A handle for a connection to a cluster.
 ///
 /// The KuduClient class represents a connection to a cluster. From the user
@@ -618,6 +697,11 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> {
   /// the newly created transaction, use @c KuduTransaction::Commit() and
   /// @c KuduTransaction::Rollback() methods correspondingly.
   ///
+  /// @note The newly created object starts sending keep-alive messages for
+  ///   the newly opened transaction as required by the keep-alive interval
+  ///   assigned to the transaction by the system. To keep the heartbeating,
+  ///   the newly created @c KuduTransaction should be kept in scope.
+  ///
   /// @warning This method is experimental and may change or disappear in future.
   ///
   /// @param txn [out]
@@ -789,6 +873,7 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> {
   friend class ConnectToClusterBaseTest;
   friend class KuduClientBuilder;
   friend class KuduPartitionerBuilder;
+  friend class KuduTransaction;
   friend class KuduScanToken;
   friend class KuduScanTokenBuilder;
   friend class KuduScanner;
diff --git a/src/kudu/client/transaction-internal.cc b/src/kudu/client/transaction-internal.cc
index af0fddc..31e09ce 100644
--- a/src/kudu/client/transaction-internal.cc
+++ b/src/kudu/client/transaction-internal.cc
@@ -17,11 +17,11 @@
 
 #include "kudu/client/transaction-internal.h"
 
+#include <algorithm>
 #include <functional>
 #include <memory>
 #include <ostream>
 #include <string>
-#include <utility>
 
 #include <glog/logging.h>
 
@@ -32,15 +32,18 @@
 #include "kudu/client/txn_manager_proxy_rpc.h"
 #include "kudu/common/txn_id.h"
 #include "kudu/common/wire_protocol.h"
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/master/txn_manager.pb.h"
 #include "kudu/master/txn_manager.proxy.h"
+#include "kudu/rpc/messenger.h"
 #include "kudu/rpc/response_callback.h"
 #include "kudu/rpc/rpc.h"
 #include "kudu/transactions/transactions.pb.h"
 #include "kudu/util/async_util.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
+#include "kudu/util/status_callback.h"
 
 using kudu::client::internal::AsyncRandomTxnManagerRpc;
 using kudu::rpc::BackoffType;
@@ -52,10 +55,13 @@ using kudu::transactions::CommitTransactionRequestPB;
 using kudu::transactions::CommitTransactionResponsePB;
 using kudu::transactions::GetTransactionStateRequestPB;
 using kudu::transactions::GetTransactionStateResponsePB;
+using kudu::transactions::KeepTransactionAliveRequestPB;
+using kudu::transactions::KeepTransactionAliveResponsePB;
 using kudu::transactions::TxnManagerServiceProxy;
 using kudu::transactions::TxnStatePB;
 using kudu::transactions::TxnTokenPB;
 using std::string;
+using std::unique_ptr;
 using strings::Substitute;
 
 namespace kudu {
@@ -91,7 +97,7 @@ Status KuduTransaction::Data::CreateSession(sp::shared_ptr<KuduSession>* session
   return Status::OK();
 }
 
-Status KuduTransaction::Data::Begin() {
+Status KuduTransaction::Data::Begin(const sp::shared_ptr<KuduTransaction>& txn) {
   auto c = weak_client_.lock();
   if (!c) {
     return Status::IllegalState("associated KuduClient is gone");
@@ -100,6 +106,7 @@ Status KuduTransaction::Data::Begin() {
   Synchronizer sync;
   BeginTransactionRequestPB req;
   BeginTransactionResponsePB resp;
+  // TODO(aserbin): should there be no retries for sending keepalive heartbeats?
   AsyncRandomTxnManagerRpc<
       BeginTransactionRequestPB, BeginTransactionResponsePB> rpc(
       deadline, c.get(), BackoffType::EXPONENTIAL, req, &resp,
@@ -111,18 +118,33 @@ Status KuduTransaction::Data::Begin() {
     return StatusFromPB(resp.error().status());
   }
 
-  // TODO(aserbin): start sending regular hearbeats for the started transaction
-  CHECK(resp.has_txn_id());
+  DCHECK(resp.has_txn_id());
   txn_id_ = resp.txn_id();
-  CHECK(resp.has_keepalive_millis());
+  DCHECK(txn_id_.IsValid());
+  DCHECK(resp.has_keepalive_millis());
   txn_keep_alive_ms_ = resp.keepalive_millis();
-  CHECK(txn_id_.IsValid());
+  DCHECK_GT(txn_keep_alive_ms_, 0);
+
+  // Start sending regular heartbeats for the new transaction.
+  auto next_run_after = MonoDelta::FromMilliseconds(
+      std::max<uint32_t>(1, txn_keep_alive_ms_ / 2));
+  auto m = c->data_->messenger_;
+  if (PREDICT_FALSE(!m)) {
+    return Status::IllegalState("null messenger in Kudu client");
+  }
+
+  sp::weak_ptr<KuduTransaction> weak_txn(txn);
+  m->ScheduleOnReactor(
+      [weak_txn](const Status& s) {
+        SendTxnKeepAliveTask(s, weak_txn);
+      },
+      next_run_after);
 
   return Status::OK();
 }
 
 Status KuduTransaction::Data::Commit(bool wait) {
-  CHECK(txn_id_.IsValid());
+  DCHECK(txn_id_.IsValid());
   auto c = weak_client_.lock();
   if (!c) {
     return Status::IllegalState("associated KuduClient is gone");
@@ -152,7 +174,7 @@ Status KuduTransaction::Data::IsCommitComplete(
     bool* is_complete, Status* completion_status) {
   DCHECK(is_complete);
   DCHECK(completion_status);
-  CHECK(txn_id_.IsValid());
+  DCHECK(txn_id_.IsValid());
   auto c = weak_client_.lock();
   if (!c) {
     return Status::IllegalState("associated KuduClient is gone");
@@ -186,14 +208,14 @@ Status KuduTransaction::Data::Rollback() {
   return Status::OK();
 }
 
-Status KuduTransaction::Data::Serialize(string* serialized_txn) const {
-  CHECK(txn_id_.IsValid());
+Status KuduTransaction::Data::Serialize(string* serialized_txn,
+                                        bool enable_keepalive) const {
   DCHECK(serialized_txn);
+  DCHECK(txn_id_.IsValid());
   TxnTokenPB token;
   token.set_txn_id(txn_id_);
-  if (txn_keep_alive_ms_ > 0) {
-    token.set_keepalive_millis(txn_keep_alive_ms_);
-  }
+  token.set_keepalive_millis(txn_keep_alive_ms_);
+  token.set_enable_keepalive(enable_keepalive);
   if (!token.SerializeToString(serialized_txn)) {
     return Status::Corruption("unable to serialize transaction information");
   }
@@ -204,6 +226,8 @@ Status KuduTransaction::Data::Deserialize(
     const sp::shared_ptr<KuduClient>& client,
     const string& serialized_txn,
     sp::shared_ptr<KuduTransaction>* txn) {
+  DCHECK(client);
+
   // TODO(aserbin): should the owner of the transaction be taken into account
   //                as well, i.e. not allow other than the user that created
   //                the transaction to deserialize its transaction token?
@@ -219,10 +243,28 @@ Status KuduTransaction::Data::Deserialize(
   }
 
   sp::shared_ptr<KuduTransaction> ret(new KuduTransaction(client));
-  ret->data_->txn_id_ = token.txn_id();
   ret->data_->txn_keep_alive_ms_ = token.keepalive_millis();
-  *txn = std::move(ret);
+  DCHECK_GT(ret->data_->txn_keep_alive_ms_, 0);
+  ret->data_->txn_id_ = token.txn_id();
+  DCHECK(ret->data_->txn_id_.IsValid());
+
+  // Start sending periodic txn keepalive requests for the deserialized
+  // transaction, as specified in the source txn token.
+  if (token.has_enable_keepalive() && token.enable_keepalive()) {
+    auto m = client->data_->messenger_;
+    if (PREDICT_TRUE(m)) {
+      sp::weak_ptr<KuduTransaction> weak_txn(ret);
+      auto next_run_after = MonoDelta::FromMilliseconds(
+          std::max<uint32_t>(1, ret->data_->txn_keep_alive_ms_ / 2));
+      m->ScheduleOnReactor(
+          [weak_txn](const Status& s) {
+            SendTxnKeepAliveTask(s, weak_txn);
+          },
+          next_run_after);
+    }
+  }
 
+  *txn = std::move(ret);
   return Status::OK();
 }
 
@@ -296,5 +338,127 @@ Status KuduTransaction::Data::WaitForTxnCommitToFinalize(
       });
 }
 
+// A structure to pass around metadata on KeepTransactionAlive() when invoking
+// the RPC asynchronously.
+struct KeepaliveRpcCtx {
+  KeepTransactionAliveResponsePB resp;
+  unique_ptr<AsyncRandomTxnManagerRpc<KeepTransactionAliveRequestPB,
+                                      KeepTransactionAliveResponsePB>> rpc;
+  sp::weak_ptr<KuduTransaction> weak_txn;
+};
+
+void KuduTransaction::Data::SendTxnKeepAliveTask(
+    const Status& status,
+    sp::weak_ptr<KuduTransaction> weak_txn) {
+  VLOG(2) << Substitute("SendTxnKeepAliveTask() is run");
+  if (PREDICT_FALSE(!status.ok())) {
+    // This means there was an error executing the task on reactor. As of now,
+    // this can only happen if the reactor is being shut down and the task is
+    // de-scheduled from the queue, so the only possible error status here is
+    // Status::Aborted().
+    VLOG(1) << Substitute("SendTxnKeepAliveTask did not run: $0",
+                          status.ToString());
+    DCHECK(status.IsAborted());
+    return;
+  }
+
+  // Check if the transaction object is still around.
+  sp::shared_ptr<KuduTransaction> txn(weak_txn.lock());
+  if (PREDICT_FALSE(!txn)) {
+    return;
+  }
+
+  auto c = txn->data_->weak_client_.lock();
+  if (PREDICT_FALSE(!c)) {
+    return;
+  }
+
+  const auto& txn_id = txn->data_->txn_id_;
+  const auto next_run_after = MonoDelta::FromMilliseconds(
+      std::max<uint32_t>(1, txn->data_->txn_keep_alive_ms_ / 2));
+  auto deadline = MonoTime::Now() + next_run_after;
+  KeepTransactionAliveRequestPB req;
+  req.set_txn_id(txn_id);
+
+  sp::shared_ptr<KeepaliveRpcCtx> ctx(new KeepaliveRpcCtx);
+  ctx->weak_txn = weak_txn;
+
+  unique_ptr<AsyncRandomTxnManagerRpc<KeepTransactionAliveRequestPB,
+                                      KeepTransactionAliveResponsePB>> rpc(
+      new AsyncRandomTxnManagerRpc<KeepTransactionAliveRequestPB,
+                                   KeepTransactionAliveResponsePB>(
+          deadline, c.get(), BackoffType::LINEAR,
+          req, &ctx->resp,
+          &TxnManagerServiceProxy::KeepTransactionAliveAsync,
+          "KeepTransactionAlive",
+          [ctx](const Status& s) {
+            TxnKeepAliveCb(s, std::move(ctx));
+          }
+  ));
+  ctx->rpc = std::move(rpc);
+
+  // Send the RPC and handle the response asynchronously.
+  ctx->rpc->SendRpc();
+}
+
+void KuduTransaction::Data::TxnKeepAliveCb(
+    const Status& s, sp::shared_ptr<KeepaliveRpcCtx> ctx) {
+  // Break the circular reference to 'ctx'. The circular reference is there
+  // because KeepaliveRpcCtx::rpc captures the 'ctx' for ResponseCallback.
+  ctx->rpc.reset();
+
+  // Check if the transaction object is still around.
+  sp::shared_ptr<KuduTransaction> txn(ctx->weak_txn.lock());
+  if (PREDICT_FALSE(!txn)) {
+    return;
+  }
+  // 's' is const, so fold the application-level error carried by the
+  // response (if any) into a separate effective status.
+  const auto& resp = ctx->resp;
+  const Status st = (s.ok() && resp.has_error())
+      ? StatusFromPB(resp.error().status()) : s;
+  if (st.IsIllegalState() || st.IsAborted()) {
+    // Transaction's state changed to a terminal one, no need to send
+    // keepalive requests anymore.
+    VLOG(1) << Substitute("KeepTransactionAlive() returned $0: "
+                          "stopping keepalive requests for transaction ID $1",
+                          st.ToString(), txn->data_->txn_id_.value());
+    return;
+  }
+
+  // Re-schedule the task, so it will send another keepalive heartbeat as
+  // necessary.
+  sp::shared_ptr<KuduClient> c(txn->data_->weak_client_.lock());
+  if (PREDICT_FALSE(!c)) {
+    return;
+  }
+  auto m = c->data_->messenger_;
+  if (PREDICT_TRUE(m)) {
+    auto weak_txn = ctx->weak_txn;
+    const auto next_run_after = MonoDelta::FromMilliseconds(
+        std::max<uint32_t>(1, txn->data_->txn_keep_alive_ms_ / 2));
+    m->ScheduleOnReactor(
+        [weak_txn](const Status& s) {
+          SendTxnKeepAliveTask(s, weak_txn);
+        },
+        next_run_after);
+  }
+}
+
+KuduTransactionSerializer::Data::Data(
+    const sp::shared_ptr<KuduTransaction>& txn)
+    : txn_(txn),
+      enable_keepalive_(false) {
+  DCHECK(txn_);
+}
+
+Status KuduTransactionSerializer::Data::Serialize(string* txn_token) const {
+  DCHECK(txn_token);
+  if (PREDICT_FALSE(!txn_)) {
+    return Status::InvalidArgument("no transaction to serialize");
+  }
+  return txn_->data_->Serialize(txn_token, enable_keepalive_);
+}
+
 } // namespace client
 } // namespace kudu
diff --git a/src/kudu/client/transaction-internal.h b/src/kudu/client/transaction-internal.h
index 0780fd4..ce0db97 100644
--- a/src/kudu/client/transaction-internal.h
+++ b/src/kudu/client/transaction-internal.h
@@ -32,21 +32,21 @@ class MonoTime;
 
 namespace client {
 
-// This class contains the implementation the functionality behind
-// kudu::client::KuduTransaction.
+struct KeepaliveRpcCtx;
+
+// This class implements the functionality of kudu::client::KuduTransaction.
 class KuduTransaction::Data {
  public:
   explicit Data(const sp::shared_ptr<KuduClient>& client);
 
   Status CreateSession(sp::shared_ptr<KuduSession>* session);
 
-  Status Begin();
+  Status Begin(const sp::shared_ptr<KuduTransaction>& txn);
   Status Commit(bool wait);
   Status IsCommitComplete(bool* is_complete, Status* completion_status);
   Status Rollback();
 
-  Status Serialize(std::string* serialized_txn) const;
-
+  Status Serialize(std::string* serialized_txn, bool enable_keepalive) const;
   static Status Deserialize(const sp::shared_ptr<KuduClient>& client,
                             const std::string& serialized_txn,
                             sp::shared_ptr<KuduTransaction>* txn);
@@ -61,12 +61,45 @@ class KuduTransaction::Data {
   static Status WaitForTxnCommitToFinalize(
       KuduClient* client, const MonoTime& deadline, const TxnId& txn_id);
 
+  // The self-rescheduling task to send KeepTransactionAlive() RPC periodically
+  // for a transaction. The task re-schedules itself as needed.
+  // The 'status' parameter is to report on the status of scheduling the task
+  // on reactor.
+  static void SendTxnKeepAliveTask(const Status& status,
+                                   sp::weak_ptr<KuduTransaction> weak_txn);
+
+  // This helper member function is to analyze the status of the
+  // KeepTransactionAlive() RPC and re-schedule the SendTxnKeepAliveTask(),
+  // if necessary. The 'status' parameter contains the status of
+  // KeepTransactionAlive() RPC called by SendTxnKeepAliveTask(). As soon as
+  // TxnManager returns Status::IllegalState() or Status::Aborted(), the task
+  // stops rescheduling itself, so transactions in terminal states are no longer
+  // receiving keepalive heartbeats from the client.
+  static void TxnKeepAliveCb(const Status& status,
+                             sp::shared_ptr<KeepaliveRpcCtx> ctx);
+
   sp::weak_ptr<KuduClient> weak_client_;
-  TxnId txn_id_;
   uint32_t txn_keep_alive_ms_;
+  TxnId txn_id_;
 
   DISALLOW_COPY_AND_ASSIGN(Data);
 };
 
+// This class implements the functionality of
+// kudu::client::KuduTransactionSerializer.
+class KuduTransactionSerializer::Data {
+ public:
+  explicit Data(const sp::shared_ptr<KuduTransaction>& txn);
+
+  void enable_keepalive(bool enable) {
+    enable_keepalive_ = enable;
+  }
+
+  Status Serialize(std::string* txn_token) const;
+
+  sp::shared_ptr<KuduTransaction> txn_;  // The transaction to serialize.
+  bool enable_keepalive_;  // Forwarded to KuduTransaction::Data::Serialize().
+};
+
 } // namespace client
 } // namespace kudu
diff --git a/src/kudu/client/txn_manager_proxy_rpc.cc b/src/kudu/client/txn_manager_proxy_rpc.cc
index 7ab55e8..12bf81c 100644
--- a/src/kudu/client/txn_manager_proxy_rpc.cc
+++ b/src/kudu/client/txn_manager_proxy_rpc.cc
@@ -53,6 +53,8 @@ using kudu::transactions::CommitTransactionRequestPB;
 using kudu::transactions::CommitTransactionResponsePB;
 using kudu::transactions::GetTransactionStateRequestPB;
 using kudu::transactions::GetTransactionStateResponsePB;
+using kudu::transactions::KeepTransactionAliveRequestPB;
+using kudu::transactions::KeepTransactionAliveResponsePB;
 using kudu::transactions::TxnManagerServiceProxy;
 using kudu::transactions::TxnManagerErrorPB;
 using std::string;
@@ -76,9 +78,13 @@ AsyncRandomTxnManagerRpc<ReqClass, RespClass>::AsyncRandomTxnManagerRpc(
     string rpc_name,
     StatusCallback user_cb)
     : Rpc(deadline, client->data_->messenger_, backoff),
-      client_(client), req_(&req), resp_(resp), func_(func),
+      client_(client),
+      req_(&req),
+      resp_(resp),
+      func_(func),
       rpc_name_(std::move(rpc_name)),
-      user_cb_(std::move(user_cb)) {
+      user_cb_(std::move(user_cb)),
+      multi_txn_manager_(client_->IsMultiMaster()) {
   DCHECK(deadline.Initialized());
 }
 
@@ -139,7 +145,6 @@ bool AsyncRandomTxnManagerRpc<ReqClass, RespClass>::RetryIfNecessary(
     // 'rpc_name_' and 'client_' inaccessible.
     KLOG_EVERY_N_SECS(WARNING, 1) << retry_warning;
   });
-  const bool multi_txn_manager = client_->IsMultiMaster();
   // Pull out the RPC status.
   Status s = *status;
   if (s.ok()) {
@@ -156,7 +161,7 @@ bool AsyncRandomTxnManagerRpc<ReqClass, RespClass>::RetryIfNecessary(
   // another TxnManager (if available) or re-send the RPC to the same TxnManager
   // a bit later.
   if (s.IsServiceUnavailable()) {
-    if (multi_txn_manager) {
+    if (multi_txn_manager_) {
       ResetTxnManagerAndRetry(s);
     } else {
       mutable_retrier()->DelayedRetry(this, s);
@@ -168,7 +173,7 @@ bool AsyncRandomTxnManagerRpc<ReqClass, RespClass>::RetryIfNecessary(
   // mean a TxnManager is down or doesn't exist. If there's another TxnManager
   // to connect to, connect to it. Otherwise, don't bother retrying.
   if (s.IsNetworkError()) {
-    if (multi_txn_manager) {
+    if (multi_txn_manager_) {
       ResetTxnManagerAndRetry(s);
       return true;
     }
@@ -178,7 +183,7 @@ bool AsyncRandomTxnManagerRpc<ReqClass, RespClass>::RetryIfNecessary(
     // deadline for RPC operation, retry the operation; if multiple TxnManagers
     // are available, try with another one.
     if (MonoTime::Now() < retrier().deadline()) {
-      if (multi_txn_manager) {
+      if (multi_txn_manager_) {
         ResetTxnManagerAndRetry(s);
       } else {
         mutable_retrier()->DelayedRetry(this, s);
@@ -200,7 +205,7 @@ bool AsyncRandomTxnManagerRpc<ReqClass, RespClass>::RetryIfNecessary(
          err->code() == ErrorStatusPB::ERROR_UNAVAILABLE)) {
       // If TxnManager is too busy, try another one if it's around or try again
       // later with the same TxnManager.
-      if (multi_txn_manager) {
+      if (multi_txn_manager_) {
         ResetTxnManagerAndRetry(s);
       } else {
         mutable_retrier()->DelayedRetry(this, s);
@@ -225,7 +230,7 @@ bool AsyncRandomTxnManagerRpc<ReqClass, RespClass>::RetryIfNecessary(
     const auto app_status = StatusFromPB(app_err.status());
     DCHECK(!app_status.ok());
     if (app_status.IsServiceUnavailable()) {
-      if (multi_txn_manager) {
+      if (multi_txn_manager_) {
         ResetTxnManagerAndRetry(app_status);
       } else {
         mutable_retrier()->DelayedRetry(this, app_status);
@@ -249,6 +254,8 @@ template class AsyncRandomTxnManagerRpc<CommitTransactionRequestPB,
                                         CommitTransactionResponsePB>;
 template class AsyncRandomTxnManagerRpc<GetTransactionStateRequestPB,
                                         GetTransactionStateResponsePB>;
+template class AsyncRandomTxnManagerRpc<KeepTransactionAliveRequestPB,
+                                        KeepTransactionAliveResponsePB>;
 
 } // namespace internal
 } // namespace client
diff --git a/src/kudu/client/txn_manager_proxy_rpc.h b/src/kudu/client/txn_manager_proxy_rpc.h
index 77b7d8d..501c9e7 100644
--- a/src/kudu/client/txn_manager_proxy_rpc.h
+++ b/src/kudu/client/txn_manager_proxy_rpc.h
@@ -125,6 +125,9 @@ class AsyncRandomTxnManagerRpc : public rpc::Rpc {
   // Callback to call upon completion of the operation (whether the RPC itself
   // was successful or not).
   const StatusCallback user_cb_;
+
+  // Whether this is a multi-master cluster to work with.
+  const bool multi_txn_manager_;
 };
 
 } // namespace internal
diff --git a/src/kudu/integration-tests/txn_status_manager-itest.cc b/src/kudu/integration-tests/txn_status_manager-itest.cc
index 00807b1..38e060f 100644
--- a/src/kudu/integration-tests/txn_status_manager-itest.cc
+++ b/src/kudu/integration-tests/txn_status_manager-itest.cc
@@ -28,6 +28,7 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
+#include "kudu/client/client.h"
 #include "kudu/common/wire_protocol.h"
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/gutil/map-util.h"
@@ -48,6 +49,9 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+using kudu::client::KuduClient;
+using kudu::client::KuduTransaction;
+using kudu::client::sp::shared_ptr;
 using kudu::cluster::ExternalMiniClusterOptions;
 using kudu::cluster::TabletIdAndTableName;
 using kudu::itest::TServerDetails;
@@ -64,10 +68,8 @@ using kudu::transactions::KeepTransactionAliveRequestPB;
 using kudu::transactions::KeepTransactionAliveResponsePB;
 using kudu::transactions::TxnManagerServiceProxy;
 using kudu::transactions::TxnStatePB;
-using std::shared_ptr;
 using std::string;
 using std::thread;
-using std::unique_ptr;
 using std::vector;
 using strings::Substitute;
 
@@ -230,8 +232,8 @@ class TxnStatusManagerITest : public ExternalMiniClusterITestBase {
   ExternalMiniClusterOptions cluster_opts_;
   string txn_status_tablet_id_;
 
-  shared_ptr<rpc::Messenger> messenger_;
-  unique_ptr<TxnManagerServiceProxy> txn_manager_proxy_;
+  std::shared_ptr<rpc::Messenger> messenger_;
+  std::unique_ptr<TxnManagerServiceProxy> txn_manager_proxy_;
 };
 
 const MonoDelta TxnStatusManagerITest::kTimeout = MonoDelta::FromSeconds(15);
@@ -356,7 +358,7 @@ TEST_F(TxnStatusManagerITest, DISABLED_TxnKeepAliveMultiTxnStatusManagerInstance
     const auto period = MonoDelta::FromMilliseconds(keepalive_interval_ms / 2);
     const auto timeout = MonoDelta::FromMilliseconds(keepalive_interval_ms / 4);
     // Keepalive thread uses its own messenger and proxy.
-    shared_ptr<rpc::Messenger> m;
+    std::shared_ptr<rpc::Messenger> m;
     rpc::MessengerBuilder b("txn-keepalive");
     ASSERT_OK(b.Build(&m));
     const auto rpc_addr = cluster_->master()->bound_rpc_addr();
@@ -466,4 +468,35 @@ TEST_F(TxnStatusManagerITest, DISABLED_TxnKeepAliveMultiTxnStatusManagerInstance
   NO_FATALS(cluster_->AssertNoCrashes());
 }
 
+// Check that internal txn heartbeater in KuduClient keeps sending
+// KeepTransactionAlive requests even if no TxnStatusManager instance is
+// accessible for some time, and the txn keepalive messages reach the
+// destination after TxnStatusManager is back online. So, the txn should not be
+// auto-aborted when its KuduTransaction object is kept in scope.
+TEST_F(TxnStatusManagerITest, DISABLED_TxnKeptAliveByClientIfStatusManagerRestarted) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+  shared_ptr<KuduClient> c;
+  ASSERT_OK(cluster_->CreateClient(nullptr, &c));
+
+  shared_ptr<KuduTransaction> txn;
+  ASSERT_OK(c->NewTransaction(&txn));
+
+  for (auto idx = 0; idx < cluster_->num_tablet_servers(); ++idx) {
+    auto* ts = cluster_->tablet_server(idx);
+    ts->Shutdown();
+  }
+
+  SleepFor(MonoDelta::FromMilliseconds(3 * kTxnKeepaliveIntervalMs));
+
+  for (auto idx = 0; idx < cluster_->num_tablet_servers(); ++idx) {
+    auto* ts = cluster_->tablet_server(idx);
+    ASSERT_OK(ts->Restart());
+  }
+
+  SleepFor(MonoDelta::FromMilliseconds(5 * kTxnKeepaliveIntervalMs));
+
+  ASSERT_OK(txn->Commit(false /* wait */));
+  NO_FATALS(cluster_->AssertNoCrashes());
+}
+
 } // namespace kudu
diff --git a/src/kudu/transactions/transactions.proto b/src/kudu/transactions/transactions.proto
index 8d9d9a9..042ab9c 100644
--- a/src/kudu/transactions/transactions.proto
+++ b/src/kudu/transactions/transactions.proto
@@ -55,4 +55,8 @@ message TxnTokenPB {
   // keep-alive heartbeats spaced by intervals equal or shorter than
   // 'keepalive_millis' milliseconds in duration.
   optional uint32 keepalive_millis = 2;
+
+  // Whether the client should automatically send keepalive messages once
+  // this token is deserialized into a runtime transaction handle.
+  optional bool enable_keepalive = 3;
 }


[kudu] 02/03: [master-test] fix race in ConcurrentGetTableSchemaTest

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit c1d1b9ccebe73bd60d3f6c8c8d8f9c3c426bdb6b
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Fri Dec 11 18:07:17 2020 -0800

    [master-test] fix race in ConcurrentGetTableSchemaTest
    
    This patch adds explicit joining for the worker threads before
    trying to accumulate per-thread counters.  Prior to this patch,
    a rare race was reported in TSAN builds.
    
    Change-Id: I5bc620c61320587fea92706c94b113a3bbd3e91e
    Reviewed-on: http://gerrit.cloudera.org:8080/16865
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/master/master-test.cc | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/src/kudu/master/master-test.cc b/src/kudu/master/master-test.cc
index 0caf538..a897b00 100644
--- a/src/kudu/master/master-test.cc
+++ b/src/kudu/master/master-test.cc
@@ -1357,14 +1357,12 @@ TEST_P(ConcurrentGetTableSchemaTest, DirectMethodCall) {
       }
     });
   }
-  SCOPED_CLEANUP({
-    for (auto& t : caller_threads) {
-      t.join();
-    }
-  });
 
   SleepFor(kRunInterval);
   done.Store(true);
+  for (auto& t : caller_threads) {
+    t.join();
+  }
 
   const auto errors = accumulate(error_counters.begin(), error_counters.end(), 0UL);
   if (errors != 0) {


[kudu] 01/03: KUDU-2612: fuzz transactional inserts

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 9197e2d5c96b94f227407e5c135c53f1b0182a65
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Mon Nov 2 22:58:35 2020 -0800

    KUDU-2612: fuzz transactional inserts
    
    This patch updates fuzz-itest to add transactionality to the test.
    Since transactionality keeps ops hidden until commit time, this meant
    adding some maps to keep track of new expected and pending state while
    ops were uncommitted.
    
    I also included a couple of test cases that were found to cause issues
    that were addressed in previous patches.
    
    For now, I've commented out the inclusion of transactional ops because
    I've found them to be flaky on account of a potential debug crash when
    merging transactional MRSs. This will be addressed in a follow-up
    commit, but I'd like to merge this first, as this test has been useful
    in testing the follow-up change. When fixed, however, the patches
    together passed fuzz-itest 1000/1000 times with slow tests enabled and
    disabled.
    
    Change-Id: I719d42327ab18fda874332c9d6e1ae34aca8e846
    Reviewed-on: http://gerrit.cloudera.org:8080/16699
    Reviewed-by: Grant Henke <gr...@apache.org>
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Tested-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/client/client.h                 |   6 +-
 src/kudu/integration-tests/fuzz-itest.cc | 690 ++++++++++++++++++++++++++-----
 2 files changed, 582 insertions(+), 114 deletions(-)

diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 8b8557a..66f7dbb 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -67,6 +67,10 @@ class KuduClient;
 class KuduTable;
 } // namespace client
 
+namespace tablet {
+class FuzzTest;
+} // namespace tablet
+
 namespace transactions {
 class CoordinatorRpc;
 class TxnSystemClient;
@@ -2228,7 +2232,7 @@ class KUDU_EXPORT KuduSession : public sp::enable_shared_from_this<KuduSession>
   friend class KuduClient;
   friend class KuduTransaction;
   friend class internal::Batcher;
-
+  friend class tablet::FuzzTest;
   FRIEND_TEST(ClientTest, TestAutoFlushBackgroundAndErrorCollector);
   FRIEND_TEST(ClientTest, TestAutoFlushBackgroundApplyBlocks);
   FRIEND_TEST(ClientTest, TxnIdOfTransactionalSession);
diff --git a/src/kudu/integration-tests/fuzz-itest.cc b/src/kudu/integration-tests/fuzz-itest.cc
index b17eac9..468ff6c 100644
--- a/src/kudu/integration-tests/fuzz-itest.cc
+++ b/src/kudu/integration-tests/fuzz-itest.cc
@@ -24,6 +24,8 @@
 #include <memory>
 #include <ostream>
 #include <string>
+#include <unordered_map>
+#include <unordered_set>
 #include <utility>
 #include <vector>
 
@@ -39,6 +41,7 @@
 #include "kudu/client/scan_batch.h"
 #include "kudu/client/scan_predicate.h"
 #include "kudu/client/schema.h"
+#include "kudu/client/session-internal.h"
 #include "kudu/client/shared_ptr.h" // IWYU pragma: keep
 #include "kudu/client/value.h"
 #include "kudu/client/write_op.h"
@@ -47,8 +50,12 @@
 #include "kudu/common/common.pb.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/common/schema.h"
+#include "kudu/common/txn_id.h"
+#include "kudu/common/wire_protocol.h"
+#include "kudu/consensus/raft_consensus.h"
 #include "kudu/gutil/casts.h"
 #include "kudu/gutil/macros.h"
+#include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -58,15 +65,19 @@
 #include "kudu/tablet/rowset.h"
 #include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet_replica.h"
+#include "kudu/tablet/txn_participant-test-util.h"
 #include "kudu/tserver/mini_tablet_server.h"
 #include "kudu/tserver/tablet_server.h"
 #include "kudu/tserver/ts_tablet_manager.h"
+#include "kudu/tserver/tserver.pb.h"
+#include "kudu/tserver/tserver_admin.pb.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
 DEFINE_int32(keyspace_size, 5,  "number of distinct primary keys to test with");
+DEFINE_int32(max_open_txns, 5,  "maximum number of open transactions to test with");
 DECLARE_bool(enable_maintenance_manager);
 DECLARE_bool(scanner_allow_snapshot_scans_with_logical_timestamps);
 DECLARE_bool(use_hybrid_clock);
@@ -88,11 +99,17 @@ using kudu::client::KuduWriteOperation;
 using kudu::client::sp::shared_ptr;
 using kudu::cluster::InternalMiniCluster;
 using kudu::cluster::InternalMiniClusterOptions;
+using kudu::tserver::ParticipantOpPB;
+using kudu::tserver::ParticipantResponsePB;
 using std::list;
 using std::map;
+using std::make_pair;
+using std::pair;
 using std::string;
 using std::vector;
 using std::unique_ptr;
+using std::unordered_map;
+using std::unordered_set;
 using strings::Substitute;
 
 namespace kudu {
@@ -120,6 +137,9 @@ enum TestOpType {
   TEST_RESTART_TS,
   TEST_SCAN_AT_TIMESTAMP,
   TEST_DIFF_SCAN,
+  TEST_BEGIN_TXN,
+  TEST_COMMIT_TXN,
+  TEST_ABORT_TXN,
   TEST_NUM_OP_TYPES // max value for enum
 };
 
@@ -143,13 +163,20 @@ const char* TestOpType_names[] = {
   "TEST_COMPACT_TABLET",
   "TEST_RESTART_TS",
   "TEST_SCAN_AT_TIMESTAMP",
-  "TEST_DIFF_SCAN"
+  "TEST_DIFF_SCAN",
+  "TEST_BEGIN_TXN",
+  "TEST_COMMIT_TXN",
+  "TEST_ABORT_TXN",
 };
+constexpr const int kNoTxnId = -1;
+// Identical to kNoTxnId but more generic-sounding for ops that don't use
+// transaction IDs.
+constexpr const int kNoVal = -1;
 
 // An operation in a fuzz-test sequence.
 struct TestOp {
   // NOLINTNEXTLINE(google-explicit-constructor)
-  TestOp(TestOpType t, int v1 = 0, int v2 = 0) // NOLINT(runtime/explicit)
+  TestOp(TestOpType t, int v1 = kNoVal, int v2 = kNoVal) // NOLINT(runtime/explicit)
       : type(t),
         val(v1),
         val2(v2) {}
@@ -160,16 +187,18 @@ struct TestOp {
   // For INSERT/UPSERT/UPDATE/DELETE, the key of the row to be modified.
   // For SCAN_AT_TIMESTAMP the timestamp of the scan.
   // For DIFF_SCAN the start timestamp of the scan.
+  // For BEGIN_TXN/COMMIT_TXN/ABORT_TXN, the txn ID to operate on.
+  // For FLUSH_OPS, the txn ID to operate on.
   // Otherwise, unused.
   int val;
 
+  // For INSERT, the transaction ID to insert with (kNoTxnId means none).
   // For DIFF_SCAN, the end timestamp of the scan.
   // Otherwise, unused.
   int val2;
 
   string ToString() const {
     switch (type) {
-      case TEST_FLUSH_OPS:
       case TEST_FLUSH_TABLET:
       case TEST_COMPACT_TABLET:
       case TEST_FLUSH_DELTAS:
@@ -177,10 +206,7 @@ struct TestOp {
       case TEST_MINOR_COMPACT_DELTAS:
       case TEST_RESTART_TS:
         return strings::Substitute("{$0}", TestOpType_names[type]);
-      case TEST_INSERT:
-      case TEST_INSERT_PK_ONLY:
-      case TEST_INSERT_IGNORE:
-      case TEST_INSERT_IGNORE_PK_ONLY:
+      case TEST_FLUSH_OPS:
       case TEST_UPSERT:
       case TEST_UPSERT_PK_ONLY:
       case TEST_UPDATE:
@@ -188,7 +214,14 @@ struct TestOp {
       case TEST_DELETE:
       case TEST_DELETE_IGNORE:
       case TEST_SCAN_AT_TIMESTAMP:
+      case TEST_BEGIN_TXN:
+      case TEST_COMMIT_TXN:
+      case TEST_ABORT_TXN:
         return strings::Substitute("{$0, $1}", TestOpType_names[type], val);
+      case TEST_INSERT:
+      case TEST_INSERT_PK_ONLY:
+      case TEST_INSERT_IGNORE:
+      case TEST_INSERT_IGNORE_PK_ONLY:
       case TEST_DIFF_SCAN:
         return strings::Substitute("{$0, $1, $2}", TestOpType_names[type], val, val2);
       default:
@@ -223,6 +256,8 @@ struct Redo {
   optional<int32_t> val;
 };
 
+// TODO(awong): Merging multiple transactional MRSs together can sometimes lead
+// to a crash. Uncomment the transactional ops once fixed.
 const vector<TestOpType> kAllOps {TEST_INSERT,
                                   TEST_INSERT_PK_ONLY,
                                   TEST_INSERT_IGNORE,
@@ -242,7 +277,14 @@ const vector<TestOpType> kAllOps {TEST_INSERT,
                                   TEST_RESTART_TS,
                                   TEST_SCAN_AT_TIMESTAMP,
                                   TEST_DIFF_SCAN};
-
+                                  // TEST_BEGIN_TXN,
+                                  // TEST_COMMIT_TXN,
+                                  // TEST_ABORT_TXN};
+
+// Ops that focus on hammering workloads in which rows come in and out of
+// existence.
+// TODO(awong): Merging multiple transactional MRSs together can sometimes lead
+// to a crash. Uncomment the transactional ops once fixed.
 const vector<TestOpType> kPkOnlyOps {TEST_INSERT_PK_ONLY,
                                      TEST_INSERT_IGNORE_PK_ONLY,
                                      TEST_UPSERT_PK_ONLY,
@@ -257,6 +299,9 @@ const vector<TestOpType> kPkOnlyOps {TEST_INSERT_PK_ONLY,
                                      TEST_RESTART_TS,
                                      TEST_SCAN_AT_TIMESTAMP,
                                      TEST_DIFF_SCAN};
+                                     // TEST_BEGIN_TXN,
+                                     // TEST_COMMIT_TXN,
+                                     // TEST_ABORT_TXN};
 
 // Test which does only random operations against a tablet, including update and random
 // get (ie scans with equal lower and upper bounds).
@@ -331,11 +376,23 @@ class FuzzTest : public KuduTest {
     return tablet_replica_->tablet();
   }
 
+  Status CallParticipantOpCheckResp(int64_t txn_id, ParticipantOpPB::ParticipantOpType op_type,
+                                    int64_t ts_val) {
+    RETURN_NOT_OK(tablet_replica_->consensus()->WaitUntilLeaderForTests(
+        MonoDelta::FromSeconds(10)));
+    ParticipantResponsePB resp;
+    RETURN_NOT_OK(CallParticipantOp(tablet_replica_.get(), txn_id, op_type, ts_val, &resp));
+    if (resp.has_error()) {
+      return StatusFromPB(resp.error().status());
+    }
+    return Status::OK();
+  }
+
   // Adds an insert for the given key/value pair to 'ops', returning the new contents
   // of the row.
   ExpectedKeyValueRow InsertOrUpsertRow(int key, int val,
                                         optional<ExpectedKeyValueRow> old_row,
-                                        TestOpType type) {
+                                        TestOpType type, int txn_id) {
     ExpectedKeyValueRow ret;
     unique_ptr<KuduWriteOperation> op;
     if (type == TEST_INSERT || type == TEST_INSERT_PK_ONLY) {
@@ -375,7 +432,11 @@ class FuzzTest : public KuduTest {
       }
       default: LOG(FATAL) << "Invalid test op type: " << TestOpType_names[type];
     }
-    CHECK_OK(session_->Apply(op.release()));
+    if (txn_id == kNoTxnId) {
+      CHECK_OK(session_->Apply(op.release()));
+    } else {
+      CHECK_OK(FindOrDie(txn_sessions_, txn_id)->Apply(op.release()));
+    }
     return ret;
   }
 
@@ -392,7 +453,7 @@ class FuzzTest : public KuduTest {
     KuduPartialRow* row = op->mutable_row();
     CHECK_OK(row->SetInt32(0, key));
     ret.key = key;
-    if (new_val & 1) {
+    if (new_val % 2) {
       CHECK_OK(row->SetNull(1));
     } else {
       CHECK_OK(row->SetInt32(1, new_val));
@@ -680,7 +741,7 @@ class FuzzTest : public KuduTest {
   //
   // Useful when using the 'delta' test case reduction tool to allow
   // it to skip invalid test cases.
-  void ValidateFuzzCase(const vector<TestOp>& test_ops);
+  static void ValidateFuzzCase(const vector<TestOp>& test_ops);
   void RunFuzzCase(const vector<TestOp>& test_ops,
                    int update_multiplier);
 
@@ -688,6 +749,7 @@ class FuzzTest : public KuduTest {
   unique_ptr<InternalMiniCluster> cluster_;
   shared_ptr<KuduClient> client_;
   shared_ptr<KuduSession> session_;
+  unordered_map<int, shared_ptr<KuduSession>> txn_sessions_;
   shared_ptr<KuduTable> table_;
 
   map<int,
@@ -736,12 +798,76 @@ bool IsMutation(const TestOpType& op) {
   }
 }
 
-// Generate a random valid sequence of operations for use as a
-// fuzz test.
+// Generate a random valid sequence of operations for use as a fuzz test, i.e.
+// a set of operations that, when run, will not run into any logical errors
+// (e.g. no "key already present" or "key not found" errors).
+//
+// To generate this sequence, we schedule one op at a time, keeping track of
+// what rows exist in the tablet, what row mutations are pending, what rows are
+// being mutated by in-flight ops and transactions, etc. We only select ops
+// that we know to be valid, using the tracked state.
 void GenerateTestCase(vector<TestOp>* ops, int len, TestOpSets sets = ALL) {
+  // Bitset indicating whether the given row has been committed as a part of a
+  // transaction, or successfully inserted outside of a transaction.
   vector<bool> exists(FLAGS_keyspace_size);
+
+  // The transaction ID that is operating on each row, or kNoTxnId if being
+  // operated on outside the context of a transaction. 'none' if the row isn't
+  // being operated on. If a row is not 'none', an entry must exist for it in
+  // 'pending_existence_per_txn'.
+  // TODO(awong): This is necessary so we don't have transactions trying to
+  // write the same rows, which currently is unprotected. Once we begin locking
+  // rows for the duration of the transaction, we shouldn't need this.
+  vector<optional<int>> txn_touching_row(FLAGS_keyspace_size);
+
+  // Represents the pending mutations to rows that are not yet visible to other
+  // actors, and the resulting existence status of the row if we were to flush
+  // a non-transactional session (keyed as kNoTxnId) or commit a transaction
+  // (keyed by any other value).
+  //
+  // This is used to update 'exists' when scheduling a non-transactional
+  // session flush or a transaction commit. It is also used to determine
+  // whether we can operate on pending values, e.g. insert a row and then
+  // delete it in the same batched op.
+  //
+  // A row key can exist in at most one IsPresentByRowKey in any form (i.e. row
+  // 1's existence cannot be set to false in two transactions), ensuring only a
+  // single actor operates on a row at a time.
+  typedef std::map<int, bool> IsPresentByRowKey;
+  unordered_map<int, IsPresentByRowKey> pending_existence_per_txn;
+  EmplaceOrDie(&pending_existence_per_txn, kNoTxnId, IsPresentByRowKey{});
+
+  // The transactions that have client sessions that need to be flushed.
+  unordered_set<int> txns_needing_session_flush;
+
+  // Returns whether there are any open transactions.
+  const auto no_open_txns = [&pending_existence_per_txn] {
+    // A single entry exists for kNoTxnId.
+    return pending_existence_per_txn.size() == 1;
+  };
+  // Helper that deterministically (based on rand()) selects a transaction ID
+  // from those in flight, or, if 'maybe_none' is true, kNoTxnId to indicate a
+  // non-transactional operation.
+  const auto pick_txn_id = [&] (bool maybe_none) -> int {
+    if (no_open_txns() || (maybe_none && rand() % 2)) {
+      // Shouldn't be called when maybe_none is false if there are no pending
+      // transactions.
+      DCHECK(maybe_none);
+      return kNoTxnId;
+    }
+    const auto& num_txns = pending_existence_per_txn.size() - 1;
+    vector<int> txn_ids;
+    txn_ids.reserve(num_txns);
+    for (const auto& txn_id_and_rows : pending_existence_per_txn) {
+      if (txn_id_and_rows.first == kNoTxnId) continue;
+      txn_ids.emplace_back(txn_id_and_rows.first);
+    }
+    std::sort(txn_ids.begin(), txn_ids.end());
+    return txn_ids[rand() % num_txns];
+  };
+
+  int next_txn_id = 0;
   int op_timestamps = 0;
-  bool ops_pending = false;
   bool data_in_mrs = false;
   bool worth_compacting = false;
   bool data_in_dms = false;
@@ -758,87 +884,198 @@ void GenerateTestCase(vector<TestOp>* ops, int len, TestOpSets sets = ALL) {
 
     switch (r) {
       case TEST_INSERT:
-      case TEST_INSERT_PK_ONLY:
-        if (exists[row_key]) continue;
-        ops->emplace_back(r, row_key);
-        exists[row_key] = true;
-        ops_pending = true;
-        data_in_mrs = true;
+      case TEST_INSERT_PK_ONLY: {
+        const auto& txn_id = pick_txn_id(/*maybe_none*/true);
+        const auto& txn_operating_on_row = txn_touching_row[row_key];
+        if (txn_operating_on_row && *txn_operating_on_row != txn_id) {
+          // The row is being operated on by another txn.
+          continue;
+        }
+        if (txn_operating_on_row && *txn_operating_on_row == txn_id &&
+            FindOrDie(FindOrDie(pending_existence_per_txn, txn_id), row_key)) {
+          // The row is being operated on by this txn and the row pending
+          // state exists.
+          continue;
+        }
+        if (!txn_operating_on_row && exists[row_key]) {
+          // The row is not being operated on by another txn, but the row
+          // exists.
+          continue;
+        }
+        ops->emplace_back(r, row_key, txn_id);
+        txn_touching_row[row_key] = txn_id;
+        FindOrDie(pending_existence_per_txn, txn_id)[row_key] = true;
+        EmplaceIfNotPresent(&txns_needing_session_flush, txn_id);
+        if (txn_id == kNoTxnId) {
+          data_in_mrs = true;
+        }
         break;
+      }
       case TEST_INSERT_IGNORE:
-      case TEST_INSERT_IGNORE_PK_ONLY:
-        ops->emplace_back(r, row_key);
-        ops_pending = true;
-        // If the row doesn't currently exist, this will act like an insert
-        // and put it into MRS.
-        if (!exists[row_key]) {
+      case TEST_INSERT_IGNORE_PK_ONLY: {
+        const auto& txn_id = pick_txn_id(/*maybe_none*/true);
+        const auto& txn_operating_on_row = txn_touching_row[row_key];
+        if (txn_operating_on_row && *txn_operating_on_row != txn_id) {
+          // The row is being operated on by another txn.
+          continue;
+        }
+        ops->emplace_back(r, row_key, txn_id);
+        txn_touching_row[row_key] = txn_id;
+        FindOrDie(pending_existence_per_txn, txn_id)[row_key] = true;
+        EmplaceIfNotPresent(&txns_needing_session_flush, txn_id);
+        if (txn_id == kNoTxnId && !exists[row_key]) {
           data_in_mrs = true;
         }
-        exists[row_key] = true;
         break;
+      }
       case TEST_UPSERT:
-      case TEST_UPSERT_PK_ONLY:
+      case TEST_UPSERT_PK_ONLY: {
+        const auto& txn_operating_on_row = txn_touching_row[row_key];
+        if (txn_operating_on_row && *txn_operating_on_row != kNoTxnId) {
+          // The row is being operated on by a txn.
+          continue;
+        }
         ops->emplace_back(r, row_key);
-        ops_pending = true;
-        // If the row doesn't currently exist, this will act like an insert
-        // and put it into MRS.
-        if (!exists[row_key]) {
+        txn_touching_row[row_key] = kNoTxnId;
+        auto& row_exists = FindOrDie(pending_existence_per_txn, kNoTxnId)[row_key];
+        if (!row_exists) {
           data_in_mrs = true;
-        } else if (!data_in_mrs) {
-          // If it does exist, but not in MRS, then this will put data into
-          // a DMS.
+        } else {
           data_in_dms = true;
         }
-        exists[row_key] = true;
+        row_exists = true;
+        EmplaceIfNotPresent(&txns_needing_session_flush, kNoTxnId);
         break;
-      case TEST_UPDATE:
-        if (!exists[row_key]) continue;
-        ops->emplace_back(r, row_key);
-        ops_pending = true;
-        if (!data_in_mrs) {
-          data_in_dms = true;
+      }
+      case TEST_UPDATE_IGNORE: {
+        const auto& txn_operating_on_row = txn_touching_row[row_key];
+        if (txn_operating_on_row && *txn_operating_on_row != kNoTxnId) {
+          // The row is being operated on by another txn.
+          continue;
         }
-        break;
-      case TEST_UPDATE_IGNORE:
-        ops->emplace_back(r, row_key);
-        ops_pending = true;
-        // If it does exist, this will act like an update and put it into
-        // a DMS.
-        if (exists[row_key] && !data_in_mrs) {
-          data_in_dms = true;
+        if (!txn_operating_on_row) {
+          if (exists[row_key] && !data_in_mrs) {
+            // The row is not being operated on by another txn, and it exists and
+            // has been flushed, meaning this op will result in a DMS mutation.
+            data_in_dms = true;
+          }
+          // The existence status shouldn't change.
+          FindOrDie(pending_existence_per_txn, kNoTxnId)[row_key] = exists[row_key];
         }
+        ops->emplace_back(r, row_key);
+        txn_touching_row[row_key] = kNoTxnId;
+        EmplaceIfNotPresent(&txns_needing_session_flush, kNoTxnId);
         break;
-      case TEST_DELETE:
-        if (!exists[row_key]) continue;
+      }
+      case TEST_UPDATE: {
+        const auto& txn_operating_on_row = txn_touching_row[row_key];
+        if (txn_operating_on_row && *txn_operating_on_row != kNoTxnId) {
+          // The row is being operated on by another txn.
+          continue;
+        }
+        if (txn_operating_on_row && *txn_operating_on_row == kNoTxnId &&
+            !FindOrDie(FindOrDie(pending_existence_per_txn, kNoTxnId), row_key)) {
+          // The row is being operated on by an in-flight op, but the pending
+          // row state doesn't exist.
+          continue;
+        }
+        if (!txn_operating_on_row) {
+          if (!exists[row_key]) {
+            // The row is not being operated on by another txn, but the row
+            // doesn't exist so we can't update anything.
+            continue;
+          }
+          if (!data_in_mrs) {
+            // The row is not being operated on by another txn, and it exists
+            // in a DRS, meaning this op will result in a DMS mutation.
+            data_in_dms = true;
+          }
+          // The existence status shouldn't change.
+          FindOrDie(pending_existence_per_txn, kNoTxnId)[row_key] = exists[row_key];
+        }
         ops->emplace_back(r, row_key);
-        ops_pending = true;
-        if (!data_in_mrs) {
+        txn_touching_row[row_key] = kNoTxnId;
+        EmplaceIfNotPresent(&txns_needing_session_flush, kNoTxnId);
+        break;
+      }
+      case TEST_DELETE_IGNORE: {
+        const auto& txn_operating_on_row = txn_touching_row[row_key];
+        if (txn_operating_on_row && *txn_operating_on_row != kNoTxnId) {
+          // The row is being operated on by another txn.
+          continue;
+        }
+        if (!txn_operating_on_row && exists[row_key] && !data_in_mrs) {
+          // The row is not being operated on by another txn, and it exists in
+          // a DRS, meaning this op will result in a DMS mutation.
           data_in_dms = true;
         }
-        exists[row_key] = false;
-        break;
-      case TEST_DELETE_IGNORE:
         ops->emplace_back(r, row_key);
-        ops_pending = true;
-        // If it does exist, this will act like a delete and put it into
-        // a DMS.
-        if (exists[row_key] && !data_in_mrs) {
+        FindOrDie(pending_existence_per_txn, kNoTxnId)[row_key] = false;
+        txn_touching_row[row_key] = kNoTxnId;
+        EmplaceIfNotPresent(&txns_needing_session_flush, kNoTxnId);
+        break;
+      }
+      case TEST_DELETE: {
+        const auto& txn_operating_on_row = txn_touching_row[row_key];
+        if (txn_operating_on_row && *txn_operating_on_row != kNoTxnId) {
+          // The row is being operated on by a txn. Since we don't support
+          // mutating a row while it is participating in a transaction, we must
+          // wait for the transaction to complete before doing anything.
+          continue;
+        }
+        if (txn_operating_on_row && *txn_operating_on_row == kNoTxnId &&
+            !FindOrDie(FindOrDie(pending_existence_per_txn, kNoTxnId), row_key)) {
+          // The row is being operated on by a non-transactional in-flight op,
+          // meaning we can only correctly delete the row if the in-flight op
+          // were to insert the row, making the row's existence pending.
+          // Otherwise, we cannot schedule a delete.
+          continue;
+        }
+        if (!txn_operating_on_row && !exists[row_key]) {
+          // The row is not being operated on by another txn, but the row
+          // doesn't exist, so we cannot schedule a delete.
+          continue;
+        }
+        ops->emplace_back(TEST_DELETE, row_key);
+        FindOrDie(pending_existence_per_txn, kNoTxnId)[row_key] = false;
+        txn_touching_row[row_key] = kNoTxnId;
+        EmplaceIfNotPresent(&txns_needing_session_flush, kNoTxnId);
+        if (!data_in_mrs) {
+          // The row exists in a DRS, so this op will result in a DMS mutation.
           data_in_dms = true;
         }
-        exists[row_key] = false;
         break;
-      case TEST_FLUSH_OPS:
-        if (ops_pending) {
-          ops->emplace_back(TEST_FLUSH_OPS);
-          ops_pending = false;
-          op_timestamps++;
+      }
+      case TEST_FLUSH_OPS: {
+        const auto& txn_id = pick_txn_id(/*maybe_none*/true);
+        // If the picked transaction doesn't have any ops buffered in its
+        // session yet, pick a new action.
+        if (!ContainsKey(txns_needing_session_flush, txn_id)) continue;
+        if (txn_id == kNoTxnId) {
+          // If flushing rows that aren't part of any transaction, apply
+          // their state immediately.
+          auto& pending_existence_per_row = FindOrDie(pending_existence_per_txn, kNoTxnId);
+          for (const auto& key_and_exists : pending_existence_per_row) {
+            const auto& row_key = key_and_exists.first;
+            exists[row_key] = key_and_exists.second;
+            DCHECK_EQ(kNoTxnId, txn_touching_row[row_key]);
+            txn_touching_row[row_key] = boost::none;
+          }
+          pending_existence_per_row.clear();
         }
+        op_timestamps++;
+        txns_needing_session_flush.erase(txn_id);
+        ops->emplace_back(TEST_FLUSH_OPS, txn_id);
         break;
+      }
       case TEST_FLUSH_TABLET:
         if (data_in_mrs) {
-          if (ops_pending) {
+          // Non-transactions eagerly set 'data_in_mrs', expecting a session op
+          // to be scheduled alongside the tablet flush. Transactions don't do
+          // this -- they only set 'data_in_mrs' once committed.
+          if (ContainsKey(txns_needing_session_flush, kNoTxnId)) {
             ops->emplace_back(TEST_FLUSH_OPS);
-            ops_pending = false;
+            txns_needing_session_flush.erase(kNoTxnId);
           }
           ops->emplace_back(TEST_FLUSH_TABLET);
           data_in_mrs = false;
@@ -847,9 +1084,9 @@ void GenerateTestCase(vector<TestOp>* ops, int len, TestOpSets sets = ALL) {
         break;
       case TEST_COMPACT_TABLET:
         if (worth_compacting) {
-          if (ops_pending) {
+          if (ContainsKey(txns_needing_session_flush, kNoTxnId)) {
             ops->emplace_back(TEST_FLUSH_OPS);
-            ops_pending = false;
+            txns_needing_session_flush.erase(kNoTxnId);
           }
           ops->emplace_back(TEST_COMPACT_TABLET);
           worth_compacting = false;
@@ -857,9 +1094,9 @@ void GenerateTestCase(vector<TestOp>* ops, int len, TestOpSets sets = ALL) {
         break;
       case TEST_FLUSH_DELTAS:
         if (data_in_dms) {
-          if (ops_pending) {
+          if (ContainsKey(txns_needing_session_flush, kNoTxnId)) {
             ops->emplace_back(TEST_FLUSH_OPS);
-            ops_pending = false;
+            txns_needing_session_flush.erase(kNoTxnId);
           }
           ops->emplace_back(TEST_FLUSH_DELTAS);
           data_in_dms = false;
@@ -895,6 +1132,55 @@ void GenerateTestCase(vector<TestOp>* ops, int len, TestOpSets sets = ALL) {
         ops->emplace_back(TEST_DIFF_SCAN, start_timestamp, end_timestamp);
         break;
       }
+      case TEST_BEGIN_TXN: {
+        // If we have --max_open_txns open transactions, we can't begin a new
+        // transaction. NOTE: 'pending_existence_per_txn' also includes
+        // kNoTxnId, hence the extra count.
+        if (pending_existence_per_txn.size() == 1 + FLAGS_max_open_txns) continue;
+        const auto txn_id = next_txn_id++;
+        ops->emplace_back(r, txn_id);
+        EmplaceOrDie(&pending_existence_per_txn, txn_id, IsPresentByRowKey{});
+        op_timestamps++;
+        break;
+      }
+      case TEST_COMMIT_TXN: {
+        if (no_open_txns()) continue;
+        const auto txn_id = pick_txn_id(/*maybe_none*/false);
+        DCHECK_NE(kNoTxnId, txn_id);
+        // If there are ops pending for this transaction, we need to flush them too.
+        if (ContainsKey(txns_needing_session_flush, txn_id)) {
+          op_timestamps++;
+          txns_needing_session_flush.erase(txn_id);
+          ops->emplace_back(TEST_FLUSH_OPS, txn_id);
+        }
+        ops->emplace_back(r, txn_id);
+        auto pending_existence = EraseKeyReturnValuePtr(&pending_existence_per_txn, txn_id);
+        for (const auto& key_and_exists : pending_existence) {
+          const auto& key = key_and_exists.first;
+          const auto& key_exists = key_and_exists.second;
+          DCHECK(key_exists); // only support inserts
+          // Since we're committing the transaction, its MRS should hold state
+          // if there are any inserted rows.
+          data_in_mrs = true;
+          exists[key] = true;
+          txn_touching_row[key] = boost::none;
+        }
+        // Commit replicates two ops (BEGIN_COMMIT and FINALIZE_COMMIT).
+        op_timestamps += 2;
+        break;
+      }
+      case TEST_ABORT_TXN: {
+        if (no_open_txns()) continue;
+        const auto txn_id = pick_txn_id(/*maybe_none*/false);
+        DCHECK_NE(kNoTxnId, txn_id);
+        ops->emplace_back(r, txn_id);
+        auto pending_existence = EraseKeyReturnValuePtr(&pending_existence_per_txn, txn_id);
+        for (const auto& key_and_exists : pending_existence) {
+          txn_touching_row[key_and_exists.first] = boost::none;
+        }
+        op_timestamps++;
+        break;
+      }
       default:
         LOG(FATAL) << "Invalid op type: " << r;
     }
@@ -911,15 +1197,26 @@ string DumpTestCase(const vector<TestOp>& ops) {
 
 void FuzzTest::ValidateFuzzCase(const vector<TestOp>& test_ops) {
   vector<bool> exists(FLAGS_keyspace_size);
+  unordered_map<int, vector<std::pair<int, TestOpType>>> pending_rows_per_txn;
   for (const auto& test_op : test_ops) {
     switch (test_op.type) {
       case TEST_INSERT:
       case TEST_INSERT_PK_ONLY:
         CHECK(!exists[test_op.val]) << "invalid case: inserting already-existing row";
-        exists[test_op.val] = true;
-        break;
+        FALLTHROUGH_INTENDED;
       case TEST_INSERT_IGNORE:
-      case TEST_INSERT_IGNORE_PK_ONLY:
+      case TEST_INSERT_IGNORE_PK_ONLY: {
+        const auto& txn_id = test_op.val2;
+        if (txn_id == kNoTxnId) {
+          exists[test_op.val] = true;
+        } else {
+          auto& rows = FindOrDie(pending_rows_per_txn, txn_id);
+          rows.emplace_back(make_pair(test_op.val, test_op.type));
+        }
+        break;
+      }
+      // TODO(awong): UPSERT, UPDATE, and DELETE ops should account for
+      // 'pending_rows_per_txn' once we begin supporting transactions.
       case TEST_UPSERT:
       case TEST_UPSERT_PK_ONLY:
         exists[test_op.val] = true;
@@ -937,6 +1234,49 @@ void FuzzTest::ValidateFuzzCase(const vector<TestOp>& test_ops) {
       case TEST_DELETE_IGNORE:
         exists[test_op.val] = false;
         break;
+      case TEST_BEGIN_TXN:
+        EmplaceOrDie(&pending_rows_per_txn, test_op.val, vector<pair<int, TestOpType>>());
+        break;
+      case TEST_COMMIT_TXN: {
+        auto rows_and_ops = EraseKeyReturnValuePtr(&pending_rows_per_txn, test_op.val);
+        for (const auto& row_and_op : rows_and_ops) {
+          const auto& row = row_and_op.first;
+          const auto& op = row_and_op.second;
+          switch (op) {
+            case TEST_INSERT:
+            case TEST_INSERT_PK_ONLY:
+              CHECK(!exists[row]);
+              FALLTHROUGH_INTENDED;
+            case TEST_INSERT_IGNORE:
+            case TEST_INSERT_IGNORE_PK_ONLY:
+              exists[row] = true;
+              break;
+            default:
+              LOG(DFATAL) << "transactions only support insert operations";
+          }
+        }
+        break;
+      }
+      case TEST_ABORT_TXN: {
+        // Ensure that the rows this transaction was operating on were valid.
+        auto rows_and_ops = EraseKeyReturnValuePtr(&pending_rows_per_txn, test_op.val);
+        for (const auto& row_and_op : rows_and_ops) {
+          const auto& row = row_and_op.first;
+          const auto& op = row_and_op.second;
+          switch (op) {
+            case TEST_INSERT:
+            case TEST_INSERT_PK_ONLY:
+              CHECK(!exists[row]);
+              break;
+            case TEST_INSERT_IGNORE:
+            case TEST_INSERT_IGNORE_PK_ONLY:
+              break;
+            default:
+              LOG(DFATAL) << "transactions only support insert operations";
+          }
+        }
+        break;
+      }
       default:
         break;
     }
@@ -951,17 +1291,37 @@ void FuzzTest::RunFuzzCase(const vector<TestOp>& test_ops,
   // into a test method in order to reproduce a failure.
   LOG(INFO) << "test case:\n" << DumpTestCase(test_ops);
 
+  // Represent the expected state of the table if we were to flush ops.
   vector<optional<ExpectedKeyValueRow>> cur_val(FLAGS_keyspace_size);
-  vector<optional<ExpectedKeyValueRow>> pending_val(FLAGS_keyspace_size);
-  vector<Redo> pending_redos;
+
+  // The ops that are currently pending in a session (not flushed yet).
+  typedef unordered_map<int, optional<ExpectedKeyValueRow>> ValueByRowKey;
+  unordered_map<int, ValueByRowKey> pending_vals_per_txn;
+  EmplaceOrDie(&pending_vals_per_txn, kNoTxnId, ValueByRowKey{});
+
+  // We keep track of the redos too so it's easier to piece together the
+  // expected results of a diff scan.
+  unordered_map<int, vector<Redo>> pending_redos_per_txn;
+  EmplaceOrDie(&pending_redos_per_txn, kNoTxnId, vector<Redo>{});
+
+  // Returns the latest value for the given 'row_key' that is pending for the
+  // given transaction. If no mutations are pending for the 'row_key' in the
+  // given transaction, returns the latest committed value.
+  const auto pending_row_by_key_for_txn = [&] (int row_key, int txn_id) {
+    auto* pending_row_by_key = FindOrNull(pending_vals_per_txn, txn_id);
+    if (pending_row_by_key && ContainsKey(*pending_row_by_key, row_key)) {
+      return (*pending_row_by_key)[row_key];
+    }
+    return cur_val[row_key];
+  };
 
   int i = 0;
   for (const TestOp& test_op : test_ops) {
+    LOG(INFO) << "Running op " << test_op.ToString();
     if (IsMutation(test_op.type)) {
       EXPECT_EQ(cur_val[test_op.val], GetRow(test_op.val));
     }
 
-    LOG(INFO) << test_op.ToString();
     switch (test_op.type) {
       case TEST_INSERT:
       case TEST_INSERT_PK_ONLY:
@@ -969,69 +1329,90 @@ void FuzzTest::RunFuzzCase(const vector<TestOp>& test_ops,
       case TEST_INSERT_IGNORE_PK_ONLY:
       case TEST_UPSERT:
       case TEST_UPSERT_PK_ONLY: {
-        RedoType rtype = pending_val[test_op.val] ? UPDATE : INSERT;
-        pending_val[test_op.val] = InsertOrUpsertRow(
-            test_op.val, i++, pending_val[test_op.val], test_op.type);
+        const auto& row_key = test_op.val;
+        const auto& txn_id = test_op.val2;
+        const auto& old_row = pending_row_by_key_for_txn(row_key, txn_id);
+        RedoType rtype = old_row ? UPDATE : INSERT;
+        auto pending_row = InsertOrUpsertRow(
+            row_key, i++, old_row, test_op.type, txn_id);
 
-        // An insert ignore on a row that already exists will be dropped server-side.
-        // We must do the same.
-        if ((test_op.type == TEST_INSERT_IGNORE || test_op.type == TEST_INSERT_IGNORE_PK_ONLY) &&
-            rtype == UPDATE) {
-          break;
-        }
+        auto& pending_row_by_key = LookupOrEmplace(&pending_vals_per_txn, txn_id, ValueByRowKey{});
+        EmplaceOrUpdate(&pending_row_by_key, row_key, pending_row);
 
+        // An insert ignore on a row that already exists will be dropped server-side.
         // An "upsert PK-only" that is converted into an update will be dropped server-side.
         // We must do the same.
-        if (test_op.type == TEST_UPSERT_PK_ONLY && rtype == UPDATE) {
+        if ((test_op.type == TEST_INSERT_IGNORE ||
+             test_op.type == TEST_INSERT_IGNORE_PK_ONLY ||
+             test_op.type == TEST_UPSERT_PK_ONLY) &&
+            rtype == UPDATE) {
           break;
         }
 
-        pending_redos.emplace_back(rtype, test_op.val, pending_val[test_op.val]->val);
+        // There will actually be an effect on the server-side state so keep
+        // track of the change.
+        FindOrDie(pending_redos_per_txn, txn_id).emplace_back(rtype, row_key, pending_row.val);
         break;
       }
       case TEST_UPDATE:
       case TEST_UPDATE_IGNORE: {
-        // An update ignore on a row that doesn't exist will be dropped server-side.
-        // We must do the same.
-        if (test_op.type == TEST_UPDATE_IGNORE && !pending_val[test_op.val]) {
+        const auto& row_key = test_op.val;
+        if (test_op.type == TEST_UPDATE_IGNORE && !pending_row_by_key_for_txn(row_key, kNoTxnId)) {
           // Still call MutateRow to apply the UPDATE_IGNORE operations to the session.
           // However don't adjust the pending values given the operation will be ignored.
           for (int j = 0; j < update_multiplier; j++) {
-            MutateRow(test_op.val, i++, test_op.type);
+            MutateRow(row_key, i++, test_op.type);
           }
           break;
         }
-
+        ExpectedKeyValueRow latest_update;
         for (int j = 0; j < update_multiplier; j++) {
-          pending_val[test_op.val] = MutateRow(test_op.val, i++, test_op.type);
-          pending_redos.emplace_back(UPDATE, test_op.val, pending_val[test_op.val]->val);
+          latest_update = MutateRow(row_key, i++, test_op.type);
         }
+        FindOrDie(pending_redos_per_txn, kNoTxnId).emplace_back(UPDATE, row_key, latest_update.val);
+        auto& pending_row_by_key =
+            LookupOrEmplace(&pending_vals_per_txn, kNoTxnId, ValueByRowKey{});
+        EmplaceOrUpdate(&pending_row_by_key, row_key, latest_update);
         break;
       }
       case TEST_DELETE:
       case TEST_DELETE_IGNORE: {
-        // A delete ignore on a row that doesn't exist will be dropped server-side.
-        // We must do the same.
-        if (test_op.type == TEST_DELETE_IGNORE && !pending_val[test_op.val]) {
+        const auto& row_key = test_op.val;
+        DeleteRow(test_op.val, test_op.type);
+        if (test_op.type == TEST_DELETE_IGNORE && !pending_row_by_key_for_txn(row_key, kNoTxnId)) {
           // Still call DeleteRow to apply the DELETE_IGNORE operation to the session.
           // However don't adjust the pending values given the operation will be ignored.
-          DeleteRow(test_op.val, test_op.type);
           break;
         }
-
-        pending_val[test_op.val] = DeleteRow(test_op.val, test_op.type);
-        pending_redos.emplace_back(DELETE, test_op.val, boost::none);
+        FindOrDie(pending_redos_per_txn, kNoTxnId).emplace_back(DELETE, row_key, boost::none);
+        auto& pending_row_by_key =
+            LookupOrEmplace(&pending_vals_per_txn, kNoTxnId, ValueByRowKey{});
+        EmplaceOrUpdate(&pending_row_by_key, row_key, boost::none);
         break;
       }
       case TEST_FLUSH_OPS: {
-        FlushSessionOrDie(session_);
-        cur_val = pending_val;
-        int current_time = down_cast<kudu::clock::LogicalClock*>(
-            tablet()->clock())->GetCurrentTime();
-        VLOG(1) << "Current time: " << current_time;
-        saved_values_[current_time] = cur_val;
-        saved_redos_[current_time] = pending_redos;
-        pending_redos.clear();
+        const auto& txn_id = test_op.val;
+        auto session = txn_id == kNoTxnId ? session_ : FindOrDie(txn_sessions_, txn_id);
+        FlushSessionOrDie(session);
+        // Only update the saved and pending values if the flush is _not_ part
+        // of a transaction. Transactional mutations should only take effect
+        // once committed.
+        if (txn_id == kNoTxnId) {
+          int current_time = down_cast<kudu::clock::LogicalClock*>(
+              tablet()->clock())->GetCurrentTime();
+          VLOG(1) << "Current time: " << current_time;
+
+          auto& pending_vals_no_txn = FindOrDie(pending_vals_per_txn, kNoTxnId);
+          for (const auto& kv : pending_vals_no_txn) {
+            cur_val[kv.first] = kv.second;
+          }
+          pending_vals_no_txn.clear();
+          saved_values_[current_time] = cur_val;
+
+          auto& pending_redos_no_txn = FindOrDie(pending_redos_per_txn, kNoTxnId);
+          saved_redos_[current_time] = pending_redos_no_txn;
+          pending_redos_no_txn.clear();
+        }
         break;
       }
       case TEST_FLUSH_TABLET:
@@ -1058,6 +1439,47 @@ void FuzzTest::RunFuzzCase(const vector<TestOp>& test_ops,
       case TEST_DIFF_SCAN:
         NO_FATALS(CheckDiffScan(test_op.val, test_op.val2));
         break;
+      case TEST_BEGIN_TXN: {
+        const auto& txn_id = test_op.val;
+        shared_ptr<KuduSession> s(new KuduSession(client_, TxnId(txn_id)));
+        s->data_->Init(s);
+        ASSERT_OK(s->SetFlushMode(KuduSession::MANUAL_FLUSH));
+        s->SetTimeoutMillis(60 * 1000);
+        EmplaceOrDie(&txn_sessions_, txn_id, std::move(s));
+        EmplaceOrDie(&pending_vals_per_txn, txn_id, ValueByRowKey{});
+        EmplaceOrDie(&pending_redos_per_txn, txn_id, vector<Redo>{});
+        ASSERT_OK(CallParticipantOpCheckResp(txn_id, ParticipantOpPB::BEGIN_TXN, -1));
+        break;
+      }
+      case TEST_COMMIT_TXN: {
+        const auto& txn_id = test_op.val;
+        // Before committing, flush all the rows we have pending for this transaction.
+        FlushSessionOrDie(FindOrDie(txn_sessions_, txn_id));
+
+        ASSERT_OK(CallParticipantOpCheckResp(txn_id, ParticipantOpPB::BEGIN_COMMIT, -1));
+        int current_time = down_cast<kudu::clock::LogicalClock*>(
+            tablet()->clock())->GetCurrentTime();
+        ASSERT_OK(CallParticipantOpCheckResp(txn_id, ParticipantOpPB::FINALIZE_COMMIT,
+                                             current_time));
+        VLOG(1) << "Current time: " << current_time;
+        auto txn_pending_vals = EraseKeyReturnValuePtr(&pending_vals_per_txn, txn_id);
+        for (const auto& kv : txn_pending_vals) {
+          cur_val[kv.first] = kv.second;
+        }
+        saved_values_[current_time] = cur_val;
+
+        auto txn_pending_redos = EraseKeyReturnValuePtr(&pending_redos_per_txn, txn_id);
+        saved_redos_[current_time] = txn_pending_redos;
+
+        txn_sessions_.erase(txn_id);
+        break;
+      }
+      case TEST_ABORT_TXN: {
+        const auto& txn_id = test_op.val;
+        ASSERT_OK(CallParticipantOpCheckResp(txn_id, ParticipantOpPB::ABORT_TXN, -1));
+        txn_sessions_.erase(txn_id);
+        break;
+      }
       default:
         LOG(FATAL) << test_op.type;
     }
@@ -1379,6 +1801,48 @@ TEST_F(FuzzTest, TestDiffScanRowLifespanInOneScanDRS) {
       {TEST_DIFF_SCAN, 4, 7}
     });
 }
+TEST_F(FuzzTest, TestReplayDeletesOnTxnRowsets) {
+  CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
+  RunFuzzCase({
+      {TEST_INSERT_PK_ONLY, 1, -1},
+      {TEST_FLUSH_OPS, -1},
+      {TEST_FLUSH_TABLET},
+
+      {TEST_BEGIN_TXN, 2},
+      {TEST_INSERT_IGNORE_PK_ONLY, 0, 2},
+      {TEST_FLUSH_OPS, 2},
+      {TEST_COMMIT_TXN, 2},
+
+      {TEST_DELETE, 0},
+      {TEST_DELETE, 1},
+      {TEST_FLUSH_OPS, -1},
+
+      {TEST_FLUSH_DELTAS},
+      {TEST_RESTART_TS},
+    });
+}
+
+TEST_F(FuzzTest, TestFlushMRSsWithInvisibleRows) {
+  CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
+  RunFuzzCase({
+    {TEST_BEGIN_TXN, 0},
+    {TEST_INSERT_IGNORE, 1, 0},
+    {TEST_FLUSH_OPS, 0},
+    {TEST_COMMIT_TXN, 0},
+
+    {TEST_INSERT_PK_ONLY, 0, -1},
+    {TEST_INSERT_IGNORE_PK_ONLY, 0, -1},
+    {TEST_DELETE, 0},
+    {TEST_FLUSH_OPS, -1},
+
+    {TEST_RESTART_TS},
+    {TEST_MAJOR_COMPACT_DELTAS},
+
+    {TEST_DELETE, 1},
+    {TEST_FLUSH_OPS, -1},
+    {TEST_FLUSH_TABLET},
+  });
+}
 
 // Regression test for KUDU-3108. Previously caused us to have divergent 'hot'
 // and 'hotmaxes' containers in the merge iterator, causing us to read invalid