You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2022/06/29 16:25:03 UTC

[kudu] branch master updated: KUDU-2671 update partition schema in catalog when dropping range

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new f55dd22bf KUDU-2671 update partition schema in catalog when dropping range
f55dd22bf is described below

commit f55dd22bfea4beee99d72891efbbc67307b19d1e
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Mon Jun 20 20:34:20 2022 -0700

    KUDU-2671 update partition schema in catalog when dropping range
    
    When dropping a range with custom hash schema from a table, it's
    necessary to update the partition schema information stored in the
    system catalog correspondingly.  That was missing in one of the previous
    patches and this patch fixes the deficiency.
    
    This patch also adds a test scenario to spot regressions, if any.  The
    scenario was failing before the update in CatalogManager introduced
    in this patch.
    
    In addition, I updated the PartitionSchema class to store only the
    information on ranges with custom hash schemas in the
    ranges_with_custom_hash_schemas_ field.  I also added unit test
    scenarios to cover the updated functionality of the PartitionSchema
    class.
    
    A follow-up patch is needed to refresh PartitionPruner's code since
    the logic in PartitionPruner::Init() is expecting that the whole list
    of table's ranges would returned by the
    PartitionSchema::ranges_with_hash_schemas() method.
    
    This is a follow-up to 250eb90bc0e1f4f472f44de8a23ce213595d5ee7.
    
    Change-Id: Ib78afdd1a358751dca43f564c5d8e69191f165d4
    Reviewed-on: http://gerrit.cloudera.org:8080/18642
    Reviewed-by: Mahesh Reddy <mr...@cloudera.com>
    Reviewed-by: Abhishek Chennaka <ac...@cloudera.com>
    Tested-by: Alexey Serbin <al...@apache.org>
    Reviewed-by: Attila Bukor <ab...@apache.org>
---
 src/kudu/common/partition-test.cc        | 465 ++++++++++++++++++++++++++++++-
 src/kudu/common/partition.cc             | 161 ++++++++---
 src/kudu/common/partition.h              |  64 +++--
 src/kudu/common/partition_pruner-test.cc |  92 +++---
 src/kudu/common/partition_pruner.cc      |   4 +-
 src/kudu/master/catalog_manager.cc       | 123 +++++---
 src/kudu/master/catalog_manager.h        |  15 +-
 src/kudu/master/master-test.cc           | 103 ++++++-
 8 files changed, 855 insertions(+), 172 deletions(-)

diff --git a/src/kudu/common/partition-test.cc b/src/kudu/common/partition-test.cc
index 3adacef56..1202f96ee 100644
--- a/src/kudu/common/partition-test.cc
+++ b/src/kudu/common/partition-test.cc
@@ -968,7 +968,9 @@ TEST_F(PartitionTest, VaryingHashSchemasPerRange) {
     ASSERT_OK(upper.SetStringCopy("a", "a4"));
     ASSERT_OK(upper.SetStringCopy("b", "b4"));
     AddRangePartitionWithSchema(
-        schema, lower, upper, table_wide_hash_schema, &ps_pb);
+        schema, lower, upper,
+        { { { ColumnId(0), ColumnId(2) }, 3, 1 }, { { ColumnId(1) }, 2, 10 } },
+        &ps_pb);
   }
 
   { // [(a5, b5, _), (a6, _, c6))
@@ -984,14 +986,15 @@ TEST_F(PartitionTest, VaryingHashSchemasPerRange) {
   }
 
   PartitionSchema ps;
-  ASSERT_OK(PartitionSchema::FromPB(ps_pb, schema, &ps));
+  PartitionSchema::RangesWithHashSchemas ranges;
+  ASSERT_OK(PartitionSchema::FromPB(ps_pb, schema, &ps, &ranges));
   CheckSerializationFunctions(ps_pb, ps, schema);
 
   ASSERT_EQ("HASH (a, c) PARTITIONS 3, HASH (b) PARTITIONS 2, RANGE (a, b, c)",
             ps.DebugString(schema));
 
   vector<Partition> partitions;
-  ASSERT_OK(ps.CreatePartitions(schema, &partitions));
+  ASSERT_OK(ps.CreatePartitions(ranges, schema, &partitions));
 
   ASSERT_EQ(16, partitions.size());
 
@@ -1175,13 +1178,14 @@ TEST_F(PartitionTest, CustomHashSchemasPerRangeOnly) {
   }
 
   PartitionSchema ps;
-  ASSERT_OK(PartitionSchema::FromPB(ps_pb, schema, &ps));
+  PartitionSchema::RangesWithHashSchemas ranges;
+  ASSERT_OK(PartitionSchema::FromPB(ps_pb, schema, &ps, &ranges));
   CheckSerializationFunctions(ps_pb, ps, schema);
 
   ASSERT_EQ("RANGE (a, b)", ps.DebugString(schema));
 
   vector<Partition> partitions;
-  ASSERT_OK(ps.CreatePartitions(schema, &partitions));
+  ASSERT_OK(ps.CreatePartitions(ranges, schema, &partitions));
   ASSERT_EQ(2, partitions.size());
 
   {
@@ -1237,23 +1241,20 @@ TEST_F(PartitionTest, VaryingHashSchemasPerUnboundedRanges) {
     KuduPartialRow upper(&schema);
     ASSERT_OK(lower.SetStringCopy("a", "a4"));
     ASSERT_OK(lower.SetStringCopy("b", "b4"));
-    PartitionSchema::HashSchema hash_schema_2_buckets_by_3 = {
-        {{ColumnId(0)}, 2, 0},
-        {{ColumnId(2)}, 3, 0}
-    };
     AddRangePartitionWithSchema(
         schema, lower, upper,
         {{{ColumnId(0)}, 2, 0}, {{ColumnId(2)}, 3, 0}}, &ps_pb);
   }
 
   PartitionSchema ps;
-  ASSERT_OK(PartitionSchema::FromPB(ps_pb, schema, &ps));
+  PartitionSchema::RangesWithHashSchemas ranges;
+  ASSERT_OK(PartitionSchema::FromPB(ps_pb, schema, &ps, &ranges));
   CheckSerializationFunctions(ps_pb, ps, schema);
 
   ASSERT_EQ("HASH (b) PARTITIONS 2, RANGE (a, b, c)", ps.DebugString(schema));
 
   vector<Partition> partitions;
-  ASSERT_OK(ps.CreatePartitions(schema, &partitions));
+  ASSERT_OK(ps.CreatePartitions(ranges, schema, &partitions));
   ASSERT_EQ(11, partitions.size());
   // Partitions below sorted by range, can verify that the partition keyspace is filled by checking
   // that the start key of the first partition and the end key of the last partition is cleared.
@@ -1381,13 +1382,14 @@ TEST_F(PartitionTest, NoHashSchemasForLastUnboundedRange) {
   }
 
   PartitionSchema ps;
-  ASSERT_OK(PartitionSchema::FromPB(ps_pb, schema, &ps));
+  PartitionSchema::RangesWithHashSchemas ranges;
+  ASSERT_OK(PartitionSchema::FromPB(ps_pb, schema, &ps, &ranges));
   CheckSerializationFunctions(ps_pb, ps, schema);
 
   ASSERT_EQ("HASH (b) PARTITIONS 2, RANGE (a, b)", ps.DebugString(schema));
 
   vector<Partition> partitions;
-  ASSERT_OK(ps.CreatePartitions(schema, &partitions));
+  ASSERT_OK(ps.CreatePartitions(ranges, schema, &partitions));
   ASSERT_EQ(10, partitions.size());
 
   {
@@ -1487,6 +1489,102 @@ TEST_F(PartitionTest, NoHashSchemasForLastUnboundedRange) {
   }
 }
 
+// This test scenario verifies that when converting to PartitionSchemaPB,
+// the 'custom_hash_schema_ranges' field is populated only with ranges
+// that have different from the table-wide hash schema.
+// The rationale is the following: Kudu server side accepts input from the
+// client side that specify ranges with table-wide hash schema as elements
+// of the 'PartitionSchemaPB::custom_hash_schema_ranges' field, but when storing
+// the information in the system catalog, unnecessary parts are omitted.
+TEST_F(PartitionTest, CustomHashSchemaRangesToPB) {
+  const Schema schema({ ColumnSchema("a", STRING),
+                        ColumnSchema("b", STRING),
+                        ColumnSchema("c", STRING) },
+                      { ColumnId(0), ColumnId(1), ColumnId(2) }, 3);
+
+  PartitionSchemaPB ps_pb;
+  // Table-wide hash schema.
+  AddHashDimension(&ps_pb, { "b" }, 2, 0);
+
+  {
+    KuduPartialRow lower(&schema);
+    KuduPartialRow upper(&schema);
+    ASSERT_OK(upper.SetStringCopy("a", "a1"));
+    ASSERT_OK(upper.SetStringCopy("c", "c1"));
+    AddRangePartitionWithSchema(
+        schema, lower, upper, {{{ColumnId(0)}, 4, 1}}, &ps_pb);
+  }
+  {
+    KuduPartialRow lower(&schema);
+    ASSERT_OK(lower.SetStringCopy("a", "a2.0"));
+    ASSERT_OK(lower.SetStringCopy("b", "b2.0"));
+    KuduPartialRow upper(&schema);
+    ASSERT_OK(upper.SetStringCopy("a", "a2.1"));
+    ASSERT_OK(upper.SetStringCopy("b", "b2.1"));
+    // This hash schema is actually the table-wide one.
+    AddRangePartitionWithSchema(
+        schema, lower, upper, {{{ColumnId(1)}, 2, 0}}, &ps_pb);
+  }
+  {
+    KuduPartialRow lower(&schema);
+    ASSERT_OK(lower.SetStringCopy("a", "a3.0"));
+    ASSERT_OK(lower.SetStringCopy("b", "b3.0"));
+    KuduPartialRow upper(&schema);
+    ASSERT_OK(upper.SetStringCopy("a", "a3.1"));
+    ASSERT_OK(upper.SetStringCopy("b", "b3.1"));
+    AddRangePartitionWithSchema(
+        schema, lower, upper, {{{ColumnId(2)}, 3, 10}}, &ps_pb);
+  }
+  {
+    KuduPartialRow lower(&schema);
+    ASSERT_OK(lower.SetStringCopy("a", "a4.0"));
+    ASSERT_OK(lower.SetStringCopy("b", "b4.0"));
+    KuduPartialRow upper(&schema);
+    ASSERT_OK(upper.SetStringCopy("a", "a4.1"));
+    ASSERT_OK(upper.SetStringCopy("b", "b4.1"));
+    // This hash schema is not the table-wide one: the seed is different.
+    AddRangePartitionWithSchema(
+        schema, lower, upper, {{{ColumnId(1)}, 2, 5}}, &ps_pb);
+  }
+
+  PartitionSchema ps;
+  PartitionSchema::RangesWithHashSchemas ranges;
+  ASSERT_OK(PartitionSchema::FromPB(ps_pb, schema, &ps, &ranges));
+  ASSERT_TRUE(ps.HasCustomHashSchemas());
+  // All the ranges are transcoded from the original PartitionSchemaPB.
+  ASSERT_EQ(4, ranges.size());
+  // There are only 3 ranges with range-specific hash schema.
+  ASSERT_EQ(3, ps.ranges_with_custom_hash_schemas().size());
+
+  PartitionSchemaPB ps_pb_other;
+  ASSERT_OK(ps.ToPB(schema, &ps_pb_other));
+  ASSERT_EQ(1, ps_pb_other.hash_schema_size());
+  // The range with the table-wide schema shouldn't be there.
+  ASSERT_EQ(3, ps_pb_other.custom_hash_schema_ranges_size());
+  for (const auto& range : ps_pb_other.custom_hash_schema_ranges()) {
+    // All the table's hash schemas have a single dimension.
+    ASSERT_EQ(1, range.hash_schema_size());
+    const auto& hash_dimension = range.hash_schema(0);
+    ASSERT_TRUE(hash_dimension.has_seed());
+    const auto seed = hash_dimension.seed();
+    ASSERT_NE(0, seed);
+    // In this scenario, only the table-wide hash schema has zero seed.
+    ASSERT_TRUE(seed == 1 || seed == 5 || seed == 10);
+    ASSERT_TRUE(hash_dimension.has_num_buckets());
+    const auto num_buckets = hash_dimension.num_buckets();
+    ASSERT_TRUE(num_buckets == 4 || num_buckets == 2 || num_buckets == 3);
+  }
+
+  PartitionSchema ps_other;
+  PartitionSchema::RangesWithHashSchemas ranges_other;
+  ASSERT_OK(PartitionSchema::FromPB(ps_pb_other, schema, &ps_other, &ranges_other));
+  ASSERT_TRUE(ps_other.HasCustomHashSchemas());
+  // The information on the ranges with custom hash schemas isn't persisted
+  // anywhere else but in the 'RangeSchemaPB::custom_hash_schema_ranges' field.
+  ASSERT_EQ(3, ps_other.ranges_with_custom_hash_schemas_.size());
+  ASSERT_EQ(3, ranges_other.size());
+}
+
 TEST_F(PartitionTest, TestPartitionSchemaPB) {
   // CREATE TABLE t (a VARCHAR, b VARCHAR, c VARCHAR, PRIMARY KEY (a, b, c))
   // PARTITION BY [HASH BUCKET (b), RANGE (a, b, c)];
@@ -1560,11 +1658,13 @@ TEST_F(PartitionTest, TestPartitionSchemaPB) {
   }
 
   PartitionSchema partition_schema;
-  ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
+  PartitionSchema::RangesWithHashSchemas ranges;
+  ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema, &ranges));
 
   // Check fields of 'partition_schema' to verify decoder function.
   ASSERT_EQ(1, partition_schema.hash_schema().size());
-  const auto& ranges_with_hash_schemas = partition_schema.ranges_with_hash_schemas();
+  const auto& ranges_with_hash_schemas =
+      partition_schema.ranges_with_custom_hash_schemas_;
   ASSERT_EQ(3, ranges_with_hash_schemas.size());
 
   EXPECT_EQ(string("a0\0\0\0\0c0", 8), ranges_with_hash_schemas[0].lower);
@@ -1844,4 +1944,339 @@ TEST_F(PartitionTest, HasCustomHashSchemasMethod) {
   }
 }
 
+// A test scenario to verify functionality of the
+// PartitionSchema::DropRange() method.
+TEST_F(PartitionTest, DropRange) {
+  const Schema schema({ ColumnSchema("a", STRING),
+                        ColumnSchema("b", STRING),
+                        ColumnSchema("c", STRING) },
+                      { ColumnId(0), ColumnId(1), ColumnId(2) }, 3);
+
+  // Try to drop non-existing range.
+  {
+    PartitionSchemaPB pb;
+    PartitionSchema ps;
+    ASSERT_OK(PartitionSchema::FromPB(pb, schema, &ps));
+
+    KuduPartialRow lower(&schema);
+    ASSERT_OK(lower.SetStringCopy("a", "a0"));
+    ASSERT_OK(lower.SetStringCopy("c", "c0"));
+
+    KuduPartialRow upper(&schema);
+    ASSERT_OK(upper.SetStringCopy("a", "a0"));
+    ASSERT_OK(upper.SetStringCopy("c", "c1"));
+
+    const auto s = ps.DropRange(lower, upper, schema);
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(),
+                        "range with specified lower bound not found");
+  }
+
+  // Single range with custom hash schema.
+  {
+    PartitionSchemaPB pb;
+    auto* range = pb.add_custom_hash_schema_ranges();
+
+    KuduPartialRow lower(&schema);
+    ASSERT_OK(lower.SetStringCopy("a", "a0"));
+    ASSERT_OK(lower.SetStringCopy("c", "c0"));
+
+    KuduPartialRow upper(&schema);
+    ASSERT_OK(upper.SetStringCopy("a", "a0"));
+    ASSERT_OK(upper.SetStringCopy("c", "c1"));
+
+    RowOperationsPBEncoder encoder(range->mutable_range_bounds());
+    encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, lower);
+    encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, upper);
+
+    auto* hash_dimension = range->add_hash_schema();
+    hash_dimension->add_columns()->set_name("a");
+    hash_dimension->set_num_buckets(2);
+
+    PartitionSchema ps;
+    PartitionSchema::RangesWithHashSchemas ranges;
+    ASSERT_OK(PartitionSchema::FromPB(pb, schema, &ps, &ranges));
+    ASSERT_EQ(1, ranges.size());
+    ASSERT_EQ(1, ps.ranges_with_custom_hash_schemas_.size());
+    ASSERT_EQ(1, ps.hash_schema_idx_by_encoded_range_start_.size());
+    ASSERT_TRUE(ps.HasCustomHashSchemas());
+    ASSERT_OK(ps.DropRange(lower, upper, schema));
+    ASSERT_EQ(0, ps.ranges_with_custom_hash_schemas_.size());
+    ASSERT_EQ(0, ps.hash_schema_idx_by_encoded_range_start_.size());
+    ASSERT_FALSE(ps.HasCustomHashSchemas());
+
+    // Doing that one more time should not work.
+    const auto s = ps.DropRange(lower, upper, schema);
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(),
+                        "range with specified lower bound not found");
+    ASSERT_EQ(0, ps.ranges_with_custom_hash_schemas_.size());
+    ASSERT_EQ(0, ps.hash_schema_idx_by_encoded_range_start_.size());
+    ASSERT_FALSE(ps.HasCustomHashSchemas());
+  }
+
+  // Two ranges with range-specific hash schemas.
+  {
+    PartitionSchemaPB pb;
+
+    KuduPartialRow lower_0(&schema);
+    ASSERT_OK(lower_0.SetStringCopy("a", "a0"));
+    ASSERT_OK(lower_0.SetStringCopy("c", "c0"));
+
+    KuduPartialRow upper_0(&schema);
+    ASSERT_OK(upper_0.SetStringCopy("a", "a0"));
+    ASSERT_OK(upper_0.SetStringCopy("c", "c1"));
+
+    {
+      auto* range = pb.add_custom_hash_schema_ranges();
+      RowOperationsPBEncoder encoder(range->mutable_range_bounds());
+      encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, lower_0);
+      encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, upper_0);
+
+      auto* hash_dimension = range->add_hash_schema();
+      hash_dimension->add_columns()->set_name("a");
+      hash_dimension->set_num_buckets(5);
+    }
+
+    KuduPartialRow lower_1(&schema);
+    ASSERT_OK(lower_1.SetStringCopy("a", "a1"));
+    ASSERT_OK(lower_1.SetStringCopy("c", "c1"));
+
+    KuduPartialRow upper_1(&schema);
+    ASSERT_OK(upper_1.SetStringCopy("a", "a1"));
+    ASSERT_OK(upper_1.SetStringCopy("c", "c2"));
+    {
+      auto* range = pb.add_custom_hash_schema_ranges();
+
+      RowOperationsPBEncoder encoder(range->mutable_range_bounds());
+      encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, lower_1);
+      encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, upper_1);
+
+      auto* hash_dimension = range->add_hash_schema();
+      hash_dimension->add_columns()->set_name("a");
+      hash_dimension->set_num_buckets(3);
+    }
+
+    PartitionSchema ps;
+    PartitionSchema::RangesWithHashSchemas ranges;
+    ASSERT_OK(PartitionSchema::FromPB(pb, schema, &ps, &ranges));
+    ASSERT_EQ(2, ranges.size());
+    ASSERT_EQ(2, ps.ranges_with_custom_hash_schemas_.size());
+    ASSERT_EQ(2, ps.hash_schema_idx_by_encoded_range_start_.size());
+    ASSERT_TRUE(ps.HasCustomHashSchemas());
+
+    // Try to drop a range with non-matching lower range.
+    {
+      KuduPartialRow lower_x(&schema);
+      ASSERT_OK(lower_x.SetStringCopy("a", "a0_x"));
+      ASSERT_OK(lower_x.SetStringCopy("c", "c0_x"));
+      const auto s = ps.DropRange(lower_x, upper_0, schema);
+      ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+      ASSERT_STR_CONTAINS(s.ToString(),
+                          "range with specified lower bound not found");
+      ASSERT_EQ(2, ps.ranges_with_custom_hash_schemas_.size());
+      ASSERT_EQ(2, ps.hash_schema_idx_by_encoded_range_start_.size());
+    }
+
+    // Try to drop a range with non-matching upper range.
+    {
+      KuduPartialRow upper_x(&schema);
+      ASSERT_OK(upper_x.SetStringCopy("a", "a0_x"));
+      ASSERT_OK(upper_x.SetStringCopy("c", "c1_x"));
+
+      const auto s = ps.DropRange(lower_0, upper_x, schema);
+      ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+      ASSERT_STR_CONTAINS(s.ToString(), "upper bound does not match");
+      ASSERT_EQ(2, ps.ranges_with_custom_hash_schemas_.size());
+      ASSERT_EQ(2, ps.hash_schema_idx_by_encoded_range_start_.size());
+    }
+
+    // Try dropping a range with mix-and-match range boundaries.
+    {
+      const auto s = ps.DropRange(lower_0, upper_1, schema);
+      ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+      ASSERT_STR_CONTAINS(s.ToString(), "upper bound does not match");
+      ASSERT_EQ(2, ps.ranges_with_custom_hash_schemas_.size());
+      ASSERT_EQ(2, ps.hash_schema_idx_by_encoded_range_start_.size());
+    }
+    {
+      const auto s = ps.DropRange(lower_1, upper_0, schema);
+      ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+      ASSERT_STR_CONTAINS(s.ToString(), "upper bound does not match");
+      ASSERT_EQ(2, ps.ranges_with_custom_hash_schemas_.size());
+      ASSERT_EQ(2, ps.hash_schema_idx_by_encoded_range_start_.size());
+    }
+  }
+}
+
+TEST_F(PartitionTest, HasCustomHashSchemasWhenAddingAndDroppingRanges) {
+  const Schema schema({ ColumnSchema("a", STRING),
+                        ColumnSchema("b", STRING) },
+                      { ColumnId(0), ColumnId(1) }, 2);
+
+  PartitionSchemaPB ps_pb;
+  // Add the information on the table-wide hash schema.
+  AddHashDimension(&ps_pb, { "b" }, 2, 0);
+
+  // No ranges defined yet, so there isn't any range with range-specific
+  // hash schema.
+  {
+    PartitionSchema ps;
+    PartitionSchema::RangesWithHashSchemas ranges;
+    ASSERT_OK(PartitionSchema::FromPB(ps_pb, schema, &ps, &ranges));
+    ASSERT_FALSE(ps.HasCustomHashSchemas());
+    ASSERT_EQ(0, ranges.size());
+    ASSERT_EQ(0, ps.ranges_with_custom_hash_schemas_.size());
+    ASSERT_EQ(0, ps.hash_schema_idx_by_encoded_range_start_.size());
+  }
+
+  // Add a range with table-wide hash schema into the
+  // PartitionSchemaPB::custom_hash_schema_ranges.
+  {
+    KuduPartialRow lower(&schema);
+    ASSERT_OK(lower.SetStringCopy("a", "a0.0"));
+    ASSERT_OK(lower.SetStringCopy("b", "b0.0"));
+
+    KuduPartialRow upper(&schema);
+    ASSERT_OK(upper.SetStringCopy("a", "a0.1"));
+    ASSERT_OK(upper.SetStringCopy("b", "b0.1"));
+
+    auto* range = ps_pb.add_custom_hash_schema_ranges();
+    RowOperationsPBEncoder encoder(range->mutable_range_bounds());
+    encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, lower);
+    encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, upper);
+
+    auto* hash_dimension = range->add_hash_schema();
+    hash_dimension->add_columns()->set_name("b");
+    hash_dimension->set_num_buckets(2);
+    hash_dimension->set_seed(0);
+
+    PartitionSchema ps;
+    PartitionSchema::RangesWithHashSchemas ranges;
+    ASSERT_OK(PartitionSchema::FromPB(ps_pb, schema, &ps, &ranges));
+    ASSERT_FALSE(ps.HasCustomHashSchemas());
+    ASSERT_EQ(1, ranges.size());
+    ASSERT_EQ(0, ps.ranges_with_custom_hash_schemas_.size());
+    ASSERT_EQ(0, ps.hash_schema_idx_by_encoded_range_start_.size());
+  }
+
+  // Add a range with a custom hash schema.
+  {
+    auto* range = ps_pb.add_custom_hash_schema_ranges();
+
+    KuduPartialRow lower(&schema);
+    ASSERT_OK(lower.SetStringCopy("a", "a1.0"));
+    ASSERT_OK(lower.SetStringCopy("b", "b1.0"));
+
+    KuduPartialRow upper(&schema);
+    ASSERT_OK(upper.SetStringCopy("a", "a1.1"));
+    ASSERT_OK(upper.SetStringCopy("b", "b1.1"));
+
+    RowOperationsPBEncoder encoder(range->mutable_range_bounds());
+    encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, lower);
+    encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, upper);
+
+    auto* hash_dimension = range->add_hash_schema();
+    hash_dimension->add_columns()->set_name("a");
+    hash_dimension->set_num_buckets(3);
+
+    PartitionSchema ps;
+    PartitionSchema::RangesWithHashSchemas ranges;
+    ASSERT_OK(PartitionSchema::FromPB(ps_pb, schema, &ps, &ranges));
+    ASSERT_TRUE(ps.HasCustomHashSchemas());
+    ASSERT_EQ(2, ranges.size());
+    ASSERT_EQ(1, ps.ranges_with_custom_hash_schemas_.size());
+    ASSERT_EQ(1, ps.hash_schema_idx_by_encoded_range_start_.size());
+  }
+
+  // Add one more range with table-wide hash schema into the
+  // 'custom_hash_schema_ranges'.
+  {
+    KuduPartialRow lower(&schema);
+    ASSERT_OK(lower.SetStringCopy("a", "a2.0"));
+    ASSERT_OK(lower.SetStringCopy("b", "b2.0"));
+
+    KuduPartialRow upper(&schema);
+    ASSERT_OK(upper.SetStringCopy("a", "a2.1"));
+    ASSERT_OK(upper.SetStringCopy("b", "b2.1"));
+
+    auto* range = ps_pb.add_custom_hash_schema_ranges();
+    RowOperationsPBEncoder encoder(range->mutable_range_bounds());
+    encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, lower);
+    encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, upper);
+
+    auto* hash_dimension = range->add_hash_schema();
+    hash_dimension->add_columns()->set_name("b");
+    hash_dimension->set_num_buckets(2);
+    hash_dimension->set_seed(0);
+
+    PartitionSchema ps;
+    PartitionSchema::RangesWithHashSchemas ranges;
+    ASSERT_OK(PartitionSchema::FromPB(ps_pb, schema, &ps, &ranges));
+    ASSERT_TRUE(ps.HasCustomHashSchemas());
+    ASSERT_EQ(3, ranges.size());
+    ASSERT_EQ(1, ps.ranges_with_custom_hash_schemas_.size());
+    ASSERT_EQ(1, ps.hash_schema_idx_by_encoded_range_start_.size());
+  }
+
+  // Now check how HasCustomHashSchema() works when dropping ranges.
+  PartitionSchema ps;
+  ASSERT_OK(PartitionSchema::FromPB(ps_pb, schema, &ps));
+
+  // Drop the first range that has the table-wide hash schema.
+  {
+    KuduPartialRow lower(&schema);
+    ASSERT_OK(lower.SetStringCopy("a", "a0.0"));
+    ASSERT_OK(lower.SetStringCopy("b", "b0.0"));
+
+    KuduPartialRow upper(&schema);
+    ASSERT_OK(upper.SetStringCopy("a", "a0.1"));
+    ASSERT_OK(upper.SetStringCopy("b", "b0.1"));
+
+    const auto s = ps.DropRange(lower, upper, schema);
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(),
+                        "range with specified lower bound not found");
+  }
+  // The range with custom hash schema is still there.
+  ASSERT_TRUE(ps.HasCustomHashSchemas());
+  ASSERT_EQ(1, ps.ranges_with_custom_hash_schemas_.size());
+  ASSERT_EQ(1, ps.hash_schema_idx_by_encoded_range_start_.size());
+
+  // Drop the range with range-specific hash schema.
+  {
+    KuduPartialRow lower(&schema);
+    ASSERT_OK(lower.SetStringCopy("a", "a1.0"));
+    ASSERT_OK(lower.SetStringCopy("b", "b1.0"));
+
+    KuduPartialRow upper(&schema);
+    ASSERT_OK(upper.SetStringCopy("a", "a1.1"));
+    ASSERT_OK(upper.SetStringCopy("b", "b1.1"));
+
+    ASSERT_OK(ps.DropRange(lower, upper, schema));
+  }
+  ASSERT_FALSE(ps.HasCustomHashSchemas());
+  ASSERT_EQ(0, ps.ranges_with_custom_hash_schemas_.size());
+  ASSERT_EQ(0, ps.hash_schema_idx_by_encoded_range_start_.size());
+
+  // Drop the remaining range that has the table-wide hash schema.
+  {
+    KuduPartialRow lower(&schema);
+    ASSERT_OK(lower.SetStringCopy("a", "a2.0"));
+    ASSERT_OK(lower.SetStringCopy("b", "b2.0"));
+
+    KuduPartialRow upper(&schema);
+    ASSERT_OK(upper.SetStringCopy("a", "a2.1"));
+    ASSERT_OK(upper.SetStringCopy("b", "b2.1"));
+
+    const auto s = ps.DropRange(lower, upper, schema);
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(),
+                        "range with specified lower bound not found");
+  }
+  ASSERT_FALSE(ps.HasCustomHashSchemas());
+  ASSERT_EQ(0, ps.ranges_with_custom_hash_schemas_.size());
+  ASSERT_EQ(0, ps.hash_schema_idx_by_encoded_range_start_.size());
+}
+
 } // namespace kudu
