You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2021/04/14 19:01:50 UTC

[kudu] branch master updated (7fa9beb -> 2dda869)

This is an automated email from the ASF dual-hosted git repository.

awong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from 7fa9beb  KUDU-2612: send next txn keepalive sooner in case of timeout
     new 8093f95  KUDU-2612: acquire and release partition lock
     new 2dda869  KUDU-2671: Adds new field to PartitionSchema.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/kudu/client/Operation.java     |   2 +-
 .../java/org/apache/kudu/client/TestOperation.java |   2 +-
 src/kudu/client/batcher.cc                         |  12 +-
 src/kudu/client/scan_token-internal.cc             |   7 +-
 src/kudu/common/CMakeLists.txt                     |  13 +-
 src/kudu/common/common.proto                       |  14 ++
 src/kudu/common/partition-test.cc                  | 193 +++++++++++++++++-
 src/kudu/common/partition.cc                       | 106 +++++++++-
 src/kudu/common/partition.h                        |  22 ++-
 src/kudu/common/partition_pruner.cc                |   5 +-
 src/kudu/common/row_operations-test.cc             |   2 +-
 src/kudu/common/row_operations.h                   |   2 +-
 src/kudu/common/row_operations.proto               |  83 ++++++++
 src/kudu/common/wire_protocol.proto                |  57 ------
 src/kudu/integration-tests/alter_table-test.cc     |   2 +-
 src/kudu/integration-tests/fuzz-itest.cc           |  10 +
 src/kudu/integration-tests/txn_commit-itest.cc     |  13 ++
 .../integration-tests/txn_participant-itest.cc     |  62 ++++++
 src/kudu/integration-tests/txn_write_ops-itest.cc  | 216 +++++++++++++++++++++
 src/kudu/master/catalog_manager.cc                 |  16 +-
 src/kudu/master/master.proto                       |   6 +-
 src/kudu/master/sys_catalog.cc                     |   2 +-
 src/kudu/tablet/lock_manager.h                     |   5 +
 src/kudu/tablet/ops/participant_op.cc              |  10 +
 src/kudu/tablet/ops/write_op.cc                    |  62 +++++-
 src/kudu/tablet/ops/write_op.h                     |  20 +-
 src/kudu/tablet/tablet.cc                          |  10 +-
 src/kudu/tablet/tablet.h                           |  14 +-
 src/kudu/tablet/tablet_bootstrap.cc                |  15 +-
 src/kudu/tablet/tablet_metadata.cc                 |   2 +-
 src/kudu/tablet/txn_participant-test.cc            | 126 +++++++++++-
 src/kudu/tablet/txn_participant.cc                 |  24 ++-
 src/kudu/tablet/txn_participant.h                  |  15 ++
 src/kudu/transactions/participant_rpc.cc           |   2 +
 src/kudu/tserver/tablet_copy_client.cc             |   1 -
 src/kudu/tserver/tablet_server-test.cc             |   1 +
 src/kudu/tserver/tablet_service.cc                 |   4 +-
 src/kudu/tserver/tserver.proto                     |   1 +
 38 files changed, 1053 insertions(+), 106 deletions(-)
 create mode 100644 src/kudu/common/row_operations.proto

[kudu] 02/02: KUDU-2671: Adds new field to PartitionSchema.

Posted by aw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 2dda869456659a36247eb89f5b9e5e3837e5f8a3
Author: Mahesh Reddy <mr...@cloudera.com>
AuthorDate: Mon Feb 1 14:53:30 2021 -0800

    KUDU-2671: Adds new field to PartitionSchema.
    
    This patch introduces a new field to PartitionSchema that combines range bounds
    and their respective hash bucket schemas. Any code that assumes the same hash
    schema is used for every range will need to consult this new field. Some of
    the more important instances include partition pruning and many of the
    internal PartitionSchema functions.
    
    I moved RowOperationsPB to a separate .proto file due to some circular
    dependency issues between common.proto and wire_protocol.proto. Most of
    the proto changes in this patch revolve around this change.
    
    Change-Id: Ic5d8615ab9967fdb40292b9c77eb68a19baeca1d
    Reviewed-on: http://gerrit.cloudera.org:8080/17025
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 .../java/org/apache/kudu/client/Operation.java     |   2 +-
 .../java/org/apache/kudu/client/TestOperation.java |   2 +-
 src/kudu/client/scan_token-internal.cc             |   7 +-
 src/kudu/common/CMakeLists.txt                     |  13 +-
 src/kudu/common/common.proto                       |  14 ++
 src/kudu/common/partition-test.cc                  | 193 ++++++++++++++++++++-
 src/kudu/common/partition.cc                       | 106 ++++++++++-
 src/kudu/common/partition.h                        |  22 ++-
 src/kudu/common/partition_pruner.cc                |   5 +-
 src/kudu/common/row_operations-test.cc             |   2 +-
 src/kudu/common/row_operations.h                   |   2 +-
 src/kudu/common/row_operations.proto               |  83 +++++++++
 src/kudu/common/wire_protocol.proto                |  57 ------
 src/kudu/integration-tests/alter_table-test.cc     |   2 +-
 src/kudu/master/catalog_manager.cc                 |  16 +-
 src/kudu/master/master.proto                       |   6 +-
 src/kudu/master/sys_catalog.cc                     |   2 +-
 src/kudu/tablet/tablet_metadata.cc                 |   2 +-
 src/kudu/tserver/tablet_copy_client.cc             |   1 -
 src/kudu/tserver/tablet_server-test.cc             |   1 +
 src/kudu/tserver/tablet_service.cc                 |   4 +-
 src/kudu/tserver/tserver.proto                     |   1 +
 22 files changed, 453 insertions(+), 90 deletions(-)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
index 4981d5e..4d871c9 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
@@ -34,10 +34,10 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 
 import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.RowOperations.RowOperationsPB;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
 import org.apache.kudu.WireProtocol.AppStatusPB.ErrorCode;
-import org.apache.kudu.WireProtocol.RowOperationsPB;
 import org.apache.kudu.client.ProtobufHelper.SchemaPBConversionFlags;
 import org.apache.kudu.client.Statistics.Statistic;
 import org.apache.kudu.client.Statistics.TabletStatistics;
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestOperation.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestOperation.java
index 20e7640..42c747c 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestOperation.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestOperation.java
@@ -29,9 +29,9 @@ import org.junit.Test;
 import org.mockito.Mockito;
 
 import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.RowOperations.RowOperationsPB;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
-import org.apache.kudu.WireProtocol.RowOperationsPB;
 import org.apache.kudu.client.Operation.ChangeType;
 import org.apache.kudu.test.junit.RetryRule;
 import org.apache.kudu.tserver.Tserver.WriteRequestPBOrBuilder;
diff --git a/src/kudu/client/scan_token-internal.cc b/src/kudu/client/scan_token-internal.cc
index 132e772..00afa2b 100644
--- a/src/kudu/client/scan_token-internal.cc
+++ b/src/kudu/client/scan_token-internal.cc
@@ -40,7 +40,6 @@
 #include "kudu/client/shared_ptr.h" // IWYU pragma: keep
 #include "kudu/client/tablet-internal.h"
 #include "kudu/client/tablet_server-internal.h"
-#include "kudu/common/column_predicate.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/common/encoded_key.h"
 #include "kudu/common/partition.h"
