You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/01/25 05:15:00 UTC

[kudu] branch master updated (33d3878 -> 5629cc7)

This is an automated email from the ASF dual-hosted git repository.

adar pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from 33d3878  switch all iterators to unique_ptr
     new 432c3a1  generic_iterators: move iterator declarations into cc file
     new 5629cc7  generic_iterators: prep for MergeIterator dominance

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/kudu/common/generic_iterators-test.cc | 110 ++++++-----
 src/kudu/common/generic_iterators.cc      | 306 +++++++++++++++++++++++++++---
 src/kudu/common/generic_iterators.h       | 273 ++++----------------------
 src/kudu/tablet/cfile_set-test.cc         |   8 +-
 src/kudu/tablet/compaction.cc             |   2 +-
 src/kudu/tablet/delta_compaction.cc       |   4 +-
 src/kudu/tablet/diskrowset.cc             |   2 +-
 src/kudu/tablet/rowset.cc                 |   4 +-
 src/kudu/tablet/tablet.cc                 |   4 +-
 9 files changed, 392 insertions(+), 321 deletions(-)


[kudu] 01/02: generic_iterators: move iterator declarations into cc file

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 432c3a1f1fc403b4869a57d92dd91ddffa4712da
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Thu Jan 10 16:35:53 2019 -0800

    generic_iterators: move iterator declarations into cc file
    
    I plan on converting MergeIterator's state vector into an intrusive list,
    which means MergeIterState's class definition needs to be visible to
    MergeIterator's class definition.
    
    Instead of moving MergeIterState into the header, let's go the other way and
    move all of the iterator declarations into the .cc file. The code outside of
    these classes doesn't care about the concrete types, and keeping them out of
    the header should speed up compilation a bit.
    
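    I had to poke a hole in that encapsulation for some PredicateEvaluatingIterator
    tests, though: GetIteratorPredicatesForTests() exposes a wrapped iterator's
    predicates, and the tests now disable pushdown via the
    materializing_iterator_do_pushdown gflag.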
    
    Change-Id: I301f39ec0d55b73cadcf28d8104accca3219ab1b
    Reviewed-on: http://gerrit.cloudera.org:8080/12223
    Reviewed-by: Mike Percy <mp...@apache.org>
    Tested-by: Kudu Jenkins
---
 src/kudu/common/generic_iterators-test.cc |  84 ++++-----
 src/kudu/common/generic_iterators.cc      | 283 +++++++++++++++++++++++++++---
 src/kudu/common/generic_iterators.h       | 270 ++++------------------------
 src/kudu/tablet/cfile_set-test.cc         |   8 +-
 src/kudu/tablet/compaction.cc             |   2 +-
 src/kudu/tablet/delta_compaction.cc       |   4 +-
 src/kudu/tablet/diskrowset.cc             |   2 +-
 src/kudu/tablet/rowset.cc                 |   4 +-
 src/kudu/tablet/tablet.cc                 |   4 +-
 9 files changed, 345 insertions(+), 316 deletions(-)

diff --git a/src/kudu/common/generic_iterators-test.cc b/src/kudu/common/generic_iterators-test.cc
index 4561dad..b9228f0 100644
--- a/src/kudu/common/generic_iterators-test.cc
+++ b/src/kudu/common/generic_iterators-test.cc
@@ -27,6 +27,7 @@
 #include <vector>
 
 #include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <glog/stl_logging.h>
 #include <gtest/gtest.h>
@@ -40,7 +41,6 @@
 #include "kudu/common/scan_spec.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/types.h"
-#include "kudu/gutil/casts.h"
 #include "kudu/gutil/mathlimits.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/memory/arena.h"
@@ -53,6 +53,7 @@
 DEFINE_int32(num_lists, 3, "Number of lists to merge");
 DEFINE_int32(num_rows, 1000, "Number of entries per list");
 DEFINE_int32(num_iters, 1, "Number of times to run merge");
+DECLARE_bool(materializing_iterator_do_pushdown);
 
 using std::string;
 using std::unique_ptr;
@@ -161,13 +162,13 @@ class VectorIterator : public ColumnwiseIterator {
 // Test that empty input to a merger behaves correctly.
 TEST(TestMergeIterator, TestMergeEmpty) {
   unique_ptr<RowwiseIterator> iter(
-      new MaterializingIterator(
-          unique_ptr<ColumnwiseIterator>(new VectorIterator({}))));
+    NewMaterializingIterator(
+        unique_ptr<ColumnwiseIterator>(new VectorIterator({}))));
   vector<unique_ptr<RowwiseIterator>> input;
   input.emplace_back(std::move(iter));
-  MergeIterator merger(std::move(input));
-  ASSERT_OK(merger.Init(nullptr));
-  ASSERT_FALSE(merger.HasNext());
+  unique_ptr<RowwiseIterator> merger(NewMergeIterator(std::move(input)));
+  ASSERT_OK(merger->Init(nullptr));
+  ASSERT_FALSE(merger->HasNext());
 }
 
 // Test that non-empty input to a merger with a zeroed selection vector
@@ -177,12 +178,12 @@ TEST(TestMergeIterator, TestMergeEmptyViaSelectionVector) {
   sv.SetAllFalse();
   unique_ptr<VectorIterator> vec(new VectorIterator({ 1, 2, 3 }));
   vec->set_selection_vector(&sv);
-  unique_ptr<RowwiseIterator> iter(new MaterializingIterator(std::move(vec)));
+  unique_ptr<RowwiseIterator> iter(NewMaterializingIterator(std::move(vec)));
   vector<unique_ptr<RowwiseIterator>> input;
   input.emplace_back(std::move(iter));
-  MergeIterator merger(std::move(input));
-  ASSERT_OK(merger.Init(nullptr));
-  ASSERT_FALSE(merger.HasNext());
+  unique_ptr<RowwiseIterator> merger(NewMergeIterator(std::move(input)));
+  ASSERT_OK(merger->Init(nullptr));
+  ASSERT_FALSE(merger->HasNext());
 }
 
 class TestIntRangePredicate {
@@ -249,10 +250,10 @@ void TestMerge(const TestIntRangePredicate &predicate) {
       unique_ptr<VectorIterator> vec_it(new VectorIterator(e.ints));
       vec_it->set_block_size(10);
       vec_it->set_selection_vector(e.sv.get());
-      unique_ptr<RowwiseIterator> mat_it(new MaterializingIterator(std::move(vec_it)));
-      vector<unique_ptr<RowwiseIterator>> to_union;
-      to_union.emplace_back(std::move(mat_it));
-      unique_ptr<RowwiseIterator> un_it(new UnionIterator(std::move(to_union)));
+      unique_ptr<RowwiseIterator> mat_it(NewMaterializingIterator(std::move(vec_it)));
+      vector<unique_ptr<RowwiseIterator>> input;
+      input.emplace_back(std::move(mat_it));
+      unique_ptr<RowwiseIterator> un_it(NewUnionIterator(std::move(input)));
       to_merge.emplace_back(std::move(un_it));
     }
 
@@ -262,13 +263,13 @@ void TestMerge(const TestIntRangePredicate &predicate) {
     LOG(INFO) << "Predicate: " << predicate.pred_.ToString();
 
     LOG_TIMING(INFO, "Iterate merged lists") {
-      MergeIterator merger(std::move(to_merge));
-      ASSERT_OK(merger.Init(&spec));
+      unique_ptr<RowwiseIterator> merger(NewMergeIterator({ std::move(to_merge) }));
+      ASSERT_OK(merger->Init(&spec));
 
       RowBlock dst(kIntSchema, 100, nullptr);
       size_t total_idx = 0;
-      while (merger.HasNext()) {
-        ASSERT_OK(merger.NextBlock(&dst));
+      while (merger->HasNext()) {
+        ASSERT_OK(merger->NextBlock(&dst));
         ASSERT_GT(dst.nrows(), 0) <<
           "if HasNext() returns true, must return some rows";
 
@@ -321,13 +322,13 @@ TEST(TestMaterializingIterator, TestMaterializingPredicatePushdown) {
   }
 
   unique_ptr<VectorIterator> colwise(new VectorIterator(std::move(ints)));
-  MaterializingIterator materializing(std::move(colwise));
-  ASSERT_OK(materializing.Init(&spec));
+  unique_ptr<RowwiseIterator> materializing(NewMaterializingIterator(std::move(colwise)));
+  ASSERT_OK(materializing->Init(&spec));
   ASSERT_EQ(0, spec.predicates().size()) << "Iterator should have pushed down predicate";
 
   Arena arena(1024);
   RowBlock dst(kIntSchema, 100, &arena);
-  ASSERT_OK(materializing.NextBlock(&dst));
+  ASSERT_OK(materializing->NextBlock(&dst));
   ASSERT_EQ(dst.nrows(), 100);
 
   // Check that the resulting selection vector is correct (rows 20-29 selected)
@@ -354,25 +355,22 @@ TEST(TestPredicateEvaluatingIterator, TestPredicateEvaluation) {
   // Set up a MaterializingIterator with pushdown disabled, so that the
   // PredicateEvaluatingIterator will wrap it and do evaluation.
   unique_ptr<VectorIterator> colwise(new VectorIterator(std::move(ints)));
-  unique_ptr<MaterializingIterator> materializing(
-      new MaterializingIterator(std::move(colwise)));
-  materializing->disallow_pushdown_for_tests_ = true;
+  FLAGS_materializing_iterator_do_pushdown = false;
+  unique_ptr<RowwiseIterator> materializing(
+      NewMaterializingIterator(std::move(colwise)));
 
   // Wrap it in another iterator to do the evaluation
-  const MaterializingIterator* mat_iter_addr = materializing.get();
+  const RowwiseIterator* mat_iter_addr = materializing.get();
   unique_ptr<RowwiseIterator> outer_iter(std::move(materializing));
-  ASSERT_OK(PredicateEvaluatingIterator::InitAndMaybeWrap(&outer_iter, &spec));
+  ASSERT_OK(InitAndMaybeWrap(&outer_iter, &spec));
 
   ASSERT_NE(reinterpret_cast<uintptr_t>(outer_iter.get()),
             reinterpret_cast<uintptr_t>(mat_iter_addr))
     << "Iterator pointer should differ after wrapping";
 
-  PredicateEvaluatingIterator *pred_eval = down_cast<PredicateEvaluatingIterator *>(
-    outer_iter.get());
-
   ASSERT_EQ(0, spec.predicates().size())
     << "Iterator tree should have accepted predicate";
-  ASSERT_EQ(1, pred_eval->col_predicates_.size())
+  ASSERT_EQ(1, GetIteratorPredicatesForTests(outer_iter).size())
     << "Predicate should be evaluated by the outer iterator";
 
   Arena arena(1024);
@@ -393,13 +391,12 @@ TEST(TestPredicateEvaluatingIterator, TestPredicateEvaluation) {
 TEST(TestPredicateEvaluatingIterator, TestDontWrapWhenNoPredicates) {
   ScanSpec spec;
 
-  vector<uint32_t> ints;
-  unique_ptr<VectorIterator> colwise(new VectorIterator(std::move(ints)));
-  unique_ptr<MaterializingIterator> materializing(
-      new MaterializingIterator(std::move(colwise)));
-  const MaterializingIterator* mat_iter_addr = materializing.get();
+  unique_ptr<VectorIterator> colwise(new VectorIterator({}));
+  unique_ptr<RowwiseIterator> materializing(
+      NewMaterializingIterator(std::move(colwise)));
+  const RowwiseIterator* mat_iter_addr = materializing.get();
   unique_ptr<RowwiseIterator> outer_iter(std::move(materializing));
-  ASSERT_OK(PredicateEvaluatingIterator::InitAndMaybeWrap(&outer_iter, &spec));
+  ASSERT_OK(InitAndMaybeWrap(&outer_iter, &spec));
   ASSERT_EQ(reinterpret_cast<uintptr_t>(outer_iter.get()),
             reinterpret_cast<uintptr_t>(mat_iter_addr))
       << "InitAndMaybeWrap should not have wrapped iter";
@@ -462,11 +459,9 @@ TEST(TestPredicateEvaluatingIterator, TestPredicateEvaluationOrder) {
     spec.AddPredicate(c_equality);
 
     unique_ptr<RowwiseIterator> iter(new DummyIterator(schema));
-    ASSERT_OK(PredicateEvaluatingIterator::InitAndMaybeWrap(&iter, &spec));
-
-    PredicateEvaluatingIterator* pred_eval = down_cast<PredicateEvaluatingIterator*>(iter.get());
-    ASSERT_TRUE(pred_eval->col_predicates_ ==
-                vector<ColumnPredicate>({ c_equality, b_equality, a_range }));
+    ASSERT_OK(InitAndMaybeWrap(&iter, &spec));
+    ASSERT_EQ(GetIteratorPredicatesForTests(iter),
+              vector<ColumnPredicate>({ c_equality, b_equality, a_range }));
   }
 
   { // Test that smaller columns come before larger ones, and ties are broken by idx.
@@ -476,11 +471,10 @@ TEST(TestPredicateEvaluatingIterator, TestPredicateEvaluationOrder) {
     spec.AddPredicate(c_equality);
 
     unique_ptr<RowwiseIterator> iter(new DummyIterator(schema));
-    ASSERT_OK(PredicateEvaluatingIterator::InitAndMaybeWrap(&iter, &spec));
+    ASSERT_OK(InitAndMaybeWrap(&iter, &spec));
 
-    PredicateEvaluatingIterator* pred_eval = down_cast<PredicateEvaluatingIterator*>(iter.get());
-    ASSERT_TRUE(pred_eval->col_predicates_ ==
-                vector<ColumnPredicate>({ c_equality, a_equality, b_equality }));
+    ASSERT_EQ(GetIteratorPredicatesForTests(iter),
+              vector<ColumnPredicate>({ c_equality, a_equality, b_equality }));
   }
 }
 
diff --git a/src/kudu/common/generic_iterators.cc b/src/kudu/common/generic_iterators.cc
index 27c0515..7dd3378 100644
--- a/src/kudu/common/generic_iterators.cc
+++ b/src/kudu/common/generic_iterators.cc
@@ -20,10 +20,13 @@
 #include <unistd.h>
 
 #include <algorithm>
+#include <cstddef>
 #include <cstdint>
+#include <deque>
 #include <iterator>
 #include <memory>
 #include <mutex>
+#include <ostream>
 #include <string>
 #include <unordered_map>
 #include <utility>
@@ -34,18 +37,26 @@
 #include "kudu/common/column_materialization_context.h"
 #include "kudu/common/column_predicate.h"
 #include "kudu/common/columnblock.h"
+#include "kudu/common/iterator.h"
 #include "kudu/common/iterator_stats.h"
 #include "kudu/common/row.h"
 #include "kudu/common/rowblock.h"
+#include "kudu/common/scan_spec.h"
+#include "kudu/common/schema.h"
+#include "kudu/gutil/casts.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/map-util.h"
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/flag_tags.h"
+#include "kudu/util/locks.h"
 #include "kudu/util/memory/arena.h"
+#include "kudu/util/object_pool.h"
 
+using std::deque;
 using std::get;
-using std::move;
+using std::make_shared;
 using std::pair;
 using std::remove_if;
 using std::shared_ptr;
@@ -77,7 +88,7 @@ void AddIterStats(const RowwiseIterator& iter,
 } // anonymous namespace
 
 ////////////////////////////////////////////////////////////
-// Merge iterator
+// MergeIterator
 ////////////////////////////////////////////////////////////
 
 // TODO(todd): this should be sized by # bytes, not # rows.
@@ -211,9 +222,72 @@ Status MergeIterState::PullNextBlock() {
   return Status::OK();
 }
 
+// An iterator which merges the results of other iterators, comparing
+// based on keys.
+//
+// Note: the sub-iterators must be constructed using a projection that includes
+// key columns; otherwise the merge algorithm's key comparisons won't work.
+class MergeIterator : public RowwiseIterator {
+ public:
+  // Constructs a MergeIterator of the given iterators.
+  //
+  // The iterators must have matching schemas and should not yet be initialized.
+  //
+  // Note: the iterators must be constructed using a projection that includes
+  // all key columns; otherwise a CHECK will fire at initialization time.
+  explicit MergeIterator(vector<unique_ptr<RowwiseIterator>> iters);
+
+  virtual ~MergeIterator();
+
+  // The passed-in iterators should be already initialized.
+  Status Init(ScanSpec *spec) OVERRIDE;
+
+  virtual bool HasNext() const OVERRIDE;
+
+  virtual string ToString() const OVERRIDE;
+
+  virtual const Schema& schema() const OVERRIDE;
+
+  virtual void GetIteratorStats(vector<IteratorStats>* stats) const OVERRIDE;
+
+  virtual Status NextBlock(RowBlock* dst) OVERRIDE;
+
+ private:
+  void PrepareBatch(RowBlock* dst);
+  Status MaterializeBlock(RowBlock* dst);
+  Status InitSubIterators(ScanSpec *spec);
+
+  // Initialized during Init.
+  unique_ptr<Schema> schema_;
+
+  bool initted_;
+
+  // Holds the subiterators until Init is called, at which point this is cleared.
+  // This is required because we can't create a MergeIterState of an uninitialized iterator.
+  vector<unique_ptr<RowwiseIterator>> orig_iters_;
+
+  // See UnionIterator::states_lock_ for details on locking. This follows the same
+  // pattern.
+  mutable rw_spinlock states_lock_;
+  vector<unique_ptr<MergeIterState>> states_;
+
+  // Statistics (keyed by projection column index) accumulated so far by any
+  // fully-consumed sub-iterators.
+  vector<IteratorStats> finished_iter_stats_by_col_;
+
+  // The number of iterators, used by ToString().
+  const int num_orig_iters_;
+
+  // When the underlying iterators are initialized, each needs its own
+  // copy of the scan spec in order to do its own pushdown calculations, etc.
+  // The copies are allocated from this pool so they can be automatically freed
+  // when the UnionIterator goes out of scope.
+  ObjectPool<ScanSpec> scan_spec_copies_;
+};
+
 MergeIterator::MergeIterator(vector<unique_ptr<RowwiseIterator>> iters)
     : initted_(false),
-      orig_iters_(move(iters)),
+      orig_iters_(std::move(iters)),
       num_orig_iters_(orig_iters_.size()) {
   CHECK_GT(orig_iters_.size(), 0);
 }
@@ -277,7 +351,7 @@ Status MergeIterator::InitSubIterators(ScanSpec *spec) {
   // Initialize all the sub iterators.
   for (auto& i : orig_iters_) {
     ScanSpec *spec_copy = spec != nullptr ? scan_spec_copies_.Construct(*spec) : nullptr;
-    RETURN_NOT_OK(PredicateEvaluatingIterator::InitAndMaybeWrap(&i, spec_copy));
+    RETURN_NOT_OK(InitAndMaybeWrap(&i, spec_copy));
     states_.push_back(unique_ptr<MergeIterState>(new MergeIterState(std::move(i))));
   }
   orig_iters_.clear();
@@ -377,11 +451,80 @@ void MergeIterator::GetIteratorStats(vector<IteratorStats>* stats) const {
   }
 }
 
+unique_ptr<RowwiseIterator> NewMergeIterator(
+    vector<unique_ptr<RowwiseIterator>> iters) {
+  return unique_ptr<RowwiseIterator>(new MergeIterator(std::move(iters)));
+}
 
 ////////////////////////////////////////////////////////////
-// Union iterator
+// UnionIterator
 ////////////////////////////////////////////////////////////
 
+// An iterator which unions the results of other iterators.
+// This is different from MergeIterator in that it lays the results out end-to-end
+// rather than merging them based on keys. Hence it is more efficient since there is
+// no comparison needed, and the key column does not need to be read if it is not
+// part of the projection.
+class UnionIterator : public RowwiseIterator {
+ public:
+  // Constructs a UnionIterator of the given iterators.
+  //
+  // The iterators must have matching schemas and should not yet be initialized.
+  explicit UnionIterator(vector<unique_ptr<RowwiseIterator>> iters);
+
+  Status Init(ScanSpec *spec) OVERRIDE;
+
+  bool HasNext() const OVERRIDE;
+
+  string ToString() const OVERRIDE;
+
+  const Schema &schema() const OVERRIDE {
+    CHECK(initted_);
+    CHECK(schema_.get() != NULL) << "Bad schema in " << ToString();
+    return *CHECK_NOTNULL(schema_.get());
+  }
+
+  virtual void GetIteratorStats(vector<IteratorStats>* stats) const OVERRIDE;
+
+  virtual Status NextBlock(RowBlock* dst) OVERRIDE;
+
+ private:
+  void PrepareBatch();
+  Status MaterializeBlock(RowBlock* dst);
+  void FinishBatch();
+  Status InitSubIterators(ScanSpec *spec);
+
+  // Pop the front iterator from iters_ and accumulate its statistics into
+  // finished_iter_stats_by_col_.
+  void PopFront();
+
+  // Schema: initialized during Init()
+  unique_ptr<Schema> schema_;
+
+  bool initted_;
+
+  // Lock protecting 'iters_' and 'finished_iter_stats_by_col_'.
+  //
+  // Scanners are mostly accessed by the thread doing the scanning, but the HTTP endpoint
+  // which lists running scans may occasionally need to read as well.
+  //
+  // The "owner" thread of the scanner doesn't need to acquire this in read mode, since
+  // it's the only thread which might write. However, it does need to acquire in write
+  // mode when changing 'iters_'.
+  mutable rw_spinlock iters_lock_;
+  deque<unique_ptr<RowwiseIterator>> iters_;
+
+  // Statistics (keyed by projection column index) accumulated so far by any
+  // fully-consumed sub-iterators.
+  vector<IteratorStats> finished_iter_stats_by_col_;
+
+  // When the underlying iterators are initialized, each needs its own
+  // copy of the scan spec in order to do its own pushdown calculations, etc.
+  // The copies are allocated from this pool so they can be automatically freed
+  // when the UnionIterator goes out of scope.
+  ObjectPool<ScanSpec> scan_spec_copies_;
+};
+
 UnionIterator::UnionIterator(vector<unique_ptr<RowwiseIterator>> iters)
   : initted_(false),
     iters_(std::make_move_iterator(iters.begin()),
@@ -420,7 +563,7 @@ Status UnionIterator::Init(ScanSpec *spec) {
 Status UnionIterator::InitSubIterators(ScanSpec *spec) {
   for (auto& i : iters_) {
     ScanSpec *spec_copy = spec != nullptr ? scan_spec_copies_.Construct(*spec) : nullptr;
-    RETURN_NOT_OK(PredicateEvaluatingIterator::InitAndMaybeWrap(&i, spec_copy));
+    RETURN_NOT_OK(InitAndMaybeWrap(&i, spec_copy));
   }
   // Since we handle predicates in all the wrapped iterators, we can clear
   // them here.
@@ -492,12 +635,61 @@ void UnionIterator::GetIteratorStats(vector<IteratorStats>* stats) const {
   }
 }
 
+unique_ptr<RowwiseIterator> NewUnionIterator(
+    vector<unique_ptr<RowwiseIterator>> iters) {
+  return unique_ptr<RowwiseIterator>(new UnionIterator(std::move(iters)));
+}
+
 ////////////////////////////////////////////////////////////
-// Materializing iterator
+// MaterializingIterator
 ////////////////////////////////////////////////////////////
 
+// An iterator which wraps a ColumnwiseIterator, materializing it into full rows.
+//
+// Column predicates are pushed down into this iterator. While materializing a
+// block, columns with associated predicates are materialized first, and the
+// predicates evaluated. If the predicates succeed in filtering out an entire
+// batch, then other columns may avoid doing any IO.
+class MaterializingIterator : public RowwiseIterator {
+ public:
+  explicit MaterializingIterator(unique_ptr<ColumnwiseIterator> iter);
+
+  // Initialize the iterator, performing predicate pushdown as described above.
+  Status Init(ScanSpec *spec) OVERRIDE;
+
+  bool HasNext() const OVERRIDE;
+
+  string ToString() const OVERRIDE;
+
+  const Schema &schema() const OVERRIDE {
+    return iter_->schema();
+  }
+
+  virtual void GetIteratorStats(std::vector<IteratorStats>* stats) const OVERRIDE {
+    iter_->GetIteratorStats(stats);
+  }
+
+  virtual Status NextBlock(RowBlock* dst) OVERRIDE;
+
+ private:
+  Status MaterializeBlock(RowBlock *dst);
+
+  unique_ptr<ColumnwiseIterator> iter_;
+
+  // List of (column index, predicate) in order of most to least selective, with
+  // ties broken by the index.
+  vector<std::pair<int32_t, ColumnPredicate>> col_idx_predicates_;
+
+  // List of column indexes without predicates to materialize.
+  vector<int32_t> non_predicate_column_indexes_;
+
+  // Set only by test code to disallow pushdown.
+  bool disallow_pushdown_for_tests_;
+  bool disallow_decoder_eval_;
+};
+
 MaterializingIterator::MaterializingIterator(unique_ptr<ColumnwiseIterator> iter)
-    : iter_(move(iter)),
+    : iter_(std::move(iter)),
       disallow_pushdown_for_tests_(!FLAGS_materializing_iterator_do_pushdown),
       disallow_decoder_eval_(!FLAGS_materializing_iterator_decoder_eval) {
 }
@@ -627,26 +819,55 @@ string MaterializingIterator::ToString() const {
   return s;
 }
 
+unique_ptr<RowwiseIterator> NewMaterializingIterator(
+    unique_ptr<ColumnwiseIterator> iter) {
+  return unique_ptr<RowwiseIterator>(
+      new MaterializingIterator(std::move(iter)));
+}
+
 ////////////////////////////////////////////////////////////
 // PredicateEvaluatingIterator
 ////////////////////////////////////////////////////////////
 
-PredicateEvaluatingIterator::PredicateEvaluatingIterator(unique_ptr<RowwiseIterator> base_iter)
-    : base_iter_(move(base_iter)) {
-}
+// An iterator which wraps another iterator and evaluates any predicates that the
+// wrapped iterator did not itself handle during push down.
+class PredicateEvaluatingIterator : public RowwiseIterator {
+ public:
+  // Construct the evaluating iterator.
+  // This is only called from ::InitAndMaybeWrap()
+  // REQUIRES: base_iter is already Init()ed.
+  explicit PredicateEvaluatingIterator(unique_ptr<RowwiseIterator> base_iter);
 
-Status PredicateEvaluatingIterator::InitAndMaybeWrap(
-  unique_ptr<RowwiseIterator> *base_iter, ScanSpec *spec) {
-  RETURN_NOT_OK((*base_iter)->Init(spec));
+  // Initialize the iterator.
+  // POSTCONDITION: spec->predicates().empty()
+  Status Init(ScanSpec *spec) OVERRIDE;
 
-  if (spec != nullptr && !spec->predicates().empty()) {
-    // Underlying iterator did not accept all predicates. Wrap it.
-    unique_ptr<RowwiseIterator> wrapper(
-        new PredicateEvaluatingIterator(std::move(*base_iter)));
-    CHECK_OK(wrapper->Init(spec));
-    *base_iter = std::move(wrapper);
+  virtual Status NextBlock(RowBlock *dst) OVERRIDE;
+
+  bool HasNext() const OVERRIDE;
+
+  string ToString() const OVERRIDE;
+
+  const Schema &schema() const OVERRIDE {
+    return base_iter_->schema();
   }
-  return Status::OK();
+
+  virtual void GetIteratorStats(std::vector<IteratorStats>* stats) const OVERRIDE {
+    base_iter_->GetIteratorStats(stats);
+  }
+
+  const vector<ColumnPredicate>& col_predicates() const { return col_predicates_; }
+
+ private:
+  unique_ptr<RowwiseIterator> base_iter_;
+
+  // List of predicates in order of most to least selective, with
+  // ties broken by the column index.
+  vector<ColumnPredicate> col_predicates_;
+};
+
+PredicateEvaluatingIterator::PredicateEvaluatingIterator(unique_ptr<RowwiseIterator> base_iter)
+    : base_iter_(std::move(base_iter)) {
 }
 
 Status PredicateEvaluatingIterator::Init(ScanSpec *spec) {
@@ -703,4 +924,24 @@ string PredicateEvaluatingIterator::ToString() const {
   return Substitute("PredicateEvaluating($0)", base_iter_->ToString());
 }
 
+Status InitAndMaybeWrap(unique_ptr<RowwiseIterator>* base_iter,
+                        ScanSpec *spec) {
+  RETURN_NOT_OK((*base_iter)->Init(spec));
+
+  if (spec != nullptr && !spec->predicates().empty()) {
+    // Underlying iterator did not accept all predicates. Wrap it.
+    unique_ptr<RowwiseIterator> wrapper(new PredicateEvaluatingIterator(std::move(*base_iter)));
+    RETURN_NOT_OK(wrapper->Init(spec));
+    *base_iter = std::move(wrapper);
+  }
+  return Status::OK();
+}
+
+const vector<ColumnPredicate>& GetIteratorPredicatesForTests(
+    const unique_ptr<RowwiseIterator>& iter) {
+  PredicateEvaluatingIterator* pred_eval =
+      down_cast<PredicateEvaluatingIterator*>(iter.get());
+  return pred_eval->col_predicates();
+}
+
 } // namespace kudu
diff --git a/src/kudu/common/generic_iterators.h b/src/kudu/common/generic_iterators.h
index 5edb0ee..b4b466e 100644
--- a/src/kudu/common/generic_iterators.h
+++ b/src/kudu/common/generic_iterators.h
@@ -14,257 +14,51 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#ifndef KUDU_COMMON_MERGE_ITERATOR_H
-#define KUDU_COMMON_MERGE_ITERATOR_H
+#pragma once
 
-#include <cstddef>
-#include <cstdint>
-#include <deque>
 #include <memory>
-#include <ostream>
-#include <string>
-#include <utility>
 #include <vector>
 
-#include <glog/logging.h>
-#include <gtest/gtest_prod.h>
-
-#include "kudu/common/column_predicate.h"
-#include "kudu/common/iterator.h"
-#include "kudu/common/iterator_stats.h"
-#include "kudu/common/scan_spec.h"
-#include "kudu/common/schema.h"
-#include "kudu/gutil/port.h"
-#include "kudu/util/locks.h"
-#include "kudu/util/object_pool.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
 
-class MergeIterState;
-class RowBlock;
-
-// An iterator which merges the results of other iterators, comparing
-// based on keys.
-class MergeIterator : public RowwiseIterator {
- public:
-  // Constructs a MergeIterator of the given iterators.
-  //
-  // The iterators must have matching schemas and should not yet be initialized.
-  //
-  // Note: the iterators must be constructed using a projection that includes
-  // all key columns; otherwise a CHECK will fire at initialization time.
-  explicit MergeIterator(std::vector<std::unique_ptr<RowwiseIterator>> iters);
-
-  virtual ~MergeIterator();
-
-  // The passed-in iterators should be already initialized.
-  Status Init(ScanSpec *spec) OVERRIDE;
-
-  virtual bool HasNext() const OVERRIDE;
-
-  virtual std::string ToString() const OVERRIDE;
-
-  virtual const Schema& schema() const OVERRIDE;
-
-  virtual void GetIteratorStats(std::vector<IteratorStats>* stats) const OVERRIDE;
-
-  virtual Status NextBlock(RowBlock* dst) OVERRIDE;
-
- private:
-  void PrepareBatch(RowBlock* dst);
-  Status MaterializeBlock(RowBlock* dst);
-  Status InitSubIterators(ScanSpec *spec);
-
-  // Initialized during Init.
-  std::unique_ptr<Schema> schema_;
-
-  bool initted_;
-
-  // Holds the subiterators until Init is called, at which point this is cleared.
-  // This is required because we can't create a MergeIterState of an uninitialized iterator.
-  std::vector<std::unique_ptr<RowwiseIterator>> orig_iters_;
-
-  // See UnionIterator::states_lock_ for details on locking. This follows the same
-  // pattern.
-  mutable rw_spinlock states_lock_;
-  std::vector<std::unique_ptr<MergeIterState>> states_;
-
-  // Statistics (keyed by projection column index) accumulated so far by any
-  // fully-consumed sub-iterators.
-  std::vector<IteratorStats> finished_iter_stats_by_col_;
-
-  // The number of iterators, used by ToString().
-  const int num_orig_iters_;
-
-  // When the underlying iterators are initialized, each needs its own
-  // copy of the scan spec in order to do its own pushdown calculations, etc.
-  // The copies are allocated from this pool so they can be automatically freed
-  // when the UnionIterator goes out of scope.
-  ObjectPool<ScanSpec> scan_spec_copies_;
-};
-
-// An iterator which unions the results of other iterators.
-// This is different from MergeIterator in that it lays the results out end-to-end
-// rather than merging them based on keys. Hence it is more efficient since there is
-// no comparison needed, and the key column does not need to be read if it is not
-// part of the projection.
-class UnionIterator : public RowwiseIterator {
- public:
-  // Constructs a UnionIterator of the given iterators.
-  //
-  // The iterators must have matching schemas and should not yet be initialized.
-  explicit UnionIterator(std::vector<std::unique_ptr<RowwiseIterator>> iters);
-
-  Status Init(ScanSpec *spec) OVERRIDE;
-
-  bool HasNext() const OVERRIDE;
-
-  std::string ToString() const OVERRIDE;
-
-  const Schema &schema() const OVERRIDE {
-    CHECK(initted_);
-    CHECK(schema_.get() != NULL) << "Bad schema in " << ToString();
-    return *CHECK_NOTNULL(schema_.get());
-  }
-
-  virtual void GetIteratorStats(std::vector<IteratorStats>* stats) const OVERRIDE;
-
-  virtual Status NextBlock(RowBlock* dst) OVERRIDE;
-
- private:
-  void PrepareBatch();
-  Status MaterializeBlock(RowBlock* dst);
-  void FinishBatch();
-  Status InitSubIterators(ScanSpec *spec);
-
-  // Pop the front iterator from iters_ and accumulate its statistics into
-  // finished_iter_stats_by_col_.
-  void PopFront();
-
-  // Schema: initialized during Init()
-  std::unique_ptr<Schema> schema_;
-
-  bool initted_;
+class ColumnPredicate;
+class ColumnwiseIterator;
+class RowwiseIterator;
+class ScanSpec;
 
-  // Lock protecting 'iters_' and 'finished_iter_stats_by_col_'.
-  //
-  // Scanners are mostly accessed by the thread doing the scanning, but the HTTP endpoint
-  // which lists running scans may occasionally need to read as well.
-  //
-  // The "owner" thread of the scanner doesn't need to acquire this in read mode, since
-  // it's the only thread which might write. However, it does need to acquire in write
-  // mode when changing 'iters_'.
-  mutable rw_spinlock iters_lock_;
-  std::deque<std::unique_ptr<RowwiseIterator>> iters_;
-
-  // Statistics (keyed by projection column index) accumulated so far by any
-  // fully-consumed sub-iterators.
-  std::vector<IteratorStats> finished_iter_stats_by_col_;
-
-  // When the underlying iterators are initialized, each needs its own
-  // copy of the scan spec in order to do its own pushdown calculations, etc.
-  // The copies are allocated from this pool so they can be automatically freed
-  // when the UnionIterator goes out of scope.
-  ObjectPool<ScanSpec> scan_spec_copies_;
-};
-
-// An iterator which wraps a ColumnwiseIterator, materializing it into full rows.
+// Constructs a MergeIterator of the given iterators.
 //
-// Column predicates are pushed down into this iterator. While materializing a
-// block, columns with associated predicates are materialized first, and the
-// predicates evaluated. If the predicates succeed in filtering out an entire
-// batch, then other columns may avoid doing any IO.
-class MaterializingIterator : public RowwiseIterator {
- public:
-  explicit MaterializingIterator(std::unique_ptr<ColumnwiseIterator> iter);
-
-  // Initialize the iterator, performing predicate pushdown as described above.
-  Status Init(ScanSpec *spec) OVERRIDE;
-
-  bool HasNext() const OVERRIDE;
-
-  std::string ToString() const OVERRIDE;
-
-  const Schema &schema() const OVERRIDE {
-    return iter_->schema();
-  }
-
-  virtual void GetIteratorStats(std::vector<IteratorStats>* stats) const OVERRIDE {
-    iter_->GetIteratorStats(stats);
-  }
-
-  virtual Status NextBlock(RowBlock* dst) OVERRIDE;
-
- private:
-  FRIEND_TEST(TestMaterializingIterator, TestPredicatePushdown);
-  FRIEND_TEST(TestPredicateEvaluatingIterator, TestPredicateEvaluation);
-
-  Status MaterializeBlock(RowBlock *dst);
-
-  std::unique_ptr<ColumnwiseIterator> iter_;
-
-  // List of (column index, predicate) in order of most to least selective, with
-  // ties broken by the index.
-  std::vector<std::pair<int32_t, ColumnPredicate>> col_idx_predicates_;
-
-  // List of column indexes without predicates to materialize.
-  std::vector<int32_t> non_predicate_column_indexes_;
+// The iterators must have matching schemas and should not yet be initialized.
+std::unique_ptr<RowwiseIterator> NewMergeIterator(
+    std::vector<std::unique_ptr<RowwiseIterator>> iters);
 
-  // Set only by test code to disallow pushdown.
-  bool disallow_pushdown_for_tests_;
-  bool disallow_decoder_eval_;
-};
-
-// An iterator which wraps another iterator and evaluates any predicates that the
-// wrapped iterator did not itself handle during push down.
-class PredicateEvaluatingIterator : public RowwiseIterator {
- public:
-  // Initialize the given '*base_iter' with the given 'spec'.
-  //
-  // If the base_iter accepts all predicates, then simply returns.
-  // Otherwise, swaps out *base_iter for a PredicateEvaluatingIterator which wraps
-  // the original iterator and accepts all predicates on its behalf.
-  //
-  // POSTCONDITION: spec->predicates().empty()
-  // POSTCONDITION: base_iter and its wrapper are initialized
-  static Status InitAndMaybeWrap(std::unique_ptr<RowwiseIterator> *base_iter,
-                                 ScanSpec *spec);
-
-  // Initialize the iterator.
-  // POSTCONDITION: spec->predicates().empty()
-  Status Init(ScanSpec *spec) OVERRIDE;
-
-  virtual Status NextBlock(RowBlock *dst) OVERRIDE;
-
-  bool HasNext() const OVERRIDE;
-
-  std::string ToString() const OVERRIDE;
-
-  const Schema &schema() const OVERRIDE {
-    return base_iter_->schema();
-  }
-
-  virtual void GetIteratorStats(std::vector<IteratorStats>* stats) const OVERRIDE {
-    base_iter_->GetIteratorStats(stats);
-  }
-
- private:
-
-  // Construct the evaluating iterator.
-  // This is only called from ::InitAndMaybeWrap()
-  // REQUIRES: base_iter is already Init()ed.
-  explicit PredicateEvaluatingIterator(std::unique_ptr<RowwiseIterator> base_iter);
+// Constructs a UnionIterator of the given iterators.
+//
+// The iterators must have matching schemas and should not yet be initialized.
+std::unique_ptr<RowwiseIterator> NewUnionIterator(
+    std::vector<std::unique_ptr<RowwiseIterator>> iters);
 
-  FRIEND_TEST(TestPredicateEvaluatingIterator, TestPredicateEvaluation);
-  FRIEND_TEST(TestPredicateEvaluatingIterator, TestPredicateEvaluationOrder);
+// Constructs a MaterializingIterator of the given ColumnwiseIterator.
+std::unique_ptr<RowwiseIterator> NewMaterializingIterator(
+    std::unique_ptr<ColumnwiseIterator> iter);
 
-  std::unique_ptr<RowwiseIterator> base_iter_;
+// Initializes the given '*base_iter' with the given 'spec'.
+//
+// If the base_iter accepts all predicates, then simply returns. Otherwise,
+// swaps out *base_iter for a PredicateEvaluatingIterator which wraps the
+// original iterator and accepts all predicates on its behalf.
+//
+// POSTCONDITION: spec->predicates().empty()
+// POSTCONDITION: base_iter and its wrapper are initialized
+Status InitAndMaybeWrap(std::unique_ptr<RowwiseIterator>* base_iter,
+                        ScanSpec* spec);
 
-  // List of predicates in order of most to least selective, with
-  // ties broken by the column index.
-  std::vector<ColumnPredicate> col_predicates_;
-};
+// Gets the predicates associated with a PredicateEvaluatingIterator.
+//
+// Only for use by tests.
+const std::vector<ColumnPredicate>& GetIteratorPredicatesForTests(
+    const std::unique_ptr<RowwiseIterator>& iter);
 
 } // namespace kudu
-#endif
diff --git a/src/kudu/tablet/cfile_set-test.cc b/src/kudu/tablet/cfile_set-test.cc
index 4d28a86..20b9f22 100644
--- a/src/kudu/tablet/cfile_set-test.cc
+++ b/src/kudu/tablet/cfile_set-test.cc
@@ -181,7 +181,7 @@ class TestCFileSet : public KuduRowSetTest {
                        int32_t upper) {
     // Create iterator.
     unique_ptr<CFileSet::Iterator> cfile_iter(fileset->NewIterator(&schema_, nullptr));
-    unique_ptr<RowwiseIterator> iter(new MaterializingIterator(std::move(cfile_iter)));
+    unique_ptr<RowwiseIterator> iter(NewMaterializingIterator(std::move(cfile_iter)));
 
     // Create a scan with a range predicate on the key column.
     ScanSpec spec;
@@ -217,7 +217,7 @@ class TestCFileSet : public KuduRowSetTest {
     LOG(INFO) << "predicates size: " << predicates.size();
     // Create iterator.
     unique_ptr<CFileSet::Iterator> cfile_iter(fileset->NewIterator(&schema_, nullptr));
-    unique_ptr<RowwiseIterator> iter(new MaterializingIterator(std::move(cfile_iter)));
+    unique_ptr<RowwiseIterator> iter(NewMaterializingIterator(std::move(cfile_iter)));
     LOG(INFO) << "Target size: " << target.size();
     // Create a scan with a range predicate on the key column.
     ScanSpec spec;
@@ -363,7 +363,7 @@ TEST_F(TestCFileSet, TestIteratePartialSchema) {
   Schema new_schema;
   ASSERT_OK(schema_.CreateProjectionByNames({ "c0", "c2" }, &new_schema));
   unique_ptr<CFileSet::Iterator> cfile_iter(fileset->NewIterator(&new_schema, nullptr));
-  unique_ptr<RowwiseIterator> iter(new MaterializingIterator(std::move(cfile_iter)));
+  unique_ptr<RowwiseIterator> iter(NewMaterializingIterator(std::move(cfile_iter)));
 
   ASSERT_OK(iter->Init(nullptr));
 
@@ -396,7 +396,7 @@ TEST_F(TestCFileSet, TestRangeScan) {
   // Create iterator.
   unique_ptr<CFileSet::Iterator> cfile_iter(fileset->NewIterator(&schema_, nullptr));
   CFileSet::Iterator* cfile_iter_raw = cfile_iter.get();
-  unique_ptr<RowwiseIterator> iter(new MaterializingIterator(std::move(cfile_iter)));
+  unique_ptr<RowwiseIterator> iter(NewMaterializingIterator(std::move(cfile_iter)));
   Schema key_schema = schema_.CreateKeyProjection();
   Arena arena(1024);
   AutoReleasePool pool;
diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc
index 7912d08..da7215e 100644
--- a/src/kudu/tablet/compaction.cc
+++ b/src/kudu/tablet/compaction.cc
@@ -853,7 +853,7 @@ Status CompactionInput::Create(const DiskRowSet &rowset,
   CHECK(projection->has_column_ids());
 
   unique_ptr<ColumnwiseIterator> base_cwise(rowset.base_data_->NewIterator(projection, io_context));
-  unique_ptr<RowwiseIterator> base_iter(new MaterializingIterator(std::move(base_cwise)));
+  unique_ptr<RowwiseIterator> base_iter(NewMaterializingIterator(std::move(base_cwise)));
 
   // Creates a DeltaIteratorMerger that will only include the relevant REDO deltas.
   RowIteratorOptions redo_opts;
diff --git a/src/kudu/tablet/delta_compaction.cc b/src/kudu/tablet/delta_compaction.cc
index 4bb54a9..dd208b0 100644
--- a/src/kudu/tablet/delta_compaction.cc
+++ b/src/kudu/tablet/delta_compaction.cc
@@ -119,8 +119,8 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas(const IOContext* io_context) {
 
   unique_ptr<ColumnwiseIterator> old_base_data_cwise(base_data_->NewIterator(&partial_schema_,
                                                                              io_context));
-  unique_ptr<RowwiseIterator> old_base_data_rwise(new MaterializingIterator(
-      std::move(old_base_data_cwise)));
+  unique_ptr<RowwiseIterator> old_base_data_rwise(
+      NewMaterializingIterator(std::move(old_base_data_cwise)));
 
   ScanSpec spec;
   spec.set_cache_blocks(false);
diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc
index 6b6e84c..5725e53 100644
--- a/src/kudu/tablet/diskrowset.cc
+++ b/src/kudu/tablet/diskrowset.cc
@@ -665,7 +665,7 @@ Status DiskRowSet::NewRowIterator(const RowIteratorOptions& opts,
   unique_ptr<ColumnwiseIterator> col_iter;
   RETURN_NOT_OK(delta_tracker_->WrapIterator(base_iter, opts, &col_iter));
 
-  out->reset(new MaterializingIterator(std::move(col_iter)));
+  *out = NewMaterializingIterator(std::move(col_iter));
   return Status::OK();
 }
 
diff --git a/src/kudu/tablet/rowset.cc b/src/kudu/tablet/rowset.cc
index 2e5f4d1..5bc4e09 100644
--- a/src/kudu/tablet/rowset.cc
+++ b/src/kudu/tablet/rowset.cc
@@ -103,10 +103,10 @@ Status DuplicatingRowSet::NewRowIterator(const RowIteratorOptions& opts,
 
   switch (opts.order) {
     case ORDERED:
-      out->reset(new MergeIterator(std::move(iters)));
+      *out = NewMergeIterator(std::move(iters));
       break;
     case UNORDERED:
-      out->reset(new UnionIterator(std::move(iters)));
+      *out = NewUnionIterator(std::move(iters));
       break;
     default:
       LOG(FATAL) << "unknown order: " << opts.order;
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 42804cf..899fc7f 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -2465,11 +2465,11 @@ Status Tablet::Iterator::Init(ScanSpec *spec) {
 
   switch (opts_.order) {
     case ORDERED:
-      iter_.reset(new MergeIterator(std::move(iters)));
+      iter_ = NewMergeIterator(std::move(iters));
       break;
     case UNORDERED:
     default:
-      iter_.reset(new UnionIterator(std::move(iters)));
+      iter_ = NewUnionIterator(std::move(iters));
       break;
   }
 


[kudu] 02/02: generic_iterators: prep for MergeIterator dominance

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 5629cc7c9c780a035536fe355170e53cc6231cb2
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Wed Jan 9 00:46:33 2019 -0800

    generic_iterators: prep for MergeIterator dominance
    
    This patch adds a counter to the MergeIterator that tracks the number of
    comparisons made during the lifetime of the iterator. When the dominance
    algorithm is added, the counter's value ought to drop. TestMerge now logs
    the counter's value, and can also generate non-overlapping inputs if desired.
    
    The counter isn't exposed to users. It could be added to IteratorStats and
    set for every key column, but even then it'll only apply to ORDERED scans,
    so its usefulness would be dubious.
    
    Change-Id: I3a8db5973dc09b7271e05b3cc28025b7a2a9e21b
    Reviewed-on: http://gerrit.cloudera.org:8080/12196
    Reviewed-by: Mike Percy <mp...@apache.org>
    Tested-by: Kudu Jenkins
---
 src/kudu/common/generic_iterators-test.cc | 26 ++++++++++++++++++++++----
 src/kudu/common/generic_iterators.cc      | 23 ++++++++++++++++++++---
 src/kudu/common/generic_iterators.h       |  7 +++++++
 3 files changed, 49 insertions(+), 7 deletions(-)

diff --git a/src/kudu/common/generic_iterators-test.cc b/src/kudu/common/generic_iterators-test.cc
index b9228f0..375be34 100644
--- a/src/kudu/common/generic_iterators-test.cc
+++ b/src/kudu/common/generic_iterators-test.cc
@@ -22,6 +22,7 @@
 #include <cstdlib>
 #include <memory>
 #include <ostream>
+#include <random>
 #include <string>
 #include <unordered_map>
 #include <vector>
@@ -197,7 +198,7 @@ class TestIntRangePredicate {
   ColumnPredicate pred_;
 };
 
-void TestMerge(const TestIntRangePredicate &predicate) {
+void TestMerge(const TestIntRangePredicate &predicate, bool overlapping_inputs = true) {
   struct List {
     vector<uint32_t> ints;
     unique_ptr<SelectionVector> sv;
@@ -207,12 +208,15 @@ void TestMerge(const TestIntRangePredicate &predicate) {
   expected.reserve(FLAGS_num_rows * FLAGS_num_lists);
   Random prng(SeedRandom());
 
+  uint32_t entry = 0;
   for (int i = 0; i < FLAGS_num_lists; i++) {
     vector<uint32_t> ints;
     ints.reserve(FLAGS_num_rows);
     unique_ptr<SelectionVector> sv(new SelectionVector(FLAGS_num_rows));
 
-    uint32_t entry = 0;
+    if (overlapping_inputs) {
+      entry = 0;
+    }
     for (int j = 0; j < FLAGS_num_rows; j++) {
       entry += prng.Uniform(5);
       ints.emplace_back(entry);
@@ -238,10 +242,16 @@ void TestMerge(const TestIntRangePredicate &predicate) {
     all_ints.emplace_back(List{ std::move(ints), std::move(sv) });
   }
 
-  LOG_TIMING(INFO, "std::sort the expected results") {
+  LOG_TIMING(INFO, "sorting the expected results") {
     std::sort(expected.begin(), expected.end());
   }
 
+  LOG_TIMING(INFO, "shuffling the inputs") {
+    std::random_device rdev;
+    std::mt19937 gen(rdev());
+    std::shuffle(all_ints.begin(), all_ints.end(), gen);
+  }
+
   VLOG(1) << "Predicate expects " << expected.size() << " results: " << expected;
 
   for (int trial = 0; trial < FLAGS_num_iters; trial++) {
@@ -262,7 +272,7 @@ void TestMerge(const TestIntRangePredicate &predicate) {
     spec.AddPredicate(predicate.pred_);
     LOG(INFO) << "Predicate: " << predicate.pred_.ToString();
 
-    LOG_TIMING(INFO, "Iterate merged lists") {
+    LOG_TIMING(INFO, "iterating merged lists") {
       unique_ptr<RowwiseIterator> merger(NewMergeIterator({ std::move(to_merge) }));
       ASSERT_OK(merger->Init(&spec));
 
@@ -285,6 +295,9 @@ void TestMerge(const TestIntRangePredicate &predicate) {
         }
       }
       ASSERT_EQ(total_idx, expected.size());
+
+      LOG(INFO) << "Total number of comparisons performed: "
+                << GetMergeIteratorNumComparisonsForTests(merger);
     }
   }
 }
@@ -294,6 +307,11 @@ TEST(TestMergeIterator, TestMerge) {
   TestMerge(predicate);
 }
 
+TEST(TestMergeIterator, TestMergeNonOverlapping) {
+  TestIntRangePredicate predicate(0, MathLimits<uint32_t>::kMax);
+  TestMerge(predicate, /*overlapping_inputs=*/false);
+}
+
 TEST(TestMergeIterator, TestMergePredicate) {
   TestIntRangePredicate predicate(0, FLAGS_num_rows / 5);
   TestMerge(predicate);
diff --git a/src/kudu/common/generic_iterators.cc b/src/kudu/common/generic_iterators.cc
index 7dd3378..58b23d5 100644
--- a/src/kudu/common/generic_iterators.cc
+++ b/src/kudu/common/generic_iterators.cc
@@ -252,6 +252,8 @@ class MergeIterator : public RowwiseIterator {
 
   virtual Status NextBlock(RowBlock* dst) OVERRIDE;
 
+  int64_t num_comparisons() const { return num_comparisons_; }
+
  private:
   void PrepareBatch(RowBlock* dst);
   Status MaterializeBlock(RowBlock* dst);
@@ -283,12 +285,16 @@ class MergeIterator : public RowwiseIterator {
   // The copies are allocated from this pool so they can be automatically freed
   // when the UnionIterator goes out of scope.
   ObjectPool<ScanSpec> scan_spec_copies_;
+
+  // The total number of comparisons performed by each call to MaterializeBlock.
+  int64_t num_comparisons_;
 };
 
 MergeIterator::MergeIterator(vector<unique_ptr<RowwiseIterator>> iters)
     : initted_(false),
       orig_iters_(std::move(iters)),
-      num_orig_iters_(orig_iters_.size()) {
+      num_orig_iters_(orig_iters_.size()),
+      num_comparisons_(0) {
   CHECK_GT(orig_iters_.size(), 0);
 }
 
@@ -408,10 +414,15 @@ Status MergeIterator::MaterializeBlock(RowBlock *dst) {
     for (size_t i = 0; i < states_.size(); i++) {
       unique_ptr<MergeIterState> &state = states_[i];
 
-      if (smallest == nullptr ||
-          schema_->Compare(state->next_row(), smallest->next_row()) < 0) {
+      if (PREDICT_FALSE(smallest == nullptr)) {
         smallest = state.get();
         smallest_idx = i;
+      } else {
+        num_comparisons_++;
+        if (schema_->Compare(state->next_row(), smallest->next_row()) < 0) {
+          smallest = state.get();
+          smallest_idx = i;
+        }
       }
     }
 
@@ -456,6 +467,12 @@ unique_ptr<RowwiseIterator> NewMergeIterator(
   return unique_ptr<RowwiseIterator>(new MergeIterator(std::move(iters)));
 }
 
+int64_t GetMergeIteratorNumComparisonsForTests(
+    const unique_ptr<RowwiseIterator>& iter) {
+  MergeIterator* merge = down_cast<MergeIterator*>(iter.get());
+  return merge->num_comparisons();
+}
+
 ////////////////////////////////////////////////////////////
 // UnionIterator
 ////////////////////////////////////////////////////////////
diff --git a/src/kudu/common/generic_iterators.h b/src/kudu/common/generic_iterators.h
index b4b466e..84ab636 100644
--- a/src/kudu/common/generic_iterators.h
+++ b/src/kudu/common/generic_iterators.h
@@ -16,6 +16,7 @@
 // under the License.
 #pragma once
 
+#include <cstdint>
 #include <memory>
 #include <vector>
 
@@ -61,4 +62,10 @@ Status InitAndMaybeWrap(std::unique_ptr<RowwiseIterator>* base_iter,
 const std::vector<ColumnPredicate>& GetIteratorPredicatesForTests(
     const std::unique_ptr<RowwiseIterator>& iter);
 
+// Gets the number of comparisons performed by a MergeIterator.
+//
+// Only for use by tests.
+int64_t GetMergeIteratorNumComparisonsForTests(
+    const std::unique_ptr<RowwiseIterator>& iter);
+
 } // namespace kudu