Posted to commits@kudu.apache.org by aw...@apache.org on 2020/12/01 01:32:44 UTC

[kudu] branch master updated: [partitioning] KUDU-2671: Support for range specific HashSchemas.

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 17575e0  [partitioning] KUDU-2671: Support for range specific HashSchemas.
17575e0 is described below

commit 17575e0b693cf97a6ec5d74e78d89343de4781eb
Author: Mahesh Reddy <mr...@cloudera.com>
AuthorDate: Tue Oct 13 16:45:07 2020 -0700

    [partitioning] KUDU-2671: Support for range specific HashSchemas.
    
    This patch updates PartitionSchema::CreatePartitions() to support
    specifying a different hash schema for each range. If no hash schema
    is specified for a range, the table-wide hash schema is used.
    Currently, this only works if no split_rows are specified.
    
    Since split_rows only exists for backwards compatibility, this
    feature will not be supported together with split_rows: an error
    message asking the user to specify both lower and upper bounds,
    either at table creation or alteration time, should suffice.
    split_rows is also more syntactically ambiguous when specifying
    bounds.
    
    Currently, range_hash_schemas holds the HashBucketSchemas for each
    range. Its order corresponds to the bounds in range_bounds, so that
    when the bounds are sorted their corresponding hash schemas are
    sorted as well.
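
    For example, here is a minimal sketch of the intended call pattern,
    adapted from the new test in partition-test.cc (the single-column
    schema, variable names, and bounds are illustrative only):

      Schema schema({ ColumnSchema("a", STRING) }, { ColumnId(0) }, 1);
      PartitionSchema pschema;  // e.g. built via PartitionSchema::FromPB()

      // Two ranges over "a": ["a1", "a2") and ["a3", "a4").
      KuduPartialRow lo1(&schema), up1(&schema), lo2(&schema), up2(&schema);
      ASSERT_OK(lo1.SetStringCopy("a", "a1"));
      ASSERT_OK(up1.SetStringCopy("a", "a2"));
      ASSERT_OK(lo2.SetStringCopy("a", "a3"));
      ASSERT_OK(up2.SetStringCopy("a", "a4"));
      vector<pair<KuduPartialRow, KuduPartialRow>> bounds;
      bounds.emplace_back(lo1, up1);
      bounds.emplace_back(lo2, up2);

      // One entry per bound: the first range hashes "a" into 4 buckets;
      // the second entry is empty, so the table-wide hash schema applies.
      PartitionSchema::RangeHashSchema range_hash_schemas;
      range_hash_schemas.emplace_back(
          PartitionSchema::HashBucketSchemas{{{ColumnId(0)}, 4, 0}});
      range_hash_schemas.emplace_back(PartitionSchema::HashBucketSchemas());

      vector<Partition> partitions;
      ASSERT_OK(pschema.CreatePartitions(
          /*split_rows=*/{}, bounds, range_hash_schemas, schema, &partitions));
      // Passing non-empty split_rows together with non-empty per-range hash
      // schemas returns Status::InvalidArgument instead.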
    
    Inspiration from Vlad: https://gerrit.cloudera.org/c/15758/
    
    Change-Id: I8725f4bd072a81b05b36dfc7df0c074c172b4ce8
    Reviewed-on: http://gerrit.cloudera.org:8080/16596
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/common/partition-test.cc                  | 248 ++++++++++++++++++++-
 src/kudu/common/partition.cc                       | 225 ++++++++++++-------
 src/kudu/common/partition.h                        |  64 ++++--
 src/kudu/common/partition_pruner-test.cc           |  23 +-
 src/kudu/common/scan_spec-test.cc                  |  10 +-
 .../integration-tests/ts_tablet_manager-itest.cc   |   2 +-
 src/kudu/master/catalog_manager.cc                 |   5 +-
 src/kudu/master/sys_catalog.cc                     |   2 +-
 src/kudu/tablet/tablet-harness.h                   |   2 +-
 src/kudu/tserver/tablet_server-test.cc             |   2 +-
 10 files changed, 456 insertions(+), 127 deletions(-)

diff --git a/src/kudu/common/partition-test.cc b/src/kudu/common/partition-test.cc
index fa1eda2..c5e0873 100644
--- a/src/kudu/common/partition-test.cc
+++ b/src/kudu/common/partition-test.cc
@@ -19,7 +19,9 @@
 
 #include <algorithm>
 #include <cstdint>
+#include <random>
 #include <string>
+#include <type_traits>
 #include <utility>
 #include <vector>
 
@@ -41,6 +43,7 @@ using boost::optional;
 using std::pair;
 using std::string;
 using std::vector;
+using std::make_pair;
 
 namespace kudu {
 
@@ -72,7 +75,7 @@ void CheckCreateRangePartitions(const vector<pair<optional<string>, optional<str
   CHECK_EQ(std::max(raw_bounds.size(), 1UL) + raw_splits.size(), expected_partition_ranges.size());
 
   // CREATE TABLE t (col STRING PRIMARY KEY),
-  // PARITITION BY RANGE (col);
+  // PARTITION BY RANGE (col);
   Schema schema({ ColumnSchema("col", STRING) }, { ColumnId(0) }, 1);
 
   PartitionSchema partition_schema;
@@ -103,7 +106,7 @@ void CheckCreateRangePartitions(const vector<pair<optional<string>, optional<str
   }
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions(splits, bounds, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions(splits, bounds, {}, schema, &partitions));
   ASSERT_EQ(expected_partition_ranges.size(), partitions.size());
 
   for (int i = 0; i < partitions.size(); i++) {
@@ -129,7 +132,7 @@ TEST_F(PartitionTest, TestCompoundRangeKeyEncoding) {
 
   // CREATE TABLE t (c1 STRING, c2 STRING, c3 STRING),
   // PRIMARY KEY (c1, c2, c3)
-  // PARITITION BY RANGE (c1, c2, c3);
+  // PARTITION BY RANGE (c1, c2, c3);
   Schema schema({ ColumnSchema("c1", STRING),
                   ColumnSchema("c2", STRING),
                   ColumnSchema("c3", STRING) },
@@ -175,7 +178,7 @@ TEST_F(PartitionTest, TestCompoundRangeKeyEncoding) {
   }
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions(splits, bounds, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions(splits, bounds, {}, schema, &partitions));
   ASSERT_EQ(4, partitions.size());
 
   EXPECT_TRUE(partitions[0].hash_buckets().empty());
@@ -192,7 +195,7 @@ TEST_F(PartitionTest, TestCompoundRangeKeyEncoding) {
 
 TEST_F(PartitionTest, TestPartitionKeyEncoding) {
   // CREATE TABLE t (a INT32, b VARCHAR, c VARCHAR, PRIMARY KEY (a, b, c))
-  // PARITITION BY [HASH BUCKET (a, b), HASH BUCKET (c), RANGE (a, b, c)];
+  // PARTITION BY [HASH BUCKET (a, b), HASH BUCKET (c), RANGE (a, b, c)];
   Schema schema({ ColumnSchema("a", INT32),
                   ColumnSchema("b", STRING),
                   ColumnSchema("c", STRING) },
@@ -418,7 +421,7 @@ TEST_F(PartitionTest, TestCreateRangePartitions) {
 
 TEST_F(PartitionTest, TestCreateHashBucketPartitions) {
   // CREATE TABLE t (a VARCHAR PRIMARY KEY),
-  // PARITITION BY [HASH BUCKET (a)];
+  // PARTITION BY [HASH BUCKET (a)];
   Schema schema({ ColumnSchema("a", STRING) }, { ColumnId(0) }, 1);
 
   PartitionSchemaPB schema_builder;
@@ -433,10 +436,11 @@ TEST_F(PartitionTest, TestCreateHashBucketPartitions) {
   //
   // [ (_), (1) )
   // [ (1), (2) )
-  // [ (3), (_) )
+  // [ (2), (_) )
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions(vector<KuduPartialRow>(), {}, schema, &partitions));
+  ASSERT_OK(
+      partition_schema.CreatePartitions(vector<KuduPartialRow>(), {}, {}, schema, &partitions));
   ASSERT_EQ(3, partitions.size());
 
   EXPECT_EQ(0, partitions[0].hash_buckets()[0]);
@@ -471,7 +475,7 @@ TEST_F(PartitionTest, TestCreatePartitions) {
   ASSERT_NE("", gflags::SetCommandLineOption("redact", "log"));
 
   // CREATE TABLE t (a VARCHAR, b VARCHAR, c VARCHAR, PRIMARY KEY (a, b, c))
-  // PARITITION BY [HASH BUCKET (a), HASH BUCKET (b), RANGE (a, b, c)];
+  // PARTITION BY [HASH BUCKET (a), HASH BUCKET (b), RANGE (a, b, c)];
   Schema schema({ ColumnSchema("a", STRING),
                   ColumnSchema("b", STRING),
                   ColumnSchema("c", STRING) },
@@ -529,7 +533,7 @@ TEST_F(PartitionTest, TestCreatePartitions) {
   // Split keys need not be passed in sorted order.
   vector<KuduPartialRow> split_rows = { split_b, split_a };
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions(split_rows, {}, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions(split_rows, {}, {}, schema, &partitions));
   ASSERT_EQ(12, partitions.size());
 
   EXPECT_EQ(0, partitions[0].hash_buckets()[0]);
@@ -659,7 +663,7 @@ TEST_F(PartitionTest, TestCreatePartitions) {
 
 TEST_F(PartitionTest, TestIncrementRangePartitionBounds) {
   // CREATE TABLE t (a INT8, b INT8, c INT8, PRIMARY KEY (a, b, c))
-  // PARITITION BY RANGE (a, b, c);
+  // PARTITION BY RANGE (a, b, c);
   Schema schema({ ColumnSchema("c1", INT8),
                   ColumnSchema("c2", INT8),
                   ColumnSchema("c3", INT8) },
@@ -752,7 +756,7 @@ TEST_F(PartitionTest, TestIncrementRangePartitionBounds) {
 
 TEST_F(PartitionTest, TestIncrementRangePartitionStringBounds) {
   // CREATE TABLE t (a STRING, b STRING, PRIMARY KEY (a, b))
-  // PARITITION BY RANGE (a, b, c);
+  // PARTITION BY RANGE (a, b, c);
   Schema schema({ ColumnSchema("c1", STRING),
                   ColumnSchema("c2", STRING) },
                 { ColumnId(0), ColumnId(1) }, 2);
@@ -857,4 +861,224 @@ TEST_F(PartitionTest, TestVarcharRangePartitions) {
     check(test, false);
   }
 }
+
+namespace {
+void CheckPartitions(const vector<Partition>& partitions) {
+  ASSERT_EQ(16, partitions.size());
+
+  EXPECT_EQ(0, partitions[0].hash_buckets()[0]);
+  EXPECT_EQ(string("a1\0\0\0\0c1", 8), partitions[0].range_key_start());
+  EXPECT_EQ(string("a2\0\0\0\0c2", 8), partitions[0].range_key_end());
+  EXPECT_EQ(string("\0\0\0\0" "a1\0\0\0\0c1", 12),partitions[0].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\0" "a2\0\0\0\0c2", 12),partitions[0].partition_key_end());
+
+  EXPECT_EQ(1, partitions[1].hash_buckets()[0]);
+  EXPECT_EQ(string("a1\0\0\0\0c1", 8), partitions[1].range_key_start());
+  EXPECT_EQ(string("a2\0\0\0\0c2", 8), partitions[1].range_key_end());
+  EXPECT_EQ(string("\0\0\0\1" "a1\0\0\0\0c1", 12),partitions[1].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\1" "a2\0\0\0\0c2", 12),partitions[1].partition_key_end());
+
+  EXPECT_EQ(2, partitions[2].hash_buckets()[0]);
+  EXPECT_EQ(string("a1\0\0\0\0c1", 8), partitions[2].range_key_start());
+  EXPECT_EQ(string("a2\0\0\0\0c2", 8), partitions[2].range_key_end());
+  EXPECT_EQ(string("\0\0\0\2" "a1\0\0\0\0c1", 12),partitions[2].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\2" "a2\0\0\0\0c2", 12),partitions[2].partition_key_end());
+
+  EXPECT_EQ(3, partitions[3].hash_buckets()[0]);
+  EXPECT_EQ(string("a1\0\0\0\0c1", 8), partitions[3].range_key_start());
+  EXPECT_EQ(string("a2\0\0\0\0c2", 8), partitions[3].range_key_end());
+  EXPECT_EQ(string("\0\0\0\3" "a1\0\0\0\0c1", 12),partitions[3].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\3" "a2\0\0\0\0c2", 12),partitions[3].partition_key_end());
+
+  EXPECT_EQ(0, partitions[4].hash_buckets()[0]);
+  EXPECT_EQ(0, partitions[4].hash_buckets()[1]);
+  EXPECT_EQ(string("a3\0\0b3\0\0", 8),partitions[4].range_key_start());
+  EXPECT_EQ(string("a4\0\0b4\0\0", 8),partitions[4].range_key_end());
+  EXPECT_EQ(string("\0\0\0\0" "\0\0\0\0" "a3\0\0b3\0\0", 16),partitions[4].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\0" "\0\0\0\0" "a4\0\0b4\0\0", 16),partitions[4].partition_key_end());
+
+  EXPECT_EQ(0, partitions[5].hash_buckets()[0]);
+  EXPECT_EQ(1, partitions[5].hash_buckets()[1]);
+  EXPECT_EQ(string("a3\0\0b3\0\0", 8),partitions[5].range_key_start());
+  EXPECT_EQ(string("a4\0\0b4\0\0", 8),partitions[5].range_key_end());
+  EXPECT_EQ(string("\0\0\0\0" "\0\0\0\1" "a3\0\0b3\0\0", 16),partitions[5].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\0" "\0\0\0\1" "a4\0\0b4\0\0", 16),partitions[5].partition_key_end());
+
+  EXPECT_EQ(1, partitions[6].hash_buckets()[0]);
+  EXPECT_EQ(0, partitions[6].hash_buckets()[1]);
+  EXPECT_EQ(string("a3\0\0b3\0\0", 8),partitions[6].range_key_start());
+  EXPECT_EQ(string("a4\0\0b4\0\0", 8),partitions[6].range_key_end());
+  EXPECT_EQ(string("\0\0\0\1" "\0\0\0\0" "a3\0\0b3\0\0", 16),partitions[6].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\1" "\0\0\0\0" "a4\0\0b4\0\0", 16),partitions[6].partition_key_end());
+
+  EXPECT_EQ(1, partitions[7].hash_buckets()[0]);
+  EXPECT_EQ(1, partitions[7].hash_buckets()[1]);
+  EXPECT_EQ(string("a3\0\0b3\0\0", 8),partitions[7].range_key_start());
+  EXPECT_EQ(string("a4\0\0b4\0\0", 8),partitions[7].range_key_end());
+  EXPECT_EQ(string("\0\0\0\1" "\0\0\0\1" "a3\0\0b3\0\0", 16),partitions[7].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\1" "\0\0\0\1" "a4\0\0b4\0\0", 16),partitions[7].partition_key_end());
+
+  EXPECT_EQ(2, partitions[8].hash_buckets()[0]);
+  EXPECT_EQ(0, partitions[8].hash_buckets()[1]);
+  EXPECT_EQ(string("a3\0\0b3\0\0", 8),partitions[8].range_key_start());
+  EXPECT_EQ(string("a4\0\0b4\0\0", 8),partitions[8].range_key_end());
+  EXPECT_EQ(string("\0\0\0\2" "\0\0\0\0" "a3\0\0b3\0\0", 16),partitions[8].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\2" "\0\0\0\0" "a4\0\0b4\0\0", 16),partitions[8].partition_key_end());
+
+  EXPECT_EQ(2, partitions[9].hash_buckets()[0]);
+  EXPECT_EQ(1, partitions[9].hash_buckets()[1]);
+  EXPECT_EQ(string("a3\0\0b3\0\0", 8),partitions[9].range_key_start());
+  EXPECT_EQ(string("a4\0\0b4\0\0", 8),partitions[9].range_key_end());
+  EXPECT_EQ(string("\0\0\0\2" "\0\0\0\1" "a3\0\0b3\0\0", 16),partitions[9].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\2" "\0\0\0\1" "a4\0\0b4\0\0", 16),partitions[9].partition_key_end());
+
+  EXPECT_EQ(0, partitions[10].hash_buckets()[0]);
+  EXPECT_EQ(0, partitions[10].hash_buckets()[1]);
+  EXPECT_EQ(string("a5\0\0b5\0\0", 8),partitions[10].range_key_start());
+  EXPECT_EQ(string("a6\0\0\0\0c6", 8),partitions[10].range_key_end());
+  EXPECT_EQ(string("\0\0\0\0" "\0\0\0\0" "a5\0\0b5\0\0", 16),partitions[10].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\0" "\0\0\0\0" "a6\0\0\0\0c6", 16),partitions[10].partition_key_end());
+
+  EXPECT_EQ(0, partitions[11].hash_buckets()[0]);
+  EXPECT_EQ(1, partitions[11].hash_buckets()[1]);
+  EXPECT_EQ(string("a5\0\0b5\0\0", 8),partitions[11].range_key_start());
+  EXPECT_EQ(string("a6\0\0\0\0c6", 8),partitions[11].range_key_end());
+  EXPECT_EQ(string("\0\0\0\0" "\0\0\0\1" "a5\0\0b5\0\0", 16),partitions[11].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\0" "\0\0\0\1" "a6\0\0\0\0c6", 16),partitions[11].partition_key_end());
+
+  EXPECT_EQ(0, partitions[12].hash_buckets()[0]);
+  EXPECT_EQ(2, partitions[12].hash_buckets()[1]);
+  EXPECT_EQ(string("a5\0\0b5\0\0", 8),partitions[12].range_key_start());
+  EXPECT_EQ(string("a6\0\0\0\0c6", 8),partitions[12].range_key_end());
+  EXPECT_EQ(string("\0\0\0\0" "\0\0\0\2" "a5\0\0b5\0\0", 16),partitions[12].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\0" "\0\0\0\2" "a6\0\0\0\0c6", 16),partitions[12].partition_key_end());
+
+  EXPECT_EQ(1, partitions[13].hash_buckets()[0]);
+  EXPECT_EQ(0, partitions[13].hash_buckets()[1]);
+  EXPECT_EQ(string("a5\0\0b5\0\0", 8),partitions[13].range_key_start());
+  EXPECT_EQ(string("a6\0\0\0\0c6", 8),partitions[13].range_key_end());
+  EXPECT_EQ(string("\0\0\0\1" "\0\0\0\0" "a5\0\0b5\0\0", 16),partitions[13].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\1" "\0\0\0\0" "a6\0\0\0\0c6", 16),partitions[13].partition_key_end());
+
+  EXPECT_EQ(1, partitions[14].hash_buckets()[0]);
+  EXPECT_EQ(1, partitions[14].hash_buckets()[1]);
+  EXPECT_EQ(string("a5\0\0b5\0\0", 8),partitions[14].range_key_start());
+  EXPECT_EQ(string("a6\0\0\0\0c6", 8),partitions[14].range_key_end());
+  EXPECT_EQ(string("\0\0\0\1" "\0\0\0\1" "a5\0\0b5\0\0", 16),partitions[14].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\1" "\0\0\0\1" "a6\0\0\0\0c6", 16),partitions[14].partition_key_end());
+
+  EXPECT_EQ(1, partitions[15].hash_buckets()[0]);
+  EXPECT_EQ(2, partitions[15].hash_buckets()[1]);
+  EXPECT_EQ(string("a5\0\0b5\0\0", 8),partitions[15].range_key_start());
+  EXPECT_EQ(string("a6\0\0\0\0c6", 8),partitions[15].range_key_end());
+  EXPECT_EQ(string("\0\0\0\1" "\0\0\0\2" "a5\0\0b5\0\0", 16),partitions[15].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\1" "\0\0\0\2" "a6\0\0\0\0c6", 16),partitions[15].partition_key_end());
+}
+} // namespace
+
+TEST_F(PartitionTest, TestVaryingHashSchemasPerRange) {
+  // CREATE TABLE t (a VARCHAR, b VARCHAR, c VARCHAR, PRIMARY KEY (a, b, c))
+  // PARTITION BY [HASH BUCKET (a, c), HASH BUCKET (b), RANGE (a, b, c)];
+  Schema schema({ ColumnSchema("a", STRING),
+                  ColumnSchema("b", STRING),
+                  ColumnSchema("c", STRING) },
+                { ColumnId(0), ColumnId(1), ColumnId(2) }, 3);
+
+  PartitionSchemaPB schema_builder;
+  // Table-wide hash schema defined below: 3 by 2 buckets, so 6 partitions in total.
+  AddHashBucketComponent(&schema_builder, { "a", "c" }, 3, 0);
+  AddHashBucketComponent(&schema_builder, { "b" }, 2, 0);
+  PartitionSchema partition_schema;
+  ASSERT_OK(PartitionSchema::FromPB(schema_builder, schema, &partition_schema));
+
+  ASSERT_EQ("HASH (a, c) PARTITIONS 3, HASH (b) PARTITIONS 2, RANGE (a, b, c)",
+            partition_schema.DebugString(schema));
+
+  vector<pair<KuduPartialRow, KuduPartialRow>> bounds;
+  PartitionSchema::RangeHashSchema range_hash_schemas;
+  vector<pair<pair<KuduPartialRow, KuduPartialRow>,
+      PartitionSchema::HashBucketSchemas>> bounds_with_hash_schemas;
+
+  { // [(a1, _, c1), (a2, _, c2))
+    KuduPartialRow lower(&schema);
+    KuduPartialRow upper(&schema);
+    ASSERT_OK(lower.SetStringCopy("a", "a1"));
+    ASSERT_OK(lower.SetStringCopy("c", "c1"));
+    ASSERT_OK(upper.SetStringCopy("a", "a2"));
+    ASSERT_OK(upper.SetStringCopy("c", "c2"));
+    PartitionSchema::HashBucketSchemas hash_schema_4_buckets = {{{ColumnId(0)}, 4, 0}};
+    bounds.emplace_back(lower, upper);
+    range_hash_schemas.emplace_back(hash_schema_4_buckets);
+    bounds_with_hash_schemas.emplace_back(make_pair(std::move(lower), std::move(upper)),
+                                          std::move(hash_schema_4_buckets));
+  }
+
+  { // [(a3, b3, _), (a4, b4, _))
+    KuduPartialRow lower(&schema);
+    KuduPartialRow upper(&schema);
+    ASSERT_OK(lower.SetStringCopy("a", "a3"));
+    ASSERT_OK(lower.SetStringCopy("b", "b3"));
+    ASSERT_OK(upper.SetStringCopy("a", "a4"));
+    ASSERT_OK(upper.SetStringCopy("b", "b4"));
+    bounds.emplace_back(lower, upper);
+    range_hash_schemas.emplace_back(PartitionSchema::HashBucketSchemas());
+    bounds_with_hash_schemas.emplace_back(make_pair(std::move(lower), std::move(upper)),
+                                          PartitionSchema::HashBucketSchemas());
+  }
+
+  { // [(a5, b5, _), (a6, _, c6))
+    KuduPartialRow lower(&schema);
+    KuduPartialRow upper(&schema);
+    ASSERT_OK(lower.SetStringCopy("a", "a5"));
+    ASSERT_OK(lower.SetStringCopy("b", "b5"));
+    ASSERT_OK(upper.SetStringCopy("a", "a6"));
+    ASSERT_OK(upper.SetStringCopy("c", "c6"));
+    PartitionSchema::HashBucketSchemas hash_schema_2_buckets_by_3 = {
+        {{ColumnId(0)}, 2, 0},
+        {{ColumnId(1)}, 3, 0}
+    };
+    bounds.emplace_back(lower, upper);
+    range_hash_schemas.emplace_back(hash_schema_2_buckets_by_3);
+    bounds_with_hash_schemas.emplace_back(make_pair(std::move(lower), std::move(upper)),
+                                          std::move(hash_schema_2_buckets_by_3));
+  }
+
+  vector<Partition> partitions;
+  ASSERT_OK(partition_schema.CreatePartitions({}, bounds, range_hash_schemas, schema, &partitions));
+  CheckPartitions(partitions);
+
+  bounds.clear();
+  range_hash_schemas.clear();
+  partitions.clear();
+
+  // Use std::shuffle to insert bounds and their hash schemas out of sorted order;
+  // the resulting partitions should still be the same.
+  std::mt19937 gen(SeedRandom());
+  std::shuffle(bounds_with_hash_schemas.begin(), bounds_with_hash_schemas.end(), gen);
+
+  for (const auto& bounds_and_schema : bounds_with_hash_schemas) {
+    bounds.emplace_back(bounds_and_schema.first);
+    range_hash_schemas.emplace_back(bounds_and_schema.second);
+  }
+
+  ASSERT_OK(partition_schema.CreatePartitions({}, bounds, range_hash_schemas, schema, &partitions));
+  CheckPartitions(partitions);
+
+  // Not clearing 'bounds' or 'range_hash_schemas'; adding a split row to test incompatibility.
+  vector<KuduPartialRow> splits;
+  { // split: (a1, _, c12)
+    KuduPartialRow split(&schema);
+    ASSERT_OK(split.SetStringCopy("a", "a1"));
+    ASSERT_OK(split.SetStringCopy("c", "c12"));
+    splits.emplace_back(std::move(split));
+  }
+
+  // Expecting Status::InvalidArgument due to 'splits' and schemas within 'range_hash_schemas'
+  // being defined at the same time.
+  Status s = partition_schema.CreatePartitions(splits, bounds, range_hash_schemas,
+                                               schema, &partitions);
+  ASSERT_EQ("Invalid argument: Both 'split_rows' and 'range_hash_schemas' "
+            "cannot be populated at the same time.", s.ToString());
+}
 } // namespace kudu
diff --git a/src/kudu/common/partition.cc b/src/kudu/common/partition.cc
index dd34d30..04c6f93 100644
--- a/src/kudu/common/partition.cc
+++ b/src/kudu/common/partition.cc
@@ -19,6 +19,7 @@
 
 #include <algorithm>
 #include <cstring>
+#include <iterator>
 #include <set>
 #include <string>
 #include <unordered_set>
@@ -110,34 +111,6 @@ void Partition::FromPB(const PartitionPB& pb, Partition* partition) {
 }
 
 namespace {
-// Extracts the column IDs from a protobuf repeated field of column identifiers.
-Status ExtractColumnIds(const RepeatedPtrField<PartitionSchemaPB_ColumnIdentifierPB>& identifiers,
-                        const Schema& schema,
-                        vector<ColumnId>* column_ids) {
-    column_ids->reserve(identifiers.size());
-    for (const auto& identifier : identifiers) {
-      switch (identifier.identifier_case()) {
-        case PartitionSchemaPB_ColumnIdentifierPB::kId: {
-          ColumnId column_id(identifier.id());
-          if (schema.find_column_by_id(column_id) == Schema::kColumnNotFound) {
-            return Status::InvalidArgument("unknown column id", SecureDebugString(identifier));
-          }
-          column_ids->push_back(column_id);
-          continue;
-        }
-        case PartitionSchemaPB_ColumnIdentifierPB::kName: {
-          int32_t column_idx = schema.find_column(identifier.name());
-          if (column_idx == Schema::kColumnNotFound) {
-            return Status::InvalidArgument("unknown column", SecureDebugString(identifier));
-          }
-          column_ids->push_back(schema.column_id(column_idx));
-          continue;
-        }
-        default: return Status::InvalidArgument("unknown column", SecureDebugString(identifier));
-      }
-    }
-    return Status::OK();
-}
 // Sets a repeated field of column identifiers to the provided column IDs.
 void SetColumnIdentifiers(const vector<ColumnId>& column_ids,
                           RepeatedPtrField<PartitionSchemaPB_ColumnIdentifierPB>* identifiers) {
@@ -148,6 +121,38 @@ void SetColumnIdentifiers(const vector<ColumnId>& column_ids,
 }
 } // namespace
 
+// Extracts the column IDs from a protobuf repeated field of column identifiers.
+Status PartitionSchema::ExtractColumnIds(
+    const RepeatedPtrField<PartitionSchemaPB_ColumnIdentifierPB>& identifiers,
+    const Schema& schema,
+    vector<ColumnId>* column_ids) {
+  vector<ColumnId> new_column_ids;
+  new_column_ids.reserve(identifiers.size());
+  for (const auto& identifier : identifiers) {
+    switch (identifier.identifier_case()) {
+      case PartitionSchemaPB_ColumnIdentifierPB::kId: {
+        ColumnId column_id(identifier.id());
+        if (schema.find_column_by_id(column_id) == Schema::kColumnNotFound) {
+          return Status::InvalidArgument("unknown column id", SecureDebugString(identifier));
+        }
+        new_column_ids.emplace_back(std::move(column_id));
+        continue;
+      }
+      case PartitionSchemaPB_ColumnIdentifierPB::kName: {
+        int32_t column_idx = schema.find_column(identifier.name());
+        if (column_idx == Schema::kColumnNotFound) {
+          return Status::InvalidArgument("unknown column", SecureDebugString(identifier));
+        }
+        new_column_ids.emplace_back(schema.column_id(column_idx));
+        continue;
+      }
+      default: return Status::InvalidArgument("unknown column", SecureDebugString(identifier));
+    }
+  }
+  *column_ids = std::move(new_column_ids);
+  return Status::OK();
+}
+
 Status PartitionSchema::FromPB(const PartitionSchemaPB& pb,
                                const Schema& schema,
                                PartitionSchema* partition_schema) {
@@ -247,6 +252,9 @@ Status PartitionSchema::EncodeRangeSplits(const vector<KuduPartialRow>& split_ro
                                           const Schema& schema,
                                           vector<string>* splits) const {
   DCHECK(splits->empty());
+  if (split_rows.empty()) {
+    return Status::OK();
+  }
   for (const KuduPartialRow& row : split_rows) {
     string split;
     RETURN_NOT_OK(EncodeRangeKey(row, schema, &split));
@@ -267,14 +275,17 @@ Status PartitionSchema::EncodeRangeSplits(const vector<KuduPartialRow>& split_ro
 
 Status PartitionSchema::EncodeRangeBounds(const vector<pair<KuduPartialRow,
                                                             KuduPartialRow>>& range_bounds,
+                                          const RangeHashSchema& range_hash_schemas,
                                           const Schema& schema,
-                                          vector<pair<string, string>>* range_partitions) const {
-  DCHECK(range_partitions->empty());
+                                          vector<RangeWithHashSchemas>*
+                                              bounds_with_hash_schemas) const {
+  DCHECK(bounds_with_hash_schemas->empty());
   if (range_bounds.empty()) {
-    range_partitions->emplace_back("", "");
+    bounds_with_hash_schemas->emplace_back(RangeWithHashSchemas{"", "", {}});
     return Status::OK();
   }
 
+  int j = 0;
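+  // 'j' indexes 'range_hash_schemas', which parallels 'range_bounds' in order.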
   for (const auto& bound : range_bounds) {
     string lower;
     string upper;
@@ -286,24 +297,31 @@ Status PartitionSchema::EncodeRangeBounds(const vector<pair<KuduPartialRow,
           "range partition lower bound must be less than the upper bound",
           RangePartitionDebugString(bound.first, bound.second));
     }
-    range_partitions->emplace_back(std::move(lower), std::move(upper));
+    RangeWithHashSchemas temp{std::move(lower), std::move(upper), {}};
+    if (!range_hash_schemas.empty()) {
+      temp.hash_schemas = range_hash_schemas[j++];
+    }
+    bounds_with_hash_schemas->emplace_back(std::move(temp));
   }
 
+  std::sort(bounds_with_hash_schemas->begin(), bounds_with_hash_schemas->end(),
+            [](const RangeWithHashSchemas& s1, const RangeWithHashSchemas& s2) {
+    return s1.lower < s2.lower;
+  });
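+  // Sorting the combined structs keeps each range's hash schemas aligned with its bounds.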
   // Check that the range bounds are non-overlapping
-  std::sort(range_partitions->begin(), range_partitions->end());
-  for (int i = 0; i < range_partitions->size() - 1; i++) {
-    const string& first_upper = range_partitions->at(i).second;
-    const string& second_lower = range_partitions->at(i + 1).first;
+  for (int i = 0; i < bounds_with_hash_schemas->size() - 1; i++) {
+    const string& first_upper = bounds_with_hash_schemas->at(i).upper;
+    const string& second_lower = bounds_with_hash_schemas->at(i + 1).lower;
 
     if (first_upper.empty() || second_lower.empty() || first_upper > second_lower) {
       return Status::InvalidArgument(
           "overlapping range partitions",
           strings::Substitute("first range partition: $0, second range partition: $1",
-                              RangePartitionDebugString(range_partitions->at(i).first,
-                                                        range_partitions->at(i).second,
+                              RangePartitionDebugString(bounds_with_hash_schemas->at(i).lower,
+                                                        bounds_with_hash_schemas->at(i).upper,
                                                         schema),
-                              RangePartitionDebugString(range_partitions->at(i + 1).first,
-                                                        range_partitions->at(i + 1).second,
+                              RangePartitionDebugString(bounds_with_hash_schemas->at(i + 1).lower,
+                                                        bounds_with_hash_schemas->at(i + 1).upper,
                                                         schema)));
     }
   }
@@ -313,19 +331,23 @@ Status PartitionSchema::EncodeRangeBounds(const vector<pair<KuduPartialRow,
 
 Status PartitionSchema::SplitRangeBounds(const Schema& schema,
                                          vector<string> splits,
-                                         vector<pair<string, string>>* bounds) const {
-  int expected_bounds = std::max(1UL, bounds->size()) + splits.size();
+                                         vector<RangeWithHashSchemas>*
+                                             bounds_with_hash_schemas) const {
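+  // Without splits, the encoded bounds (and any per-range hash schemas) pass through unchanged.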
+  if (splits.empty()) {
+    return Status::OK();
+  }
 
-  vector<pair<string, string>> new_bounds;
-  new_bounds.reserve(expected_bounds);
+  auto expected_bounds = std::max(1UL, bounds_with_hash_schemas->size()) + splits.size();
+  vector<RangeWithHashSchemas> new_bounds_with_hash_schemas;
+  new_bounds_with_hash_schemas.reserve(expected_bounds);
 
   // Iterate through the sorted bounds and sorted splits, splitting the bounds
  // as appropriate and adding them to the result list ('new_bounds_with_hash_schemas').
 
   auto split = splits.begin();
-  for (auto& bound : *bounds) {
-    string& lower = bound.first;
-    const string& upper = bound.second;
+  for (auto& bound : *bounds_with_hash_schemas) {
+    string& lower = bound.lower;
+    const string& upper = bound.upper;
 
     for (; split != splits.end() && (upper.empty() || *split <= upper); split++) {
       if (!lower.empty() && *split < lower) {
@@ -337,36 +359,33 @@ Status PartitionSchema::SplitRangeBounds(const Schema& schema,
       }
       // Split the current bound. Add the lower section to the result list,
       // and continue iterating on the upper section.
-      new_bounds.emplace_back(std::move(lower), *split);
+      new_bounds_with_hash_schemas.emplace_back(RangeWithHashSchemas{std::move(lower), *split, {}});
       lower = std::move(*split);
     }
 
-    new_bounds.emplace_back(std::move(lower), upper);
+    new_bounds_with_hash_schemas.emplace_back(RangeWithHashSchemas{std::move(lower), upper, {}});
   }
 
   if (split != splits.end()) {
     return Status::InvalidArgument("split out of bounds", RangeKeyDebugString(*split, schema));
   }
 
-  bounds->swap(new_bounds);
-  CHECK_EQ(expected_bounds, bounds->size());
+  bounds_with_hash_schemas->swap(new_bounds_with_hash_schemas);
+  CHECK_EQ(expected_bounds, bounds_with_hash_schemas->size());
   return Status::OK();
 }
 
-Status PartitionSchema::CreatePartitions(const vector<KuduPartialRow>& split_rows,
-                                         const vector<pair<KuduPartialRow,
-                                                           KuduPartialRow>>& range_bounds,
-                                         const Schema& schema,
-                                         vector<Partition>* partitions) const {
-  const KeyEncoder<string>& hash_encoder = GetKeyEncoder<string>(GetTypeInfo(UINT32));
-
-  // Create a partition per hash bucket combination.
-  *partitions = vector<Partition>(1);
-  for (const HashBucketSchema& bucket_schema : hash_bucket_schemas_) {
+vector<Partition> PartitionSchema::GenerateHashPartitions(const HashBucketSchemas& hash_schemas,
+                                                          const KeyEncoder<string>& hash_encoder) {
+  vector<Partition> hash_partitions(1);
+  // Create a partition for each hash bucket combination.
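+  // E.g. hash components with 3 and 2 buckets yield 3 x 2 = 6 hash partitions.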
+  for (const HashBucketSchema& bucket_schema : hash_schemas) {
+    auto expected_partitions = hash_partitions.size() * bucket_schema.num_buckets;
     vector<Partition> new_partitions;
+    new_partitions.reserve(expected_partitions);
     // For each of the partitions created so far, replicate it
-    // by the number of buckets in the next hash bucketing component
-    for (const Partition& base_partition : *partitions) {
+    // by the number of buckets in the next hash bucketing component.
+    for (const Partition& base_partition : hash_partitions) {
       for (int32_t bucket = 0; bucket < bucket_schema.num_buckets; bucket++) {
         Partition partition = base_partition;
         partition.hash_buckets_.push_back(bucket);
@@ -375,9 +394,31 @@ Status PartitionSchema::CreatePartitions(const vector<KuduPartialRow>& split_row
         new_partitions.push_back(partition);
       }
     }
-    partitions->swap(new_partitions);
+    hash_partitions = std::move(new_partitions);
+  }
+  return hash_partitions;
+}
+
+Status PartitionSchema::CreatePartitions(const vector<KuduPartialRow>& split_rows,
+                                         const vector<pair<KuduPartialRow,
+                                                           KuduPartialRow>>& range_bounds,
+                                         const RangeHashSchema& range_hash_schemas,
+                                         const Schema& schema,
+                                         vector<Partition>* partitions) const {
+  const auto& hash_encoder = GetKeyEncoder<string>(GetTypeInfo(UINT32));
+
+  if (!split_rows.empty()) {
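+    // Per-range hash schemas cannot be combined with split rows, which are
+    // kept only for backwards compatibility.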
+    for (const auto& hash_schemas : range_hash_schemas) {
+      if (!hash_schemas.empty()) {
+        return Status::InvalidArgument("Both 'split_rows' and 'range_hash_schemas' cannot be "
+                                       "populated at the same time.");
+      }
+    }
   }
 
+  vector<Partition> base_hash_partitions = GenerateHashPartitions(hash_bucket_schemas_,
+                                                                  hash_encoder);
+
   std::unordered_set<int> range_column_idxs;
   for (const ColumnId& column_id : range_schema_.column_ids) {
     int column_idx = schema.find_column_by_id(column_id);
@@ -391,24 +432,52 @@ Status PartitionSchema::CreatePartitions(const vector<KuduPartialRow>& split_row
     }
   }
 
-  vector<pair<string, string>> bounds;
+  vector<RangeWithHashSchemas> bounds_with_hash_schemas;
   vector<string> splits;
-  RETURN_NOT_OK(EncodeRangeBounds(range_bounds, schema, &bounds));
+  RETURN_NOT_OK(EncodeRangeBounds(range_bounds, range_hash_schemas, schema,
+                                  &bounds_with_hash_schemas));
   RETURN_NOT_OK(EncodeRangeSplits(split_rows, schema, &splits));
-  RETURN_NOT_OK(SplitRangeBounds(schema, std::move(splits), &bounds));
-
-  // Create a partition per range bound and hash bucket combination.
-  vector<Partition> new_partitions;
-  for (const Partition& base_partition : *partitions) {
-    for (const auto& bound : bounds) {
-      Partition partition = base_partition;
-      partition.partition_key_start_.append(bound.first);
-      partition.partition_key_end_.append(bound.second);
-      new_partitions.push_back(partition);
+  RETURN_NOT_OK(SplitRangeBounds(schema, std::move(splits), &bounds_with_hash_schemas));
+
+  if (!range_hash_schemas.empty()) {
+    // Hash schemas per range cannot be applied to split rows.
+    DCHECK_EQ(range_hash_schemas.size(), bounds_with_hash_schemas.size());
+    vector<Partition> result_partitions;
+    // Iterate through each bound and its hash schemas to generate hash partitions.
+    for (const auto& bound : bounds_with_hash_schemas) {
+      const auto& current_range_hash_schemas = bound.hash_schemas;
+      vector<Partition> current_bound_hash_partitions;
+      // If the current bound's hash schemas are empty, use the default table-wide schema.
+      // If not empty, generate hash partitions for all the provided hash schemas in this range.
+      if (current_range_hash_schemas.empty()) {
+        current_bound_hash_partitions = base_hash_partitions;
+      } else {
+        current_bound_hash_partitions = GenerateHashPartitions(current_range_hash_schemas,
+                                                               hash_encoder);
+      }
+      // Append this range's bounds to each partition's start/end keys.
+      for (Partition& partition : current_bound_hash_partitions) {
+        partition.partition_key_start_.append(bound.lower);
+        partition.partition_key_end_.append(bound.upper);
+      }
+      result_partitions.insert(result_partitions.end(),
+                               std::make_move_iterator(current_bound_hash_partitions.begin()),
+                               std::make_move_iterator(current_bound_hash_partitions.end()));
     }
+    *partitions = std::move(result_partitions);
+  } else {
+    // Create a partition per range bound and hash bucket combination.
+    vector<Partition> new_partitions;
+    for (const Partition& base_partition : base_hash_partitions) {
+      for (const auto& bound : bounds_with_hash_schemas) {
+        Partition partition = base_partition;
+        partition.partition_key_start_.append(bound.lower);
+        partition.partition_key_end_.append(bound.upper);
+        new_partitions.push_back(partition);
+      }
+    }
+    *partitions = std::move(new_partitions);
   }
-  partitions->swap(new_partitions);
-
   // Note: the following discussion and logic only takes effect when the table's
   // partition schema includes at least one hash bucket component, and the
   // absolute upper and/or absolute lower range bound is unbounded.
diff --git a/src/kudu/common/partition.h b/src/kudu/common/partition.h
index bf57d0f..22a065f 100644
--- a/src/kudu/common/partition.h
+++ b/src/kudu/common/partition.h
@@ -14,8 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#ifndef KUDU_COMMON_PARTITION_H
-#define KUDU_COMMON_PARTITION_H
+#pragma once
 
 #include <cstdint>
 #include <string>
@@ -29,13 +28,21 @@
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 
+namespace google {
+namespace protobuf {
+template <typename T> class RepeatedPtrField;
+}  // namespace protobuf
+}  // namespace google
+
 namespace kudu {
 
 class Arena;
 class ConstContiguousRow;
 class KuduPartialRow;
-class PartitionSchemaPB;
 class PartitionPB;
+class PartitionSchemaPB;
+class PartitionSchemaPB_ColumnIdentifierPB;
+template <typename Buffer> class KeyEncoder;
 
 // A Partition describes the set of rows that a Tablet is responsible for
 // serving. Each tablet is assigned a single Partition.
@@ -146,6 +153,22 @@ class PartitionSchema {
     uint32_t seed;
   };
 
+  typedef std::vector<HashBucketSchema> HashBucketSchemas;
+  // Holds each bound's HashBucketSchemas.
+  typedef std::vector<HashBucketSchemas> RangeHashSchema;
+
+  struct RangeWithHashSchemas {
+    std::string lower;
+    std::string upper;
+    HashBucketSchemas hash_schemas;
+  };
+
+  // Extracts the column IDs from a protobuf repeated field of column identifiers.
+  static Status ExtractColumnIds(
+      const google::protobuf::RepeatedPtrField<PartitionSchemaPB_ColumnIdentifierPB>& identifiers,
+      const Schema& schema,
+      std::vector<ColumnId>* column_ids);
+
   // Deserializes a protobuf message into a partition schema.
   static Status FromPB(const PartitionSchemaPB& pb,
                        const Schema& schema,
@@ -168,9 +191,14 @@ class PartitionSchema {
   // of resulting partitions is the product of the number of hash buckets for
   // each hash bucket component, multiplied by
   // (split_rows.size() + max(1, range_bounds.size())).
+  // 'range_hash_schemas' contains each range's HashBucketSchemas;
+  // its order corresponds to the bounds in 'range_bounds'.
+  // If 'range_hash_schemas' is empty, the table-wide hash schema is used for every range.
+  // 'range_hash_schemas' and 'range_bounds' are equal in size if 'range_hash_schemas' isn't empty.
   Status CreatePartitions(const std::vector<KuduPartialRow>& split_rows,
                           const std::vector<std::pair<KuduPartialRow,
                                                       KuduPartialRow>>& range_bounds,
+                          const RangeHashSchema& range_hash_schemas,
                           const Schema& schema,
                           std::vector<Partition>* partitions) const WARN_UNUSED_RESULT;
 
@@ -263,7 +291,7 @@ class PartitionSchema {
     return range_schema_;
   }
 
-  const std::vector<HashBucketSchema>& hash_partition_schemas() const {
+  const HashBucketSchemas& hash_partition_schemas() const {
     return hash_bucket_schemas_;
   }
 
@@ -344,7 +372,7 @@ class PartitionSchema {
   void AppendRangeDebugStringComponentsOrMin(const KuduPartialRow& row,
                                              std::vector<std::string>* components) const;
 
-  // Returns the stringified hash and range schema componenets of the partition
+  // Returns the stringified hash and range schema components of the partition
   // schema.
   //
   // Partition schemas are considered metadata, so no redaction will happen on
@@ -372,6 +400,10 @@ class PartitionSchema {
   // appropriate error code for an invalid partition schema.
   Status Validate(const Schema& schema) const;
 
+  // Generates hash partitions for each combination of hash buckets in hash_schemas.
+  static std::vector<Partition> GenerateHashPartitions(const HashBucketSchemas& hash_schemas,
+                                                       const KeyEncoder<std::string>& hash_encoder);
+
   // Validates the split rows, converts them to partition key form, and inserts
   // them into splits in sorted order.
   Status EncodeRangeSplits(const std::vector<KuduPartialRow>& split_rows,
@@ -379,30 +411,30 @@ class PartitionSchema {
                            std::vector<std::string>* splits) const;
 
   // Validates the range bounds, converts them to partition key form, and
-  // inserts them into encoded_range_partitions in sorted order.
+  // inserts them into 'bounds_with_hash_schemas' in sorted order. The hash schemas
+  // per range are stored within 'range_hash_schemas'. If 'range_hash_schemas' is empty,
+  // it indicates that the table-wide hash schema will be used for every range.
   Status EncodeRangeBounds(const std::vector<std::pair<KuduPartialRow,
                                                        KuduPartialRow>>& range_bounds,
+                           const RangeHashSchema& range_hash_schemas,
                            const Schema& schema,
-                           std::vector<std::pair<std::string,
-                                                 std::string>>* encoded_range_bounds) const;
+                           std::vector<RangeWithHashSchemas>* bounds_with_hash_schemas) const;
 
-  // Splits the encoded range bounds by the split points. The splits and bounds
-  // must be sorted. If `bounds` is empty, then a single unbounded range is
-  // assumed. If any of the splits falls outside of the bounds then an
-  // InvalidArgument status is returned.
+  // Splits the encoded range bounds by the split points. The splits and bounds within
+  // 'bounds_with_hash_schemas' must be sorted. If `bounds_with_hash_schemas` is empty,
+  // then a single unbounded range is assumed. If any of the splits falls outside
+  // of the bounds, then an InvalidArgument status is returned.
   Status SplitRangeBounds(const Schema& schema,
                           std::vector<std::string> splits,
-                          std::vector<std::pair<std::string, std::string>>* bounds) const;
+                          std::vector<RangeWithHashSchemas>* bounds_with_hash_schemas) const;
 
   // Increments a range partition key, setting 'increment' to true if the
   // increment succeeds, or false if all range partition columns are already the
   // maximum value. Unset columns will be incremented to increment(min_value).
   Status IncrementRangePartitionKey(KuduPartialRow* row, bool* increment) const;
 
-  std::vector<HashBucketSchema> hash_bucket_schemas_;
+  HashBucketSchemas hash_bucket_schemas_;
   RangeSchema range_schema_;
 };
 
 } // namespace kudu
-
-#endif
diff --git a/src/kudu/common/partition_pruner-test.cc b/src/kudu/common/partition_pruner-test.cc
index e541951..45c2f34 100644
--- a/src/kudu/common/partition_pruner-test.cc
+++ b/src/kudu/common/partition_pruner-test.cc
@@ -114,7 +114,7 @@ TEST_F(PartitionPrunerTest, TestPrimaryKeyRangePruning) {
   ASSERT_OK(split2.SetInt8("c", 10));
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions({ split1, split2 }, {}, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions({ split1, split2 }, {}, {}, schema, &partitions));
 
   // Creates a scan with optional lower and upper bounds, and checks that the
   // expected number of tablets are pruned.
@@ -236,7 +236,7 @@ TEST_F(PartitionPrunerTest, TestPartialPrimaryKeyRangePruning) {
   ASSERT_OK(split2.SetStringCopy("b", "r"));
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions({ split1, split2 }, {}, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions({ split1, split2 }, {}, {}, schema, &partitions));
 
   // Applies the specified lower and upper bound primary keys against the
   // schema, and checks that the expected number of partitions are pruned.
@@ -349,7 +349,7 @@ TEST_F(PartitionPrunerTest, TestIntPartialPrimaryKeyRangePruning) {
   ASSERT_OK(split.SetInt8("b", 0));
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions({ split }, {}, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions({ split }, {}, {}, schema, &partitions));
 
   // Applies the specified lower and upper bound primary keys against the
   // schema, and checks that the expected number of partitions are pruned.
@@ -439,7 +439,7 @@ TEST_F(PartitionPrunerTest, TestRangePruning) {
   ASSERT_OK(split2.SetStringCopy("b", "r"));
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions({ split1, split2 }, {}, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions({ split1, split2 }, {}, {}, schema, &partitions));
 
   // Applies the specified predicates to a scan and checks that the expected
   // number of partitions are pruned.
@@ -617,7 +617,8 @@ TEST_F(PartitionPrunerTest, TestHashPruning) {
     ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
 
     vector<Partition> partitions;
-    ASSERT_OK(partition_schema.CreatePartitions(vector<KuduPartialRow>(), {}, schema, &partitions));
+    ASSERT_OK(partition_schema.CreatePartitions(vector<KuduPartialRow>(), {}, {},
+                                                       schema, &partitions));
 
 
   // Applies the specified predicates to a scan and checks that the expected
@@ -706,7 +707,8 @@ TEST_F(PartitionPrunerTest, TestInListHashPruning) {
   ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions(vector<KuduPartialRow>(), {}, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions(vector<KuduPartialRow>(), {}, {},
+                                                     schema, &partitions));
 
 
   // Applies the specified predicates to a scan and checks that the expected
@@ -794,7 +796,8 @@ TEST_F(PartitionPrunerTest, TestMultiColumnInListHashPruning) {
   ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions(vector<KuduPartialRow>(), {}, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions(vector<KuduPartialRow>(), {}, {},
+                                                     schema, &partitions));
 
 
   // Applies the specified predicates to a scan and checks that the expected
@@ -899,8 +902,8 @@ TEST_F(PartitionPrunerTest, TestPruning) {
   ASSERT_OK(split.SetUnixTimeMicros("time", 10));
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions(vector<KuduPartialRow>{ split },
-                                              {}, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions(vector<KuduPartialRow>{ split }, {}, {},
+                                                     schema, &partitions));
   ASSERT_EQ(4, partitions.size());
 
   // Applies the specified predicates to a scan and checks that the expected
@@ -1012,7 +1015,7 @@ TEST_F(PartitionPrunerTest, TestKudu2173) {
   KuduPartialRow split1(&schema);
   ASSERT_OK(split1.SetInt8("a", 10));
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions({ split1 }, {}, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions({ split1 }, {}, {}, schema, &partitions));
 
   // Applies the specified predicates to a scan and checks that the expected
   // number of partitions are pruned.
diff --git a/src/kudu/common/scan_spec-test.cc b/src/kudu/common/scan_spec-test.cc
index 9357977..2f51824 100644
--- a/src/kudu/common/scan_spec-test.cc
+++ b/src/kudu/common/scan_spec-test.cc
@@ -413,7 +413,7 @@ TEST_F(CompositeIntKeysTest, TestOneHashKeyInListHashPruning) {
                           &partition_schema);
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions({}, {}, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions({}, {}, {}, schema, &partitions));
   ASSERT_EQ(3, partitions.size());
 
   // clone scan_spec for different partition.
@@ -466,7 +466,7 @@ TEST_F(CompositeIntKeysTest, TestHashKeyInListHashPruningEmptyDetect) {
                           &partition_schema);
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions({}, {}, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions({}, {}, {}, schema, &partitions));
   ASSERT_EQ(3, partitions.size());
 
   // clone scan_spec for different partition.
@@ -519,7 +519,7 @@ TEST_F(CompositeIntKeysTest, TestMultiHashKeyOneColumnInListHashPruning) {
                           &partition_schema);
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions({}, {}, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions({}, {}, {}, schema, &partitions));
   ASSERT_EQ(9, partitions.size());
 
   // clone scan_spec for different partition.
@@ -630,7 +630,7 @@ TEST_F(CompositeIntKeysTest, TesMultiHashColumnsInListHashPruning) {
                           &partition_schema);
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions({}, {}, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions({}, {}, {}, schema, &partitions));
   ASSERT_EQ(3, partitions.size());
 
   // clone scan_spec for different partition.
@@ -686,7 +686,7 @@ TEST_F(CompositeIntKeysTest, TesMultiHashKeyMultiHashInListHashPruning) {
                           &partition_schema);
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions({}, {}, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions({}, {}, {}, schema, &partitions));
   ASSERT_EQ(9, partitions.size());
 
   // clone scan_spec for different partition.
diff --git a/src/kudu/integration-tests/ts_tablet_manager-itest.cc b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
index 2134716..aa28d48 100644
--- a/src/kudu/integration-tests/ts_tablet_manager-itest.cc
+++ b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
@@ -1016,7 +1016,7 @@ Status GetPartitionForTxnStatusTablet(int64_t start_txn_id, int64_t end_txn_id,
   RETURN_NOT_OK(upper_bound.SetInt64(TxnStatusTablet::kTxnIdColName, end_txn_id));
   vector<Partition> ps;
   RETURN_NOT_OK(pschema.CreatePartitions(/*split_rows=*/{},
-      { std::make_pair(lower_bound, upper_bound) }, schema, &ps));
+      { std::make_pair(lower_bound, upper_bound) }, {}, schema, &ps));
   *partition = ps[0];
   *partition_schema = pschema;
   return Status::OK();
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 5bad51c..c4eed38 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -1721,7 +1721,8 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
 
   // Create partitions based on specified partition schema and split rows.
   vector<Partition> partitions;
-  RETURN_NOT_OK(partition_schema.CreatePartitions(split_rows, range_bounds, schema, &partitions));
+  RETURN_NOT_OK(partition_schema.CreatePartitions(split_rows, range_bounds,
+                                                  {}, schema, &partitions));
 
   // If they didn't specify a num_replicas, set it based on the default.
   if (!req.has_num_replicas()) {
@@ -2471,7 +2472,7 @@ Status CatalogManager::ApplyAlterPartitioningSteps(
 
     vector<Partition> partitions;
     RETURN_NOT_OK(partition_schema.CreatePartitions(
-        {}, {{ *ops[0].split_row, *ops[1].split_row }}, schema, &partitions));
+        {}, {{ *ops[0].split_row, *ops[1].split_row }}, {}, schema, &partitions));
     switch (step.type()) {
       case AlterTableRequestPB::ADD_RANGE_PARTITION: {
         for (const Partition& partition : partitions) {
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index fa73b26..7e4699c 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -313,7 +313,7 @@ Status SysCatalogTable::CreateNew(FsManager *fs_manager) {
 
   vector<KuduPartialRow> split_rows;
   vector<Partition> partitions;
-  RETURN_NOT_OK(partition_schema.CreatePartitions(split_rows, {}, schema, &partitions));
+  RETURN_NOT_OK(partition_schema.CreatePartitions(split_rows, {}, {}, schema, &partitions));
   DCHECK_EQ(1, partitions.size());
 
   RETURN_NOT_OK(tablet::TabletMetadata::CreateNew(fs_manager,
diff --git a/src/kudu/tablet/tablet-harness.h b/src/kudu/tablet/tablet-harness.h
index b026554..9b23445 100644
--- a/src/kudu/tablet/tablet-harness.h
+++ b/src/kudu/tablet/tablet-harness.h
@@ -53,7 +53,7 @@ static std::pair<PartitionSchema, Partition> CreateDefaultPartition(const Schema
 
   // Create the tablet partitions.
   std::vector<Partition> partitions;
-  CHECK_OK(partition_schema.CreatePartitions({}, {}, schema, &partitions));
+  CHECK_OK(partition_schema.CreatePartitions({}, {}, {}, schema, &partitions));
   CHECK_EQ(1, partitions.size());
   return std::make_pair(partition_schema, partitions[0]);
 }
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index 572edd3..8673ba5 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -3887,7 +3887,7 @@ TEST_F(TabletServerTest, TestWriteOutOfBounds) {
   ASSERT_OK(end_row.SetInt32("key", 20));
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions({ start_row, end_row }, {}, schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions({ start_row, end_row }, {}, {}, schema, &partitions));
 
   ASSERT_EQ(3, partitions.size());