You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2021/08/04 22:55:56 UTC

[kudu] branch master updated: [pruning] KUDU-2671: Pruning compatible with custom hash schemas.

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new bca9d73  [pruning] KUDU-2671: Pruning compatible with custom hash schemas.
bca9d73 is described below

commit bca9d73f90d02209e67615c140cd9c5311a6d8fb
Author: Mahesh Reddy <mr...@cloudera.com>
AuthorDate: Wed Mar 24 17:47:14 2021 -0700

    [pruning] KUDU-2671: Pruning compatible with custom hash schemas.
    
    This patch introduces changes to the PartitionPruner class
    to be compatible with custom hash bucket schemas per range.
    
    There are three ways to set bounds on a scan.
    
    - Adding predicates (e.g. range/equality)
    - Setting lower and upper bound primary keys
    - Setting lower and upper bound partition keys
    
    This patch introduces changes that make the first two methods
    of setting bounds on a scan compatible with custom hash bucket
    schemas per range. The last way using partition keys is unstable
    and for internal use only. While it's not necessary for the last
    way to be compatible with per range hash bucket schemas, the
    entire pruning functionality will not be complete until
    PartitionPruner::RemovePartitionKeyRange() is modified.
    That work will be done in a follow up patch.
    
    Change-Id: I05c37495430f61a2c6f6012c72251138aee465b7
    Reviewed-on: http://gerrit.cloudera.org:8080/17643
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/common/partial_row.h            |    1 +
 src/kudu/common/partition_pruner-test.cc | 1008 +++++++++++++++++++++++-------
 src/kudu/common/partition_pruner.cc      |  377 ++++++-----
 src/kudu/common/partition_pruner.h       |   54 +-
 4 files changed, 1051 insertions(+), 389 deletions(-)

diff --git a/src/kudu/common/partial_row.h b/src/kudu/common/partial_row.h
index 9983137..47033b1 100644
--- a/src/kudu/common/partial_row.h
+++ b/src/kudu/common/partial_row.h
@@ -658,6 +658,7 @@ class KUDU_EXPORT KuduPartialRow {
   FRIEND_TEST(PartitionPrunerTest, TestIntPartialPrimaryKeyRangePruning);
   FRIEND_TEST(PartitionPrunerTest, TestPartialPrimaryKeyRangePruning);
   FRIEND_TEST(PartitionPrunerTest, TestPrimaryKeyRangePruning);
+  FRIEND_TEST(PartitionPrunerTest, TestHashSchemasPerRangeWithPartialPrimaryKeyRangePruning);
   FRIEND_TEST(RowOperationsTest, ProjectionTestWholeSchemaSpecified);
   FRIEND_TEST(RowOperationsTest, TestProjectUpdates);
   FRIEND_TEST(RowOperationsTest, TestProjectDeletes);
diff --git a/src/kudu/common/partition_pruner-test.cc b/src/kudu/common/partition_pruner-test.cc
index 45c2f34..1789154 100644
--- a/src/kudu/common/partition_pruner-test.cc
+++ b/src/kudu/common/partition_pruner-test.cc
@@ -20,13 +20,12 @@
 #include <algorithm>
 #include <cstddef>
 #include <cstdint>
-#include <memory>
 #include <string>
+#include <tuple>
 #include <utility>
 #include <vector>
 
 #include <boost/optional/optional.hpp>
-#include <glog/logging.h>
 #include <gtest/gtest.h>
 
 #include "kudu/common/column_predicate.h"
@@ -35,12 +34,13 @@
 #include "kudu/common/partial_row.h"
 #include "kudu/common/partition.h"
 #include "kudu/common/row.h"
+#include "kudu/common/row_operations.h"
+#include "kudu/common/row_operations.pb.h"
 #include "kudu/common/scan_spec.h"
 #include "kudu/common/schema.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/memory/arena.h"
 #include "kudu/util/slice.h"
-#include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
@@ -48,15 +48,34 @@ using boost::optional;
 using std::count_if;
 using std::get;
 using std::make_tuple;
-using std::move;
+using std::pair;
 using std::string;
 using std::tuple;
-using std::unique_ptr;
 using std::vector;
 
 namespace kudu {
 
 class PartitionPrunerTest : public KuduTest {
+ public:
+  typedef tuple<vector<string>, int32_t, uint32_t> ColumnNamesNumBucketsAndSeed;
+  typedef pair<string, string> ColumnNameAndStringValue;
+  typedef pair<string, int8_t> ColumnNameAndIntValue;
+
+  static void CreatePartitionSchemaPB(
+      const vector<string>& range_columns,
+      const vector<ColumnNamesNumBucketsAndSeed>& table_hash_schema,
+      PartitionSchemaPB* partition_schema_pb);
+
+  static void AddRangePartitionWithSchema(
+      const Schema& schema,
+      const vector<ColumnNameAndStringValue>& lower_string_cols,
+      const vector<ColumnNameAndStringValue>& upper_string_cols,
+      const vector<ColumnNameAndIntValue>& lower_int_cols,
+      const vector<ColumnNameAndIntValue>& upper_int_cols,
+      const vector<ColumnNamesNumBucketsAndSeed>& hash_schemas,
+      vector<pair<KuduPartialRow, KuduPartialRow>>* bounds,
+      PartitionSchema::PerRangeHashBucketSchemas* range_hash_schemas,
+      PartitionSchemaPB* pb);
 };
 
 void CheckPrunedPartitions(const Schema& schema,
@@ -75,8 +94,12 @@ void CheckPrunedPartitions(const Schema& schema,
 
   SCOPED_TRACE(strings::Substitute("schema: $0", schema.ToString()));
   SCOPED_TRACE(strings::Substitute("partition schema: $0", partition_schema.DebugString(schema)));
-  SCOPED_TRACE(strings::Substitute("partition pruner: $0",
-                                   pruner.ToString(schema, partition_schema)));
+  // TODO(mreddy): once PartitionSchema::PartitionKeyDebugString() is compatible with per range
+  // hash schemas, remove this if check
+  if (partition_schema.ranges_with_hash_schemas().empty()) {
+    SCOPED_TRACE(strings::Substitute("partition pruner: $0",
+                                     pruner.ToString(schema, partition_schema)));
+  }
   SCOPED_TRACE(strings::Substitute("optimized scan spec: $0", opt_spec.ToString(schema)));
   SCOPED_TRACE(strings::Substitute("original  scan spec: $0", spec.ToString(schema)));
 
@@ -84,8 +107,74 @@ void CheckPrunedPartitions(const Schema& schema,
                                    [&] (const Partition& partition) {
                                      return pruner.ShouldPrune(partition);
                                    });
+
   ASSERT_EQ(remaining_tablets, partitions.size() - pruned_partitions);
-  ASSERT_EQ(pruner_ranges, pruner.NumRangesRemainingForTests());
+  ASSERT_EQ(pruner_ranges, pruner.NumRangesRemaining());
+}
+
+
+void PartitionPrunerTest::CreatePartitionSchemaPB(
+    const vector<string>& range_columns,
+    const vector<ColumnNamesNumBucketsAndSeed>& table_hash_schema,
+    PartitionSchemaPB* partition_schema_pb) {
+  auto* range_schema = partition_schema_pb->mutable_range_schema();
+  for (const auto& range_column : range_columns) {
+    range_schema->add_columns()->set_name(range_column);
+  }
+  for (const auto& hash_schema : table_hash_schema) {
+    auto* hash_schema_component = partition_schema_pb->add_hash_bucket_schemas();
+    for (const auto& hash_schema_columns : get<0>(hash_schema)) {
+      hash_schema_component->add_columns()->set_name(hash_schema_columns);
+    }
+    hash_schema_component->set_num_buckets(get<1>(hash_schema));
+    hash_schema_component->set_seed(get<2>(hash_schema));
+  }
+}
+
+void PartitionPrunerTest::AddRangePartitionWithSchema(
+    const Schema& schema,
+    const vector<ColumnNameAndStringValue>& lower_string_cols,
+    const vector<ColumnNameAndStringValue>& upper_string_cols,
+    const vector<ColumnNameAndIntValue>& lower_int_cols,
+    const vector<ColumnNameAndIntValue>& upper_int_cols,
+    const vector<ColumnNamesNumBucketsAndSeed>& hash_schemas,
+    vector<pair<KuduPartialRow, KuduPartialRow>>* bounds,
+    PartitionSchema::PerRangeHashBucketSchemas* range_hash_schemas,
+    PartitionSchemaPB* pb) {
+  RowOperationsPBEncoder encoder(pb->add_range_bounds());
+  KuduPartialRow lower(&schema);
+  KuduPartialRow upper(&schema);
+  for (const auto& bound : lower_string_cols) {
+    ASSERT_OK(lower.SetStringCopy(bound.first, bound.second));
+  }
+  for (const auto& bound : upper_string_cols) {
+    ASSERT_OK(upper.SetStringCopy(bound.first, bound.second));
+  }
+  for (const auto& bound : lower_int_cols) {
+    ASSERT_OK(lower.SetInt8(bound.first, bound.second));
+  }
+  for (const auto& bound : upper_int_cols) {
+    ASSERT_OK(upper.SetInt8(bound.first, bound.second));
+  }
+  encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, lower);
+  encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, upper);
+  auto* range_hash_component = pb->add_range_hash_schemas();
+  PartitionSchema::HashBucketSchemas hash_bucket_schemas;
+  for (const auto& hash_schema : hash_schemas) {
+    auto* hash_component_pb = range_hash_component->add_hash_schemas();
+    PartitionSchema::HashBucketSchema hash_bucket_schema;
+    for (const auto& hash_schema_columns : get<0>(hash_schema)) {
+      hash_component_pb->add_columns()->set_name(hash_schema_columns);
+      hash_bucket_schema.column_ids.emplace_back(schema.find_column(hash_schema_columns));
+    }
+    hash_component_pb->set_num_buckets(get<1>(hash_schema));
+    hash_bucket_schema.num_buckets = get<1>(hash_schema);
+    hash_component_pb->set_seed(get<2>(hash_schema));
+    hash_bucket_schema.seed = get<2>(hash_schema);
+    hash_bucket_schemas.emplace_back(hash_bucket_schema);
+  }
+  range_hash_schemas->emplace_back(hash_bucket_schemas);
+  bounds->emplace_back(lower, upper);
 }
 
 TEST_F(PartitionPrunerTest, TestPrimaryKeyRangePruning) {
@@ -118,7 +207,7 @@ TEST_F(PartitionPrunerTest, TestPrimaryKeyRangePruning) {
 
   // Creates a scan with optional lower and upper bounds, and checks that the
   // expected number of tablets are pruned.
-  auto Check = [&] (optional<tuple<int8_t, int8_t, int8_t>> lower,
+  const auto check = [&] (optional<tuple<int8_t, int8_t, int8_t>> lower,
                     optional<tuple<int8_t, int8_t, int8_t>> upper,
                     size_t remaining_tablets) {
     ScanSpec spec;
@@ -128,80 +217,80 @@ TEST_F(PartitionPrunerTest, TestPrimaryKeyRangePruning) {
     EncodedKey* enc_upper_bound = nullptr;
 
     if (lower) {
-      CHECK_OK(lower_bound.SetInt8("a", get<0>(*lower)));
-      CHECK_OK(lower_bound.SetInt8("b", get<1>(*lower)));
-      CHECK_OK(lower_bound.SetInt8("c", get<2>(*lower)));
+      ASSERT_OK(lower_bound.SetInt8("a", get<0>(*lower)));
+      ASSERT_OK(lower_bound.SetInt8("b", get<1>(*lower)));
+      ASSERT_OK(lower_bound.SetInt8("c", get<2>(*lower)));
       ConstContiguousRow row(lower_bound.schema(), lower_bound.row_data_);
       enc_lower_bound = EncodedKey::FromContiguousRow(row, &arena);
       spec.SetLowerBoundKey(enc_lower_bound);
     }
     if (upper) {
-      CHECK_OK(upper_bound.SetInt8("a", get<0>(*upper)));
-      CHECK_OK(upper_bound.SetInt8("b", get<1>(*upper)));
-      CHECK_OK(upper_bound.SetInt8("c", get<2>(*upper)));
+      ASSERT_OK(upper_bound.SetInt8("a", get<0>(*upper)));
+      ASSERT_OK(upper_bound.SetInt8("b", get<1>(*upper)));
+      ASSERT_OK(upper_bound.SetInt8("c", get<2>(*upper)));
       ConstContiguousRow row(upper_bound.schema(), upper_bound.row_data_);
       enc_upper_bound = EncodedKey::FromContiguousRow(row, &arena);
       spec.SetExclusiveUpperBoundKey(enc_upper_bound);
     }
     size_t pruner_ranges = remaining_tablets == 0 ? 0 : 1;
-    CheckPrunedPartitions(schema, partition_schema, partitions, spec,
-                          remaining_tablets, pruner_ranges);
+    NO_FATALS(CheckPrunedPartitions(schema, partition_schema, partitions, spec,
+                                    remaining_tablets, pruner_ranges));
   };
 
   // No bounds
-  Check(boost::none, boost::none, 3);
+  check(boost::none, boost::none, 3);
 
   // PK < (-1, min, min)
-  Check(boost::none,
+  check(boost::none,
         make_tuple<int8_t, int8_t, int8_t>(-1, INT8_MIN, INT8_MIN),
         1);
 
   // PK < (10, 10, 10)
-  Check(boost::none,
+  check(boost::none,
         make_tuple<int8_t, int8_t, int8_t>(10, 10, 10),
         2);
 
   // PK < (100, min, min)
-  Check(boost::none,
+  check(boost::none,
         make_tuple<int8_t, int8_t, int8_t>(100, INT8_MIN, INT8_MIN),
         3);
 
   // PK >= (-10, -10, -10)
-  Check(make_tuple<int8_t, int8_t, int8_t>(-10, -10, -10),
+  check(make_tuple<int8_t, int8_t, int8_t>(-10, -10, -10),
         boost::none,
         3);
 
   // PK >= (0, 0, 0)
-  Check(make_tuple<int8_t, int8_t, int8_t>(0, 0, 0),
+  check(make_tuple<int8_t, int8_t, int8_t>(0, 0, 0),
         boost::none,
         2);
 
   // PK >= (100, 0, 0)
-  Check(make_tuple<int8_t, int8_t, int8_t>(100, 0, 0),
+  check(make_tuple<int8_t, int8_t, int8_t>(100, 0, 0),
         boost::none,
         1);
 
   // PK >= (-10, 0, 0)
   // PK  < (100, 0, 0)
-  Check(make_tuple<int8_t, int8_t, int8_t>(-10, 0, 0),
+  check(make_tuple<int8_t, int8_t, int8_t>(-10, 0, 0),
         make_tuple<int8_t, int8_t, int8_t>(100, 0, 0),
         3);
 
   // PK >= (0, 0, 0)
   // PK  < (10, 10, 10)
-  Check(make_tuple<int8_t, int8_t, int8_t>(0, 0, 0),
+  check(make_tuple<int8_t, int8_t, int8_t>(0, 0, 0),
         make_tuple<int8_t, int8_t, int8_t>(10, 10, 10),
         1);
 
   // PK >= (0, 0, 0)
   // PK  < (10, 10, 11)
-  Check(make_tuple<int8_t, int8_t, int8_t>(0, 0, 0),
+  check(make_tuple<int8_t, int8_t, int8_t>(0, 0, 0),
         make_tuple<int8_t, int8_t, int8_t>(10, 10, 11),
         2);
 
   // PK  < (0, 0, 0)
   // PK >= (10, 10, 11)
-  Check(make_tuple<int8_t, int8_t, int8_t>(10, 10, 11),
+  check(make_tuple<int8_t, int8_t, int8_t>(10, 10, 11),
         make_tuple<int8_t, int8_t, int8_t>(0, 0, 0),
         0);
 }
@@ -222,9 +311,7 @@ TEST_F(PartitionPrunerTest, TestPartialPrimaryKeyRangePruning) {
 
   PartitionSchema partition_schema;
   auto pb = PartitionSchemaPB();
-  auto range_schema = pb.mutable_range_schema();
-  range_schema->add_columns()->set_name("a");
-  range_schema->add_columns()->set_name("b");
+  CreatePartitionSchemaPB({"a", "b"}, {}, &pb);
   ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
 
   KuduPartialRow split1(&schema);
@@ -240,7 +327,7 @@ TEST_F(PartitionPrunerTest, TestPartialPrimaryKeyRangePruning) {
 
   // Applies the specified lower and upper bound primary keys against the
   // schema, and checks that the expected number of partitions are pruned.
-  auto Check = [&] (optional<tuple<int8_t, string, string>> lower,
+  const auto check = [&] (optional<tuple<int8_t, string, string>> lower,
                     optional<tuple<int8_t, string, string>> upper,
                     size_t remaining_tablets ) {
     ScanSpec spec;
@@ -250,76 +337,76 @@ TEST_F(PartitionPrunerTest, TestPartialPrimaryKeyRangePruning) {
     EncodedKey* enc_upper_bound = nullptr;
 
     if (lower) {
-      CHECK_OK(lower_bound.SetInt8("a", get<0>(*lower)));
-      CHECK_OK(lower_bound.SetStringCopy("b", get<1>(*lower)));
-      CHECK_OK(lower_bound.SetStringCopy("c", get<2>(*lower)));
+      ASSERT_OK(lower_bound.SetInt8("a", get<0>(*lower)));
+      ASSERT_OK(lower_bound.SetStringCopy("b", get<1>(*lower)));
+      ASSERT_OK(lower_bound.SetStringCopy("c", get<2>(*lower)));
       ConstContiguousRow row(lower_bound.schema(), lower_bound.row_data_);
       enc_lower_bound = EncodedKey::FromContiguousRow(row, &arena);
       spec.SetLowerBoundKey(enc_lower_bound);
     }
     if (upper) {
-      CHECK_OK(upper_bound.SetInt8("a", get<0>(*upper)));
-      CHECK_OK(upper_bound.SetStringCopy("b", get<1>(*upper)));
-      CHECK_OK(upper_bound.SetStringCopy("c", get<2>(*upper)));
+      ASSERT_OK(upper_bound.SetInt8("a", get<0>(*upper)));
+      ASSERT_OK(upper_bound.SetStringCopy("b", get<1>(*upper)));
+      ASSERT_OK(upper_bound.SetStringCopy("c", get<2>(*upper)));
       ConstContiguousRow row(upper_bound.schema(), upper_bound.row_data_);
       enc_upper_bound = EncodedKey::FromContiguousRow(row, &arena);
       spec.SetExclusiveUpperBoundKey(enc_upper_bound);
     }
     size_t pruner_ranges = remaining_tablets == 0 ? 0 : 1;
-    CheckPrunedPartitions(schema, partition_schema, partitions, spec,
-                          remaining_tablets, pruner_ranges);
+    NO_FATALS(CheckPrunedPartitions(schema, partition_schema, partitions, spec,
+                                    remaining_tablets, pruner_ranges));
   };
 
   // No bounds
-  Check(boost::none, boost::none, 3);
+  check(boost::none, boost::none, 3);
 
-  // PK < (-1, min, "")
-  Check(boost::none, make_tuple<int8_t, string, string>(-1, "", ""), 1);
+  // PK < (-1, "", "")
+  check(boost::none, make_tuple<int8_t, string, string>(-1, "", ""), 1);
 
   // PK < (10, "r", "")
-  Check(boost::none, make_tuple<int8_t, string, string>(10, "r", ""), 2);
+  check(boost::none, make_tuple<int8_t, string, string>(10, "r", ""), 2);
 
   // PK < (10, "r", "z")
-  Check(boost::none, make_tuple<int8_t, string, string>(10, "r", "z"), 3);
+  check(boost::none, make_tuple<int8_t, string, string>(10, "r", "z"), 3);
 
-  // PK < (100, min, "")
-  Check(boost::none, make_tuple<int8_t, string, string>(100, "", ""), 3);
+  // PK < (100, "", "")
+  check(boost::none, make_tuple<int8_t, string, string>(100, "", ""), 3);
 
   // PK >= (-10, "m", "")
-  Check(make_tuple<int8_t, string, string>(-10, "m", ""), boost::none, 3);
+  check(make_tuple<int8_t, string, string>(-10, "m", ""), boost::none, 3);
 
   // PK >= (0, "", "")
-  Check(make_tuple<int8_t, string, string>(0, "", ""), boost::none, 3);
+  check(make_tuple<int8_t, string, string>(0, "", ""), boost::none, 3);
 
   // PK >= (0, "m", "")
-  Check(make_tuple<int8_t, string, string>(0, "m", ""), boost::none, 2);
+  check(make_tuple<int8_t, string, string>(0, "m", ""), boost::none, 2);
 
   // PK >= (100, "", "")
-  Check(make_tuple<int8_t, string, string>(100, "", ""), boost::none, 1);
+  check(make_tuple<int8_t, string, string>(100, "", ""), boost::none, 1);
 
-  // PK >= (-10, 0, "")
-  // PK  < (100, 0, "")
-  Check(make_tuple<int8_t, string, string>(-10, "", ""),
+  // PK >= (-10, "", "")
+  // PK  < (100, "", "")
+  check(make_tuple<int8_t, string, string>(-10, "", ""),
         make_tuple<int8_t, string, string>(100, "", ""), 3);
 
   // PK >= (0, "m", "")
   // PK  < (10, "r", "")
-  Check(make_tuple<int8_t, string, string>(0, "m", ""),
+  check(make_tuple<int8_t, string, string>(0, "m", ""),
         make_tuple<int8_t, string, string>(10, "r", ""), 1);
 
   // PK >= (0, "m", "")
   // PK  < (10, "r", "z")
-  Check(make_tuple<int8_t, string, string>(0, "m", ""),
+  check(make_tuple<int8_t, string, string>(0, "m", ""),
         make_tuple<int8_t, string, string>(10, "r", "z"), 2);
 
   // PK >= (0, "", "")
   // PK  < (10, "m", "z")
-  Check(make_tuple<int8_t, string, string>(0, "", ""),
+  check(make_tuple<int8_t, string, string>(0, "", ""),
         make_tuple<int8_t, string, string>(10, "m", "z"), 2);
 
   // PK >= (10, "m", "")
   // PK  < (10, "m", "z")
-  Check(make_tuple<int8_t, string, string>(10, "m", ""),
+  check(make_tuple<int8_t, string, string>(10, "m", ""),
         make_tuple<int8_t, string, string>(10, "m", "z"), 1);
 }
 
@@ -339,9 +426,7 @@ TEST_F(PartitionPrunerTest, TestIntPartialPrimaryKeyRangePruning) {
 
   PartitionSchema partition_schema;
   auto pb = PartitionSchemaPB();
-  auto* range_schema = pb.mutable_range_schema();
-  range_schema->add_columns()->set_name("a");
-  range_schema->add_columns()->set_name("b");
+  CreatePartitionSchemaPB({"a", "b"}, {}, &pb);
   ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
 
   KuduPartialRow split(&schema);
@@ -353,7 +438,7 @@ TEST_F(PartitionPrunerTest, TestIntPartialPrimaryKeyRangePruning) {
 
   // Applies the specified lower and upper bound primary keys against the
   // schema, and checks that the expected number of partitions are pruned.
-  auto Check = [&] (optional<tuple<int8_t, int8_t, int8_t>> lower,
+  const auto check = [&] (optional<tuple<int8_t, int8_t, int8_t>> lower,
                     optional<tuple<int8_t, int8_t, int8_t>> upper,
                     size_t remaining_tablets ) {
     ScanSpec spec;
@@ -363,52 +448,52 @@ TEST_F(PartitionPrunerTest, TestIntPartialPrimaryKeyRangePruning) {
     EncodedKey* enc_upper_bound = nullptr;
 
     if (lower) {
-      CHECK_OK(lower_bound.SetInt8("a", get<0>(*lower)));
-      CHECK_OK(lower_bound.SetInt8("b", get<1>(*lower)));
-      CHECK_OK(lower_bound.SetInt8("c", get<2>(*lower)));
+      ASSERT_OK(lower_bound.SetInt8("a", get<0>(*lower)));
+      ASSERT_OK(lower_bound.SetInt8("b", get<1>(*lower)));
+      ASSERT_OK(lower_bound.SetInt8("c", get<2>(*lower)));
       ConstContiguousRow row(lower_bound.schema(), lower_bound.row_data_);
       enc_lower_bound = EncodedKey::FromContiguousRow(row, &arena);
       spec.SetLowerBoundKey(enc_lower_bound);
     }
     if (upper) {
-      CHECK_OK(upper_bound.SetInt8("a", get<0>(*upper)));
-      CHECK_OK(upper_bound.SetInt8("b", get<1>(*upper)));
-      CHECK_OK(upper_bound.SetInt8("c", get<2>(*upper)));
+      ASSERT_OK(upper_bound.SetInt8("a", get<0>(*upper)));
+      ASSERT_OK(upper_bound.SetInt8("b", get<1>(*upper)));
+      ASSERT_OK(upper_bound.SetInt8("c", get<2>(*upper)));
       ConstContiguousRow row(upper_bound.schema(), upper_bound.row_data_);
       enc_upper_bound = EncodedKey::FromContiguousRow(row, &arena);
       spec.SetExclusiveUpperBoundKey(enc_upper_bound);
     }
     size_t pruner_ranges = remaining_tablets == 0 ? 0 : 1;
-    CheckPrunedPartitions(schema, partition_schema, partitions, spec,
-                          remaining_tablets, pruner_ranges);
+    NO_FATALS(CheckPrunedPartitions(schema, partition_schema, partitions, spec,
+                                    remaining_tablets, pruner_ranges));
   };
 
   // No bounds
-  Check(boost::none, boost::none, 2);
+  check(boost::none, boost::none, 2);
 
   // PK < (0, 0, min)
-  Check(boost::none, make_tuple<int8_t, int8_t, int8_t>(0, INT8_MIN, INT8_MIN), 1);
+  check(boost::none, make_tuple<int8_t, int8_t, int8_t>(0, 0, INT8_MIN), 1);
 
   // PK < (0, 0, 0);
-  Check(boost::none, make_tuple<int8_t, int8_t, int8_t>(0, 0, 0), 2);
+  check(boost::none, make_tuple<int8_t, int8_t, int8_t>(0, 0, 0), 2);
 
   // PK < (0, max, 0);
-  Check(boost::none, make_tuple<int8_t, int8_t, int8_t>(INT8_MAX, INT8_MAX, 0), 2);
+  check(boost::none, make_tuple<int8_t, int8_t, int8_t>(0, INT8_MAX, 0), 2);
 
   // PK < (max, max, min);
-  Check(boost::none, make_tuple<int8_t, int8_t, int8_t>(INT8_MAX, INT8_MAX, INT8_MIN), 2);
+  check(boost::none, make_tuple<int8_t, int8_t, int8_t>(INT8_MAX, INT8_MAX, INT8_MIN), 2);
 
   // PK < (max, max, 0);
-  Check(boost::none, make_tuple<int8_t, int8_t, int8_t>(INT8_MAX, INT8_MAX, 0), 2);
+  check(boost::none, make_tuple<int8_t, int8_t, int8_t>(INT8_MAX, INT8_MAX, 0), 2);
 
   // PK >= (0, 0, 0);
-  Check(make_tuple<int8_t, int8_t, int8_t>(0, 0, 0), boost::none, 1);
+  check(make_tuple<int8_t, int8_t, int8_t>(0, 0, 0), boost::none, 1);
 
   // PK >= (0, 0, -1);
-  Check(make_tuple<int8_t, int8_t, int8_t>(0, 0, -1), boost::none, 1);
+  check(make_tuple<int8_t, int8_t, int8_t>(0, 0, -1), boost::none, 1);
 
   // PK >= (0, 0, min);
-  Check(make_tuple<int8_t, int8_t, int8_t>(0, 0, INT8_MIN), boost::none, 1);
+  check(make_tuple<int8_t, int8_t, int8_t>(0, 0, INT8_MIN), boost::none, 1);
 }
 
 TEST_F(PartitionPrunerTest, TestRangePruning) {
@@ -425,9 +510,7 @@ TEST_F(PartitionPrunerTest, TestRangePruning) {
 
   PartitionSchema partition_schema;
   auto pb = PartitionSchemaPB();
-  auto range_schema = pb.mutable_range_schema();
-  range_schema->add_columns()->set_name("c");
-  range_schema->add_columns()->set_name("b");
+  CreatePartitionSchemaPB({"c", "b"}, {}, &pb);
   ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
 
   KuduPartialRow split1(&schema);
@@ -443,7 +526,7 @@ TEST_F(PartitionPrunerTest, TestRangePruning) {
 
   // Applies the specified predicates to a scan and checks that the expected
   // number of partitions are pruned.
-  auto Check = [&] (const vector<ColumnPredicate>& predicates, size_t remaining_tablets) {
+  const auto check = [&] (const vector<ColumnPredicate>& predicates, size_t remaining_tablets) {
     ScanSpec spec;
 
     for (const auto& pred : predicates) {
@@ -451,17 +534,17 @@ TEST_F(PartitionPrunerTest, TestRangePruning) {
     }
 
     size_t pruner_ranges = remaining_tablets == 0 ? 0 : 1;
-    CheckPrunedPartitions(schema, partition_schema, partitions, spec,
-                          remaining_tablets, pruner_ranges);
+    NO_FATALS(CheckPrunedPartitions(schema, partition_schema, partitions, spec,
+                                    remaining_tablets, pruner_ranges));
   };
 
-  int8_t neg_ten = -10;
-  int8_t zero = 0;
-  int8_t five = 5;
-  int8_t ten = 10;
-  int8_t hundred = 100;
-  int8_t min = INT8_MIN;
-  int8_t max = INT8_MAX;
+  constexpr int8_t neg_ten = -10;
+  constexpr int8_t zero = 0;
+  constexpr int8_t five = 5;
+  constexpr int8_t ten = 10;
+  constexpr int8_t hundred = 100;
+  constexpr int8_t min = INT8_MIN;
+  constexpr int8_t max = INT8_MAX;
 
   Slice empty = "";
   Slice a = "a";
@@ -471,124 +554,124 @@ TEST_F(PartitionPrunerTest, TestRangePruning) {
   Slice z = "z";
 
   // No Bounds
-  Check({}, 3);
+  check({}, 3);
 
   // c < -10
-  Check({ ColumnPredicate::Range(schema.column(2), nullptr, &neg_ten) }, 1);
+  check({ ColumnPredicate::Range(schema.column(2), nullptr, &neg_ten) }, 1);
 
   // c = -10
-  Check({ ColumnPredicate::Equality(schema.column(2), &neg_ten) }, 1);
+  check({ ColumnPredicate::Equality(schema.column(2), &neg_ten) }, 1);
 
   // c < 10
-  Check({ ColumnPredicate::Range(schema.column(2), nullptr, &ten) }, 2);
+  check({ ColumnPredicate::Range(schema.column(2), nullptr, &ten) }, 2);
 
   // c < 100
-  Check({ ColumnPredicate::Range(schema.column(2), nullptr, &hundred) }, 3);
+  check({ ColumnPredicate::Range(schema.column(2), nullptr, &hundred) }, 3);
 
   // c < MIN
-  Check({ ColumnPredicate::Range(schema.column(2), nullptr, &min) }, 0);
+  check({ ColumnPredicate::Range(schema.column(2), nullptr, &min) }, 0);
 
   // c < MAX
-  Check({ ColumnPredicate::Range(schema.column(2), nullptr, &max) }, 3);
+  check({ ColumnPredicate::Range(schema.column(2), nullptr, &max) }, 3);
 
   // c >= -10
-  Check({ ColumnPredicate::Range(schema.column(0), &neg_ten, nullptr) }, 3);
+  check({ ColumnPredicate::Range(schema.column(0), &neg_ten, nullptr) }, 3);
 
   // c >= 0
-  Check({ ColumnPredicate::Range(schema.column(2), &zero, nullptr) }, 3);
+  check({ ColumnPredicate::Range(schema.column(2), &zero, nullptr) }, 3);
 
   // c >= 5
-  Check({ ColumnPredicate::Range(schema.column(2), &five, nullptr) }, 2);
+  check({ ColumnPredicate::Range(schema.column(2), &five, nullptr) }, 2);
 
   // c >= 10
-  Check({ ColumnPredicate::Range(schema.column(2), &ten, nullptr) }, 2);
+  check({ ColumnPredicate::Range(schema.column(2), &ten, nullptr) }, 2);
 
   // c >= 100
-  Check({ ColumnPredicate::Range(schema.column(2), &hundred, nullptr) }, 1);
+  check({ ColumnPredicate::Range(schema.column(2), &hundred, nullptr) }, 1);
 
   // c >= MIN
-  Check({ ColumnPredicate::Range(schema.column(2), &min, nullptr) }, 3);
+  check({ ColumnPredicate::Range(schema.column(2), &min, nullptr) }, 3);
 
   // c >= MAX
-  Check({ ColumnPredicate::Range(schema.column(2), &max, nullptr) }, 1);
+  check({ ColumnPredicate::Range(schema.column(2), &max, nullptr) }, 1);
 
   // c = MIN
-  Check({ ColumnPredicate::Equality(schema.column(2), &min) }, 1);
+  check({ ColumnPredicate::Equality(schema.column(2), &min) }, 1);
 
   // c = MAX
-  Check({ ColumnPredicate::Equality(schema.column(2), &max) }, 1);
+  check({ ColumnPredicate::Equality(schema.column(2), &max) }, 1);
 
   // c >= -10
   // c < 0
-  Check({ ColumnPredicate::Range(schema.column(2), &neg_ten, &zero) }, 1);
+  check({ ColumnPredicate::Range(schema.column(2), &neg_ten, &zero) }, 1);
 
   // c >= 5
   // c < 100
-  Check({ ColumnPredicate::Range(schema.column(2), &five, &hundred) }, 2);
+  check({ ColumnPredicate::Range(schema.column(2), &five, &hundred) }, 2);
 
   // b = ""
-  Check({ ColumnPredicate::Equality(schema.column(1), &empty) }, 3);
+  check({ ColumnPredicate::Equality(schema.column(1), &empty) }, 3);
 
   // b >= "z"
-  Check({ ColumnPredicate::Range(schema.column(1), &z, nullptr) }, 3);
+  check({ ColumnPredicate::Range(schema.column(1), &z, nullptr) }, 3);
 
   // b < "a"
-  Check({ ColumnPredicate::Range(schema.column(1), nullptr, &a) }, 3);
+  check({ ColumnPredicate::Range(schema.column(1), nullptr, &a) }, 3);
 
   // b >= "m"
   // b < "z"
-  Check({ ColumnPredicate::Range(schema.column(1), &m, &z) }, 3);
+  check({ ColumnPredicate::Range(schema.column(1), &m, &z) }, 3);
 
   // c >= 10
   // b >= "r"
-  Check({ ColumnPredicate::Range(schema.column(2), &ten, nullptr),
+  check({ ColumnPredicate::Range(schema.column(2), &ten, nullptr),
           ColumnPredicate::Range(schema.column(1), &r, nullptr) },
         1);
 
   // c >= 10
   // b < "r"
-  Check({ ColumnPredicate::Range(schema.column(2), &ten, nullptr),
+  check({ ColumnPredicate::Range(schema.column(2), &ten, nullptr),
           ColumnPredicate::Range(schema.column(1), nullptr, &r) },
         2);
 
   // c = 10
   // b < "r"
-  Check({ ColumnPredicate::Equality(schema.column(2), &ten),
+  check({ ColumnPredicate::Equality(schema.column(2), &ten),
           ColumnPredicate::Range(schema.column(1), nullptr, &r) },
         1);
 
   // c < 0
   // b < "m"
-  Check({ ColumnPredicate::Range(schema.column(2), nullptr, &zero),
+  check({ ColumnPredicate::Range(schema.column(2), nullptr, &zero),
           ColumnPredicate::Range(schema.column(1), nullptr, &m) },
         1);
 
   // c < 0
   // b < "z"
-  Check({ ColumnPredicate::Range(schema.column(2), nullptr, &zero),
+  check({ ColumnPredicate::Range(schema.column(2), nullptr, &zero),
           ColumnPredicate::Range(schema.column(1), nullptr, &z) },
         1);
 
   // c = 0
   // b = "m\0"
-  Check({ ColumnPredicate::Equality(schema.column(2), &zero),
+  check({ ColumnPredicate::Equality(schema.column(2), &zero),
           ColumnPredicate::Equality(schema.column(1), &m0) },
         1);
 
   // c = 0
   // b < "m"
-  Check({ ColumnPredicate::Equality(schema.column(2), &zero),
+  check({ ColumnPredicate::Equality(schema.column(2), &zero),
           ColumnPredicate::Range(schema.column(1), nullptr, &m) },
         1);
 
   // c = 0
   // b < "m\0"
-  Check({ ColumnPredicate::Equality(schema.column(2), &zero),
+  check({ ColumnPredicate::Equality(schema.column(2), &zero),
           ColumnPredicate::Range(schema.column(1), nullptr, &m0) },
         2);
 
   // c IS NOT NULL
-  Check({ ColumnPredicate::IsNotNull(schema.column(2)) }, 3);
+  check({ ColumnPredicate::IsNotNull(schema.column(2)) }, 3);
 }
 
 TEST_F(PartitionPrunerTest, TestHashPruning) {
@@ -605,15 +688,8 @@ TEST_F(PartitionPrunerTest, TestHashPruning) {
 
     PartitionSchema partition_schema;
     auto pb = PartitionSchemaPB();
+    CreatePartitionSchemaPB({}, { {{"a"}, 2, 0}, {{"b", "c"}, 2, 0} }, &pb);
     pb.mutable_range_schema()->Clear();
-    auto hash_component_1 = pb.add_hash_bucket_schemas();
-    hash_component_1->add_columns()->set_name("a");
-    hash_component_1->set_num_buckets(2);
-    auto hash_component_2 = pb.add_hash_bucket_schemas();
-    hash_component_2->add_columns()->set_name("b");
-    hash_component_2->add_columns()->set_name("c");
-    hash_component_2->set_num_buckets(2);
-
     ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
 
     vector<Partition> partitions;
@@ -623,7 +699,7 @@ TEST_F(PartitionPrunerTest, TestHashPruning) {
 
   // Applies the specified predicates to a scan and checks that the expected
   // number of partitions are pruned.
-  auto Check = [&] (const vector<ColumnPredicate>& predicates,
+  const auto check = [&] (const vector<ColumnPredicate>& predicates,
                     size_t remaining_tablets,
                     size_t pruner_ranges) {
     ScanSpec spec;
@@ -632,44 +708,44 @@ TEST_F(PartitionPrunerTest, TestHashPruning) {
       spec.AddPredicate(pred);
     }
 
-    CheckPrunedPartitions(schema, partition_schema, partitions, spec,
-                          remaining_tablets, pruner_ranges);
+    NO_FATALS(CheckPrunedPartitions(schema, partition_schema, partitions, spec,
+                                    remaining_tablets, pruner_ranges));
   };
 
-  int8_t zero = 0;
-  int8_t one = 1;
-  int8_t two = 2;
+  constexpr int8_t zero = 0;
+  constexpr int8_t one = 1;
+  constexpr int8_t two = 2;
 
   // No Bounds
-  Check({}, 4, 1);
+  check({}, 4, 1);
 
   // a = 0;
-  Check({ ColumnPredicate::Equality(schema.column(0), &zero) }, 2, 1);
+  check({ ColumnPredicate::Equality(schema.column(0), &zero) }, 2, 1);
 
   // a >= 0;
-  Check({ ColumnPredicate::Range(schema.column(0), &zero, nullptr) }, 4, 1);
+  check({ ColumnPredicate::Range(schema.column(0), &zero, nullptr) }, 4, 1);
 
   // a >= 0;
   // a < 1;
-  Check({ ColumnPredicate::Range(schema.column(0), &zero, &one) }, 2, 1);
+  check({ ColumnPredicate::Range(schema.column(0), &zero, &one) }, 2, 1);
 
   // a >= 0;
   // a < 2;
-  Check({ ColumnPredicate::Range(schema.column(0), &zero, &two) }, 4, 1);
+  check({ ColumnPredicate::Range(schema.column(0), &zero, &two) }, 4, 1);
 
   // b = 1;
-  Check({ ColumnPredicate::Equality(schema.column(1), &one) }, 4, 1);
+  check({ ColumnPredicate::Equality(schema.column(1), &one) }, 4, 1);
 
   // b = 1;
   // c = 2;
-  Check({ ColumnPredicate::Equality(schema.column(1), &one),
+  check({ ColumnPredicate::Equality(schema.column(1), &one),
           ColumnPredicate::Equality(schema.column(2), &two) },
         2, 2);
 
   // a = 0;
   // b = 1;
   // c = 2;
-  Check({ ColumnPredicate::Equality(schema.column(0), &zero),
+  check({ ColumnPredicate::Equality(schema.column(0), &zero),
           ColumnPredicate::Equality(schema.column(1), &one),
           ColumnPredicate::Equality(schema.column(2), &two) },
         1, 1);
@@ -690,20 +766,8 @@ TEST_F(PartitionPrunerTest, TestInListHashPruning) {
 
   PartitionSchema partition_schema;
   auto pb = PartitionSchemaPB();
-  auto hash_component_1 = pb.add_hash_bucket_schemas();
-  hash_component_1->add_columns()->set_name("a");
-  hash_component_1->set_num_buckets(3);
-  hash_component_1->set_seed(0);
-  auto hash_component_2 = pb.add_hash_bucket_schemas();
-  hash_component_2->add_columns()->set_name("b");
-  hash_component_2->set_num_buckets(3);
-  hash_component_2->set_seed(0);
-  auto hash_component_3 = pb.add_hash_bucket_schemas();
-  hash_component_3->add_columns()->set_name("c");
-  hash_component_3->set_num_buckets(3);
-  hash_component_3->set_seed(0);
+  CreatePartitionSchemaPB({}, { {{"a"}, 3, 0}, {{"b"}, 3, 0}, {{"c"}, 3, 0} }, &pb);
   pb.mutable_range_schema()->clear_columns();
-
   ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
 
   vector<Partition> partitions;
@@ -713,7 +777,7 @@ TEST_F(PartitionPrunerTest, TestInListHashPruning) {
 
   // Applies the specified predicates to a scan and checks that the expected
   // number of partitions are pruned.
-  auto Check = [&] (const vector<ColumnPredicate>& predicates,
+  const auto check = [&] (const vector<ColumnPredicate>& predicates,
                     size_t remaining_tablets,
                     size_t pruner_ranges) {
     ScanSpec spec;
@@ -722,14 +786,14 @@ TEST_F(PartitionPrunerTest, TestInListHashPruning) {
       spec.AddPredicate(pred);
     }
 
-    CheckPrunedPartitions(schema, partition_schema, partitions, spec,
-                          remaining_tablets, pruner_ranges);
+    NO_FATALS(CheckPrunedPartitions(schema, partition_schema, partitions, spec,
+                                    remaining_tablets, pruner_ranges));
   };
 
   // zero, one, eight are in different buckets when bucket number is 3 and seed is 0.
-  int8_t zero = 0;
-  int8_t one = 1;
-  int8_t eight = 8;
+  constexpr int8_t zero = 0;
+  constexpr int8_t one = 1;
+  constexpr int8_t eight = 8;
 
   vector<const void*> a_values;
   vector<const void*> b_values;
@@ -737,32 +801,32 @@ TEST_F(PartitionPrunerTest, TestInListHashPruning) {
 
   // a in [0, 1];
   a_values = { &zero, &one };
-  Check({ ColumnPredicate::InList(schema.column(0), &a_values) }, 18, 2);
+  check({ ColumnPredicate::InList(schema.column(0), &a_values) }, 18, 2);
 
   // a in [0, 1, 8];
   a_values = { &zero, &one, &eight };
-  Check({ ColumnPredicate::InList(schema.column(0), &a_values) }, 27, 1);
+  check({ ColumnPredicate::InList(schema.column(0), &a_values) }, 27, 1);
 
   // b in [0, 1]
   b_values = { &zero, &one };
-  Check({ ColumnPredicate::InList(schema.column(1), &b_values) }, 18, 6);
+  check({ ColumnPredicate::InList(schema.column(1), &b_values) }, 18, 6);
 
   // c in [0, 1]
   c_values = { &zero, &one };
-  Check({ ColumnPredicate::InList(schema.column(2), &c_values) }, 18, 18);
+  check({ ColumnPredicate::InList(schema.column(2), &c_values) }, 18, 18);
 
   // b in [0, 1], c in [0, 1]
   b_values = { &zero, &one };
   c_values = { &zero, &one };
-  Check({ ColumnPredicate::InList(schema.column(1), &b_values),
+  check({ ColumnPredicate::InList(schema.column(1), &b_values),
           ColumnPredicate::InList(schema.column(2), &c_values) },
         12, 12);
 
-  //a in [0, 1], b in [0, 1], c in [0, 1]
+  // a in [0, 1], b in [0, 1], c in [0, 1]
   a_values = { &zero, &one };
   b_values = { &zero, &one };
   c_values = { &zero, &one };
-  Check({ ColumnPredicate::InList(schema.column(0), &a_values),
+  check({ ColumnPredicate::InList(schema.column(0), &a_values),
           ColumnPredicate::InList(schema.column(1), &b_values),
           ColumnPredicate::InList(schema.column(2), &c_values) },
         8, 8);
@@ -782,17 +846,8 @@ TEST_F(PartitionPrunerTest, TestMultiColumnInListHashPruning) {
 
   PartitionSchema partition_schema;
   auto pb = PartitionSchemaPB();
-  auto hash_component_1 = pb.add_hash_bucket_schemas();
-  hash_component_1->add_columns()->set_name("a");
-  hash_component_1->set_num_buckets(3);
-  hash_component_1->set_seed(0);
-  auto hash_component_2 = pb.add_hash_bucket_schemas();
-  hash_component_2->add_columns()->set_name("b");
-  hash_component_2->add_columns()->set_name("c");
-  hash_component_2->set_num_buckets(3);
-  hash_component_2->set_seed(0);
+  CreatePartitionSchemaPB({}, { {{"a"}, 3, 0}, {{"b", "c"}, 3, 0} }, &pb);
   pb.mutable_range_schema()->clear_columns();
-
   ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
 
   vector<Partition> partitions;
@@ -802,7 +857,7 @@ TEST_F(PartitionPrunerTest, TestMultiColumnInListHashPruning) {
 
   // Applies the specified predicates to a scan and checks that the expected
   // number of partitions are pruned.
-  auto Check = [&] (const vector<ColumnPredicate>& predicates,
+  const auto check = [&] (const vector<ColumnPredicate>& predicates,
                     size_t remaining_tablets,
                     size_t pruner_ranges) {
     ScanSpec spec;
@@ -811,14 +866,14 @@ TEST_F(PartitionPrunerTest, TestMultiColumnInListHashPruning) {
       spec.AddPredicate(pred);
     }
 
-    CheckPrunedPartitions(schema, partition_schema, partitions, spec,
-                          remaining_tablets, pruner_ranges);
+    NO_FATALS(CheckPrunedPartitions(schema, partition_schema, partitions, spec,
+                                    remaining_tablets, pruner_ranges));
   };
 
   // zero, one, eight are in different buckets when bucket number is 3 and seed is 0.
-  int8_t zero = 0;
-  int8_t one = 1;
-  int8_t eight = 8;
+  constexpr int8_t zero = 0;
+  constexpr int8_t one = 1;
+  constexpr int8_t eight = 8;
 
   vector<const void*> a_values;
   vector<const void*> b_values;
@@ -826,19 +881,19 @@ TEST_F(PartitionPrunerTest, TestMultiColumnInListHashPruning) {
 
   // a in [0, 1];
   a_values = { &zero, &one };
-  Check({ ColumnPredicate::InList(schema.column(0), &a_values) }, 6, 2);
+  check({ ColumnPredicate::InList(schema.column(0), &a_values) }, 6, 2);
 
   // a in [0, 1, 8];
   a_values = { &zero, &one, &eight };
-  Check({ ColumnPredicate::InList(schema.column(0), &a_values) }, 9, 1);
+  check({ ColumnPredicate::InList(schema.column(0), &a_values) }, 9, 1);
 
   // b in [0, 1]
   b_values = { &zero, &one };
-  Check({ ColumnPredicate::InList(schema.column(1), &b_values) }, 9, 1);
+  check({ ColumnPredicate::InList(schema.column(1), &b_values) }, 9, 1);
 
   // c in [0, 1]
   c_values = { &zero, &one };
-  Check({ ColumnPredicate::InList(schema.column(2), &c_values) }, 9, 1);
+  check({ ColumnPredicate::InList(schema.column(2), &c_values) }, 9, 1);
 
   // b in [0, 1], c in [0, 1]
   // (0, 0) in bucket 2
@@ -847,19 +902,19 @@ TEST_F(PartitionPrunerTest, TestMultiColumnInListHashPruning) {
   // (1, 1) in bucket 0
   b_values = { &zero, &one };
   c_values = { &zero, &one };
-  Check({ ColumnPredicate::InList(schema.column(1), &b_values),
+  check({ ColumnPredicate::InList(schema.column(1), &b_values),
           ColumnPredicate::InList(schema.column(2), &c_values) },
         9, 1);
 
   // b = 0, c in [0, 1]
   c_values = { &zero, &one };
-  Check({ ColumnPredicate::Equality(schema.column(1), &zero),
+  check({ ColumnPredicate::Equality(schema.column(1), &zero),
           ColumnPredicate::InList(schema.column(2), &c_values) },
         3, 3);
 
   // b = 1, c in [0, 1]
   c_values = { &zero, &one };
-  Check({ ColumnPredicate::Equality(schema.column(1), &one),
+  check({ ColumnPredicate::Equality(schema.column(1), &one),
           ColumnPredicate::InList(schema.column(2), &c_values) },
         6, 6);
 
@@ -867,7 +922,7 @@ TEST_F(PartitionPrunerTest, TestMultiColumnInListHashPruning) {
   a_values = { &zero, &one };
   b_values = { &zero, &one };
   c_values = { &zero, &one };
-  Check({ ColumnPredicate::InList(schema.column(0), &a_values),
+  check({ ColumnPredicate::InList(schema.column(0), &a_values),
           ColumnPredicate::InList(schema.column(1), &b_values),
           ColumnPredicate::InList(schema.column(2), &c_values) },
         6, 2);
@@ -889,13 +944,7 @@ TEST_F(PartitionPrunerTest, TestPruning) {
 
   PartitionSchema partition_schema;
   auto pb = PartitionSchemaPB();
-  pb.mutable_range_schema()->add_columns()->set_name("time");
-
-  auto hash = pb.add_hash_bucket_schemas();
-  hash->add_columns()->set_name("host");
-  hash->add_columns()->set_name("metric");
-  hash->set_num_buckets(2);
-
+  CreatePartitionSchemaPB({"time"}, { {{"host", "metric"}, 2, 0} }, &pb);
   ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
 
   KuduPartialRow split(&schema);
@@ -908,9 +957,9 @@ TEST_F(PartitionPrunerTest, TestPruning) {
 
   // Applies the specified predicates to a scan and checks that the expected
   // number of partitions are pruned.
-  auto Check = [&] (const vector<ColumnPredicate>& predicates,
-                    string lower_bound_partition_key,
-                    string upper_bound_partition_key,
+  const auto check = [&] (const vector<ColumnPredicate>& predicates,
+                    const string& lower_bound_partition_key,
+                    const string& upper_bound_partition_key,
                     size_t remaining_tablets,
                     size_t pruner_ranges) {
     ScanSpec spec;
@@ -921,20 +970,20 @@ TEST_F(PartitionPrunerTest, TestPruning) {
       spec.AddPredicate(pred);
     }
 
-    CheckPrunedPartitions(schema, partition_schema, partitions, spec,
-                          remaining_tablets, pruner_ranges);
+    NO_FATALS(CheckPrunedPartitions(schema, partition_schema, partitions, spec,
+                                    remaining_tablets, pruner_ranges));
   };
 
   Slice a = "a";
 
-  int64_t nine = 9;
-  int64_t ten = 10;
-  int64_t twenty = 20;
+  constexpr int64_t nine = 9;
+  constexpr int64_t ten = 10;
+  constexpr int64_t twenty = 20;
 
   // host = "a"
   // metric = "a"
   // timestamp >= 9;
-  Check({ ColumnPredicate::Equality(schema.column(0), &a),
+  check({ ColumnPredicate::Equality(schema.column(0), &a),
           ColumnPredicate::Equality(schema.column(1), &a),
           ColumnPredicate::Range(schema.column(2), &nine, nullptr) },
         "", "",
@@ -944,7 +993,7 @@ TEST_F(PartitionPrunerTest, TestPruning) {
   // metric = "a"
   // timestamp >= 10;
   // timestamp < 20;
-  Check({ ColumnPredicate::Equality(schema.column(0), &a),
+  check({ ColumnPredicate::Equality(schema.column(0), &a),
           ColumnPredicate::Equality(schema.column(1), &a),
           ColumnPredicate::Range(schema.column(2), &ten, &twenty) },
         "", "",
@@ -953,7 +1002,7 @@ TEST_F(PartitionPrunerTest, TestPruning) {
   // host = "a"
   // metric = "a"
   // timestamp < 10;
-  Check({ ColumnPredicate::Equality(schema.column(0), &a),
+  check({ ColumnPredicate::Equality(schema.column(0), &a),
           ColumnPredicate::Equality(schema.column(1), &a),
           ColumnPredicate::Range(schema.column(2), nullptr, &ten) },
         "", "",
@@ -962,7 +1011,7 @@ TEST_F(PartitionPrunerTest, TestPruning) {
   // host = "a"
   // metric = "a"
   // timestamp >= 10;
-  Check({ ColumnPredicate::Equality(schema.column(0), &a),
+  check({ ColumnPredicate::Equality(schema.column(0), &a),
           ColumnPredicate::Equality(schema.column(1), &a),
           ColumnPredicate::Range(schema.column(2), &ten, nullptr) },
         "", "",
@@ -971,26 +1020,26 @@ TEST_F(PartitionPrunerTest, TestPruning) {
   // host = "a"
   // metric = "a"
   // timestamp = 10;
-  Check({ ColumnPredicate::Equality(schema.column(0), &a),
+  check({ ColumnPredicate::Equality(schema.column(0), &a),
           ColumnPredicate::Equality(schema.column(1), &a),
           ColumnPredicate::Equality(schema.column(2), &ten) },
         "", "",
         1, 1);
 
   // partition key < (hash=1)
-  Check({}, "", string("\0\0\0\1", 4), 2, 1);
+  check({}, "", string("\0\0\0\1", 4), 2, 1);
 
   // partition key >= (hash=1)
-  Check({}, string("\0\0\0\1", 4), "", 2, 1);
+  check({}, string("\0\0\0\1", 4), "", 2, 1);
 
   // timestamp = 10
   // partition key < (hash=1)
-  Check({ ColumnPredicate::Equality(schema.column(2), &ten) },
+  check({ ColumnPredicate::Equality(schema.column(2), &ten) },
         "", string("\0\0\0\1", 4), 1, 1);
 
   // timestamp = 10
   // partition key >= (hash=1)
-  Check({ ColumnPredicate::Equality(schema.column(2), &ten) },
+  check({ ColumnPredicate::Equality(schema.column(2), &ten) },
         string("\0\0\0\1", 4), "", 1, 1);
 }
 
@@ -1008,8 +1057,7 @@ TEST_F(PartitionPrunerTest, TestKudu2173) {
 
   PartitionSchema partition_schema;
   auto pb = PartitionSchemaPB();
-  auto range_schema = pb.mutable_range_schema();
-  range_schema->add_columns()->set_name("a");
+  CreatePartitionSchemaPB({"a"}, { }, &pb);
   ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
 
   KuduPartialRow split1(&schema);
@@ -1019,34 +1067,534 @@ TEST_F(PartitionPrunerTest, TestKudu2173) {
 
   // Applies the specified predicates to a scan and checks that the expected
   // number of partitions are pruned.
-  auto Check = [&] (const vector<ColumnPredicate>& predicates, size_t remaining_tablets) {
+  const auto check = [&] (const vector<ColumnPredicate>& predicates, size_t remaining_tablets) {
     ScanSpec spec;
 
     for (const auto& pred : predicates) {
       spec.AddPredicate(pred);
     }
     size_t pruner_ranges = remaining_tablets == 0 ? 0 : 1;
-    CheckPrunedPartitions(schema, partition_schema, partitions, spec,
-                          remaining_tablets, pruner_ranges);
+    NO_FATALS(CheckPrunedPartitions(schema, partition_schema, partitions, spec,
+                                    remaining_tablets, pruner_ranges));
   };
 
-  int8_t eleven = 11;
-  int8_t max = INT8_MAX;
+  constexpr int8_t eleven = 11;
+  constexpr int8_t max = INT8_MAX;
 
   // a < 11
-  Check({ ColumnPredicate::Range(schema.column(0), nullptr, &eleven) }, 2);
+  check({ ColumnPredicate::Range(schema.column(0), nullptr, &eleven) }, 2);
 
   // a < 11 AND b < 11
-  Check({ ColumnPredicate::Range(schema.column(0), nullptr, &eleven),
+  check({ ColumnPredicate::Range(schema.column(0), nullptr, &eleven),
           ColumnPredicate::Range(schema.column(1), nullptr, &eleven) },
         2);
 
   // a < max
-  Check({ ColumnPredicate::Range(schema.column(0), nullptr, &max) }, 2);
+  check({ ColumnPredicate::Range(schema.column(0), nullptr, &max) }, 2);
 
   // a < max AND b < 11
-  Check({ ColumnPredicate::Range(schema.column(0), nullptr, &max),
+  check({ ColumnPredicate::Range(schema.column(0), nullptr, &max),
           ColumnPredicate::Range(schema.column(1), nullptr, &eleven) },
         2);
 }
+
+TEST_F(PartitionPrunerTest, TestHashSchemasPerRangePruning) {
+  // CREATE TABLE t
+  // (A INT8, B INT8, C STRING)
+  // PRIMARY KEY (A, B, C)
+  // PARTITION BY RANGE (C)
+  // DISTRIBUTE BY HASH(A) INTO 2 BUCKETS
+  //               HASH(B) INTO 2 BUCKETS;
+  Schema schema({ ColumnSchema("A", INT8),
+                  ColumnSchema("B", INT8),
+                  ColumnSchema("C", STRING) },
+                { ColumnId(0), ColumnId(1), ColumnId(2) },
+                3);
+
+  PartitionSchema partition_schema;
+  auto pb = PartitionSchemaPB();
+  CreatePartitionSchemaPB({"C"}, { {{"A"}, 2, 0}, {{"B"}, 2, 0} }, &pb);
+
+  vector<pair<KuduPartialRow, KuduPartialRow>> bounds;
+  PartitionSchema::PerRangeHashBucketSchemas range_hash_schemas;
+
+  // Need to add per range hash schema components to the field 'range_with_hash_schemas_'
+  // of PartitionSchema because PartitionPruner will use them to construct partition key ranges.
+  // Currently, PartitionSchema::CreatePartitions() does not leverage this field
+  // so these components will have to be passed separately to the function as well.
+
+  // [(_, _, a), (_, _, c))
+  {
+    AddRangePartitionWithSchema(schema, {{"C", "a"}}, {{"C", "c"}}, {}, {},
+                                { {{"A"}, 3, 0} }, &bounds, &range_hash_schemas, &pb);
+  }
+
+  // [(_, _, d), (_, _, f))
+  {
+    AddRangePartitionWithSchema(schema, {{"C", "d"}}, {{"C", "f"}}, {}, {},
+                                { {{"A"}, 2, 0}, {{"B"}, 3, 0} },
+                                &bounds, &range_hash_schemas, &pb);
+  }
+
+  // [(_, _, h), (_, _, j))
+  {
+    AddRangePartitionWithSchema(schema, {{"C", "h"}}, {{"C", "j"}}, {}, {},
+                                {}, &bounds, &range_hash_schemas, &pb);
+  }
+
+  // [(_, _, k), (_, _, m))
+  {
+    AddRangePartitionWithSchema(schema, {{"C", "k"}}, {{"C", "m"}}, {}, {},
+                                { {{"B"}, 2, 0} }, &bounds, &range_hash_schemas, &pb);
+
+  }
+
+  ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
+
+  vector<Partition> partitions;
+  ASSERT_OK(partition_schema.CreatePartitions({}, bounds, range_hash_schemas, schema, &partitions));
+
+  ASSERT_EQ(15, partitions.size());
+
+  // Applies the specified predicates to a scan and checks that the expected
+  // number of partitions are pruned.
+  const auto check = [&] (const vector<ColumnPredicate>& predicates,
+                    const string& lower_bound_partition_key,
+                    const string& upper_bound_partition_key,
+                    size_t remaining_tablets,
+                    size_t pruner_ranges) {
+    ScanSpec spec;
+
+    spec.SetLowerBoundPartitionKey(lower_bound_partition_key);
+    spec.SetExclusiveUpperBoundPartitionKey(upper_bound_partition_key);
+    for (const auto& pred : predicates) {
+      spec.AddPredicate(pred);
+    }
+
+    NO_FATALS(CheckPrunedPartitions(schema, partition_schema, partitions, spec,
+                                    remaining_tablets, pruner_ranges));
+  };
+
+  constexpr int8_t zero = 0;
+  constexpr int8_t one = 1;
+
+  Slice a = "a";
+  Slice b = "b";
+  Slice e = "e";
+  Slice f = "f";
+  Slice i = "i";
+  Slice l = "l";
+  Slice m = "m";
+
+  // No Bounds
+  check({}, "", "", 15, 15);
+
+  // A = 1
+  check({ ColumnPredicate::Equality(schema.column(0), &one)}, "", "", 8, 8);
+
+  // B = 1
+  check({ ColumnPredicate::Equality(schema.column(1), &one)}, "", "", 8, 8);
+
+  // A = 0
+  // B = 1
+  // C >= "e"
+  check({ ColumnPredicate::Equality(schema.column(0), &zero),
+          ColumnPredicate::Equality(schema.column(1), &one),
+          ColumnPredicate::Range(schema.column(2), &e, nullptr)}, "", "", 3, 3);
+
+  // A = 0
+  // B = 1
+  // C = "e"
+  check({ ColumnPredicate::Equality(schema.column(0), &zero),
+          ColumnPredicate::Equality(schema.column(1), &one),
+          ColumnPredicate::Equality(schema.column(2), &e)}, "", "", 1, 1);
+
+  // B = 1
+  // C >= "b"
+  // C < "j"
+  check({ ColumnPredicate::Equality(schema.column(1), &one),
+          ColumnPredicate::Range(schema.column(2), &b, nullptr),
+          ColumnPredicate::Range(schema.column(2), nullptr, &i)}, "", "", 7, 7);
+
+
+  // A = 0
+  // C >= "e"
+  // C < "l"
+  check({ ColumnPredicate::Equality(schema.column(1), &zero),
+          ColumnPredicate::Range(schema.column(2), &e, nullptr),
+          ColumnPredicate::Range(schema.column(2), nullptr, &l)}, "", "", 5, 5);
+
+  // C >= "a"
+  // C < "b"
+  check({ ColumnPredicate::Range(schema.column(2), &a, &b)}, "", "", 3, 3);
+
+  // C >= "a"
+  // C < "e"
+  check({ ColumnPredicate::Range(schema.column(2), &a, &e)}, "", "", 9, 9);
+
+  // C >= "e"
+  // C < "i"
+  check({ ColumnPredicate::Range(schema.column(2), &e, &i)}, "", "", 10, 10);
+
+  // C >= "a"
+  // C < "l"
+  check({ ColumnPredicate::Range(schema.column(2), &a, &l)}, "", "", 15, 15);
+
+  // C >= "i"
+  // C < "l"
+  check({ ColumnPredicate::Range(schema.column(2), &i, &l)}, "", "", 6, 6);
+
+  // C >= "e"
+  check({ ColumnPredicate::Range(schema.column(2), &e, nullptr)}, "", "", 12, 12);
+
+  // C < "f"
+  check({ ColumnPredicate::Range(schema.column(2), nullptr, &f)}, "", "", 9, 9);
+
+  // C >= "f"
+  check({ ColumnPredicate::Range(schema.column(2), &f, nullptr)}, "", "", 6, 6);
+
+  // C < "a"
+  check({ ColumnPredicate::Range(schema.column(2), nullptr, &a)}, "", "", 0, 0);
+
+  // C >= "m"
+  check({ ColumnPredicate::Range(schema.column(2), &m, nullptr)}, "", "", 0, 0);
+
+  // Uses None predicate to short circuit scan
+  check({ ColumnPredicate::None(schema.column(2))}, "", "", 0, 0);
+
+  // partition key >= (hash=1, hash=0)
+  check({}, string("\0\0\0\1\0\0\0\0", 8), "", 8, 8);
+
+  // partition key < (hash=1, hash=0)
+  check({}, "", string("\0\0\0\1\0\0\0\0", 8), 7, 7);
+
+  // C >= "e"
+  // C < "m"
+  // partition key >= (hash=1)
+  check({ColumnPredicate::Range(schema.column(2), &e, &m)}, string("\0\0\0\1", 4), "", 6, 6);
+
+  // C >= "e"
+  // C < "m"
+  // partition key < (hash=1)
+  check({ColumnPredicate::Range(schema.column(2), &e, &m)}, "", string("\0\0\0\1", 4), 6, 6);
+}
+
+TEST_F(PartitionPrunerTest, TestHashSchemasPerRangeWithPartialPrimaryKeyRangePruning) {
+  // CREATE TABLE t
+  // (a INT8, b INT8, c INT8)
+  // PRIMARY KEY (a, b, c)
+  // PARTITION BY RANGE(a, b)
+
+  // Setup the Schema
+  Schema schema({ ColumnSchema("a", INT8),
+                  ColumnSchema("b", INT8),
+                  ColumnSchema("c", INT8) },
+                { ColumnId(0), ColumnId(1), ColumnId(2) },
+                3);
+
+  PartitionSchema partition_schema;
+  auto pb = PartitionSchemaPB();
+  CreatePartitionSchemaPB({"a", "b"}, {}, &pb);
+  ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
+
+  vector<pair<KuduPartialRow, KuduPartialRow>> bounds;
+  PartitionSchema::PerRangeHashBucketSchemas range_hash_schemas;
+
+  // [(0, 0, _), (2, 2, _))
+  {
+    AddRangePartitionWithSchema(schema, {}, {}, {{"a", 0}, {"b", 0}}, {{"a", 2}, {"b", 2}},
+                                { {{"c"}, 2, 0} }, &bounds, &range_hash_schemas, &pb);
+  }
+
+  // [(2, 2, _), (4, 4, _))
+  {
+    AddRangePartitionWithSchema(schema, {}, {}, {{"a", 2}, {"b", 2}}, {{"a", 4}, {"b", 4}},
+                                { {{"c"}, 3, 0} }, &bounds, &range_hash_schemas, &pb);
+  }
+
+  // [(4, 4, _), (6, 6, _))
+  {
+    AddRangePartitionWithSchema(schema, {}, {}, {{"a", 4}, {"b", 4}}, {{"a", 6}, {"b", 6}},
+                                { {{"c"}, 4, 0} }, &bounds, &range_hash_schemas, &pb);
+  }
+
+  ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
+
+  vector<Partition> partitions;
+  ASSERT_OK(partition_schema.CreatePartitions({}, bounds, range_hash_schemas, schema, &partitions));
+
+  ASSERT_EQ(9, partitions.size());
+
+  Arena arena(1024);
+  // Applies the specified lower and upper bound primary keys against the
+  // schema, and checks that the expected number of partitions are pruned.
+  const auto check = [&] (optional<tuple<int8_t, int8_t, int8_t>> lower,
+                    optional<tuple<int8_t, int8_t, int8_t>> upper,
+                    size_t remaining_tablets,
+                    size_t pruner_ranges) {
+    ScanSpec spec;
+    KuduPartialRow lower_bound(&schema);
+    KuduPartialRow upper_bound(&schema);
+    EncodedKey* enc_lower_bound = nullptr;
+    EncodedKey* enc_upper_bound = nullptr;
+
+    if (lower) {
+      ASSERT_OK(lower_bound.SetInt8("a", get<0>(*lower)));
+      ASSERT_OK(lower_bound.SetInt8("b", get<1>(*lower)));
+      ASSERT_OK(lower_bound.SetInt8("c", get<2>(*lower)));
+      ConstContiguousRow row(lower_bound.schema(), lower_bound.row_data_);
+      enc_lower_bound = EncodedKey::FromContiguousRow(row, &arena);
+      spec.SetLowerBoundKey(enc_lower_bound);
+    }
+    if (upper) {
+      ASSERT_OK(upper_bound.SetInt8("a", get<0>(*upper)));
+      ASSERT_OK(upper_bound.SetInt8("b", get<1>(*upper)));
+      ASSERT_OK(upper_bound.SetInt8("c", get<2>(*upper)));
+      ConstContiguousRow row(upper_bound.schema(), upper_bound.row_data_);
+      enc_upper_bound = EncodedKey::FromContiguousRow(row, &arena);
+      spec.SetExclusiveUpperBoundKey(enc_upper_bound);
+    }
+    NO_FATALS(CheckPrunedPartitions(schema, partition_schema, partitions, spec,
+                                    remaining_tablets, pruner_ranges));
+  };
+
+  // No bounds
+  check(boost::none, boost::none, 9, 9);
+
+  // PK < (2, 2, min)
+  check(boost::none, make_tuple<int8_t, int8_t, int8_t>(2, 2, INT8_MIN), 2, 2);
+
+  // PK < (2, 2, 0)
+  check(boost::none, make_tuple<int8_t, int8_t, int8_t>(2, 2, 0), 5, 5);
+
+  // PK >= (2, 2, 0)
+  check(make_tuple<int8_t, int8_t, int8_t>(2, 2, 0), boost::none, 7, 7);
+
+  // PK >= (2, 2, min)
+  // PK < (4, 4, min)
+  check(make_tuple<int8_t, int8_t, int8_t>(2, 2, INT8_MIN),
+        make_tuple<int8_t, int8_t, int8_t>(4, 4, INT8_MIN), 3, 3);
+
+  // PK >= (2, 2, min)
+  // PK < (4, 4, 0)
+  check(make_tuple<int8_t, int8_t, int8_t>(2, 2, INT8_MIN),
+        make_tuple<int8_t, int8_t, int8_t>(4, 4, 0), 7, 7);
+
+  // PK >= (2, 0, min)
+  // PK < (4, 2, min)
+  check(make_tuple<int8_t, int8_t, int8_t>(2, 0, INT8_MIN),
+        make_tuple<int8_t, int8_t, int8_t>(4, 2, INT8_MIN), 5, 5);
+
+  // PK >= (6, 6, min)
+  check(make_tuple<int8_t, int8_t, int8_t>(6, 6, INT8_MIN), boost::none, 0, 0);
+
+  // PK >= (2, 2, min)
+  // PK < (4, 4, min)
+  // Lower bound PK > Upper bound PK so scan is short circuited
+  check(make_tuple<int8_t, int8_t, int8_t>(4, 4, INT8_MIN),
+        make_tuple<int8_t, int8_t, int8_t>(2, 2, INT8_MIN), 0, 0);
+}
+
+TEST_F(PartitionPrunerTest, TestInListHashPruningPerRange) {
+  // CREATE TABLE t
+  // (A STRING, B INT8, C INT8)
+  // PRIMARY KEY (A, B, C)
+  // PARTITION BY RANGE (A)
+  // DISTRIBUTE HASH(B, C) INTO 3 BUCKETS;
+  Schema schema({ ColumnSchema("A", STRING),
+                  ColumnSchema("B", INT8),
+                  ColumnSchema("C", INT8) },
+                { ColumnId(0), ColumnId(1), ColumnId(2) },
+                3);
+
+  PartitionSchema partition_schema;
+  auto pb = PartitionSchemaPB();
+  CreatePartitionSchemaPB({"A"}, { {{"B", "C"}, 3, 0} }, &pb);
+
+  vector<pair<KuduPartialRow, KuduPartialRow>> bounds;
+  PartitionSchema::PerRangeHashBucketSchemas range_hash_schemas;
+
+  // [(a, _, _), (c, _, _))
+  {
+    AddRangePartitionWithSchema(schema, {{"A", "a"}}, {{"A", "c"}}, {}, {},
+                                { {{"B"}, 3, 0} }, &bounds, &range_hash_schemas, &pb);
+  }
+
+  // [(c, _, _), (e, _, _))
+  {
+    AddRangePartitionWithSchema(schema, {{"A", "c"}}, {{"A", "e"}}, {}, {},
+                                {}, &bounds, &range_hash_schemas, &pb);
+  }
+
+  // [(e, _, _), (g, _, _))
+  {
+    AddRangePartitionWithSchema(schema, {{"A", "e"}}, {{"A", "g"}}, {}, {},
+                                { {{"C"}, 3, 0} }, &bounds, &range_hash_schemas, &pb);
+  }
+
+  ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
+
+  vector<Partition> partitions;
+  ASSERT_OK(partition_schema.CreatePartitions({}, bounds, range_hash_schemas, schema, &partitions));
+
+  ASSERT_EQ(9, partitions.size());
+
+  // Applies the specified predicates to a scan and checks that the expected
+  // number of partitions are pruned.
+  const auto check = [&] (const vector<ColumnPredicate>& predicates,
+                    size_t remaining_tablets,
+                    size_t pruner_ranges) {
+    ScanSpec spec;
+
+    for (const auto& pred : predicates) {
+      spec.AddPredicate(pred);
+    }
+
+    NO_FATALS(CheckPrunedPartitions(schema, partition_schema, partitions, spec,
+                                    remaining_tablets, pruner_ranges));
+  };
+
+  // zero, one, eight are in different buckets when bucket number is 3 and seed is 0.
+  constexpr int8_t zero = 0;
+  constexpr int8_t one = 1;
+  constexpr int8_t eight = 8;
+
+  vector<const void*> B_values;
+  vector<const void*> C_values;
+
+  // B in [0, 1, 8];
+  B_values = { &zero, &one, &eight };
+  check({ ColumnPredicate::InList(schema.column(1), &B_values) }, 9, 9);
+
+  // B in [0, 1];
+  B_values = { &zero, &one };
+  check({ ColumnPredicate::InList(schema.column(1), &B_values) }, 8, 8);
+
+  // C in [0, 1];
+  C_values = { &zero, &one };
+  check({ ColumnPredicate::InList(schema.column(2), &C_values) }, 8, 8);
+
+  // B in [0, 1], C in [0, 1]
+  // (0, 0) in bucket 2
+  // (0, 1) in bucket 2
+  // (1, 0) in bucket 1
+  // (1, 1) in bucket 0
+  B_values = { &zero, &one };
+  C_values = { &zero, &one };
+  check({ ColumnPredicate::InList(schema.column(1), &B_values),
+          ColumnPredicate::InList(schema.column(2), &C_values) },
+        7,  7);
+
+  // B = 0, C in [0, 1]
+  C_values = { &zero, &one };
+  check({ ColumnPredicate::Equality(schema.column(1), &zero),
+          ColumnPredicate::InList(schema.column(2), &C_values) },
+        4, 4);
+
+  // B = 1, C in [0, 1]
+  C_values = { &zero, &one };
+  check({ ColumnPredicate::Equality(schema.column(1), &one),
+          ColumnPredicate::InList(schema.column(2), &C_values) },
+        5, 5);
+}
+
+TEST_F(PartitionPrunerTest, TestSingleRangeElementAndBoundaryCase) {
+  // CREATE TABLE t
+  // (A INT8, B STRING)
+  // PRIMARY KEY (A, B)
+  // PARTITION BY RANGE (A)
+  // DISTRIBUTE BY HASH(B) INTO 2 BUCKETS;
+  Schema schema({ ColumnSchema("A", INT8),
+                  ColumnSchema("B", STRING) },
+                { ColumnId(0), ColumnId(1) },
+                2);
+
+  PartitionSchema partition_schema;
+  auto pb = PartitionSchemaPB();
+  CreatePartitionSchemaPB({"A"}, { {{"B"}, 2, 0} }, &pb);
+
+  vector<pair<KuduPartialRow, KuduPartialRow>> bounds;
+  PartitionSchema::PerRangeHashBucketSchemas range_hash_schemas;
+
+  // [(0, _), (1, _))
+  {
+    AddRangePartitionWithSchema(schema, {}, {}, {{"A", 0}}, {{"A", 1}},
+                                {{{"B"}, 4, 0}}, &bounds, &range_hash_schemas, &pb);
+  }
+
+  // [(1, _), (2, _))
+  {
+    AddRangePartitionWithSchema(schema, {}, {}, {{"A", 1}}, {{"A", 2}},
+                                {}, &bounds, &range_hash_schemas, &pb);
+  }
+
+  // [(2, _), (3, _))
+  {
+    AddRangePartitionWithSchema(schema, {}, {}, {{"A", 2}}, {{"A", 3}},
+                                { {{"B"}, 3, 0} }, &bounds, &range_hash_schemas, &pb);
+  }
+
+  ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
+
+  vector<Partition> partitions;
+  ASSERT_OK(partition_schema.CreatePartitions({}, bounds, range_hash_schemas, schema, &partitions));
+
+  ASSERT_EQ(9, partitions.size());
+
+  // Applies the specified predicates to a scan and checks that the expected
+  // number of partitions are pruned.
+  const auto check = [&] (const vector<ColumnPredicate>& predicates,
+      size_t remaining_tablets,
+      size_t pruner_ranges) {
+    ScanSpec spec;
+
+    for (const auto& pred : predicates) {
+      spec.AddPredicate(pred);
+    }
+
+    NO_FATALS(CheckPrunedPartitions(schema, partition_schema, partitions, spec,
+                                    remaining_tablets, pruner_ranges));
+  };
+
+  constexpr int8_t zero = 0;
+  constexpr int8_t one = 1;
+  constexpr int8_t two = 2;
+  constexpr int8_t three = 3;
+
+  // No Bounds
+  check({}, 9, 9);
+
+  // A >= 0
+  check({ ColumnPredicate::Range(schema.column(0), &zero, nullptr)}, 9, 9);
+
+  // A >= 1
+  check({ ColumnPredicate::Range(schema.column(0), &one, nullptr)}, 5, 5);
+
+  // A < 1
+  check({ ColumnPredicate::Range(schema.column(0), nullptr, &one)}, 4, 4);
+
+  // A >= 2
+  check({ ColumnPredicate::Range(schema.column(0), &two, nullptr)}, 3, 3);
+
+  // A < 2
+  check({ ColumnPredicate::Range(schema.column(0), nullptr, &two)}, 6, 6);
+
+  // A < 3
+  check({ ColumnPredicate::Range(schema.column(0), nullptr, &three)}, 9, 9);
+
+  // A >= 0
+  // A < 2
+  check({ ColumnPredicate::Range(schema.column(0), &zero, &two)}, 6, 6);
+
+  // A >= 1
+  // A < 2
+  check({ ColumnPredicate::Range(schema.column(0), &one, &two)}, 2, 2);
+
+  // A >= 1
+  // A < 3
+  check({ ColumnPredicate::Range(schema.column(0), &one, &three)}, 5, 5);
+
+  // A >= 3
+  check({ ColumnPredicate::Range(schema.column(0), &three, nullptr)}, 0, 0);
+}
 } // namespace kudu
diff --git a/src/kudu/common/partition_pruner.cc b/src/kudu/common/partition_pruner.cc
index 2c39e10..191c035 100644
--- a/src/kudu/common/partition_pruner.cc
+++ b/src/kudu/common/partition_pruner.cc
@@ -24,7 +24,6 @@
 #include <memory>
 #include <numeric>
 #include <string>
-#include <tuple>
 #include <unordered_map>
 #include <vector>
 
@@ -49,15 +48,11 @@
 
 using std::distance;
 using std::find;
-using std::get;
 using std::iota;
 using std::lower_bound;
-using std::make_tuple;
 using std::memcpy;
-using std::min;
 using std::move;
 using std::string;
-using std::tuple;
 using std::unique_ptr;
 using std::unordered_map;
 using std::vector;
@@ -70,7 +65,7 @@ namespace {
 bool AreRangeColumnsPrefixOfPrimaryKey(const Schema& schema,
                                        const vector<ColumnId>& range_columns) {
   CHECK(range_columns.size() <= schema.num_key_columns());
-  for (int32_t col_idx = 0; col_idx < range_columns.size(); col_idx++) {
+  for (int32_t col_idx = 0; col_idx < range_columns.size(); ++col_idx) {
     if (schema.column_id(col_idx) != range_columns[col_idx]) {
       return false;
     }
@@ -132,7 +127,7 @@ void EncodeRangeKeysFromPrimaryKeyBounds(const Schema& schema,
       // exclusive bound. If not, then we increment the range-key prefix in
       // order to transform it from inclusive to exclusive.
       bool min_suffix = true;
-      for (int32_t idx = num_range_columns; idx < schema.num_key_columns(); idx++) {
+      for (int32_t idx = num_range_columns; idx < schema.num_key_columns(); ++idx) {
         min_suffix &= schema.column(idx)
                             .type_info()
                             ->IsMinValue(scan_spec.exclusive_upper_bound_key()->raw_keys()[idx]);
@@ -185,7 +180,6 @@ void EncodeRangeKeysFromPredicates(const Schema& schema,
 } // anonymous namespace
 
 vector<bool> PartitionPruner::PruneHashComponent(
-    const PartitionSchema& partition_schema,
     const PartitionSchema::HashBucketSchema& hash_bucket_schema,
     const Schema& schema,
     const ScanSpec& scan_spec) {
@@ -220,16 +214,121 @@ vector<bool> PartitionPruner::PruneHashComponent(
     encoded_strings.swap(new_encoded_strings);
   }
   for (const string& encoded_string : encoded_strings) {
-    uint32_t hash = partition_schema.BucketForEncodedColumns(encoded_string, hash_bucket_schema);
+    uint32_t hash = PartitionSchema::BucketForEncodedColumns(encoded_string, hash_bucket_schema);
     hash_bucket_bitset[hash] = true;
   }
   return hash_bucket_bitset;
 }
 
+void PartitionPruner::ConstructPartitionKeyRanges(
+    const Schema& schema,
+    const ScanSpec& scan_spec,
+    const PartitionSchema::HashBucketSchemas& hash_bucket_schemas,
+    const RangeBounds& range_bounds,
+    vector<PartitionKeyRange>* partition_key_ranges) {
+  // Create the hash bucket portion of the partition key.
+
+  // The list of hash buckets bitset per hash component
+  vector<vector<bool>> hash_bucket_bitsets;
+  hash_bucket_bitsets.reserve(hash_bucket_schemas.size());
+  for (const auto& hash_bucket_schema : hash_bucket_schemas) {
+    bool can_prune = true;
+    for (const auto& column_id : hash_bucket_schema.column_ids) {
+      const ColumnSchema& column = schema.column_by_id(column_id);
+      const ColumnPredicate* predicate = FindOrNull(scan_spec.predicates(), column.name());
+      if (predicate == nullptr ||
+          (predicate->predicate_type() != PredicateType::Equality &&
+              predicate->predicate_type() != PredicateType::InList)) {
+        can_prune = false;
+        break;
+      }
+    }
+    if (can_prune) {
+      auto hash_bucket_bitset = PruneHashComponent(hash_bucket_schema,
+                                                   schema,
+                                                   scan_spec);
+      hash_bucket_bitsets.emplace_back(std::move(hash_bucket_bitset));
+    } else {
+      hash_bucket_bitsets.emplace_back(hash_bucket_schema.num_buckets, true);
+    }
+  }
+
+  // The index of the final constrained component in the partition key.
+  size_t constrained_index;
+  if (!range_bounds.lower.empty() || !range_bounds.upper.empty()) {
+    // The range component is constrained.
+    constrained_index = hash_bucket_schemas.size();
+  } else {
+    // Search the hash bucket constraints from right to left, looking for the
+    // first constrained component.
+    constrained_index = hash_bucket_schemas.size() -
+        distance(hash_bucket_bitsets.rbegin(),
+                 find_if(hash_bucket_bitsets.rbegin(),
+                         hash_bucket_bitsets.rend(),
+                         [] (const vector<bool>& x) {
+                           return std::find(x.begin(), x.end(), false) != x.end();
+                         }));
+  }
+
+  // Build up a set of partition key ranges out of the hash components.
+  //
+  // Each hash component simply appends its bucket number to the
+  // partition key ranges (possibly incrementing the upper bound by one bucket
+  // number if this is the final constraint, see note 2 in the example above).
+  const KeyEncoder<string>& hash_encoder = GetKeyEncoder<string>(GetTypeInfo(UINT32));
+  for (size_t hash_idx = 0; hash_idx < constrained_index; ++hash_idx) {
+    // This is the final partition key component if this is the final constrained
+    // bucket, and the range upper bound is empty. In this case we need to
+    // increment the bucket on the upper bound to convert from inclusive to
+    // exclusive.
+    bool is_last = hash_idx + 1 == constrained_index && range_bounds.upper.empty();
+
+    vector<PartitionKeyRange> new_partition_key_ranges;
+    for (const auto& partition_key_range : *partition_key_ranges) {
+      const vector<bool>& buckets_bitset = hash_bucket_bitsets[hash_idx];
+      for (uint32_t bucket = 0; bucket < buckets_bitset.size(); ++bucket) {
+        if (!buckets_bitset[bucket]) {
+          continue;
+        }
+        uint32_t bucket_upper = is_last ? bucket + 1 : bucket;
+        string lower = partition_key_range.start;
+        string upper = partition_key_range.end;
+        hash_encoder.Encode(&bucket, &lower);
+        hash_encoder.Encode(&bucket_upper, &upper);
+        new_partition_key_ranges.emplace_back(PartitionKeyRange{move(lower), move(upper)});
+      }
+    }
+    partition_key_ranges->swap(new_partition_key_ranges);
+  }
+
+  // Append the (possibly empty) range bounds to the partition key ranges.
+  for (auto& range : *partition_key_ranges) {
+    range.start.append(range_bounds.lower);
+    range.end.append(range_bounds.upper);
+  }
+
+  // Remove all partition key ranges past the scan spec's upper bound partition key.
+  if (!scan_spec.exclusive_upper_bound_partition_key().empty()) {
+    for (auto range = partition_key_ranges->rbegin();
+         range != partition_key_ranges->rend();
+         ++range) {
+      if (!(*range).end.empty() &&
+          scan_spec.exclusive_upper_bound_partition_key() >= (*range).end) {
+        break;
+      }
+      if (scan_spec.exclusive_upper_bound_partition_key() <= (*range).start) {
+        partition_key_ranges->pop_back();
+      } else {
+        (*range).end = scan_spec.exclusive_upper_bound_partition_key();
+      }
+    }
+  }
+}
+
 void PartitionPruner::Init(const Schema& schema,
                            const PartitionSchema& partition_schema,
                            const ScanSpec& scan_spec) {
-  // If we can already short circuit the scan we don't need to bother with
+  // If we can already short circuit the scan, we don't need to bother with
   // partition pruning. This also allows us to assume some invariants of the
   // scan spec, such as no None predicates and that the lower bound PK < upper
   // bound PK.
@@ -302,192 +401,174 @@ void PartitionPruner::Init(const Schema& schema,
   //    since it is precisely these highly-hash-partitioned tables which get the
   //    most benefit from pruning.
 
-  // Step 1: Build the range portion of the partition key.
-  string range_lower_bound;
-  string range_upper_bound;
-  const vector<ColumnId>& range_columns = partition_schema.range_schema_.column_ids;
+  // Build the range portion of the partition key by using
+  // the lower and upper bounds specified by the scan.
+  string scan_range_lower_bound;
+  string scan_range_upper_bound;
+  const vector<ColumnId> &range_columns = partition_schema.range_schema_.column_ids;
   if (!range_columns.empty()) {
     if (AreRangeColumnsPrefixOfPrimaryKey(schema, range_columns)) {
       EncodeRangeKeysFromPrimaryKeyBounds(schema,
                                           scan_spec,
                                           range_columns.size(),
-                                          &range_lower_bound,
-                                          &range_upper_bound);
+                                          &scan_range_lower_bound,
+                                          &scan_range_upper_bound);
     } else {
       EncodeRangeKeysFromPredicates(schema,
                                     scan_spec.predicates(),
                                     range_columns,
-                                    &range_lower_bound,
-                                    &range_upper_bound);
+                                    &scan_range_lower_bound,
+                                    &scan_range_upper_bound);
     }
   }
 
-  // Step 2: Create the hash bucket portion of the partition key.
-
-  // The list of hash buckets bitset per hash component
-  vector<vector<bool>> hash_bucket_bitsets;
-  hash_bucket_bitsets.reserve(partition_schema.hash_bucket_schemas_.size());
-  for (int hash_idx = 0; hash_idx < partition_schema.hash_bucket_schemas_.size(); hash_idx++) {
-    const auto& hash_bucket_schema = partition_schema.hash_bucket_schemas_[hash_idx];
-    bool can_prune = true;
-    for (const ColumnId& column_id : hash_bucket_schema.column_ids) {
-      const ColumnSchema& column = schema.column_by_id(column_id);
-      const ColumnPredicate *predicate = FindOrNull(scan_spec.predicates(), column.name());
-      if (predicate == nullptr ||
-          (predicate->predicate_type() != PredicateType::Equality &&
-           predicate->predicate_type() != PredicateType::InList)) {
-        can_prune = false;
-        break;
-      }
-    }
-    if (can_prune) {
-      auto hash_bucket_bitset = PruneHashComponent(partition_schema,
-                                                   hash_bucket_schema,
-                                                   schema,
-                                                   scan_spec);
-      hash_bucket_bitsets.emplace_back(std::move(hash_bucket_bitset));
-    } else {
-      hash_bucket_bitsets.emplace_back(hash_bucket_schema.num_buckets, true);
-    }
-  }
-
-  // The index of the final constrained component in the partition key.
-  int constrained_index;
-  if (!range_lower_bound.empty() || !range_upper_bound.empty()) {
-    // The range component is constrained.
-    constrained_index = partition_schema.hash_bucket_schemas_.size();
+  // Store ranges and their corresponding hash schemas if they fall within
+  // the range bounds specified by the scan.
+  if (partition_schema.ranges_with_hash_schemas_.empty()) {
+    vector<PartitionKeyRange> partition_key_ranges(1);
+    ConstructPartitionKeyRanges(schema, scan_spec, partition_schema.hash_bucket_schemas_,
+                                {scan_range_lower_bound, scan_range_upper_bound},
+                                &partition_key_ranges);
+    // Reverse the order of the partition key ranges, so that it is
+    // efficient to remove the partition key ranges from the vector in ascending order.
+    range_bounds_to_partition_key_ranges_.resize(1);
+    auto& first_range = range_bounds_to_partition_key_ranges_[0];
+    first_range.partition_key_ranges.resize(partition_key_ranges.size());
+    move(partition_key_ranges.rbegin(), partition_key_ranges.rend(),
+         first_range.partition_key_ranges.begin());
   } else {
-    // Search the hash bucket constraints from right to left, looking for the
-    // first constrained component.
-    constrained_index = partition_schema.hash_bucket_schemas_.size() -
-                        distance(hash_bucket_bitsets.rbegin(),
-                                 find_if(hash_bucket_bitsets.rbegin(),
-                                         hash_bucket_bitsets.rend(),
-                                         [] (const vector<bool>& x) {
-                                           return std::find(x.begin(), x.end(), false) != x.end();
-                                         }));
-  }
-
-  // Build up a set of partition key ranges out of the hash components.
-  //
-  // Each hash component simply appends its bucket number to the
-  // partition key ranges (possibly incrementing the upper bound by one bucket
-  // number if this is the final constraint, see note 2 in the example above).
-  vector<tuple<string, string>> partition_key_ranges(1);
-  const KeyEncoder<string>& hash_encoder = GetKeyEncoder<string>(GetTypeInfo(UINT32));
-  for (int hash_idx = 0; hash_idx < constrained_index; hash_idx++) {
-    // This is the final partition key component if this is the final constrained
-    // bucket, and the range upper bound is empty. In this case we need to
-    // increment the bucket on the upper bound to convert from inclusive to
-    // exclusive.
-    bool is_last = hash_idx + 1 == constrained_index && range_upper_bound.empty();
-
-    vector<tuple<string, string>> new_partition_key_ranges;
-    for (const auto& partition_key_range : partition_key_ranges) {
-      const vector<bool>& buckets_bitset = hash_bucket_bitsets[hash_idx];
-      for (uint32_t bucket = 0; bucket < buckets_bitset.size(); ++bucket) {
-        if (!buckets_bitset[bucket]) {
-          continue;
+    vector<RangeBounds> range_bounds;
+    vector<PartitionSchema::HashBucketSchemas> hash_schemas_per_range;
+    for (const auto& range : partition_schema.ranges_with_hash_schemas_) {
+      const auto& hash_schemas = range.hash_schemas.empty() ?
+          partition_schema.hash_bucket_schemas_ : range.hash_schemas;
+      // Both lower and upper bounds are unbounded.
+      if (scan_range_lower_bound.empty() && scan_range_upper_bound.empty()) {
+        range_bounds.emplace_back(RangeBounds{range.lower, range.upper});
+        hash_schemas_per_range.emplace_back(hash_schemas);
+        continue;
+      }
+      // Only one of the lower/upper bounds is unbounded.
+      if (scan_range_lower_bound.empty()) {
+        if (scan_range_upper_bound > range.lower) {
+          range_bounds.emplace_back(RangeBounds{range.lower, range.upper});
+          hash_schemas_per_range.emplace_back(hash_schemas);
         }
-        uint32_t bucket_upper = is_last ? bucket + 1 : bucket;
-        string lower = get<0>(partition_key_range);
-        string upper = get<1>(partition_key_range);
-        hash_encoder.Encode(&bucket, &lower);
-        hash_encoder.Encode(&bucket_upper, &upper);
-        new_partition_key_ranges.emplace_back(move(lower), move(upper));
+        continue;
       }
-    }
-    partition_key_ranges.swap(new_partition_key_ranges);
-  }
-
-  // Step 3: append the (possibly empty) range bounds to the partition key ranges.
-  for (auto& range : partition_key_ranges) {
-    get<0>(range).append(range_lower_bound);
-    get<1>(range).append(range_upper_bound);
-  }
-
-  // Step 4: remove all partition key ranges past the scan spec's upper bound partition key.
-  if (!scan_spec.exclusive_upper_bound_partition_key().empty()) {
-    for (auto range = partition_key_ranges.rbegin();
-         range != partition_key_ranges.rend();
-         range++) {
-      if (!get<1>(*range).empty() &&
-          scan_spec.exclusive_upper_bound_partition_key() >= get<1>(*range)) {
-        break;
+      if (scan_range_upper_bound.empty()) {
+        if (scan_range_lower_bound < range.upper) {
+          range_bounds.emplace_back(RangeBounds{range.lower, range.upper});
+          hash_schemas_per_range.emplace_back(hash_schemas);
+        }
+        continue;
       }
-      if (scan_spec.exclusive_upper_bound_partition_key() <= get<0>(*range)) {
-        partition_key_ranges.pop_back();
+      // Both lower and upper ranges are bounded.
+      if (scan_range_lower_bound < range.upper && scan_range_upper_bound > range.lower) {
+        range_bounds.emplace_back(RangeBounds{range.lower, range.upper});
+        hash_schemas_per_range.emplace_back(hash_schemas);
+      }
+    }
+    DCHECK_EQ(range_bounds.size(), hash_schemas_per_range.size());
+    range_bounds_to_partition_key_ranges_.resize(hash_schemas_per_range.size());
+    // Construct partition key ranges from the ranges and their respective hash schemas
+    // that falls within the scan's bounds.
+    for (size_t i = 0; i < hash_schemas_per_range.size(); ++i) {
+      const auto& hash_schema = hash_schemas_per_range[i];
+      vector<PartitionKeyRange> partition_key_ranges(1);
+      if (scan_range_lower_bound.empty() && scan_range_upper_bound.empty()) {
+        ConstructPartitionKeyRanges(schema, scan_spec, hash_schema,
+                                    {range_bounds[i].lower, range_bounds[i].upper},
+                                    &partition_key_ranges);
       } else {
-        get<1>(*range) = scan_spec.exclusive_upper_bound_partition_key();
+        ConstructPartitionKeyRanges(schema, scan_spec, hash_schema,
+                                    {scan_range_lower_bound, scan_range_upper_bound},
+                                    &partition_key_ranges);
       }
+      auto& current_range = range_bounds_to_partition_key_ranges_[i];
+      current_range.range_bounds = range_bounds[i];
+      current_range.partition_key_ranges.resize(partition_key_ranges.size());
+      move(partition_key_ranges.rbegin(), partition_key_ranges.rend(),
+           current_range.partition_key_ranges.begin());
     }
   }
 
-  // Step 5: Reverse the order of the partition key ranges, so that it is
-  // efficient to remove the partition key ranges from the vector in ascending order.
-  partition_key_ranges_.resize(partition_key_ranges.size());
-  move(partition_key_ranges.rbegin(), partition_key_ranges.rend(), partition_key_ranges_.begin());
-
-  // Step 6: Remove all partition key ranges before the scan spec's lower bound partition key.
+  // Remove all partition key ranges before the scan spec's lower bound partition key.
   if (!scan_spec.lower_bound_partition_key().empty()) {
     RemovePartitionKeyRange(scan_spec.lower_bound_partition_key());
   }
-}
 
+}
 bool PartitionPruner::HasMorePartitionKeyRanges() const {
-  return !partition_key_ranges_.empty();
+  return NumRangesRemaining() != 0;
 }
 
 const string& PartitionPruner::NextPartitionKey() const {
   CHECK(HasMorePartitionKeyRanges());
-  return get<0>(partition_key_ranges_.back());
+  return range_bounds_to_partition_key_ranges_.back().partition_key_ranges.back().start;
 }
 
 void PartitionPruner::RemovePartitionKeyRange(const string& upper_bound) {
   if (upper_bound.empty()) {
-    partition_key_ranges_.clear();
+    range_bounds_to_partition_key_ranges_.clear();
     return;
   }
 
-  for (auto range = partition_key_ranges_.rbegin();
-       range != partition_key_ranges_.rend();
-       range++) {
-    if (upper_bound <= get<0>(*range)) { break; }
-    if (get<1>(*range).empty() || upper_bound < get<1>(*range)) {
-      get<0>(*range) = upper_bound;
-    } else {
-      partition_key_ranges_.pop_back();
+  for (auto& range_bounds_and_partition_key_range : range_bounds_to_partition_key_ranges_) {
+    auto& partition_key_range = range_bounds_and_partition_key_range.partition_key_ranges;
+    for (auto range_it = partition_key_range.rbegin();
+         range_it != partition_key_range.rend();
+         ++range_it) {
+      if (upper_bound <= (*range_it).start) { break; }
+      if ((*range_it).end.empty() || upper_bound < (*range_it).end) {
+        (*range_it).start = upper_bound;
+      } else {
+        partition_key_range.pop_back();
+      }
     }
   }
 }
 
 bool PartitionPruner::ShouldPrune(const Partition& partition) const {
-  // range is an iterator that points to the first partition key range which
-  // overlaps or is greater than the partition.
-  auto range = lower_bound(partition_key_ranges_.rbegin(), partition_key_ranges_.rend(), partition,
-    [] (const tuple<string, string>& scan_range, const Partition& partition) {
-      // return true if scan_range < partition
-      const string& scan_upper = get<1>(scan_range);
-      return !scan_upper.empty() && scan_upper <= partition.partition_key_start();
-    });
-
-  return range == partition_key_ranges_.rend() ||
-         (!partition.partition_key_end().empty() &&
-          partition.partition_key_end() <= get<0>(*range));
+  for (const auto& [range_bounds, partition_key_range] : range_bounds_to_partition_key_ranges_) {
+    // Check if the partition belongs to the same range as the partition key range.
+    if (!range_bounds.lower.empty() && partition.range_key_start() != range_bounds.lower &&
+        !range_bounds.upper.empty() && partition.range_key_end() != range_bounds.upper) {
+      continue;
+    }
+    // range is an iterator that points to the first partition key range which
+    // overlaps or is greater than the partition.
+    auto range = lower_bound(partition_key_range.rbegin(), partition_key_range.rend(),
+                             partition, [] (const PartitionKeyRange& scan_range,
+                                 const Partition& partition) {
+                               // return true if scan_range < partition
+                               const string& scan_upper = scan_range.end;
+                               return !scan_upper.empty()
+                               && scan_upper <= partition.partition_key_start();
+                             });
+    if (!(range == partition_key_range.rend() ||
+       (!partition.partition_key_end().empty() &&
+       partition.partition_key_end() <= (*range).start))) {
+      return false;
+    }
+  }
+  return true;
 }
 
 string PartitionPruner::ToString(const Schema& schema,
                                  const PartitionSchema& partition_schema) const {
   vector<string> strings;
-  for (auto range = partition_key_ranges_.rbegin();
-       range != partition_key_ranges_.rend();
-       range++) {
-    strings.push_back(strings::Substitute(
+  for (const auto& partition_key_range : range_bounds_to_partition_key_ranges_) {
+    for (auto range = partition_key_range.partition_key_ranges.rbegin();
+         range != partition_key_range.partition_key_ranges.rend();
+         ++range) {
+      strings.push_back(strings::Substitute(
           "[($0), ($1))",
-          get<0>(*range).empty() ? "<start>" :
-              partition_schema.PartitionKeyDebugString(get<0>(*range), schema),
-          get<1>(*range).empty() ? "<end>" :
-              partition_schema.PartitionKeyDebugString(get<1>(*range), schema)));
+          (*range).start.empty() ? "<start>" :
+            partition_schema.PartitionKeyDebugString((*range).start, schema),
+          (*range).end.empty() ? "<end>" :
+            partition_schema.PartitionKeyDebugString((*range).end, schema)));
+    }
   }
 
   return JoinStrings(strings, ", ");
diff --git a/src/kudu/common/partition_pruner.h b/src/kudu/common/partition_pruner.h
index 6dec6de..4e3a1f4 100644
--- a/src/kudu/common/partition_pruner.h
+++ b/src/kudu/common/partition_pruner.h
@@ -19,11 +19,10 @@
 
 #include <cstddef>
 #include <string>
-#include <tuple>
 #include <vector>
 
-#include "kudu/gutil/macros.h"
 #include "kudu/common/partition.h"
+#include "kudu/gutil/macros.h"
 
 namespace kudu {
 
@@ -42,6 +41,27 @@ class PartitionPruner {
 
   PartitionPruner() = default;
 
+  struct RangeBounds {
+    RangeBounds() = default;
+
+    std::string lower;
+    std::string upper;
+  };
+
+  struct PartitionKeyRange {
+    PartitionKeyRange() = default;
+
+    std::string start;
+    std::string end;
+  };
+
+  struct RangeBoundsAndPartitionKeyRanges {
+    RangeBoundsAndPartitionKeyRanges() = default;
+
+    RangeBounds range_bounds;
+    std::vector<PartitionKeyRange> partition_key_ranges;
+  };
+
   // Initializes the partition pruner for a new scan. The scan spec should
   // already be optimized by the ScanSpec::Optimize method.
   void Init(const Schema& schema,
@@ -61,8 +81,12 @@ class PartitionPruner {
   bool ShouldPrune(const Partition& partition) const;
 
   // Returns the number of partition key ranges remaining in the scan.
-  size_t NumRangesRemainingForTests() const {
-    return partition_key_ranges_.size();
+  size_t NumRangesRemaining() const {
+    size_t num_ranges = 0;
+    for (const auto& range: range_bounds_to_partition_key_ranges_) {
+      num_ranges += range.partition_key_ranges.size();
+    }
+    return num_ranges;
   }
 
   // Returns a text description of this partition pruner suitable for debug
@@ -70,17 +94,25 @@ class PartitionPruner {
   std::string ToString(const Schema& schema, const PartitionSchema& partition_schema) const;
 
  private:
-  // Search all combination of in-list and equality predicates.
-  // Return hash values bitset of these combination.
-  std::vector<bool> PruneHashComponent(
-      const PartitionSchema& partition_schema,
+  // Search all combinations of in-list and equality predicates.
+  // Return hash values bitset of these combinations.
+  static std::vector<bool> PruneHashComponent(
       const PartitionSchema::HashBucketSchema& hash_bucket_schema,
       const Schema& schema,
       const ScanSpec& scan_spec);
 
-  // The reverse sorted set of partition key ranges. Each range has an inclusive
-  // lower and exclusive upper bound.
-  std::vector<std::tuple<std::string, std::string>> partition_key_ranges_;
+  // Given the range bounds and the hash bucket schemas, constructs a set of partition key ranges.
+  static void ConstructPartitionKeyRanges(
+      const Schema& schema,
+      const ScanSpec& scan_spec,
+      const PartitionSchema::HashBucketSchemas& hash_bucket_schemas,
+      const RangeBounds& range_bounds,
+      std::vector<PartitionKeyRange>* partition_key_ranges);
+
+  // A vector of a pair of lower and upper range bounds mapped to a
+  // reverse sorted set of partition key ranges. Each partition key range within the set
+  // has an inclusive lower bound and an exclusive upper bound.
+  std::vector<RangeBoundsAndPartitionKeyRanges> range_bounds_to_partition_key_ranges_;
 
   DISALLOW_COPY_AND_ASSIGN(PartitionPruner);
 };