You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by la...@apache.org on 2022/06/30 12:17:14 UTC

[kudu] branch master updated (3c415ea2f -> 00a2c7f29)

This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


    from 3c415ea2f KUDU-2671 introduce PartitionPruner::PrepareRangeSet()
     new 936d7edc4 KUDU-1644: Simplify InList predicate values based on rowset PK bounds
     new 00a2c7f29 [Catalog] Make DNS name of Kudu master case insensitive

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/kudu/common/generic_iterators.cc |   3 +-
 src/kudu/common/scan_spec.cc         |  28 +++---
 src/kudu/common/scan_spec.h          |   4 +
 src/kudu/master/sys_catalog.cc       |  13 ++-
 src/kudu/tablet/cfile_set-test.cc    | 188 ++++++++++++++++++++++++++++++++---
 src/kudu/tablet/cfile_set.cc         |  53 ++++++++--
 src/kudu/tablet/cfile_set.h          |  10 +-
 src/kudu/util/net/net_util.h         |   2 +-
 8 files changed, 258 insertions(+), 43 deletions(-)


[kudu] 02/02: [Catalog] Make DNS name of Kudu master case insensitive

Posted by la...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 00a2c7f29408968ab989215d56fc01dab4edac5b
Author: xinghuayu007 <14...@qq.com>
AuthorDate: Mon May 23 16:47:31 2022 +0800

    [Catalog] Make DNS name of Kudu master case insensitive
    
    Kudu master gets DNS name from config file when server startup
    and writes it into sys table. But after rewriting Raft config,
    it will be changed with different character case. Therefore it will
    be good to keep case insensitive for DNS name.
    
    In our situtation, the config file uses lower case DNS name,
    when rewriting Raft configure, user will input DNS name, which
    may be in upper case. Then Kudu master get an error when starting.
    
    Change-Id: If955e91a9e081dad7aebdcc497c413c8eb10eb4b
    Reviewed-on: http://gerrit.cloudera.org:8080/18567
    Tested-by: Kudu Jenkins
    Reviewed-by: Yingchun Lai <ac...@gmail.com>
---
 src/kudu/master/sys_catalog.cc | 13 ++++++++++---
 src/kudu/util/net/net_util.h   |  2 +-
 2 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index 2b31d6bbb..58a44922b 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -83,6 +83,7 @@
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/slice.h"
+#include "kudu/util/string_case.h"
 
 DEFINE_double(sys_catalog_fail_during_write, 0.0,
               "Fraction of the time when system table writes will fail");