@@ -69,6 +68,7 @@ using strings::Substitute;
 
 namespace kudu {
 
+class ColumnPredicate;
 using master::GetTableLocationsResponsePB;
 using master::TableIdentifierPB;
 using master::TabletLocationsPB;
@@ -131,7 +131,7 @@ Status KuduScanToken::Data::PBIntoScanner(KuduClient* client,
     KuduSchema kudu_schema(schema);
     PartitionSchema partition_schema;
     RETURN_NOT_OK(PartitionSchema::FromPB(metadata.partition_schema(), schema,
-        &partition_schema));
+                                          &partition_schema));
     map<string, string> extra_configs(metadata.extra_configs().begin(),
         metadata.extra_configs().end());
     table.reset(new KuduTable(client->shared_from_this(), metadata.table_name(),
@@ -341,7 +341,8 @@ Status KuduScanTokenBuilder::Data::Build(vector<KuduScanToken*>* tokens) {
     RETURN_NOT_OK(SchemaToPB(KuduSchema::ToSchema(table->schema()), &schema_pb));
     *table_pb.mutable_schema() = std::move(schema_pb);
     PartitionSchemaPB partition_schema_pb;
-    table->partition_schema().ToPB(&partition_schema_pb);
+    RETURN_NOT_OK(table->partition_schema().ToPB(KuduSchema::ToSchema(table->schema()),
+                                                 &partition_schema_pb));
     table_pb.mutable_partition_schema()->CopyFrom(partition_schema_pb);
     table_pb.mutable_extra_configs()->insert(table->extra_configs().begin(),
                                              table->extra_configs().end());
diff --git a/src/kudu/common/CMakeLists.txt b/src/kudu/common/CMakeLists.txt
index e25d07e..1cb6207 100644
--- a/src/kudu/common/CMakeLists.txt
+++ b/src/kudu/common/CMakeLists.txt
@@ -22,10 +22,20 @@ PROTOBUF_GENERATE_CPP(
   PROTO_FILES common.proto)
 ADD_EXPORTABLE_LIBRARY(kudu_common_proto
   SRCS ${COMMON_PROTO_SRCS}
-  DEPS block_bloom_filter_proto hash_proto pb_util_proto protobuf util_compression_proto
+  DEPS block_bloom_filter_proto hash_proto protobuf row_operations_proto util_compression_proto
   NONLINK_DEPS ${COMMON_PROTO_TGTS})
 
 PROTOBUF_GENERATE_CPP(
+  ROW_OPERATIONS_PROTO_SRCS ROW_OPERATIONS_PROTO_HDRS ROW_OPERATIONS_PROTO_TGTS
+  SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
+  BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
+  PROTO_FILES row_operations.proto)
+ADD_EXPORTABLE_LIBRARY(row_operations_proto
+  SRCS ${ROW_OPERATIONS_PROTO_SRCS}
+  DEPS pb_util_proto protobuf
+  NONLINK_DEPS ${ROW_OPERATIONS_PROTO_TGTS})
+
+PROTOBUF_GENERATE_CPP(
   WIRE_PROTOCOL_PROTO_SRCS WIRE_PROTOCOL_PROTO_HDRS WIRE_PROTOCOL_PROTO_TGTS
   SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
   BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
@@ -77,6 +87,7 @@ set(COMMON_LIBS
   gutil
   kudu_common_proto
   kudu_util
+  row_operations_proto
   wire_protocol_proto)
 
 ADD_EXPORTABLE_LIBRARY(kudu_common
diff --git a/src/kudu/common/common.proto b/src/kudu/common/common.proto
index 0043669..2b51a7a 100644
--- a/src/kudu/common/common.proto
+++ b/src/kudu/common/common.proto
@@ -28,6 +28,7 @@ package kudu;
 
 option java_package = "org.apache.kudu";
 
+import "kudu/common/row_operations.proto";
 import "kudu/util/block_bloom_filter.proto";
 import "kudu/util/compression/compression.proto";
 import "kudu/util/hash.proto";
@@ -348,8 +349,21 @@ message PartitionSchemaPB {
     optional HashAlgorithm hash_algorithm = 4;
   }
 
+  message PerRangeHashBucketSchemasPB {
+    repeated HashBucketSchemaPB hash_schemas = 1;
+  }
+
   repeated HashBucketSchemaPB hash_bucket_schemas = 1;
   optional RangeSchemaPB range_schema = 2;
+
+  // Each element of 'range_bounds' encodes the lower and upper bounds of a
+  // single range. The element at the same index of 'range_hash_schemas'
+  // holds that range's hash schema. An empty element of 'range_hash_schemas'
+  // indicates that the table-wide hash schema specified in
+  // 'hash_bucket_schemas' is used for that range. Both of these fields must
+  // have the same size.
+  repeated PerRangeHashBucketSchemasPB range_hash_schemas = 3;
+  repeated RowOperationsPB range_bounds = 4;
 }
 
 // The serialized format of a Kudu table partition.
diff --git a/src/kudu/common/partition-test.cc b/src/kudu/common/partition-test.cc
index 70b63f8..e71cce0 100644
--- a/src/kudu/common/partition-test.cc
+++ b/src/kudu/common/partition-test.cc
@@ -28,10 +28,13 @@
 #include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
 #include <glog/logging.h>
+#include <google/protobuf/util/message_differencer.h>
 #include <gtest/gtest.h>
 
 #include "kudu/common/common.pb.h"
 #include "kudu/common/partial_row.h"
+#include "kudu/common/row_operations.h"
+#include "kudu/common/row_operations.pb.h"
 #include "kudu/common/schema.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/util/slice.h"
@@ -40,6 +43,7 @@
 #include "kudu/util/test_util.h"
 
 using boost::optional;
+using google::protobuf::util::MessageDifferencer;
 using std::pair;
 using std::string;
 using std::vector;
@@ -974,6 +978,23 @@ void CheckPartitions(const vector<Partition>& partitions) {
   EXPECT_EQ(string("\0\0\0\1" "\0\0\0\2" "a5\0\0b5\0\0", 16),partitions[15].partition_key_start());
   EXPECT_EQ(string("\0\0\0\1" "\0\0\0\2" "a6\0\0\0\0c6", 16),partitions[15].partition_key_end());
 }
+
+void CheckSerializationFunctions(const PartitionSchemaPB& pb,
+                                 const PartitionSchema& partition_schema,
+                                 const Schema& schema) {
+
+  PartitionSchemaPB pb1;
+  ASSERT_OK(partition_schema.ToPB(schema, &pb1));
+
+  // Compares original protobuf message to encoded protobuf message.
+  MessageDifferencer::Equals(pb, pb1);
+
+  PartitionSchema partition_schema1;
+  ASSERT_OK(PartitionSchema::FromPB(pb1, schema, &partition_schema1));
+
+  ASSERT_TRUE(partition_schema.Equals(partition_schema1));
+}
+
 } // namespace
 
 TEST_F(PartitionTest, TestVaryingHashSchemasPerRange) {
@@ -990,6 +1011,7 @@ TEST_F(PartitionTest, TestVaryingHashSchemasPerRange) {
   AddHashBucketComponent(&schema_builder, { "b" }, 2, 0);
   PartitionSchema partition_schema;
   ASSERT_OK(PartitionSchema::FromPB(schema_builder, schema, &partition_schema));
+  CheckSerializationFunctions(schema_builder, partition_schema, schema);
 
   ASSERT_EQ("HASH (a, c) PARTITIONS 3, HASH (b) PARTITIONS 2, RANGE (a, b, c)",
             partition_schema.DebugString(schema));
@@ -1092,7 +1114,7 @@ TEST_F(PartitionTest, TestVaryingHashSchemasPerRange) {
 
 TEST_F(PartitionTest, TestVaryingHashSchemasPerUnboundedRanges) {
   // CREATE TABLE t (a VARCHAR, b VARCHAR, c VARCHAR, PRIMARY KEY (a, b, c))
-  // PARTITION BY [HASH BUCKET (a, c), HASH BUCKET (b), RANGE (a, b, c)];
+  // PARTITION BY [HASH BUCKET (b), RANGE (a, b, c)];
   Schema schema({ ColumnSchema("a", STRING),
                   ColumnSchema("b", STRING),
                   ColumnSchema("c", STRING) },
@@ -1103,6 +1125,7 @@ TEST_F(PartitionTest, TestVaryingHashSchemasPerUnboundedRanges) {
   AddHashBucketComponent(&schema_builder, { "b" }, 2, 0);
   PartitionSchema partition_schema;
   ASSERT_OK(PartitionSchema::FromPB(schema_builder, schema, &partition_schema));
+  CheckSerializationFunctions(schema_builder, partition_schema, schema);
 
   ASSERT_EQ("HASH (b) PARTITIONS 2, RANGE (a, b, c)",
             partition_schema.DebugString(schema));
@@ -1228,4 +1251,172 @@ TEST_F(PartitionTest, TestVaryingHashSchemasPerUnboundedRanges) {
   EXPECT_EQ(string("\0\0\0\1" "\0\0\0\2" "a4\0\0b4\0\0", 16), partitions[11].partition_key_start());
   EXPECT_EQ("", partitions[11].partition_key_end());
 }
+
+TEST_F(PartitionTest, TestPartitionSchemaPB) {
+  // CREATE TABLE t (a VARCHAR, b VARCHAR, c VARCHAR, PRIMARY KEY (a, b, c))
+  // PARTITION BY [HASH BUCKET (b), RANGE (a, b, c)];
+  Schema schema({ ColumnSchema("a", STRING),
+                  ColumnSchema("b", STRING),
+                  ColumnSchema("c", STRING) },
+                { ColumnId(0), ColumnId(1), ColumnId(2) }, 3);
+
+  PartitionSchemaPB pb;
+  // Table-wide hash schema defined below.
+  AddHashBucketComponent(&pb, { "b" }, 2, 0);
+
+  // [(a0, _, c0), (a0, _, c1))
+  {
+    RowOperationsPBEncoder encoder(pb.add_range_bounds());
+    KuduPartialRow lower(&schema);
+    KuduPartialRow upper(&schema);
+    ASSERT_OK(lower.SetStringCopy("a", "a0"));
+    ASSERT_OK(lower.SetStringCopy("c", "c0"));
+    ASSERT_OK(upper.SetStringCopy("a", "a0"));
+    ASSERT_OK(upper.SetStringCopy("c", "c1"));
+    encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, lower);
+    encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, upper);
+
+    auto range_hash_component = pb.add_range_hash_schemas();
+    auto hash_component = range_hash_component->add_hash_schemas();
+    hash_component->add_columns()->set_name("a");
+    hash_component->set_num_buckets(4);
+  }
+
+  // [(a1, _, c2), (a1, _, c3))
+  {
+    RowOperationsPBEncoder encoder(pb.add_range_bounds());
+    KuduPartialRow lower(&schema);
+    KuduPartialRow upper(&schema);
+    ASSERT_OK(lower.SetStringCopy("a", "a1"));
+    ASSERT_OK(lower.SetStringCopy("c", "c2"));
+    ASSERT_OK(upper.SetStringCopy("a", "a1"));
+    ASSERT_OK(upper.SetStringCopy("c", "c3"));
+    encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, lower);
+    encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, upper);
+
+    auto range_hash_component = pb.add_range_hash_schemas();
+    auto hash_component_1 = range_hash_component->add_hash_schemas();
+    hash_component_1->add_columns()->set_name("a");
+    hash_component_1->set_num_buckets(2);
+    auto hash_component_2 = range_hash_component->add_hash_schemas();
+    hash_component_2->add_columns()->set_name("b");
+    hash_component_2->set_num_buckets(3);
+  }
+
+  // [(a2, _, c4), (a2, _, c5))
+  {
+    RowOperationsPBEncoder encoder(pb.add_range_bounds());
+    KuduPartialRow lower(&schema);
+    KuduPartialRow upper(&schema);
+    ASSERT_OK(lower.SetStringCopy("a", "a2"));
+    ASSERT_OK(lower.SetStringCopy("c", "c4"));
+    ASSERT_OK(upper.SetStringCopy("a", "a2"));
+    ASSERT_OK(upper.SetStringCopy("c", "c5"));
+    encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, lower);
+    encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, upper);
+
+    // An empty entry means this range uses the table-wide hash schema.
+    pb.add_range_hash_schemas();
+  }
+
+  PartitionSchema partition_schema;
+  ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
+
+  // Check fields of 'partition_schema' to verify decoder function.
+  ASSERT_EQ(1, partition_schema.hash_partition_schemas().size());
+  const auto& ranges_with_hash_schemas = partition_schema.ranges_with_hash_schemas();
+  ASSERT_EQ(3, ranges_with_hash_schemas.size());
+
+  EXPECT_EQ(string("a0\0\0\0\0c0", 8), ranges_with_hash_schemas[0].lower);
+  EXPECT_EQ(string("a0\0\0\0\0c1", 8), ranges_with_hash_schemas[0].upper);
+  EXPECT_EQ(1, ranges_with_hash_schemas[0].hash_schemas.size());
+
+  const auto& range1_hash_schema = ranges_with_hash_schemas[0].hash_schemas[0];
+  EXPECT_EQ(1, range1_hash_schema.column_ids.size());
+  EXPECT_EQ(0, range1_hash_schema.column_ids[0]);
+  EXPECT_EQ(4, range1_hash_schema.num_buckets);
+
+  EXPECT_EQ(string("a1\0\0\0\0c2", 8), ranges_with_hash_schemas[1].lower);
+  EXPECT_EQ(string("a1\0\0\0\0c3", 8), ranges_with_hash_schemas[1].upper);
+  EXPECT_EQ(2, ranges_with_hash_schemas[1].hash_schemas.size());
+
+  const auto& range2_hash_schema_1 = ranges_with_hash_schemas[1].hash_schemas[0];
+  EXPECT_EQ(1, range2_hash_schema_1.column_ids.size());
+  EXPECT_EQ(0, range2_hash_schema_1.column_ids[0]);
+  EXPECT_EQ(2, range2_hash_schema_1.num_buckets);
+
+  const auto& range2_hash_schema_2 = ranges_with_hash_schemas[1].hash_schemas[1];
+  EXPECT_EQ(1, range2_hash_schema_2.column_ids.size());
+  EXPECT_EQ(1, range2_hash_schema_2.column_ids[0]);
+  EXPECT_EQ(3, range2_hash_schema_2.num_buckets);
+
+  EXPECT_EQ(string("a2\0\0\0\0c4", 8), ranges_with_hash_schemas[2].lower);
+  EXPECT_EQ(string("a2\0\0\0\0c5", 8), ranges_with_hash_schemas[2].upper);
+  EXPECT_EQ(0, ranges_with_hash_schemas[2].hash_schemas.size());
+
+  CheckSerializationFunctions(pb, partition_schema, schema);
+}
+
+TEST_F(PartitionTest, TestMalformedPartitionSchemaPB) {
+  // CREATE TABLE t (a VARCHAR, b VARCHAR, c VARCHAR, PRIMARY KEY (a, b, c))
+  // PARTITION BY [RANGE (a, b, c)];
+  Schema schema({ ColumnSchema("a", STRING),
+                  ColumnSchema("b", STRING),
+                  ColumnSchema("c", STRING) },
+                { ColumnId(0), ColumnId(1), ColumnId(2) }, 3);
+
+  PartitionSchemaPB pb;
+
+  // Testing that only a pair of range bounds is allowed.
+  {
+    RowOperationsPBEncoder encoder(pb.add_range_bounds());
+    KuduPartialRow lower(&schema);
+    KuduPartialRow upper(&schema);
+    KuduPartialRow extra(&schema);
+    ASSERT_OK(lower.SetStringCopy("a", "a0"));
+    ASSERT_OK(upper.SetStringCopy("a", "a1"));
+    ASSERT_OK(extra.SetStringCopy("a", "a2"));
+    encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, lower);
+    encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, upper);
+    encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, extra);
+  }
+
+  PartitionSchema partition_schema;
+  Status s = PartitionSchema::FromPB(pb, schema, &partition_schema);
+  ASSERT_EQ("Invalid argument: 3 ops were provided; "
+            "Only two ops are expected for this pair of range bounds.",
+            s.ToString());
+
+  pb.Clear();
+  // Testing that no split rows are allowed.
+  {
+    RowOperationsPBEncoder encoder(pb.add_range_bounds());
+    KuduPartialRow split(&schema);
+    KuduPartialRow upper(&schema);
+    ASSERT_OK(split.SetStringCopy("a", "a0"));
+    ASSERT_OK(upper.SetStringCopy("a", "a1"));
+    encoder.Add(RowOperationsPB::SPLIT_ROW, split);
+    encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, upper);
+  }
+
+  Status s1 = PartitionSchema::FromPB(pb, schema, &partition_schema);
+  ASSERT_EQ("Invalid argument: Illegal row operation type in request: 4",
+            s1.ToString());
+
+  pb.Clear();
+  // Testing that 2nd bound is either RANGE_UPPER_BOUND or INCLUSIVE_RANGE_UPPER_BOUND.
+  {
+    RowOperationsPBEncoder encoder(pb.add_range_bounds());
+    KuduPartialRow lower(&schema);
+    KuduPartialRow upper(&schema);
+    ASSERT_OK(lower.SetStringCopy("a", "a0"));
+    ASSERT_OK(upper.SetStringCopy("a", "a1"));
+    encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, lower);
+    encoder.Add(RowOperationsPB::SPLIT_ROW, upper);
+  }
+
+  Status s2 = PartitionSchema::FromPB(pb, schema, &partition_schema);
+  ASSERT_EQ("Invalid argument: missing upper range bound in request",
+            s2.ToString());
+}
 } // namespace kudu
diff --git a/src/kudu/common/partition.cc b/src/kudu/common/partition.cc
index ae276c1..e7787d0 100644
--- a/src/kudu/common/partition.cc
+++ b/src/kudu/common/partition.cc
@@ -20,6 +20,7 @@
 #include <algorithm>
 #include <cstring>
 #include <iterator>
+#include <memory>
 #include <set>
 #include <string>
 #include <unordered_set>
@@ -32,6 +33,8 @@
 #include "kudu/common/key_encoder.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/common/row.h"
+#include "kudu/common/row_operations.h"
+#include "kudu/common/row_operations.pb.h"
 #include "kudu/common/types.h"
 #include "kudu/gutil/endian.h"
 #include "kudu/gutil/map-util.h"
