You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2021/04/14 19:01:51 UTC

[kudu] 01/02: KUDU-2612: acquire and release partition lock

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 8093f95e6b8eb496043562f69df3a66e078ad731
Author: hahao <ha...@apache.org>
AuthorDate: Sun Mar 7 23:59:59 2021 -0800

    KUDU-2612: acquire and release partition lock
    
    This patch plumbs partition locks into write ops for transactional and
    non-transactional operations. Upon attempting to write, we try to
    acquire the partition lock in the WriteOp prepare phase, and upon
    successfully applying the op, we transfer ownership of the lock to the
    Txn that the write was a part of (or just release the partition lock if
    the write was non-transactional). Txns release the lock when
    FINALIZE_COMMIT or ABORT_TXN is applied.
    
    We take the partition lock for non-transactional write ops as well to
    ensure we don’t duplicate keys (e.g. if a transaction inserts key=1 and
    then a non-transactional write inserts key=1 before the transaction
    commits). If the partition lock cannot be acquired, the write op is
    aborted or retried, based on the wait-die mechanics already in the lock
    manager.
    
    A flag (--enable_txn_partition_lock) is also introduced to disable this
    locking for tests that currently expect support for concurrent
    transactions.
    
    Change-Id: If26733cae16810f3b3afd1fd05dcb984e6366939
    Reviewed-on: http://gerrit.cloudera.org:8080/17159
    Tested-by: Andrew Wong <aw...@cloudera.com>
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 src/kudu/client/batcher.cc                         |  12 +-
 src/kudu/integration-tests/fuzz-itest.cc           |  10 +
 src/kudu/integration-tests/txn_commit-itest.cc     |  13 ++
 .../integration-tests/txn_participant-itest.cc     |  62 ++++++
 src/kudu/integration-tests/txn_write_ops-itest.cc  | 216 +++++++++++++++++++++
 src/kudu/tablet/lock_manager.h                     |   5 +
 src/kudu/tablet/ops/participant_op.cc              |  10 +
 src/kudu/tablet/ops/write_op.cc                    |  62 +++++-
 src/kudu/tablet/ops/write_op.h                     |  20 +-
 src/kudu/tablet/tablet.cc                          |  10 +-
 src/kudu/tablet/tablet.h                           |  14 +-
 src/kudu/tablet/tablet_bootstrap.cc                |  15 +-
 src/kudu/tablet/txn_participant-test.cc            | 126 +++++++++++-
 src/kudu/tablet/txn_participant.cc                 |  24 ++-
 src/kudu/tablet/txn_participant.h                  |  15 ++
 src/kudu/transactions/participant_rpc.cc           |   2 +
 16 files changed, 600 insertions(+), 16 deletions(-)

diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc
index dc26aed..f48a5c8 100644
--- a/src/kudu/client/batcher.cc
+++ b/src/kudu/client/batcher.cc
@@ -470,7 +470,9 @@ RetriableRpcStatus WriteRpc::AnalyzeResponse(const Status& rpc_cb_status) {
     }
   }
 