@@ -221,7 +222,7 @@ Status SysCatalogTable::VerifyAndPopulateSingleMasterConfig(ConsensusMetadata* c
     if (peer.has_last_known_addr()) {
       // Verify the supplied master address matches with the on-disk Raft config.
       auto raft_master_addr = HostPortFromPB(peer.last_known_addr());
-      if (raft_master_addr != master_addr) {
+      if (!iequals(raft_master_addr.ToString(), master_addr.ToString())) {
         return Status::InvalidArgument(
             Substitute("Single master Raft config error. On-disk master: $0 and "
                        "supplied master: $1 are different", raft_master_addr.ToString(),
@@ -273,9 +274,12 @@ Status SysCatalogTable::Load(FsManager *fs_manager) {
     // Make sure the set of masters passed in at start time matches the set in
     // the on-disk cmeta.
     set<string> peer_addrs_from_opts;
+    string hp_str;
     const auto& master_addresses = master_->opts().master_addresses();
     for (const auto& hp : master_addresses) {
-      peer_addrs_from_opts.insert(hp.ToString());
+      hp_str = hp.ToString();
+      ToLowerCase(&hp_str);
+      peer_addrs_from_opts.insert(hp_str);
     }
     if (peer_addrs_from_opts.size() < master_addresses.size()) {
       LOG(WARNING) << Substitute("Found duplicates in --master_addresses: "
@@ -283,8 +287,11 @@ Status SysCatalogTable::Load(FsManager *fs_manager) {
                                  JoinStrings(peer_addrs_from_opts, ", "));
     }
     set<string> peer_addrs_from_disk;
+    string last_known_addr_str;
     for (const auto& p : cstate.committed_config().peers()) {
-      peer_addrs_from_disk.insert(HostPortFromPB(p.last_known_addr()).ToString());
+      last_known_addr_str = HostPortFromPB(p.last_known_addr()).ToString();
+      ToLowerCase(&last_known_addr_str);
+      peer_addrs_from_disk.insert(last_known_addr_str);
     }
     vector<string> symm_diff;
     std::set_symmetric_difference(peer_addrs_from_opts.begin(),
diff --git a/src/kudu/util/net/net_util.h b/src/kudu/util/net/net_util.h
index ea77bc686..ff756d702 100644
--- a/src/kudu/util/net/net_util.h
+++ b/src/kudu/util/net/net_util.h
@@ -94,7 +94,7 @@ class HostPort {
   // Takes a vector of HostPort objects and returns a comma separated
   // string containing of "host:port" pairs. This method is the
   // "inverse" of ParseStrings().
-  static std::string ToCommaSeparatedString(const std::vector<HostPort>& host_ports);
+  static std::string ToCommaSeparatedString(const std::vector<HostPort>& hostports);
 
   // Returns true if addr is within 127.0.0.0/8 range.
   static bool IsLoopback(uint32_t addr);


[kudu] 01/02: KUDU-1644: Simplify InList predicate values based on rowset PK bounds

Posted by la...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 936d7edc4e4b69d2e1f1dffc96760cb3fd57a934
Author: zhangyifan27 <ch...@163.com>
AuthorDate: Mon Apr 18 21:17:00 2022 +0800

    KUDU-1644: Simplify InList predicate values based on rowset PK bounds
    
    Previous we only optimize InList predicates based on tablet PK bounds, we can
    also optimize it at the DRS level. By adding the implicit PK bounds, InList
    predicate can be simplified. Also, the DRS bounds info can be used to skip rows
    effectively when we have a predicate on a non-prefix of the primary key and the
    leading column(s) have cardinality=1 (as described in KUDU-1291).
    
    Benchmark tests result(in slow mode):
    before
    Selected 10000 rows cost 2.519996 seconds. # PredicateOnFirstColumn
    Selected 100 rows cost 2.040003 seconds. # PredicateOnSecondColumn
    after
    Selected 10000 rows cost 1.771755 seconds. # PredicateOnFirstColumn
    Selected 100 rows cost 0.131996 seconds. # PredicateOnSecondColumn
    
    Change-Id: Ia9c2aa958f19a0b62e40a2ef5eb5365f91cbab80
    Reviewed-on: http://gerrit.cloudera.org:8080/18434
    Tested-by: Kudu Jenkins
    Reviewed-by: Yingchun Lai <ac...@gmail.com>
---
 src/kudu/common/generic_iterators.cc |   3 +-
 src/kudu/common/scan_spec.cc         |  28 +++---
 src/kudu/common/scan_spec.h          |   4 +
 src/kudu/tablet/cfile_set-test.cc    | 188 ++++++++++++++++++++++++++++++++---
 src/kudu/tablet/cfile_set.cc         |  53 ++++++++--
 src/kudu/tablet/cfile_set.h          |  10 +-
 6 files changed, 247 insertions(+), 39 deletions(-)

diff --git a/src/kudu/common/generic_iterators.cc b/src/kudu/common/generic_iterators.cc
index 46fc55055..10457e4d8 100644
--- a/src/kudu/common/generic_iterators.cc
+++ b/src/kudu/common/generic_iterators.cc
@@ -1175,7 +1175,8 @@ Status MaterializingIterator::Init(ScanSpec *spec) {
   const int32_t num_columns = schema().num_columns();
   col_idx_predicates_.clear();
   non_predicate_column_indexes_.clear();
-  if (PREDICT_FALSE(!disallow_pushdown_for_tests_) && spec != nullptr) {
+  if (PREDICT_TRUE(!disallow_pushdown_for_tests_) && spec != nullptr &&
+      !spec->predicates().empty()) {
     col_idx_predicates_.reserve(spec->predicates().size());
     DCHECK_GE(num_columns, spec->predicates().size());
     non_predicate_column_indexes_.reserve(num_columns - spec->predicates().size());
diff --git a/src/kudu/common/scan_spec.cc b/src/kudu/common/scan_spec.cc
index 688a5e2e3..3fdfaa837 100644
--- a/src/kudu/common/scan_spec.cc
+++ b/src/kudu/common/scan_spec.cc
@@ -147,19 +147,15 @@ string ScanSpec::ToString(const Schema& schema) const {
   return JoinStrings(preds, " AND ") + limit;
 }
 
-void ScanSpec::OptimizeScan(const Schema& schema,
-                            Arena* arena,
-                            bool remove_pushed_predicates) {
-  // Don't bother if we can already short circuit the scan. This also lets us
-  // rely on lower_bound_key_ < exclusive_upper_bound_key_ and no None
-  // predicates in the optimization step.
-  if (!CanShortCircuit()) {
-    LiftPrimaryKeyBounds(schema, arena);
-    // Predicates may be has None, after merge PrimaryKeyBounds and Predicates
-    if (!CanShortCircuit()) {
-      PushPredicatesIntoPrimaryKeyBounds(schema, arena, remove_pushed_predicates);
-    }
-  }
+void ScanSpec::UnifyPrimaryKeyBoundsAndColumnPredicates(const Schema& schema,
+                                                        Arena* arena,
+                                                        bool remove_pushed_predicates) {
+  LiftPrimaryKeyBounds(schema, arena);
+  PushPredicatesIntoPrimaryKeyBounds(schema, arena, remove_pushed_predicates);
+}
+
+void ScanSpec::OptimizeScan(const Schema& schema, Arena* arena, bool remove_pushed_predicates) {
+  UnifyPrimaryKeyBoundsAndColumnPredicates(schema, arena, remove_pushed_predicates);
 
   // KUDU-1652: Filter IS NOT NULL predicates for non-nullable columns.
   for (auto itr = predicates_.begin(); itr != predicates_.end(); ) {
@@ -227,6 +223,9 @@ vector<ColumnSchema> ScanSpec::GetMissingColumns(const Schema& projection) {
 void ScanSpec::PushPredicatesIntoPrimaryKeyBounds(const Schema& schema,
                                                   Arena* arena,
                                                   bool remove_pushed_predicates) {
+  if (CanShortCircuit()) {
+    return;
+  }
   // Step 1: load key column predicate values into a pair of rows.
   uint8_t* lower_buf = static_cast<uint8_t*>(
       CHECK_NOTNULL(arena->AllocateBytes(schema.key_byte_size())));
@@ -279,6 +278,9 @@ void ScanSpec::PushPredicatesIntoPrimaryKeyBounds(const Schema& schema,
 }
 
 void ScanSpec::LiftPrimaryKeyBounds(const Schema& schema, Arena* arena) {
+  if (CanShortCircuit()) {
+    return;
+  }
   if (lower_bound_key_ == nullptr && exclusive_upper_bound_key_ == nullptr) { return; }
   int32_t num_key_columns = schema.num_key_columns();
   for (int32_t col_idx = 0; col_idx < num_key_columns; col_idx++) {
diff --git a/src/kudu/common/scan_spec.h b/src/kudu/common/scan_spec.h
index 506612d5b..e53474e9e 100644
--- a/src/kudu/common/scan_spec.h
+++ b/src/kudu/common/scan_spec.h
@@ -67,6 +67,10 @@ class ScanSpec {
   // into the upper or lower primary key bounds are removed.
   //
   // Idempotent.
+  void UnifyPrimaryKeyBoundsAndColumnPredicates(const Schema& schema,
+                                                Arena* arena,
+                                                bool remove_pushed_predicates);
+
   void OptimizeScan(const Schema& schema,
                     Arena* arena,
                     bool remove_pushed_predicates);
diff --git a/src/kudu/tablet/cfile_set-test.cc b/src/kudu/tablet/cfile_set-test.cc
index d3f66c376..6b115cbe9 100644
--- a/src/kudu/tablet/cfile_set-test.cc
+++ b/src/kudu/tablet/cfile_set-test.cc
@@ -47,6 +47,7 @@
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/stringpiece.h"
+#include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/diskrowset.h"
 #include "kudu/tablet/tablet-test-util.h"
 #include "kudu/util/block_bloom_filter.h"
@@ -57,7 +58,9 @@
 #include "kudu/util/memory/arena.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
+#include "kudu/util/stopwatch.h"
 #include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
 
 DECLARE_int32(cfile_default_block_size);
 
@@ -69,6 +72,18 @@ using std::vector;
 namespace kudu {
 namespace tablet {
 
+ScanSpec GetInListScanSpec(const ColumnSchema& col_schema, const vector<int>& value_list) {
+  vector<const void*> pred_list;
+  pred_list.reserve(value_list.size());
+  for (int i = 0; i < value_list.size(); i++) {
+    pred_list.emplace_back(&value_list[i]);
+  }
+  ColumnPredicate pred = ColumnPredicate::InList(col_schema, &pred_list);
+  ScanSpec spec;
+  spec.AddPredicate(pred);
+  return spec;
+}
+
 class TestCFileSet : public KuduRowSetTest {
  public:
   TestCFileSet() :
@@ -161,16 +176,16 @@ class TestCFileSet : public KuduRowSetTest {
       uint32_t hash2 = HashUtil::ComputeHash32(second, FAST_HASH, 0);
 
       if (bf1_contain->Find(hash1)) {
-        ret1_contain->push_back(i);
+        ret1_contain->push_back(curr1);
       }
       if (bf1_exclude->Find(hash1)) {
-        ret1_exclude->push_back(i);
+        ret1_exclude->push_back(curr1);
       }
       if (bf2_contain->Find(hash2)) {
-        ret2_contain->push_back(i);
+        ret2_contain->push_back(curr1);
       }
       if (bf2_exclude->Find(hash2)) {
-        ret2_exclude->push_back(i);
+        ret2_exclude->push_back(curr1);
       }
     }
   }
@@ -227,7 +242,7 @@ class TestCFileSet : public KuduRowSetTest {
       spec.AddPredicate(pred);
     }
     ASSERT_OK(iter->Init(&spec));
-    // Check that the range was respected on all the results.
+    // Check that InBloomFilter predicates were respected on all the results.
     RowBlockMemory mem(1024);
     RowBlock block(&schema_, 100, &mem);
     while (iter->HasNext()) {
@@ -235,8 +250,8 @@ class TestCFileSet : public KuduRowSetTest {
       for (size_t i = 0; i < block.nrows(); i++) {
         if (block.selection_vector()->IsRowSelected(i)) {
           RowBlockRow row = block.row(i);
-          size_t index = row.row_index();
-          auto target_iter = std::find(target.begin(), target.end(), index);
+          int32_t row_key = *schema_.ExtractColumnFromRow<INT32>(row, 0);
+          auto target_iter = std::find(target.begin(), target.end(), row_key);
           if (target_iter == target.end()) {
             FAIL() << "Row " << schema_.DebugRow(row) << " should not have "
                    << "passed predicate ";
@@ -252,9 +267,49 @@ class TestCFileSet : public KuduRowSetTest {
     }
   }
 
-  Status MaterializeColumn(CFileSet::Iterator *iter,
-                           size_t col_idx,
-                           ColumnBlock *cb) {
+  // Issue a InList scan and verify that all result rows indeed fall inside that predicate.
+  void DoTestInListScan(const shared_ptr<CFileSet>& fileset, int upper_bound, int interval) {
+    // Create iterator.
+    unique_ptr<CFileSet::Iterator> cfile_iter(fileset->NewIterator(&schema_, nullptr));
+    unique_ptr<RowwiseIterator> iter(NewMaterializingIterator(std::move(cfile_iter)));
+
+    // Create a scan with a InList predicate on the key column.
+    vector<int> value_list;
+    vector<int> result_list;
+    for (int i = 0; i < upper_bound; i += interval) {
+      value_list.emplace_back(i * kRatio[0]);
+      result_list.emplace_back(i * kRatio[0]);
+    }
+    auto spec = GetInListScanSpec(schema_.column(0), value_list);
+    ASSERT_OK(iter->Init(&spec));
+
+    // Check that the InList predicate was respected on all the results.
+    RowBlockMemory mem(1024);
+    RowBlock block(&schema_, 100, &mem);
+    int selected_size = 0;
+    while (iter->HasNext()) {
+      mem.Reset();
+      ASSERT_OK_FAST(iter->NextBlock(&block));
+      for (size_t i = 0; i < block.nrows(); i++) {
+        if (block.selection_vector()->IsRowSelected(i)) {
+          RowBlockRow row = block.row(i);
+          int32_t row_key = *schema_.ExtractColumnFromRow<INT32>(row, 0);
+          auto target_iter = std::find(result_list.begin(), result_list.end(), row_key);
+          if (target_iter == result_list.end()) {
+            FAIL() << "Row " << schema_.DebugRow(row) << " should not have passed predicate.";
+          }
+          result_list.erase(target_iter);
+        }
+      }
+      selected_size += block.selection_vector()->CountSelected();
+    }
+    LOG(INFO) << "Selected size: " << selected_size;
+    if (!result_list.empty()) {
+      FAIL() << result_list.size() << " values should have passed predicate.";
+    }
+  }
+
+  static Status MaterializeColumn(CFileSet::Iterator* iter, size_t col_idx, ColumnBlock* cb) {
     SelectionVector sel(cb->nrows());
     ColumnMaterializationContext ctx(col_idx, nullptr, cb, &sel);
     return iter->MaterializeColumn(&ctx);
@@ -544,14 +599,10 @@ TEST_F(TestCFileSet, TestBloomFilterPredicates) {
   // BloomFilter of column 0 contain with lower and upper bound.
   int32_t lower = 8;
   int32_t upper = 58;
-  int32_t lower_row_index = lower / 2;
-  int32_t upper_row_index = upper / 2;
   vector<size_t> ret1_contain_range = ret1_contain;
-  auto left = std::lower_bound(ret1_contain_range.begin(),
-                               ret1_contain_range.end(), lower_row_index);
+  auto left = std::lower_bound(ret1_contain_range.begin(), ret1_contain_range.end(), lower);
   ret1_contain_range.erase(ret1_contain_range.begin(), left); // don't erase left
-  auto right = std::lower_bound(ret1_contain_range.begin(),
-                                ret1_contain_range.end(), upper_row_index);
+  auto right = std::lower_bound(ret1_contain_range.begin(), ret1_contain_range.end(), upper);
   ret1_contain_range.erase(right, ret1_contain_range.end()); // earse right
   auto range = ColumnPredicate::Range(schema_.column(0), &lower, &upper);
   DoTestBloomFilterScan(fileset, { pred1_contain, range }, ret1_contain_range);
@@ -562,5 +613,110 @@ TEST_F(TestCFileSet, TestBloomFilterPredicates) {
   DoTestBloomFilterScan(fileset, { bf_with_range }, ret1_contain_range);
 }
 
+TEST_F(TestCFileSet, TestInListPredicates) {
+  const int kNumRows = 10000;
+  WriteTestRowSet(kNumRows);
+
+  shared_ptr<CFileSet> fileset;
+  ASSERT_OK(CFileSet::Open(
+      rowset_meta_, MemTracker::GetRootTracker(), MemTracker::GetRootTracker(), nullptr, &fileset));
+
+  // Test different size and interval.
+  DoTestInListScan(fileset, 1, 1);
+  DoTestInListScan(fileset, 10, 1);
+  DoTestInListScan(fileset, 100, 5);
+  DoTestInListScan(fileset, 1000, 10);
+}
+
+class InListPredicateBenchmark : public KuduRowSetTest {
+ public:
+  InListPredicateBenchmark()
+      : KuduRowSetTest(Schema({ColumnSchema("c0", INT32), ColumnSchema("c1", INT32)}, 2)) {}
+
+  void SetUp() OVERRIDE {
+    KuduRowSetTest::SetUp();
+
+    // Use a small cfile block size, so that when we skip materializing a given
+    // column for 10,000 rows, it can actually skip over a number of blocks.
+    FLAGS_cfile_default_block_size = 512;
+  }
+
+  // Write out a test rowset with two int columns.
+  // The two columns make up a composite primary key.
+  // The first column contains only one value 1.
+  // The second contains the row index * 10.
+  void WriteTestRowSet(int nrows) {
+    DiskRowSetWriter rsw(
+        rowset_meta_.get(), &schema_, BloomFilterSizing::BySizeAndFPRate(32 * 1024, 0.01F));
+
+    ASSERT_OK(rsw.Open());
+
+    RowBuilder rb(&schema_);
+    for (int i = 0; i < nrows; i++) {
+      rb.Reset();
+      rb.AddInt32(1);
+      rb.AddInt32(i * 10);
+      ASSERT_OK_FAST(WriteRow(rb.data(), &rsw));
+    }
+    ASSERT_OK(rsw.Finish());
+  }
+
+  void BenchmarkInListScan(const ColumnSchema& col_schema, const vector<int>& value_list) {
+    // Write some rows and open the fileset.
+    const int kNumRows = 10000;
+    const int kNumIters = AllowSlowTests() ? 10000 : 100;
+    WriteTestRowSet(kNumRows);
+    shared_ptr<CFileSet> fileset;
+    ASSERT_OK(CFileSet::Open(rowset_meta_,
+                             MemTracker::GetRootTracker(),
+                             MemTracker::GetRootTracker(),
+                             nullptr,
+                             &fileset));
+    Stopwatch sw;
+    sw.start();
+    int selected_size = 0;
+    for (int i = 0; i < kNumIters; i++) {
+      // LOG(INFO) << "Start scan";
+      // Create iterator.
+      unique_ptr<CFileSet::Iterator> cfile_iter(fileset->NewIterator(&schema_, nullptr));
+      unique_ptr<RowwiseIterator> iter(NewMaterializingIterator(std::move(cfile_iter)));
+      // Create a scan with a InList predicate on the key column.
+      auto spec = GetInListScanSpec(col_schema, value_list);
+      ASSERT_OK(iter->Init(&spec));
+      RowBlockMemory mem(1024);
+      RowBlock block(&schema_, 100, &mem);
+      selected_size = 0;
+      while (iter->HasNext()) {
+        mem.Reset();
+        ASSERT_OK_FAST(iter->NextBlock(&block));
+        selected_size += block.selection_vector()->CountSelected();
+      }
+    }
+    sw.stop();
+    LOG(INFO) << Substitute(
+        "Selected $0 rows cost $1 seconds.", selected_size, sw.elapsed().user_cpu_seconds());
+  }
+};
+
+TEST_F(InListPredicateBenchmark, PredicateOnFirstColumn) {
+  // Test an "IN" predicate on the first column which could be optimized to a "=" predicate.
+  vector<int> value_list;
+  value_list.reserve(100);
+  for (int i = 0; i < 100; i++) {
+    value_list.emplace_back(i);
+  }
+  BenchmarkInListScan(schema_.column(0), value_list);
+}
+
+TEST_F(InListPredicateBenchmark, PredicateOnSecondColumn) {
+  // Test an "IN" predicate on the second column which could be optimized to skip unnecessary rows.
+  vector<int> value_list;
+  value_list.reserve(100);
+  for (int i = 100; i < 200; i++) {
+    value_list.emplace_back(i * 10);
+  }
+  BenchmarkInListScan(schema_.column(1), value_list);
+}
+
 } // namespace tablet
 } // namespace kudu
diff --git a/src/kudu/tablet/cfile_set.cc b/src/kudu/tablet/cfile_set.cc
index 6a121588e..e145edbae 100644
--- a/src/kudu/tablet/cfile_set.cc
+++ b/src/kudu/tablet/cfile_set.cc
@@ -390,6 +390,7 @@ Status CFileSet::Iterator::Init(ScanSpec *spec) {
   CHECK(!initted_);
 
   RETURN_NOT_OK(base_data_->CountRows(io_context_, &row_count_));
+  CHECK_GT(row_count_, 0);
 
   // Setup key iterator.
   RETURN_NOT_OK(base_data_->NewKeyIterator(io_context_, &key_iter_));
@@ -397,9 +398,17 @@ Status CFileSet::Iterator::Init(ScanSpec *spec) {
   // Setup column iterators.
   RETURN_NOT_OK(CreateColumnIterators(spec));
 
-  // If there is a range predicate on the key column, push that down into an
-  // ordinal range.
-  RETURN_NOT_OK(PushdownRangeScanPredicate(spec));
+  lower_bound_idx_ = 0;
+  upper_bound_idx_ = row_count_;
+  RETURN_NOT_OK(OptimizePKPredicates(spec));
+  if (spec != nullptr && spec->CanShortCircuit()) {
+    lower_bound_idx_ = row_count_;
+    spec->RemovePredicates();
+  } else {
+    // If there is a range predicate on the key column, push that down into an
+    // ordinal range.
+    RETURN_NOT_OK(PushdownRangeScanPredicate(spec));
+  }
 
   initted_ = true;
 
@@ -410,12 +419,42 @@ Status CFileSet::Iterator::Init(ScanSpec *spec) {
   return Status::OK();
 }
 
-Status CFileSet::Iterator::PushdownRangeScanPredicate(ScanSpec *spec) {
-  CHECK_GT(row_count_, 0);
+Status CFileSet::Iterator::OptimizePKPredicates(ScanSpec* spec) {
+  if (spec == nullptr) {
+    // No predicate.
+    return Status::OK();
+  }
 
-  lower_bound_idx_ = 0;
-  upper_bound_idx_ = row_count_;
+  const EncodedKey* lb_key = spec->lower_bound_key();
+  const EncodedKey* ub_key = spec->exclusive_upper_bound_key();
+  EncodedKey* implicit_lb_key = nullptr;
+  EncodedKey* implicit_ub_key = nullptr;
+  bool modify_lower_bound_key = false;
+  bool modify_upper_bound_key = false;
+  const Schema& tablet_schema = *base_data_->tablet_schema();
+
+  if (!lb_key || lb_key->encoded_key() < base_data_->min_encoded_key_) {
+    RETURN_NOT_OK(EncodedKey::DecodeEncodedString(
+        tablet_schema, &arena_, base_data_->min_encoded_key_, &implicit_lb_key));
+    spec->SetLowerBoundKey(implicit_lb_key);
+    modify_lower_bound_key = true;
+  }
+
+  RETURN_NOT_OK(EncodedKey::DecodeEncodedString(
+      tablet_schema, &arena_, base_data_->max_encoded_key_, &implicit_ub_key));
+  RETURN_NOT_OK(EncodedKey::IncrementEncodedKey(tablet_schema, &implicit_ub_key, &arena_));
+  if (!ub_key || ub_key->encoded_key() > implicit_ub_key->encoded_key()) {
+    spec->SetExclusiveUpperBoundKey(implicit_ub_key);
+    modify_upper_bound_key = true;
+  }
+
+  if (modify_lower_bound_key || modify_upper_bound_key) {
+    spec->UnifyPrimaryKeyBoundsAndColumnPredicates(tablet_schema, &arena_, true);
+  }
+  return Status::OK();
+}
 
+Status CFileSet::Iterator::PushdownRangeScanPredicate(ScanSpec* spec) {
   if (spec == nullptr) {
     // No predicate.
     return Status::OK();
diff --git a/src/kudu/tablet/cfile_set.h b/src/kudu/tablet/cfile_set.h
index ae21b8232..7a119c4c6 100644
--- a/src/kudu/tablet/cfile_set.h
+++ b/src/kudu/tablet/cfile_set.h
@@ -38,6 +38,7 @@
 #include "kudu/gutil/port.h"
 #include "kudu/tablet/rowset_metadata.h"
 #include "kudu/util/make_shared.h"
+#include "kudu/util/memory/arena.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
@@ -222,18 +223,22 @@ class CFileSet::Iterator : public ColumnwiseIterator {
   friend class CFileSet;
 
   // 'projection' must remain valid for the lifetime of this object.
-  Iterator(std::shared_ptr<CFileSet const> base_data, const Schema* projection,
+  Iterator(std::shared_ptr<CFileSet const> base_data,
+           const Schema* projection,
            const fs::IOContext* io_context)
       : base_data_(std::move(base_data)),
         projection_(projection),
         initted_(false),
         cur_idx_(0),
         prepared_count_(0),
-        io_context_(io_context) {}
+        io_context_(io_context),
+        arena_(256) {}
 
   // Fill in col_iters_ for each of the requested columns.
   Status CreateColumnIterators(const ScanSpec* spec);
 
+  Status OptimizePKPredicates(ScanSpec* spec);
+
   // Look for a predicate which can be converted into a range scan using the key
   // column's index. If such a predicate exists, remove it from the scan spec and
   // store it in member fields.
@@ -276,6 +281,7 @@ class CFileSet::Iterator : public ColumnwiseIterator {
   // stored in 'col_iters_'.
   std::vector<cfile::ColumnIterator*> prepared_iters_;
 
+  Arena arena_;
 };
 
 } // namespace tablet