You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2020/11/12 06:59:00 UTC

[kudu] branch master updated: KUDU-1644 hash-partition based in-list predicate optimization

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a7cadc  KUDU-1644 hash-partition based in-list predicate optimization
6a7cadc is described below

commit 6a7cadc7eddeaaa374971d5ba16fec8422e33db9
Author: ningw <19...@gmail.com>
AuthorDate: Wed Oct 28 14:47:27 2020 +0800

    KUDU-1644 hash-partition based in-list predicate optimization
    
    Hash prune for single hash-key based inList query. Reduce the values to predicate
    by hash-partition match.
    This patch reduces the IN List predicated values to be pushed to tablet
    without change the content to be returned.
    
    Table has P partitions, N records. Inlist predicate has V values.
    
    Before:
    To each tablet, time complexity to complete hash-key based in-list query is:
    LOG(V) * N
    
    After:
    Complexity becomes:
    LOG(V/P) * N
    
    E.g.
    Hash partition of table 'profile':
    hash(id) by id partitions 3, simply use mod as hash function.
    select * from profile where id in (1,2,3,4,5,6,7,8,9,10)
    
    Before:
    Tablet 1: id in (1,2,3,4,5,6,7,8,9,10)
    Tablet 2: id in (1,2,3,4,5,6,7,8,9,10)
    Tablet 3: id in (1,2,3,4,5,6,7,8,9,10)
    
    After:
    Tablet 1: id in (0,3,6,9)
    Tablet 2: id in (1,4,7,10)
    Tablet 3: id in (2,5,8)
    
    Change-Id: I202001535669a72de7fbb9e766dbc27db48e0aa2
    Reviewed-on: http://gerrit.cloudera.org:8080/16674
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 .../org/apache/kudu/client/TestAlterTable.java     |   2 +-
 src/kudu/common/column_predicate.h                 |   7 +
 src/kudu/common/partial_row.cc                     |   2 +-
 src/kudu/common/partial_row.h                      |   1 +
 src/kudu/common/partition.cc                       |  64 +++-
 src/kudu/common/partition.h                        |  25 +-
 src/kudu/common/partition_pruner-test.cc           |   2 +-
 src/kudu/common/scan_spec-test.cc                  | 417 ++++++++++++++++++++-
 src/kudu/common/scan_spec.cc                       |  39 +-
 src/kudu/common/scan_spec.h                        |  16 +
 src/kudu/tserver/tablet_service.cc                 |   3 +
 11 files changed, 552 insertions(+), 26 deletions(-)

diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAlterTable.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAlterTable.java
index 5d88178..4df496f 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAlterTable.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAlterTable.java
@@ -382,7 +382,7 @@ public class TestAlterTable {
   }
 
   @Test