-  if (result.status.IsServiceUnavailable()) {
+  if (result.status.IsServiceUnavailable() ||
+      (resp_.has_error() &&
+       resp_.error().code() == tserver::TabletServerErrorPB::TXN_LOCKED_RETRY_OP)) {
     result.result = RetriableRpcStatus::SERVICE_UNAVAILABLE;
     return result;
   }
@@ -507,7 +509,8 @@ RetriableRpcStatus WriteRpc::AnalyzeResponse(const Status& rpc_cb_status) {
   }
 
   if (resp_.has_error() &&
-      resp_.error().code() == tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE) {
+      (resp_.error().code() == tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE ||
+       resp_.error().code() == tserver::TabletServerErrorPB::TXN_LOCKED_ABORT)) {
     result.result = RetriableRpcStatus::NON_RETRIABLE_ERROR;
     return result;
   }
@@ -526,8 +529,9 @@ RetriableRpcStatus WriteRpc::AnalyzeResponse(const Status& rpc_cb_status) {
     //                becomes a real issue when handling responses to write
     //                operations in the context of multi-row transactions.
     //                For example, Status::IllegalState() originated from
-    //                TabletServerErrorPB::TXN_ILLEGAL_STATE responses are
-    //                needlessly retried.
+    //                TabletServerErrorPB::TXN_ILLEGAL_STATE responses and
+    //                Status::Aborted() originated from TabletServerErrorPB::TXN_LOCKED_ABORT
+    //                responses are needlessly retried.
     result.result = RetriableRpcStatus::REPLICA_NOT_LEADER;
     return result;
   }
diff --git a/src/kudu/integration-tests/fuzz-itest.cc b/src/kudu/integration-tests/fuzz-itest.cc
index 9a5b0c1..579c660 100644
--- a/src/kudu/integration-tests/fuzz-itest.cc
+++ b/src/kudu/integration-tests/fuzz-itest.cc
@@ -78,6 +78,7 @@
 
 DEFINE_int32(keyspace_size, 5,  "number of distinct primary keys to test with");
 DEFINE_int32(max_open_txns, 5,  "maximum number of open transactions to test with");
+DECLARE_bool(enable_txn_partition_lock);
 DECLARE_bool(enable_maintenance_manager);
 DECLARE_bool(scanner_allow_snapshot_scans_with_logical_timestamps);
 DECLARE_bool(tserver_txn_write_op_handling_enabled);
@@ -1495,6 +1496,9 @@ void FuzzTest::RunFuzzCase(const vector<TestOp>& test_ops,
 // The logs of this test are designed to easily be copy-pasted and create
 // more specific test cases like TestFuzz<N> below.
 TEST_F(FuzzTest, TestRandomFuzzPksOnly) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   CreateTabletAndStartClusterWithSchema(Schema({ColumnSchema("key", INT32)}, 1));
   SeedRandom();
   vector<TestOp> test_ops;
@@ -1506,6 +1510,9 @@ TEST_F(FuzzTest, TestRandomFuzzPksOnly) {
 // The logs of this test are designed to easily be copy-pasted and create
 // more specific test cases like TestFuzz<N> below.
 TEST_F(FuzzTest, TestRandomFuzz) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
   SeedRandom();
   vector<TestOp> test_ops;
@@ -1517,6 +1524,9 @@ TEST_F(FuzzTest, TestRandomFuzz) {
 // This results in very large batches which are likely to span multiple delta blocks
 // when flushed.
 TEST_F(FuzzTest, TestRandomFuzzHugeBatches) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
   SeedRandom();
   vector<TestOp> test_ops;
diff --git a/src/kudu/integration-tests/txn_commit-itest.cc b/src/kudu/integration-tests/txn_commit-itest.cc
index 4ddc4c0..de7d6fc 100644
--- a/src/kudu/integration-tests/txn_commit-itest.cc
+++ b/src/kudu/integration-tests/txn_commit-itest.cc
@@ -67,6 +67,7 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+DECLARE_bool(enable_txn_partition_lock);
 DECLARE_bool(txn_manager_enabled);
 DECLARE_bool(txn_manager_lazily_initialized);
 DECLARE_bool(txn_schedule_background_tasks);
@@ -684,6 +685,9 @@ TEST_F(TxnCommitITest, TestCommitAfterParticipantAbort) {
 
 // Try concurrently beginning to commit a bunch of different transactions.
 TEST_F(TxnCommitITest, TestConcurrentCommitCalls) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   constexpr const int kNumTxns = 4;
   vector<shared_ptr<KuduTransaction>> txns(kNumTxns);
   int row_start = initial_row_count_;
@@ -724,6 +728,9 @@ TEST_F(TxnCommitITest, TestConcurrentCommitCalls) {
 }
 
 TEST_F(TxnCommitITest, TestConcurrentAbortsAndCommits) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   constexpr const int kNumTxns = 10;
   vector<shared_ptr<KuduTransaction>> txns(kNumTxns);
   int row_start = initial_row_count_;
@@ -940,6 +947,9 @@ TEST_F(TwoNodeTxnCommitITest, TestCommitWhenParticipantsAreDown) {
 // Test that when we start up, pending commits will start background tasks to
 // finalize the commit or abort.
 TEST_F(TwoNodeTxnCommitITest, TestStartTasksDuringStartup) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   shared_ptr<KuduTransaction> committed_txn;
   {
     shared_ptr<KuduSession> txn_session;
@@ -1038,6 +1048,9 @@ class ThreeNodeTxnCommitITest : public TxnCommitITest {
 };
 
 TEST_F(ThreeNodeTxnCommitITest, TestCommitTasksReloadOnLeadershipChange) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   FLAGS_txn_schedule_background_tasks = false;
   shared_ptr<KuduTransaction> committed_txn;
   shared_ptr<KuduTransaction> aborted_txn;
diff --git a/src/kudu/integration-tests/txn_participant-itest.cc b/src/kudu/integration-tests/txn_participant-itest.cc
index 241b5e9..46f6bc9 100644
--- a/src/kudu/integration-tests/txn_participant-itest.cc
+++ b/src/kudu/integration-tests/txn_participant-itest.cc
@@ -72,6 +72,7 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+DECLARE_bool(enable_txn_partition_lock);
 DECLARE_bool(raft_enable_pre_election);
 DECLARE_double(leader_failure_max_missed_heartbeat_periods);
 DECLARE_int32(consensus_inject_latency_ms_in_notifications);
@@ -830,6 +831,32 @@ TEST_F(TxnParticipantITest, TestProxyTabletNotFound) {
   }
 }
 
+// Test that we can start multiple transactions on the same participant.
+TEST_F(TxnParticipantITest, TestTxnSystemClientBeginTxnDoesntLock) {
+  constexpr const int kLeaderIdx = 0;
+  constexpr const int kFirstTxn = 0;
+  constexpr const int kSecondTxn = 1;
+  vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
+  auto* leader_replica = replicas[kLeaderIdx];
+  const auto tablet_id = leader_replica->tablet_id();
+  ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+
+  // Start a transaction and make sure it results in the expected state
+  // server-side.
+  unique_ptr<TxnSystemClient> txn_client;
+  ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client));
+  ASSERT_OK(txn_client->ParticipateInTransaction(
+      tablet_id, MakeParticipantOp(kFirstTxn, ParticipantOpPB::BEGIN_TXN), kDefaultTimeout));
+  NO_FATALS(CheckReplicasMatchTxns(replicas, { { kFirstTxn, kOpen, -1 } }));
+
+  // Begin another transaction with a higher txn ID. This is allowed, since
+  // partition locks are only taken once we write.
+  ASSERT_OK(txn_client->ParticipateInTransaction(
+      tablet_id, MakeParticipantOp(kSecondTxn, ParticipantOpPB::BEGIN_TXN), kDefaultTimeout));
+  NO_FATALS(CheckReplicasMatchTxns(replicas,
+        { { kFirstTxn, kOpen, -1 }, { kSecondTxn, kOpen, -1 } }));
+}
+
 TEST_F(TxnParticipantITest, TestTxnSystemClientCommitSequence) {
   constexpr const int kLeaderIdx = 0;
   constexpr const int kTxnId = 0;
@@ -899,6 +926,9 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientCommitSequence) {
 }
 
 TEST_F(TxnParticipantITest, TestTxnSystemClientAbortSequence) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   constexpr const int kLeaderIdx = 0;
   constexpr const int kTxnOne = 0;
   constexpr const int kTxnTwo = 1;
@@ -976,6 +1006,38 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientErrorWhenNotBegun) {
   NO_FATALS(CheckReplicasMatchTxns(replicas, { { 2, kAborted, -1 } }));
 }
 
+TEST_F(TxnParticipantITest, TestTxnSystemClientRepeatCalls) {
+  constexpr const int kLeaderIdx = 0;
+  constexpr const int kTxnOne = 1;
+  constexpr const int kTxnTwo = 2;
+  vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
+  auto* leader_replica = replicas[kLeaderIdx];
+  const auto tablet_id = leader_replica->tablet_id();
+  ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+  unique_ptr<TxnSystemClient> txn_client;
+  ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client));
+  // Repeat each op twice. There should be no issues here since each op is
+  // idempotent. There should also be no issues with the partition lock.
+  for (const auto& type : kCommitSequence) {
+    ASSERT_OK(txn_client->ParticipateInTransaction(
+        tablet_id, MakeParticipantOp(kTxnOne, type, kDummyCommitTimestamp),
+        kDefaultTimeout));
+    ASSERT_OK(txn_client->ParticipateInTransaction(
+        tablet_id, MakeParticipantOp(kTxnOne, type, kDummyCommitTimestamp),
+        kDefaultTimeout));
+  }
+  for (const auto& type : kAbortSequence) {
+    ASSERT_OK(txn_client->ParticipateInTransaction(
+        tablet_id, MakeParticipantOp(kTxnTwo, type, kDummyCommitTimestamp),
+        kDefaultTimeout));
+    ASSERT_OK(txn_client->ParticipateInTransaction(
+        tablet_id, MakeParticipantOp(kTxnTwo, type, kDummyCommitTimestamp),
+        kDefaultTimeout));
+  }
+  NO_FATALS(CheckReplicasMatchTxns(
+      replicas, { { kTxnOne, kCommitted, kDummyCommitTimestamp }, { kTxnTwo, kAborted, -1 } }));
+}
+
 TEST_F(TxnParticipantITest, TestTxnSystemClientTimeoutWhenNoMajority) {
   constexpr const int kLeaderIdx = 0;
   constexpr const int kTxnId = 0;
diff --git a/src/kudu/integration-tests/txn_write_ops-itest.cc b/src/kudu/integration-tests/txn_write_ops-itest.cc
index 2a37aa4..c597acb 100644
--- a/src/kudu/integration-tests/txn_write_ops-itest.cc
+++ b/src/kudu/integration-tests/txn_write_ops-itest.cc
@@ -22,12 +22,14 @@
 #include <cstdlib>
 #include <deque>
 #include <functional>
+#include <initializer_list>
 #include <iterator>
 #include <map>
 #include <memory>
 #include <mutex>
 #include <numeric>
 #include <ostream>
+#include <random>
 #include <set>
 #include <string>
 #include <thread>
@@ -293,6 +295,87 @@ class TxnWriteOpsITest : public ExternalMiniClusterITestBase {
   string tablet_uuid_;
 };
 
+// Test that our deadlock prevention mechanisms work by writing across
+// different tablets concurrently from multiple transactions.
+// TODO(awong): it'd be much more convenient to take control of aborting the
+// transactions ourselves, rather than relying on the application user.
+TEST_F(TxnWriteOpsITest, TestClientSideDeadlockPrevention) {
+  constexpr const int kNumTxns = 8;
+  const vector<string> master_flags = {
+    "--txn_manager_enabled=true",
+
+    // Scenarios based on this test fixture assume the txn status table
+    // is created at start, not on first transaction-related operation.
+    "--txn_manager_lazily_initialized=false",
+  };
+  NO_FATALS(StartCluster({}, master_flags, kNumTabletServers));
+  vector<string> tablets_uuids;
+  NO_FATALS(Prepare(&tablets_uuids));
+  vector<thread> threads;
+  threads.reserve(kNumTxns);
+  vector<int> random_keys(kNumTxns * 2);
+  std::iota(random_keys.begin(), random_keys.end(), 1);
+  std::mt19937 gen(SeedRandom());
+  std::shuffle(random_keys.begin(), random_keys.end(), gen);
+  for (int i = 0; i < kNumTxns; i++) {
+    threads.emplace_back([&, i] {
+      bool succeeded = false;
+      while (!succeeded) {
+        shared_ptr<KuduTransaction> txn;
+        ASSERT_OK(client_->NewTransaction(&txn));
+        shared_ptr<KuduSession> session;
+        ASSERT_OK(txn->CreateSession(&session));
+        ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+
+        string txn_str;
+        ASSERT_OK(txn->Serialize(&txn_str));
+        TxnTokenPB token;
+        ASSERT_TRUE(token.ParseFromString(txn_str));
+        bool needs_retry = false;
+        for (const auto key_idx : { 2 * i, 2 * i + 1 }) {
+          const auto& row_key = random_keys[key_idx];
+          unique_ptr<KuduInsert> insert(BuildInsert(table_.get(), row_key));
+          Status s = session->Apply(insert.release());
+          LOG(INFO) << Substitute("Txn $0 wrote row $1: $2",
+                                  token.txn_id(), row_key, s.ToString());
+          // If the write op failed because of a locking error, roll the
+          // transaction back and retry the transaction after waiting a bit.
+          if (!s.ok()) {
+            vector<KuduError*> errors;
+            ElementDeleter d(&errors);
+            bool overflow;
+            session->GetPendingErrors(&errors, &overflow);
+            ASSERT_EQ(1, errors.size());
+            const auto& error = errors[0]->status();
+            LOG(INFO) << Substitute("Txn $0 failed to write row $1: $2",
+                                    token.txn_id(), row_key, error.ToString());
+            // While the below delay between retries should help prevent
+            // deadlocks, it's possible that "waiting" write ops (i.e. "wait"
+            // in wait-die, that get retried) will still time out, after
+            // contending a bit with other ops.
+            ASSERT_TRUE(error.IsAborted() || error.IsTimedOut()) << error.ToString();
+            ASSERT_OK(txn->Rollback());
+            needs_retry = true;
+
+            // Wait a bit before retrying the entire transaction to allow for
+            // the current lock holder to complete.
+            SleepFor(MonoDelta::FromSeconds(5));
+            break;
+          }
+        }
+        if (!needs_retry) {
+          succeeded = true;
+          ASSERT_OK(txn->Commit(/*wait*/true));
+        }
+      }
+    });
+  }
+  for (auto& t : threads) { t.join(); }
+  size_t count;
+  ASSERT_OK(CountRows(table_.get(), &count));
+  ASSERT_EQ(kNumTxns * 2, count);
+}
+
 // Send multiple one-row write operations to a tablet server in the context of a
 // multi-row transaction, and commit the transaction. This scenario verifies
 // that tablet servers are able to accept high number of write requests
@@ -343,6 +426,10 @@ TEST_F(TxnWriteOpsITest, FrequentElections) {
     // Custom settings for heartbeat interval helps to complete Raft elections
     // rounds faster than with the default settings.
     Substitute("--heartbeat_interval_ms=$0", hb_interval_ms_),
+
+    // Disable the partition lock as there are concurrent transactions.
+    // TODO(awong): update this when implementing finer grained locking.
+    "--enable_txn_partition_lock=false"
   };
   const vector<string> master_flags = {
     // Enable TxnManager in Kudu masters.
@@ -458,6 +545,10 @@ TEST_F(TxnWriteOpsITest, WriteOpPerf) {
   const vector<string> ts_flags = {
     Substitute("--tablet_max_pending_txn_write_ops=$0",
                FLAGS_max_pending_txn_write_ops),
+
+    // Disable the partition lock as there are concurrent transactions.
+    // TODO(awong): update this when implementing finer grained locking.
+    "--enable_txn_partition_lock=false"
   };
   const vector<string> master_flags = {
     // Enable TxnManager in Kudu masters.
@@ -972,6 +1063,131 @@ TEST_F(TxnOpDispatcherITest, LifecycleBasic) {
   }
 }
 
+TEST_F(TxnOpDispatcherITest, BeginTxnLockAbort) {
+  NO_FATALS(Prepare(1));
+
+  // Next value for the primary key column in the test table.
+  int64_t key = 0;
+  vector<scoped_refptr<TabletReplica>> replicas = GetAllReplicas();
+  ASSERT_EQ(kNumPartitions, replicas.size());
+  shared_ptr<KuduTransaction> first_txn;
+  shared_ptr<KuduTransaction> second_txn;
+
+  // Start a single transaction and perform some writes with it.
+  {
+    ASSERT_OK(client_->NewTransaction(&first_txn));
+
+    // There should be no TxnOpDispatchers yet because not a single write
+    // operation has been sent to tablet servers yet.
+    ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
+
+    // Insert a single row.
+    ASSERT_OK(InsertRows(first_txn.get(), 1, &key));
+
+    // Only one tablet replica should get the txn write request and register
+    // TxnOpDispatcher for the transaction.
+    ASSERT_EQ(1, GetTxnOpDispatchersTotalCount());
+
+    // Write some more rows ensuring all hash buckets of the table's partition
+    // will get at least one element.
+    ASSERT_OK(InsertRows(first_txn.get(), 5, &key));
+    ASSERT_EQ(kNumPartitions, GetTxnOpDispatchersTotalCount());
+
+    // Non-transactional operations should fail as the partition lock
+    // is held by the first transaction at the moment.
+    shared_ptr<KuduSession> session;
+    Status s = InsertRows(nullptr /* txn */, 1, &key, &session);
+    ASSERT_TRUE(s.IsIOError()) << s.ToString();
+    auto row_status = GetSingleRowError(session.get());
+    ASSERT_TRUE(row_status.IsAborted()) << row_status.ToString();
+    ASSERT_STR_CONTAINS(row_status.ToString(),
+                        "Write op should be aborted");
+  }
+
+  // Start a second transaction, which gets a higher txn ID than the first.
+  {
+    ASSERT_OK(client_->NewTransaction(&second_txn));
+    ASSERT_EQ(kNumPartitions, GetTxnOpDispatchersTotalCount());
+
+    // Operations of the second transaction should fail as the partition
+    // lock is held by the first transaction at the moment.
+    shared_ptr<KuduSession> session;
+    Status s = InsertRows(second_txn.get(), 1, &key, &session);
+    ASSERT_TRUE(s.IsIOError()) << s.ToString();
+    auto row_status = GetSingleRowError(session.get());
+    ASSERT_TRUE(row_status.IsAborted()) << row_status.ToString();
+    ASSERT_STR_CONTAINS(row_status.ToString(), "should be aborted");
+
+    // We should have an extra dispatcher for the new transactional write.
+    ASSERT_EQ(1 + kNumPartitions, GetTxnOpDispatchersTotalCount());
+  }
+  {
+    // Now, commit the first transaction.
+    ASSERT_OK(first_txn->Commit());
+
+    // Only the second transaction's dispatcher should remain after the commit.
+    ASSERT_EQ(1, GetTxnOpDispatchersTotalCount());
+
+    // Writes to the second transaction should now succeed.
+    ASSERT_OK(InsertRows(second_txn.get(), 1, &key));
+    ASSERT_EQ(1, GetTxnOpDispatchersTotalCount());
+
+    ASSERT_OK(second_txn->Commit());
+    ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
+  }
+}
+
+TEST_F(TxnOpDispatcherITest, BeginTxnLockRetry) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+  NO_FATALS(Prepare(1));
+
+  // Next value for the primary key column in the test table.
+  int64_t key = 0;
+
+  vector<scoped_refptr<TabletReplica>> replicas = GetAllReplicas();
+  ASSERT_EQ(kNumPartitions, replicas.size());
+  shared_ptr<KuduTransaction> first_txn;
+  shared_ptr<KuduTransaction> second_txn;
+
+  // Begin 'second_txn' first so that it gets the lower txn ID.
+  {
+    ASSERT_OK(client_->NewTransaction(&second_txn));
+
+    // There should be no TxnOpDispatchers yet because not a single write
+    // operation has been sent to tablet servers yet.
+    ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
+  }
+
+  // Begin 'first_txn' (higher txn ID) and perform some writes with it.
+  {
+    ASSERT_OK(client_->NewTransaction(&first_txn));
+    ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
+
+    // Write some rows ensuring all hash buckets of the table's partition
+    // will get at least one element.
+    ASSERT_OK(InsertRows(first_txn.get(), 5, &key));
+    ASSERT_EQ(kNumPartitions, GetTxnOpDispatchersTotalCount());
+  }
+
+  {
+    // The second transaction has the lower txn ID, so its write is retried
+    // while the first transaction holds the partition lock, until it times out.
+    shared_ptr<KuduSession> session;
+    Status s = InsertRows(second_txn.get(), 1, &key, &session);
+    ASSERT_TRUE(s.IsIOError()) << s.ToString();
+    auto row_status = GetSingleRowError(session.get());
+    ASSERT_TRUE(row_status.IsTimedOut()) << row_status.ToString();
+    ASSERT_STR_CONTAINS(row_status.ToString(), "passed its deadline");
+
+    // We should have an extra dispatcher for the new transactional write.
+    ASSERT_EQ(1 + kNumPartitions, GetTxnOpDispatchersTotalCount());
+
+    ASSERT_OK(first_txn->Commit());
+    // We should still have an op dispatcher for the second transaction.
+    ASSERT_EQ(1, GetTxnOpDispatchersTotalCount());
+  }
+}
+
 // A scenario to verify TxnOpDispatcher lifecycle when there is an error
 // while trying to register a tablet as a participant in a transaction.
 TEST_F(TxnOpDispatcherITest, ErrorInParticipantRegistration) {
diff --git a/src/kudu/tablet/lock_manager.h b/src/kudu/tablet/lock_manager.h
index e7e6a75..4069939 100644
--- a/src/kudu/tablet/lock_manager.h
+++ b/src/kudu/tablet/lock_manager.h
@@ -218,6 +218,11 @@ class ScopedPartitionLock {
   // Disable the copy constructor.
   ScopedPartitionLock(const ScopedPartitionLock&) = delete;
 
+  // Returns true if this and 'other' point at the same lock state.
+  bool HasSameState(const ScopedPartitionLock& other) const {
+    return lock_state_ == other.lock_state_;
+  }
+
   // Check whether the partition lock is acquired by the transaction.
   // If false, set the tablet server error code accordingly to abort
   // or retry the transaction. Otherwise, no error code is set.
diff --git a/src/kudu/tablet/ops/participant_op.cc b/src/kudu/tablet/ops/participant_op.cc
index 9d79a8f..a5ed7eb 100644
--- a/src/kudu/tablet/ops/participant_op.cc
+++ b/src/kudu/tablet/ops/participant_op.cc
@@ -20,6 +20,7 @@
 #include <memory>
 #include <ostream>
 
+#include <gflags/gflags.h>
 #include <glog/logging.h>
 #include <google/protobuf/arena.h>
 
@@ -29,6 +30,7 @@
 #include "kudu/consensus/opid.pb.h"
 #include "kudu/consensus/raft_consensus.h"
 #include "kudu/consensus/time_manager.h"
+#include "kudu/gutil/macros.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/rpc_header.pb.h"
@@ -39,10 +41,16 @@
 #include "kudu/tablet/txn_participant.h"
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/debug/trace_event.h"
+#include "kudu/util/flag_tags.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/status.h"
 #include "kudu/util/trace.h"
 
+DEFINE_bool(enable_txn_partition_lock, true,
+            "Whether or not to enable partition lock for transactions");
+TAG_FLAG(enable_txn_partition_lock, unsafe);
+TAG_FLAG(enable_txn_partition_lock, hidden);
+
 using kudu::consensus::CommitMsg;
 using kudu::consensus::ReplicateMsg;
 using kudu::consensus::OperationType;
@@ -231,6 +239,7 @@ Status ParticipantOpState::PerformOp(const consensus::OpId& op_id, Tablet* table
       if (txn_->commit_op()) {
         txn_->commit_op()->FinishApplying();
       }
+      txn_->ReleasePartitionLock();
       break;
     }
     case ParticipantOpPB::ABORT_TXN: {
@@ -240,6 +249,7 @@ Status ParticipantOpState::PerformOp(const consensus::OpId& op_id, Tablet* table
       if (txn_->commit_op()) {
         txn_->commit_op()->Abort();
       }
+      txn_->ReleasePartitionLock();
       break;
     }
     case ParticipantOpPB::UNKNOWN: {
diff --git a/src/kudu/tablet/ops/write_op.cc b/src/kudu/tablet/ops/write_op.cc
index 7f8bc92..83097f9 100644
--- a/src/kudu/tablet/ops/write_op.cc
+++ b/src/kudu/tablet/ops/write_op.cc
@@ -36,6 +36,7 @@
 #include "kudu/common/row_operations.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/timestamp.h"
+#include "kudu/common/txn_id.h"
 #include "kudu/common/wire_protocol.h"
 #include "kudu/common/wire_protocol.pb.h"
 #include "kudu/consensus/opid.pb.h"
@@ -70,6 +71,8 @@ DEFINE_int32(tablet_inject_latency_on_apply_write_op_ms, 0,
 TAG_FLAG(tablet_inject_latency_on_apply_write_op_ms, unsafe);
 TAG_FLAG(tablet_inject_latency_on_apply_write_op_ms, runtime);
 
+DECLARE_bool(enable_txn_partition_lock);
+
 using std::string;
 using std::unique_ptr;
 using std::vector;
@@ -211,7 +214,14 @@ Status WriteOp::Prepare() {
     }
   }
 
-  // Now acquire row locks and prepare everything for apply
+  // Now first acquire partition lock and then row locks, and prepare
+  // everything for apply. For followers, we wait until the partition lock is
+  // held, since we know the op will not be replicated if the leader cannot
+  // take the partition lock.
+  if (PREDICT_TRUE(FLAGS_enable_txn_partition_lock)) {
+    RETURN_NOT_OK(tablet->AcquirePartitionLock(state(),
+        type() == consensus::LEADER ? LockManager::TRY_LOCK : LockManager::WAIT_FOR_LOCK));
+  }
   RETURN_NOT_OK(tablet->AcquireRowLocks(state()));
 
   TRACE("PREPARE: Finished.");
@@ -427,8 +437,13 @@ void WriteOpState::StartApplying() {
 void WriteOpState::FinishApplyingOrAbort(Op::OpResult result) {
   ReleaseMvccTxn(result);
 
-  TRACE("Releasing row and schema locks");
+  TRACE("Releasing partition, row and schema locks");
   ReleaseRowLocks();
+  if (result == Op::APPLIED) {
+    // NOTE: if the op was not successful, the lock will be released when this
+    // state is destructed.
+    TransferOrReleasePartitionLock();
+  }
   ReleaseSchemaLock();
 
   // After committing, if there is an RPC going on, the driver will respond to it.
@@ -531,6 +546,49 @@ void WriteOpState::ReleaseRowLocks() {
   rows_lock_.Release();
 }
 
+Status WriteOpState::AcquirePartitionLock(
+    LockManager* lock_manager,
+    LockManager::LockWaitMode wait_mode) {
+  TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+  DCHECK(!partition_lock_.IsAcquired(&code));
+  TxnId txn_id;
+  if (request()->has_txn_id()) {
+    txn_id = request()->txn_id();
+  }
+  TRACE("Acquiring the partition lock for write op");
+  partition_lock_ = ScopedPartitionLock(lock_manager, txn_id, wait_mode);
+  bool acquired = partition_lock_.IsAcquired(&code);
+  if (!acquired) {
+    Status s;  // Always set to a non-OK status below.
+    if (code == TabletServerErrorPB::TXN_LOCKED_ABORT) {
+      s = Status::Aborted("Write op should be aborted since it tries to acquire the "
+                          "partition lock that is held by another transaction that "
+                          "has lower txn ID");
+    } else if (code == TabletServerErrorPB::TXN_LOCKED_RETRY_OP) {
+      s = Status::ServiceUnavailable("Write op should retry since it tries to acquire "
+                                     "the partition lock that is held by another transaction "
+                                     "that has higher txn ID");
+    } else {
+      LOG(DFATAL) << "unexpected error code " << code;  // Fatal in debug builds only.
+      s = Status::IllegalState("unexpected error while acquiring the partition lock");
+    }
+    completion_callback()->set_error(s, code);
+    return s;
+  }
+  DCHECK_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+  TRACE("Partition lock acquired for write op");
+  return Status::OK();
+}
+
+void WriteOpState::TransferOrReleasePartitionLock() {
+  if (txn_) {
+    txn_->AdoptPartitionLock(std::move(partition_lock_));
+  } else {
+    // If this isn't a transactional write, just release the partition lock.
+    partition_lock_.Release();
+  }
+}
+
 void WriteOpState::ReleaseTxnLock() {
   shared_lock<rw_semaphore> temp;
   txn_lock_.swap(temp);
diff --git a/src/kudu/tablet/ops/write_op.h b/src/kudu/tablet/ops/write_op.h
index b1b1c9c..c8621fa 100644
--- a/src/kudu/tablet/ops/write_op.h
+++ b/src/kudu/tablet/ops/write_op.h
@@ -176,6 +176,13 @@ class WriteOpState : public OpState {
   // Acquire row locks for all of the rows in this Write.
   void AcquireRowLocks(LockManager* lock_manager);
 
+  // Acquire the partition lock for this write, on behalf of the transaction in
+  // the request (if any). If 'wait_mode' is 'WAIT_FOR_LOCK', wait until the lock
+  // is acquired. Otherwise, if the lock cannot be acquired, return an 'Aborted'
+  // error if the op should be aborted or 'ServiceUnavailable' if it should retry.
+  Status AcquirePartitionLock(LockManager* lock_manager,
+                              LockManager::LockWaitMode wait_mode);
+
   // Acquires the lock on the given transaction, setting 'txn_' and
   // 'txn_lock_', which must be freed upon finishing this op. Checks if the
   // transaction is available to be written to, returning an error if not.
@@ -245,6 +252,14 @@ class WriteOpState : public OpState {
 
   std::string ToString() const override;
 
+  // Hands off the partition lock acquired by this op. Unlike the other
+  // unlocking methods that just release locks, this transfers ownership of
+  // the partition lock to the Txn that this write is a part of.
+  //
+  // If this write was not a part of a transaction, this just releases the
+  // partition lock.
+  void TransferOrReleasePartitionLock();
+
  private:
   // Releases all the row locks acquired by this op.
   void ReleaseRowLocks();
@@ -274,9 +289,12 @@ class WriteOpState : public OpState {
   // Protected by op_state_lock_.
   std::vector<RowOp*> row_ops_;
 
-  // Holds the LockManager locks acquired for this operation.
+  // Holds the row locks acquired for this operation.
   ScopedRowLock rows_lock_;
 
+  // Holds the partition lock acquired by this op until it is transferred or released.
+  ScopedPartitionLock partition_lock_;
+
   // Array of ProbeStats for each of the operations in 'row_ops_'.
   // Allocated from this op's arena during SetRowOps().
   ProbeStats* stats_array_ = nullptr;
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 1b1a27c..86b8f7f 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -576,10 +576,15 @@ Status Tablet::AcquireRowLocks(WriteOpState* op_state) {
 
   op_state->AcquireRowLocks(&lock_manager_);
 
-  TRACE("Locks acquired");
+  TRACE("Row locks acquired");
   return Status::OK();
 }
 
+Status Tablet::AcquirePartitionLock(WriteOpState* op_state, LockManager::LockWaitMode wait_mode) {
+  // The op state does the actual locking; the tablet only provides its LockManager.
+  return op_state->AcquirePartitionLock(&lock_manager_, wait_mode);
+}
+
 Status Tablet::AcquireTxnLock(int64_t txn_id, WriteOpState* op_state) {
   auto txn = txn_participant_.GetTransaction(txn_id);
   if (!txn) {
@@ -1172,7 +1177,8 @@ Status Tablet::CheckHasNotBeenStopped(State* cur_state) const {
   return Status::OK();
 }
 
-void Tablet::BeginTransaction(Txn* txn, const OpId& op_id) {
+void Tablet::BeginTransaction(Txn* txn,
+                              const OpId& op_id) {
   unique_ptr<MinLogIndexAnchorer> anchor(new MinLogIndexAnchorer(log_anchor_registry_.get(),
         Substitute("BEGIN_TXN-$0-$1", txn->txn_id(), txn)));
   anchor->AnchorIfMinimum(op_id.index());
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index bce8157..cc156ca 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -158,10 +158,17 @@ class Tablet {
   Status DecodeWriteOperations(const Schema* client_schema,
                                WriteOpState* op_state);
 
-  // Acquire locks for each of the operations in the given op.
+  // Acquire locks for each of the operations in the given write op.
   // This also sets the row op's RowSetKeyProbe.
   Status AcquireRowLocks(WriteOpState* op_state);
 
+  // Acquire the partition lock for the given write op. If 'wait_mode' is
+  // 'WAIT_FOR_LOCK', wait until the lock is acquired. Otherwise, if the lock
+  // cannot be acquired, return an error and report 'TXN_LOCKED_ABORT' or
+  // 'TXN_LOCKED_RETRY_OP' to the op's completion callback.
+  Status AcquirePartitionLock(WriteOpState* op_state,
+                              LockManager::LockWaitMode wait_mode);
+
   // Acquire a shared lock on the given transaction, to ensure the
   // transaction's state doesn't change while the given write is in flight.
   Status AcquireTxnLock(int64_t txn_id, WriteOpState* op_state);
@@ -793,14 +800,15 @@ class Tablet {
 
   MvccManager mvcc_;
 
+  LockManager lock_manager_;
+
   // Maintains the set of in-flight transactions, and any WAL anchors
   // associated with them.
   // NOTE: the participant may retain MVCC ops, so define it after the
   // MvccManager, to ensure those ops get destructed before the MvccManager.
+  // Likewise, Txns may hold partition locks, so define it after the LockManager.
   TxnParticipant txn_participant_;
 
-  LockManager lock_manager_;
-
   std::unique_ptr<CompactionPolicy> compaction_policy_;
 
   // Lock protecting the selection of rowsets for compaction.
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc
index 6795b5a..fb2a4fc 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -65,6 +65,7 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/result_tracker.h"
 #include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/tablet/lock_manager.h"
 #include "kudu/tablet/metadata.pb.h"
 #include "kudu/tablet/mvcc.h"
 #include "kudu/tablet/ops/alter_schema_op.h"
@@ -1629,7 +1630,19 @@ Status TabletBootstrap::PlayRowOperations(const IOContext* io_context,
                         Substitute("Could not decode row operations: $0",
                                    SecureDebugString(op_state->request()->row_operations())));
 
-  // Run AcquireRowLocks, Apply, etc!
+  // If the write is a part of a transaction that's currently open (i.e., it
+  // has an in-flight Txn associated with it), lock the Txn now and make sure
+  // it's open.
+  // NOTE: we shouldn't take this lock if we've persisted that the transaction
+  // has completed, since there is no corresponding in-flight Txn.
+  if (op_state->txn_id() && !ContainsKey(terminal_txn_ids_, *op_state->txn_id())) {
+    RETURN_NOT_OK_PREPEND(tablet_->AcquireTxnLock(*op_state->txn_id(), op_state),
+                          "Failed to acquire txn lock");
+  }
+
+  // Acquire the partition lock (waiting if needed) and row locks, then apply.
+  RETURN_NOT_OK_PREPEND(tablet_->AcquirePartitionLock(op_state, LockManager::WAIT_FOR_LOCK),
+                        "Failed to acquire partition lock");
   RETURN_NOT_OK_PREPEND(tablet_->AcquireRowLocks(op_state),
                         "Failed to acquire row locks");
 
diff --git a/src/kudu/tablet/txn_participant-test.cc b/src/kudu/tablet/txn_participant-test.cc
index f21b698..cedcebd 100644
--- a/src/kudu/tablet/txn_participant-test.cc
+++ b/src/kudu/tablet/txn_participant-test.cc
@@ -70,6 +70,7 @@
 #include "kudu/util/pb_util.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
 
 using kudu::consensus::CommitMsg;
 using kudu::consensus::ConsensusBootstrapInfo;
@@ -87,6 +88,7 @@ using std::vector;
 using strings::Substitute;
 
 DECLARE_bool(enable_maintenance_manager);
+DECLARE_bool(enable_txn_partition_lock);
 DECLARE_bool(log_preallocate_segments);
 DECLARE_bool(log_async_preallocate_segments);
 
@@ -358,6 +360,9 @@ TEST_F(TxnParticipantTest, TestIllegalTransitions) {
 
 // Test that we have no trouble operating on separate transactions.
 TEST_F(TxnParticipantTest, TestConcurrentTransactions) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   const int kNumTxns = 10;
   vector<thread> threads;
   Status statuses[kNumTxns];
@@ -503,10 +508,93 @@ TEST_F(TxnParticipantTest, TestAllOpsRegisterAnchors) {
   }));
 }
 
+TEST_F(TxnParticipantTest, TestTakePartitionLockOnRestart) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  // Begin a transaction; its first write will take the partition lock.
+  ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_TXN,
+                                       kDummyCommitTimestamp));
+
+  // Another transaction may still begin, even if it can't write yet.
+  ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::BEGIN_TXN,
+                                       kDummyCommitTimestamp));
+  ASSERT_OK(Write(0, kTxnOne));
+  const auto check_other_txns_cant_write = [&] {
+    // Try each write a couple of times to make sure a failed attempt doesn't
+    // somehow let a subsequent attempt through.
+    for (int i = 0; i < 2; i++) {
+      Status s = Write(0);
+      ASSERT_TRUE(s.IsAborted()) << s.ToString();
+      ASSERT_STR_CONTAINS(s.ToString(), "partition lock that is held by another");
+
+      s = Write(0, kTxnTwo);
+      ASSERT_TRUE(s.IsAborted()) << s.ToString();
+      ASSERT_STR_CONTAINS(s.ToString(), "partition lock that is held by another");
+    }
+  };
+  NO_FATALS(check_other_txns_cant_write());
+
+  // The lock should still be held after restarting, since bootstrap retakes it.
+  ASSERT_OK(RestartReplica(/*reset_tablet*/true));
+  NO_FATALS(check_other_txns_cant_write());
+
+  // Writes as a part of the lock-holding transaction should still succeed.
+  ASSERT_OK(Write(1, kTxnOne));
+
+  // Once we begin committing, we still shouldn't be able to write, even after
+  // restarting.
+  ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_COMMIT,
+                                       kDummyCommitTimestamp));
+  NO_FATALS(check_other_txns_cant_write());
+  ASSERT_OK(RestartReplica(/*reset_tablet*/true));
+  NO_FATALS(check_other_txns_cant_write());
+  // We also shouldn't be able to write as a part of the transaction, since
+  // it's not open for further writes.
+  Status s = Write(2, kTxnOne);
+  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "not open: COMMIT_IN_PROGRESS");
+
+  ASSERT_OK(RestartReplica(/*reset_tablet*/true));
+
+  s = Write(2, kTxnOne);
+  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "not open: COMMIT_IN_PROGRESS");
+
+  s = Write(0, kTxnTwo);
+  ASSERT_TRUE(s.IsAborted()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "partition lock that is held by another");
+
+  // Finalizing the commit should release the partition lock.
+  ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::FINALIZE_COMMIT,
+                                       kDummyCommitTimestamp));
+
+  // And we shouldn't be able to write to the same transaction once committed.
+  s = Write(2, kTxnOne);
+  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "not open");
+
+  // We should be able to write to the other transaction now that the partition
+  // lock isn't held.
+  ASSERT_OK(Write(2, kTxnTwo));
+
+  ASSERT_OK(RestartReplica(/*reset_tablet*/true));
+
+  s = Write(2, kTxnOne);
+  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "not open");
+
+  // The committed transaction shouldn't retake the lock on restart, so the
+  // other transaction should still be able to write.
+  ASSERT_OK(Write(3, kTxnTwo));
+}
+
 // Test that participant ops result in tablet metadata updates that can survive
 // restarts, and that the appropriate anchors are in place as we progress
 // through a transaction's life cycle.
 TEST_F(TxnParticipantTest, TestTxnMetadataSurvivesRestart) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   // First, do a sanity check that there's nothing GCable.
   int64_t gcable_size;
   ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size));
@@ -741,6 +829,9 @@ TEST_P(MetadataFlushTxnParticipantTest, TestRebuildTxnMetadata) {
 
 // Test rebuilding transaction state, including writes, from WALs and metadata.
 TEST_P(MetadataFlushTxnParticipantTest, TestReplayTransactionalInserts) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   const bool should_flush = GetParam();
   constexpr const int64_t kAbortedTxnId = 2;
   ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_TXN, -1));
@@ -815,6 +906,9 @@ INSTANTIATE_TEST_SUITE_P(ShouldFlushMetadata, MetadataFlushTxnParticipantTest,
 
 // Similar to the above test, but checking that in-flight ops anchor the WALs.
 TEST_F(TxnParticipantTest, TestActiveParticipantOpsAnchorWALs) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   ParticipantRequestPB req;
   ParticipantResponsePB resp;
   auto op_state = NewParticipantOp(tablet_replica_.get(), kTxnId, ParticipantOpPB::BEGIN_TXN,
@@ -931,6 +1025,9 @@ TEST_F(TxnParticipantTest, TestUnsupportedOps) {
 // Test that rows inserted to transactional stores only show up when the
 // transactions complete.
 TEST_F(TxnParticipantTest, TestInsertToTransactionMRS) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_TXN, -1));
   ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::BEGIN_TXN, -1));
   ASSERT_OK(Write(0, kTxnOne));
@@ -964,6 +1061,9 @@ TEST_F(TxnParticipantTest, TestInsertToTransactionMRS) {
 // Test that rows inserted to transactional stores don't show up if the
 // transaction is aborted.
 TEST_F(TxnParticipantTest, TestDontReadAbortedInserts) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_TXN, -1));
   ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::BEGIN_TXN, -1));
   ASSERT_OK(Write(0, kTxnOne));
@@ -1018,6 +1118,9 @@ TEST_F(TxnParticipantTest, TestUpdateAfterAborting) {
 // Test that we can update rows that were inserted and committed as a part of a
 // transaction.
 TEST_F(TxnParticipantTest, TestUpdateCommittedTransactionMRS) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_TXN, -1));
   ASSERT_OK(Write(0, kTxnId));
 
@@ -1026,7 +1129,7 @@ TEST_F(TxnParticipantTest, TestUpdateCommittedTransactionMRS) {
   ASSERT_OK(IterateToStrings(&rows));
   ASSERT_EQ(0, rows.size());
   Status s = Delete(0);
-  ASSERT_TRUE(s.IsNotFound());
+  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
   ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_COMMIT, -1));
 
   // We still haven't finished committing, so we should see no rows.
@@ -1059,6 +1162,9 @@ TEST_F(TxnParticipantTest, TestUpdateCommittedTransactionMRS) {
 // Test that we can flush multiple MRSs, and that when restarting, ops are
 // replayed (or not) as appropriate.
 TEST_F(TxnParticipantTest, TestFlushMultipleMRSs) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   const int kNumTxns = 3;
   const int kNumRowsPerTxn = 100;
   vector<string> rows;
@@ -1137,6 +1243,9 @@ TEST_F(TxnParticipantTest, TestInsertIgnoreInTransactionMRS) {
 
 // Test that INSERT_IGNORE ops work when the row exists in the main MRS.
 TEST_F(TxnParticipantTest, TestInsertIgnoreInMainMRS) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_TXN, -1));
   // Insert into the main MRS, and then INSERT_IGNORE as a part of a
   // transaction.
