You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/04/15 21:25:20 UTC

[kudu] branch master updated: generic_iterators: pass rowset bounds into grouping iterators

This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new f324ba2  generic_iterators: pass rowset bounds into grouping iterators
f324ba2 is described below

commit f324ba257ae1b256bf6f4f4b33e25d7937e7b086
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Fri Nov 16 16:50:24 2018 -0800

    generic_iterators: pass rowset bounds into grouping iterators
    
    The rowset bounds will be used to reduce MergeIterator memory consumption by
    restricting the "eager RowBlock fetching" behavior at Init time to only
    those sub-iterators whose rowsets could conceivably participate in the
    beginning of the merge.
    
    Change-Id: Id1aebd84507f87c869d781e117225c37c8f15969
    Reviewed-on: http://gerrit.cloudera.org:8080/12946
    Reviewed-by: Mike Percy <mp...@apache.org>
    Tested-by: Kudu Jenkins
---
 src/kudu/common/generic_iterators-test.cc | 85 +++++++++++++++++++++++--------
 src/kudu/common/generic_iterators.cc      | 78 +++++++++++++++-------------
 src/kudu/common/generic_iterators.h       | 21 ++++++--
 src/kudu/tablet/rowset.cc                 | 33 +++++++++---
 src/kudu/tablet/rowset.h                  |  8 ++-
 src/kudu/tablet/tablet.cc                 | 26 +++++-----
 src/kudu/tablet/tablet.h                  |  3 +-
 7 files changed, 171 insertions(+), 83 deletions(-)

diff --git a/src/kudu/common/generic_iterators-test.cc b/src/kudu/common/generic_iterators-test.cc
index 3662299..6c95118 100644
--- a/src/kudu/common/generic_iterators-test.cc
+++ b/src/kudu/common/generic_iterators-test.cc
@@ -30,6 +30,7 @@
 #include <utility>
 #include <vector>
 
+#include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
 #include <gflags/gflags_declare.h>
 #include <glog/logging.h>
@@ -41,6 +42,7 @@
 #include "kudu/common/columnblock.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/common/iterator.h"
+#include "kudu/common/key_encoder.h"
 #include "kudu/common/rowblock.h"
 #include "kudu/common/scan_spec.h"
 #include "kudu/common/schema.h"
@@ -61,6 +63,7 @@ DEFINE_int32(num_iters, 1, "Number of times to run merge");
 DECLARE_bool(materializing_iterator_do_pushdown);
 
 using std::map;
+using std::pair;
 using std::string;
 using std::unique_ptr;
 using std::unordered_set;
@@ -201,10 +204,12 @@ TEST(TestMergeIterator, TestMergeEmpty) {
   unique_ptr<RowwiseIterator> iter(
     NewMaterializingIterator(
         unique_ptr<ColumnwiseIterator>(new VectorIterator({}))));
-  vector<unique_ptr<RowwiseIterator>> input;
-  input.emplace_back(std::move(iter));
-  unique_ptr<RowwiseIterator> merger(
-      NewMergeIterator(MergeIteratorOptions(/*include_deleted_rows=*/false), std::move(input)));
+  vector<IterWithBounds> input;
+  IterWithBounds iwb;
+  iwb.iter = std::move(iter);
+  input.emplace_back(std::move(iwb));
+  unique_ptr<RowwiseIterator> merger(NewMergeIterator(
+      MergeIteratorOptions(/*include_deleted_rows=*/false), std::move(input)));
   ASSERT_OK(merger->Init(nullptr));
   ASSERT_FALSE(merger->HasNext());
 }
@@ -217,10 +222,12 @@ TEST(TestMergeIterator, TestMergeEmptyViaSelectionVector) {
   unique_ptr<VectorIterator> vec(new VectorIterator({ 1, 2, 3 }));
   vec->set_selection_vector(&sv);
   unique_ptr<RowwiseIterator> iter(NewMaterializingIterator(std::move(vec)));
-  vector<unique_ptr<RowwiseIterator>> input;
-  input.emplace_back(std::move(iter));
-  unique_ptr<RowwiseIterator> merger(
-      NewMergeIterator(MergeIteratorOptions(/*include_deleted_rows=*/false), std::move(input)));
+  vector<IterWithBounds> input;
+  IterWithBounds iwb;
+  iwb.iter = std::move(iter);
+  input.emplace_back(std::move(iwb));
+  unique_ptr<RowwiseIterator> merger(NewMergeIterator(
+      MergeIteratorOptions(/*include_deleted_rows=*/false), std::move(input)));
   ASSERT_OK(merger->Init(nullptr));
   ASSERT_FALSE(merger->HasNext());
 }