@@ -177,9 +180,60 @@ Status PartitionSchema::ExtractHashBucketSchemasFromPB(
 Status PartitionSchema::FromPB(const PartitionSchemaPB& pb,
                                const Schema& schema,
                                PartitionSchema* partition_schema) {
+  return FromPB(pb, schema, schema, partition_schema);
+}
+
+Status PartitionSchema::FromPB(const PartitionSchemaPB& pb,
+                               const Schema& schema,
+                               const Schema& client_schema,
+                               PartitionSchema* partition_schema) {
   partition_schema->Clear();
   RETURN_NOT_OK(ExtractHashBucketSchemasFromPB(schema, pb.hash_bucket_schemas(),
-                             &partition_schema->hash_bucket_schemas_));
+                                               &partition_schema->hash_bucket_schemas_));
+  RangeHashSchema range_hash_schema;
+  range_hash_schema.resize(pb.range_hash_schemas_size());
+  for (int i = 0; i < pb.range_hash_schemas_size(); i++) {
+    RETURN_NOT_OK(ExtractHashBucketSchemasFromPB(schema, pb.range_hash_schemas(i).hash_schemas(),
+                                                 &range_hash_schema[i]));
+  }
+  vector<pair<KuduPartialRow, KuduPartialRow>> range_bounds;
+  for (int i = 0; i < pb.range_bounds_size(); i++) {
+    RowOperationsPBDecoder decoder(&pb.range_bounds(i), &client_schema, &schema, nullptr);
+    vector<DecodedRowOperation> ops;
+    RETURN_NOT_OK(decoder.DecodeOperations<DecoderMode::SPLIT_ROWS>(&ops));
+    if (ops.size() != 2) {
+      return Status::InvalidArgument(Substitute("$0 ops were provided; Only two ops are expected "
+                                                "for this pair of range bounds.", ops.size()));
+    }
+    const DecodedRowOperation& op1 = ops[0];
+    const DecodedRowOperation& op2 = ops[1];
+    switch (op1.type) {
+      case RowOperationsPB::RANGE_LOWER_BOUND:
+      case RowOperationsPB::EXCLUSIVE_RANGE_LOWER_BOUND: {
+        if (op2.type != RowOperationsPB::RANGE_UPPER_BOUND &&
+            op2.type != RowOperationsPB::INCLUSIVE_RANGE_UPPER_BOUND) {
+          return Status::InvalidArgument("missing upper range bound in request");
+        }
+
+        // Lower bound range partition keys are inclusive and upper bound range partition keys
+        // are exclusive by design. If the provided keys are not of this format, these keys
+        // will be transformed to their proper format.
+        if (op1.type == RowOperationsPB::EXCLUSIVE_RANGE_LOWER_BOUND) {
+          RETURN_NOT_OK(partition_schema->MakeLowerBoundRangePartitionKeyInclusive(
+              op1.split_row.get()));
+        }
+        if (op2.type == RowOperationsPB::INCLUSIVE_RANGE_UPPER_BOUND) {
+          RETURN_NOT_OK(partition_schema->MakeUpperBoundRangePartitionKeyExclusive(
+              op2.split_row.get()));
+        }
+        range_bounds.emplace_back(*op1.split_row, *op2.split_row);
+        break;
+      }
+      default:
+        return Status::InvalidArgument(
+            Substitute("Illegal row operation type in request: $0", op1.type));
+    }
+  }
 
   if (pb.has_range_schema()) {
     const PartitionSchemaPB_RangeSchemaPB& range_pb = pb.range_schema();
@@ -194,10 +248,15 @@ Status PartitionSchema::FromPB(const PartitionSchemaPB& pb,
     }
   }
 
+  if (!range_bounds.empty()) {
+    RETURN_NOT_OK(partition_schema->EncodeRangeBounds(
+        range_bounds, range_hash_schema, schema, &partition_schema->ranges_with_hash_schemas_));
+  }
+
   return partition_schema->Validate(schema);
 }
 
-void PartitionSchema::ToPB(PartitionSchemaPB* pb) const {
+Status PartitionSchema::ToPB(const Schema& schema, PartitionSchemaPB* pb) const {
   pb->Clear();
   pb->mutable_hash_bucket_schemas()->Reserve(hash_bucket_schemas_.size());
   for (const HashBucketSchema& hash_bucket : hash_bucket_schemas_) {
@@ -207,7 +266,34 @@ void PartitionSchema::ToPB(PartitionSchemaPB* pb) const {
     hash_bucket_pb->set_seed(hash_bucket.seed);
   }
 
+  if (!ranges_with_hash_schemas_.empty()) {
+    pb->mutable_range_hash_schemas()->Reserve(ranges_with_hash_schemas_.size());
+    pb->mutable_range_bounds()->Reserve(ranges_with_hash_schemas_.size());
+    Arena arena(256);
+    for (const auto& range_hash_schema : ranges_with_hash_schemas_) {
+      RowOperationsPBEncoder encoder(pb->add_range_bounds());
+      arena.Reset();
+      KuduPartialRow lower(&schema);
+      KuduPartialRow upper(&schema);
+      Slice s_lower = Slice(range_hash_schema.lower);
+      Slice s_upper = Slice(range_hash_schema.upper);
+      RETURN_NOT_OK(DecodeRangeKey(&s_lower, &lower, &arena));
+      RETURN_NOT_OK(DecodeRangeKey(&s_upper, &upper, &arena));
+      encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, lower);
+      encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, upper);
+
+      auto* range_hash_schema_pb = pb->add_range_hash_schemas();
+      for (const auto& hash_bucket : range_hash_schema.hash_schemas) {
+        auto* hash_bucket_pb = range_hash_schema_pb->add_hash_schemas();
+        SetColumnIdentifiers(hash_bucket.column_ids, hash_bucket_pb->mutable_columns());
+        hash_bucket_pb->set_num_buckets(hash_bucket.num_buckets);
+        hash_bucket_pb->set_seed(hash_bucket.seed);
+      }
+    }
+  }
+
   SetColumnIdentifiers(range_schema_.column_ids, pb->mutable_range_schema()->mutable_columns());
+  return Status::OK();
 }
 
 template<typename Row>
@@ -1103,11 +1189,14 @@ Status PartitionSchema::BucketForRow(const ConstContiguousRow& row,
 void PartitionSchema::Clear() {
   hash_bucket_schemas_.clear();
   range_schema_.column_ids.clear();
+  ranges_with_hash_schemas_.clear();
+
 }
 
-Status PartitionSchema::Validate(const Schema& schema) const {
+Status PartitionSchema::ValidateHashBucketSchemas(const Schema& schema,
+                                                  const HashBucketSchemas& hash_schemas) {
   set<ColumnId> hash_columns;
-  for (const PartitionSchema::HashBucketSchema& hash_schema : hash_bucket_schemas_) {
+  for (const PartitionSchema::HashBucketSchema& hash_schema : hash_schemas) {
     if (hash_schema.num_buckets < 2) {
       return Status::InvalidArgument("must have at least two hash buckets");
     }
@@ -1131,6 +1220,15 @@ Status PartitionSchema::Validate(const Schema& schema) const {
       }
     }
   }
+  return Status::OK();
+}
+
+Status PartitionSchema::Validate(const Schema& schema) const {
+  RETURN_NOT_OK(ValidateHashBucketSchemas(schema, hash_bucket_schemas_));
+
+  for (const auto& range_with_hash_schemas : ranges_with_hash_schemas_) {
+    RETURN_NOT_OK(ValidateHashBucketSchemas(schema, range_with_hash_schemas.hash_schemas));
+  }
 
   for (const ColumnId& column_id : range_schema_.column_ids) {
     int32_t column_idx = schema.find_column_by_id(column_id);
diff --git a/src/kudu/common/partition.h b/src/kudu/common/partition.h
index 85c58e9..2226e45 100644
--- a/src/kudu/common/partition.h
+++ b/src/kudu/common/partition.h
@@ -175,8 +175,16 @@ class PartitionSchema {
                        const Schema& schema,
                        PartitionSchema* partition_schema) WARN_UNUSED_RESULT;
 
+  // Same as the function above, except that the range bounds in 'pb' are
+  // decoded using the explicitly provided 'client_schema'.
+  static Status FromPB(const PartitionSchemaPB& pb,
+                       const Schema& schema,
+                       const Schema& client_schema,
+                       PartitionSchema* partition_schema) WARN_UNUSED_RESULT;
+
   // Serializes a partition schema into a protobuf message.
-  void ToPB(PartitionSchemaPB* pb) const;
+  // Requires a schema to encode the range bounds.
+  Status ToPB(const Schema& schema, PartitionSchemaPB* pb) const;
 
   // Appends the row's encoded partition key into the provided buffer.
   // On failure, the buffer may have data partially appended.
@@ -192,7 +200,7 @@ class PartitionSchema {
   // of resulting partitions is the product of the number of hash buckets for
   // each hash bucket component, multiplied by
   // (split_rows.size() + max(1, range_bounds.size())).
-  // 'range_hash_schema' contains each range's HashBucketSchemas,
+  // 'range_hash_schemas' contains each range's HashBucketSchemas,
   // its order corresponds to the bounds in 'range_bounds'.
   // If 'range_hash_schemas' is empty, the table wide hash schema is used per range.
   // Size of 'range_hash_schemas' and 'range_bounds' are equal if 'range_hash_schema' isn't empty.
@@ -304,6 +312,10 @@ class PartitionSchema {
     return hash_bucket_schemas_;
   }
 
+  const std::vector<RangeWithHashSchemas>& ranges_with_hash_schemas() const {
+    return ranges_with_hash_schemas_;
+  }
+
   // Gets the vector containing the column indexes of the range partition keys.
   // If any of the columns is not in the key range columns then an
   // InvalidArgument status is returned.
@@ -414,6 +426,10 @@ class PartitionSchema {
   // Clears the state of this partition schema.
   void Clear();
 
+  // Helper function that validates the hash bucket schemas.
+  static Status ValidateHashBucketSchemas(const Schema& schema,
+                                          const HashBucketSchemas& hash_schemas);
+
   // Validates that this partition schema is valid. Returns OK, or an
   // appropriate error code for an invalid partition schema.
   Status Validate(const Schema& schema) const;
@@ -453,6 +469,8 @@ class PartitionSchema {
 
   HashBucketSchemas hash_bucket_schemas_;
   RangeSchema range_schema_;
+
+  std::vector<RangeWithHashSchemas> ranges_with_hash_schemas_;
 };
 
 } // namespace kudu
diff --git a/src/kudu/common/partition_pruner.cc b/src/kudu/common/partition_pruner.cc
index c96e0bb..2c39e10 100644
--- a/src/kudu/common/partition_pruner.cc
+++ b/src/kudu/common/partition_pruner.cc
@@ -18,8 +18,8 @@
 #include "kudu/common/partition_pruner.h"
 
 #include <algorithm>
-#include <cstring>
 #include <cstdint>
+#include <cstring>
 #include <iterator>
 #include <memory>
 #include <numeric>
@@ -43,6 +43,7 @@
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/array_view.h"
 #include "kudu/util/memory/arena.h"
 #include "kudu/util/slice.h"
 
@@ -281,7 +282,7 @@ void PartitionPruner::Init(const Schema& schema,
   // components and a range component, then a few patterns emerge from the
   // examples above:
   //
-  // 1) The partition keys are truncated after the final constrained component
+  // 1) The partition keys are truncated after the final constrained component.
   //    Hash bucket components are constrained when the scan is limited to a
   //    subset of buckets via equality or in-list predicates on that component.
   //    Range components are constrained if they have an upper or lower bound
diff --git a/src/kudu/common/row_operations-test.cc b/src/kudu/common/row_operations-test.cc
index d56b77b..d501180 100644
--- a/src/kudu/common/row_operations-test.cc
+++ b/src/kudu/common/row_operations-test.cc
@@ -32,8 +32,8 @@
 #include "kudu/common/common.pb.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/common/row.h"
+#include "kudu/common/row_operations.pb.h"
 #include "kudu/common/schema.h"
-#include "kudu/common/wire_protocol.pb.h"
 #include "kudu/common/types.h"
 #include "kudu/gutil/basictypes.h"
 #include "kudu/gutil/dynamic_annotations.h"
diff --git a/src/kudu/common/row_operations.h b/src/kudu/common/row_operations.h
index 0717603..657a875 100644
--- a/src/kudu/common/row_operations.h
+++ b/src/kudu/common/row_operations.h
@@ -23,7 +23,7 @@
 #include <vector>
 
 #include "kudu/common/row_changelist.h"
-#include "kudu/common/wire_protocol.pb.h"
+#include "kudu/common/row_operations.pb.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/util/bitset.h"
 #include "kudu/util/slice.h"
diff --git a/src/kudu/common/row_operations.proto b/src/kudu/common/row_operations.proto
new file mode 100644
index 0000000..c8c8431
--- /dev/null
+++ b/src/kudu/common/row_operations.proto
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// This separate file was created to resolve a circular dependency
+// between common.proto and wire_protocol.proto, which arose because
+// PartitionSchemaPB needs RowOperationsPB in order to support
+// varying hash schemas per range.
+syntax = "proto2";
+package kudu;
+
+option java_package = "org.apache.kudu";
+
+import "kudu/util/pb_util.proto";
+
+// A set of operations (INSERT, UPDATE, UPSERT, or DELETE) to apply to a table,
+// or the set of split rows and range bounds when creating or altering table.
+// Range bounds determine the boundaries of range partitions during table
+// creation, split rows further subdivide the ranges into more partitions.
+message RowOperationsPB {
+  enum Type {
+    UNKNOWN = 0;
+    INSERT = 1;
+    UPDATE = 2;
+    DELETE = 3;
+    UPSERT = 5;
+    INSERT_IGNORE = 10;
+    UPDATE_IGNORE = 11;
+    DELETE_IGNORE = 12;
+
+    // Used when specifying split rows on table creation.
+    SPLIT_ROW = 4;
+    // Used when specifying an inclusive lower bound range on table creation.
+    // Should be followed by the associated upper bound. If all values are
+    // missing, then signifies unbounded.
+    RANGE_LOWER_BOUND = 6;
+    // Used when specifying an exclusive upper bound range on table creation.
+    // Should be preceded by the associated lower bound. If all values are
+    // missing, then signifies unbounded.
+    RANGE_UPPER_BOUND = 7;
+    // Used when specifying an exclusive lower bound range on table creation.
+    // Should be followed by the associated upper bound. If all values are
+    // missing, then signifies unbounded.
+    EXCLUSIVE_RANGE_LOWER_BOUND = 8;
+    // Used when specifying an inclusive upper bound range on table creation.
+    // Should be preceded by the associated lower bound. If all values are
+    // missing, then signifies unbounded.
+    INCLUSIVE_RANGE_UPPER_BOUND = 9;
+  }
+
+  // The row data for each operation is stored in the following format:
+  //
+  // [operation type] (one byte):
+  //   A single-byte field which determines the type of operation. The values are
+  //   based on the 'Type' enum above.
+  // [column isset bitmap]   (one bit for each column in the Schema, rounded to nearest byte)
+  //   A set bit in this bitmap indicates that the user has specified the given column
+  //   in the row. This indicates that the column will be present in the data to follow.
+  // [null bitmap]           (one bit for each Schema column, rounded to nearest byte)
+  //   A set bit in this bitmap indicates that the given column is NULL.
+  //   This is only present if there are any nullable columns.
+  // [column data]
+  //   For each column which is set and not NULL, the column's data follows. The data
+  //   format of each cell is the canonical in-memory format (eg little endian).
+  //   For string data, the pointers are relative to 'indirect_data'.
+  //
+  // The rows are concatenated end-to-end with no padding/alignment.
+  optional bytes rows = 2 [(kudu.REDACT) = true];
+  optional bytes indirect_data = 3 [(kudu.REDACT) = true];
+}
diff --git a/src/kudu/common/wire_protocol.proto b/src/kudu/common/wire_protocol.proto
index cce7a87..b6b0c69 100644
--- a/src/kudu/common/wire_protocol.proto
+++ b/src/kudu/common/wire_protocol.proto
@@ -27,7 +27,6 @@ option java_package = "org.apache.kudu";
 
 import "kudu/common/common.proto";
 import "kudu/consensus/metadata.proto";
-import "kudu/util/pb_util.proto";
 
 // Error status returned by any RPC method.
 // Every RPC method which could generate an application-level error
@@ -182,59 +181,3 @@ message ColumnarRowBlockPB {
   repeated Column columns = 1;
   optional int64 num_rows = 2;
 }
-
-// A set of operations (INSERT, UPDATE, UPSERT, or DELETE) to apply to a table,
-// or the set of split rows and range bounds when creating or altering table.
-// Range bounds determine the boundaries of range partitions during table
-// creation, split rows further subdivide the ranges into more partitions.
-message RowOperationsPB {
-  enum Type {
-    UNKNOWN = 0;
-    INSERT = 1;
-    UPDATE = 2;
-    DELETE = 3;
-    UPSERT = 5;
-    INSERT_IGNORE = 10;
-    UPDATE_IGNORE = 11;
-    DELETE_IGNORE = 12;
-
-    // Used when specifying split rows on table creation.
-    SPLIT_ROW = 4;
-    // Used when specifying an inclusive lower bound range on table creation.
-    // Should be followed by the associated upper bound. If all values are
-    // missing, then signifies unbounded.
-    RANGE_LOWER_BOUND = 6;
-    // Used when specifying an exclusive upper bound range on table creation.
-    // Should be preceded by the associated lower bound. If all values are
-    // missing, then signifies unbounded.
-    RANGE_UPPER_BOUND = 7;
-    // Used when specifying an exclusive lower bound range on table creation.
-    // Should be followed by the associated upper bound. If all values are
-    // missing, then signifies unbounded.
-    EXCLUSIVE_RANGE_LOWER_BOUND = 8;
-    // Used when specifying an inclusive upper bound range on table creation.
-    // Should be preceded by the associated lower bound. If all values are
-    // missing, then signifies unbounded.
-    INCLUSIVE_RANGE_UPPER_BOUND = 9;
-  }
-
-  // The row data for each operation is stored in the following format:
-  //
-  // [operation type] (one byte):
-  //   A single-byte field which determines the type of operation. The values are
-  //   based on the 'Type' enum above.
-  // [column isset bitmap]   (one bit for each column in the Schema, rounded to nearest byte)
-  //   A set bit in this bitmap indicates that the user has specified the given column
-  //   in the row. This indicates that the column will be present in the data to follow.
-  // [null bitmap]           (one bit for each Schema column, rounded to nearest byte)
-  //   A set bit in this bitmap indicates that the given column is NULL.
-  //   This is only present if there are any nullable columns.
-  // [column data]
-  //   For each column which is set and not NULL, the column's data follows. The data
-  //   format of each cell is the canonical in-memory format (eg little endian).
-  //   For string data, the pointers are relative to 'indirect_data'.
-  //
-  // The rows are concatenated end-to-end with no padding/alignment.
-  optional bytes rows = 2 [(kudu.REDACT) = true];
-  optional bytes indirect_data = 3 [(kudu.REDACT) = true];
-}
diff --git a/src/kudu/integration-tests/alter_table-test.cc b/src/kudu/integration-tests/alter_table-test.cc
index 6d6f65e..42e4818 100644
--- a/src/kudu/integration-tests/alter_table-test.cc
+++ b/src/kudu/integration-tests/alter_table-test.cc
@@ -1875,7 +1875,7 @@ TEST_F(AlterTableTest, TestAddRangePartitionConflictExhaustive) {
     if (a_lower_bound == b_lower_bound && a_upper_bound == b_upper_bound) {
       ASSERT_TRUE(s.IsAlreadyPresent()) << s.ToString();
       ASSERT_STR_CONTAINS(s.ToString(),
-                          "new range partiton duplicates another newly added one");
+                          "new range partition duplicates another newly added one");
     } else {
       ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
       ASSERT_STR_CONTAINS(s.ToString(),
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index f3c2422..dd7ce71 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -71,6 +71,7 @@
 #include "kudu/common/partial_row.h"
 #include "kudu/common/partition.h"
 #include "kudu/common/row_operations.h"
+#include "kudu/common/row_operations.pb.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/types.h"
 #include "kudu/common/wire_protocol.h"
@@ -1676,8 +1677,8 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
   // partitioned on the primary key columns) will be used.
   PartitionSchema partition_schema;
   RETURN_NOT_OK(SetupError(
-        PartitionSchema::FromPB(req.partition_schema(), schema, &partition_schema),
-        resp, MasterErrorPB::INVALID_SCHEMA));
+      PartitionSchema::FromPB(req.partition_schema(), schema, client_schema, &partition_schema),
+      resp, MasterErrorPB::INVALID_SCHEMA));
 
   // Decode split rows.
   vector<KuduPartialRow> split_rows;
@@ -1708,11 +1709,11 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
 
         if (op.type == RowOperationsPB::EXCLUSIVE_RANGE_LOWER_BOUND) {
           RETURN_NOT_OK(partition_schema.MakeLowerBoundRangePartitionKeyInclusive(
-                op.split_row.get()));
+              op.split_row.get()));
         }
         if (ops[i].type == RowOperationsPB::INCLUSIVE_RANGE_UPPER_BOUND) {
           RETURN_NOT_OK(partition_schema.MakeUpperBoundRangePartitionKeyExclusive(
-                ops[i].split_row.get()));
+              ops[i].split_row.get()));
         }
         range_bounds.emplace_back(*op.split_row, *ops[i].split_row);
         break;
@@ -2015,7 +2016,7 @@ scoped_refptr<TableInfo> CatalogManager::CreateTableInfo(
   // Use the Schema object passed in, since it has the column IDs already assigned,
   // whereas the user request PB does not.
   CHECK_OK(SchemaToPB(schema, metadata->mutable_schema()));
-  partition_schema.ToPB(metadata->mutable_partition_schema());
+  CHECK_OK(partition_schema.ToPB(schema, metadata->mutable_partition_schema()));
   metadata->set_create_timestamp(time(nullptr));
   (*metadata->mutable_extra_config()) = std::move(extra_config_pb);
   table->RegisterMetrics(master_->metric_registry(), metadata->name());
@@ -2436,7 +2437,8 @@ Status CatalogManager::ApplyAlterPartitioningSteps(
   Schema schema;
   RETURN_NOT_OK(SchemaFromPB(l.data().pb.schema(), &schema));
   PartitionSchema partition_schema;
-  RETURN_NOT_OK(PartitionSchema::FromPB(l.data().pb.partition_schema(), schema, &partition_schema));
+  RETURN_NOT_OK(PartitionSchema::FromPB(l.data().pb.partition_schema(), schema,
+                                        client_schema, &partition_schema));
 
   TableInfo::TabletInfoMap existing_tablets = table->tablet_map();
   TableInfo::TabletInfoMap new_tablets;
@@ -2556,7 +2558,7 @@ Status CatalogManager::ApplyAlterPartitioningSteps(
             if (lower_bound == p.partition_key_start() &&
                 upper_bound == p.partition_key_end()) {
               return Status::AlreadyPresent(
-                  "new range partiton duplicates another newly added one",
+                  "new range partition duplicates another newly added one",
                   partition_schema.RangePartitionDebugString(*ops[0].split_row,
                                                              *ops[1].split_row));
             }
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index 2d64f41..51d57e6 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -20,6 +20,7 @@ package kudu.master;
 option java_package = "org.apache.kudu.master";
 
 import "kudu/common/common.proto";
+import "kudu/common/row_operations.proto";
 import "kudu/common/wire_protocol.proto";
 import "kudu/consensus/metadata.proto";
 import "kudu/consensus/replica_management.proto";
@@ -493,9 +494,6 @@ message GetTabletLocationsResponsePB {
 // ============================================================================
 //  Catalog
 // ============================================================================
-message PerRangeHashBucketSchemasPB {
-  repeated PartitionSchemaPB.HashBucketSchemaPB hash_schemas = 1;
-}
 
 message CreateTableRequestPB {
   required string name = 1;
@@ -512,7 +510,7 @@ message CreateTableRequestPB {
   // split rows are specified. If this field is set, its size must match the number of ranges
   // specified by range bounds and they must be in the same order. If this field is empty,
   // 'partition_schema' is assumed for every range bound.
-  repeated PerRangeHashBucketSchemasPB range_hash_schemas = 12;
+  repeated PartitionSchemaPB.PerRangeHashBucketSchemasPB range_hash_schemas = 12;
   optional int32 num_replicas = 4;
 
   // If set, uses the provided value as the table owner when creating the table.
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index 71506e2..54cee49 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -39,12 +39,12 @@
 #include "kudu/common/partial_row.h"
 #include "kudu/common/partition.h"
 #include "kudu/common/row_operations.h"
+#include "kudu/common/row_operations.pb.h"
 #include "kudu/common/rowblock.h"
 #include "kudu/common/rowblock_memory.h"
 #include "kudu/common/scan_spec.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/wire_protocol.h"
-#include "kudu/common/wire_protocol.pb.h"
 #include "kudu/consensus/consensus_meta.h"
 #include "kudu/consensus/consensus_meta_manager.h"
 #include "kudu/consensus/consensus_peers.h"
diff --git a/src/kudu/tablet/tablet_metadata.cc b/src/kudu/tablet/tablet_metadata.cc
index 69fcc84..fe321b2 100644
--- a/src/kudu/tablet/tablet_metadata.cc
+++ b/src/kudu/tablet/tablet_metadata.cc
@@ -738,7 +738,7 @@ Status TabletMetadata::ToSuperBlockUnlocked(TabletSuperBlockPB* super_block,
   partition_.ToPB(pb.mutable_partition());
   pb.set_last_durable_mrs_id(last_durable_mrs_id_);
   pb.set_schema_version(schema_version_);
-  partition_schema_.ToPB(pb.mutable_partition_schema());
+  RETURN_NOT_OK(partition_schema_.ToPB(*schema_, pb.mutable_partition_schema()));
   pb.set_table_name(table_name_);
 
   for (const shared_ptr<RowSetMetadata>& meta : rowsets) {
diff --git a/src/kudu/tserver/tablet_copy_client.cc b/src/kudu/tserver/tablet_copy_client.cc
index dee2afe..c9ff0d2 100644
--- a/src/kudu/tserver/tablet_copy_client.cc
+++ b/src/kudu/tserver/tablet_copy_client.cc
@@ -25,7 +25,6 @@
 #include <utility>
 
 #include <boost/optional/optional.hpp>
-#include <boost/type_traits/decay.hpp>
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 #include <google/protobuf/stubs/port.h>
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index 555bd47..86a8aa0 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -49,6 +49,7 @@
 #include "kudu/common/partial_row.h"
 #include "kudu/common/partition.h"
 #include "kudu/common/row_operations.h"
+#include "kudu/common/row_operations.pb.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/timestamp.h"
 #include "kudu/common/wire_protocol-test-util.h"
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 4d1c3db..56a63c9 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -44,6 +44,7 @@
 #include "kudu/common/iterator_stats.h"
 #include "kudu/common/key_range.h"
 #include "kudu/common/partition.h"
+#include "kudu/common/row_operations.pb.h"
 #include "kudu/common/rowblock.h"
 #include "kudu/common/rowblock_memory.h"
 #include "kudu/common/scan_spec.h"
@@ -2188,7 +2189,8 @@ void TabletServiceImpl::ListTablets(const ListTabletsRequestPB* req,
     if (req->need_schema_info()) {
       CHECK_OK(SchemaToPB(replica->tablet_metadata()->schema(),
                           status->mutable_schema()));
-      replica->tablet_metadata()->partition_schema().ToPB(status->mutable_partition_schema());
+      CHECK_OK(replica->tablet_metadata()->partition_schema().ToPB(
+          replica->tablet_metadata()->schema(), status->mutable_partition_schema()));
     }
   }
   context->RespondSuccess();
diff --git a/src/kudu/tserver/tserver.proto b/src/kudu/tserver/tserver.proto
index 9721bf8..a603975 100644
--- a/src/kudu/tserver/tserver.proto
+++ b/src/kudu/tserver/tserver.proto
@@ -20,6 +20,7 @@ package kudu.tserver;
 option java_package = "org.apache.kudu.tserver";
 
 import "kudu/common/common.proto";
+import "kudu/common/row_operations.proto";
 import "kudu/common/wire_protocol.proto";
 import "kudu/security/token.proto";
 import "kudu/tablet/tablet.proto";

[kudu] 01/02: KUDU-2612: acquire and release partition lock

Posted by aw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 8093f95e6b8eb496043562f69df3a66e078ad731
Author: hahao <ha...@apache.org>
AuthorDate: Sun Mar 7 23:59:59 2021 -0800

    KUDU-2612: acquire and release partition lock
    
    This patch plumbs partition locks into write ops for transactional and
    non-transactional operations. Upon attempting to write, we try to
    acquire the partition lock in the WriteOp prepare phase, and upon
    successfully applying the op, we transfer ownership of the lock to the
    Txn that the write was a part of (or just release the partition lock if
    the write was non-transactional). Txns release the lock when
    FINALIZE_COMMIT or ABORT_TXN is applied.
    
    We take the partition lock for non-transactional write ops as well to
    ensure we don’t duplicate keys (e.g. if a transaction inserts key=1 and
    then a non-transactional write inserts key=1 before the transaction
    commits). If the partition lock cannot be acquired, the write op is
    aborted or retried, based on the wait-die mechanics already in the lock
    manager.
    
    A flag is also introduced to disable this locking for tests that
    currently expect support for concurrent transactions.
    
    Change-Id: If26733cae16810f3b3afd1fd05dcb984e6366939
    Reviewed-on: http://gerrit.cloudera.org:8080/17159
    Tested-by: Andrew Wong <aw...@cloudera.com>
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 src/kudu/client/batcher.cc                         |  12 +-
 src/kudu/integration-tests/fuzz-itest.cc           |  10 +
 src/kudu/integration-tests/txn_commit-itest.cc     |  13 ++
 .../integration-tests/txn_participant-itest.cc     |  62 ++++++
 src/kudu/integration-tests/txn_write_ops-itest.cc  | 216 +++++++++++++++++++++
 src/kudu/tablet/lock_manager.h                     |   5 +
 src/kudu/tablet/ops/participant_op.cc              |  10 +
 src/kudu/tablet/ops/write_op.cc                    |  62 +++++-
 src/kudu/tablet/ops/write_op.h                     |  20 +-
 src/kudu/tablet/tablet.cc                          |  10 +-
 src/kudu/tablet/tablet.h                           |  14 +-
 src/kudu/tablet/tablet_bootstrap.cc                |  15 +-
 src/kudu/tablet/txn_participant-test.cc            | 126 +++++++++++-
 src/kudu/tablet/txn_participant.cc                 |  24 ++-
 src/kudu/tablet/txn_participant.h                  |  15 ++
 src/kudu/transactions/participant_rpc.cc           |   2 +
 16 files changed, 600 insertions(+), 16 deletions(-)

diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc
index dc26aed..f48a5c8 100644
--- a/src/kudu/client/batcher.cc
+++ b/src/kudu/client/batcher.cc
@@ -470,7 +470,9 @@ RetriableRpcStatus WriteRpc::AnalyzeResponse(const Status& rpc_cb_status) {
     }
   }
 
-  if (result.status.IsServiceUnavailable()) {
+  if (result.status.IsServiceUnavailable() ||
+      (resp_.has_error() &&
+       resp_.error().code() == tserver::TabletServerErrorPB::TXN_LOCKED_RETRY_OP)) {
     result.result = RetriableRpcStatus::SERVICE_UNAVAILABLE;
     return result;
   }
@@ -507,7 +509,8 @@ RetriableRpcStatus WriteRpc::AnalyzeResponse(const Status& rpc_cb_status) {
   }
 
   if (resp_.has_error() &&
-      resp_.error().code() == tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE) {
+      (resp_.error().code() == tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE ||
+       resp_.error().code() == tserver::TabletServerErrorPB::TXN_LOCKED_ABORT)) {
     result.result = RetriableRpcStatus::NON_RETRIABLE_ERROR;
     return result;
   }
@@ -526,8 +529,9 @@ RetriableRpcStatus WriteRpc::AnalyzeResponse(const Status& rpc_cb_status) {
     //                becomes a real issue when handling responses to write
     //                operations in the context of multi-row transactions.
     //                For example, Status::IllegalState() originated from
-    //                TabletServerErrorPB::TXN_ILLEGAL_STATE responses are
-    //                needlessly retried.
+    //                TabletServerErrorPB::TXN_ILLEGAL_STATE response and
+    //                Status::Abort() originated from TabletServerErrorPB::TXN_LOCKED_ABORT
+    //                response are needlessly retried.
     result.result = RetriableRpcStatus::REPLICA_NOT_LEADER;
     return result;
   }
diff --git a/src/kudu/integration-tests/fuzz-itest.cc b/src/kudu/integration-tests/fuzz-itest.cc
index 9a5b0c1..579c660 100644
--- a/src/kudu/integration-tests/fuzz-itest.cc
+++ b/src/kudu/integration-tests/fuzz-itest.cc
@@ -78,6 +78,7 @@
 
 DEFINE_int32(keyspace_size, 5,  "number of distinct primary keys to test with");
 DEFINE_int32(max_open_txns, 5,  "maximum number of open transactions to test with");
+DECLARE_bool(enable_txn_partition_lock);
 DECLARE_bool(enable_maintenance_manager);
 DECLARE_bool(scanner_allow_snapshot_scans_with_logical_timestamps);
 DECLARE_bool(tserver_txn_write_op_handling_enabled);
@@ -1495,6 +1496,9 @@ void FuzzTest::RunFuzzCase(const vector<TestOp>& test_ops,
 // The logs of this test are designed to easily be copy-pasted and create
 // more specific test cases like TestFuzz<N> below.
 TEST_F(FuzzTest, TestRandomFuzzPksOnly) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   CreateTabletAndStartClusterWithSchema(Schema({ColumnSchema("key", INT32)}, 1));
   SeedRandom();
   vector<TestOp> test_ops;
@@ -1506,6 +1510,9 @@ TEST_F(FuzzTest, TestRandomFuzzPksOnly) {
 // The logs of this test are designed to easily be copy-pasted and create
 // more specific test cases like TestFuzz<N> below.
 TEST_F(FuzzTest, TestRandomFuzz) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
   SeedRandom();
   vector<TestOp> test_ops;
@@ -1517,6 +1524,9 @@ TEST_F(FuzzTest, TestRandomFuzz) {
 // This results in very large batches which are likely to span multiple delta blocks
 // when flushed.
 TEST_F(FuzzTest, TestRandomFuzzHugeBatches) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
   SeedRandom();
   vector<TestOp> test_ops;
diff --git a/src/kudu/integration-tests/txn_commit-itest.cc b/src/kudu/integration-tests/txn_commit-itest.cc
index 4ddc4c0..de7d6fc 100644
--- a/src/kudu/integration-tests/txn_commit-itest.cc
+++ b/src/kudu/integration-tests/txn_commit-itest.cc
@@ -67,6 +67,7 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+DECLARE_bool(enable_txn_partition_lock);
 DECLARE_bool(txn_manager_enabled);
 DECLARE_bool(txn_manager_lazily_initialized);
 DECLARE_bool(txn_schedule_background_tasks);
@@ -684,6 +685,9 @@ TEST_F(TxnCommitITest, TestCommitAfterParticipantAbort) {
 
 // Try concurrently beginning to commit a bunch of different transactions.
 TEST_F(TxnCommitITest, TestConcurrentCommitCalls) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   constexpr const int kNumTxns = 4;
   vector<shared_ptr<KuduTransaction>> txns(kNumTxns);
   int row_start = initial_row_count_;
@@ -724,6 +728,9 @@ TEST_F(TxnCommitITest, TestConcurrentCommitCalls) {
 }
 
 TEST_F(TxnCommitITest, TestConcurrentAbortsAndCommits) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   constexpr const int kNumTxns = 10;
   vector<shared_ptr<KuduTransaction>> txns(kNumTxns);
   int row_start = initial_row_count_;
@@ -940,6 +947,9 @@ TEST_F(TwoNodeTxnCommitITest, TestCommitWhenParticipantsAreDown) {
 // Test that when we start up, pending commits will start background tasks to
 // finalize the commit or abort.
 TEST_F(TwoNodeTxnCommitITest, TestStartTasksDuringStartup) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   shared_ptr<KuduTransaction> committed_txn;
   {
     shared_ptr<KuduSession> txn_session;
@@ -1038,6 +1048,9 @@ class ThreeNodeTxnCommitITest : public TxnCommitITest {
 };
 
 TEST_F(ThreeNodeTxnCommitITest, TestCommitTasksReloadOnLeadershipChange) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   FLAGS_txn_schedule_background_tasks = false;
   shared_ptr<KuduTransaction> committed_txn;
   shared_ptr<KuduTransaction> aborted_txn;
diff --git a/src/kudu/integration-tests/txn_participant-itest.cc b/src/kudu/integration-tests/txn_participant-itest.cc
index 241b5e9..46f6bc9 100644
--- a/src/kudu/integration-tests/txn_participant-itest.cc
+++ b/src/kudu/integration-tests/txn_participant-itest.cc
@@ -72,6 +72,7 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+DECLARE_bool(enable_txn_partition_lock);
 DECLARE_bool(raft_enable_pre_election);
 DECLARE_double(leader_failure_max_missed_heartbeat_periods);
 DECLARE_int32(consensus_inject_latency_ms_in_notifications);
@@ -830,6 +831,32 @@ TEST_F(TxnParticipantITest, TestProxyTabletNotFound) {
   }
 }
 
+// Test that we can start multiple transactions on the same participant.
+TEST_F(TxnParticipantITest, TestTxnSystemClientBeginTxnDoesntLock) {
+  constexpr const int kLeaderIdx = 0;
+  constexpr const int kFirstTxn = 0;
+  constexpr const int kSecondTxn = 1;
+  vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
+  auto* leader_replica = replicas[kLeaderIdx];
+  const auto tablet_id = leader_replica->tablet_id();
+  ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+
+  // Start a transaction and make sure it results in the expected state
+  // server-side.
+  unique_ptr<TxnSystemClient> txn_client;
+  ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client));
+  ASSERT_OK(txn_client->ParticipateInTransaction(
+      tablet_id, MakeParticipantOp(kFirstTxn, ParticipantOpPB::BEGIN_TXN), kDefaultTimeout));
+  NO_FATALS(CheckReplicasMatchTxns(replicas, { { kFirstTxn, kOpen, -1 } }));
+
+  // Begin another transaction with a higher txn ID. This is allowed, since
+  // partition locks are only taken once we write.
+  ASSERT_OK(txn_client->ParticipateInTransaction(
+      tablet_id, MakeParticipantOp(kSecondTxn, ParticipantOpPB::BEGIN_TXN), kDefaultTimeout));
+  NO_FATALS(CheckReplicasMatchTxns(replicas,
+        { { kFirstTxn, kOpen, -1 }, { kSecondTxn, kOpen, -1 } }));
+}
+
 TEST_F(TxnParticipantITest, TestTxnSystemClientCommitSequence) {
   constexpr const int kLeaderIdx = 0;
   constexpr const int kTxnId = 0;
@@ -899,6 +926,9 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientCommitSequence) {
 }
 
 TEST_F(TxnParticipantITest, TestTxnSystemClientAbortSequence) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   constexpr const int kLeaderIdx = 0;
   constexpr const int kTxnOne = 0;
   constexpr const int kTxnTwo = 1;
@@ -976,6 +1006,38 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientErrorWhenNotBegun) {
   NO_FATALS(CheckReplicasMatchTxns(replicas, { { 2, kAborted, -1 } }));
 }
 
+TEST_F(TxnParticipantITest, TestTxnSystemClientRepeatCalls) {
+  constexpr const int kLeaderIdx = 0;
+  constexpr const int kTxnOne = 1;
+  constexpr const int kTxnTwo = 2;
+  vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
+  auto* leader_replica = replicas[kLeaderIdx];
+  const auto tablet_id = leader_replica->tablet_id();
+  ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+  unique_ptr<TxnSystemClient> txn_client;
+  ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client));
+  // Repeat each op twice. There should be no issues here since each op is
+  // idempotent. There should also be no issues with the partition lock.
+  for (const auto& type : kCommitSequence) {
+    ASSERT_OK(txn_client->ParticipateInTransaction(
+        tablet_id, MakeParticipantOp(kTxnOne, type, kDummyCommitTimestamp),
+        kDefaultTimeout));
+    ASSERT_OK(txn_client->ParticipateInTransaction(
+        tablet_id, MakeParticipantOp(kTxnOne, type, kDummyCommitTimestamp),
+        kDefaultTimeout));
+  }
+  for (const auto& type : kAbortSequence) {
+    ASSERT_OK(txn_client->ParticipateInTransaction(
+        tablet_id, MakeParticipantOp(kTxnTwo, type, kDummyCommitTimestamp),
+        kDefaultTimeout));
+    ASSERT_OK(txn_client->ParticipateInTransaction(
+        tablet_id, MakeParticipantOp(kTxnTwo, type, kDummyCommitTimestamp),
+        kDefaultTimeout));
+  }
+  NO_FATALS(CheckReplicasMatchTxns(
+      replicas, { { kTxnOne, kCommitted, kDummyCommitTimestamp }, { kTxnTwo, kAborted, -1 } }));
+}
+
 TEST_F(TxnParticipantITest, TestTxnSystemClientTimeoutWhenNoMajority) {
   constexpr const int kLeaderIdx = 0;
   constexpr const int kTxnId = 0;
diff --git a/src/kudu/integration-tests/txn_write_ops-itest.cc b/src/kudu/integration-tests/txn_write_ops-itest.cc
index 2a37aa4..c597acb 100644
--- a/src/kudu/integration-tests/txn_write_ops-itest.cc
+++ b/src/kudu/integration-tests/txn_write_ops-itest.cc
@@ -22,12 +22,14 @@
 #include <cstdlib>
 #include <deque>
 #include <functional>
+#include <initializer_list>
 #include <iterator>
 #include <map>
 #include <memory>
 #include <mutex>
 #include <numeric>
 #include <ostream>
+#include <random>
 #include <set>
 #include <string>
 #include <thread>
@@ -293,6 +295,87 @@ class TxnWriteOpsITest : public ExternalMiniClusterITestBase {
   string tablet_uuid_;
 };
 
+// Test that our deadlock prevention mechanisms work by writing across
+// different tablets concurrently from multiple transactions.
+// TODO(awong): it'd be much more convenient to take control of aborting the
+// transactions ourselves, rather than relying on the application user.
+TEST_F(TxnWriteOpsITest, TestClientSideDeadlockPrevention) {
+  constexpr const int kNumTxns = 8;
+  const vector<string> master_flags = {
+    "--txn_manager_enabled=true",
+
+    // Scenarios based on this test fixture assume the txn status table
+    // is created at start, not on first transaction-related operation.
+    "--txn_manager_lazily_initialized=false",
+  };
+  NO_FATALS(StartCluster({}, master_flags, kNumTabletServers));
+  vector<string> tablets_uuids;
+  NO_FATALS(Prepare(&tablets_uuids));
+  vector<thread> threads;
+  threads.reserve(kNumTxns);
+  vector<int> random_keys(kNumTxns * 2);
+  std::iota(random_keys.begin(), random_keys.end(), 1);
+  std::mt19937 gen(SeedRandom());
+  std::shuffle(random_keys.begin(), random_keys.end(), gen);
+  for (int i = 0; i < kNumTxns; i++) {
+    threads.emplace_back([&, i] {
+      bool succeeded = false;
+      while (!succeeded) {
+        shared_ptr<KuduTransaction> txn;
+        ASSERT_OK(client_->NewTransaction(&txn));
+        shared_ptr<KuduSession> session;
+        ASSERT_OK(txn->CreateSession(&session));
+        ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+
+        string txn_str;
+        ASSERT_OK(txn->Serialize(&txn_str));
+        TxnTokenPB token;
+        ASSERT_TRUE(token.ParseFromString(txn_str));
+        bool needs_retry = false;
+        for (const auto key_idx : { 2 * i, 2 * i + 1 }) {
+          const auto& row_key = random_keys[key_idx];
+          unique_ptr<KuduInsert> insert(BuildInsert(table_.get(), row_key));
+          Status s = session->Apply(insert.release());
+          LOG(INFO) << Substitute("Txn $0 wrote row $1: $2",
+                                  token.txn_id(), row_key, s.ToString());
+          // If the write op failed because of a locking error, roll the
+          // transaction back and retry the transaction after waiting a bit.
+          if (!s.ok()) {
+            vector<KuduError*> errors;
+            ElementDeleter d(&errors);
+            bool overflow;
+            session->GetPendingErrors(&errors, &overflow);
+            ASSERT_EQ(1, errors.size());
+            const auto& error = errors[0]->status();
+            LOG(INFO) << Substitute("Txn $0 wrote row $1: $2",
+                                    token.txn_id(), row_key, error.ToString());
+            // While the below delay between retries should help prevent
+            // deadlocks, it's possible that "waiting" write ops (i.e. "wait"
+            // in wait-die, that get retried) will still time out, after
+            // contending a bit with other ops.
+            ASSERT_TRUE(error.IsAborted() || error.IsTimedOut()) << error.ToString();
+            ASSERT_OK(txn->Rollback());
+            needs_retry = true;
+
+            // Wait a bit before retrying the entire transaction to allow for
+            // the current lock holder to complete.
+            SleepFor(MonoDelta::FromSeconds(5));
+            break;
+          }
+        }
+        if (!needs_retry) {
+          succeeded = true;
+          ASSERT_OK(txn->Commit(/*wait*/true));
+        }
+      }
+    });
+  }
+  for (auto& t : threads) { t.join(); }
+  size_t count;
+  ASSERT_OK(CountRows(table_.get(), &count));
+  ASSERT_EQ(kNumTxns * 2, count);
+}
+
 // Send multiple one-row write operations to a tablet server in the context of a
 // multi-row transaction, and commit the transaction. This scenario verifies
 // that tablet servers are able to accept high number of write requests
@@ -343,6 +426,10 @@ TEST_F(TxnWriteOpsITest, FrequentElections) {
     // Custom settings for heartbeat interval helps to complete Raft elections
     // rounds faster than with the default settings.
     Substitute("--heartbeat_interval_ms=$0", hb_interval_ms_),
+
+    // Disable the partition lock as there are concurrent transactions.
+    // TODO(awong): update this when implementing finer grained locking.
+    "--enable_txn_partition_lock=false"
   };
   const vector<string> master_flags = {
     // Enable TxnManager in Kudu masters.
@@ -458,6 +545,10 @@ TEST_F(TxnWriteOpsITest, WriteOpPerf) {
   const vector<string> ts_flags = {
     Substitute("--tablet_max_pending_txn_write_ops=$0",
                FLAGS_max_pending_txn_write_ops),
+
+    // Disable the partition lock as there are concurrent transactions.
+    // TODO(awong): update this when implementing finer grained locking.
+    "--enable_txn_partition_lock=false"
   };
   const vector<string> master_flags = {
     // Enable TxnManager in Kudu masters.
@@ -972,6 +1063,131 @@ TEST_F(TxnOpDispatcherITest, LifecycleBasic) {
   }
 }
 
+TEST_F(TxnOpDispatcherITest, BeginTxnLockAbort) {
+  NO_FATALS(Prepare(1));
+
+  // Next value for the primary key column in the test table.
+  int64_t key = 0;
+  vector<scoped_refptr<TabletReplica>> replicas = GetAllReplicas();
+  ASSERT_EQ(kNumPartitions, replicas.size());
+  shared_ptr<KuduTransaction> first_txn;
+  shared_ptr<KuduTransaction> second_txn;
+
+  // Start a single transaction and perform some writes with it.
+  {
+    ASSERT_OK(client_->NewTransaction(&first_txn));
+
+    // There should be no TxnOpDispatchers yet because not a single write
+    // operation has been sent to tablet servers yet.
+    ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
+
+    // Insert a single row.
+    ASSERT_OK(InsertRows(first_txn.get(), 1, &key));
+
+    // Only one tablet replica should get the txn write request and register
+    // TxnOpDispatcher for the transaction.
+    ASSERT_EQ(1, GetTxnOpDispatchersTotalCount());
+
+    // Write some more rows ensuring all hash buckets of the table's partition
+    // will get at least one element.
+    ASSERT_OK(InsertRows(first_txn.get(), 5, &key));
+    ASSERT_EQ(kNumPartitions, GetTxnOpDispatchersTotalCount());
+
+    // Non-transactional operations should fail as the partition lock
+    // is held by the first transaction at the moment.
+    shared_ptr<KuduSession> session;
+    Status s = InsertRows(nullptr /* txn */, 1, &key, &session);
+    ASSERT_TRUE(s.IsIOError()) << s.ToString();
+    auto row_status = GetSingleRowError(session.get());
+    ASSERT_TRUE(row_status.IsAborted()) << row_status.ToString();
+    ASSERT_STR_CONTAINS(row_status.ToString(),
+                        "Write op should be aborted");
+  }
+
+  // Start a new transaction.
+  {
+    ASSERT_OK(client_->NewTransaction(&second_txn));
+    ASSERT_EQ(kNumPartitions, GetTxnOpDispatchersTotalCount());
+
+    // Operations of the second transaction should fail as the partition
+    // lock is held by the first transaction at the moment.
+    shared_ptr<KuduSession> session;
+    Status s = InsertRows(second_txn.get(), 1, &key, &session);
+    ASSERT_TRUE(s.IsIOError()) << s.ToString();
+    auto row_status = GetSingleRowError(session.get());
+    ASSERT_TRUE(row_status.IsAborted()) << row_status.ToString();
+    ASSERT_STR_CONTAINS(row_status.ToString(), "should be aborted");
+
+    // We should have an extra dispatcher for the new transactional write.
+    ASSERT_EQ(1 + kNumPartitions, GetTxnOpDispatchersTotalCount());
+  }
+  {
+    // Now, commit the first transaction.
+    ASSERT_OK(first_txn->Commit());
+
+    // Only the second transaction's dispatcher should remain once the first is committed.
+    ASSERT_EQ(1, GetTxnOpDispatchersTotalCount());
+
+    // Writes to the second transaction should now succeed.
+    ASSERT_OK(InsertRows(second_txn.get(), 1, &key));
+    ASSERT_EQ(1, GetTxnOpDispatchersTotalCount());
+
+    ASSERT_OK(second_txn->Commit());
+    ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
+  }
+}
+
+TEST_F(TxnOpDispatcherITest, BeginTxnLockRetry) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+  NO_FATALS(Prepare(1));
+
+  // Next value for the primary key column in the test table.
+  int64_t key = 0;
+
+  vector<scoped_refptr<TabletReplica>> replicas = GetAllReplicas();
+  ASSERT_EQ(kNumPartitions, replicas.size());
+  shared_ptr<KuduTransaction> first_txn;
+  shared_ptr<KuduTransaction> second_txn;
+
+  // Start a single transaction.
+  {
+    ASSERT_OK(client_->NewTransaction(&second_txn));
+
+    // There should be no TxnOpDispatchers yet because not a single write
+    // operation has been sent to tablet servers yet.
+    ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
+  }
+
+  // Start another single transaction and perform some writes with it.
+  {
+    ASSERT_OK(client_->NewTransaction(&first_txn));
+    ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
+
+    // Write some rows ensuring all hash buckets of the table's partition
+    // will get at least one element.
+    ASSERT_OK(InsertRows(first_txn.get(), 5, &key));
+    ASSERT_EQ(kNumPartitions, GetTxnOpDispatchersTotalCount());
+  }
+
+  {
+    // Operations of the second transaction should fail as the partition
+    // lock is held by the first transaction at the moment.
+    shared_ptr<KuduSession> session;
+    Status s = InsertRows(second_txn.get(), 1, &key, &session);
+    ASSERT_TRUE(s.IsIOError()) << s.ToString();
+    auto row_status = GetSingleRowError(session.get());
+    ASSERT_TRUE(row_status.IsTimedOut()) << row_status.ToString();
+    ASSERT_STR_CONTAINS(row_status.ToString(), "passed its deadline");
+
+    // We should have an extra dispatcher for the new transactional write.
+    ASSERT_EQ(1 + kNumPartitions, GetTxnOpDispatchersTotalCount());
+
+    ASSERT_OK(first_txn->Commit());
+    // We should still have an op dispatcher for the second transaction.
+    ASSERT_EQ(1, GetTxnOpDispatchersTotalCount());
+  }
+}
+
 // A scenario to verify TxnOpDispatcher lifecycle when there is an error
 // while trying to register a tablet as a participant in a transaction.
 TEST_F(TxnOpDispatcherITest, ErrorInParticipantRegistration) {
diff --git a/src/kudu/tablet/lock_manager.h b/src/kudu/tablet/lock_manager.h
index e7e6a75..4069939 100644
--- a/src/kudu/tablet/lock_manager.h
+++ b/src/kudu/tablet/lock_manager.h
@@ -218,6 +218,11 @@ class ScopedPartitionLock {
   // Disable the copy constructor.
   ScopedPartitionLock(const ScopedPartitionLock&) = delete;
 
+  // Returns true if this points at the same lock state as 'other'.
+  bool HasSameState(const ScopedPartitionLock& other) const {
+    return lock_state_ == other.lock_state_;
+  }
+
   // Check whether the partition lock is acquired by the transaction.
   // If false, set the tablet server error code accordingly to abort
   // or retry the transaction. Otherwise, no error code is set.
diff --git a/src/kudu/tablet/ops/participant_op.cc b/src/kudu/tablet/ops/participant_op.cc
index 9d79a8f..a5ed7eb 100644
--- a/src/kudu/tablet/ops/participant_op.cc
+++ b/src/kudu/tablet/ops/participant_op.cc
@@ -20,6 +20,7 @@
 #include <memory>
 #include <ostream>
 
+#include <gflags/gflags.h>
 #include <glog/logging.h>
 #include <google/protobuf/arena.h>
 
@@ -29,6 +30,7 @@
 #include "kudu/consensus/opid.pb.h"
 #include "kudu/consensus/raft_consensus.h"
 #include "kudu/consensus/time_manager.h"
+#include "kudu/gutil/macros.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/rpc_header.pb.h"
@@ -39,10 +41,16 @@
 #include "kudu/tablet/txn_participant.h"
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/debug/trace_event.h"
+#include "kudu/util/flag_tags.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/status.h"
 #include "kudu/util/trace.h"
 
+DEFINE_bool(enable_txn_partition_lock, true,
+            "Whether or not to enable partition lock for transactions");
+TAG_FLAG(enable_txn_partition_lock, unsafe);
+TAG_FLAG(enable_txn_partition_lock, hidden);
+
 using kudu::consensus::CommitMsg;
 using kudu::consensus::ReplicateMsg;
 using kudu::consensus::OperationType;
@@ -231,6 +239,7 @@ Status ParticipantOpState::PerformOp(const consensus::OpId& op_id, Tablet* table
       if (txn_->commit_op()) {
         txn_->commit_op()->FinishApplying();
       }
+      txn_->ReleasePartitionLock();
       break;
     }
     case ParticipantOpPB::ABORT_TXN: {
@@ -240,6 +249,7 @@ Status ParticipantOpState::PerformOp(const consensus::OpId& op_id, Tablet* table
       if (txn_->commit_op()) {
         txn_->commit_op()->Abort();
       }
+      txn_->ReleasePartitionLock();
       break;
     }
     case ParticipantOpPB::UNKNOWN: {
diff --git a/src/kudu/tablet/ops/write_op.cc b/src/kudu/tablet/ops/write_op.cc
index 7f8bc92..83097f9 100644
--- a/src/kudu/tablet/ops/write_op.cc
+++ b/src/kudu/tablet/ops/write_op.cc
@@ -36,6 +36,7 @@
 #include "kudu/common/row_operations.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/timestamp.h"
+#include "kudu/common/txn_id.h"
 #include "kudu/common/wire_protocol.h"
 #include "kudu/common/wire_protocol.pb.h"
 #include "kudu/consensus/opid.pb.h"
@@ -70,6 +71,8 @@ DEFINE_int32(tablet_inject_latency_on_apply_write_op_ms, 0,
 TAG_FLAG(tablet_inject_latency_on_apply_write_op_ms, unsafe);
 TAG_FLAG(tablet_inject_latency_on_apply_write_op_ms, runtime);
 
+DECLARE_bool(enable_txn_partition_lock);
+
 using std::string;
 using std::unique_ptr;
 using std::vector;
@@ -211,7 +214,14 @@ Status WriteOp::Prepare() {
     }
   }
 
-  // Now acquire row locks and prepare everything for apply
+  // Now first acquire partition lock and then row locks, and prepare
+  // everything for apply. For followers, we wait until the partition lock is
+  // held, since we know the op will not be replicated if the leader cannot
+  // take the partition lock.
+  if (PREDICT_TRUE(FLAGS_enable_txn_partition_lock)) {
+    RETURN_NOT_OK(tablet->AcquirePartitionLock(state(),
+        type() == consensus::LEADER ? LockManager::TRY_LOCK : LockManager::WAIT_FOR_LOCK));
+  }
   RETURN_NOT_OK(tablet->AcquireRowLocks(state()));
 
   TRACE("PREPARE: Finished.");
@@ -427,8 +437,13 @@ void WriteOpState::StartApplying() {
 void WriteOpState::FinishApplyingOrAbort(Op::OpResult result) {
   ReleaseMvccTxn(result);
 
-  TRACE("Releasing row and schema locks");
+  TRACE("Releasing partition, row and schema locks");
   ReleaseRowLocks();
+  if (result == Op::APPLIED) {
+    // NOTE: if the op was not successful, the lock will be released when this
+    // state is destructed.
+    TransferOrReleasePartitionLock();
+  }
   ReleaseSchemaLock();
 
   // After committing, if there is an RPC going on, the driver will respond to it.
@@ -531,6 +546,49 @@ void WriteOpState::ReleaseRowLocks() {
   rows_lock_.Release();
 }
 
+// Takes the partition lock for this op; on failure, sets the error on the completion callback.
+Status WriteOpState::AcquirePartitionLock(LockManager* lock_manager,
+                                          LockManager::LockWaitMode wait_mode) {
+  TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+  DCHECK(!partition_lock_.IsAcquired(&code));
+  TxnId txn_id;
+  if (request()->has_txn_id()) {
+    txn_id = request()->txn_id();
+  }
+  TRACE("Acquiring the partition lock for write op");
+  partition_lock_ = ScopedPartitionLock(lock_manager, txn_id, wait_mode);
+  if (!partition_lock_.IsAcquired(&code)) {
+    Status s;
+    if (code == TabletServerErrorPB::TXN_LOCKED_ABORT) {
+      s = Status::Aborted("Write op should be aborted since it tries to acquire the "
+                          "partition lock that is held by another transaction that "
+                          "has lower txn ID");
+    } else if (code == TabletServerErrorPB::TXN_LOCKED_RETRY_OP) {
+      s = Status::ServiceUnavailable("Write op should retry since it tries to acquire "
+                                     "the partition lock that is held by another transaction "
+                                     "that has higher txn ID");
+    } else {
+      // Keep 's' non-OK so that release builds fail this op instead of crashing.
+      LOG(DFATAL) << "unexpected error code " << code;
+      s = Status::IllegalState("unexpected error code while acquiring partition lock");
+    }
+    completion_callback()->set_error(s, code);
+    return s;
+  }
+  DCHECK_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+  TRACE("Partition lock acquired for write op");
+  return Status::OK();
+}
+
+void WriteOpState::TransferOrReleasePartitionLock() {
+  if (!txn_) {
+    // A non-transactional write has no Txn to hand the lock to: release it.
+    partition_lock_.Release();
+    return;
+  }
+  txn_->AdoptPartitionLock(std::move(partition_lock_));
+}
+
 void WriteOpState::ReleaseTxnLock() {
   shared_lock<rw_semaphore> temp;
   txn_lock_.swap(temp);
diff --git a/src/kudu/tablet/ops/write_op.h b/src/kudu/tablet/ops/write_op.h
index b1b1c9c..c8621fa 100644
--- a/src/kudu/tablet/ops/write_op.h
+++ b/src/kudu/tablet/ops/write_op.h
@@ -176,6 +176,13 @@ class WriteOpState : public OpState {
   // Acquire row locks for all of the rows in this Write.
   void AcquireRowLocks(LockManager* lock_manager);
 
+  // Acquire the partition lock for writes of the transaction associated with
+  // this request. If 'wait_mode' is 'WAIT_FOR_LOCK', then wait until the lock is
+  // acquired. Otherwise, if lock cannot be acquired, return 'Aborted' error if
+  // the op should be aborted or 'ServiceUnavailable' if the op should be retried.
+  Status AcquirePartitionLock(LockManager* lock_manager,
+                              LockManager::LockWaitMode wait_mode);
+
   // Acquires the lock on the given transaction, setting 'txn_' and
   // 'txn_lock_', which must be freed upon finishing this op. Checks if the
   // transaction is available to be written to, returning an error if not.
@@ -245,6 +252,14 @@ class WriteOpState : public OpState {
 
   std::string ToString() const override;
 
+  // Releases the partition lock acquired by this op. Unlike the other
+  // unlocking methods that just release locks, this transfers the ownership of
+  // the partition lock to the Txn that this write is a part of.
+  //
+  // If this write was not a part of a transaction, this just releases
+  // the partition lock.
+  void TransferOrReleasePartitionLock();
+
  private:
   // Releases all the row locks acquired by this op.
   void ReleaseRowLocks();
@@ -274,9 +289,12 @@ class WriteOpState : public OpState {
   // Protected by op_state_lock_.
   std::vector<RowOp*> row_ops_;
 
-  // Holds the LockManager locks acquired for this operation.
+  // Holds the row locks acquired for this operation.
   ScopedRowLock rows_lock_;
 
+  // Holds the partition lock acquired for this operation.
+  ScopedPartitionLock partition_lock_;
+
   // Array of ProbeStats for each of the operations in 'row_ops_'.
   // Allocated from this op's arena during SetRowOps().
   ProbeStats* stats_array_ = nullptr;
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 1b1a27c..86b8f7f 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -576,10 +576,15 @@ Status Tablet::AcquireRowLocks(WriteOpState* op_state) {
 
   op_state->AcquireRowLocks(&lock_manager_);
 
-  TRACE("Locks acquired");
+  TRACE("Row locks acquired");
   return Status::OK();
 }
 
+Status Tablet::AcquirePartitionLock(WriteOpState* op_state,
+                                    LockManager::LockWaitMode wait_mode) {
+  return op_state->AcquirePartitionLock(&lock_manager_, wait_mode);
+}
+
 Status Tablet::AcquireTxnLock(int64_t txn_id, WriteOpState* op_state) {
   auto txn = txn_participant_.GetTransaction(txn_id);
   if (!txn) {
@@ -1172,7 +1177,8 @@ Status Tablet::CheckHasNotBeenStopped(State* cur_state) const {
   return Status::OK();
 }
 
-void Tablet::BeginTransaction(Txn* txn, const OpId& op_id) {
+void Tablet::BeginTransaction(Txn* txn,
+                              const OpId& op_id) {
   unique_ptr<MinLogIndexAnchorer> anchor(new MinLogIndexAnchorer(log_anchor_registry_.get(),
         Substitute("BEGIN_TXN-$0-$1", txn->txn_id(), txn)));
   anchor->AnchorIfMinimum(op_id.index());
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index bce8157..cc156ca 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -158,10 +158,17 @@ class Tablet {
   Status DecodeWriteOperations(const Schema* client_schema,
                                WriteOpState* op_state);
 
-  // Acquire locks for each of the operations in the given op.
+  // Acquire locks for each of the operations in the given write op.
   // This also sets the row op's RowSetKeyProbe.
   Status AcquireRowLocks(WriteOpState* op_state);
 
+  // Acquire the partition lock for the given write op. If 'wait_mode' is
+  // 'WAIT_FOR_LOCK', then wait until the lock is acquired. Otherwise, return
+  // an error ('TXN_LOCKED_ABORT' or 'TXN_LOCKED_RETRY_OP' error code) if the
+  // lock cannot be acquired.
+  Status AcquirePartitionLock(WriteOpState* op_state,
+                              LockManager::LockWaitMode wait_mode);
+
   // Acquire a shared lock on the given transaction, to ensure the
   // transaction's state doesn't change while the given write is in flight.
   Status AcquireTxnLock(int64_t txn_id, WriteOpState* op_state);
@@ -793,14 +800,15 @@ class Tablet {
 
   MvccManager mvcc_;
 
+  LockManager lock_manager_;
+
   // Maintains the set of in-flight transactions, and any WAL anchors
   // associated with them.
   // NOTE: the participant may retain MVCC ops, so define it after the
   // MvccManager, to ensure those ops get destructed before the MvccManager.
+  // The same goes for locks and the LockManager.
   TxnParticipant txn_participant_;
 
-  LockManager lock_manager_;
-
   std::unique_ptr<CompactionPolicy> compaction_policy_;
 
   // Lock protecting the selection of rowsets for compaction.
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc
index 6795b5a..fb2a4fc 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -65,6 +65,7 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/result_tracker.h"
 #include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/tablet/lock_manager.h"
 #include "kudu/tablet/metadata.pb.h"
 #include "kudu/tablet/mvcc.h"
 #include "kudu/tablet/ops/alter_schema_op.h"
@@ -1629,7 +1630,19 @@ Status TabletBootstrap::PlayRowOperations(const IOContext* io_context,
                         Substitute("Could not decode row operations: $0",
                                    SecureDebugString(op_state->request()->row_operations())));
 
-  // Run AcquireRowLocks, Apply, etc!
+  // If the write is a part of a transaction that's currently open (i.e., it
+  // has an in-flight Txn associated with it), lock the Txn now and make sure
+  // it's open.
+  // NOTE: we shouldn't take this lock if we've persisted that the transaction
+  // has completed, since there is no corresponding in-flight Txn.
+  if (op_state->txn_id() && !ContainsKey(terminal_txn_ids_, *op_state->txn_id())) {
+    RETURN_NOT_OK_PREPEND(tablet_->AcquireTxnLock(*op_state->txn_id(), op_state),
+                          "Failed to acquire txn lock");;
+  }
+
+  // Acquire partition/row locks, Apply, etc!
+  RETURN_NOT_OK_PREPEND(tablet_->AcquirePartitionLock(op_state, LockManager::WAIT_FOR_LOCK),
+                        "Failed to acquire partition lock");
   RETURN_NOT_OK_PREPEND(tablet_->AcquireRowLocks(op_state),
                         "Failed to acquire row locks");
 
diff --git a/src/kudu/tablet/txn_participant-test.cc b/src/kudu/tablet/txn_participant-test.cc
index f21b698..cedcebd 100644
--- a/src/kudu/tablet/txn_participant-test.cc
+++ b/src/kudu/tablet/txn_participant-test.cc
@@ -70,6 +70,7 @@
 #include "kudu/util/pb_util.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
 
 using kudu::consensus::CommitMsg;
 using kudu::consensus::ConsensusBootstrapInfo;
@@ -87,6 +88,7 @@ using std::vector;
 using strings::Substitute;
 
 DECLARE_bool(enable_maintenance_manager);
+DECLARE_bool(enable_txn_partition_lock);
 DECLARE_bool(log_preallocate_segments);
 DECLARE_bool(log_async_preallocate_segments);
 
@@ -358,6 +360,9 @@ TEST_F(TxnParticipantTest, TestIllegalTransitions) {
 
 // Test that we have no trouble operating on separate transactions.
 TEST_F(TxnParticipantTest, TestConcurrentTransactions) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   const int kNumTxns = 10;
   vector<thread> threads;
   Status statuses[kNumTxns];
@@ -503,10 +508,93 @@ TEST_F(TxnParticipantTest, TestAllOpsRegisterAnchors) {
   }));
 }
 
+TEST_F(TxnParticipantTest, TestTakePartitionLockOnRestart) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  // Begin a transaction so that we can write some rows.
+  ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_TXN,
+                                       kDummyCommitTimestamp));
+
+  // We should be able to at least start another transaction.
+  ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::BEGIN_TXN,
+                                       kDummyCommitTimestamp));
+  ASSERT_OK(Write(0, kTxnOne));
+  const auto check_other_txns_cant_write = [&] {
+    // We'll try writing a couple times to make sure the act of writing doesn't
+    // somehow permit further writes to the transaction.
+    for (int i = 0; i < 2; i++) {
+      Status s = Write(0);
+      ASSERT_TRUE(s.IsAborted()) << s.ToString();
+      ASSERT_STR_CONTAINS(s.ToString(), "partition lock that is held by another");
+
+      s = Write(0, kTxnTwo);
+      ASSERT_TRUE(s.IsAborted()) << s.ToString();
+      ASSERT_STR_CONTAINS(s.ToString(), "partition lock that is held by another");
+    }
+  };
+  NO_FATALS(check_other_txns_cant_write());
+
+  // We shouldn't be able to write even after restarting.
+  ASSERT_OK(RestartReplica(/*reset_tablet*/true));
+  NO_FATALS(check_other_txns_cant_write());
+
+  // We should be able to write as a part of the transaction though.
+  ASSERT_OK(Write(1, kTxnOne));
+
+  // Once we begin committing, we still shouldn't be able to write, even after
+  // restarting.
+  ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_COMMIT,
+                                       kDummyCommitTimestamp));
+  NO_FATALS(check_other_txns_cant_write());
+  ASSERT_OK(RestartReplica(/*reset_tablet*/true));
+  NO_FATALS(check_other_txns_cant_write());
+  // We also shouldn't be able to write as a part of the transaction, since
+  // it's not open for further writes.
+  Status s = Write(2, kTxnOne);
+  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "not open: COMMIT_IN_PROGRESS");
+
+  ASSERT_OK(RestartReplica(/*reset_tablet*/true));
+
+  s = Write(2, kTxnOne);
+  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "not open: COMMIT_IN_PROGRESS");
+
+  s = Write(0, kTxnTwo);
+  ASSERT_TRUE(s.IsAborted()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "partition lock that is held by another");
+
+  // Once we finalize the commit, we should be able to write again.
+  ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::FINALIZE_COMMIT,
+                                       kDummyCommitTimestamp));
+
+  // And we shouldn't be able to write to the same transaction once committed.
+  s = Write(2, kTxnOne);
+  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "not open");
+
+  // We should be able to write to the other transaction now that the partition
+  // lock isn't held.
+  ASSERT_OK(Write(2, kTxnTwo));
+
+  ASSERT_OK(RestartReplica(/*reset_tablet*/true));
+
+  s = Write(2, kTxnOne);
+  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "not open");
+
+  // We should be able to write to the other transaction now that the partition
+  // lock isn't held.
+  ASSERT_OK(Write(3, kTxnTwo));
+}
+
 // Test that participant ops result in tablet metadata updates that can survive
 // restarts, and that the appropriate anchors are in place as we progress
 // through a transaction's life cycle.
 TEST_F(TxnParticipantTest, TestTxnMetadataSurvivesRestart) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   // First, do a sanity check that there's nothing GCable.
   int64_t gcable_size;
   ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size));
@@ -741,6 +829,9 @@ TEST_P(MetadataFlushTxnParticipantTest, TestRebuildTxnMetadata) {
 
 // Test rebuilding transaction state, including writes, from WALs and metadata.
 TEST_P(MetadataFlushTxnParticipantTest, TestReplayTransactionalInserts) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   const bool should_flush = GetParam();
   constexpr const int64_t kAbortedTxnId = 2;
   ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_TXN, -1));
@@ -815,6 +906,9 @@ INSTANTIATE_TEST_SUITE_P(ShouldFlushMetadata, MetadataFlushTxnParticipantTest,
 
 // Similar to the above test, but checking that in-flight ops anchor the WALs.
 TEST_F(TxnParticipantTest, TestActiveParticipantOpsAnchorWALs) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   ParticipantRequestPB req;
   ParticipantResponsePB resp;
   auto op_state = NewParticipantOp(tablet_replica_.get(), kTxnId, ParticipantOpPB::BEGIN_TXN,
@@ -931,6 +1025,9 @@ TEST_F(TxnParticipantTest, TestUnsupportedOps) {
 // Test that rows inserted to transactional stores only show up when the
 // transactions complete.
 TEST_F(TxnParticipantTest, TestInsertToTransactionMRS) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_TXN, -1));
   ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::BEGIN_TXN, -1));
   ASSERT_OK(Write(0, kTxnOne));
@@ -964,6 +1061,9 @@ TEST_F(TxnParticipantTest, TestInsertToTransactionMRS) {
 // Test that rows inserted to transactional stores don't show up if the
 // transaction is aborted.
 TEST_F(TxnParticipantTest, TestDontReadAbortedInserts) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_TXN, -1));
   ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::BEGIN_TXN, -1));
   ASSERT_OK(Write(0, kTxnOne));
@@ -1018,6 +1118,9 @@ TEST_F(TxnParticipantTest, TestUpdateAfterAborting) {
 // Test that we can update rows that were inserted and committed as a part of a
 // transaction.
 TEST_F(TxnParticipantTest, TestUpdateCommittedTransactionMRS) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_TXN, -1));
   ASSERT_OK(Write(0, kTxnId));
 
@@ -1026,7 +1129,7 @@ TEST_F(TxnParticipantTest, TestUpdateCommittedTransactionMRS) {
   ASSERT_OK(IterateToStrings(&rows));
   ASSERT_EQ(0, rows.size());
   Status s = Delete(0);
-  ASSERT_TRUE(s.IsNotFound());
+  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
   ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_COMMIT, -1));
 
   // We still haven't finished committing, so we should see no rows.
@@ -1059,6 +1162,9 @@ TEST_F(TxnParticipantTest, TestUpdateCommittedTransactionMRS) {
 // Test that we can flush multiple MRSs, and that when restarting, ops are
 // replayed (or not) as appropriate.
 TEST_F(TxnParticipantTest, TestFlushMultipleMRSs) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   const int kNumTxns = 3;
   const int kNumRowsPerTxn = 100;
   vector<string> rows;
@@ -1137,6 +1243,9 @@ TEST_F(TxnParticipantTest, TestInsertIgnoreInTransactionMRS) {
 
 // Test that INSERT_IGNORE ops work when the row exists in the main MRS.
 TEST_F(TxnParticipantTest, TestInsertIgnoreInMainMRS) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_TXN, -1));
   // Insert into the main MRS, and then INSERT_IGNORE as a part of a
   // transaction.
@@ -1160,6 +1269,9 @@ TEST_F(TxnParticipantTest, TestInsertIgnoreInMainMRS) {
 
 // Test that the live row count accounts for transactional MRSs.
 TEST_F(TxnParticipantTest, TestLiveRowCountAccountsForTransactionalMRSs) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_TXN, -1));
   ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::BEGIN_TXN, -1));
   ASSERT_OK(Write(0));
@@ -1183,6 +1295,9 @@ TEST_F(TxnParticipantTest, TestLiveRowCountAccountsForTransactionalMRSs) {
 
 // Test that the MRS size metrics account for transactional MRSs.
 TEST_F(TxnParticipantTest, TestSizeAccountsForTransactionalMRS) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_TXN, -1));
   ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::BEGIN_TXN, -1));
   ASSERT_TRUE(tablet_replica_->tablet()->MemRowSetEmpty());
@@ -1216,6 +1331,9 @@ TEST_F(TxnParticipantTest, TestSizeAccountsForTransactionalMRS) {
 
 // Test that the MRS anchored WALs metric accounts for transactional MRSs.
 TEST_F(TxnParticipantTest, TestWALsAnchoredAccountsForTransactionalMRS) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   const auto mrs_wal_size = [&] {
     map<int64_t, int64_t> replay_size_map;
     CHECK_OK(tablet_replica_->GetReplaySizeMap(&replay_size_map));
@@ -1288,6 +1406,9 @@ TEST_F(TxnParticipantTest, TestRacingCommitAndWrite) {
 
 // Test that the write metrics account for transactional rowsets.
 TEST_F(TxnParticipantTest, TestMRSLookupsMetric) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_TXN, -1));
   ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::BEGIN_TXN, -1));
 
@@ -1340,6 +1461,9 @@ class TxnParticipantConcurrencyTest : public TxnParticipantTest,
 
 // Test inserting into multiple transactions from multiple threads.
 TEST_P(TxnParticipantConcurrencyTest, TestConcurrentDisjointInsertsTxn) {
+  // Disable the partition lock as there are concurrent transactions.
+  // TODO(awong): update this when implementing finer grained locking.
+  FLAGS_enable_txn_partition_lock = false;
   const auto& params = GetParam();
   const auto& num_txns = params.num_txns;
   const int kNumThreads = 10;
diff --git a/src/kudu/tablet/txn_participant.cc b/src/kudu/tablet/txn_participant.cc
index 5cbef6f..4a63d53 100644
--- a/src/kudu/tablet/txn_participant.cc
+++ b/src/kudu/tablet/txn_participant.cc
@@ -25,16 +25,18 @@
 #include <vector>
 
 #include <boost/optional/optional.hpp>
+#include <gflags/gflags_declare.h>
 
 #include "kudu/common/timestamp.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
-#include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/txn_metadata.h"
 
+DECLARE_bool(enable_txn_partition_lock);
+
 using kudu::log::LogAnchorRegistry;
+using kudu::tserver::TabletServerErrorPB;
 using std::vector;
-using strings::Substitute;
 
 namespace kudu {
 namespace tablet {
@@ -72,6 +74,24 @@ void Txn::AcquireReadLock(shared_lock<rw_semaphore>* txn_lock) {
   *txn_lock = std::move(l);
 }
 
+void Txn::AdoptPartitionLock(ScopedPartitionLock partition_lock) {
+  if (PREDICT_TRUE(FLAGS_enable_txn_partition_lock)) {
+    TabletServerErrorPB::Code code = tserver::TabletServerErrorPB::UNKNOWN_ERROR;
+#ifndef NDEBUG
+    CHECK(partition_lock.IsAcquired(&code)) << code;
+    if (partition_lock_.IsAcquired(&code)) {
+      // If we're adopting a lock while one is already held, make sure
+      // they're the same lock.
+      CHECK(partition_lock.HasSameState(partition_lock_));
+    }
+#endif
+    // Release the currently-held lock (if any) and take ownership of the new one.
+    partition_lock_.Release();
+    partition_lock_ = std::move(partition_lock);
+    DCHECK(partition_lock_.IsAcquired(&code)) << code;
+  }
+}
+
 void TxnParticipant::CreateOpenTransaction(int64_t txn_id,
                                            LogAnchorRegistry* log_anchor_registry) {
   std::lock_guard<simple_spinlock> l(lock_);
diff --git a/src/kudu/tablet/txn_participant.h b/src/kudu/tablet/txn_participant.h
index 8539372..92b7b00 100644
--- a/src/kudu/tablet/txn_participant.h
+++ b/src/kudu/tablet/txn_participant.h
@@ -34,6 +34,7 @@
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/tablet/lock_manager.h"
 #include "kudu/tablet/mvcc.h"
 #include "kudu/tablet/tablet_metadata.h"
 #include "kudu/tserver/tserver.pb.h"
@@ -108,6 +109,13 @@ class Txn : public RefCountedThreadSafe<Txn> {
   void AcquireWriteLock(std::unique_lock<rw_semaphore>* txn_lock);
   void AcquireReadLock(shared_lock<rw_semaphore>* txn_lock);
 
+  // Adopts the input partition lock, holding it until the transaction is
+  // complete (aborted or finalized). Rather than maintaining multiple
+  // ScopedPartitionLocks, this releases any currently-held lock and takes
+  // ownership of the new one. It is thus expected that repeated calls pass in
+  // the same lock.
+  void AdoptPartitionLock(ScopedPartitionLock partition_lock);
+
   // Validates that the transaction is in the appropriate state to perform the
   // given operation. Should be called while holding the state lock before
   // replicating a participant op.
@@ -256,6 +264,10 @@ class Txn : public RefCountedThreadSafe<Txn> {
     return commit_op_.get();
   }
 
+  void ReleasePartitionLock() {
+    partition_lock_.Release();
+  }
+
  private:
   friend class RefCountedThreadSafe<Txn>;
 
@@ -345,6 +357,9 @@ class Txn : public RefCountedThreadSafe<Txn> {
   // repeatable.
   std::unique_ptr<ScopedOp> commit_op_;
 
+  // Holds the partition lock acquired for this transaction.
+  ScopedPartitionLock partition_lock_;
+
   DISALLOW_COPY_AND_ASSIGN(Txn);
 };
 
diff --git a/src/kudu/transactions/participant_rpc.cc b/src/kudu/transactions/participant_rpc.cc
index 36ece24..1cc76f6 100644
--- a/src/kudu/transactions/participant_rpc.cc
+++ b/src/kudu/transactions/participant_rpc.cc
@@ -171,12 +171,14 @@ RetriableRpcStatus ParticipantRpc::AnalyzeResponse(const Status& rpc_cb_status)
         return result;
       case TabletServerErrorPB::TABLET_NOT_RUNNING:
       case TabletServerErrorPB::THROTTLED:
+      case TabletServerErrorPB::TXN_LOCKED_RETRY_OP:
         result.result = RetriableRpcStatus::SERVICE_UNAVAILABLE;
         return result;
       case TabletServerErrorPB::NOT_THE_LEADER:
         result.result = RetriableRpcStatus::REPLICA_NOT_LEADER;
         return result;
       case TabletServerErrorPB::TXN_ILLEGAL_STATE:
+      case TabletServerErrorPB::TXN_LOCKED_ABORT:
         result.result = RetriableRpcStatus::NON_RETRIABLE_ERROR;
         return result;
       case TabletServerErrorPB::TXN_OP_ALREADY_APPLIED: