You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2021/08/13 20:32:21 UTC

[kudu] branch master updated: KUDU-2671 key encoding for custom hash bucket schemas

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new ecbd870  KUDU-2671 key encoding for custom hash bucket schemas
ecbd870 is described below

commit ecbd8700e49bf60f15a9d261f0de425f2ba5413e
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Tue Aug 3 19:33:19 2021 -0700

    KUDU-2671 key encoding for custom hash bucket schemas
    
    This patch updates the PartitionSchema class to properly encode range
    keys in the presence of range partitions with custom hash bucket schemas
    in a Kudu table.  I added a few new test scenarios and updated the
    existing ones accordingly to make it possible to send write operations
    into appropriate tablets.
    
    In addition, I'm sneaking in minor style-related updates in
      * src/kudu/tablet/tablet.cc
      * src/kudu/master/catalog_manager.cc
    
    As one can see, the newly added method GetHashBucketSchemasForKey() is
    used in PartitionSchema also in case when Partition object is at hand.
    A follow-up changelist is going to change that, so there will be no need
    to perform a dictionary lookup once the Partition class contains
    information about its hash bucket schema.
    
    This is a follow-up to 586b7913258df2d0ee75470ddfb2b88d472ba235.
    
    Change-Id: I81aa1c10998d88a1bd5314fc3132037b1c8bbe4b
    Reviewed-on: http://gerrit.cloudera.org:8080/17769
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/client/client.cc                        |  17 ++-
 src/kudu/client/client.h                         |   2 +-
 src/kudu/client/flex_partitioning_client-test.cc | 176 +++++++++++++++++++++--
 src/kudu/client/meta_cache.cc                    |   3 +-
 src/kudu/client/write_op.h                       |   2 +-
 src/kudu/common/key_encoder.cc                   |   4 +-
 src/kudu/common/key_encoder.h                    |  18 +--
 src/kudu/common/partition-test.cc                |   2 +-
 src/kudu/common/partition.cc                     | 164 +++++++++++++--------
 src/kudu/common/partition.h                      |  28 +++-
 src/kudu/master/catalog_manager.cc               |  16 +--
 src/kudu/tablet/tablet.cc                        |  11 +-
 12 files changed, 338 insertions(+), 105 deletions(-)

diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index d05aaa8..1befc90 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -1013,12 +1013,17 @@ Status KuduTableCreator::Create() {
       // Populate corresponding element in 'range_hash_schemas' if there is at
       // least one range with custom hash partitioning schema.
       auto* schemas_pb = partition_schema->add_range_hash_schemas();
-      for (const auto& schema : range->hash_bucket_schemas_) {
-        auto* pb = schemas_pb->add_hash_schemas();
-        pb->set_seed(schema.seed);
-        pb->set_num_buckets(schema.num_buckets);
-        for (const auto& column_name : schema.column_names) {
-          pb->add_columns()->set_name(column_name);
+      if (range->hash_bucket_schemas_.empty()) {
+        schemas_pb->mutable_hash_schemas()->CopyFrom(
+            data_->partition_schema_.hash_bucket_schemas());
+      } else {
+        for (const auto& schema : range->hash_bucket_schemas_) {
+          auto* pb = schemas_pb->add_hash_schemas();
+          pb->set_seed(schema.seed);
+          pb->set_num_buckets(schema.num_buckets);
+          for (const auto& column_name : schema.column_names) {
+            pb->add_columns()->set_name(column_name);
+          }
         }
       }
     }
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 853d0a1..ab3ff08 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -1267,7 +1267,7 @@ class KUDU_EXPORT KuduTableCreator {
     DISALLOW_COPY_AND_ASSIGN(KuduRangePartition);
   };
 
-  /// Add a range partition to the table.
+  /// Add a range partition with table-wide hash bucket schema.
   ///
   /// Multiple range partitions may be added, but they must not overlap. All
   /// range splits specified by @c add_range_partition_split must fall in a
diff --git a/src/kudu/client/flex_partitioning_client-test.cc b/src/kudu/client/flex_partitioning_client-test.cc
index 0d303ab..f7725d0 100644
--- a/src/kudu/client/flex_partitioning_client-test.cc
+++ b/src/kudu/client/flex_partitioning_client-test.cc
@@ -27,15 +27,22 @@
 #include <gtest/gtest.h>
 
 #include "kudu/client/client.h"
+#include "kudu/client/scan_batch.h"
 #include "kudu/client/schema.h"
 #include "kudu/client/write_op.h"
 #include "kudu/common/partial_row.h"
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/master/catalog_manager.h"
 #include "kudu/master/master.h"
 #include "kudu/master/mini_master.h"
 #include "kudu/mini-cluster/internal_mini_cluster.h"
+#include "kudu/tablet/tablet_replica.h"
+#include "kudu/tserver/mini_tablet_server.h"
+#include "kudu/tserver/tablet_server.h"
+#include "kudu/tserver/ts_tablet_manager.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
@@ -44,10 +51,12 @@
 DECLARE_bool(enable_per_range_hash_schemas);
 DECLARE_int32(heartbeat_interval_ms);
 
+using kudu::client::sp::shared_ptr;
 using kudu::cluster::InternalMiniCluster;
 using kudu::cluster::InternalMiniClusterOptions;
 using kudu::master::CatalogManager;
-using kudu::client::sp::shared_ptr;
+using kudu::master::TabletInfo;
+using kudu::tablet::TabletReplica;
 using std::string;
 using std::unique_ptr;
 using std::vector;
@@ -114,11 +123,24 @@ class FlexPartitioningTest : public KuduTest {
     return session->Apply(insert.release());
   }
 
+  static Status FetchSessionErrors(KuduSession* session,
+                                   vector<KuduError*>* errors = nullptr) {
+    if (errors) {
+      bool overflowed;
+      session->GetPendingErrors(errors, &overflowed);
+      if (PREDICT_FALSE(overflowed)) {
+        return Status::RuntimeError("session error buffer overflowed");
+      }
+    }
+    return Status::OK();
+  }
+
   Status InsertTestRows(
       const char* table_name,
       int32_t key_beg,
       int32_t key_end,
-      KuduSession::FlushMode flush_mode = KuduSession::AUTO_FLUSH_SYNC) {
+      KuduSession::FlushMode flush_mode = KuduSession::AUTO_FLUSH_SYNC,
+      vector<KuduError*>* errors = nullptr) {
     CHECK_LE(key_beg, key_end);
     shared_ptr<KuduTable> table;
     RETURN_NOT_OK(client_->OpenTable(table_name, &table));
@@ -126,10 +148,23 @@ class FlexPartitioningTest : public KuduTest {
     RETURN_NOT_OK(session->SetFlushMode(flush_mode));
     session->SetTimeoutMillis(60000);
     for (int32_t key_val = key_beg; key_val < key_end; ++key_val) {
-      RETURN_NOT_OK(ApplyInsert(
-          session.get(), table, key_val, rand(), std::to_string(rand())));
+      if (const auto s = ApplyInsert(session.get(),
+                                     table,
+                                     key_val,
+                                     rand(),
+                                     std::to_string(rand()));
+          !s.ok()) {
+        RETURN_NOT_OK(FetchSessionErrors(session.get(), errors));
+        return s;
+      }
+    }
+
+    const auto s = session->Flush();
+    if (!s.ok()) {
+      RETURN_NOT_OK(FetchSessionErrors(session.get(), errors));
     }
-    return session->Flush();
+
+    return s;
   }
 
   Status CreateTable(const char* table_name, RangePartitions partitions) {
@@ -170,6 +205,54 @@ class FlexPartitioningTest : public KuduTest {
     ASSERT_EQ(expected_count, table_info->num_tablets());
   }
 
+  void CheckTableRowsNum(const char* table_name,
+                         int64_t expected_count) {
+    shared_ptr<KuduTable> table;
+    ASSERT_OK(client_->OpenTable(table_name, &table));
+    KuduScanner scanner(table.get());
+    ASSERT_OK(scanner.SetSelection(KuduClient::LEADER_ONLY));
+    ASSERT_OK(scanner.SetReadMode(KuduScanner::READ_YOUR_WRITES));
+    ASSERT_OK(scanner.Open());
+
+    int64_t count = 0;
+    KuduScanBatch batch;
+    while (scanner.HasMoreRows()) {
+      ASSERT_OK(scanner.NextBatch(&batch));
+      count += batch.NumRows();
+    }
+    ASSERT_EQ(expected_count, count);
+  }
+
+  void CheckLiveRowCount(const char* table_name,
+                         int64_t expected_count) {
+    shared_ptr<KuduTable> table;
+    ASSERT_OK(client_->OpenTable(table_name, &table));
+
+    vector<scoped_refptr<TabletInfo>> all_tablets_info;
+    {
+      auto* cm = cluster_->mini_master(0)->master()->catalog_manager();
+      CatalogManager::ScopedLeaderSharedLock l(cm);
+      scoped_refptr<master::TableInfo> table_info;
+      ASSERT_OK(cm->GetTableInfo(table->id(), &table_info));
+      table_info->GetAllTablets(&all_tablets_info);
+    }
+    vector<scoped_refptr<TabletReplica>> replicas;
+    for (const auto& tablet_info : all_tablets_info) {
+      for (auto i = 0; i < cluster_->num_tablet_servers(); ++i) {
+        scoped_refptr<TabletReplica> r;
+        ASSERT_TRUE(cluster_->mini_tablet_server(i)->server()->
+                        tablet_manager()->LookupTablet(tablet_info->id(), &r));
+        replicas.emplace_back(std::move(r));
+      }
+    }
+
+    int64_t count = 0;
+    for (const auto& r : replicas) {
+      count += r->CountLiveRowsNoFail();
+    }
+    ASSERT_EQ(expected_count, count);
+  }
+
   KuduSchema schema_;
   unique_ptr<InternalMiniCluster> cluster_;
   shared_ptr<KuduClient> client_;
@@ -187,20 +270,52 @@ class FlexPartitioningCreateTableTest : public FlexPartitioningTest {};
 // TODO(aserbin): add verification based on PartitionSchema provided by
 //                KuduTable::partition_schema() once PartitionPruner
 //                recognized custom hash bucket schema for ranges
-// TODO(aserbin): add InsertTestRows() when proper key encoding is implemented
 TEST_F(FlexPartitioningCreateTableTest, CustomHashBuckets) {
   // One-level hash bucket structure: { 3, "key" }.
   {
     constexpr const char* const kTableName = "3@key";
     RangePartitions partitions;
-    partitions.emplace_back(CreateRangePartition());
+    partitions.emplace_back(CreateRangePartition(0, 100));
     auto& p = partitions.back();
     ASSERT_OK(p->add_hash_partitions({ kKeyColumn }, 3, 0));
     ASSERT_OK(CreateTable(kTableName, std::move(partitions)));
     NO_FATALS(CheckTabletCount(kTableName, 3));
+    ASSERT_OK(InsertTestRows(kTableName, 0, 100));
+    NO_FATALS(CheckTableRowsNum(kTableName, 100));
   }
 }
 
+TEST_F(FlexPartitioningCreateTableTest, TableWideHashBuckets) {
+  // Create a table with the following partitions:
+  //
+  //            hash bucket
+  //   key    0           1
+  //         -------------------------
+  //  <111    x:{key}     x:{key}
+  constexpr const char* const kTableName = "TableWideHashBuckets";
+
+  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+  table_creator->table_name(kTableName)
+      .schema(&schema_)
+      .num_replicas(1)
+      .add_hash_partitions({ kKeyColumn }, 2)
+      .set_range_partition_columns({ kKeyColumn });
+
+  // Add a range partition with the table-wide hash partitioning rules.
+  {
+    unique_ptr<KuduPartialRow> lower(schema_.NewRow());
+    ASSERT_OK(lower->SetInt32(kKeyColumn, INT32_MIN));
+    unique_ptr<KuduPartialRow> upper(schema_.NewRow());
+    ASSERT_OK(upper->SetInt32(kKeyColumn, 111));
+    table_creator->add_range_partition(lower.release(), upper.release());
+  }
+
+  ASSERT_OK(table_creator->Create());
+  NO_FATALS(CheckTabletCount(kTableName, 2));
+  ASSERT_OK(InsertTestRows(kTableName, -111, 111, KuduSession::MANUAL_FLUSH));
+  NO_FATALS(CheckTableRowsNum(kTableName, 222));
+}
+
 // Create a table with mixed set of range partitions, using both table-wide and
 // custom hash bucket schemas.
 //
@@ -262,8 +377,51 @@ TEST_F(FlexPartitioningCreateTableTest, DefaultAndCustomHashBuckets) {
 
   ASSERT_OK(table_creator->Create());
   NO_FATALS(CheckTabletCount(kTableName, 11));
-  // Make sure it's possible to insert rows into the table.
-  //ASSERT_OK(InsertTestRows(kTableName, 111, 444));
+
+  // Make sure it's possible to insert rows into the table for all the existing
+  // the paritions: first check the range of table-wide schema, then check
+  // the ranges with custom hash schemas.
+  // TODO(aserbin): uncomment CheckTableRowsNum() once partition pruning works
+  ASSERT_OK(InsertTestRows(kTableName, -111, 111));
+  NO_FATALS(CheckLiveRowCount(kTableName, 222));
+  //NO_FATALS(CheckTableRowsNum(kTableName, 222));
+  ASSERT_OK(InsertTestRows(kTableName, 111, 444));
+  NO_FATALS(CheckLiveRowCount(kTableName, 555));
+  //NO_FATALS(CheckTableRowsNum(kTableName, 555));
+
+  // Meanwhile, inserting into non-covered ranges should result in a proper
+  // error status return to the client attempting such an operation.
+  {
+    vector<KuduError*> errors;
+    ElementDeleter drop(&errors);
+    auto s = InsertTestRows(
+        kTableName, 444, 445, KuduSession::AUTO_FLUSH_SYNC, &errors);
+    ASSERT_TRUE(s.IsIOError()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "failed to flush data");
+    ASSERT_EQ(1, errors.size());
+    const auto& err = errors[0]->status();
+    EXPECT_TRUE(err.IsNotFound()) << err.ToString();
+    ASSERT_STR_CONTAINS(err.ToString(),
+                        "No tablet covering the requested range partition");
+  }
+  // Try same as in the scope above, but do so for multiple rows to make sure
+  // custom hash bucketing isn't inducing any unexpected side-effects.
+  {
+    constexpr int kNumRows = 10;
+    vector<KuduError*> errors;
+    ElementDeleter drop(&errors);
+    auto s = InsertTestRows(
+        kTableName, 445, 445 + kNumRows, KuduSession::MANUAL_FLUSH, &errors);
+    ASSERT_TRUE(s.IsIOError()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "failed to flush data");
+    ASSERT_EQ(kNumRows, errors.size());
+    for (const auto& e : errors) {
+      const auto& err = e->status();
+      EXPECT_TRUE(err.IsNotFound()) << err.ToString();
+      ASSERT_STR_CONTAINS(err.ToString(),
+                          "No tablet covering the requested range partition");
+    }
+  }
 }
 
 // Negative tests scenarios to cover non-OK status codes for various operations
diff --git a/src/kudu/client/meta_cache.cc b/src/kudu/client/meta_cache.cc
index 70fdc36..9f16beb 100644
--- a/src/kudu/client/meta_cache.cc
+++ b/src/kudu/client/meta_cache.cc
@@ -385,7 +385,8 @@ string RemoteTablet::ReplicasAsStringUnlocked() const {
 bool MetaCacheEntry::Contains(const string& partition_key) const {
   DCHECK(Initialized());
   return lower_bound_partition_key() <= partition_key &&
-         (upper_bound_partition_key().empty() || upper_bound_partition_key() > partition_key);
+         (upper_bound_partition_key().empty() ||
+          upper_bound_partition_key() > partition_key);
 }
 
 bool MetaCacheEntry::stale() const {
diff --git a/src/kudu/client/write_op.h b/src/kudu/client/write_op.h
index 9420cea..243aca8 100644
--- a/src/kudu/client/write_op.h
+++ b/src/kudu/client/write_op.h
@@ -61,7 +61,7 @@ class KuduTable;
 ///   KuduInsert* t = table->NewInsert();
 ///   KUDU_CHECK_OK(t->mutable_row()->SetInt32("key", 1234));
 ///   KUDU_CHECK_OK(t->mutable_row()->SetStringCopy("foo", "bar"));
-///   session->Apply(t);
+///   KUDU_CHECK_OK(session->Apply(t));
 /// @endcode
 class KUDU_EXPORT KuduWriteOperation {
  public:
diff --git a/src/kudu/common/key_encoder.cc b/src/kudu/common/key_encoder.cc
index cd0439a..a2053fb 100644
--- a/src/kudu/common/key_encoder.cc
+++ b/src/kudu/common/key_encoder.cc
@@ -38,7 +38,7 @@ class EncoderResolver {
     return *encoders_[t];
   }
 
-  const bool HasKeyEncoderForType(DataType t) {
+  bool HasKeyEncoderForType(DataType t) {
     return t < encoders_.size() && encoders_[t];
   }
 
@@ -78,7 +78,7 @@ const KeyEncoder<Buffer>& GetKeyEncoder(const TypeInfo* typeinfo) {
 }
 
 // Returns true if the type is allowed in keys.
-const bool IsTypeAllowableInKey(const TypeInfo* typeinfo) {
+bool IsTypeAllowableInKey(const TypeInfo* typeinfo) {
   return Singleton<EncoderResolver<faststring>>::get()->HasKeyEncoderForType(
       typeinfo->physical_type());
 }
diff --git a/src/kudu/common/key_encoder.h b/src/kudu/common/key_encoder.h
index 214cc57..adad1e7 100644
--- a/src/kudu/common/key_encoder.h
+++ b/src/kudu/common/key_encoder.h
@@ -96,7 +96,8 @@ struct KeyEncoderTraits<Type,
     dst->append(reinterpret_cast<const char*>(&key_unsigned), sizeof(key_unsigned));
   }
 
-  static void EncodeWithSeparators(const void* key, bool is_last, Buffer* dst) {
+  static void EncodeWithSeparators(
+      const void* key, bool /*is_last*/, Buffer* dst) {
     Encode(key, dst);
   }
 
@@ -105,7 +106,8 @@ struct KeyEncoderTraits<Type,
                                  Arena* /*arena*/,
                                  uint8_t* cell_ptr) {
     if (PREDICT_FALSE(encoded_key->size() < sizeof(cpp_type))) {
-      return Status::InvalidArgument("key too short", KUDU_REDACT(encoded_key->ToDebugString()));
+      return Status::InvalidArgument(
+          "key too short", KUDU_REDACT(encoded_key->ToDebugString()));
     }
 
     unsigned_cpp_type val;
@@ -343,10 +345,10 @@ class KeyEncoder {
  private:
   friend class EncoderResolver<Buffer>;
   template<typename EncoderTraitsClass>
-  explicit KeyEncoder(EncoderTraitsClass t)
-    : encode_func_(EncoderTraitsClass::Encode),
-      encode_with_separators_func_(EncoderTraitsClass::EncodeWithSeparators),
-      decode_key_portion_func_(EncoderTraitsClass::DecodeKeyPortion) {
+  explicit KeyEncoder(EncoderTraitsClass /*t*/)
+      : encode_func_(EncoderTraitsClass::Encode),
+        encode_with_separators_func_(EncoderTraitsClass::EncodeWithSeparators),
+        decode_key_portion_func_(EncoderTraitsClass::DecodeKeyPortion) {
   }
 
   typedef void (*EncodeFunc)(const void* key, Buffer* dst);
@@ -355,7 +357,7 @@ class KeyEncoder {
   const EncodeWithSeparatorsFunc encode_with_separators_func_;
 
   typedef Status (*DecodeKeyPortionFunc)(Slice* enc_key, bool is_last,
-                                       Arena* arena, uint8_t* cell_ptr);
+                                         Arena* arena, uint8_t* cell_ptr);
   const DecodeKeyPortionFunc decode_key_portion_func_;
 
  private:
@@ -365,7 +367,7 @@ class KeyEncoder {
 template <typename Buffer>
 extern const KeyEncoder<Buffer>& GetKeyEncoder(const TypeInfo* typeinfo);
 
-extern const bool IsTypeAllowableInKey(const TypeInfo* typeinfo);
+extern bool IsTypeAllowableInKey(const TypeInfo* typeinfo);
 
 } // namespace kudu
 
diff --git a/src/kudu/common/partition-test.cc b/src/kudu/common/partition-test.cc
index 411fd3c..1666491 100644
--- a/src/kudu/common/partition-test.cc
+++ b/src/kudu/common/partition-test.cc
@@ -300,7 +300,7 @@ TEST_F(PartitionTest, TestPartitionKeyEncoding) {
     // Check that row values are redacted from error messages when
     // --redact is set with 'log'.
 
-    EXPECT_EQ("<hash-decode-error>",
+    EXPECT_EQ("<range-decode-error>",
               partition_schema.PartitionKeyDebugString(string("\0\1\0\1", 4), schema));
     EXPECT_EQ("HASH (a, b): 0, HASH (c): 0, RANGE (a, b, c): "
               "<range-key-decode-error: Invalid argument: "
diff --git a/src/kudu/common/partition.cc b/src/kudu/common/partition.cc
index 1a47db0..d52b09e 100644
--- a/src/kudu/common/partition.cc
+++ b/src/kudu/common/partition.cc
@@ -21,7 +21,6 @@
 #include <cstring>
 #include <iterator>
 #include <memory>
-#include <ostream>
 #include <set>
 #include <string>
 #include <unordered_set>
@@ -67,22 +66,21 @@ class faststring;
 static const size_t kEncodedBucketSize = sizeof(uint32_t);
 
 Slice Partition::range_key_start() const {
-  return range_key(partition_key_start());
+  return range_key(partition_key_start_);
 }
 
 Slice Partition::range_key_end() const {
-  return range_key(partition_key_end());
+  return range_key(partition_key_end_);
 }
 
 Slice Partition::range_key(const string& partition_key) const {
-  size_t hash_size = kEncodedBucketSize * hash_buckets().size();
-  if (partition_key.size() > hash_size) {
-    Slice s = Slice(partition_key);
-    s.remove_prefix(hash_size);
-    return s;
-  } else {
+  const size_t hash_size = kEncodedBucketSize * hash_buckets_.size();
+  if (partition_key.size() <= hash_size) {
     return Slice();
   }
+  auto s = Slice(partition_key);
+  s.remove_prefix(hash_size);
+  return s;
 }
 
 bool Partition::operator==(const Partition& rhs) const {
@@ -104,7 +102,7 @@ bool Partition::operator==(const Partition& rhs) const {
 void Partition::ToPB(PartitionPB* pb) const {
   pb->Clear();
   pb->mutable_hash_buckets()->Reserve(hash_buckets_.size());
-  for (int32_t bucket : hash_buckets()) {
+  for (int32_t bucket : hash_buckets_) {
     pb->add_hash_buckets(bucket);
   }
   pb->set_partition_key_start(partition_key_start());
@@ -162,7 +160,7 @@ void SetColumnIdentifiers(const vector<ColumnId>& column_ids,
     identifiers->Add()->set_id(column_id);
   }
 }
-} // namespace
+} // anonymous namespace
 
 
 Status PartitionSchema::ExtractHashBucketSchemasFromPB(
@@ -251,15 +249,21 @@ Status PartitionSchema::FromPB(const PartitionSchemaPB& pb,
     }
   }
 
-  auto* ranges_with_schemas_ptr = &partition_schema->ranges_with_hash_schemas_;
+  auto* ranges_ptr = &partition_schema->ranges_with_hash_schemas_;
   if (!range_bounds.empty()) {
     RETURN_NOT_OK(partition_schema->EncodeRangeBounds(
-        range_bounds, range_hash_schema, schema, ranges_with_schemas_ptr));
+        range_bounds, range_hash_schema, schema, ranges_ptr));
+  }
+  if (ranges_ptr != nullptr) {
+    auto& dict = partition_schema->hash_schema_idx_by_encoded_range_start_;
+    for (auto it = ranges_ptr->cbegin(); it != ranges_ptr->cend(); ++it) {
+      InsertOrDie(&dict, it->lower, std::distance(ranges_ptr->cbegin(), it));
+    }
   }
-  if (range_bounds.size() != ranges_with_schemas_ptr->size()) {
+  if (range_bounds.size() != ranges_ptr->size()) {
     return Status::InvalidArgument(Substitute("the number of range bounds "
         "($0) differs from the number ranges with hash schemas ($1)",
-        range_bounds.size(), ranges_with_schemas_ptr->size()));
+        range_bounds.size(), ranges_ptr->size()));
   }
 
   return partition_schema->Validate(schema);
@@ -307,17 +311,17 @@ Status PartitionSchema::ToPB(const Schema& schema, PartitionSchemaPB* pb) const
 
 template<typename Row>
 void PartitionSchema::EncodeKeyImpl(const Row& row, string* buf) const {
-  // TODO(aserbin): update the implementation and remove the DCHECK() below
-  DCHECK(ranges_with_hash_schemas_.empty())
-      << "ranges with custom hash schemas are not yet supported";
-  const KeyEncoder<string>& hash_encoder = GetKeyEncoder<string>(GetTypeInfo(UINT32));
-
-  for (const HashBucketSchema& hash_bucket_schema : hash_bucket_schemas_) {
-    const int32_t bucket = BucketForRow(row, hash_bucket_schema);
+  string range_key;
+  EncodeColumns(row, range_schema_.column_ids, &range_key);
+  const auto& hash_schemas = GetHashBucketSchemasForRange(range_key);
+  const auto& hash_encoder = GetKeyEncoder<string>(GetTypeInfo(UINT32));
+  for (const auto& hash_bucket_schema : hash_schemas) {
+    int32_t bucket = BucketForRow(row, hash_bucket_schema);
     hash_encoder.Encode(&bucket, buf);
   }
-
-  return EncodeColumns(row, range_schema_.column_ids, buf);
+  // The range portion of the key has been already encoded -- append it to the
+  // result buffer.
+  buf->append(range_key);
 }
 
 string PartitionSchema::EncodeKey(const KuduPartialRow& row) const {
@@ -584,7 +588,7 @@ Status PartitionSchema::CreatePartitions(
                                              : GenerateHashPartitions(
                                                    current_range_hash_schemas,
                                                    hash_encoder);
-      // Add range part to partition key.
+      // Add range information to the partition key.
       for (Partition& partition : current_bound_hash_partitions) {
         partition.partition_key_start_.append(bound.lower);
         partition.partition_key_end_.append(bound.upper);
@@ -662,9 +666,11 @@ Status PartitionSchema::CreatePartitions(
 template<typename Row>
 bool PartitionSchema::PartitionContainsRowImpl(const Partition& partition,
                                                const Row& row) const {
-  CHECK_EQ(partition.hash_buckets().size(), hash_bucket_schemas_.size());
-  for (int i = 0; i < hash_bucket_schemas_.size(); i++) {
-    if (!HashPartitionContainsRowImpl(partition, row, i)) {
+  string range_key;
+  EncodeColumns(row, range_schema_.column_ids, &range_key);
+  const auto& hash_schemas = GetHashBucketSchemasForRange(range_key);
+  for (size_t i = 0; i < hash_schemas.size(); ++i) {
+    if (!HashPartitionContainsRowImpl(partition, row, hash_schemas, i)) {
       return false;
     }
   }
@@ -673,11 +679,15 @@ bool PartitionSchema::PartitionContainsRowImpl(const Partition& partition,
 }
 
 template<typename Row>
-bool PartitionSchema::HashPartitionContainsRowImpl(const Partition& partition,
-                                                   const Row& row,
-                                                   int hash_idx) const {
-  DCHECK_EQ(partition.hash_buckets().size(), hash_bucket_schemas_.size());
-  const HashBucketSchema& hash_bucket_schema = hash_bucket_schemas_[hash_idx];
+bool PartitionSchema::HashPartitionContainsRowImpl(
+    const Partition& partition,
+    const Row& row,
+    const HashBucketSchemas& hash_bucket_schemas,
+    int hash_idx) const {
+  DCHECK_GE(hash_idx, 0);
+  DCHECK_LT(hash_idx, hash_bucket_schemas.size());
+  DCHECK_EQ(partition.hash_buckets().size(), hash_bucket_schemas.size());
+  const HashBucketSchema& hash_bucket_schema = hash_bucket_schemas[hash_idx];
   const int32_t bucket = BucketForRow(row, hash_bucket_schema);
   return partition.hash_buckets()[hash_idx] == bucket;
 }
@@ -690,9 +700,9 @@ bool PartitionSchema::RangePartitionContainsRowImpl(
   string range_partition_key;
   EncodeColumns(row, range_schema_.column_ids, &range_partition_key);
 
-  // If all of the hash buckets match, then the row is contained in the
-  // partition if the row is gte the lower bound; and if there is no upper
-  // bound, or the row is lt the upper bound.
+  // When all hash buckets match, then the row is contained in the partition
+  // if the row's key is greater or equal to the lower bound, and if there is
+  // either no upper bound or the row's key is less than the upper bound.
   const auto key = Slice(range_partition_key);
   return
       (partition.range_key_start() <= key) &&
@@ -742,11 +752,15 @@ bool PartitionSchema::PartitionMayContainRow(const Partition& partition,
       !RangePartitionContainsRow(partition, row)) {
     return false;
   }
-  for (size_t i = 0; i < hash_bucket_schemas_.size(); ++i) {
-    const auto& hash_partition = hash_bucket_schemas_[i];
+
+  string range_key;
+  EncodeColumns(row, range_schema_.column_ids, &range_key);
+  const auto& hash_schemas = GetHashBucketSchemasForRange(range_key);
+  for (size_t i = 0; i < hash_schemas.size(); ++i) {
+    const auto& hash_partition = hash_schemas[i];
     if (hash_partition.column_ids.size() == 1 &&
         hash_partition.column_ids[0] == single_column_id &&
-        !HashPartitionContainsRow(partition, row, i)) {
+        !HashPartitionContainsRowImpl(partition, row, hash_schemas, i)) {
       return false;
     }
   }
@@ -755,12 +769,6 @@ bool PartitionSchema::PartitionMayContainRow(const Partition& partition,
   return true;
 }
 
-bool PartitionSchema::HashPartitionContainsRow(const Partition& partition,
-                                               const KuduPartialRow& row,
-                                               int hash_idx) const {
-  return HashPartitionContainsRowImpl(partition, row, hash_idx);
-}
-
 bool PartitionSchema::RangePartitionContainsRow(
     const Partition& partition, const KuduPartialRow& row) const {
   return RangePartitionContainsRowImpl(partition, row);
@@ -930,18 +938,41 @@ string PartitionSchema::PartitionKeyDebugString(const KuduPartialRow& row) const
 }
 
 string PartitionSchema::PartitionKeyDebugString(Slice key, const Schema& schema) const {
-  // TODO(aserbin): update the implementation and remove the DCHECK() below
-  DCHECK(ranges_with_hash_schemas_.empty())
-      << "ranges with custom hash schemas are not yet supported";
+  // Since it's not known what's the right hash bucket schema for the key
+  // (there might be custom hash bucket schemas per range), let's first find
+  // what's the size of the range key and remove it as the trailing part
+  // of the key. The rest is the hash partition key.
+  //
+  // TODO(aserbin): is there a better way to do this?
+  const bool has_ranges = !range_schema_.column_ids.empty();
+  string range_key_size_meter;
+  if (has_ranges) {
+    KuduPartialRow row(&schema);
+    EncodeColumns(row, range_schema_.column_ids, &range_key_size_meter);
+  }
 
-  vector<string> components;
+  if (has_ranges) {
+    if (key.size() < range_key_size_meter.size()) {
+      return "<range-decode-error>";
+    }
+  } else {
+    if (key.size() <  kEncodedBucketSize * hash_bucket_schemas_.size()) {
+      return "<hash-decode-error>";
+    }
+  }
 
-  size_t hash_components_size = kEncodedBucketSize * hash_bucket_schemas_.size();
-  if (key.size() < hash_components_size) {
-    return "<hash-decode-error>";
+  Slice range_key(key);
+  if (has_ranges) {
+    DCHECK_GE(key.size(), range_key_size_meter.size());
+    range_key.remove_prefix(key.size() - range_key_size_meter.size());
   }
 
-  for (const auto& hash_schema : hash_bucket_schemas_) {
+  const auto& hash_schemas = has_ranges
+      ? GetHashBucketSchemasForRange(range_key.ToString())
+      : hash_bucket_schemas_;
+  vector<string> components;
+  components.reserve(hash_schemas.size() + 1);
+  for (const auto& hash_schema : hash_schemas) {
     uint32_t big_endian;
     memcpy(&big_endian, key.data(), sizeof(uint32_t));
     key.remove_prefix(sizeof(uint32_t));
@@ -951,7 +982,7 @@ string PartitionSchema::PartitionKeyDebugString(Slice key, const Schema& schema)
                     BigEndian::ToHost32(big_endian)));
   }
 
-  if (!range_schema_.column_ids.empty()) {
+  if (has_ranges) {
       components.emplace_back(
           Substitute("RANGE ($0): $1",
                      ColumnIdsToColumnNames(schema, range_schema_.column_ids),
@@ -1297,10 +1328,10 @@ int32_t PartitionSchema::BucketForRow(const ConstContiguousRow& row,
                                       const HashBucketSchema& hash_bucket_schema);
 
 void PartitionSchema::Clear() {
+  hash_schema_idx_by_encoded_range_start_.clear();
+  ranges_with_hash_schemas_.clear();
   hash_bucket_schemas_.clear();
   range_schema_.column_ids.clear();
-  ranges_with_hash_schemas_.clear();
-
 }
 
 Status PartitionSchema::Validate(const Schema& schema) const {
@@ -1512,6 +1543,27 @@ Status PartitionSchema::IncrementRangePartitionKey(KuduPartialRow* row, bool* in
   return Status::OK();
 }
 
+const PartitionSchema::HashBucketSchemas& PartitionSchema::GetHashBucketSchemasForRange(
+    const string& range_key) const {
+  // Find proper hash bucket schema corresponding to the specified range key.
+  const auto* entry = FindFloorOrNull(
+      hash_schema_idx_by_encoded_range_start_, range_key);
+  bool has_custom_range = (entry != nullptr);
+  // Check for the case of a non-covered range between two covered ranges.
+  // TODO(aserbin): maybe, it's better to build ranges_with_hash_schemas_ not
+  //                having any range gaps?
+  if (has_custom_range) {
+    DCHECK_LT(*entry, ranges_with_hash_schemas_.size());
+    const auto& upper = ranges_with_hash_schemas_[*entry].upper;
+    // TODO(aserbin): is the upper bound always exclusive?
+    if (!upper.empty() && upper <= range_key) {
+      has_custom_range = false;
+    }
+  }
+  return has_custom_range ? ranges_with_hash_schemas_[*entry].hash_schemas
+                          : hash_bucket_schemas_;
+}
+
 Status PartitionSchema::MakeLowerBoundRangePartitionKeyInclusive(KuduPartialRow* row) const {
   // To transform a lower bound range partition key from exclusive to inclusive,
   // the key must be incremented. To increment the key, start with the least
diff --git a/src/kudu/common/partition.h b/src/kudu/common/partition.h
index ab03729..6d77328 100644
--- a/src/kudu/common/partition.h
+++ b/src/kudu/common/partition.h
@@ -16,7 +16,9 @@
 // under the License.
 #pragma once
 
+#include <cstddef>
 #include <cstdint>
+#include <map>
 #include <string>
 #include <utility>
 #include <vector>
@@ -56,7 +58,6 @@ template <typename Buffer> class KeyEncoder;
 // start and end primary keys, and predicates.
 class Partition {
  public:
-
   const std::vector<int32_t>& hash_buckets() const {
     return hash_buckets_;
   }
@@ -108,7 +109,8 @@ class Partition {
 // determine the tablet containing the key.
 //
 // The partition schema is made up of zero or more hash bucket components,
-// followed by a single range component.
+// followed by a single range component. In addition, the partition schema can
+// contain multiple ranges and their per-range custom bucket schemas.
 //
 // Each hash bucket component includes one or more columns from the primary key
 // column set, with the restriction that an individual primary key column may
@@ -409,9 +411,11 @@ class PartitionSchema {
 
   // Private templated helper for HashPartitionContainsRow.
   template<typename Row>
-  bool HashPartitionContainsRowImpl(const Partition& partition,
-                                    const Row& row,
-                                    int hash_idx) const;
+  bool HashPartitionContainsRowImpl(
+      const Partition& partition,
+      const Row& row,
+      const HashBucketSchemas& hash_bucket_schemas,
+      int hash_idx) const;
 
   // Private templated helper for RangePartitionContainsRow.
   template<typename Row>
@@ -491,9 +495,23 @@ class PartitionSchema {
   // maximum value. Unset columns will be incremented to increment(min_value).
   Status IncrementRangePartitionKey(KuduPartialRow* row, bool* increment) const;
 
+  // Find hash bucket schemas for the given encoded range key. Depending
+  // on the partitioning schema and the key, it might be either table-wide
+  // or a custom hash bucket schema for a particular range.
+  const HashBucketSchemas& GetHashBucketSchemasForRange(
+      const std::string& range_key) const;
+
   HashBucketSchemas hash_bucket_schemas_;
   RangeSchema range_schema_;
   RangesWithHashSchemas ranges_with_hash_schemas_;
+
+  // Encoded start of the range --> index of the hash bucket schemas for the
+  // range in the 'ranges_with_hash_schemas_' array container.
+  // NOTE: the contents of this map and 'ranges_with_hash_schemas_' are tightly
+  //       coupled -- it's necessary to clear/set this map along with
+  //       'ranges_with_hash_schemas_'.
+  typedef std::map<std::string, size_t> HashSchemasByEncodedLowerRange;
+  HashSchemasByEncodedLowerRange hash_schema_idx_by_encoded_range_start_;
 };
 
 } // namespace kudu
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 8c18e55..92690ad 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -6308,9 +6308,8 @@ void TableInfo::AddRemoveTablets(const vector<scoped_refptr<TabletInfo>>& tablet
 void TableInfo::GetTabletsInRange(const GetTableLocationsRequestPB* req,
                                   vector<scoped_refptr<TabletInfo>>* ret) const {
   shared_lock<rw_spinlock> l(lock_);
-  int max_returned_locations = req->max_returned_locations();
 
-  RawTabletInfoMap::const_iterator it, it_end;
+  RawTabletInfoMap::const_iterator it;
   if (req->has_partition_key_start()) {
     it = tablet_map_.upper_bound(req->partition_key_start());
     if (it != tablet_map_.begin()) {
@@ -6320,16 +6319,15 @@ void TableInfo::GetTabletsInRange(const GetTableLocationsRequestPB* req,
     it = tablet_map_.begin();
   }
 
-  if (req->has_partition_key_end()) {
-    it_end = tablet_map_.upper_bound(req->partition_key_end());
-  } else {
-    it_end = tablet_map_.end();
-  }
+  const RawTabletInfoMap::const_iterator it_end = req->has_partition_key_end()
+      ? tablet_map_.upper_bound(req->partition_key_end())
+      : tablet_map_.end();
 
-  int count = 0;
+  const size_t max_returned_locations = req->max_returned_locations();
+  size_t count = 0;
   for (; it != it_end && count < max_returned_locations; ++it) {
     ret->emplace_back(make_scoped_refptr(it->second));
-    count++;
+    ++count;
   }
 }
 
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 50106bc..903698b 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -602,13 +602,12 @@ Status Tablet::AcquireTxnLock(int64_t txn_id, WriteOpState* op_state) {
 }
 
 Status Tablet::CheckRowInTablet(const ConstContiguousRow& row) const {
-  if (PREDICT_FALSE(!metadata_->partition_schema().PartitionContainsRow(
-      metadata_->partition(), row))) {
+  const auto& ps = metadata_->partition_schema();
+  if (PREDICT_FALSE(!ps.PartitionContainsRow(metadata_->partition(), row))) {
     return Status::NotFound(
-        Substitute("Row not in tablet partition. Partition: '$0', row: '$1'.",
-                   metadata_->partition_schema().PartitionDebugString(metadata_->partition(),
-                                                                      *schema()),
-                   metadata_->partition_schema().PartitionKeyDebugString(row)));
+        Substitute("Row not in tablet partition. Partition: '$0' row: '$1'",
+                   ps.PartitionDebugString(metadata_->partition(), *schema()),
+                   ps.PartitionKeyDebugString(row)));
   }
   return Status::OK();
 }