@@ -1160,6 +1269,9 @@ TEST_F(TxnParticipantTest, TestInsertIgnoreInMainMRS) {
 
 // Test that the live row count accounts for transactional MRSs.
 TEST_F(TxnParticipantTest, TestLiveRowCountAccountsForTransactionalMRSs) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_TXN, -1));
   ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::BEGIN_TXN, -1));
   ASSERT_OK(Write(0));
@@ -1183,6 +1295,9 @@ TEST_F(TxnParticipantTest, TestLiveRowCountAccountsForTransactionalMRSs) {
 
 // Test that the MRS size metrics account for transactional MRSs.
 TEST_F(TxnParticipantTest, TestSizeAccountsForTransactionalMRS) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_TXN, -1));
   ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::BEGIN_TXN, -1));
   ASSERT_TRUE(tablet_replica_->tablet()->MemRowSetEmpty());
@@ -1216,6 +1331,9 @@ TEST_F(TxnParticipantTest, TestSizeAccountsForTransactionalMRS) {
 
 // Test that the MRS anchored WALs metric accounts for transactional MRSs.
 TEST_F(TxnParticipantTest, TestWALsAnchoredAccountsForTransactionalMRS) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   const auto mrs_wal_size = [&] {
     map<int64_t, int64_t> replay_size_map;
     CHECK_OK(tablet_replica_->GetReplaySizeMap(&replay_size_map));
@@ -1288,6 +1406,9 @@ TEST_F(TxnParticipantTest, TestRacingCommitAndWrite) {
 
 // Test that the write metrics account for transactional rowsets.
 TEST_F(TxnParticipantTest, TestMRSLookupsMetric) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_TXN, -1));
   ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::BEGIN_TXN, -1));
 
@@ -1340,6 +1461,9 @@ class TxnParticipantConcurrencyTest : public TxnParticipantTest,
 
 // Test inserting into multiple transactions from multiple threads.
 TEST_P(TxnParticipantConcurrencyTest, TestConcurrentDisjointInsertsTxn) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   const auto& params = GetParam();
   const auto& num_txns = params.num_txns;
   const int kNumThreads = 10;
diff --git a/src/kudu/tablet/txn_participant.cc b/src/kudu/tablet/txn_participant.cc
index 5cbef6f..4a63d53 100644
--- a/src/kudu/tablet/txn_participant.cc
+++ b/src/kudu/tablet/txn_participant.cc
@@ -25,16 +25,18 @@
 #include <vector>
 
 #include <boost/optional/optional.hpp>
+#include <gflags/gflags_declare.h>
 
 #include "kudu/common/timestamp.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
-#include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/txn_metadata.h"
 
+DECLARE_bool(enable_txn_partition_lock);
+
 using kudu::log::LogAnchorRegistry;
+using kudu::tserver::TabletServerErrorPB;
 using std::vector;
-using strings::Substitute;
 
 namespace kudu {
 namespace tablet {
@@ -72,6 +74,24 @@ void Txn::AcquireReadLock(shared_lock<rw_semaphore>* txn_lock) {
   *txn_lock = std::move(l);
 }
 
+void Txn::AdoptPartitionLock(ScopedPartitionLock partition_lock) {
+  if (PREDICT_FALSE(!FLAGS_enable_txn_partition_lock)) {
+    return;
+  }
+  TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+#ifndef NDEBUG
+  // The incoming lock must be held; any lock we already hold must match it.
+  CHECK(partition_lock.IsAcquired(&code)) << code;
+  if (partition_lock_.IsAcquired(&code)) {
+    CHECK(partition_lock.HasSameState(partition_lock_));
+  }
+#endif
+  // Drop our current hold (if any) before taking over the incoming one.
+  partition_lock_.Release();
+  partition_lock_ = std::move(partition_lock);
+  DCHECK(partition_lock_.IsAcquired(&code)) << code;
+}
+
 void TxnParticipant::CreateOpenTransaction(int64_t txn_id,
                                            LogAnchorRegistry* log_anchor_registry) {
   std::lock_guard<simple_spinlock> l(lock_);
diff --git a/src/kudu/tablet/txn_participant.h b/src/kudu/tablet/txn_participant.h
index 8539372..92b7b00 100644
--- a/src/kudu/tablet/txn_participant.h
+++ b/src/kudu/tablet/txn_participant.h
@@ -34,6 +34,7 @@
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/tablet/lock_manager.h"
 #include "kudu/tablet/mvcc.h"
 #include "kudu/tablet/tablet_metadata.h"
 #include "kudu/tserver/tserver.pb.h"
@@ -108,6 +109,13 @@ class Txn : public RefCountedThreadSafe<Txn> {
   void AcquireWriteLock(std::unique_lock<rw_semaphore>* txn_lock);
   void AcquireReadLock(shared_lock<rw_semaphore>* txn_lock);
 
+  // Adopts the input partition lock, maintaining it until the transaction is
+  // complete (aborted or finalized). Rather than keeping multiple
+  // ScopedPartitionLocks, this releases any currently-held lock and takes over
+  // the input one, so repeat callers are expected to pass the same lock. If
+  // --enable_txn_partition_lock is false, the input lock is not retained.
+  void AdoptPartitionLock(ScopedPartitionLock partition_lock);
+
   // Validates that the transaction is in the appropriate state to perform the
   // given operation. Should be called while holding the state lock before
   // replicating a participant op.
@@ -256,6 +264,10 @@ class Txn : public RefCountedThreadSafe<Txn> {
     return commit_op_.get();
   }
 
+  // Releases the adopted partition lock; presumably called once the
+  // transaction is complete (aborted or finalized).
+  void ReleasePartitionLock() { partition_lock_.Release(); }
+
  private:
   friend class RefCountedThreadSafe<Txn>;
 
@@ -345,6 +357,9 @@ class Txn : public RefCountedThreadSafe<Txn> {
   // repeatable.
   std::unique_ptr<ScopedOp> commit_op_;
 
+  // Partition lock adopted from this transaction's writes via AdoptPartitionLock().
+  ScopedPartitionLock partition_lock_;
+
   DISALLOW_COPY_AND_ASSIGN(Txn);
 };
 
diff --git a/src/kudu/transactions/participant_rpc.cc b/src/kudu/transactions/participant_rpc.cc
index 36ece24..1cc76f6 100644
--- a/src/kudu/transactions/participant_rpc.cc
+++ b/src/kudu/transactions/participant_rpc.cc
@@ -171,12 +171,14 @@ RetriableRpcStatus ParticipantRpc::AnalyzeResponse(const Status& rpc_cb_status)
         return result;
       case TabletServerErrorPB::TABLET_NOT_RUNNING:
       case TabletServerErrorPB::THROTTLED:
+      case TabletServerErrorPB::TXN_LOCKED_RETRY_OP:
         result.result = RetriableRpcStatus::SERVICE_UNAVAILABLE;
         return result;
       case TabletServerErrorPB::NOT_THE_LEADER:
         result.result = RetriableRpcStatus::REPLICA_NOT_LEADER;
         return result;
       case TabletServerErrorPB::TXN_ILLEGAL_STATE:
+      case TabletServerErrorPB::TXN_LOCKED_ABORT:
         result.result = RetriableRpcStatus::NON_RETRIABLE_ERROR;
         return result;
       case TabletServerErrorPB::TXN_OP_ALREADY_APPLIED: