Posted to commits@kudu.apache.org by aw...@apache.org on 2021/04/14 19:01:52 UTC

[kudu] 02/02: KUDU-2671: Adds new field to PartitionSchema.

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 2dda869456659a36247eb89f5b9e5e3837e5f8a3
Author: Mahesh Reddy <mr...@cloudera.com>
AuthorDate: Mon Feb 1 14:53:30 2021 -0800

    KUDU-2671: Adds new field to PartitionSchema.
    
    This patch introduces a new field to PartitionSchema that pairs each
    range bound with its respective hash bucket schema. Any code that
    previously assumed the same hash schema is used for every range will
    need to consult this new field; notable examples include partition
    pruning and many of the internal PartitionSchema functions.
    
    I moved RowOperationsPB to a separate .proto file to break a circular
    dependency between common.proto and wire_protocol.proto. Most of the
    .proto changes in this patch stem from that move.
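    
    For illustration, a minimal sketch of the serialization round trip
    after this change (names as in this patch; 'schema', 'client_schema',
    and 'partition_schema' are assumed to be in scope):
    
        PartitionSchemaPB pb;
        // ToPB() now takes the Schema so it can re-encode the stored range
        // bounds as RowOperationsPB, and returns Status since that can fail.
        RETURN_NOT_OK(partition_schema.ToPB(schema, &pb));
    
        // The new FromPB() overload takes an explicit client schema that is
        // used to decode the range-bound row operations.
        PartitionSchema decoded;
        RETURN_NOT_OK(PartitionSchema::FromPB(pb, schema, client_schema,
                                              &decoded));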
    
    Change-Id: Ic5d8615ab9967fdb40292b9c77eb68a19baeca1d
    Reviewed-on: http://gerrit.cloudera.org:8080/17025
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 .../java/org/apache/kudu/client/Operation.java     |   2 +-
 .../java/org/apache/kudu/client/TestOperation.java |   2 +-
 src/kudu/client/scan_token-internal.cc             |   7 +-
 src/kudu/common/CMakeLists.txt                     |  13 +-
 src/kudu/common/common.proto                       |  14 ++
 src/kudu/common/partition-test.cc                  | 193 ++++++++++++++++++++-
 src/kudu/common/partition.cc                       | 106 ++++++++++-
 src/kudu/common/partition.h                        |  22 ++-
 src/kudu/common/partition_pruner.cc                |   5 +-
 src/kudu/common/row_operations-test.cc             |   2 +-
 src/kudu/common/row_operations.h                   |   2 +-
 src/kudu/common/row_operations.proto               |  83 +++++++++
 src/kudu/common/wire_protocol.proto                |  57 ------
 src/kudu/integration-tests/alter_table-test.cc     |   2 +-
 src/kudu/master/catalog_manager.cc                 |  16 +-
 src/kudu/master/master.proto                       |   6 +-
 src/kudu/master/sys_catalog.cc                     |   2 +-
 src/kudu/tablet/tablet_metadata.cc                 |   2 +-
 src/kudu/tserver/tablet_copy_client.cc             |   1 -
 src/kudu/tserver/tablet_server-test.cc             |   1 +
 src/kudu/tserver/tablet_service.cc                 |   4 +-
 src/kudu/tserver/tserver.proto                     |   1 +
 22 files changed, 453 insertions(+), 90 deletions(-)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
index 4981d5e..4d871c9 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
@@ -34,10 +34,10 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 
 import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.RowOperations.RowOperationsPB;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
 import org.apache.kudu.WireProtocol.AppStatusPB.ErrorCode;
-import org.apache.kudu.WireProtocol.RowOperationsPB;
 import org.apache.kudu.client.ProtobufHelper.SchemaPBConversionFlags;
 import org.apache.kudu.client.Statistics.Statistic;
 import org.apache.kudu.client.Statistics.TabletStatistics;
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestOperation.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestOperation.java
index 20e7640..42c747c 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestOperation.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestOperation.java
@@ -29,9 +29,9 @@ import org.junit.Test;
 import org.mockito.Mockito;
 
 import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.RowOperations.RowOperationsPB;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
-import org.apache.kudu.WireProtocol.RowOperationsPB;
 import org.apache.kudu.client.Operation.ChangeType;
 import org.apache.kudu.test.junit.RetryRule;
 import org.apache.kudu.tserver.Tserver.WriteRequestPBOrBuilder;
diff --git a/src/kudu/client/scan_token-internal.cc b/src/kudu/client/scan_token-internal.cc
index 132e772..00afa2b 100644
--- a/src/kudu/client/scan_token-internal.cc
+++ b/src/kudu/client/scan_token-internal.cc
@@ -40,7 +40,6 @@
 #include "kudu/client/shared_ptr.h" // IWYU pragma: keep
 #include "kudu/client/tablet-internal.h"
 #include "kudu/client/tablet_server-internal.h"
-#include "kudu/common/column_predicate.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/common/encoded_key.h"
 #include "kudu/common/partition.h"
@@ -69,6 +68,7 @@ using strings::Substitute;
 
 namespace kudu {
 
+class ColumnPredicate;
 using master::GetTableLocationsResponsePB;
 using master::TableIdentifierPB;
 using master::TabletLocationsPB;
@@ -131,7 +131,7 @@ Status KuduScanToken::Data::PBIntoScanner(KuduClient* client,
     KuduSchema kudu_schema(schema);
     PartitionSchema partition_schema;
     RETURN_NOT_OK(PartitionSchema::FromPB(metadata.partition_schema(), schema,
-        &partition_schema));
+                                          &partition_schema));
     map<string, string> extra_configs(metadata.extra_configs().begin(),
         metadata.extra_configs().end());
     table.reset(new KuduTable(client->shared_from_this(), metadata.table_name(),
@@ -341,7 +341,8 @@ Status KuduScanTokenBuilder::Data::Build(vector<KuduScanToken*>* tokens) {
     RETURN_NOT_OK(SchemaToPB(KuduSchema::ToSchema(table->schema()), &schema_pb));
     *table_pb.mutable_schema() = std::move(schema_pb);
     PartitionSchemaPB partition_schema_pb;
-    table->partition_schema().ToPB(&partition_schema_pb);
+    RETURN_NOT_OK(table->partition_schema().ToPB(KuduSchema::ToSchema(table->schema()),
+                                                 &partition_schema_pb));
     table_pb.mutable_partition_schema()->CopyFrom(partition_schema_pb);
     table_pb.mutable_extra_configs()->insert(table->extra_configs().begin(),
                                              table->extra_configs().end());
diff --git a/src/kudu/common/CMakeLists.txt b/src/kudu/common/CMakeLists.txt
index e25d07e..1cb6207 100644
--- a/src/kudu/common/CMakeLists.txt
+++ b/src/kudu/common/CMakeLists.txt
@@ -22,10 +22,20 @@ PROTOBUF_GENERATE_CPP(
   PROTO_FILES common.proto)
 ADD_EXPORTABLE_LIBRARY(kudu_common_proto
   SRCS ${COMMON_PROTO_SRCS}
-  DEPS block_bloom_filter_proto hash_proto pb_util_proto protobuf util_compression_proto
+  DEPS block_bloom_filter_proto hash_proto protobuf row_operations_proto util_compression_proto
   NONLINK_DEPS ${COMMON_PROTO_TGTS})
 
 PROTOBUF_GENERATE_CPP(
+  ROW_OPERATIONS_PROTO_SRCS ROW_OPERATIONS_PROTO_HDRS ROW_OPERATIONS_PROTO_TGTS
+  SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
+  BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
+  PROTO_FILES row_operations.proto)
+ADD_EXPORTABLE_LIBRARY(row_operations_proto
+  SRCS ${ROW_OPERATIONS_PROTO_SRCS}
+  DEPS pb_util_proto protobuf
+  NONLINK_DEPS ${ROW_OPERATIONS_PROTO_TGTS})
+
+PROTOBUF_GENERATE_CPP(
   WIRE_PROTOCOL_PROTO_SRCS WIRE_PROTOCOL_PROTO_HDRS WIRE_PROTOCOL_PROTO_TGTS
   SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
   BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
@@ -77,6 +87,7 @@ set(COMMON_LIBS
   gutil
   kudu_common_proto
   kudu_util
+  row_operations_proto
   wire_protocol_proto)
 
 ADD_EXPORTABLE_LIBRARY(kudu_common
diff --git a/src/kudu/common/common.proto b/src/kudu/common/common.proto
index 0043669..2b51a7a 100644
--- a/src/kudu/common/common.proto
+++ b/src/kudu/common/common.proto
@@ -28,6 +28,7 @@ package kudu;
 
 option java_package = "org.apache.kudu";
 
+import "kudu/common/row_operations.proto";
 import "kudu/util/block_bloom_filter.proto";
 import "kudu/util/compression/compression.proto";
 import "kudu/util/hash.proto";
@@ -348,8 +349,21 @@ message PartitionSchemaPB {
     optional HashAlgorithm hash_algorithm = 4;
   }
 
+  message PerRangeHashBucketSchemasPB {
+    repeated HashBucketSchemaPB hash_schemas = 1;
+  }
+
   repeated HashBucketSchemaPB hash_bucket_schemas = 1;
   optional RangeSchemaPB range_schema = 2;
+
+  // Each index of 'range_bounds' represents the upper and lower bounds of
+  // ranges whose hash bucket schemas were specified. Its corresponding index
+  // of 'range_hash_schemas' represents that range's hash schema. An empty
+  // field of 'range_hash_schemas' indicates that the table wide hash schema
+  // specified in 'hash_bucket_schemas' is used. Both of these fields must have
+  // the same size.
+  repeated PerRangeHashBucketSchemasPB range_hash_schemas = 3;
+  repeated RowOperationsPB range_bounds = 4;
 }
 
 // The serialized format of a Kudu table partition.
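
As a concrete illustration of the pairing above — a single bounded range
carrying its own hash schema, built with the same C++ calls the new tests
use ('schema' and the partial rows 'lower'/'upper' are assumed to be in
scope):

    PartitionSchemaPB pb;
    // Encode the [lower, upper) pair as row operations; this becomes
    // index 0 of 'range_bounds'.
    RowOperationsPBEncoder encoder(pb.add_range_bounds());
    encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, lower);
    encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, upper);
    // The entry at the same index of 'range_hash_schemas': this range
    // hashes column 'a' into 4 buckets. Adding an empty entry instead
    // would select the table-wide schema from 'hash_bucket_schemas'.
    auto* per_range = pb.add_range_hash_schemas();
    auto* hash_schema = per_range->add_hash_schemas();
    hash_schema->add_columns()->set_name("a");
    hash_schema->set_num_buckets(4);
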
diff --git a/src/kudu/common/partition-test.cc b/src/kudu/common/partition-test.cc
index 70b63f8..e71cce0 100644
--- a/src/kudu/common/partition-test.cc
+++ b/src/kudu/common/partition-test.cc
@@ -28,10 +28,13 @@
 #include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
 #include <glog/logging.h>
+#include <google/protobuf/util/message_differencer.h>
 #include <gtest/gtest.h>
 
 #include "kudu/common/common.pb.h"
 #include "kudu/common/partial_row.h"
+#include "kudu/common/row_operations.h"
+#include "kudu/common/row_operations.pb.h"
 #include "kudu/common/schema.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/util/slice.h"
@@ -40,6 +43,7 @@
 #include "kudu/util/test_util.h"
 
 using boost::optional;
+using google::protobuf::util::MessageDifferencer;
 using std::pair;
 using std::string;
 using std::vector;
@@ -974,6 +978,23 @@ void CheckPartitions(const vector<Partition>& partitions) {
   EXPECT_EQ(string("\0\0\0\1" "\0\0\0\2" "a5\0\0b5\0\0", 16),partitions[15].partition_key_start());
   EXPECT_EQ(string("\0\0\0\1" "\0\0\0\2" "a6\0\0\0\0c6", 16),partitions[15].partition_key_end());
 }
+
+void CheckSerializationFunctions(const PartitionSchemaPB& pb,
+                                 const PartitionSchema& partition_schema,
+                                 const Schema& schema) {
+
+  PartitionSchemaPB pb1;
+  ASSERT_OK(partition_schema.ToPB(schema, &pb1));
+
+  // Check that the re-encoded protobuf message matches the original.
+  ASSERT_TRUE(MessageDifferencer::Equals(pb, pb1));
+
+  PartitionSchema partition_schema1;
+  ASSERT_OK(PartitionSchema::FromPB(pb1, schema, &partition_schema1));
+
+  ASSERT_TRUE(partition_schema.Equals(partition_schema1));
+}
+
 } // namespace
 
 TEST_F(PartitionTest, TestVaryingHashSchemasPerRange) {
@@ -990,6 +1011,7 @@ TEST_F(PartitionTest, TestVaryingHashSchemasPerRange) {
   AddHashBucketComponent(&schema_builder, { "b" }, 2, 0);
   PartitionSchema partition_schema;
   ASSERT_OK(PartitionSchema::FromPB(schema_builder, schema, &partition_schema));
+  CheckSerializationFunctions(schema_builder, partition_schema, schema);
 
   ASSERT_EQ("HASH (a, c) PARTITIONS 3, HASH (b) PARTITIONS 2, RANGE (a, b, c)",
             partition_schema.DebugString(schema));
@@ -1092,7 +1114,7 @@ TEST_F(PartitionTest, TestVaryingHashSchemasPerRange) {
 
 TEST_F(PartitionTest, TestVaryingHashSchemasPerUnboundedRanges) {
   // CREATE TABLE t (a VARCHAR, b VARCHAR, c VARCHAR, PRIMARY KEY (a, b, c))
-  // PARTITION BY [HASH BUCKET (a, c), HASH BUCKET (b), RANGE (a, b, c)];
+  // PARTITION BY [HASH BUCKET (b), RANGE (a, b, c)];
   Schema schema({ ColumnSchema("a", STRING),
                   ColumnSchema("b", STRING),
                   ColumnSchema("c", STRING) },
@@ -1103,6 +1125,7 @@ TEST_F(PartitionTest, TestVaryingHashSchemasPerUnboundedRanges) {
   AddHashBucketComponent(&schema_builder, { "b" }, 2, 0);
   PartitionSchema partition_schema;
   ASSERT_OK(PartitionSchema::FromPB(schema_builder, schema, &partition_schema));
+  CheckSerializationFunctions(schema_builder, partition_schema, schema);
 
   ASSERT_EQ("HASH (b) PARTITIONS 2, RANGE (a, b, c)",
             partition_schema.DebugString(schema));
@@ -1228,4 +1251,172 @@ TEST_F(PartitionTest, TestVaryingHashSchemasPerUnboundedRanges) {
   EXPECT_EQ(string("\0\0\0\1" "\0\0\0\2" "a4\0\0b4\0\0", 16), partitions[11].partition_key_start());
   EXPECT_EQ("", partitions[11].partition_key_end());
 }
+
+TEST_F(PartitionTest, TestPartitionSchemaPB) {
+  // CREATE TABLE t (a VARCHAR, b VARCHAR, c VARCHAR, PRIMARY KEY (a, b, c))
+  // PARTITION BY [HASH BUCKET (b), RANGE (a, b, c)];
+  Schema schema({ ColumnSchema("a", STRING),
+                  ColumnSchema("b", STRING),
+                  ColumnSchema("c", STRING) },
+                { ColumnId(0), ColumnId(1), ColumnId(2) }, 3);
+
+  PartitionSchemaPB pb;
+  // Table-wide hash schema defined below.
+  AddHashBucketComponent(&pb, { "b" }, 2, 0);
+
+  // [(a0, _, c0), (a0, _, c1))
+  {
+    RowOperationsPBEncoder encoder(pb.add_range_bounds());
+    KuduPartialRow lower(&schema);
+    KuduPartialRow upper(&schema);
+    ASSERT_OK(lower.SetStringCopy("a", "a0"));
+    ASSERT_OK(lower.SetStringCopy("c", "c0"));
+    ASSERT_OK(upper.SetStringCopy("a", "a0"));
+    ASSERT_OK(upper.SetStringCopy("c", "c1"));
+    encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, lower);
+    encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, upper);
+
+    auto range_hash_component = pb.add_range_hash_schemas();
+    auto hash_component = range_hash_component->add_hash_schemas();
+    hash_component->add_columns()->set_name("a");
+    hash_component->set_num_buckets(4);
+  }
+
+  // [(a1, _, c2), (a1, _, c3))
+  {
+    RowOperationsPBEncoder encoder(pb.add_range_bounds());
+    KuduPartialRow lower(&schema);
+    KuduPartialRow upper(&schema);
+    ASSERT_OK(lower.SetStringCopy("a", "a1"));
+    ASSERT_OK(lower.SetStringCopy("c", "c2"));
+    ASSERT_OK(upper.SetStringCopy("a", "a1"));
+    ASSERT_OK(upper.SetStringCopy("c", "c3"));
+    encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, lower);
+    encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, upper);
+
+    auto range_hash_component = pb.add_range_hash_schemas();
+    auto hash_component_1 = range_hash_component->add_hash_schemas();
+    hash_component_1->add_columns()->set_name("a");
+    hash_component_1->set_num_buckets(2);
+    auto hash_component_2 = range_hash_component->add_hash_schemas();
+    hash_component_2->add_columns()->set_name("b");
+    hash_component_2->set_num_buckets(3);
+  }
+
+  // [(a2, _, c4), (a2, _, c5))
+  {
+    RowOperationsPBEncoder encoder(pb.add_range_bounds());
+    KuduPartialRow lower(&schema);
+    KuduPartialRow upper(&schema);
+    ASSERT_OK(lower.SetStringCopy("a", "a2"));
+    ASSERT_OK(lower.SetStringCopy("c", "c4"));
+    ASSERT_OK(upper.SetStringCopy("a", "a2"));
+    ASSERT_OK(upper.SetStringCopy("c", "c5"));
+    encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, lower);
+    encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, upper);
+
+    // empty field implies use of table wide hash schema
+    pb.add_range_hash_schemas();
+  }
+
+  PartitionSchema partition_schema;
+  ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
+
+  // Check fields of 'partition_schema' to verify decoder function.
+  ASSERT_EQ(1, partition_schema.hash_partition_schemas().size());
+  const auto& ranges_with_hash_schemas = partition_schema.ranges_with_hash_schemas();
+  ASSERT_EQ(3, ranges_with_hash_schemas.size());
+
+  EXPECT_EQ(string("a0\0\0\0\0c0", 8), ranges_with_hash_schemas[0].lower);
+  EXPECT_EQ(string("a0\0\0\0\0c1", 8), ranges_with_hash_schemas[0].upper);
+  EXPECT_EQ(1, ranges_with_hash_schemas[0].hash_schemas.size());
+
+  const auto& range1_hash_schema = ranges_with_hash_schemas[0].hash_schemas[0];
+  EXPECT_EQ(1, range1_hash_schema.column_ids.size());
+  EXPECT_EQ(0, range1_hash_schema.column_ids[0]);
+  EXPECT_EQ(4, range1_hash_schema.num_buckets);
+
+  EXPECT_EQ(string("a1\0\0\0\0c2", 8), ranges_with_hash_schemas[1].lower);
+  EXPECT_EQ(string("a1\0\0\0\0c3", 8), ranges_with_hash_schemas[1].upper);
+  EXPECT_EQ(2, ranges_with_hash_schemas[1].hash_schemas.size());
+
+  const auto& range2_hash_schema_1 = ranges_with_hash_schemas[1].hash_schemas[0];
+  EXPECT_EQ(1, range2_hash_schema_1.column_ids.size());
+  EXPECT_EQ(0, range2_hash_schema_1.column_ids[0]);
+  EXPECT_EQ(2, range2_hash_schema_1.num_buckets);
+
+  const auto& range2_hash_schema_2 = ranges_with_hash_schemas[1].hash_schemas[1];
+  EXPECT_EQ(1, range2_hash_schema_2.column_ids.size());
+  EXPECT_EQ(1, range2_hash_schema_2.column_ids[0]);
+  EXPECT_EQ(3, range2_hash_schema_2.num_buckets);
+
+  EXPECT_EQ(string("a2\0\0\0\0c4", 8), ranges_with_hash_schemas[2].lower);
+  EXPECT_EQ(string("a2\0\0\0\0c5", 8), ranges_with_hash_schemas[2].upper);
+  EXPECT_EQ(0, ranges_with_hash_schemas[2].hash_schemas.size());
+
+  CheckSerializationFunctions(pb, partition_schema, schema);
+}
+
+TEST_F(PartitionTest, TestMalformedPartitionSchemaPB) {
+  // CREATE TABLE t (a VARCHAR, b VARCHAR, c VARCHAR, PRIMARY KEY (a, b, c))
+  // PARTITION BY [RANGE (a, b, c)];
+  Schema schema({ ColumnSchema("a", STRING),
+                  ColumnSchema("b", STRING),
+                  ColumnSchema("c", STRING) },
+                { ColumnId(0), ColumnId(1), ColumnId(2) }, 3);
+
+  PartitionSchemaPB pb;
+
+  // Testing that only a pair of range bounds is allowed.
+  {
+    RowOperationsPBEncoder encoder(pb.add_range_bounds());
+    KuduPartialRow lower(&schema);
+    KuduPartialRow upper(&schema);
+    KuduPartialRow extra(&schema);
+    ASSERT_OK(lower.SetStringCopy("a", "a0"));
+    ASSERT_OK(upper.SetStringCopy("a", "a1"));
+    ASSERT_OK(extra.SetStringCopy("a", "a2"));
+    encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, lower);
+    encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, upper);
+    encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, extra);
+  }
+
+  PartitionSchema partition_schema;
+  Status s = PartitionSchema::FromPB(pb, schema, &partition_schema);
+  ASSERT_EQ("Invalid argument: 3 ops were provided; "
+            "Only two ops are expected for this pair of range bounds.",
+            s.ToString());
+
+  pb.Clear();
+  // Testing that no split rows are allowed.
+  {
+    RowOperationsPBEncoder encoder(pb.add_range_bounds());
+    KuduPartialRow split(&schema);
+    KuduPartialRow upper(&schema);
+    ASSERT_OK(split.SetStringCopy("a", "a0"));
+    ASSERT_OK(upper.SetStringCopy("a", "a1"));
+    encoder.Add(RowOperationsPB::SPLIT_ROW, split);
+    encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, upper);
+  }
+
+  Status s1 = PartitionSchema::FromPB(pb, schema, &partition_schema);
+  ASSERT_EQ("Invalid argument: Illegal row operation type in request: 4",
+            s1.ToString());
+
+  pb.Clear();
+  // Testing that 2nd bound is either RANGE_UPPER_BOUND or INCLUSIVE_RANGE_UPPER_BOUND.
+  {
+    RowOperationsPBEncoder encoder(pb.add_range_bounds());
+    KuduPartialRow lower(&schema);
+    KuduPartialRow upper(&schema);
+    ASSERT_OK(lower.SetStringCopy("a", "a0"));
+    ASSERT_OK(upper.SetStringCopy("a", "a1"));
+    encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, lower);
+    encoder.Add(RowOperationsPB::SPLIT_ROW, upper);
+  }
+
+  Status s2 = PartitionSchema::FromPB(pb, schema, &partition_schema);
+  ASSERT_EQ("Invalid argument: missing upper range bound in request",
+            s2.ToString());
+}
 } // namespace kudu
diff --git a/src/kudu/common/partition.cc b/src/kudu/common/partition.cc
index ae276c1..e7787d0 100644
--- a/src/kudu/common/partition.cc
+++ b/src/kudu/common/partition.cc
@@ -20,6 +20,7 @@
 #include <algorithm>
 #include <cstring>
 #include <iterator>
+#include <memory>
 #include <set>
 #include <string>
 #include <unordered_set>
@@ -32,6 +33,8 @@
 #include "kudu/common/key_encoder.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/common/row.h"
+#include "kudu/common/row_operations.h"
+#include "kudu/common/row_operations.pb.h"
 #include "kudu/common/types.h"
 #include "kudu/gutil/endian.h"
 #include "kudu/gutil/map-util.h"
@@ -177,9 +180,60 @@ Status PartitionSchema::ExtractHashBucketSchemasFromPB(
 Status PartitionSchema::FromPB(const PartitionSchemaPB& pb,
                                const Schema& schema,
                                PartitionSchema* partition_schema) {
+  return FromPB(pb, schema, schema, partition_schema);
+}
+
+Status PartitionSchema::FromPB(const PartitionSchemaPB& pb,
+                               const Schema& schema,
+                               const Schema& client_schema,
+                               PartitionSchema* partition_schema) {
   partition_schema->Clear();
   RETURN_NOT_OK(ExtractHashBucketSchemasFromPB(schema, pb.hash_bucket_schemas(),
-                             &partition_schema->hash_bucket_schemas_));
+                                               &partition_schema->hash_bucket_schemas_));
+  RangeHashSchema range_hash_schema;
+  range_hash_schema.resize(pb.range_hash_schemas_size());
+  for (int i = 0; i < pb.range_hash_schemas_size(); i++) {
+    RETURN_NOT_OK(ExtractHashBucketSchemasFromPB(schema, pb.range_hash_schemas(i).hash_schemas(),
+                                                 &range_hash_schema[i]));
+  }
+  vector<pair<KuduPartialRow, KuduPartialRow>> range_bounds;
+  for (int i = 0; i < pb.range_bounds_size(); i++) {
+    RowOperationsPBDecoder decoder(&pb.range_bounds(i), &client_schema, &schema, nullptr);
+    vector<DecodedRowOperation> ops;
+    RETURN_NOT_OK(decoder.DecodeOperations<DecoderMode::SPLIT_ROWS>(&ops));
+    if (ops.size() != 2) {
+      return Status::InvalidArgument(Substitute("$0 ops were provided; Only two ops are expected "
+                                                "for this pair of range bounds.", ops.size()));
+    }
+    const DecodedRowOperation& op1 = ops[0];
+    const DecodedRowOperation& op2 = ops[1];
+    switch (op1.type) {
+      case RowOperationsPB::RANGE_LOWER_BOUND:
+      case RowOperationsPB::EXCLUSIVE_RANGE_LOWER_BOUND: {
+        if (op2.type != RowOperationsPB::RANGE_UPPER_BOUND &&
+            op2.type != RowOperationsPB::INCLUSIVE_RANGE_UPPER_BOUND) {
+          return Status::InvalidArgument("missing upper range bound in request");
+        }
+
+        // Lower bound range partition keys are inclusive and upper bound range partition keys
+        // are exclusive by design. If the provided keys are not of this format, these keys
+        // will be transformed to their proper format.
+        if (op1.type == RowOperationsPB::EXCLUSIVE_RANGE_LOWER_BOUND) {
+          RETURN_NOT_OK(partition_schema->MakeLowerBoundRangePartitionKeyInclusive(
+              op1.split_row.get()));
+        }
+        if (op2.type == RowOperationsPB::INCLUSIVE_RANGE_UPPER_BOUND) {
+          RETURN_NOT_OK(partition_schema->MakeUpperBoundRangePartitionKeyExclusive(
+              op2.split_row.get()));
+        }
+        range_bounds.emplace_back(*op1.split_row, *op2.split_row);
+        break;
+      }
+      default:
+        return Status::InvalidArgument(
+            Substitute("Illegal row operation type in request: $0", op1.type));
+    }
+  }
 
   if (pb.has_range_schema()) {
     const PartitionSchemaPB_RangeSchemaPB& range_pb = pb.range_schema();
@@ -194,10 +248,15 @@ Status PartitionSchema::FromPB(const PartitionSchemaPB& pb,
     }
   }
 
+  if (!range_bounds.empty()) {
+    RETURN_NOT_OK(partition_schema->EncodeRangeBounds(
+        range_bounds, range_hash_schema, schema, &partition_schema->ranges_with_hash_schemas_));
+  }
+
   return partition_schema->Validate(schema);
 }
 
-void PartitionSchema::ToPB(PartitionSchemaPB* pb) const {
+Status PartitionSchema::ToPB(const Schema& schema, PartitionSchemaPB* pb) const {
   pb->Clear();
   pb->mutable_hash_bucket_schemas()->Reserve(hash_bucket_schemas_.size());
   for (const HashBucketSchema& hash_bucket : hash_bucket_schemas_) {
@@ -207,7 +266,34 @@ void PartitionSchema::ToPB(PartitionSchemaPB* pb) const {
     hash_bucket_pb->set_seed(hash_bucket.seed);
   }
 
+  if (!ranges_with_hash_schemas_.empty()) {
+    pb->mutable_range_hash_schemas()->Reserve(ranges_with_hash_schemas_.size());
+    pb->mutable_range_bounds()->Reserve(ranges_with_hash_schemas_.size());
+    Arena arena(256);
+    for (const auto& range_hash_schema : ranges_with_hash_schemas_) {
+      RowOperationsPBEncoder encoder(pb->add_range_bounds());
+      arena.Reset();
+      KuduPartialRow lower(&schema);
+      KuduPartialRow upper(&schema);
+      Slice s_lower = Slice(range_hash_schema.lower);
+      Slice s_upper = Slice(range_hash_schema.upper);
+      RETURN_NOT_OK(DecodeRangeKey(&s_lower, &lower, &arena));
+      RETURN_NOT_OK(DecodeRangeKey(&s_upper, &upper, &arena));
+      encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, lower);
+      encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, upper);
+
+      auto* range_hash_schema_pb = pb->add_range_hash_schemas();
+      for (const auto& hash_bucket : range_hash_schema.hash_schemas) {
+        auto* hash_bucket_pb = range_hash_schema_pb->add_hash_schemas();
+        SetColumnIdentifiers(hash_bucket.column_ids, hash_bucket_pb->mutable_columns());
+        hash_bucket_pb->set_num_buckets(hash_bucket.num_buckets);
+        hash_bucket_pb->set_seed(hash_bucket.seed);
+      }
+    }
+  }
+
   SetColumnIdentifiers(range_schema_.column_ids, pb->mutable_range_schema()->mutable_columns());
+  return Status::OK();
 }
 
 template<typename Row>
@@ -1103,11 +1189,14 @@ Status PartitionSchema::BucketForRow(const ConstContiguousRow& row,
 void PartitionSchema::Clear() {
   hash_bucket_schemas_.clear();
   range_schema_.column_ids.clear();
+  ranges_with_hash_schemas_.clear();
+
 }
 
-Status PartitionSchema::Validate(const Schema& schema) const {
+Status PartitionSchema::ValidateHashBucketSchemas(const Schema& schema,
+                                                  const HashBucketSchemas& hash_schemas) {
   set<ColumnId> hash_columns;
-  for (const PartitionSchema::HashBucketSchema& hash_schema : hash_bucket_schemas_) {
+  for (const PartitionSchema::HashBucketSchema& hash_schema : hash_schemas) {
     if (hash_schema.num_buckets < 2) {
       return Status::InvalidArgument("must have at least two hash buckets");
     }
@@ -1131,6 +1220,15 @@ Status PartitionSchema::Validate(const Schema& schema) const {
       }
     }
   }
+  return Status::OK();
+}
+
+Status PartitionSchema::Validate(const Schema& schema) const {
+  RETURN_NOT_OK(ValidateHashBucketSchemas(schema, hash_bucket_schemas_));
+
+  for (const auto& range_with_hash_schemas : ranges_with_hash_schemas_) {
+    RETURN_NOT_OK(ValidateHashBucketSchemas(schema, range_with_hash_schemas.hash_schemas));
+  }
 
   for (const ColumnId& column_id : range_schema_.column_ids) {
     int32_t column_idx = schema.find_column_by_id(column_id);
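
To make the bound normalization in FromPB() above concrete: range partition
keys are stored with inclusive lower and exclusive upper bounds, so bounds
supplied in the other forms are rewritten. A minimal sketch, assuming a
single INT32 range column 'id' and a 'schema' in scope:

    KuduPartialRow lower(&schema);
    KuduPartialRow upper(&schema);
    CHECK_OK(lower.SetInt32("id", 10));  // sent as EXCLUSIVE_RANGE_LOWER_BOUND
    CHECK_OK(upper.SetInt32("id", 20));  // sent as INCLUSIVE_RANGE_UPPER_BOUND
    // MakeLowerBoundRangePartitionKeyInclusive() and
    // MakeUpperBoundRangePartitionKeyExclusive() each advance the key by
    // one value, so the stored range is [11, 21), i.e. ids 11 through 20.
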
diff --git a/src/kudu/common/partition.h b/src/kudu/common/partition.h
index 85c58e9..2226e45 100644
--- a/src/kudu/common/partition.h
+++ b/src/kudu/common/partition.h
@@ -175,8 +175,16 @@ class PartitionSchema {
                        const Schema& schema,
                        PartitionSchema* partition_schema) WARN_UNUSED_RESULT;
 
+  // Overloaded function similar to function above, used when an
+  // explicit client schema is available to decode the range bounds.
+  static Status FromPB(const PartitionSchemaPB& pb,
+                       const Schema& schema,
+                       const Schema& client_schema,
+                       PartitionSchema* partition_schema) WARN_UNUSED_RESULT;
+
   // Serializes a partition schema into a protobuf message.
-  void ToPB(PartitionSchemaPB* pb) const;
+  // Requires a schema to encode the range bounds.
+  Status ToPB(const Schema& schema, PartitionSchemaPB* pb) const;
 
   // Appends the row's encoded partition key into the provided buffer.
   // On failure, the buffer may have data partially appended.
@@ -192,7 +200,7 @@ class PartitionSchema {
   // of resulting partitions is the product of the number of hash buckets for
   // each hash bucket component, multiplied by
   // (split_rows.size() + max(1, range_bounds.size())).
-  // 'range_hash_schema' contains each range's HashBucketSchemas,
+  // 'range_hash_schemas' contains each range's HashBucketSchemas,
   // its order corresponds to the bounds in 'range_bounds'.
   // If 'range_hash_schemas' is empty, the table wide hash schema is used per range.
   // Size of 'range_hash_schemas' and 'range_bounds' are equal if 'range_hash_schema' isn't empty.
@@ -304,6 +312,10 @@ class PartitionSchema {
     return hash_bucket_schemas_;
   }
 
+  const std::vector<RangeWithHashSchemas>& ranges_with_hash_schemas() const {
+    return ranges_with_hash_schemas_;
+  }
+
   // Gets the vector containing the column indexes of the range partition keys.
   // If any of the columns is not in the key range columns then an
   // InvalidArgument status is returned.
@@ -414,6 +426,10 @@ class PartitionSchema {
   // Clears the state of this partition schema.
   void Clear();
 
+  // Helper function that validates the hash bucket schemas.
+  static Status ValidateHashBucketSchemas(const Schema& schema,
+                                          const HashBucketSchemas& hash_schemas);
+
   // Validates that this partition schema is valid. Returns OK, or an
   // appropriate error code for an invalid partition schema.
   Status Validate(const Schema& schema) const;
@@ -453,6 +469,8 @@ class PartitionSchema {
 
   HashBucketSchemas hash_bucket_schemas_;
   RangeSchema range_schema_;
+
+  std::vector<RangeWithHashSchemas> ranges_with_hash_schemas_;
 };
 
 } // namespace kudu
diff --git a/src/kudu/common/partition_pruner.cc b/src/kudu/common/partition_pruner.cc
index c96e0bb..2c39e10 100644
--- a/src/kudu/common/partition_pruner.cc
+++ b/src/kudu/common/partition_pruner.cc
@@ -18,8 +18,8 @@
 #include "kudu/common/partition_pruner.h"
 
 #include <algorithm>
-#include <cstring>
 #include <cstdint>
+#include <cstring>
 #include <iterator>
 #include <memory>
 #include <numeric>
@@ -43,6 +43,7 @@
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/array_view.h"
 #include "kudu/util/memory/arena.h"
 #include "kudu/util/slice.h"
 
@@ -281,7 +282,7 @@ void PartitionPruner::Init(const Schema& schema,
   // components and a range component, then a few patterns emerge from the
   // examples above:
   //
-  // 1) The partition keys are truncated after the final constrained component
+  // 1) The partition keys are truncated after the final constrained component.
   //    Hash bucket components are constrained when the scan is limited to a
   //    subset of buckets via equality or in-list predicates on that component.
   //    Range components are constrained if they have an upper or lower bound
diff --git a/src/kudu/common/row_operations-test.cc b/src/kudu/common/row_operations-test.cc
index d56b77b..d501180 100644
--- a/src/kudu/common/row_operations-test.cc
+++ b/src/kudu/common/row_operations-test.cc
@@ -32,8 +32,8 @@
 #include "kudu/common/common.pb.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/common/row.h"
+#include "kudu/common/row_operations.pb.h"
 #include "kudu/common/schema.h"
-#include "kudu/common/wire_protocol.pb.h"
 #include "kudu/common/types.h"
 #include "kudu/gutil/basictypes.h"
 #include "kudu/gutil/dynamic_annotations.h"
diff --git a/src/kudu/common/row_operations.h b/src/kudu/common/row_operations.h
index 0717603..657a875 100644
--- a/src/kudu/common/row_operations.h
+++ b/src/kudu/common/row_operations.h
@@ -23,7 +23,7 @@
 #include <vector>
 
 #include "kudu/common/row_changelist.h"
-#include "kudu/common/wire_protocol.pb.h"
+#include "kudu/common/row_operations.pb.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/util/bitset.h"
 #include "kudu/util/slice.h"
diff --git a/src/kudu/common/row_operations.proto b/src/kudu/common/row_operations.proto
new file mode 100644
index 0000000..c8c8431
--- /dev/null
+++ b/src/kudu/common/row_operations.proto
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Separate file created to resolve circular dependency between
+// common.proto and wire_protocol.proto due to necessity of
+// RowOperationsPB in PartitionSchemaPB to support varying
+// hash schemas per range.
+syntax = "proto2";
+package kudu;
+
+option java_package = "org.apache.kudu";
+
+import "kudu/util/pb_util.proto";
+
+// A set of operations (INSERT, UPDATE, UPSERT, or DELETE) to apply to a table,
+// or the set of split rows and range bounds when creating or altering table.
+// Range bounds determine the boundaries of range partitions during table
+// creation, split rows further subdivide the ranges into more partitions.
+message RowOperationsPB {
+  enum Type {
+    UNKNOWN = 0;
+    INSERT = 1;
+    UPDATE = 2;
+    DELETE = 3;
+    UPSERT = 5;
+    INSERT_IGNORE = 10;
+    UPDATE_IGNORE = 11;
+    DELETE_IGNORE = 12;
+
+    // Used when specifying split rows on table creation.
+    SPLIT_ROW = 4;
+    // Used when specifying an inclusive lower bound range on table creation.
+    // Should be followed by the associated upper bound. If all values are
+    // missing, then signifies unbounded.
+    RANGE_LOWER_BOUND = 6;
+    // Used when specifying an exclusive upper bound range on table creation.
+    // Should be preceded by the associated lower bound. If all values are
+    // missing, then signifies unbounded.
+    RANGE_UPPER_BOUND = 7;
+    // Used when specifying an exclusive lower bound range on table creation.
+    // Should be followed by the associated upper bound. If all values are
+    // missing, then signifies unbounded.
+    EXCLUSIVE_RANGE_LOWER_BOUND = 8;
+    // Used when specifying an inclusive upper bound range on table creation.
+    // Should be preceded by the associated lower bound. If all values are
+    // missing, then signifies unbounded.
+    INCLUSIVE_RANGE_UPPER_BOUND = 9;
+  }
+
+  // The row data for each operation is stored in the following format:
+  //
+  // [operation type] (one byte):
+  //   A single-byte field which determines the type of operation. The values are
+  //   based on the 'Type' enum above.
+  // [column isset bitmap]   (one bit for each column in the Schema, rounded to nearest byte)
+  //   A set bit in this bitmap indicates that the user has specified the given column
+  //   in the row. This indicates that the column will be present in the data to follow.
+  // [null bitmap]           (one bit for each Schema column, rounded to nearest byte)
+  //   A set bit in this bitmap indicates that the given column is NULL.
+  //   This is only present if there are any nullable columns.
+  // [column data]
+  //   For each column which is set and not NULL, the column's data follows. The data
+  //   format of each cell is the canonical in-memory format (eg little endian).
+  //   For string data, the pointers are relative to 'indirect_data'.
+  //
+  // The rows are concatenated end-to-end with no padding/alignment.
+  optional bytes rows = 2 [(kudu.REDACT) = true];
+  optional bytes indirect_data = 3 [(kudu.REDACT) = true];
+}
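
The wire format described above is produced and consumed by
RowOperationsPBEncoder and RowOperationsPBDecoder from row_operations.h.
A minimal sketch of the round trip, mirroring the calls made in
PartitionSchema::FromPB() in this patch ('schema' and 'client_schema' are
assumed to be in scope; the final constructor argument matches the nullptr
passed there):

    RowOperationsPB pb;
    RowOperationsPBEncoder encoder(&pb);
    KuduPartialRow row(&schema);
    CHECK_OK(row.SetStringCopy("a", "a0"));
    encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, row);

    std::vector<DecodedRowOperation> ops;
    RowOperationsPBDecoder decoder(&pb, &client_schema, &schema, nullptr);
    CHECK_OK(decoder.DecodeOperations<DecoderMode::SPLIT_ROWS>(&ops));
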
diff --git a/src/kudu/common/wire_protocol.proto b/src/kudu/common/wire_protocol.proto
index cce7a87..b6b0c69 100644
--- a/src/kudu/common/wire_protocol.proto
+++ b/src/kudu/common/wire_protocol.proto
@@ -27,7 +27,6 @@ option java_package = "org.apache.kudu";
 
 import "kudu/common/common.proto";
 import "kudu/consensus/metadata.proto";
-import "kudu/util/pb_util.proto";
 
 // Error status returned by any RPC method.
 // Every RPC method which could generate an application-level error
@@ -182,59 +181,3 @@ message ColumnarRowBlockPB {
   repeated Column columns = 1;
   optional int64 num_rows = 2;
 }
-
-// A set of operations (INSERT, UPDATE, UPSERT, or DELETE) to apply to a table,
-// or the set of split rows and range bounds when creating or altering table.
-// Range bounds determine the boundaries of range partitions during table
-// creation, split rows further subdivide the ranges into more partitions.
-message RowOperationsPB {
-  enum Type {
-    UNKNOWN = 0;
-    INSERT = 1;
-    UPDATE = 2;
-    DELETE = 3;
-    UPSERT = 5;
-    INSERT_IGNORE = 10;
-    UPDATE_IGNORE = 11;
-    DELETE_IGNORE = 12;
-
-    // Used when specifying split rows on table creation.
-    SPLIT_ROW = 4;
-    // Used when specifying an inclusive lower bound range on table creation.
-    // Should be followed by the associated upper bound. If all values are
-    // missing, then signifies unbounded.
-    RANGE_LOWER_BOUND = 6;
-    // Used when specifying an exclusive upper bound range on table creation.
-    // Should be preceded by the associated lower bound. If all values are
-    // missing, then signifies unbounded.
-    RANGE_UPPER_BOUND = 7;
-    // Used when specifying an exclusive lower bound range on table creation.
-    // Should be followed by the associated upper bound. If all values are
-    // missing, then signifies unbounded.
-    EXCLUSIVE_RANGE_LOWER_BOUND = 8;
-    // Used when specifying an inclusive upper bound range on table creation.
-    // Should be preceded by the associated lower bound. If all values are
-    // missing, then signifies unbounded.
-    INCLUSIVE_RANGE_UPPER_BOUND = 9;
-  }
-
-  // The row data for each operation is stored in the following format:
-  //
-  // [operation type] (one byte):
-  //   A single-byte field which determines the type of operation. The values are
-  //   based on the 'Type' enum above.
-  // [column isset bitmap]   (one bit for each column in the Schema, rounded to nearest byte)
-  //   A set bit in this bitmap indicates that the user has specified the given column
-  //   in the row. This indicates that the column will be present in the data to follow.
-  // [null bitmap]           (one bit for each Schema column, rounded to nearest byte)
-  //   A set bit in this bitmap indicates that the given column is NULL.
-  //   This is only present if there are any nullable columns.
-  // [column data]
-  //   For each column which is set and not NULL, the column's data follows. The data
-  //   format of each cell is the canonical in-memory format (eg little endian).
-  //   For string data, the pointers are relative to 'indirect_data'.
-  //
-  // The rows are concatenated end-to-end with no padding/alignment.
-  optional bytes rows = 2 [(kudu.REDACT) = true];
-  optional bytes indirect_data = 3 [(kudu.REDACT) = true];
-}
diff --git a/src/kudu/integration-tests/alter_table-test.cc b/src/kudu/integration-tests/alter_table-test.cc
index 6d6f65e..42e4818 100644
--- a/src/kudu/integration-tests/alter_table-test.cc
+++ b/src/kudu/integration-tests/alter_table-test.cc
@@ -1875,7 +1875,7 @@ TEST_F(AlterTableTest, TestAddRangePartitionConflictExhaustive) {
     if (a_lower_bound == b_lower_bound && a_upper_bound == b_upper_bound) {
       ASSERT_TRUE(s.IsAlreadyPresent()) << s.ToString();
       ASSERT_STR_CONTAINS(s.ToString(),
-                          "new range partiton duplicates another newly added one");
+                          "new range partition duplicates another newly added one");
     } else {
       ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
       ASSERT_STR_CONTAINS(s.ToString(),
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index f3c2422..dd7ce71 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -71,6 +71,7 @@
 #include "kudu/common/partial_row.h"
 #include "kudu/common/partition.h"
 #include "kudu/common/row_operations.h"
+#include "kudu/common/row_operations.pb.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/types.h"
 #include "kudu/common/wire_protocol.h"
@@ -1676,8 +1677,8 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
   // partitioned on the primary key columns) will be used.
   PartitionSchema partition_schema;
   RETURN_NOT_OK(SetupError(
-        PartitionSchema::FromPB(req.partition_schema(), schema, &partition_schema),
-        resp, MasterErrorPB::INVALID_SCHEMA));
+      PartitionSchema::FromPB(req.partition_schema(), schema, client_schema, &partition_schema),
+      resp, MasterErrorPB::INVALID_SCHEMA));
 
   // Decode split rows.
   vector<KuduPartialRow> split_rows;
@@ -1708,11 +1709,11 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
 
         if (op.type == RowOperationsPB::EXCLUSIVE_RANGE_LOWER_BOUND) {
           RETURN_NOT_OK(partition_schema.MakeLowerBoundRangePartitionKeyInclusive(
-                op.split_row.get()));
+              op.split_row.get()));
         }
         if (ops[i].type == RowOperationsPB::INCLUSIVE_RANGE_UPPER_BOUND) {
           RETURN_NOT_OK(partition_schema.MakeUpperBoundRangePartitionKeyExclusive(
-                ops[i].split_row.get()));
+              ops[i].split_row.get()));
         }
         range_bounds.emplace_back(*op.split_row, *ops[i].split_row);
         break;
@@ -2015,7 +2016,7 @@ scoped_refptr<TableInfo> CatalogManager::CreateTableInfo(
   // Use the Schema object passed in, since it has the column IDs already assigned,
   // whereas the user request PB does not.
   CHECK_OK(SchemaToPB(schema, metadata->mutable_schema()));
-  partition_schema.ToPB(metadata->mutable_partition_schema());
+  CHECK_OK(partition_schema.ToPB(schema, metadata->mutable_partition_schema()));
   metadata->set_create_timestamp(time(nullptr));
   (*metadata->mutable_extra_config()) = std::move(extra_config_pb);
   table->RegisterMetrics(master_->metric_registry(), metadata->name());
@@ -2436,7 +2437,8 @@ Status CatalogManager::ApplyAlterPartitioningSteps(
   Schema schema;
   RETURN_NOT_OK(SchemaFromPB(l.data().pb.schema(), &schema));
   PartitionSchema partition_schema;
-  RETURN_NOT_OK(PartitionSchema::FromPB(l.data().pb.partition_schema(), schema, &partition_schema));
+  RETURN_NOT_OK(PartitionSchema::FromPB(l.data().pb.partition_schema(), schema,
+                                        client_schema, &partition_schema));
 
   TableInfo::TabletInfoMap existing_tablets = table->tablet_map();
   TableInfo::TabletInfoMap new_tablets;
@@ -2556,7 +2558,7 @@ Status CatalogManager::ApplyAlterPartitioningSteps(
             if (lower_bound == p.partition_key_start() &&
                 upper_bound == p.partition_key_end()) {
               return Status::AlreadyPresent(
-                  "new range partiton duplicates another newly added one",
+                  "new range partition duplicates another newly added one",
                   partition_schema.RangePartitionDebugString(*ops[0].split_row,
                                                              *ops[1].split_row));
             }
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index 2d64f41..51d57e6 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -20,6 +20,7 @@ package kudu.master;
 option java_package = "org.apache.kudu.master";
 
 import "kudu/common/common.proto";
+import "kudu/common/row_operations.proto";
 import "kudu/common/wire_protocol.proto";
 import "kudu/consensus/metadata.proto";
 import "kudu/consensus/replica_management.proto";
@@ -493,9 +494,6 @@ message GetTabletLocationsResponsePB {
 // ============================================================================
 //  Catalog
 // ============================================================================
-message PerRangeHashBucketSchemasPB {
-  repeated PartitionSchemaPB.HashBucketSchemaPB hash_schemas = 1;
-}
 
 message CreateTableRequestPB {
   required string name = 1;
@@ -512,7 +510,7 @@ message CreateTableRequestPB {
   // split rows are specified. If this field is set, its size must match the number of ranges
   // specified by range bounds and they must be in the same order. If this field is empty,
   // 'partition_schema' is assumed for every range bound.
-  repeated PerRangeHashBucketSchemasPB range_hash_schemas = 12;
+  repeated PartitionSchemaPB.PerRangeHashBucketSchemasPB range_hash_schemas = 12;
   optional int32 num_replicas = 4;
 
   // If set, uses the provided value as the table owner when creating the table.
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index 71506e2..54cee49 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -39,12 +39,12 @@
 #include "kudu/common/partial_row.h"
 #include "kudu/common/partition.h"
 #include "kudu/common/row_operations.h"
+#include "kudu/common/row_operations.pb.h"
 #include "kudu/common/rowblock.h"
 #include "kudu/common/rowblock_memory.h"
 #include "kudu/common/scan_spec.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/wire_protocol.h"
-#include "kudu/common/wire_protocol.pb.h"
 #include "kudu/consensus/consensus_meta.h"
 #include "kudu/consensus/consensus_meta_manager.h"
 #include "kudu/consensus/consensus_peers.h"
diff --git a/src/kudu/tablet/tablet_metadata.cc b/src/kudu/tablet/tablet_metadata.cc
index 69fcc84..fe321b2 100644
--- a/src/kudu/tablet/tablet_metadata.cc
+++ b/src/kudu/tablet/tablet_metadata.cc
@@ -738,7 +738,7 @@ Status TabletMetadata::ToSuperBlockUnlocked(TabletSuperBlockPB* super_block,
   partition_.ToPB(pb.mutable_partition());
   pb.set_last_durable_mrs_id(last_durable_mrs_id_);
   pb.set_schema_version(schema_version_);
-  partition_schema_.ToPB(pb.mutable_partition_schema());
+  RETURN_NOT_OK(partition_schema_.ToPB(*schema_, pb.mutable_partition_schema()));
   pb.set_table_name(table_name_);
 
   for (const shared_ptr<RowSetMetadata>& meta : rowsets) {
diff --git a/src/kudu/tserver/tablet_copy_client.cc b/src/kudu/tserver/tablet_copy_client.cc
index dee2afe..c9ff0d2 100644
--- a/src/kudu/tserver/tablet_copy_client.cc
+++ b/src/kudu/tserver/tablet_copy_client.cc
@@ -25,7 +25,6 @@
 #include <utility>
 
 #include <boost/optional/optional.hpp>
-#include <boost/type_traits/decay.hpp>
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 #include <google/protobuf/stubs/port.h>
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index 555bd47..86a8aa0 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -49,6 +49,7 @@
 #include "kudu/common/partial_row.h"
 #include "kudu/common/partition.h"
 #include "kudu/common/row_operations.h"
+#include "kudu/common/row_operations.pb.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/timestamp.h"
 #include "kudu/common/wire_protocol-test-util.h"
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 4d1c3db..56a63c9 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -44,6 +44,7 @@
 #include "kudu/common/iterator_stats.h"
 #include "kudu/common/key_range.h"
 #include "kudu/common/partition.h"
+#include "kudu/common/row_operations.pb.h"
 #include "kudu/common/rowblock.h"
 #include "kudu/common/rowblock_memory.h"
 #include "kudu/common/scan_spec.h"
@@ -2188,7 +2189,8 @@ void TabletServiceImpl::ListTablets(const ListTabletsRequestPB* req,
     if (req->need_schema_info()) {
       CHECK_OK(SchemaToPB(replica->tablet_metadata()->schema(),
                           status->mutable_schema()));
-      replica->tablet_metadata()->partition_schema().ToPB(status->mutable_partition_schema());
+      CHECK_OK(replica->tablet_metadata()->partition_schema().ToPB(
+          replica->tablet_metadata()->schema(), status->mutable_partition_schema()));
     }
   }
   context->RespondSuccess();
diff --git a/src/kudu/tserver/tserver.proto b/src/kudu/tserver/tserver.proto
index 9721bf8..a603975 100644
--- a/src/kudu/tserver/tserver.proto
+++ b/src/kudu/tserver/tserver.proto
@@ -20,6 +20,7 @@ package kudu.tserver;
 option java_package = "org.apache.kudu.tserver";
 
 import "kudu/common/common.proto";
+import "kudu/common/row_operations.proto";
 import "kudu/common/wire_protocol.proto";
 import "kudu/security/token.proto";
 import "kudu/tablet/tablet.proto";