-  public void testAlterRangeParitioningInvalid() throws KuduException {
+  public void testAlterRangePartitioningInvalid() throws KuduException {
     // Create initial table with single range partition covering [0, 100).
     KuduTable table = createTable(ImmutableList.of(new Pair<>(0, 100)));
     Schema schema = table.getSchema();
diff --git a/src/kudu/common/column_predicate.h b/src/kudu/common/column_predicate.h
index 6120095..974fb99 100644
--- a/src/kudu/common/column_predicate.h
+++ b/src/kudu/common/column_predicate.h
@@ -259,6 +259,13 @@ class ColumnPredicate {
   const std::vector<const void*>& raw_values() const {
     return values_;
   }
+
+  // Returns the pointer to list of values if this is an in-list predicate.
+  // Use for pruning in-list predicate values in case of hash-key based in-list prediate.
+  std::vector<const void*>* mutable_raw_values() {
+    return &values_;
+  }
+
   // Returns bloom filters if this is a bloom filter predicate.
   const std::vector<BlockBloomFilter*>& bloom_filters() const {
     return bloom_filters_;
diff --git a/src/kudu/common/partial_row.cc b/src/kudu/common/partial_row.cc
index 8b3096a..6951e6a 100644
--- a/src/kudu/common/partial_row.cc
+++ b/src/kudu/common/partial_row.cc
@@ -103,7 +103,7 @@ KuduPartialRow::KuduPartialRow(const KuduPartialRow& other)
     if (BitmapTest(owned_strings_bitmap_, col_idx)) {
       ContiguousRow row(schema_, row_data_);
       Slice* slice = reinterpret_cast<Slice*>(row.mutable_cell_ptr(col_idx));
-      auto data = new uint8_t[slice->size()];
+      auto* data = new uint8_t[slice->size()];
       slice->relocate(data);
     }
   }
diff --git a/src/kudu/common/partial_row.h b/src/kudu/common/partial_row.h
index fb29ef0..9983137 100644
--- a/src/kudu/common/partial_row.h
+++ b/src/kudu/common/partial_row.h
@@ -646,6 +646,7 @@ class KUDU_EXPORT KuduPartialRow {
   friend class PartitionSchema;
   friend class RowOperationsPBDecoder;
   friend class RowOperationsPBEncoder;
+  friend class ScanSpec; // for Set(int32_t column_idx, const uint8_t* val)
   friend class tools::TableScanner;
   friend class TestScanSpec;
   template<typename KeyTypeWrapper> friend struct client::SliceKeysTestSetup;
diff --git a/src/kudu/common/partition.cc b/src/kudu/common/partition.cc
index 756f180..dd34d30 100644
--- a/src/kudu/common/partition.cc
+++ b/src/kudu/common/partition.cc
@@ -26,7 +26,6 @@
 #include <vector>
 
 #include <glog/logging.h>
-#include <google/protobuf/stubs/port.h>
 
 #include "kudu/common/common.pb.h"
 #include "kudu/common/key_encoder.h"
@@ -458,12 +457,8 @@ Status PartitionSchema::PartitionContainsRowImpl(const Partition& partition,
                                                  bool* contains) const {
   CHECK_EQ(partition.hash_buckets().size(), hash_bucket_schemas_.size());
   for (int i = 0; i < hash_bucket_schemas_.size(); i++) {
-    const HashBucketSchema& hash_bucket_schema = hash_bucket_schemas_[i];
-    int32_t bucket;
-    RETURN_NOT_OK(BucketForRow(row, hash_bucket_schema, &bucket));
-
-    if (bucket != partition.hash_buckets()[i]) {
-      *contains = false;
+    RETURN_NOT_OK(HashPartitionContainsRowImpl(partition, row, i, contains));
+    if (!*contains) {
       return Status::OK();
     }
   }
@@ -481,6 +476,19 @@ Status PartitionSchema::PartitionContainsRowImpl(const Partition& partition,
   return Status::OK();
 }
 
+template<typename Row>
+Status PartitionSchema::HashPartitionContainsRowImpl(const Partition& partition,
+                                                     const Row& row,
+                                                     int hash_idx,
+                                                     bool* contains) const {
+  DCHECK_EQ(partition.hash_buckets().size(), hash_bucket_schemas_.size());
+  const HashBucketSchema& hash_bucket_schema = hash_bucket_schemas_[hash_idx];
+  int32_t bucket = -1;
+  RETURN_NOT_OK(BucketForRow(row, hash_bucket_schema, &bucket));
+  *contains = (partition.hash_buckets()[hash_idx] == bucket);
+  return Status::OK();
+}
+
 Status PartitionSchema::PartitionContainsRow(const Partition& partition,
                                              const KuduPartialRow& row,
                                              bool* contains) const {
@@ -493,11 +501,24 @@ Status PartitionSchema::PartitionContainsRow(const Partition& partition,
   return PartitionContainsRowImpl(partition, row, contains);
 }
 
+Status PartitionSchema::HashPartitionContainsRow(const Partition& partition,
+                                                 const KuduPartialRow& row,
+                                                 int hash_idx,
+                                                 bool* contains) const {
+  return HashPartitionContainsRowImpl(partition, row, hash_idx, contains);
+}
+
+Status PartitionSchema::HashPartitionContainsRow(const Partition& partition,
+                                                 const ConstContiguousRow& row,
+                                                 int hash_idx,
+                                                 bool* contains) const {
+  return HashPartitionContainsRowImpl(partition, row, hash_idx, contains);
+}
 
 Status PartitionSchema::DecodeRangeKey(Slice* encoded_key,
-                                       KuduPartialRow* row,
+                                       KuduPartialRow* partial_row,
                                        Arena* arena) const {
-  ContiguousRow cont_row(row->schema(), row->row_data_);
+  ContiguousRow cont_row(partial_row->schema(), partial_row->row_data_);
   for (int i = 0; i < range_schema_.column_ids.size(); i++) {
     if (encoded_key->empty()) {
       // This can happen when decoding partition start and end keys, since they
@@ -505,8 +526,8 @@ Status PartitionSchema::DecodeRangeKey(Slice* encoded_key,
       break;
     }
 
-    int32_t column_idx = row->schema()->find_column_by_id(range_schema_.column_ids[i]);
-    const ColumnSchema& column = row->schema()->column(column_idx);
+    int32_t column_idx = partial_row->schema()->find_column_by_id(range_schema_.column_ids[i]);
+    const ColumnSchema& column = partial_row->schema()->column(column_idx);
     const KeyEncoder<faststring>& key_encoder = GetKeyEncoder<faststring>(column.type_info());
     bool is_last = i == (range_schema_.column_ids.size() - 1);
 
@@ -518,7 +539,7 @@ Status PartitionSchema::DecodeRangeKey(Slice* encoded_key,
                           Substitute("Error decoding partition key range component '$0'",
                                      column.name()));
     // Mark the column as set.
-    BitmapSet(row->isset_bitmap_, column_idx);
+    BitmapSet(partial_row->isset_bitmap_, column_idx);
   }
   if (!encoded_key->empty()) {
     return Status::InvalidArgument("unable to fully decode range key",
@@ -585,6 +606,7 @@ namespace {
 string ColumnIdsToColumnNames(const Schema& schema,
                               const vector<ColumnId>& column_ids) {
   vector<string> names;
+  names.reserve(column_ids.size());
   for (const ColumnId& column_id : column_ids) {
     names.push_back(schema.column(schema.find_column_by_id(column_id)).name());
   }
@@ -913,10 +935,10 @@ Status PartitionSchema::EncodeColumns(const KuduPartialRow& row,
   return Status::OK();
 }
 
-int32_t PartitionSchema::BucketForEncodedColumns(const string& encoded_key,
+int32_t PartitionSchema::BucketForEncodedColumns(const string& encoded_hash_columns,
                                                  const HashBucketSchema& hash_bucket_schema) {
-  uint64_t hash = HashUtil::MurmurHash2_64(encoded_key.data(),
-                                           encoded_key.length(),
+  uint64_t hash = HashUtil::MurmurHash2_64(encoded_hash_columns.data(),
+                                           encoded_hash_columns.length(),
                                            hash_bucket_schema.seed);
   return hash % static_cast<uint64_t>(hash_bucket_schema.num_buckets);
 }
@@ -1273,4 +1295,16 @@ Status PartitionSchema::GetRangeSchemaColumnIndexes(const Schema& schema,
   return Status::OK();
 }
 
+int32_t PartitionSchema::TryGetSingleColumnHashPartitionIndex(const Schema& schema,
+                                                              int32_t col_idx) const {
+  const ColumnId column_id = schema.column_id(col_idx);
+  for (int i = 0; i < hash_partition_schemas().size(); ++i) {
+    auto hash_partition = hash_partition_schemas()[i];
+    if (hash_partition.column_ids.size() == 1 && hash_partition.column_ids[0] == column_id) {
+      return i;
+    }
+  }
+  return -1;
+}
+
 } // namespace kudu
diff --git a/src/kudu/common/partition.h b/src/kudu/common/partition.h
index 761e8e2..bf57d0f 100644
--- a/src/kudu/common/partition.h
+++ b/src/kudu/common/partition.h
@@ -182,6 +182,16 @@ class PartitionSchema {
                               const ConstContiguousRow& row,
                               bool* contains) const WARN_UNUSED_RESULT;
 
+  // Tests if the hash partition contians the row with given hash_idx.
+  Status HashPartitionContainsRow(const Partition& partition,
+                                  const KuduPartialRow& row,
+                                  int hash_idx,
+                                  bool* contains) const WARN_UNUSED_RESULT;
+  Status HashPartitionContainsRow(const Partition& partition,
+                                  const ConstContiguousRow& row,
+                                  int hash_idx,
+                                  bool* contains) const WARN_UNUSED_RESULT;
+
   // Returns a text description of the partition suitable for debug printing.
   //
   // Partitions are considered metadata, so no redaction will happen on the hash
@@ -245,7 +255,7 @@ class PartitionSchema {
 
   // Decodes a range partition key into a partial row, with variable-length
   // fields stored in the arena.
-  Status DecodeRangeKey(Slice* encode_key,
+  Status DecodeRangeKey(Slice* encoded_key,
                         KuduPartialRow* partial_row,
                         Arena* arena) const;
 
@@ -263,6 +273,10 @@ class PartitionSchema {
   Status GetRangeSchemaColumnIndexes(const Schema& schema,
                                      std::vector<int>* range_column_idxs) const;
 
+  // Returns index of given column idx, if it is one of hash key and this hash schema
+  // contains only one column, otherwise returns -1.
+  int32_t TryGetSingleColumnHashPartitionIndex(const Schema& schema, int32_t col_idx) const;
+
  private:
   friend class PartitionPruner;
   FRIEND_TEST(PartitionTest, TestIncrementRangePartitionBounds);
@@ -307,6 +321,13 @@ class PartitionSchema {
                                   const Row& row,
                                   bool* contains) const;
 
+  // Private templated helper for HashPartitionContainsRow.
+  template<typename Row>
+  Status HashPartitionContainsRowImpl(const Partition& partition,
+                                      const Row& row,
+                                      int hash_idx,
+                                      bool* contains) const;
+
   // Private templated helper for EncodeKey.
   template<typename Row>
   Status EncodeKeyImpl(const Row& row, std::string* buf) const;
@@ -342,7 +363,7 @@ class PartitionSchema {
   //
   // This should only be called with partition keys created from a row, not with
   // partition keys from a partition.
-  Status DecodeHashBuckets(Slice* partition_key, std::vector<int32_t>* buckets) const;
+  Status DecodeHashBuckets(Slice* encoded_key, std::vector<int32_t>* buckets) const;
 
   // Clears the state of this partition schema.
   void Clear();
diff --git a/src/kudu/common/partition_pruner-test.cc b/src/kudu/common/partition_pruner-test.cc
index 9101e85..e541951 100644
--- a/src/kudu/common/partition_pruner-test.cc
+++ b/src/kudu/common/partition_pruner-test.cc
@@ -339,7 +339,7 @@ TEST_F(PartitionPrunerTest, TestIntPartialPrimaryKeyRangePruning) {
 
   PartitionSchema partition_schema;
   auto pb = PartitionSchemaPB();
-  auto range_schema = pb.mutable_range_schema();
+  auto* range_schema = pb.mutable_range_schema();
   range_schema->add_columns()->set_name("a");
   range_schema->add_columns()->set_name("b");
   ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
diff --git a/src/kudu/common/scan_spec-test.cc b/src/kudu/common/scan_spec-test.cc
index 2aca630..9357977 100644
--- a/src/kudu/common/scan_spec-test.cc
+++ b/src/kudu/common/scan_spec-test.cc
@@ -19,9 +19,9 @@
 
 #include <cstdint>
 #include <cstring>
-#include <memory>
 #include <string>
 #include <unordered_map>
+#include <utility>
 #include <vector>
 
 #include <boost/optional/optional.hpp>
@@ -33,6 +33,7 @@
 #include "kudu/common/common.pb.h"
 #include "kudu/common/encoded_key.h"
 #include "kudu/common/partial_row.h"
+#include "kudu/common/partition.h"
 #include "kudu/common/row.h"
 #include "kudu/common/schema.h"
 #include "kudu/gutil/map-util.h"
@@ -43,11 +44,34 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
-using std::unique_ptr;
 using std::vector;
+using std::pair;
 
 namespace kudu {
 
+namespace {
+// Generate partition schema of a table with given hash_partitions.
+// E.g. GeneratePartitionSchema(schema, {make_pair({a, b}, 3), make_pair({c}, 5) })
+// Returns 'partition by hash(a, b) partitions 3, hash(c) partitions 5'.
+void GeneratePartitionSchema(const Schema& schema,
+                             const vector<std::pair<vector<std::string>, int>>& hash_partitions,
+                             PartitionSchema* partition_schema) {
+  PartitionSchemaPB partition_schema_pb;
+  for (const auto& col_names_and_num_buckets : hash_partitions) {
+    auto* hash_pb = partition_schema_pb.add_hash_bucket_schemas();
+    hash_pb->set_num_buckets(col_names_and_num_buckets.second);
+    hash_pb->set_seed(0);
+    for (const auto& col_name : col_names_and_num_buckets.first) {
+      auto* column_pb = hash_pb->add_columns();
+      int col_idx = schema.find_column(col_name);
+      column_pb->set_id(col_idx);
+      column_pb->set_name(col_name);
+    }
+  }
+  CHECK_OK(PartitionSchema::FromPB(partition_schema_pb, schema, partition_schema));
+}
+} // anonymous namespace
+
 static std::string ToString(const vector<ColumnSchema>& columns) {
   std::string str;
   for (const auto& column : columns) {
@@ -59,9 +83,9 @@ static std::string ToString(const vector<ColumnSchema>& columns) {
 
 class TestScanSpec : public KuduTest {
  public:
-  explicit TestScanSpec(const Schema& s)
+  explicit TestScanSpec(Schema s)
     : arena_(1024),
-      schema_(s) {
+      schema_(std::move(s)) {
   }
 
   enum ComparisonOp {
@@ -375,6 +399,391 @@ TEST_F(CompositeIntKeysTest, TestInListPushdown) {
             spec.ToString(schema_));
 }
 
+// Test that hash(a) IN list predicates prune with right values.
+TEST_F(CompositeIntKeysTest, TestOneHashKeyInListHashPruning) {
+  ScanSpec spec;
+  AddInPredicate<int8_t>(&spec, "a", { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 });
+  AddInPredicate<int8_t>(&spec, "b", { 50, 100 });
+
+  Schema schema = schema_.CopyWithColumnIds();
+
+  PartitionSchema partition_schema;
+  GeneratePartitionSchema(schema,
+                          { pair<vector<std::string>, int>({ "a" }, 3) },
+                          &partition_schema);
+
+  vector<Partition> partitions;
+  ASSERT_OK(partition_schema.CreatePartitions({}, {}, schema, &partitions));
+  ASSERT_EQ(3, partitions.size());
+
+  // clone scan_spec for different partition.
+  ScanSpec spec_p1 = spec;
+  ScanSpec spec_p2 = spec;
+  ScanSpec spec_p3 = spec;
+
+  spec_p1.PruneHashForInlistIfPossible(schema, partitions[0], partition_schema);
+  spec_p1.OptimizeScan(schema, &arena_, true);
+
+  // Verify the splitted values can merge into originl set without overlapping.
+  SCOPED_TRACE(spec_p1.ToString(schema));
+  EXPECT_EQ("PK >= (int8 a=4, int8 b=50, int8 c=-128) AND "
+            "PK < (int8 a=8, int8 b=101, int8 c=-128) AND "
+            "a IN (4, 7, 8) AND b IN (50, 100)",
+            spec_p1.ToString(schema));
+
+  spec_p2.PruneHashForInlistIfPossible(schema, partitions[1], partition_schema);
+  spec_p2.OptimizeScan(schema, &arena_, true);
+
+  SCOPED_TRACE(spec_p2.ToString(schema));
+  EXPECT_EQ("PK >= (int8 a=0, int8 b=50, int8 c=-128) AND "
+            "PK < (int8 a=9, int8 b=101, int8 c=-128) AND "
+            "a IN (0, 2, 5, 9) AND b IN (50, 100)",
+            spec_p2.ToString(schema));
+
+  spec_p3.PruneHashForInlistIfPossible(schema, partitions[2], partition_schema);
+  spec_p3.OptimizeScan(schema, &arena_, true);
+
+  SCOPED_TRACE(spec_p3.ToString(schema));
+  EXPECT_EQ("PK >= (int8 a=1, int8 b=50, int8 c=-128) AND "
+            "PK < (int8 a=6, int8 b=101, int8 c=-128) AND "
+            "a IN (1, 3, 6) AND b IN (50, 100)",
+            spec_p3.ToString(schema));
+}
+
+// Test that in case hash(a) prune all predicate values, the rest predicate values for
+// pruned in-list predicate should be detect correctly by CanShortCircuit().
+// BTW, empty IN list predicates wouldn't result in the crash of OptimizeScan().
+TEST_F(CompositeIntKeysTest, TestHashKeyInListHashPruningEmptyDetect) {
+  ScanSpec spec;
+  AddInPredicate<int8_t>(&spec, "a", { 0, 2, 4, 5, 7, 8, 9 });
+  AddInPredicate<int8_t>(&spec, "b", { 50, 100 });
+
+  Schema schema = schema_.CopyWithColumnIds();
+
+  PartitionSchema partition_schema;
+  GeneratePartitionSchema(schema,
+                          { pair<vector<std::string>, int>({ "a" }, 3) },
+                          &partition_schema);
+
+  vector<Partition> partitions;
+  ASSERT_OK(partition_schema.CreatePartitions({}, {}, schema, &partitions));
+  ASSERT_EQ(3, partitions.size());
+
+  // clone scan_spec for different partition.
+  ScanSpec spec_p1 = spec;
+  ScanSpec spec_p2 = spec;
+  ScanSpec spec_p3 = spec;
+
+  spec_p1.PruneHashForInlistIfPossible(schema, partitions[0], partition_schema);
+  // Guarantee OptimizeScan can be call without fatal.
+  NO_FATALS(spec_p1.OptimizeScan(schema, &arena_, true));
+
+  // Verify the splitted values can merge into originl set without overlapping.
+  SCOPED_TRACE(spec_p1.ToString(schema));
+  EXPECT_EQ("PK >= (int8 a=4, int8 b=50, int8 c=-128) AND "
+            "PK < (int8 a=8, int8 b=101, int8 c=-128) AND "
+            "a IN (4, 7, 8) AND b IN (50, 100)",
+            spec_p1.ToString(schema));
+  ASSERT_FALSE(spec_p1.CanShortCircuit());
+
+  spec_p2.PruneHashForInlistIfPossible(schema, partitions[1], partition_schema);
+  // Guarantee OptimizeScan can be call without fatal.
+  NO_FATALS(spec_p2.OptimizeScan(schema, &arena_, true));
+
+  SCOPED_TRACE(spec_p2.ToString(schema));
+  EXPECT_EQ("PK >= (int8 a=0, int8 b=50, int8 c=-128) AND "
+            "PK < (int8 a=9, int8 b=101, int8 c=-128) AND "
+            "a IN (0, 2, 5, 9) AND b IN (50, 100)",
+            spec_p2.ToString(schema));
+  ASSERT_FALSE(spec_p2.CanShortCircuit());
+
+  // There should be no predicate values after prune.
+  spec_p3.PruneHashForInlistIfPossible(schema, partitions[2], partition_schema);
+  // Guarantee OptimizeScan can be call without fatal.
+  NO_FATALS(spec_p3.OptimizeScan(schema, &arena_, true));
+  ASSERT_TRUE(spec_p3.CanShortCircuit());
+}
+
+// Test that hash(a), hash(b) IN list predicates should be pruned.
+TEST_F(CompositeIntKeysTest, TestMultiHashKeyOneColumnInListHashPruning) {
+  ScanSpec spec;
+  AddInPredicate<int8_t>(&spec, "a", { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 });
+  AddInPredicate<int8_t>(&spec, "b", { 10, 20, 30, 40, 50, 60, 70, 80 });
+
+  Schema schema = schema_.CopyWithColumnIds();
+
+  PartitionSchema partition_schema;
+  GeneratePartitionSchema(schema,
+                          { pair<vector<std::string>, int>({ "a" }, 3),
+                            pair<vector<std::string>, int>({ "b" }, 3) },
+                          &partition_schema);
+
+  vector<Partition> partitions;
+  ASSERT_OK(partition_schema.CreatePartitions({}, {}, schema, &partitions));
+  ASSERT_EQ(9, partitions.size());
+
+  // clone scan_spec for different partition.
+  ScanSpec spec_p1 = spec;
+  ScanSpec spec_p2 = spec;
+  ScanSpec spec_p3 = spec;
+  ScanSpec spec_p4 = spec;
+  ScanSpec spec_p5 = spec;
+  ScanSpec spec_p6 = spec;
+  ScanSpec spec_p7 = spec;
+  ScanSpec spec_p8 = spec;
+  ScanSpec spec_p9 = spec;
+
+  // p1, p2, p3 should have the same predicate values to be pushed on hash(a).
+  // p1, p4, p7 should have the same predicate values to be pushed on hash(b).
+  spec_p1.PruneHashForInlistIfPossible(schema, partitions[0], partition_schema);
+  spec_p1.OptimizeScan(schema, &arena_, true);
+  SCOPED_TRACE(spec_p1.ToString(schema));
+  EXPECT_EQ("PK >= (int8 a=4, int8 b=40, int8 c=-128) AND "
+            "PK < (int8 a=8, int8 b=71, int8 c=-128) AND "
+            "a IN (4, 7, 8) AND b IN (40, 60, 70)",
+            spec_p1.ToString(schema));
+
+  spec_p2.PruneHashForInlistIfPossible(schema, partitions[1], partition_schema);
+  spec_p2.OptimizeScan(schema, &arena_, true);
+
+  SCOPED_TRACE(spec_p2.ToString(schema));
+  EXPECT_EQ("PK >= (int8 a=4, int8 b=20, int8 c=-128) AND "
+            "PK < (int8 a=8, int8 b=51, int8 c=-128) AND "
+            "a IN (4, 7, 8) AND b IN (20, 30, 50)",
+            spec_p2.ToString(schema));
+
+  spec_p3.PruneHashForInlistIfPossible(schema, partitions[2], partition_schema);
+  spec_p3.OptimizeScan(schema, &arena_, true);
+
+  SCOPED_TRACE(spec_p3.ToString(schema));
+  EXPECT_EQ("PK >= (int8 a=4, int8 b=10, int8 c=-128) AND "
+            "PK < (int8 a=8, int8 b=81, int8 c=-128) AND "
+            "a IN (4, 7, 8) AND b IN (10, 80)",
+            spec_p3.ToString(schema));
+
+  spec_p4.PruneHashForInlistIfPossible(schema, partitions[3], partition_schema);
+  spec_p4.OptimizeScan(schema, &arena_, true);
+
+  SCOPED_TRACE(spec_p4.ToString(schema));
+  EXPECT_EQ("PK >= (int8 a=0, int8 b=40, int8 c=-128) AND "
+            "PK < (int8 a=9, int8 b=71, int8 c=-128) AND "
+            "a IN (0, 2, 5, 9) AND b IN (40, 60, 70)",
+            spec_p4.ToString(schema));
+
+  spec_p5.PruneHashForInlistIfPossible(schema, partitions[4], partition_schema);
+  spec_p5.OptimizeScan(schema, &arena_, true);
+
+  SCOPED_TRACE(spec_p5.ToString(schema));
+  EXPECT_EQ("PK >= (int8 a=0, int8 b=20, int8 c=-128) AND "
+            "PK < (int8 a=9, int8 b=51, int8 c=-128) AND "
+            "a IN (0, 2, 5, 9) AND b IN (20, 30, 50)",
+            spec_p5.ToString(schema));
+
+  spec_p6.PruneHashForInlistIfPossible(schema, partitions[5], partition_schema);
+  spec_p6.OptimizeScan(schema, &arena_, true);
+
+  SCOPED_TRACE(spec_p6.ToString(schema));
+  EXPECT_EQ("PK >= (int8 a=0, int8 b=10, int8 c=-128) AND "
+            "PK < (int8 a=9, int8 b=81, int8 c=-128) AND "
+            "a IN (0, 2, 5, 9) AND b IN (10, 80)",
+            spec_p6.ToString(schema));
+
+  spec_p7.PruneHashForInlistIfPossible(schema, partitions[6], partition_schema);
+  spec_p7.OptimizeScan(schema, &arena_, true);
+
+  SCOPED_TRACE(spec_p7.ToString(schema));
+  EXPECT_EQ("PK >= (int8 a=1, int8 b=40, int8 c=-128) AND "
+            "PK < (int8 a=6, int8 b=71, int8 c=-128) AND "
+            "a IN (1, 3, 6) AND b IN (40, 60, 70)",
+            spec_p7.ToString(schema));
+
+  spec_p8.PruneHashForInlistIfPossible(schema, partitions[7], partition_schema);
+  spec_p8.OptimizeScan(schema, &arena_, true);
+
+  SCOPED_TRACE(spec_p8.ToString(schema));
+  EXPECT_EQ("PK >= (int8 a=1, int8 b=20, int8 c=-128) AND "
+            "PK < (int8 a=6, int8 b=51, int8 c=-128) AND "
+            "a IN (1, 3, 6) AND b IN (20, 30, 50)",
+            spec_p8.ToString(schema));
+
+  spec_p9.PruneHashForInlistIfPossible(schema, partitions[8], partition_schema);
+  spec_p9.OptimizeScan(schema, &arena_, true);
+
+  SCOPED_TRACE(spec_p9.ToString(schema));
+  EXPECT_EQ("PK >= (int8 a=1, int8 b=10, int8 c=-128) AND "
+            "PK < (int8 a=6, int8 b=81, int8 c=-128) AND "
+            "a IN (1, 3, 6) AND b IN (10, 80)",
+            spec_p9.ToString(schema));
+}
+
+// Test that hash(a, b) IN list predicates should not be pruned.
+TEST_F(CompositeIntKeysTest, TesMultiHashColumnsInListHashPruning) {
+  ScanSpec spec;
+  AddInPredicate<int8_t>(&spec, "a", { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 });
+  AddInPredicate<int8_t>(&spec, "b", { 50, 100 });
+
+  Schema schema = schema_.CopyWithColumnIds();
+
+  PartitionSchema partition_schema;
+  GeneratePartitionSchema(schema,
+                          { pair<vector<std::string>, int>({ "a", "b" }, 3) },
+                          &partition_schema);
+
+  vector<Partition> partitions;
+  ASSERT_OK(partition_schema.CreatePartitions({}, {}, schema, &partitions));
+  ASSERT_EQ(3, partitions.size());
+
+  // clone scan_spec for different partition.
+  ScanSpec spec_p1 = spec;
+  ScanSpec spec_p2 = spec;
+  ScanSpec spec_p3 = spec;
+
+  spec_p1.PruneHashForInlistIfPossible(schema, partitions[0], partition_schema);
+  spec_p1.OptimizeScan(schema, &arena_, true);
+
+  // Verify that the predicates to be pushed to different partition should be
+  // the same when no hash prune happened.
+  SCOPED_TRACE(spec_p1.ToString(schema));
+  EXPECT_EQ("PK >= (int8 a=0, int8 b=50, int8 c=-128) AND "
+            "PK < (int8 a=9, int8 b=101, int8 c=-128) AND "
+            "a IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9) AND b IN (50, 100)",
+            spec_p1.ToString(schema));
+
+  spec_p2.PruneHashForInlistIfPossible(schema, partitions[1], partition_schema);
+  spec_p2.OptimizeScan(schema, &arena_, true);
+
+  SCOPED_TRACE(spec_p2.ToString(schema));
+  EXPECT_EQ("PK >= (int8 a=0, int8 b=50, int8 c=-128) AND "
+            "PK < (int8 a=9, int8 b=101, int8 c=-128) AND "
+            "a IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9) AND b IN (50, 100)",
+            spec_p2.ToString(schema));
+
+  spec_p3.PruneHashForInlistIfPossible(schema, partitions[2], partition_schema);
+  spec_p3.OptimizeScan(schema, &arena_, true);
+
+  SCOPED_TRACE(spec_p3.ToString(schema));
+  EXPECT_EQ("PK >= (int8 a=0, int8 b=50, int8 c=-128) AND "
+            "PK < (int8 a=9, int8 b=101, int8 c=-128) AND "
+            "a IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9) AND b IN (50, 100)",
+            spec_p3.ToString(schema));
+}
+
+// Test that hash(a, b), hash(c) InList predicates.
+// Neither a or b IN list can be pruned.
+// c IN list should be pruned.
+TEST_F(CompositeIntKeysTest, TesMultiHashKeyMultiHashInListHashPruning) {
+  ScanSpec spec;
+  AddInPredicate<int8_t>(&spec, "a", { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 });
+  AddInPredicate<int8_t>(&spec, "b", { 50, 100 });
+  AddInPredicate<int8_t>(&spec, "c", { 20, 30, 40, 50, 60, 70, 80, 90});
+
+  Schema schema = schema_.CopyWithColumnIds();
+
+  PartitionSchema partition_schema;
+  GeneratePartitionSchema(schema,
+                          { pair<vector<std::string>, int>({ "a", "b" }, 3),
+                            pair<vector<std::string>, int>({ "c" }, 3) },
+                          &partition_schema);
+
+  vector<Partition> partitions;
+  ASSERT_OK(partition_schema.CreatePartitions({}, {}, schema, &partitions));
+  ASSERT_EQ(9, partitions.size());
+
+  // clone scan_spec for different partition.
+  ScanSpec spec_p1 = spec;
+  ScanSpec spec_p2 = spec;
+  ScanSpec spec_p3 = spec;
+  ScanSpec spec_p4 = spec;
+  ScanSpec spec_p5 = spec;
+  ScanSpec spec_p6 = spec;
+  ScanSpec spec_p7 = spec;
+  ScanSpec spec_p8 = spec;
+  ScanSpec spec_p9 = spec;
+
+  spec_p1.PruneHashForInlistIfPossible(schema, partitions[0], partition_schema);
+  spec_p1.OptimizeScan(schema, &arena_, true);
+
+  // hash(a, b) should not be pruned, hash(c) should be pruned.
+  // p1, p4, p7 should have the same values to be pushed.
+  SCOPED_TRACE(spec_p1.ToString(schema));
+  EXPECT_EQ("PK >= (int8 a=0, int8 b=50, int8 c=40) AND "
+            "PK < (int8 a=9, int8 b=100, int8 c=71) AND "
+            "a IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9) AND b IN (50, 100) AND c IN (40, 60, 70)",
+            spec_p1.ToString(schema));
+
+  spec_p2.PruneHashForInlistIfPossible(schema, partitions[1], partition_schema);
+  spec_p2.OptimizeScan(schema, &arena_, true);
+
+  SCOPED_TRACE(spec_p2.ToString(schema));
+  EXPECT_EQ("PK >= (int8 a=0, int8 b=50, int8 c=20) AND "
+            "PK < (int8 a=9, int8 b=100, int8 c=51) AND "
+            "a IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9) AND b IN (50, 100) AND c IN (20, 30, 50)",
+            spec_p2.ToString(schema));
+
+  spec_p3.PruneHashForInlistIfPossible(schema, partitions[2], partition_schema);
+  spec_p3.OptimizeScan(schema, &arena_, true);
+
+  SCOPED_TRACE(spec_p3.ToString(schema));
+  EXPECT_EQ("PK >= (int8 a=0, int8 b=50, int8 c=80) AND "
+            "PK < (int8 a=9, int8 b=100, int8 c=91) AND "
+            "a IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9) AND b IN (50, 100) AND c IN (80, 90)",
+            spec_p3.ToString(schema));
+
+  spec_p4.PruneHashForInlistIfPossible(schema, partitions[3], partition_schema);
+  spec_p4.OptimizeScan(schema, &arena_, true);
+
+  SCOPED_TRACE(spec_p4.ToString(schema));
+  EXPECT_EQ("PK >= (int8 a=0, int8 b=50, int8 c=40) AND "
+            "PK < (int8 a=9, int8 b=100, int8 c=71) AND "
+            "a IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9) AND b IN (50, 100) AND c IN (40, 60, 70)",
+            spec_p4.ToString(schema));
+
+  spec_p5.PruneHashForInlistIfPossible(schema, partitions[4], partition_schema);
+  spec_p5.OptimizeScan(schema, &arena_, true);
+
+  SCOPED_TRACE(spec_p5.ToString(schema));
+  EXPECT_EQ("PK >= (int8 a=0, int8 b=50, int8 c=20) AND "
+            "PK < (int8 a=9, int8 b=100, int8 c=51) AND "
+            "a IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9) AND b IN (50, 100) AND c IN (20, 30, 50)",
+            spec_p5.ToString(schema));
+
+  spec_p6.PruneHashForInlistIfPossible(schema, partitions[5], partition_schema);
+  spec_p6.OptimizeScan(schema, &arena_, true);
+
+  SCOPED_TRACE(spec_p6.ToString(schema));
+  EXPECT_EQ("PK >= (int8 a=0, int8 b=50, int8 c=80) AND "
+            "PK < (int8 a=9, int8 b=100, int8 c=91) AND "
+            "a IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9) AND b IN (50, 100) AND c IN (80, 90)",
+            spec_p6.ToString(schema));
+
+  spec_p7.PruneHashForInlistIfPossible(schema, partitions[6], partition_schema);
+  spec_p7.OptimizeScan(schema, &arena_, true);
+
+  SCOPED_TRACE(spec_p7.ToString(schema));
+  EXPECT_EQ("PK >= (int8 a=0, int8 b=50, int8 c=40) AND "
+            "PK < (int8 a=9, int8 b=100, int8 c=71) AND "
+            "a IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9) AND b IN (50, 100) AND c IN (40, 60, 70)",
+            spec_p7.ToString(schema));
+
+  spec_p8.PruneHashForInlistIfPossible(schema, partitions[7], partition_schema);
+  spec_p8.OptimizeScan(schema, &arena_, true);
+
+  SCOPED_TRACE(spec_p8.ToString(schema));
+  EXPECT_EQ("PK >= (int8 a=0, int8 b=50, int8 c=20) AND "
+            "PK < (int8 a=9, int8 b=100, int8 c=51) AND "
+            "a IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9) AND b IN (50, 100) AND c IN (20, 30, 50)",
+            spec_p8.ToString(schema));
+
+  spec_p9.PruneHashForInlistIfPossible(schema, partitions[8], partition_schema);
+  spec_p9.OptimizeScan(schema, &arena_, true);
+
+  SCOPED_TRACE(spec_p9.ToString(schema));
+  EXPECT_EQ("PK >= (int8 a=0, int8 b=50, int8 c=80) AND "
+            "PK < (int8 a=9, int8 b=100, int8 c=91) AND "
+            "a IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9) AND b IN (50, 100) AND c IN (80, 90)",
+            spec_p9.ToString(schema));
+}
+
 // Test that IN list mixed with range predicates get pushed into the primary key
 // bounds.
 TEST_F(CompositeIntKeysTest, TestInListPushdownWithRange) {
diff --git a/src/kudu/common/scan_spec.cc b/src/kudu/common/scan_spec.cc
index f9feb42..fc519d2 100644
--- a/src/kudu/common/scan_spec.cc
+++ b/src/kudu/common/scan_spec.cc
@@ -31,6 +31,8 @@
 #include "kudu/common/column_predicate.h"
 #include "kudu/common/encoded_key.h"
 #include "kudu/common/key_util.h"
+#include "kudu/common/partial_row.h"
+#include "kudu/common/partition.h"
 #include "kudu/common/row.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/types.h"
@@ -39,6 +41,7 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/array_view.h"
 #include "kudu/util/memory/arena.h"
+#include "kudu/util/status.h"
 
 using std::any_of;
 using std::max;
@@ -82,7 +85,9 @@ bool ScanSpec::CanShortCircuit() const {
 
   return any_of(predicates_.begin(), predicates_.end(),
                 [] (const pair<string, ColumnPredicate>& predicate) {
-                  return predicate.second.predicate_type() == PredicateType::None;
+                  return predicate.second.predicate_type() == PredicateType::None ||
+                      (predicate.second.predicate_type() == PredicateType::InList &&
+                       predicate.second.raw_values().empty());
                 });
 }
 
@@ -166,6 +171,36 @@ void ScanSpec::OptimizeScan(const Schema& schema,
   }
 }
 
+void ScanSpec::PruneHashForInlistIfPossible(const Schema& schema,
+                                            const Partition& partition,
+                                            const PartitionSchema& partition_schema) {
+  for (auto& predicate_pair : predicates_) {
+    auto& predicate = predicate_pair.second;
+    if (predicate.predicate_type() != PredicateType::InList) continue;
+
+    const string& col_name = predicate_pair.first;
+    int32_t idx;
+    Status s = schema.FindColumn(col_name, &idx);
+    if (!s.ok() || !schema.is_key_column(idx)) continue;
+
+    int hash_idx = partition_schema.TryGetSingleColumnHashPartitionIndex(schema, idx);
+    if (hash_idx == -1) continue;
+
+    auto* predicate_values = predicate.mutable_raw_values();
+    predicate_values->erase(std::remove_if(predicate_values->begin(), predicate_values->end(),
+          [idx, hash_idx, &schema, &partition, &partition_schema](const void* value) {
+        KuduPartialRow partial_row(&schema);
+        Status s = partial_row.Set(idx, reinterpret_cast<const uint8_t*>(value));
+        if (!s.ok()) return false;
+        bool is_value_in;
+        s = partition_schema.HashPartitionContainsRow(partition, partial_row,
+                                                      hash_idx, &is_value_in);
+        if (!s.ok()) return false;
+        return !is_value_in;
+      }), predicate_values->end());
+  }
+}
+
 vector<ColumnSchema> ScanSpec::GetMissingColumns(const Schema& projection) {
   vector<ColumnSchema> missing_cols;
   unordered_set<string> missing_col_names;
@@ -279,7 +314,7 @@ void ScanSpec::LiftPrimaryKeyBounds(const Schema& schema, Arena* arena) {
       if (is_exclusive) {
         ColumnPredicate predicate = ColumnPredicate::Range(column, lower, upper);
         if (predicate.predicate_type() == PredicateType::Equality &&
-            col_idx + 1 < num_key_columns) {
+            col_idx + 1 < num_key_columns && lower_bound_key_ != nullptr) {
           // If this turns out to be an equality predicate, then we can add one
           // more lower bound predicate from the next component (if it exists).
           //
diff --git a/src/kudu/common/scan_spec.h b/src/kudu/common/scan_spec.h
index 826d57b..3963f17 100644
--- a/src/kudu/common/scan_spec.h
+++ b/src/kudu/common/scan_spec.h
@@ -32,6 +32,8 @@ namespace kudu {
 class Arena;
 class ColumnSchema;
 class EncodedKey;
+class Partition;
+class PartitionSchema;
 class Schema;
 
 class ScanSpec {
@@ -71,6 +73,20 @@ class ScanSpec {
                     Arena* arena,
                     bool remove_pushed_predicates);
 
+  // Filter in-list predicate values with given hash partition schema.
+  // Only supports pruning for single-column hash schemas.
+  // Now support hash prune on:
+  //     hash(onekey), # support.
+  //     hash(onekey), hash(anotherkey) # support either.
+  //     hash(key_one, key_two), hash(anotherkey) # support only prune on anotherkey.
+  //
+  // TODO(ningw) For IN list predicate on hash(key_one, key_two) or more columns,
+  // if one predicate is IN list, and the rest predicate(s) are EQUAL, could
+  // have IN list predicate values prune as well.
+  void PruneHashForInlistIfPossible(const Schema& schema,
+                                    const Partition& partition,
+                                    const PartitionSchema& partition_schema);
+
   // Get columns that are present in the predicates but not in the projection
   std::vector<ColumnSchema> GetMissingColumns(const Schema& projection);
 
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 6b0704a..a73c5a0 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -2657,6 +2657,9 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
   }
 
   VLOG(3) << "Before optimizing scan spec: " << spec.ToString(tablet_schema);
+  spec.PruneHashForInlistIfPossible(tablet_schema,
+                                    replica->tablet_metadata()->partition(),
+                                    replica->tablet_metadata()->partition_schema());
   spec.OptimizeScan(tablet_schema, scanner->arena(), true);
   VLOG(3) << "After optimizing scan spec: " << spec.ToString(tablet_schema);