You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2020/12/12 03:08:32 UTC

[kudu] 01/03: KUDU-2612: fuzz transactional inserts

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 9197e2d5c96b94f227407e5c135c53f1b0182a65
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Mon Nov 2 22:58:35 2020 -0800

    KUDU-2612: fuzz transactional inserts
    
    This patch updates fuzz-itest to include transactional operations in the test.
    Since transactionality keeps ops hidden until commit time, this meant
    adding some maps to keep track of new expected and pending state while
    ops were uncommitted.
    
    I also included a couple of test cases that were found to cause issues
    that were addressed in previous patches.
    
    For now, I've commented out the inclusion of transactional ops because
    I've found them to be flaky on account of a potential debug crash when
    merging transactional MRSs. This will be addressed in a follow-up
    commit, but I'd like to merge this first, as this test has been useful
    in testing the follow-up change. When fixed, however, the patches
    together passed fuzz-itest 1000/1000 times with slow tests enabled and
    disabled.
    
    Change-Id: I719d42327ab18fda874332c9d6e1ae34aca8e846
    Reviewed-on: http://gerrit.cloudera.org:8080/16699
    Reviewed-by: Grant Henke <gr...@apache.org>
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Tested-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/client/client.h                 |   6 +-
 src/kudu/integration-tests/fuzz-itest.cc | 690 ++++++++++++++++++++++++++-----
 2 files changed, 582 insertions(+), 114 deletions(-)

diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 8b8557a..66f7dbb 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -67,6 +67,10 @@ class KuduClient;
 class KuduTable;
 } // namespace client
 