diff --git a/src/kudu/common/partition.cc b/src/kudu/common/partition.cc
index d02838979..852f0fe15 100644
--- a/src/kudu/common/partition.cc
+++ b/src/kudu/common/partition.cc
@@ -209,9 +209,11 @@ Status PartitionSchema::ExtractHashSchemaFromPB(
   return Status::OK();
 }
 
-Status PartitionSchema::FromPB(const PartitionSchemaPB& pb,
-                               const Schema& schema,
-                               PartitionSchema* partition_schema) {
+Status PartitionSchema::FromPB(
+    const PartitionSchemaPB& pb,
+    const Schema& schema,
+    PartitionSchema* partition_schema,
+    RangesWithHashSchemas* ranges_with_hash_schemas) {
   partition_schema->Clear();
   RETURN_NOT_OK(ExtractHashSchemaFromPB(
       schema, pb.hash_schema(), &partition_schema->hash_schema_));
@@ -220,7 +222,7 @@ Status PartitionSchema::FromPB(const PartitionSchemaPB& pb,
   vector<HashSchema> range_hash_schemas;
   range_hash_schemas.resize(custom_ranges_num);
   vector<pair<KuduPartialRow, KuduPartialRow>> range_bounds;
-  for (int i = 0; i < custom_ranges_num; i++) {
+  for (auto i = 0; i < custom_ranges_num; ++i) {
     const auto& range = pb.custom_hash_schema_ranges(i);
     RETURN_NOT_OK(ExtractHashSchemaFromPB(
         schema, range.hash_schema(), &range_hash_schemas[i]));
@@ -276,24 +278,43 @@ Status PartitionSchema::FromPB(const PartitionSchemaPB& pb,
     }
   }
 
-  auto* ranges_ptr = &partition_schema->ranges_with_hash_schemas_;
+  RangesWithHashSchemas result_ranges_with_hash_schemas;
   if (!range_bounds.empty()) {
     RETURN_NOT_OK(partition_schema->EncodeRangeBounds(
-        range_bounds, range_hash_schemas, schema, ranges_ptr));
-  }
-  if (ranges_ptr != nullptr) {
-    auto& dict = partition_schema->hash_schema_idx_by_encoded_range_start_;
-    for (auto it = ranges_ptr->cbegin(); it != ranges_ptr->cend(); ++it) {
-      InsertOrDie(&dict, it->lower, std::distance(ranges_ptr->cbegin(), it));
-    }
+        range_bounds, range_hash_schemas, schema,
+        &result_ranges_with_hash_schemas));
   }
-  if (range_bounds.size() != ranges_ptr->size()) {
+
+  if (range_bounds.size() != result_ranges_with_hash_schemas.size()) {
     return Status::InvalidArgument(Substitute("the number of range bounds "
         "($0) differs from the number ranges with hash schemas ($1)",
-        range_bounds.size(), ranges_ptr->size()));
+        range_bounds.size(), result_ranges_with_hash_schemas.size()));
   }
 
-  return partition_schema->Validate(schema);
+  auto& ranges_with_custom_hash_schemas =
+      partition_schema->ranges_with_custom_hash_schemas_;
+  const auto& table_wide_hash_schema = partition_schema->hash_schema_;
+  ranges_with_custom_hash_schemas.reserve(result_ranges_with_hash_schemas.size());
+  for (const auto& elem : result_ranges_with_hash_schemas) {
+    if (elem.hash_schema != table_wide_hash_schema) {
+      ranges_with_custom_hash_schemas.emplace_back(elem);
+    }
+  }
+
+  auto& dict = partition_schema->hash_schema_idx_by_encoded_range_start_;
+  for (auto it = ranges_with_custom_hash_schemas.cbegin();
+       it != ranges_with_custom_hash_schemas.cend(); ++it) {
+    InsertOrDie(&dict, it->lower, std::distance(
+        ranges_with_custom_hash_schemas.cbegin(), it));
+  }
+  DCHECK_EQ(ranges_with_custom_hash_schemas.size(), dict.size());
+
+  RETURN_NOT_OK(partition_schema->Validate(schema));
+  if (ranges_with_hash_schemas) {
+    *ranges_with_hash_schemas = std::move(result_ranges_with_hash_schemas);
+  }
+
+  return Status::OK();
 }
 
 Status PartitionSchema::ToPB(const Schema& schema, PartitionSchemaPB* pb) const {
@@ -307,11 +328,17 @@ Status PartitionSchema::ToPB(const Schema& schema, PartitionSchemaPB* pb) const
     hash_dimension_pb->set_seed(hash_dimension.seed);
   }
 
-  if (!ranges_with_hash_schemas_.empty()) {
+  if (!ranges_with_custom_hash_schemas_.empty()) {
+    // Consumers of this method does not expect the information
+    // on ranges with the table-wide hash schema persisted into the
+    // 'custom_hash_schema_ranges' field.
     pb->mutable_custom_hash_schema_ranges()->Reserve(
-        ranges_with_hash_schemas_.size());
+        ranges_with_custom_hash_schemas_.size());
     Arena arena(256);
-    for (const auto& range_hash_schema : ranges_with_hash_schemas_) {
+    for (const auto& range_hash_schema : ranges_with_custom_hash_schemas_) {
+      if (range_hash_schema.hash_schema == hash_schema_) {
+        continue;
+      }
       auto* range_pb = pb->add_custom_hash_schema_ranges();
 
       arena.Reset();
@@ -451,8 +478,8 @@ Status PartitionSchema::EncodeRangeBounds(
   size_t j = 0;
   for (const auto& bound : range_bounds) {
     string lower;
-    string upper;
     RETURN_NOT_OK(EncodeRangeKey(bound.first, schema, &lower));
+    string upper;
     RETURN_NOT_OK(EncodeRangeKey(bound.second, schema, &upper));
 
     if (!lower.empty() && !upper.empty() && lower >= upper) {
@@ -623,11 +650,6 @@ Status PartitionSchema::CreatePartitions(
   return Status::OK();
 }
 
-Status PartitionSchema::CreatePartitions(const Schema& schema,
-                                         vector<Partition>* partitions) const {
-  return CreatePartitions(ranges_with_hash_schemas_, schema, partitions);
-}
-
 Status PartitionSchema::CreatePartitionsForRange(
     const pair<KuduPartialRow, KuduPartialRow>& range_bound,
     const HashSchema& range_hash_schema,
@@ -1122,6 +1144,7 @@ string PartitionSchema::PartitionTableEntry(const Schema& schema,
 }
 
 bool PartitionSchema::operator==(const PartitionSchema& rhs) const {
+  // TODO(aserbin): what about ranges_with_custom_hash_schemas_?
   if (this == &rhs) {
     return true;
   }
@@ -1131,8 +1154,10 @@ bool PartitionSchema::operator==(const PartitionSchema& rhs) const {
     return false;
   }
 
+  const auto& lhs_ranges = ranges_with_custom_hash_schemas_;
+  const auto& rhs_ranges = rhs.ranges_with_custom_hash_schemas_;
   if (hash_schema_.size() != rhs.hash_schema_.size() ||
-      ranges_with_hash_schemas_.size() != rhs.ranges_with_hash_schemas_.size()) {
+      lhs_ranges.size() != rhs_ranges.size()) {
     return false;
   }
 
@@ -1144,13 +1169,13 @@ bool PartitionSchema::operator==(const PartitionSchema& rhs) const {
   }
 
   // Compare range bounds and per range hash bucket schemas.
-  for (size_t i = 0; i < ranges_with_hash_schemas_.size(); ++i) {
-    if (ranges_with_hash_schemas_[i].lower != rhs.ranges_with_hash_schemas_[i].lower ||
-        ranges_with_hash_schemas_[i].upper != rhs.ranges_with_hash_schemas_[i].upper) {
+  for (size_t i = 0; i < lhs_ranges.size(); ++i) {
+    if (lhs_ranges[i].lower != rhs_ranges[i].lower ||
+        lhs_ranges[i].upper != rhs_ranges[i].upper) {
       return false;
     }
-    const auto& lhs_hash_schema = ranges_with_hash_schemas_[i].hash_schema;
-    const auto& rhs_hash_schema = rhs.ranges_with_hash_schemas_[i].hash_schema;
+    const auto& lhs_hash_schema = lhs_ranges[i].hash_schema;
+    const auto& rhs_hash_schema = rhs_ranges[i].hash_schema;
     if (lhs_hash_schema.size() != rhs_hash_schema.size()) {
       return false;
     }
@@ -1290,7 +1315,7 @@ uint32_t PartitionSchema::HashValueForRow(const ConstContiguousRow& row,
 
 void PartitionSchema::Clear() {
   hash_schema_idx_by_encoded_range_start_.clear();
-  ranges_with_hash_schemas_.clear();
+  ranges_with_custom_hash_schemas_.clear();
   hash_schema_.clear();
   range_schema_.column_ids.clear();
 }
@@ -1298,7 +1323,7 @@ void PartitionSchema::Clear() {
 Status PartitionSchema::Validate(const Schema& schema) const {
   RETURN_NOT_OK(ValidateHashSchema(schema, hash_schema_));
 
-  for (const auto& range_with_hash_schema : ranges_with_hash_schemas_) {
+  for (const auto& range_with_hash_schema : ranges_with_custom_hash_schemas_) {
     RETURN_NOT_OK(ValidateHashSchema(schema, range_with_hash_schema.hash_schema));
   }
 
@@ -1587,17 +1612,14 @@ const PartitionSchema::HashSchema& PartitionSchema::GetHashSchemaForRange(
   const auto* idx = FindFloorOrNull(
       hash_schema_idx_by_encoded_range_start_, range_key);
   bool has_custom_range = (idx != nullptr);
-  // Check for the case of a non-covered range between two covered ranges.
-  // TODO(aserbin): maybe, it's better to build ranges_with_hash_schemas_ not
-  //                having any range gaps?
   if (has_custom_range) {
-    DCHECK_LT(*idx, ranges_with_hash_schemas_.size());
-    const auto& upper = ranges_with_hash_schemas_[*idx].upper;
+    DCHECK_LT(*idx, ranges_with_custom_hash_schemas_.size());
+    const auto& upper = ranges_with_custom_hash_schemas_[*idx].upper;
     if (!upper.empty() && upper <= range_key) {
       has_custom_range = false;
     }
   }
-  return has_custom_range ? ranges_with_hash_schemas_[*idx].hash_schema
+  return has_custom_range ? ranges_with_custom_hash_schemas_[*idx].hash_schema
                           : hash_schema_;
 }
 
@@ -1694,4 +1716,67 @@ Status PartitionSchema::GetRangeSchemaColumnIndexes(
   return Status::OK();
 }
 
+Status PartitionSchema::GetHashSchemaForRange(const KuduPartialRow& lower,
+                                              const Schema& schema,
+                                              HashSchema* hash_schema) const {
+  string lower_enc;
+  RETURN_NOT_OK(EncodeRangeKey(lower, schema, &lower_enc));
+  *hash_schema = GetHashSchemaForRange(lower_enc);
+  return Status::OK();
+}
+
+Status PartitionSchema::DropRange(const KuduPartialRow& lower,
+                                  const KuduPartialRow& upper,
+                                  const Schema& schema) {
+  string lower_enc;
+  RETURN_NOT_OK(EncodeRangeKey(lower, schema, &lower_enc));
+  const auto* idx_ptr = FindOrNull(
+      hash_schema_idx_by_encoded_range_start_, lower_enc);
+  if (!idx_ptr) {
+    return Status::NotFound(Substitute(
+        "'$0': range with specified lower bound not found",
+        RangeKeyDebugString(lower_enc, schema)));
+  }
+  const auto dropped_range_idx = *idx_ptr;
+
+  // Check for the upper range match.
+  string upper_enc;
+  RETURN_NOT_OK(EncodeRangeKey(upper, schema, &upper_enc));
+  const auto& dropped_range_upper_enc =
+      ranges_with_custom_hash_schemas_[dropped_range_idx].upper;
+  if (dropped_range_upper_enc != upper_enc) {
+    // Using Status::InvalidArgument() to distinguish between an absent record
+    // for the range's lower bound and non-matching upper bound.
+    return Status::InvalidArgument(Substitute(
+        "range ['$0', '$1'): '$2' vs '$3': upper bound does not match",
+        RangeKeyDebugString(lower_enc, schema),
+        RangeKeyDebugString(upper_enc, schema),
+        RangeKeyDebugString(dropped_range_upper_enc, schema),
+        RangeKeyDebugString(upper_enc, schema)));
+  }
+
+  // Update the 'ranges_with_custom_hash_schemas_' array and the
+  // 'hash_schema_idx_by_encoded_range_start_' helper map.
+  const auto size = ranges_with_custom_hash_schemas_.size();
+  DCHECK_GE(size, 1);
+  decltype(hash_schema_idx_by_encoded_range_start_) updated_mapping;
+  decltype(ranges_with_custom_hash_schemas_) updated_array;
+  updated_array.reserve(size - 1);
+  for (size_t idx = 0; idx < size; ++idx) {
+    if (idx == dropped_range_idx) {
+      continue;
+    }
+    updated_array.emplace_back(std::move(ranges_with_custom_hash_schemas_[idx]));
+    const auto cur_idx = updated_array.size() - 1;
+    InsertOrDie(&updated_mapping, updated_array[cur_idx].lower, cur_idx);
+  }
+  DCHECK_EQ(updated_array.size() + 1, size);
+  DCHECK_EQ(updated_array.size(), updated_mapping.size());
+
+  ranges_with_custom_hash_schemas_.swap(updated_array);
+  hash_schema_idx_by_encoded_range_start_.swap(updated_mapping);
+
+  return Status::OK();
+}
+
 } // namespace kudu
diff --git a/src/kudu/common/partition.h b/src/kudu/common/partition.h
index 0cddc64d2..b74ce566b 100644
--- a/src/kudu/common/partition.h
+++ b/src/kudu/common/partition.h
@@ -39,6 +39,10 @@ template <typename T> class RepeatedPtrField;
 
 namespace kudu {
 
+namespace master {
+class MasterTest_AlterTableAddAndDropRangeWithSpecificHashSchema_Test;
+}
+
 class Arena;
 class ConstContiguousRow;
 class KuduPartialRow;
@@ -301,10 +305,14 @@ class PartitionSchema {
           hash_schema_pb,
       HashSchema* hash_schema);
 
-  // Deserializes a protobuf message into a partition schema.
-  static Status FromPB(const PartitionSchemaPB& pb,
-                       const Schema& schema,
-                       PartitionSchema* partition_schema) WARN_UNUSED_RESULT;
+  // Deserializes a protobuf message into a partition schema. If not nullptr,
+  // the optional output parameter 'ranges_with_hash_schemas' is populated
+  // with the information on table's ranges with their hash schemas.
+  static Status FromPB(
+      const PartitionSchemaPB& pb,
+      const Schema& schema,
+      PartitionSchema* partition_schema,
+      RangesWithHashSchemas* ranges_with_hash_schemas = nullptr) WARN_UNUSED_RESULT;
 
   // Serializes a partition schema into a protobuf message.
   // Requires a schema to encode the range bounds.
@@ -329,9 +337,10 @@ class PartitionSchema {
       const Schema& schema,
       std::vector<Partition>* partitions) const WARN_UNUSED_RESULT;
 
-  // Similar to the method above, but uses the 'ranges_with_hash_schemas_'
-  // member field as the input to produce the result partitions.
+  // Create the set of partitions given the specified ranges with per-range
+  // hash schemas. The 'partitions' output parameter must be non-null.
   Status CreatePartitions(
+      const RangesWithHashSchemas& ranges_with_hash_schemas,
       const Schema& schema,
       std::vector<Partition>* partitions) const WARN_UNUSED_RESULT;
 
@@ -453,12 +462,12 @@ class PartitionSchema {
     return hash_schema_;
   }
 
-  const RangesWithHashSchemas& ranges_with_hash_schemas() const {
-    return ranges_with_hash_schemas_;
+  const RangesWithHashSchemas& ranges_with_custom_hash_schemas() const {
+    return ranges_with_custom_hash_schemas_;
   }
 
   bool HasCustomHashSchemas() const {
-    return !ranges_with_hash_schemas_.empty();
+    return !ranges_with_custom_hash_schemas_.empty();
   }
 
   // Given the specified table schema, populate the 'range_column_indexes'
@@ -469,8 +478,25 @@ class PartitionSchema {
       const Schema& schema,
       std::vector<int>* range_column_indexes) const;
 
+  Status GetHashSchemaForRange(const KuduPartialRow& lower,
+                               const Schema& schema,
+                               HashSchema* hash_schema) const;
+
+  // Drop range partition with the specified lower and upper bounds. The
+  // method updates member fields of this class, so that PartitionSchema::ToPB()
+  // generates PartitionSchemaPB::custom_hash_schema_ranges field accordingly.
+  Status DropRange(const KuduPartialRow& lower,
+                   const KuduPartialRow& upper,
+                   const Schema& schema);
+
  private:
   friend class PartitionPruner;
+  friend class PartitionPrunerTest;
+  FRIEND_TEST(master::MasterTest, AlterTableAddAndDropRangeWithSpecificHashSchema);
+  FRIEND_TEST(PartitionTest, CustomHashSchemaRangesToPB);
+  FRIEND_TEST(PartitionTest, DropRange);
+  FRIEND_TEST(PartitionTest, HasCustomHashSchemasWhenAddingAndDroppingRanges);
+  FRIEND_TEST(PartitionTest, TestPartitionSchemaPB);
   FRIEND_TEST(PartitionTest, TestIncrementRangePartitionBounds);
   FRIEND_TEST(PartitionTest, TestIncrementRangePartitionStringBounds);
   FRIEND_TEST(PartitionTest, TestVarcharRangePartitions);
@@ -521,13 +547,6 @@ class PartitionSchema {
       const HashSchema& hash_schema,
       const KeyEncoder<std::string>& hash_encoder);
 
-  // Create the set of partitions given the specified ranges with per-range
-  // hash schemas. The 'partitions' output parameter must be non-null.
-  Status CreatePartitions(
-      const RangesWithHashSchemas& ranges_with_hash_schemas,
-      const Schema& schema,
-      std::vector<Partition>* partitions) const;
-
   // PartitionKeyDebugString implementation for row types.
   template<typename Row>
   std::string PartitionKeyDebugStringImpl(const Row& row) const;
@@ -645,13 +664,16 @@ class PartitionSchema {
 
   RangeSchema range_schema_;
   HashSchema hash_schema_;
-  RangesWithHashSchemas ranges_with_hash_schemas_;
+
+  // This contains only ranges with range-specific (i.e. different from
+  // the table-wide) hash schemas.
+  RangesWithHashSchemas ranges_with_custom_hash_schemas_;
 
   // Encoded start of the range --> index of the hash bucket schemas for the
-  // range in the 'ranges_with_hash_schemas_' array container.
-  // NOTE: the contents of this map and 'ranges_with_hash_schemas_' are tightly
-  //       coupled -- it's necessary to clear/set this map along with
-  //       'ranges_with_hash_schemas_'.
+  // range in the 'ranges_with_custom_hash_schemas_' array container.
+  // NOTE: the contents of this map and 'ranges_with_custom_hash_schemas_'
+  //       are tightly coupled -- it's necessary to clear/set this map
+  //       along with 'ranges_with_custom_hash_schemas_'.
   typedef std::map<std::string, size_t> HashSchemasByEncodedLowerRange;
   HashSchemasByEncodedLowerRange hash_schema_idx_by_encoded_range_start_;
 };
diff --git a/src/kudu/common/partition_pruner-test.cc b/src/kudu/common/partition_pruner-test.cc
index 3e2bc0da1..85c3cef47 100644
--- a/src/kudu/common/partition_pruner-test.cc
+++ b/src/kudu/common/partition_pruner-test.cc
@@ -75,41 +75,14 @@ class PartitionPrunerTest : public KuduTest {
       const vector<ColumnNameAndIntValue>& upper_int_cols,
       const vector<ColumnNamesNumBucketsAndSeed>& hash_schemas,
       PartitionSchemaPB* pb);
-};
-
-void CheckPrunedPartitions(const Schema& schema,
-                           const PartitionSchema& partition_schema,
-                           const vector<Partition>& partitions,
-                           const ScanSpec& spec,
-                           size_t remaining_tablets,
-                           size_t pruner_ranges) {
-
-  ScanSpec opt_spec(spec);
-  Arena arena(256);
-  opt_spec.OptimizeScan(schema, &arena, false);
-
-  PartitionPruner pruner;
-  pruner.Init(schema, partition_schema, opt_spec);
-
-  SCOPED_TRACE(strings::Substitute("schema: $0", schema.ToString()));
-  SCOPED_TRACE(strings::Substitute("partition schema: $0", partition_schema.DebugString(schema)));
-  // TODO(mreddy): Remove if check once PartitionSchema::PartitionKeyDebugString is modified.
-  if (partition_schema.ranges_with_hash_schemas().empty()) {
-    SCOPED_TRACE(strings::Substitute("partition pruner: $0",
-                                     pruner.ToString(schema, partition_schema)));
-  }
-  SCOPED_TRACE(strings::Substitute("optimized scan spec: $0", opt_spec.ToString(schema)));
-  SCOPED_TRACE(strings::Substitute("original  scan spec: $0", spec.ToString(schema)));
-
-  int pruned_partitions = count_if(partitions.begin(), partitions.end(),
-                                   [&] (const Partition& partition) {
-                                     return pruner.ShouldPrune(partition);
-                                   });
-
-  ASSERT_EQ(remaining_tablets, partitions.size() - pruned_partitions);
-  ASSERT_EQ(pruner_ranges, pruner.NumRangesRemaining());
-}
 
+  static void CheckPrunedPartitions(const Schema& schema,
+                                    const PartitionSchema& partition_schema,
+                                    const vector<Partition>& partitions,
+                                    const ScanSpec& spec,
+                                    size_t remaining_tablets,
+                                    size_t pruner_ranges);
+};
 
 void PartitionPrunerTest::CreatePartitionSchemaPB(
     const vector<string>& range_columns,
@@ -171,6 +144,37 @@ void PartitionPrunerTest::AddRangePartitionWithSchema(
   }
 }
 
+void PartitionPrunerTest::CheckPrunedPartitions(
+    const Schema& schema,
+    const PartitionSchema& partition_schema,
+    const vector<Partition>& partitions,
+    const ScanSpec& spec,
+    size_t remaining_tablets,
+    size_t pruner_ranges) {
+
+  ScanSpec opt_spec(spec);
+  Arena arena(256);
+  opt_spec.OptimizeScan(schema, &arena, false);
+
+  PartitionPruner pruner;
+  pruner.Init(schema, partition_schema, opt_spec);
+
+  SCOPED_TRACE(strings::Substitute("schema: $0", schema.ToString()));
+  SCOPED_TRACE(strings::Substitute("partition schema: $0", partition_schema.DebugString(schema)));
+  SCOPED_TRACE(strings::Substitute("partition pruner: $0",
+                                   pruner.ToString(schema, partition_schema)));
+  SCOPED_TRACE(strings::Substitute("optimized scan spec: $0", opt_spec.ToString(schema)));
+  SCOPED_TRACE(strings::Substitute("original  scan spec: $0", spec.ToString(schema)));
+
+  int pruned_partitions = count_if(partitions.begin(), partitions.end(),
+                                   [&] (const Partition& partition) {
+                                     return pruner.ShouldPrune(partition);
+                                   });
+
+  ASSERT_EQ(remaining_tablets, partitions.size() - pruned_partitions);
+  ASSERT_EQ(pruner_ranges, pruner.NumRangesRemaining());
+}
+
 TEST_F(PartitionPrunerTest, TestPrimaryKeyRangePruning) {
   // CREATE TABLE t
   // (a INT8, b INT8, c INT8)
@@ -1117,10 +1121,11 @@ TEST_F(PartitionPrunerTest, DISABLED_TestHashSchemasPerRangePruning) {
                               { {{"B"}, 2, 0} }, &pb);
 
   PartitionSchema partition_schema;
-  ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
+  PartitionSchema::RangesWithHashSchemas ranges;
+  ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema, &ranges));
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions(schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions(ranges, schema, &partitions));
   ASSERT_EQ(12, partitions.size());
 
   // Applies the specified predicates to a scan and checks that the expected
@@ -1293,10 +1298,11 @@ TEST_F(PartitionPrunerTest, TestHashSchemasPerRangeWithPartialPrimaryKeyRangePru
                               { {{"c"}, 4, 0} }, &pb);
 
   PartitionSchema partition_schema;
-  ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
+  PartitionSchema::RangesWithHashSchemas ranges;
+  ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema, &ranges));
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions(schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions(ranges, schema, &partitions));
   ASSERT_EQ(9, partitions.size());
 
   Arena arena(1024);
@@ -1399,10 +1405,11 @@ TEST_F(PartitionPrunerTest, TestInListHashPruningPerRange) {
                               { {{"C"}, 3, 0} }, &pb);
 
   PartitionSchema partition_schema;
-  ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
+  PartitionSchema::RangesWithHashSchemas ranges;
+  ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema, &ranges));
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions(schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions(ranges, schema, &partitions));
   ASSERT_EQ(7, partitions.size());
 
   // Applies the specified predicates to a scan and checks that the expected
@@ -1496,10 +1503,11 @@ TEST_F(PartitionPrunerTest, DISABLED_TestSingleRangeElementAndBoundaryCase) {
   AddRangePartitionWithSchema(schema, {}, {}, {{"A", 3}}, {}, {}, &pb);
 
   PartitionSchema partition_schema;
-  ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
+  PartitionSchema::RangesWithHashSchemas ranges;
+  ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema, &ranges));
 
   vector<Partition> partitions;
-  ASSERT_OK(partition_schema.CreatePartitions(schema, &partitions));
+  ASSERT_OK(partition_schema.CreatePartitions(ranges, schema, &partitions));
   ASSERT_EQ(10, partitions.size());
 
   // Applies the specified predicates to a scan and checks that the expected
diff --git a/src/kudu/common/partition_pruner.cc b/src/kudu/common/partition_pruner.cc
index fea1d3d5e..f7240c60d 100644
--- a/src/kudu/common/partition_pruner.cc
+++ b/src/kudu/common/partition_pruner.cc
@@ -428,7 +428,7 @@ void PartitionPruner::Init(const Schema& schema,
 
   // Store ranges and their corresponding hash schemas if they fall within
   // the range bounds specified by the scan.
-  if (partition_schema.ranges_with_hash_schemas_.empty()) {
+  if (partition_schema.ranges_with_custom_hash_schemas().empty()) {
     auto partition_key_ranges = ConstructPartitionKeyRanges(
         schema, scan_spec, partition_schema.hash_schema_,
         {scan_range_lower_bound, scan_range_upper_bound});
@@ -442,7 +442,7 @@ void PartitionPruner::Init(const Schema& schema,
   } else {
     vector<RangeBounds> range_bounds;
     vector<PartitionSchema::HashSchema> hash_schemas_per_range;
-    for (const auto& range : partition_schema.ranges_with_hash_schemas_) {
+    for (const auto& range : partition_schema.ranges_with_custom_hash_schemas()) {
       const auto& hash_schema = range.hash_schema;
       // Both lower and upper bounds of the scan are unbounded.
       if (scan_range_lower_bound.empty() && scan_range_upper_bound.empty()) {
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 770f9e285..2a0d88200 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -1872,8 +1872,12 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
   // the default partition schema (no hash bucket components and a range
   // partitioned on the primary key columns) will be used.
   PartitionSchema partition_schema;
+  PartitionSchema::RangesWithHashSchemas ranges_with_hash_schemas;
   RETURN_NOT_OK(SetupError(
-      PartitionSchema::FromPB(req.partition_schema(), schema, &partition_schema),
+      PartitionSchema::FromPB(req.partition_schema(),
+                              schema,
+                              &partition_schema,
+                              &ranges_with_hash_schemas),
       resp, MasterErrorPB::INVALID_SCHEMA));
 
   // Decode split rows and range bounds.
@@ -1932,8 +1936,9 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
           "both range bounds and custom hash schema ranges must not be "
           "populated at the same time");
     }
-    // Create partitions based on specified ranges with custom hash schemas.
-    RETURN_NOT_OK(partition_schema.CreatePartitions(schema, &partitions));
+    // Create partitions based on the specified ranges and their hash schemas.
+    RETURN_NOT_OK(partition_schema.CreatePartitions(
+        ranges_with_hash_schemas, schema, &partitions));
   } else {
     // Create partitions based on specified partition schema and split rows.
     RETURN_NOT_OK(partition_schema.CreatePartitions(
@@ -2609,7 +2614,12 @@ Status CatalogManager::ApplyAlterPartitioningSteps(
     const vector<AlterTableRequestPB::Step>& steps,
     TableMetadataLock* l,
     vector<scoped_refptr<TabletInfo>>* tablets_to_add,
-    vector<scoped_refptr<TabletInfo>>* tablets_to_drop) {
+    vector<scoped_refptr<TabletInfo>>* tablets_to_drop,
+    bool* partition_schema_updated) {
+  DCHECK(l);
+  DCHECK(tablets_to_add);
+  DCHECK(tablets_to_drop);
+  DCHECK(partition_schema_updated);
 
   // Get the table's schema as it's known to the catalog manager.
   Schema schema;
@@ -2627,6 +2637,7 @@ Status CatalogManager::ApplyAlterPartitioningSteps(
   });
 
   vector<PartitionSchema::HashSchema> range_hash_schemas;
+  size_t partition_schema_updates = 0;
   for (const auto& step : steps) {
     CHECK(step.type() == AlterTableRequestPB::ADD_RANGE_PARTITION ||
           step.type() == AlterTableRequestPB::DROP_RANGE_PARTITION);
@@ -2665,39 +2676,64 @@ Status CatalogManager::ApplyAlterPartitioningSteps(
     vector<Partition> partitions;
     const pair<KuduPartialRow, KuduPartialRow> range_bound =
         { *ops[0].split_row, *ops[1].split_row };
-    if (step.type() == AlterTableRequestPB::ADD_RANGE_PARTITION &&
-        FLAGS_enable_per_range_hash_schemas &&
-        step.add_range_partition().custom_hash_schema_size() > 0) {
+    if (!FLAGS_enable_per_range_hash_schemas) {
+      RETURN_NOT_OK(partition_schema.CreatePartitions(
+          {}, { range_bound }, schema, &partitions));
+    } else {
       const Schema schema = client_schema.CopyWithColumnIds();
-      PartitionSchema::HashSchema hash_schema;
-      RETURN_NOT_OK(PartitionSchema::ExtractHashSchemaFromPB(
-          schema, step.add_range_partition().custom_hash_schema(), &hash_schema));
-      if (partition_schema.hash_schema().size() != hash_schema.size()) {
-        return Status::NotSupported(
-            "varying number of hash dimensions per range is not yet supported");
-      }
-      RETURN_NOT_OK(partition_schema.CreatePartitionsForRange(
-          range_bound, hash_schema, schema, &partitions));
-
-      // Add information on the new range with custom hash schema into the
-      // PartitionSchema for the table stored in the system catalog.
-      auto* p = l->mutable_data()->pb.mutable_partition_schema();
-      auto* range = p->add_custom_hash_schema_ranges();
-      RowOperationsPBEncoder encoder(range->mutable_range_bounds());
-      encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, *ops[0].split_row);
-      encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, *ops[1].split_row);
-      for (const auto& hash_dimension : hash_schema) {
-        auto* hash_dimension_pb = range->add_hash_schema();
-        hash_dimension_pb->set_num_buckets(hash_dimension.num_buckets);
-        hash_dimension_pb->set_seed(hash_dimension.seed);
-        auto* columns = hash_dimension_pb->add_columns();
-        for (const auto& column_id : hash_dimension.column_ids) {
-          columns->set_id(column_id);
+      if (step.type() == AlterTableRequestPB::ADD_RANGE_PARTITION &&
+          step.add_range_partition().custom_hash_schema_size() > 0) {
+        PartitionSchema::HashSchema hash_schema;
+        RETURN_NOT_OK(PartitionSchema::ExtractHashSchemaFromPB(
+            schema, step.add_range_partition().custom_hash_schema(), &hash_schema));
+        if (partition_schema.hash_schema().size() != hash_schema.size()) {
+          return Status::NotSupported(
+              "varying number of hash dimensions per range is not yet supported");
+        }
+        RETURN_NOT_OK(partition_schema.CreatePartitionsForRange(
+            range_bound, hash_schema, schema, &partitions));
+
+        // Add information on the new range with custom hash schema into the
+        // PartitionSchema for the table stored in the system catalog.
+        auto* p = l->mutable_data()->pb.mutable_partition_schema();
+        auto* range = p->add_custom_hash_schema_ranges();
+        RowOperationsPBEncoder encoder(range->mutable_range_bounds());
+        encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, range_bound.first);
+        encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, range_bound.second);
+        for (const auto& hash_dimension : hash_schema) {
+          auto* hash_dimension_pb = range->add_hash_schema();
+          hash_dimension_pb->set_num_buckets(hash_dimension.num_buckets);
+          hash_dimension_pb->set_seed(hash_dimension.seed);
+          auto* columns = hash_dimension_pb->add_columns();
+          for (const auto& column_id : hash_dimension.column_ids) {
+            columns->set_id(column_id);
+          }
+        }
+        ++partition_schema_updates;
+      } else if (step.type() == AlterTableRequestPB::DROP_RANGE_PARTITION) {
+        PartitionSchema::HashSchema range_hash_schema;
+        RETURN_NOT_OK(partition_schema.GetHashSchemaForRange(
+            range_bound.first, schema, &range_hash_schema));
+        RETURN_NOT_OK(partition_schema.CreatePartitionsForRange(
+            range_bound, range_hash_schema, schema, &partitions));
+
+        // Update the partition schema information to be stored in the system
+        // catalog table. The information on a range with the table-wide hash
+        // schema must not be present in the PartitionSchemaPB that the system
+        // catalog stores, so this is necessary only if the range has custom
+        // (i.e. other than the table-wide) hash schema.
+        if (range_hash_schema != partition_schema.hash_schema()) {
+          RETURN_NOT_OK(partition_schema.DropRange(
+              range_bound.first, range_bound.second, schema));
+          PartitionSchemaPB ps_pb;
+          partition_schema.ToPB(schema, &ps_pb);
+          // Make sure exactly one range is gone.
+          DCHECK_EQ(ps_pb.custom_hash_schema_ranges_size() + 1,
+                    l->data().pb.partition_schema().custom_hash_schema_ranges_size());
+          *(l->mutable_data()->pb.mutable_partition_schema()) = std::move(ps_pb);
+          ++partition_schema_updates;
         }
       }
-    } else {
-      RETURN_NOT_OK(partition_schema.CreatePartitions(
-          {}, { range_bound }, schema, &partitions));
     }
 
     switch (step.type()) {
@@ -2860,6 +2896,7 @@ Status CatalogManager::ApplyAlterPartitioningSteps(
     tablets_to_add->emplace_back(std::move(tablet.second));
   }
   abort_mutations.cancel();
+  *partition_schema_updated = partition_schema_updates > 0;
   return Status::OK();
 }
 
@@ -3218,6 +3255,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB& req,
   // 7. Alter table partitioning.
   vector<scoped_refptr<TabletInfo>> tablets_to_add;
   vector<scoped_refptr<TabletInfo>> tablets_to_drop;
+  bool partition_schema_updated = false;
   if (!alter_partitioning_steps.empty()) {
     TRACE("Apply alter partitioning");
     Schema client_schema;
@@ -3225,7 +3263,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB& req,
         resp, MasterErrorPB::UNKNOWN_ERROR));
     RETURN_NOT_OK(SetupError(ApplyAlterPartitioningSteps(
         table, client_schema, alter_partitioning_steps, &l,
-        &tablets_to_add, &tablets_to_drop),
+        &tablets_to_add, &tablets_to_drop, &partition_schema_updated),
                              resp, MasterErrorPB::UNKNOWN_ERROR));
   }
 
@@ -3257,18 +3295,19 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB& req,
   }
 
   // Set to true if columns are altered, added or dropped.
-  bool has_schema_changes = !alter_schema_steps.empty();
+  const bool has_schema_changes = !alter_schema_steps.empty();
   // Set to true if there are schema changes, the table is renamed,
   // or if any other table properties changed.
-  bool has_metadata_changes = has_schema_changes ||
+  const bool has_metadata_changes = has_schema_changes ||
       req.has_new_table_name() || req.has_new_table_owner() ||
       !req.new_extra_configs().empty() || req.has_disk_size_limit() ||
       req.has_row_count_limit() || req.has_new_table_comment() ||
       num_replicas_changed;
   // Set to true if there are partitioning changes.
-  bool has_partitioning_changes = !alter_partitioning_steps.empty();
+  const bool has_partitioning_changes = !alter_partitioning_steps.empty() ||
+      partition_schema_updated;
   // Set to true if metadata changes need to be applied to existing tablets.
-  bool has_metadata_changes_for_existing_tablets =
+  const bool has_metadata_changes_for_existing_tablets =
     has_metadata_changes &&
     (table->num_tablets() > tablets_to_drop.size() || num_replicas_changed);
 
@@ -3414,7 +3453,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB& req,
     }
   }
 
-  if (!tablets_to_add.empty() || has_metadata_changes) {
+  if (!tablets_to_add.empty() || has_metadata_changes || partition_schema_updated) {
     l.Commit();
   } else {
     l.Unlock();
@@ -3516,9 +3555,7 @@ Status CatalogManager::GetTableSchema(const GetTableSchemaRequestPB* req,
   resp->set_owner(l.data().pb.owner());
   resp->set_comment(l.data().pb.comment());
 
-  RETURN_NOT_OK(ExtraConfigPBToPBMap(l.data().pb.extra_config(), resp->mutable_extra_configs()));
-
-  return Status::OK();
+  return ExtraConfigPBToPBMap(l.data().pb.extra_config(), resp->mutable_extra_configs());
 }
 
 Status CatalogManager::ListTables(const ListTablesRequestPB* req,
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index 08a78f5ad..9de7ba230 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -871,6 +871,12 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   static bool IsTableWriteDisabled(const scoped_refptr<TableInfo>& table,
                                    const std::string& table_name);
 
+  static Status ApplyAlterSchemaSteps(
+      const SysTablesEntryPB& current_pb,
+      const std::vector<AlterTableRequestPB::Step>& steps,
+      Schema* new_schema,
+      ColumnId* next_col_id);
+
   // Delete the specified table in the catalog. If 'user' is provided,
   // checks that the user is authorized to delete the table. Otherwise,
   // it indicates its an internal operation (originates from catalog
@@ -1070,19 +1076,14 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   // container must not be empty.
   Status DeleteTskEntries(const std::set<std::string>& entry_ids);
 
-  Status ApplyAlterSchemaSteps(
-      const SysTablesEntryPB& current_pb,
-      const std::vector<AlterTableRequestPB::Step>& steps,
-      Schema* new_schema,
-      ColumnId* next_col_id);
-
   Status ApplyAlterPartitioningSteps(
       const scoped_refptr<TableInfo>& table,
       const Schema& client_schema,
       const std::vector<AlterTableRequestPB::Step>& steps,
       TableMetadataLock* l,
       std::vector<scoped_refptr<TabletInfo>>* tablets_to_add,
-      std::vector<scoped_refptr<TabletInfo>>* tablets_to_drop);
+      std::vector<scoped_refptr<TabletInfo>>* tablets_to_drop,
+      bool* partition_schema_updated);
 
   // Task that takes care of the tablet assignments/creations.
   // Loops through the "not created" tablets and sends a CreateTablet() request.
diff --git a/src/kudu/master/master-test.cc b/src/kudu/master/master-test.cc
index cd466b256..ff91b65db 100644
--- a/src/kudu/master/master-test.cc
+++ b/src/kudu/master/master-test.cc
@@ -1001,7 +1001,7 @@ TEST_P(AlterTableWithRangeSpecificHashSchema, TestAlterTableWithDifferentHashDim
 INSTANTIATE_TEST_SUITE_P(AlterTableWithCustomHashSchema,
                          AlterTableWithRangeSpecificHashSchema, ::testing::Bool());
 
-TEST_F(MasterTest, AlterTableAddRangeWithSpecificHashSchema) {
+TEST_F(MasterTest, AlterTableAddAndDropRangeWithSpecificHashSchema) {
   constexpr const char* const kTableName = "alter_table_custom_hash_schema";
   constexpr const char* const kCol0 = "c_int32";
   constexpr const char* const kCol1 = "c_int64";
@@ -1010,7 +1010,24 @@ TEST_F(MasterTest, AlterTableAddRangeWithSpecificHashSchema) {
   FLAGS_enable_per_range_hash_schemas = true;
   FLAGS_default_num_replicas = 1;
 
-  // Create a table with one range partition based in the table-wide hash schema.
+  const auto partition_schema_retriever = [this](PartitionSchemaPB* ps_pb) {
+    GetTableSchemaRequestPB req;
+    req.mutable_table()->set_table_name(kTableName);
+
+    RpcController ctl;
+    GetTableSchemaResponsePB resp;
+    RETURN_NOT_OK(proxy_->GetTableSchema(req, &resp, &ctl));
+    if (resp.has_error()) {
+      return StatusFromPB(resp.error().status());
+    }
+    if (!resp.has_partition_schema()) {
+      return Status::IllegalState("partition schema information is missing");
+    }
+    *ps_pb = resp.partition_schema();
+    return Status::OK();
+  };
+
+  // Create a table with one range partition based on the table-wide hash schema.
   CreateTableResponsePB create_table_resp;
   {
     KuduPartialRow lower(&kTableSchema);
@@ -1073,6 +1090,11 @@ TEST_F(MasterTest, AlterTableAddRangeWithSpecificHashSchema) {
       ASSERT_EQ(1, tables.size());
       // 2 tablets (because of 2 hash buckets) for already existing range.
       ASSERT_EQ(2, tables.front()->num_tablets());
+
+      // Check the partition schema stored in the system catalog.
+      PartitionSchemaPB ps_pb;
+      ASSERT_OK(partition_schema_retriever(&ps_pb));
+      ASSERT_EQ(0, ps_pb.custom_hash_schema_ranges_size());
     }
 
     RpcController ctl;
@@ -1088,6 +1110,11 @@ TEST_F(MasterTest, AlterTableAddRangeWithSpecificHashSchema) {
       ASSERT_EQ(1, tables.size());
       // Extra 5 tablets (because of 5 hash buckets) for newly added range.
       ASSERT_EQ(7, tables.front()->num_tablets());
+
+      // Check the partition schema stored in the system catalog.
+      PartitionSchemaPB ps_pb;
+      ASSERT_OK(partition_schema_retriever(&ps_pb));
+      ASSERT_EQ(1, ps_pb.custom_hash_schema_ranges_size());
     }
   }
 
@@ -1125,13 +1152,81 @@ TEST_F(MasterTest, AlterTableAddRangeWithSpecificHashSchema) {
     ASSERT_EQ(1, table_wide_hash_schema.size());
     ASSERT_EQ(2, table_wide_hash_schema.front().num_buckets);
 
-    const auto& ranges_with_hash_schemas = ps.ranges_with_hash_schemas();
-    ASSERT_EQ(ranges_with_hash_schemas.size(), 1);
+    const auto& ranges_with_hash_schemas = ps.ranges_with_custom_hash_schemas();
+    ASSERT_EQ(1, ranges_with_hash_schemas.size());
     const auto& custom_hash_schema = ranges_with_hash_schemas.front().hash_schema;
     ASSERT_EQ(1, custom_hash_schema.size());
     ASSERT_EQ(5, custom_hash_schema.front().num_buckets);
     ASSERT_EQ(1, custom_hash_schema.front().seed);
   }
+
+  // Now verify that everything works as expected when dropping a range
+  // partition with custom hash schema.
+  {
+    AlterTableRequestPB req;
+    req.mutable_table()->set_table_name(kTableName);
+    req.mutable_table()->set_table_id(table_id);
+
+    // Add the required information on the table's schema:
+    // key and non-null columns must be present in the request.
+    {
+      ColumnSchemaPB* col0 = req.mutable_schema()->add_columns();
+      col0->set_name(kCol0);
+      col0->set_type(INT32);
+      col0->set_is_key(true);
+
+      ColumnSchemaPB* col1 = req.mutable_schema()->add_columns();
+      col1->set_name(kCol1);
+      col1->set_type(INT64);
+    }
+
+    AlterTableRequestPB::Step* step = req.add_alter_schema_steps();
+    step->set_type(AlterTableRequestPB::DROP_RANGE_PARTITION);
+    KuduPartialRow lower(&kTableSchema);
+    ASSERT_OK(lower.SetInt32(kCol0, 100));
+    KuduPartialRow upper(&kTableSchema);
+    ASSERT_OK(upper.SetInt32(kCol0, 200));
+    RowOperationsPBEncoder enc(
+        step->mutable_drop_range_partition()->mutable_range_bounds());
+    enc.Add(RowOperationsPB::RANGE_LOWER_BOUND, lower);
+    enc.Add(RowOperationsPB::RANGE_UPPER_BOUND, upper);
+
+    // Check the number of tablets in the table before ALTER TABLE.
+    {
+      CatalogManager::ScopedLeaderSharedLock l(master_->catalog_manager());
+      std::vector<scoped_refptr<TableInfo>> tables;
+      master_->catalog_manager()->GetAllTables(&tables);
+      ASSERT_EQ(1, tables.size());
+      // Extra 5 tablets (because of 5 hash buckets) for newly added range.
+      ASSERT_EQ(7, tables.front()->num_tablets());
+
+      // Check the partition schema stored in the system catalog.
+      PartitionSchemaPB ps_pb;
+      ASSERT_OK(partition_schema_retriever(&ps_pb));
+      ASSERT_EQ(1, ps_pb.custom_hash_schema_ranges_size());
+    }
+
+    RpcController ctl;
+    AlterTableResponsePB resp;
+    ASSERT_OK(proxy_->AlterTable(req, &resp, &ctl));
+    ASSERT_FALSE(resp.has_error())
+        << StatusFromPB(resp.error().status()).ToString();
+
+    // Check the number of tablets in the table after ALTER TABLE.
+    {
+      CatalogManager::ScopedLeaderSharedLock l(master_->catalog_manager());
+      std::vector<scoped_refptr<TableInfo>> tables;
+      master_->catalog_manager()->GetAllTables(&tables);
+      ASSERT_EQ(1, tables.size());
+      // 2 tablets (because of 2 hash buckets) for already existing range.
+      ASSERT_EQ(2, tables.front()->num_tablets());
+
+      // Check the partition schema stored in the system catalog.
+      PartitionSchemaPB ps_pb;
+      ASSERT_OK(partition_schema_retriever(&ps_pb));
+      ASSERT_EQ(0, ps_pb.custom_hash_schema_ranges_size());
+    }
+  }
 }
 
 TEST_F(MasterTest, TestCreateTableCheckRangeInvariants) {