@@ -232,10 +239,16 @@ TEST(TestMergeIterator, TestNotConsumedCleanup) {
   unique_ptr<VectorIterator> vec2(new VectorIterator({ 2 }));
   unique_ptr<VectorIterator> vec3(new VectorIterator({ 3 }));
 
-  vector<unique_ptr<RowwiseIterator>> input;
-  input.emplace_back(NewMaterializingIterator(std::move(vec1)));
-  input.emplace_back(NewMaterializingIterator(std::move(vec2)));
-  input.emplace_back(NewMaterializingIterator(std::move(vec3)));
+  vector<IterWithBounds> input;
+  IterWithBounds iwb1;
+  iwb1.iter = NewMaterializingIterator(std::move(vec1));
+  input.emplace_back(std::move(iwb1));
+  IterWithBounds iwb2;
+  iwb2.iter = NewMaterializingIterator(std::move(vec2));
+  input.emplace_back(std::move(iwb2));
+  IterWithBounds iwb3;
+  iwb3.iter = NewMaterializingIterator(std::move(vec3));
+  input.emplace_back(std::move(iwb3));
   unique_ptr<RowwiseIterator> merger(NewMergeIterator(
       MergeIteratorOptions(/*include_deleted_rows=*/false), std::move(input)));
   ASSERT_OK(merger->Init(nullptr));
@@ -279,11 +292,13 @@ void TestMerge(const Schema& schema,
     vector<int64_t> ints;
     vector<uint8_t> is_deleted;
     unique_ptr<SelectionVector> sv;
+    boost::optional<pair<string, string>> encoded_bounds;
   };
 
   vector<List> all_ints;
   map<int64_t, bool> expected;
   unordered_set<int64_t> seen_live;
+  const auto& encoder = GetKeyEncoder<string>(GetTypeInfo(INT64));
   Random prng(SeedRandom());
 
   int64_t entry = 0;
@@ -291,6 +306,8 @@ void TestMerge(const Schema& schema,
     List list(FLAGS_num_rows);
     unordered_set<int64_t> seen_this_list;
 
+    boost::optional<int64_t> min_entry;
+    boost::optional<int64_t> max_entry;
     if (overlapping_ranges) {
       entry = 0;
     }
@@ -328,6 +345,13 @@ void TestMerge(const Schema& schema,
       list.ints.emplace_back(entry);
       list.is_deleted.emplace_back(is_deleted);
 
+      if (!max_entry || entry > max_entry) {
+        max_entry = entry;
+      }
+      if (!min_entry || entry < min_entry) {
+        min_entry = entry;
+      }
+
       // Some entries are randomly deselected in order to exercise the selection
       // vector logic in the MergeIterator. This is reflected both in the input
       // to the MergeIterator as well as the expected output (see below).
@@ -353,6 +377,17 @@ void TestMerge(const Schema& schema,
       }
     }
 
+    if (prng.Uniform(10) > 0) {
+      // Use the smallest and largest entries as bounds most of the time. They
+      // are randomly adjusted to reflect their inexactness in the real world.
+      list.encoded_bounds.emplace();
+      DCHECK(min_entry);
+      DCHECK(max_entry);
+      min_entry = *min_entry - prng.Uniform(5);
+      max_entry = *max_entry + prng.Uniform(5);
+      encoder.Encode(&(min_entry.get()), &list.encoded_bounds->first);
+      encoder.Encode(&(max_entry.get()), &list.encoded_bounds->second);
+    }
     all_ints.emplace_back(std::move(list));
   }
 
@@ -367,16 +402,23 @@ void TestMerge(const Schema& schema,
   const int kIsDeletedIndex = schema.find_first_is_deleted_virtual_column();
 
   for (int trial = 0; trial < FLAGS_num_iters; trial++) {
-    vector<unique_ptr<RowwiseIterator>> to_merge;
+    vector<IterWithBounds> to_merge;
     for (const auto& list : all_ints) {
       unique_ptr<VectorIterator> vec_it(new VectorIterator(list.ints, list.is_deleted, schema));
       vec_it->set_block_size(10);
       vec_it->set_selection_vector(list.sv.get());
       unique_ptr<RowwiseIterator> mat_it(NewMaterializingIterator(std::move(vec_it)));
-      vector<unique_ptr<RowwiseIterator>> input;
-      input.emplace_back(std::move(mat_it));
-      unique_ptr<RowwiseIterator> un_it(NewUnionIterator(std::move(input)));
-      to_merge.emplace_back(std::move(un_it));
+      IterWithBounds mat_iwb;
+      mat_iwb.iter = std::move(mat_it);
+      vector<IterWithBounds> un_input;
+      un_input.emplace_back(std::move(mat_iwb));
+      unique_ptr<RowwiseIterator> un_it(NewUnionIterator(std::move(un_input)));
+      IterWithBounds un_iwb;
+      un_iwb.iter = std::move(un_it);
+      if (list.encoded_bounds) {
+        un_iwb.encoded_bounds = list.encoded_bounds;
+      }
+      to_merge.emplace_back(std::move(un_iwb));
     }
 
     // Setup predicate exclusion
@@ -385,9 +427,8 @@ void TestMerge(const Schema& schema,
     LOG(INFO) << "Predicate: " << predicate.pred_.ToString();
 
     LOG_TIMING(INFO, "iterating merged lists") {
-      unique_ptr<RowwiseIterator> merger(
-          NewMergeIterator(MergeIteratorOptions(include_deleted_rows),
-                           std::move(to_merge)));
+      unique_ptr<RowwiseIterator> merger(NewMergeIterator(
+          MergeIteratorOptions(include_deleted_rows), std::move(to_merge)));
       ASSERT_OK(merger->Init(&spec));
 
       RowBlock dst(&schema, 100, nullptr);
@@ -472,7 +513,7 @@ TEST(TestMaterializingIterator, TestMaterializingPredicatePushdown) {
     ints[i] = i;
   }
 
-  unique_ptr<VectorIterator> colwise(new VectorIterator(std::move(ints)));
+  unique_ptr<VectorIterator> colwise(new VectorIterator(ints));
   unique_ptr<RowwiseIterator> materializing(NewMaterializingIterator(std::move(colwise)));
   ASSERT_OK(materializing->Init(&spec));
   ASSERT_EQ(0, spec.predicates().size()) << "Iterator should have pushed down predicate";
@@ -505,7 +546,7 @@ TEST(TestPredicateEvaluatingIterator, TestPredicateEvaluation) {
 
   // Set up a MaterializingIterator with pushdown disabled, so that the
   // PredicateEvaluatingIterator will wrap it and do evaluation.
-  unique_ptr<VectorIterator> colwise(new VectorIterator(std::move(ints)));
+  unique_ptr<VectorIterator> colwise(new VectorIterator(ints));
   FLAGS_materializing_iterator_do_pushdown = false;
   unique_ptr<RowwiseIterator> materializing(
       NewMaterializingIterator(std::move(colwise)));
diff --git a/src/kudu/common/generic_iterators.cc b/src/kudu/common/generic_iterators.cc
index 8659bc2..acb4f5d 100644
--- a/src/kudu/common/generic_iterators.cc
+++ b/src/kudu/common/generic_iterators.cc
@@ -103,13 +103,13 @@ static const int kMergeRowBuffer = 1000;
 // RowwiseIterator such that all returned rows are valid.
 class MergeIterState : public boost::intrusive::list_base_hook<> {
  public:
-  explicit MergeIterState(unique_ptr<RowwiseIterator> iter) :
-      iter_(std::move(iter)),
-      arena_(1024),
-      read_block_(&iter_->schema(), kMergeRowBuffer, &arena_),
-      next_row_idx_(0),
-      rows_advanced_(0),
-      rows_valid_(0)
+  explicit MergeIterState(IterWithBounds iwb)
+      : iwb_(std::move(iwb)),
+        arena_(1024),
+        read_block_(&schema(), kMergeRowBuffer, &arena_),
+        next_row_idx_(0),
+        rows_advanced_(0),
+        rows_valid_(0)
   {}
 
   // Fetch the next row from the iterator. Does not advance the iterator.
@@ -128,7 +128,7 @@ class MergeIterState : public boost::intrusive::list_base_hook<> {
 
   // Returns true if the underlying iterator is fully exhausted.
   bool IsFullyExhausted() const {
-    return rows_valid_ == 0 && !iter_->HasNext();
+    return rows_valid_ == 0 && !iwb_.iter->HasNext();
   }
 
   // Advance to the next row in the underlying iterator.
@@ -136,7 +136,7 @@ class MergeIterState : public boost::intrusive::list_base_hook<> {
 
   // Add statistics about the underlying iterator to the given vector.
   void AddStats(vector<IteratorStats>* stats) const {
-    AddIterStats(*iter_, stats);
+    AddIterStats(*iwb_.iter, stats);
   }
 
   // Returns the number of valid rows remaining in the current block.
@@ -146,7 +146,7 @@ class MergeIterState : public boost::intrusive::list_base_hook<> {
 
   // Returns the schema from the underlying iterator.
   const Schema& schema() const {
-    return iter_->schema();
+    return iwb_.iter->schema();
   }
 
  private:
@@ -158,8 +158,11 @@ class MergeIterState : public boost::intrusive::list_base_hook<> {
     return rows_advanced_ == rows_valid_;
   }
 
-  // Underlying iterator.
-  unique_ptr<RowwiseIterator> iter_;
+  // The iterator (and optional bounds) whose rows are to be merged with other
+  // iterators.
+  //
+  // Must already be Init'ed at MergeIterState construction time.
+  IterWithBounds iwb_;
 
   // Allocates memory for read_block_.
   Arena arena_;
@@ -207,8 +210,8 @@ Status MergeIterState::PullNextBlock() {
   CHECK_EQ(rows_advanced_, rows_valid_)
       << "should not pull next block until current block is exhausted";
 
-  while (iter_->HasNext()) {
-    RETURN_NOT_OK(iter_->NextBlock(&read_block_));
+  while (iwb_.iter->HasNext()) {
+    RETURN_NOT_OK(iwb_.iter->NextBlock(&read_block_));
     rows_advanced_ = 0;
     // Honor the selection vector of the read_block_, since not all rows are necessarily selected.
     SelectionVector *selection = read_block_.selection_vector();
@@ -246,7 +249,7 @@ class MergeIterator : public RowwiseIterator {
   //
   // Note: the iterators must be constructed using a projection that includes
   // all key columns; otherwise a CHECK will fire at initialization time.
-  MergeIterator(MergeIteratorOptions opts, vector<unique_ptr<RowwiseIterator>> iters);
+  MergeIterator(MergeIteratorOptions opts, vector<IterWithBounds> iters);
 
   virtual ~MergeIterator();
 
@@ -283,7 +286,7 @@ class MergeIterator : public RowwiseIterator {
 
   // Holds the subiterators until Init is called, at which point this is cleared.
   // This is required because we can't create a MergeIterState of an uninitialized iterator.
-  vector<unique_ptr<RowwiseIterator>> orig_iters_;
+  vector<IterWithBounds> orig_iters_;
 
   // See UnionIterator::iters_lock_ for details on locking. This follows the same
   // pattern.
@@ -308,7 +311,7 @@ class MergeIterator : public RowwiseIterator {
 };
 
 MergeIterator::MergeIterator(MergeIteratorOptions opts,
-                             vector<unique_ptr<RowwiseIterator>> iters)
+                             vector<IterWithBounds> iters)
     : opts_(opts),
       initted_(false),
       orig_iters_(std::move(iters)),
@@ -382,7 +385,7 @@ Status MergeIterator::InitSubIterators(ScanSpec *spec) {
   // Initialize all the sub iterators.
   for (auto& i : orig_iters_) {
     ScanSpec *spec_copy = spec != nullptr ? scan_spec_copies_.Construct(*spec) : nullptr;
-    RETURN_NOT_OK(InitAndMaybeWrap(&i, spec_copy));
+    RETURN_NOT_OK(InitAndMaybeWrap(&i.iter, spec_copy));
     unique_ptr<MergeIterState> state(new MergeIterState(std::move(i)));
     states_.push_back(*state.release());
   }
@@ -553,7 +556,7 @@ void MergeIterator::GetIteratorStats(vector<IteratorStats>* stats) const {
 }
 
 unique_ptr<RowwiseIterator> NewMergeIterator(
-    MergeIteratorOptions opts, vector<unique_ptr<RowwiseIterator>> iters) {
+    MergeIteratorOptions opts, vector<IterWithBounds> iters) {
   return unique_ptr<RowwiseIterator>(new MergeIterator(opts, std::move(iters)));
 }
 
@@ -577,7 +580,7 @@ class UnionIterator : public RowwiseIterator {
   // Constructs a UnionIterator of the given iterators.
   //
   // The iterators must have matching schemas and should not yet be initialized.
-  explicit UnionIterator(vector<unique_ptr<RowwiseIterator>> iters);
+  explicit UnionIterator(vector<IterWithBounds> iters);
 
   Status Init(ScanSpec *spec) OVERRIDE;
 
@@ -619,7 +622,7 @@ class UnionIterator : public RowwiseIterator {
   // it's the only thread which might write. However, it does need to acquire in write
   // mode when changing 'iters_'.
   mutable rw_spinlock iters_lock_;
-  deque<unique_ptr<RowwiseIterator>> iters_;
+  deque<IterWithBounds> iters_;
 
   // Statistics (keyed by projection column index) accumulated so far by any
   // fully-consumed sub-iterators.
@@ -632,7 +635,7 @@ class UnionIterator : public RowwiseIterator {
   ObjectPool<ScanSpec> scan_spec_copies_;
 };
 
-UnionIterator::UnionIterator(vector<unique_ptr<RowwiseIterator>> iters)
+UnionIterator::UnionIterator(vector<IterWithBounds> iters)
   : initted_(false),
     iters_(std::make_move_iterator(iters.begin()),
            std::make_move_iterator(iters.end())) {
@@ -650,14 +653,14 @@ Status UnionIterator::Init(ScanSpec *spec) {
   // It's important to do the verification after initializing the iterators, as
   // they may not know their own schemas until they've been initialized (in the
   // case of a union of unions).
-  schema_.reset(new Schema(iters_.front()->schema()));
+  schema_.reset(new Schema(iters_.front().iter->schema()));
   finished_iter_stats_by_col_.resize(schema_->num_columns());
 #ifndef NDEBUG
   for (const auto& i : iters_) {
-    if (!i->schema().Equals(*schema_)) {
+    if (!i.iter->schema().Equals(*schema_)) {
       return Status::InvalidArgument(
           Substitute("Schemas do not match: $0 vs. $1",
-                     schema_->ToString(), i->schema().ToString()));
+                     schema_->ToString(), i.iter->schema().ToString()));
     }
   }
 #endif
@@ -670,7 +673,11 @@ Status UnionIterator::Init(ScanSpec *spec) {
 Status UnionIterator::InitSubIterators(ScanSpec *spec) {
   for (auto& i : iters_) {
     ScanSpec *spec_copy = spec != nullptr ? scan_spec_copies_.Construct(*spec) : nullptr;
-    RETURN_NOT_OK(InitAndMaybeWrap(&i, spec_copy));
+    RETURN_NOT_OK(InitAndMaybeWrap(&i.iter, spec_copy));
+
+    // The union iterator doesn't care about these, so let's drop them now to
+    // free some memory.
+    i.encoded_bounds.reset();
   }
   // Since we handle predicates in all the wrapped iterators, we can clear
   // them here.
@@ -683,7 +690,7 @@ Status UnionIterator::InitSubIterators(ScanSpec *spec) {
 bool UnionIterator::HasNext() const {
   CHECK(initted_);
   for (const auto& i : iters_) {
-    if (i->HasNext()) return true;
+    if (i.iter->HasNext()) return true;
   }
 
   return false;
@@ -701,17 +708,17 @@ void UnionIterator::PrepareBatch() {
   CHECK(initted_);
 
   while (!iters_.empty() &&
-         !iters_.front()->HasNext()) {
+         !iters_.front().iter->HasNext()) {
     PopFront();
   }
 }
 
 Status UnionIterator::MaterializeBlock(RowBlock *dst) {
-  return iters_.front()->NextBlock(dst);
+  return iters_.front().iter->NextBlock(dst);
 }
 
 void UnionIterator::FinishBatch() {
-  if (!iters_.front()->HasNext()) {
+  if (!iters_.front().iter->HasNext()) {
     // Iterator exhausted, remove it.
     PopFront();
   }
@@ -719,15 +726,15 @@ void UnionIterator::FinishBatch() {
 
 void UnionIterator::PopFront() {
   std::lock_guard<rw_spinlock> l(iters_lock_);
-  AddIterStats(*iters_.front(), &finished_iter_stats_by_col_);
+  AddIterStats(*iters_.front().iter, &finished_iter_stats_by_col_);
   iters_.pop_front();
 }
 
 string UnionIterator::ToString() const {
   string s;
   s.append("Union(");
-  s += JoinMapped(iters_, [](const unique_ptr<RowwiseIterator>& it) {
-      return it->ToString();
+  s += JoinMapped(iters_, [](const IterWithBounds& i) {
+      return i.iter->ToString();
     }, ",");
   s.append(")");
   return s;
@@ -738,12 +745,11 @@ void UnionIterator::GetIteratorStats(vector<IteratorStats>* stats) const {
   shared_lock<rw_spinlock> l(iters_lock_);
   *stats = finished_iter_stats_by_col_;
   if (!iters_.empty()) {
-    AddIterStats(*iters_.front(), stats);
+    AddIterStats(*iters_.front().iter, stats);
   }
 }
 
-unique_ptr<RowwiseIterator> NewUnionIterator(
-    vector<unique_ptr<RowwiseIterator>> iters) {
+unique_ptr<RowwiseIterator> NewUnionIterator(vector<IterWithBounds> iters) {
   return unique_ptr<RowwiseIterator>(new UnionIterator(std::move(iters)));
 }
 
diff --git a/src/kudu/common/generic_iterators.h b/src/kudu/common/generic_iterators.h
index 2c3fccf..b93a788 100644
--- a/src/kudu/common/generic_iterators.h
+++ b/src/kudu/common/generic_iterators.h
@@ -18,17 +18,29 @@
 
 #include <cstdint>
 #include <memory>
+#include <string>
+#include <utility>
 #include <vector>
 
+#include <boost/optional/optional.hpp>
+
+#include "kudu/common/iterator.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
 
 class ColumnPredicate;
-class ColumnwiseIterator;
-class RowwiseIterator;
 class ScanSpec;
 
+// Encapsulates a rowwise-iterator along with the (encoded) lower and upper
+// bounds for the rowset that the iterator belongs to.
+//
+// The bounds are optional because some rowsets (e.g. MRS) don't have them.
+struct IterWithBounds {
+  std::unique_ptr<RowwiseIterator> iter;
+  boost::optional<std::pair<std::string, std::string>> encoded_bounds;
+};
+
 // Options struct for the MergeIterator.
 struct MergeIteratorOptions {
   explicit MergeIteratorOptions(bool include_deleted_rows)
@@ -45,13 +57,12 @@ struct MergeIteratorOptions {
 // The iterators must have matching schemas and should not yet be initialized.
 std::unique_ptr<RowwiseIterator> NewMergeIterator(
     MergeIteratorOptions opts,
-    std::vector<std::unique_ptr<RowwiseIterator>> iters);
+    std::vector<IterWithBounds> iters);
 
 // Constructs a UnionIterator of the given iterators.
 //
 // The iterators must have matching schemas and should not yet be initialized.
-std::unique_ptr<RowwiseIterator> NewUnionIterator(
-    std::vector<std::unique_ptr<RowwiseIterator>> iters);
+std::unique_ptr<RowwiseIterator> NewUnionIterator(std::vector<IterWithBounds> iters);
 
 // Constructs a MaterializingIterator of the given ColumnwiseIterator.
 std::unique_ptr<RowwiseIterator> NewMaterializingIterator(
diff --git a/src/kudu/tablet/rowset.cc b/src/kudu/tablet/rowset.cc
index beefc8a..78cb199 100644
--- a/src/kudu/tablet/rowset.cc
+++ b/src/kudu/tablet/rowset.cc
@@ -20,6 +20,7 @@
 #include <limits>
 #include <memory>
 #include <string>
+#include <type_traits>
 #include <utility>
 #include <vector>
 
@@ -48,6 +49,26 @@ RowIteratorOptions::RowIteratorOptions()
       order(OrderMode::UNORDERED),
       include_deleted_rows(false) {}
 
+Status RowSet::NewRowIteratorWithBounds(const RowIteratorOptions& opts,
+                                        IterWithBounds* out) const {
+  // Get the iterator.
+  unique_ptr<RowwiseIterator> iter;
+  RETURN_NOT_OK(NewRowIterator(opts, &iter));
+
+  // Get the bounds. Some rowsets (e.g. MRS) have no bounds; that's OK as the
+  // bounds aren't required.
+  string lower;
+  string upper;
+  Status s = GetBounds(&lower, &upper);
+  if (s.ok()) {
+    out->encoded_bounds = std::make_pair(std::move(lower), std::move(upper));
+  } else if (!s.IsNotSupported()) {
+    RETURN_NOT_OK(s);
+  }
+  out->iter = std::move(iter);
+  return Status::OK();
+}
+
 DuplicatingRowSet::DuplicatingRowSet(RowSetVector old_rowsets,
                                      RowSetVector new_rowsets)
     : old_rowsets_(std::move(old_rowsets)),
@@ -92,13 +113,13 @@ Status DuplicatingRowSet::NewRowIterator(const RowIteratorOptions& opts,
   }
   // Union or merge between them
 
-  vector<unique_ptr<RowwiseIterator>> iters;
-  for (const shared_ptr<RowSet> &rowset : old_rowsets_) {
-    unique_ptr<RowwiseIterator> iter;
-    RETURN_NOT_OK_PREPEND(rowset->NewRowIterator(opts, &iter),
+  vector<IterWithBounds> iters;
+  for (const auto& rs : old_rowsets_) {
+    IterWithBounds iwb;
+    RETURN_NOT_OK_PREPEND(rs->NewRowIteratorWithBounds(opts, &iwb),
                           Substitute("Could not create iterator for rowset $0",
-                                     rowset->ToString()));
-    iters.emplace_back(std::move(iter));
+                                     rs->ToString()));
+    iters.emplace_back(std::move(iwb));
   }
 
   switch (opts.order) {
diff --git a/src/kudu/tablet/rowset.h b/src/kudu/tablet/rowset.h
index ed99122..baecb4a 100644
--- a/src/kudu/tablet/rowset.h
+++ b/src/kudu/tablet/rowset.h
@@ -49,6 +49,7 @@ class RowwiseIterator;
 class Schema;
 class Slice;
 struct ColumnId;
+struct IterWithBounds;
 
 namespace consensus {
 class OpId;
@@ -134,7 +135,7 @@ class RowSet {
                            ProbeStats* stats,
                            OperationResultPB* result) = 0;
 
-  // Return a new RowIterator for this rowset, with the given options.
+  // Return a new iterator for this rowset, with the given options.
   //
   // Pointers in 'opts' must remain valid for the lifetime of the iterator.
   //
@@ -145,6 +146,11 @@ class RowSet {
   virtual Status NewRowIterator(const RowIteratorOptions& opts,
                                 std::unique_ptr<RowwiseIterator>* out) const = 0;
 
+  // Like NewRowIterator, but returns the rowset's bounds (if they exist) along
+  // with the iterator.
+  Status NewRowIteratorWithBounds(const RowIteratorOptions& opts,
+                                  IterWithBounds* out) const;
+
   // Create the input to be used for a compaction.
   //
   // The provided 'projection' is for the compaction output. Each row
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index b63e5c2..7627ed4 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -1834,20 +1834,22 @@ Status Tablet::DebugDump(vector<string> *lines) {
 Status Tablet::CaptureConsistentIterators(
     const RowIteratorOptions& opts,
     const ScanSpec* spec,
-    vector<unique_ptr<RowwiseIterator>>* iters) const {
+    vector<IterWithBounds>* iters) const {
 
   shared_lock<rw_spinlock> l(component_lock_);
   RETURN_IF_STOPPED_OR_CHECK_STATE(kOpen);
 
   // Construct all the iterators locally first, so that if we fail
   // in the middle, we don't modify the output arguments.
-  vector<unique_ptr<RowwiseIterator>> ret;
+  vector<IterWithBounds> ret;
 
 
   // Grab the memrowset iterator.
   unique_ptr<RowwiseIterator> ms_iter;
   RETURN_NOT_OK(components_->memrowset->NewRowIterator(opts, &ms_iter));
-  ret.emplace_back(ms_iter.release());
+  IterWithBounds mrs_iwb;
+  mrs_iwb.iter = std::move(ms_iter);
+  ret.emplace_back(std::move(mrs_iwb));
 
 
   // Cull row-sets in the case of key-range queries.
@@ -1858,12 +1860,12 @@ Status Tablet::CaptureConsistentIterators(
         boost::optional<Slice>(spec->exclusive_upper_bound_key()->encoded_key()) : boost::none;
     vector<RowSet*> interval_sets;
     components_->rowsets->FindRowSetsIntersectingInterval(lower_bound, upper_bound, &interval_sets);
-    for (const RowSet *rs : interval_sets) {
-      unique_ptr<RowwiseIterator> row_it;
-      RETURN_NOT_OK_PREPEND(rs->NewRowIterator(opts, &row_it),
+    for (const auto* rs : interval_sets) {
+      IterWithBounds iwb;
+      RETURN_NOT_OK_PREPEND(rs->NewRowIteratorWithBounds(opts, &iwb),
                             Substitute("Could not create iterator for rowset $0",
                                        rs->ToString()));
-      ret.emplace_back(std::move(row_it));
+      ret.emplace_back(std::move(iwb));
     }
     *iters = std::move(ret);
     return Status::OK();
@@ -1871,12 +1873,12 @@ Status Tablet::CaptureConsistentIterators(
 
   // If there are no encoded predicates of the primary keys, then
   // fall back to grabbing all rowset iterators.
-  for (const shared_ptr<RowSet> &rs : components_->rowsets->all_rowsets()) {
-    unique_ptr<RowwiseIterator> row_it;
-    RETURN_NOT_OK_PREPEND(rs->NewRowIterator(opts, &row_it),
+  for (const shared_ptr<RowSet>& rs : components_->rowsets->all_rowsets()) {
+    IterWithBounds iwb;
+    RETURN_NOT_OK_PREPEND(rs->NewRowIteratorWithBounds(opts, &iwb),
                           Substitute("Could not create iterator for rowset $0",
                                      rs->ToString()));
-    ret.emplace_back(std::move(row_it));
+    ret.emplace_back(std::move(iwb));
   }
 
   // Swap results into the parameters.
@@ -2462,7 +2464,7 @@ Status Tablet::Iterator::Init(ScanSpec *spec) {
 
   RETURN_NOT_OK(tablet_->GetMappedReadProjection(projection_, &projection_));
 
-  vector<unique_ptr<RowwiseIterator>> iters;
+  vector<IterWithBounds> iters;
   RETURN_NOT_OK(tablet_->CaptureConsistentIterators(opts_, spec, &iters));
   TRACE_COUNTER_INCREMENT("rowset_iterators", iters.size());
 
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index 62f7cb4..87448f5 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -66,6 +66,7 @@ class RowBlock;
 class ScanSpec;
 class Throttler;
 class Timestamp;
+struct IterWithBounds;
 struct IteratorStats;
 
 namespace log {
@@ -583,7 +584,7 @@ class Tablet {
   // lifetime of the returned iterators.
   Status CaptureConsistentIterators(const RowIteratorOptions& opts,
                                     const ScanSpec* spec,
-                                    std::vector<std::unique_ptr<RowwiseIterator>>* iters) const;
+                                    std::vector<IterWithBounds>* iters) const;
 
   Status PickRowSetsToCompact(RowSetsInCompaction *picked,
                               CompactFlags flags) const;