+namespace tablet {
+class FuzzTest;
+} // namespace tablet
+
 namespace transactions {
 class CoordinatorRpc;
 class TxnSystemClient;
@@ -2228,7 +2232,7 @@ class KUDU_EXPORT KuduSession : public sp::enable_shared_from_this<KuduSession>
   friend class KuduClient;
   friend class KuduTransaction;
   friend class internal::Batcher;
-
+  friend class tablet::FuzzTest;
   FRIEND_TEST(ClientTest, TestAutoFlushBackgroundAndErrorCollector);
   FRIEND_TEST(ClientTest, TestAutoFlushBackgroundApplyBlocks);
   FRIEND_TEST(ClientTest, TxnIdOfTransactionalSession);
diff --git a/src/kudu/integration-tests/fuzz-itest.cc b/src/kudu/integration-tests/fuzz-itest.cc
index b17eac9..468ff6c 100644
--- a/src/kudu/integration-tests/fuzz-itest.cc
+++ b/src/kudu/integration-tests/fuzz-itest.cc
@@ -24,6 +24,8 @@
 #include <memory>
 #include <ostream>
 #include <string>
+#include <unordered_map>
+#include <unordered_set>
 #include <utility>
 #include <vector>
 
@@ -39,6 +41,7 @@
 #include "kudu/client/scan_batch.h"
 #include "kudu/client/scan_predicate.h"
 #include "kudu/client/schema.h"
+#include "kudu/client/session-internal.h"
 #include "kudu/client/shared_ptr.h" // IWYU pragma: keep
 #include "kudu/client/value.h"
 #include "kudu/client/write_op.h"
@@ -47,8 +50,12 @@
 #include "kudu/common/common.pb.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/common/schema.h"
+#include "kudu/common/txn_id.h"
+#include "kudu/common/wire_protocol.h"
+#include "kudu/consensus/raft_consensus.h"
 #include "kudu/gutil/casts.h"
 #include "kudu/gutil/macros.h"
+#include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -58,15 +65,19 @@
 #include "kudu/tablet/rowset.h"
 #include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet_replica.h"
+#include "kudu/tablet/txn_participant-test-util.h"
 #include "kudu/tserver/mini_tablet_server.h"
 #include "kudu/tserver/tablet_server.h"
 #include "kudu/tserver/ts_tablet_manager.h"
+#include "kudu/tserver/tserver.pb.h"
+#include "kudu/tserver/tserver_admin.pb.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
 DEFINE_int32(keyspace_size, 5,  "number of distinct primary keys to test with");
+DEFINE_int32(max_open_txns, 5,  "maximum number of open transactions to test with");
 DECLARE_bool(enable_maintenance_manager);
 DECLARE_bool(scanner_allow_snapshot_scans_with_logical_timestamps);
 DECLARE_bool(use_hybrid_clock);
@@ -88,11 +99,17 @@ using kudu::client::KuduWriteOperation;
 using kudu::client::sp::shared_ptr;
 using kudu::cluster::InternalMiniCluster;
 using kudu::cluster::InternalMiniClusterOptions;
+using kudu::tserver::ParticipantOpPB;
+using kudu::tserver::ParticipantResponsePB;
 using std::list;
 using std::map;
+using std::make_pair;
+using std::pair;
 using std::string;
 using std::vector;
 using std::unique_ptr;
+using std::unordered_map;
+using std::unordered_set;
 using strings::Substitute;
 
 namespace kudu {
@@ -120,6 +137,9 @@ enum TestOpType {
   TEST_RESTART_TS,
   TEST_SCAN_AT_TIMESTAMP,
   TEST_DIFF_SCAN,
+  TEST_BEGIN_TXN,
+  TEST_COMMIT_TXN,
+  TEST_ABORT_TXN,
   TEST_NUM_OP_TYPES // max value for enum
 };
 
@@ -143,13 +163,20 @@ const char* TestOpType_names[] = {
   "TEST_COMPACT_TABLET",
   "TEST_RESTART_TS",
   "TEST_SCAN_AT_TIMESTAMP",
-  "TEST_DIFF_SCAN"
+  "TEST_DIFF_SCAN",
+  "TEST_BEGIN_TXN",
+  "TEST_COMMIT_TXN",
+  "TEST_ABORT_TXN",
 };
+constexpr const int kNoTxnId = -1;
+// Identical to kNoTxnId but more generic-sounding for ops that don't use
+// transaction IDs.
+constexpr const int kNoVal = -1;
 
 // An operation in a fuzz-test sequence.
 struct TestOp {
   // NOLINTNEXTLINE(google-explicit-constructor)
-  TestOp(TestOpType t, int v1 = 0, int v2 = 0) // NOLINT(runtime/explicit)
+  TestOp(TestOpType t, int v1 = kNoVal, int v2 = kNoVal) // NOLINT(runtime/explicit)
       : type(t),
         val(v1),
         val2(v2) {}
@@ -160,16 +187,18 @@ struct TestOp {
   // For INSERT/UPSERT/UPDATE/DELETE, the key of the row to be modified.
   // For SCAN_AT_TIMESTAMP the timestamp of the scan.
   // For DIFF_SCAN the start timestamp of the scan.
+  // For BEGIN_TXN/COMMIT_TXN/ABORT_TXN, the txn ID to operate on.
+  // For FLUSH_OPS, the txn ID to operate on.
   // Otherwise, unused.
   int val;
 
+  // For all INSERT variants, the transaction ID to insert with (kNoTxnId means none).
   // For DIFF_SCAN, the end timestamp of the scan.
   // Otherwise, unused.
   int val2;
 
   string ToString() const {
     switch (type) {
-      case TEST_FLUSH_OPS:
       case TEST_FLUSH_TABLET:
       case TEST_COMPACT_TABLET:
       case TEST_FLUSH_DELTAS:
@@ -177,10 +206,7 @@ struct TestOp {
       case TEST_MINOR_COMPACT_DELTAS:
       case TEST_RESTART_TS:
         return strings::Substitute("{$0}", TestOpType_names[type]);
-      case TEST_INSERT:
-      case TEST_INSERT_PK_ONLY:
-      case TEST_INSERT_IGNORE:
-      case TEST_INSERT_IGNORE_PK_ONLY:
+      case TEST_FLUSH_OPS:
       case TEST_UPSERT:
       case TEST_UPSERT_PK_ONLY:
       case TEST_UPDATE:
@@ -188,7 +214,14 @@ struct TestOp {
       case TEST_DELETE:
       case TEST_DELETE_IGNORE:
       case TEST_SCAN_AT_TIMESTAMP:
+      case TEST_BEGIN_TXN:
+      case TEST_COMMIT_TXN:
+      case TEST_ABORT_TXN:
         return strings::Substitute("{$0, $1}", TestOpType_names[type], val);
+      case TEST_INSERT:
+      case TEST_INSERT_PK_ONLY:
+      case TEST_INSERT_IGNORE:
+      case TEST_INSERT_IGNORE_PK_ONLY:
       case TEST_DIFF_SCAN:
         return strings::Substitute("{$0, $1, $2}", TestOpType_names[type], val, val2);
       default:
@@ -223,6 +256,8 @@ struct Redo {
   optional<int32_t> val;
 };
 
+// TODO(awong): Merging multiple transactional MRSs together can sometimes lead
+// to a crash. Uncomment the transactional ops once fixed.
 const vector<TestOpType> kAllOps {TEST_INSERT,
                                   TEST_INSERT_PK_ONLY,
                                   TEST_INSERT_IGNORE,
@@ -242,7 +277,14 @@ const vector<TestOpType> kAllOps {TEST_INSERT,
                                   TEST_RESTART_TS,
                                   TEST_SCAN_AT_TIMESTAMP,
                                   TEST_DIFF_SCAN};
-
+                                  // TEST_BEGIN_TXN,
+                                  // TEST_COMMIT_TXN,
+                                  // TEST_ABORT_TXN};
+
+// Ops that focus on hammering workloads in which rows come in and out of
+// existence.
+// TODO(awong): Merging multiple transactional MRSs together can sometimes lead
+// to a crash. Uncomment the transactional ops once fixed.
 const vector<TestOpType> kPkOnlyOps {TEST_INSERT_PK_ONLY,
                                      TEST_INSERT_IGNORE_PK_ONLY,
                                      TEST_UPSERT_PK_ONLY,
@@ -257,6 +299,9 @@ const vector<TestOpType> kPkOnlyOps {TEST_INSERT_PK_ONLY,
                                      TEST_RESTART_TS,
                                      TEST_SCAN_AT_TIMESTAMP,
                                      TEST_DIFF_SCAN};
+                                     // TEST_BEGIN_TXN,
+                                     // TEST_COMMIT_TXN,
+                                     // TEST_ABORT_TXN};
 
 // Test which does only random operations against a tablet, including update and random
 // get (ie scans with equal lower and upper bounds).
@@ -331,11 +376,23 @@ class FuzzTest : public KuduTest {
     return tablet_replica_->tablet();
   }
 
+  Status CallParticipantOpCheckResp(int64_t txn_id, ParticipantOpPB::ParticipantOpType op_type,
+                                    int64_t ts_val) {
+    RETURN_NOT_OK(tablet_replica_->consensus()->WaitUntilLeaderForTests(
+        MonoDelta::FromSeconds(10)));
+    ParticipantResponsePB resp;
+    RETURN_NOT_OK(CallParticipantOp(tablet_replica_.get(), txn_id, op_type, ts_val, &resp));
+    if (resp.has_error()) {
+      return StatusFromPB(resp.error().status());
+    }
+    return Status::OK();
+  }
+
   // Adds an insert for the given key/value pair to 'ops', returning the new contents
   // of the row.
   ExpectedKeyValueRow InsertOrUpsertRow(int key, int val,
                                         optional<ExpectedKeyValueRow> old_row,
-                                        TestOpType type) {
+                                        TestOpType type, int txn_id) {
     ExpectedKeyValueRow ret;
     unique_ptr<KuduWriteOperation> op;
     if (type == TEST_INSERT || type == TEST_INSERT_PK_ONLY) {
@@ -375,7 +432,11 @@ class FuzzTest : public KuduTest {
       }
       default: LOG(FATAL) << "Invalid test op type: " << TestOpType_names[type];
     }
-    CHECK_OK(session_->Apply(op.release()));
+    if (txn_id == kNoTxnId) {
+      CHECK_OK(session_->Apply(op.release()));
+    } else {
+      CHECK_OK(FindOrDie(txn_sessions_, txn_id)->Apply(op.release()));
+    }
     return ret;
   }
 
@@ -392,7 +453,7 @@ class FuzzTest : public KuduTest {
     KuduPartialRow* row = op->mutable_row();
     CHECK_OK(row->SetInt32(0, key));
     ret.key = key;
-    if (new_val & 1) {
+    if (new_val % 2) {
       CHECK_OK(row->SetNull(1));
     } else {
       CHECK_OK(row->SetInt32(1, new_val));
@@ -680,7 +741,7 @@ class FuzzTest : public KuduTest {
   //
   // Useful when using the 'delta' test case reduction tool to allow
   // it to skip invalid test cases.
-  void ValidateFuzzCase(const vector<TestOp>& test_ops);
+  static void ValidateFuzzCase(const vector<TestOp>& test_ops);
   void RunFuzzCase(const vector<TestOp>& test_ops,
                    int update_multiplier);
 
@@ -688,6 +749,7 @@ class FuzzTest : public KuduTest {
   unique_ptr<InternalMiniCluster> cluster_;
   shared_ptr<KuduClient> client_;
   shared_ptr<KuduSession> session_;
+  unordered_map<int, shared_ptr<KuduSession>> txn_sessions_;
   shared_ptr<KuduTable> table_;
 
   map<int,
@@ -736,12 +798,76 @@ bool IsMutation(const TestOpType& op) {
   }
 }
 
-// Generate a random valid sequence of operations for use as a
-// fuzz test.
+// Generate a random valid sequence of operations for use as a fuzz test, i.e.
+// a set of operations that, when run, will not run into any logical errors
+// (e.g. no "key already present" or "key not found" errors).
+//
+// To generate this sequence, we schedule one op at a time, keeping track of
+// what rows exist in the tablet, what row mutations are pending, what rows are
+// being mutated by in-flight ops and transactions, etc. We only select ops
+// that we know to be valid, using the tracked state.
 void GenerateTestCase(vector<TestOp>* ops, int len, TestOpSets sets = ALL) {
+  // Bitset indicating whether the given row has been committed as a part of a
+  // transaction, or successfully inserted outside of a transaction.
   vector<bool> exists(FLAGS_keyspace_size);
+
+  // The transaction ID that is operating on each row, or kNoTxnId if being
+  // operated on outside the context of a transaction. 'none' if the row isn't
+  // being operated on. If a row is not 'none', an entry must exist for it in
+  // 'pending_existence_per_txn'.
+  // TODO(awong): This is necessary so we don't have transactions trying to
+  // write the same rows, which currently is unprotected. Once we begin locking
+  // rows for the duration of the transaction, we shouldn't need this.
+  vector<optional<int>> txn_touching_row(FLAGS_keyspace_size);
+
+  // Represents the pending mutations to rows that are not yet visible to other
+  // actors, and the resulting existence status of the row if we were to flush
+  // a non-transactional session (keyed as kNoTxnId) or commit a transaction
+  // (keyed by any other value).
+  //
+  // This is used to update 'exists' when scheduling a non-transactional
+  // session flush or a transaction commit. It is also used to determine
+  // whether we can operate on pending values, e.g. insert a row and then
+  // delete it in the same batched op.
+  //
+  // A row key can exist in at most one IsPresentByRowKey in any form (e.g. row
+  // 1's existence cannot be set to false in two transactions), ensuring only a
+  // single actor operates on a row at a time.
+  typedef std::map<int, bool> IsPresentByRowKey;
+  unordered_map<int, IsPresentByRowKey> pending_existence_per_txn;
+  EmplaceOrDie(&pending_existence_per_txn, kNoTxnId, IsPresentByRowKey{});
+
+  // The transactions that have client sessions that need to be flushed.
+  unordered_set<int> txns_needing_session_flush;
+
+  // Returns whether there are any open transactions.
+  const auto no_open_txns = [&pending_existence_per_txn] {
+    // A single entry exists for kNoTxnId.
+    return pending_existence_per_txn.size() == 1;
+  };
+  // Helper that deterministically (based on rand()) selects a transaction ID
+  // from those in flight, or, if 'maybe_none' is true, kNoTxnId to indicate a
+  // non-transactional operation.
+  const auto pick_txn_id = [&] (bool maybe_none) -> int {
+    if (no_open_txns() || (maybe_none && rand() % 2)) {
+      // Shouldn't be called when maybe_none is false if there are no pending
+      // transactions.
+      DCHECK(maybe_none);
+      return kNoTxnId;
+    }
+    const auto& num_txns = pending_existence_per_txn.size() - 1;
+    vector<int> txn_ids;
+    txn_ids.reserve(num_txns);
+    for (const auto& txn_id_and_rows : pending_existence_per_txn) {
+      if (txn_id_and_rows.first == kNoTxnId) continue;
+      txn_ids.emplace_back(txn_id_and_rows.first);
+    }
+    std::sort(txn_ids.begin(), txn_ids.end());
+    return txn_ids[rand() % num_txns];
+  };
+
+  int next_txn_id = 0;
   int op_timestamps = 0;
-  bool ops_pending = false;
   bool data_in_mrs = false;
   bool worth_compacting = false;
   bool data_in_dms = false;
@@ -758,87 +884,198 @@ void GenerateTestCase(vector<TestOp>* ops, int len, TestOpSets sets = ALL) {
 
     switch (r) {
       case TEST_INSERT:
-      case TEST_INSERT_PK_ONLY:
-        if (exists[row_key]) continue;
-        ops->emplace_back(r, row_key);
-        exists[row_key] = true;
-        ops_pending = true;
-        data_in_mrs = true;
+      case TEST_INSERT_PK_ONLY: {
+        const auto& txn_id = pick_txn_id(/*maybe_none*/true);
+        const auto& txn_operating_on_row = txn_touching_row[row_key];
+        if (txn_operating_on_row && *txn_operating_on_row != txn_id) {
+          // The row is being operated on by another txn.
+          continue;
+        }
+        if (txn_operating_on_row && *txn_operating_on_row == txn_id &&
+            FindOrDie(FindOrDie(pending_existence_per_txn, txn_id), row_key)) {
+          // The row is being operated on by this txn and the row pending
+          // state exists.
+          continue;
+        }
+        if (!txn_operating_on_row && exists[row_key]) {
+          // The row is not being operated on by another txn, but the row
+          // exists.
+          continue;
+        }
+        ops->emplace_back(r, row_key, txn_id);
+        txn_touching_row[row_key] = txn_id;
+        FindOrDie(pending_existence_per_txn, txn_id)[row_key] = true;
+        EmplaceIfNotPresent(&txns_needing_session_flush, txn_id);
+        if (txn_id == kNoTxnId) {
+          data_in_mrs = true;
+        }
         break;
+      }
       case TEST_INSERT_IGNORE:
-      case TEST_INSERT_IGNORE_PK_ONLY:
-        ops->emplace_back(r, row_key);
-        ops_pending = true;
-        // If the row doesn't currently exist, this will act like an insert
-        // and put it into MRS.
-        if (!exists[row_key]) {
+      case TEST_INSERT_IGNORE_PK_ONLY: {
+        const auto& txn_id = pick_txn_id(/*maybe_none*/true);
+        const auto& txn_operating_on_row = txn_touching_row[row_key];
+        if (txn_operating_on_row && *txn_operating_on_row != txn_id) {
+          // The row is being operated on by another txn.
+          continue;
+        }
+        ops->emplace_back(r, row_key, txn_id);
+        txn_touching_row[row_key] = txn_id;
+        FindOrDie(pending_existence_per_txn, txn_id)[row_key] = true;
+        EmplaceIfNotPresent(&txns_needing_session_flush, txn_id);
+        if (txn_id == kNoTxnId && !exists[row_key]) {
           data_in_mrs = true;
         }
-        exists[row_key] = true;
         break;
+      }
       case TEST_UPSERT:
-      case TEST_UPSERT_PK_ONLY:
+      case TEST_UPSERT_PK_ONLY: {
+        const auto& txn_operating_on_row = txn_touching_row[row_key];
+        if (txn_operating_on_row && *txn_operating_on_row != kNoTxnId) {
+          // The row is being operated on by a txn.
+          continue;
+        }
         ops->emplace_back(r, row_key);
-        ops_pending = true;
-        // If the row doesn't currently exist, this will act like an insert
-        // and put it into MRS.
-        if (!exists[row_key]) {
+        txn_touching_row[row_key] = kNoTxnId;
+        auto& row_exists = FindOrDie(pending_existence_per_txn, kNoTxnId)[row_key];
+        if (!row_exists) {
           data_in_mrs = true;
-        } else if (!data_in_mrs) {
-          // If it does exist, but not in MRS, then this will put data into
-          // a DMS.
+        } else {
           data_in_dms = true;
         }
-        exists[row_key] = true;
+        row_exists = true;
+        EmplaceIfNotPresent(&txns_needing_session_flush, kNoTxnId);
         break;
-      case TEST_UPDATE:
-        if (!exists[row_key]) continue;
-        ops->emplace_back(r, row_key);
-        ops_pending = true;
-        if (!data_in_mrs) {
-          data_in_dms = true;
+      }
+      case TEST_UPDATE_IGNORE: {
+        const auto& txn_operating_on_row = txn_touching_row[row_key];
+        if (txn_operating_on_row && *txn_operating_on_row != kNoTxnId) {
+          // The row is being operated on by another txn.
+          continue;
         }
-        break;
-      case TEST_UPDATE_IGNORE:
-        ops->emplace_back(r, row_key);
-        ops_pending = true;
-        // If it does exist, this will act like an update and put it into
-        // a DMS.
-        if (exists[row_key] && !data_in_mrs) {
-          data_in_dms = true;
+        if (!txn_operating_on_row) {
+          if (exists[row_key] && !data_in_mrs) {
+            // The row is not being operated on by another txn, and it exists and
+            // has been flushed, meaning this op will result in a DMS mutation.
+            data_in_dms = true;
+          }
+          // The existence status shouldn't change.
+          FindOrDie(pending_existence_per_txn, kNoTxnId)[row_key] = exists[row_key];
         }
+        ops->emplace_back(r, row_key);
+        txn_touching_row[row_key] = kNoTxnId;
+        EmplaceIfNotPresent(&txns_needing_session_flush, kNoTxnId);
         break;
-      case TEST_DELETE:
-        if (!exists[row_key]) continue;
+      }
+      case TEST_UPDATE: {
+        const auto& txn_operating_on_row = txn_touching_row[row_key];
+        if (txn_operating_on_row && *txn_operating_on_row != kNoTxnId) {
+          // The row is being operated on by another txn.
+          continue;
+        }
+        if (txn_operating_on_row && *txn_operating_on_row == kNoTxnId &&
+            !FindOrDie(FindOrDie(pending_existence_per_txn, kNoTxnId), row_key)) {
+          // The row is being operated on by an in-flight op, but the pending
+          // row state doesn't exist.
+          continue;
+        }
+        if (!txn_operating_on_row) {
+          if (!exists[row_key]) {
+            // The row is not being operated on by another txn, but the row
+            // doesn't exist so we can't update anything.
+            continue;
+          }
+          if (!data_in_mrs) {
+            // The row is not being operated on by another txn, and it exists
+            // in a DRS, meaning this op will result in a DMS mutation.
+            data_in_dms = true;
+          }
+          // The existence status shouldn't change.
+          FindOrDie(pending_existence_per_txn, kNoTxnId)[row_key] = exists[row_key];
+        }
         ops->emplace_back(r, row_key);
-        ops_pending = true;
-        if (!data_in_mrs) {
+        txn_touching_row[row_key] = kNoTxnId;
+        EmplaceIfNotPresent(&txns_needing_session_flush, kNoTxnId);
+        break;
+      }
+      case TEST_DELETE_IGNORE: {
+        const auto& txn_operating_on_row = txn_touching_row[row_key];
+        if (txn_operating_on_row && *txn_operating_on_row != kNoTxnId) {
+          // The row is being operated on by another txn.
+          continue;
+        }
+        if (!txn_operating_on_row && exists[row_key] && !data_in_mrs) {
+          // The row is not being operated on by another txn, and it exists in
+          // a DRS, meaning this op will result in a DMS mutation.
           data_in_dms = true;
         }
-        exists[row_key] = false;
-        break;
-      case TEST_DELETE_IGNORE:
         ops->emplace_back(r, row_key);
-        ops_pending = true;
-        // If it does exist, this will act like a delete and put it into
-        // a DMS.
-        if (exists[row_key] && !data_in_mrs) {
+        FindOrDie(pending_existence_per_txn, kNoTxnId)[row_key] = false;
+        txn_touching_row[row_key] = kNoTxnId;
+        EmplaceIfNotPresent(&txns_needing_session_flush, kNoTxnId);
+        break;
+      }
+      case TEST_DELETE: {
+        const auto& txn_operating_on_row = txn_touching_row[row_key];
+        if (txn_operating_on_row && *txn_operating_on_row != kNoTxnId) {
+          // The row is being operated on by a txn. Since we don't support
+          // mutating a row while it is participating in a transaction, we must
+          // wait for the transaction to complete before doing anything.
+          continue;
+        }
+        if (txn_operating_on_row && *txn_operating_on_row == kNoTxnId &&
+            !FindOrDie(FindOrDie(pending_existence_per_txn, kNoTxnId), row_key)) {
+          // The row is being operated on by a non-transactional in-flight op,
+          // meaning we can only correctly delete the row if the in-flight op
+          // were to insert the row, making the row's existence pending.
+          // Otherwise, we cannot schedule a delete.
+          continue;
+        }
+        if (!txn_operating_on_row && !exists[row_key]) {
+          // The row is not being operated on by another txn, but the row
+          // doesn't exist, so we cannot schedule a delete.
+          continue;
+        }
+        ops->emplace_back(TEST_DELETE, row_key);
+        FindOrDie(pending_existence_per_txn, kNoTxnId)[row_key] = false;
+        txn_touching_row[row_key] = kNoTxnId;
+        EmplaceIfNotPresent(&txns_needing_session_flush, kNoTxnId);
+        if (!data_in_mrs) {
+          // The row exists in a DRS, so this op will result in a DMS mutation.
           data_in_dms = true;
         }
-        exists[row_key] = false;
         break;
-      case TEST_FLUSH_OPS:
-        if (ops_pending) {
-          ops->emplace_back(TEST_FLUSH_OPS);
-          ops_pending = false;
-          op_timestamps++;
+      }
+      case TEST_FLUSH_OPS: {
+        const auto& txn_id = pick_txn_id(/*maybe_none*/true);
+        // If the picked transaction doesn't have any ops buffered in its
+        // session yet, pick a new action.
+        if (!ContainsKey(txns_needing_session_flush, txn_id)) continue;
+        if (txn_id == kNoTxnId) {
+          // If flushing rows that aren't part of any transaction, apply
+          // their state immediately.
+          auto& pending_existence_per_row = FindOrDie(pending_existence_per_txn, kNoTxnId);
+          for (const auto& key_and_exists : pending_existence_per_row) {
+            const auto& row_key = key_and_exists.first;
+            exists[row_key] = key_and_exists.second;
+            DCHECK_EQ(kNoTxnId, txn_touching_row[row_key]);
+            txn_touching_row[row_key] = boost::none;
+          }
+          pending_existence_per_row.clear();
         }
+        op_timestamps++;
+        txns_needing_session_flush.erase(txn_id);
+        ops->emplace_back(TEST_FLUSH_OPS, txn_id);
         break;
+      }
       case TEST_FLUSH_TABLET:
         if (data_in_mrs) {
-          if (ops_pending) {
+          // Non-transactional ops eagerly set 'data_in_mrs', expecting a session op
+          // to be scheduled alongside the tablet flush. Transactions don't do
+          // this -- they only set 'data_in_mrs' once committed.
+          if (ContainsKey(txns_needing_session_flush, kNoTxnId)) {
             ops->emplace_back(TEST_FLUSH_OPS);
-            ops_pending = false;
+            txns_needing_session_flush.erase(kNoTxnId);
           }
           ops->emplace_back(TEST_FLUSH_TABLET);
           data_in_mrs = false;
@@ -847,9 +1084,9 @@ void GenerateTestCase(vector<TestOp>* ops, int len, TestOpSets sets = ALL) {
         break;
       case TEST_COMPACT_TABLET:
         if (worth_compacting) {
-          if (ops_pending) {
+          if (ContainsKey(txns_needing_session_flush, kNoTxnId)) {
             ops->emplace_back(TEST_FLUSH_OPS);
-            ops_pending = false;
+            txns_needing_session_flush.erase(kNoTxnId);
           }
           ops->emplace_back(TEST_COMPACT_TABLET);
           worth_compacting = false;
@@ -857,9 +1094,9 @@ void GenerateTestCase(vector<TestOp>* ops, int len, TestOpSets sets = ALL) {
         break;
       case TEST_FLUSH_DELTAS:
         if (data_in_dms) {
-          if (ops_pending) {
+          if (ContainsKey(txns_needing_session_flush, kNoTxnId)) {
             ops->emplace_back(TEST_FLUSH_OPS);
-            ops_pending = false;
+            txns_needing_session_flush.erase(kNoTxnId);
           }
           ops->emplace_back(TEST_FLUSH_DELTAS);
           data_in_dms = false;
@@ -895,6 +1132,55 @@ void GenerateTestCase(vector<TestOp>* ops, int len, TestOpSets sets = ALL) {
         ops->emplace_back(TEST_DIFF_SCAN, start_timestamp, end_timestamp);
         break;
       }
+      case TEST_BEGIN_TXN: {
+        // If we have --max_open_txns open transactions, we can't begin a new
+        // transaction. NOTE: 'pending_existence_per_txn' also includes
+        // kNoTxnId, hence the extra count.
+        if (pending_existence_per_txn.size() == 1 + FLAGS_max_open_txns) continue;
+        const auto txn_id = next_txn_id++;
+        ops->emplace_back(r, txn_id);
+        EmplaceOrDie(&pending_existence_per_txn, txn_id, IsPresentByRowKey{});
+        op_timestamps++;
+        break;
+      }
+      case TEST_COMMIT_TXN: {
+        if (no_open_txns()) continue;
+        const auto txn_id = pick_txn_id(/*maybe_none*/false);
+        DCHECK_NE(kNoTxnId, txn_id);
+        // If there are ops pending for this transaction, we need to flush them too.
+        if (ContainsKey(txns_needing_session_flush, txn_id)) {
+          op_timestamps++;
+          txns_needing_session_flush.erase(txn_id);
+          ops->emplace_back(TEST_FLUSH_OPS, txn_id);
+        }
+        ops->emplace_back(r, txn_id);
+        auto pending_existence = EraseKeyReturnValuePtr(&pending_existence_per_txn, txn_id);
+        for (const auto& key_and_exists : pending_existence) {
+          const auto& key = key_and_exists.first;
+          const auto& key_exists = key_and_exists.second;
+          DCHECK(key_exists); // only support inserts
+          // Since we're committing the transaction, its MRS should hold state
+          // if there are any inserted rows.
+          data_in_mrs = true;
+          exists[key] = true;
+          txn_touching_row[key] = boost::none;
+        }
+        // Commit replicates two ops (BEGIN_COMMIT and FINALIZE_COMMIT).
+        op_timestamps += 2;
+        break;
+      }
+      case TEST_ABORT_TXN: {
+        if (no_open_txns()) continue;
+        const auto txn_id = pick_txn_id(/*maybe_none*/false);
+        DCHECK_NE(kNoTxnId, txn_id);
+        ops->emplace_back(r, txn_id);
+        auto pending_existence = EraseKeyReturnValuePtr(&pending_existence_per_txn, txn_id);
+        for (const auto& key_and_exists : pending_existence) {
+          txn_touching_row[key_and_exists.first] = boost::none;
+        }
+        op_timestamps++;
+        break;
+      }
       default:
         LOG(FATAL) << "Invalid op type: " << r;
     }
@@ -911,15 +1197,26 @@ string DumpTestCase(const vector<TestOp>& ops) {
 
 void FuzzTest::ValidateFuzzCase(const vector<TestOp>& test_ops) {
   vector<bool> exists(FLAGS_keyspace_size);
+  unordered_map<int, vector<std::pair<int, TestOpType>>> pending_rows_per_txn;
   for (const auto& test_op : test_ops) {
     switch (test_op.type) {
       case TEST_INSERT:
       case TEST_INSERT_PK_ONLY:
         CHECK(!exists[test_op.val]) << "invalid case: inserting already-existing row";
-        exists[test_op.val] = true;
-        break;
+        FALLTHROUGH_INTENDED;
       case TEST_INSERT_IGNORE:
-      case TEST_INSERT_IGNORE_PK_ONLY:
+      case TEST_INSERT_IGNORE_PK_ONLY: {
+        const auto& txn_id = test_op.val2;
+        if (txn_id == kNoTxnId) {
+          exists[test_op.val] = true;
+        } else {
+          auto& rows = FindOrDie(pending_rows_per_txn, txn_id);
+          rows.emplace_back(make_pair(test_op.val, test_op.type));
+        }
+        break;
+      }
+      // TODO(awong): UPSERT, UPDATE, and DELETE ops should account for
+      // 'pending_rows_per_txn' once we begin supporting transactions.
       case TEST_UPSERT:
       case TEST_UPSERT_PK_ONLY:
         exists[test_op.val] = true;
@@ -937,6 +1234,49 @@ void FuzzTest::ValidateFuzzCase(const vector<TestOp>& test_ops) {
       case TEST_DELETE_IGNORE:
         exists[test_op.val] = false;
         break;
+      case TEST_BEGIN_TXN:
+        EmplaceOrDie(&pending_rows_per_txn, test_op.val, vector<pair<int, TestOpType>>());
+        break;
+      case TEST_COMMIT_TXN: {
+        auto rows_and_ops = EraseKeyReturnValuePtr(&pending_rows_per_txn, test_op.val);
+        for (const auto& row_and_op : rows_and_ops) {
+          const auto& row = row_and_op.first;
+          const auto& op = row_and_op.second;
+          switch (op) {
+            case TEST_INSERT:
+            case TEST_INSERT_PK_ONLY:
+              CHECK(!exists[row]);
+              FALLTHROUGH_INTENDED;
+            case TEST_INSERT_IGNORE:
+            case TEST_INSERT_IGNORE_PK_ONLY:
+              exists[row] = true;
+              break;
+            default:
+              LOG(DFATAL) << "transactions only support insert operations";
+          }
+        }
+        break;
+      }
+      case TEST_ABORT_TXN: {
+        // Ensure that the rows this transaction was operating on were valid.
+        auto rows_and_ops = EraseKeyReturnValuePtr(&pending_rows_per_txn, test_op.val);
+        for (const auto& row_and_op : rows_and_ops) {
+          const auto& row = row_and_op.first;
+          const auto& op = row_and_op.second;
+          switch (op) {
+            case TEST_INSERT:
+            case TEST_INSERT_PK_ONLY:
+              CHECK(!exists[row]);
+              break;
+            case TEST_INSERT_IGNORE:
+            case TEST_INSERT_IGNORE_PK_ONLY:
+              break;
+            default:
+              LOG(DFATAL) << "transactions only support insert operations";
+          }
+        }
+        break;
+      }
       default:
         break;
     }
@@ -951,17 +1291,37 @@ void FuzzTest::RunFuzzCase(const vector<TestOp>& test_ops,
   // into a test method in order to reproduce a failure.
   LOG(INFO) << "test case:\n" << DumpTestCase(test_ops);
 
+  // Represent the expected state of the table if we were to flush ops.
   vector<optional<ExpectedKeyValueRow>> cur_val(FLAGS_keyspace_size);
-  vector<optional<ExpectedKeyValueRow>> pending_val(FLAGS_keyspace_size);
-  vector<Redo> pending_redos;
+
+  // The ops that are current pending in a session (not flushed yet).
+  typedef unordered_map<int, optional<ExpectedKeyValueRow>> ValueByRowKey;
+  unordered_map<int, ValueByRowKey> pending_vals_per_txn;
+  EmplaceOrDie(&pending_vals_per_txn, kNoTxnId, ValueByRowKey{});
+
+  // We keep track of the redos too so it's easier to piece together the
+  // expected results of a diff scan.
+  unordered_map<int, vector<Redo>> pending_redos_per_txn;
+  EmplaceOrDie(&pending_redos_per_txn, kNoTxnId, vector<Redo>{});
+
+  // Returns the latest value for the given 'row_key' that is pending for the
+  // given transaction. If no mutations are pending for the 'row_key' in the
+  // given transaction, returns the latest committed value.
+  const auto pending_row_by_key_for_txn = [&] (int row_key, int txn_id) {
+    auto* pending_row_by_key = FindOrNull(pending_vals_per_txn, txn_id);
+    if (pending_row_by_key && ContainsKey(*pending_row_by_key, row_key)) {
+      return (*pending_row_by_key)[row_key];
+    }
+    return cur_val[row_key];
+  };
 
   int i = 0;
   for (const TestOp& test_op : test_ops) {
+    LOG(INFO) << "Running op " << test_op.ToString();
     if (IsMutation(test_op.type)) {
       EXPECT_EQ(cur_val[test_op.val], GetRow(test_op.val));
     }
 
-    LOG(INFO) << test_op.ToString();
     switch (test_op.type) {
       case TEST_INSERT:
       case TEST_INSERT_PK_ONLY:
@@ -969,69 +1329,90 @@ void FuzzTest::RunFuzzCase(const vector<TestOp>& test_ops,
       case TEST_INSERT_IGNORE_PK_ONLY:
       case TEST_UPSERT:
       case TEST_UPSERT_PK_ONLY: {
-        RedoType rtype = pending_val[test_op.val] ? UPDATE : INSERT;
-        pending_val[test_op.val] = InsertOrUpsertRow(
-            test_op.val, i++, pending_val[test_op.val], test_op.type);
+        const auto& row_key = test_op.val;
+        const auto& txn_id = test_op.val2;
+        const auto& old_row = pending_row_by_key_for_txn(row_key, txn_id);
+        RedoType rtype = old_row ? UPDATE : INSERT;
+        auto pending_row = InsertOrUpsertRow(
+            row_key, i++, old_row, test_op.type, txn_id);
 
-        // An insert ignore on a row that already exists will be dropped server-side.
-        // We must do the same.
-        if ((test_op.type == TEST_INSERT_IGNORE || test_op.type == TEST_INSERT_IGNORE_PK_ONLY) &&
-            rtype == UPDATE) {
-          break;
-        }
+        auto& pending_row_by_key = LookupOrEmplace(&pending_vals_per_txn, txn_id, ValueByRowKey{});
+        EmplaceOrUpdate(&pending_row_by_key, row_key, pending_row);
 
+        // An insert ignore on a row that already exists will be dropped server-side.
         // An "upsert PK-only" that is converted into an update will be dropped server-side.
         // We must do the same.
-        if (test_op.type == TEST_UPSERT_PK_ONLY && rtype == UPDATE) {
+        if ((test_op.type == TEST_INSERT_IGNORE ||
+             test_op.type == TEST_INSERT_IGNORE_PK_ONLY ||
+             test_op.type == TEST_UPSERT_PK_ONLY) &&
+            rtype == UPDATE) {
           break;
         }
 
-        pending_redos.emplace_back(rtype, test_op.val, pending_val[test_op.val]->val);
+        // There will actually be an effect on the server-side state so keep
+        // track of the change.
+        FindOrDie(pending_redos_per_txn, txn_id).emplace_back(rtype, row_key, pending_row.val);
         break;
       }
       case TEST_UPDATE:
       case TEST_UPDATE_IGNORE: {
-        // An update ignore on a row that doesn't exist will be dropped server-side.
-        // We must do the same.
-        if (test_op.type == TEST_UPDATE_IGNORE && !pending_val[test_op.val]) {
+        const auto& row_key = test_op.val;
+        if (test_op.type == TEST_UPDATE_IGNORE && !pending_row_by_key_for_txn(row_key, kNoTxnId)) {
           // Still call MutateRow to apply the UPDATE_IGNORE operations to the session.
           // However don't adjust the pending values given the operation will be ignored.
           for (int j = 0; j < update_multiplier; j++) {
-            MutateRow(test_op.val, i++, test_op.type);
+            MutateRow(row_key, i++, test_op.type);
           }
           break;
         }
-
+        ExpectedKeyValueRow latest_update;
         for (int j = 0; j < update_multiplier; j++) {
-          pending_val[test_op.val] = MutateRow(test_op.val, i++, test_op.type);
-          pending_redos.emplace_back(UPDATE, test_op.val, pending_val[test_op.val]->val);
+          latest_update = MutateRow(row_key, i++, test_op.type);
         }
+        FindOrDie(pending_redos_per_txn, kNoTxnId).emplace_back(UPDATE, row_key, latest_update.val);
+        auto& pending_row_by_key =
+            LookupOrEmplace(&pending_vals_per_txn, kNoTxnId, ValueByRowKey{});
+        EmplaceOrUpdate(&pending_row_by_key, row_key, latest_update);
         break;
       }
       case TEST_DELETE:
       case TEST_DELETE_IGNORE: {
-        // A delete ignore on a row that doesn't exist will be dropped server-side.
-        // We must do the same.
-        if (test_op.type == TEST_DELETE_IGNORE && !pending_val[test_op.val]) {
+        const auto& row_key = test_op.val;
+        DeleteRow(test_op.val, test_op.type);
+        if (test_op.type == TEST_DELETE_IGNORE && !pending_row_by_key_for_txn(row_key, kNoTxnId)) {
           // Still call DeleteRow to apply the DELETE_IGNORE operation to the session.
           // However don't adjust the pending values given the operation will be ignored.
-          DeleteRow(test_op.val, test_op.type);
           break;
         }
-
-        pending_val[test_op.val] = DeleteRow(test_op.val, test_op.type);
-        pending_redos.emplace_back(DELETE, test_op.val, boost::none);
+        FindOrDie(pending_redos_per_txn, kNoTxnId).emplace_back(DELETE, row_key, boost::none);
+        auto& pending_row_by_key =
+            LookupOrEmplace(&pending_vals_per_txn, kNoTxnId, ValueByRowKey{});
+        EmplaceOrUpdate(&pending_row_by_key, row_key, boost::none);
         break;
       }
       case TEST_FLUSH_OPS: {
-        FlushSessionOrDie(session_);
-        cur_val = pending_val;
-        int current_time = down_cast<kudu::clock::LogicalClock*>(
-            tablet()->clock())->GetCurrentTime();
-        VLOG(1) << "Current time: " << current_time;
-        saved_values_[current_time] = cur_val;
-        saved_redos_[current_time] = pending_redos;
-        pending_redos.clear();
+        const auto& txn_id = test_op.val;
+        auto session = txn_id == kNoTxnId ? session_ : FindOrDie(txn_sessions_, txn_id);
+        FlushSessionOrDie(session);
+        // Only update the saved and pending values if the flush is _not_ part
+        // of a transaction. Transactional mutations should only take effect
+        // once committed.
+        if (txn_id == kNoTxnId) {
+          int current_time = down_cast<kudu::clock::LogicalClock*>(
+              tablet()->clock())->GetCurrentTime();
+          VLOG(1) << "Current time: " << current_time;
+
+          auto& pending_vals_no_txn = FindOrDie(pending_vals_per_txn, kNoTxnId);
+          for (const auto& kv : pending_vals_no_txn) {
+            cur_val[kv.first] = kv.second;
+          }
+          pending_vals_no_txn.clear();
+          saved_values_[current_time] = cur_val;
+
+          auto& pending_redos_no_txn = FindOrDie(pending_redos_per_txn, kNoTxnId);
+          saved_redos_[current_time] = pending_redos_no_txn;
+          pending_redos_no_txn.clear();
+        }
         break;
       }
       case TEST_FLUSH_TABLET:
@@ -1058,6 +1439,47 @@ void FuzzTest::RunFuzzCase(const vector<TestOp>& test_ops,
       case TEST_DIFF_SCAN:
         NO_FATALS(CheckDiffScan(test_op.val, test_op.val2));
         break;
+      case TEST_BEGIN_TXN: {
+        const auto& txn_id = test_op.val;
+        shared_ptr<KuduSession> s(new KuduSession(client_, TxnId(txn_id)));
+        s->data_->Init(s);
+        ASSERT_OK(s->SetFlushMode(KuduSession::MANUAL_FLUSH));
+        s->SetTimeoutMillis(60 * 1000);
+        EmplaceOrDie(&txn_sessions_, txn_id, std::move(s));
+        EmplaceOrDie(&pending_vals_per_txn, txn_id, ValueByRowKey{});
+        EmplaceOrDie(&pending_redos_per_txn, txn_id, vector<Redo>{});
+        ASSERT_OK(CallParticipantOpCheckResp(txn_id, ParticipantOpPB::BEGIN_TXN, -1));
+        break;
+      }
+      case TEST_COMMIT_TXN: {
+        const auto& txn_id = test_op.val;
+        // Before committing, flush all the rows we have pending for this transaction.
+        FlushSessionOrDie(FindOrDie(txn_sessions_, txn_id));
+
+        ASSERT_OK(CallParticipantOpCheckResp(txn_id, ParticipantOpPB::BEGIN_COMMIT, -1));
+        int current_time = down_cast<kudu::clock::LogicalClock*>(
+            tablet()->clock())->GetCurrentTime();
+        ASSERT_OK(CallParticipantOpCheckResp(txn_id, ParticipantOpPB::FINALIZE_COMMIT,
+                                             current_time));
+        VLOG(1) << "Current time: " << current_time;
+        auto txn_pending_vals = EraseKeyReturnValuePtr(&pending_vals_per_txn, txn_id);
+        for (const auto& kv : txn_pending_vals) {
+          cur_val[kv.first] = kv.second;
+        }
+        saved_values_[current_time] = cur_val;
+
+        auto txn_pending_redos = EraseKeyReturnValuePtr(&pending_redos_per_txn, txn_id);
+        saved_redos_[current_time] = txn_pending_redos;
+
+        txn_sessions_.erase(txn_id);
+        break;
+      }
+      case TEST_ABORT_TXN: {
+        const auto& txn_id = test_op.val;
+        ASSERT_OK(CallParticipantOpCheckResp(txn_id, ParticipantOpPB::ABORT_TXN, -1));
+        txn_sessions_.erase(txn_id);
+        break;
+      }
       default:
         LOG(FATAL) << test_op.type;
     }
@@ -1379,6 +1801,48 @@ TEST_F(FuzzTest, TestDiffScanRowLifespanInOneScanDRS) {
       {TEST_DIFF_SCAN, 4, 7}
     });
 }
+TEST_F(FuzzTest, TestReplayDeletesOnTxnRowsets) {  // Fuzz-found regression case.
+  CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
+  RunFuzzCase({
+      {TEST_INSERT_PK_ONLY, 1, -1},        // Row 1 outside any txn (txn id -1).
+      {TEST_FLUSH_OPS, -1},                // Flush the non-transactional session.
+      {TEST_FLUSH_TABLET},
+
+      {TEST_BEGIN_TXN, 2},                 // Open txn 2 with its own session.
+      {TEST_INSERT_IGNORE_PK_ONLY, 0, 2},  // Row 0 inside txn 2.
+      {TEST_FLUSH_OPS, 2},                 // Flush txn 2's session.
+      {TEST_COMMIT_TXN, 2},                // Row 0 becomes visible only now.
+
+      {TEST_DELETE, 0},                    // Delete the txn-inserted row...
+      {TEST_DELETE, 1},                    // ...and the non-txn row.
+      {TEST_FLUSH_OPS, -1},
+
+      {TEST_FLUSH_DELTAS},
+      {TEST_RESTART_TS},                   // Presumably replays the deletes; confirm.
+    });
+}
+
+TEST_F(FuzzTest, TestFlushMRSsWithInvisibleRows) {  // Fuzz-found regression case.
+  CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
+  RunFuzzCase({
+    {TEST_BEGIN_TXN, 0},
+    {TEST_INSERT_IGNORE, 1, 0},           // Row 1 inside txn 0.
+    {TEST_FLUSH_OPS, 0},
+    {TEST_COMMIT_TXN, 0},                 // Row 1 becomes visible only now.
+
+    {TEST_INSERT_PK_ONLY, 0, -1},         // Row 0 outside any txn (txn id -1),
+    {TEST_INSERT_IGNORE_PK_ONLY, 0, -1},  // ...ignored since row 0 already exists,
+    {TEST_DELETE, 0},                     // ...then deleted before the flush.
+    {TEST_FLUSH_OPS, -1},
+
+    {TEST_RESTART_TS},
+    {TEST_MAJOR_COMPACT_DELTAS},
+
+    {TEST_DELETE, 1},                     // Delete the txn-inserted row.
+    {TEST_FLUSH_OPS, -1},
+    {TEST_FLUSH_TABLET},
+  });
+}
 
 // Regression test for KUDU-3108. Previously caused us to have divergent 'hot'
 // and 'hotmaxes' containers in the merge iterator, causing us to read invalid