You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/04/15 22:01:41 UTC

[kudu] branch master updated: KUDU-2466: implement three-heap merge algorithm

This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 1567dec  KUDU-2466: implement three-heap merge algorithm
1567dec is described below

commit 1567dec0865586ef93d8a8f555eea30a21228347
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Wed Apr 3 01:02:15 2019 -0700

    KUDU-2466: implement three-heap merge algorithm
    
    This patch implements a new algorithm for efficient merging in
    MergeIterator. The algorithm is documented extensively in the code so
    there's no point in recapping it here. There's also a high-level overview
    with pseudocode available[1].
    
    I microbenchmarked the old implementation against the new one, using both
    overlapping and non-overlapping inputs with a varying number of lists and
    10000 rows per list (see generic_iterators-test). Here are the scan times,
    averaged across five runs:
    
    Parameters                  | old      | new      | diff
    ----------------------------+----------+----------+------------------
    overlapping, 10 lists       | 0.015s   | 0.0522s  | +0.0372   (0.29x)
    overlapping, 100 lists      | 1.0798s  | 1.1024s  | +0.0226   (0.98x)
    overlapping, 1000 lists     | 184.245s | 22.8156s | -161.4294 (8.07x)
    non-overlapping, 10 lists   | 0.0126s  | 0.0196s  | +0.007    (0.64x)
    non-overlapping, 100 lists  | 0.5038s  | 0.129s   | -0.3748   (3.91x)
    non-overlapping, 1000 lists | 89.8626s | 0.9874s  | -88.8752  (91x)
    ----------------------------+----------+----------+------------------
    
    The goal was to optimize ORDERED scans for large and mostly compacted
    tablets, and the new algorithm does just that. With smaller input, the
    overhead of using heaps becomes more pronounced. Overlapping input still
    benefits from heap-based merging, but the overlapping defeats the hot vs.
    cold optimization provided by the algorithm.
    
    To see how it performed against real data, I tested the algorithm against a
    mostly compacted "representative" 40G tablet with 1294 rowsets, all stored
    on one disk. I used a new CLI tool that does a full tablet scan without
    sending any data over the wire. I ran the tool three times: first with
    UNORDERED scans, then with ORDERED scans using the old algorithm, and
    finally with ORDERED scans using the new algorithm. Each run did six scans;
    to account for any caching effects, I dropped the first scan. Results:
    
    Parameters   | Average run time (s) | Peak RSS (kb)
    -------------+----------------------+--------------
    UNORDERED:   | 232                  | 710320
    ORDERED, old | 33787                | 4465532
    ORDERED, new | 979                  | 749440
    -------------+----------------------+--------------
    
    Lastly, the new algorithm opens the door to another optimization: while
    there's just one "hot" sub-iterator, we can copy data block-by-block instead
    of row-by-row. I'll implement this in a follow-up.
    
    1. https://docs.google.com/document/d/1uP0ubjM6ulnKVCRrXtwT_dqrTWjF9tlFSRk0JN2e_O0/edit#
    
    Change-Id: I6deab76a103f45c1b5042b104731e46a771a0f5d
    Reviewed-on: http://gerrit.cloudera.org:8080/12947
    Reviewed-by: Mike Percy <mp...@apache.org>
    Tested-by: Kudu Jenkins
---
 src/kudu/common/encoded_key.cc            |   5 +-
 src/kudu/common/generic_iterators-test.cc |   4 +-
 src/kudu/common/generic_iterators.cc      | 514 +++++++++++++++++++++++-------
 src/kudu/common/generic_iterators.h       |   7 -
 src/kudu/common/schema.cc                 |  16 +-
 src/kudu/common/schema.h                  |   5 +-
 6 files changed, 422 insertions(+), 129 deletions(-)

diff --git a/src/kudu/common/encoded_key.cc b/src/kudu/common/encoded_key.cc
index 6ae2fe5..48232cb 100644
--- a/src/kudu/common/encoded_key.cc
+++ b/src/kudu/common/encoded_key.cc
@@ -68,11 +68,12 @@ Status EncodedKey::DecodeEncodedString(const Schema& schema,
     return Status::RuntimeError("OOM");
   }
 
-  RETURN_NOT_OK(schema.DecodeRowKey(encoded, raw_key_buf, arena));
+  ContiguousRow row(&schema, raw_key_buf);
+  RETURN_NOT_OK(schema.DecodeRowKey(encoded, &row, arena));
 
   vector<const void*> raw_keys(schema.num_key_columns());
   for (int i = 0; i < schema.num_key_columns(); i++) {
-    raw_keys[i] = raw_key_buf + schema.column_offset(i);
+    raw_keys[i] = row.cell_ptr(i);
   }
 
   faststring data_copy;
diff --git a/src/kudu/common/generic_iterators-test.cc b/src/kudu/common/generic_iterators-test.cc
index 6c95118..1a8ad83 100644
--- a/src/kudu/common/generic_iterators-test.cc
+++ b/src/kudu/common/generic_iterators-test.cc
@@ -462,9 +462,6 @@ void TestMerge(const Schema& schema,
       }
       ASSERT_EQ(expected.size(), total_idx);
       ASSERT_EQ(expected.end(), expected_iter);
-
-      LOG(INFO) << "Total number of comparisons performed: "
-                << GetMergeIteratorNumComparisonsForTests(merger);
     }
   }
 }
@@ -547,6 +544,7 @@ TEST(TestPredicateEvaluatingIterator, TestPredicateEvaluation) {
   // Set up a MaterializingIterator with pushdown disabled, so that the
   // PredicateEvaluatingIterator will wrap it and do evaluation.
   unique_ptr<VectorIterator> colwise(new VectorIterator(ints));
+  google::FlagSaver saver;
   FLAGS_materializing_iterator_do_pushdown = false;
   unique_ptr<RowwiseIterator> materializing(
       NewMaterializingIterator(std::move(colwise)));
diff --git a/src/kudu/common/generic_iterators.cc b/src/kudu/common/generic_iterators.cc
index acb4f5d..ed501a2 100644
--- a/src/kudu/common/generic_iterators.cc
+++ b/src/kudu/common/generic_iterators.cc
@@ -17,6 +17,8 @@
 
 #include "kudu/common/generic_iterators.h"
 
+#include <sys/types.h>
+
 #include <algorithm>
 #include <cstddef>
 #include <cstdint>
@@ -27,12 +29,12 @@
 #include <ostream>
 #include <string>
 #include <unordered_map>
-#include <unordered_set>
 #include <utility>
 
+#include <boost/heap/skew_heap.hpp>
 #include <boost/intrusive/list.hpp>
 #include <boost/intrusive/list_hook.hpp>
-
+#include <boost/iterator/iterator_facade.hpp>
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 
@@ -57,16 +59,18 @@
 #include "kudu/util/memory/arena.h"
 #include "kudu/util/object_pool.h"
 
+namespace boost {
+namespace heap {
+template <class T> struct compare;
+}  // namespace heap
+}  // namespace boost
+
 using std::deque;
 using std::get;
-using std::make_shared;
 using std::pair;
-using std::remove_if;
-using std::shared_ptr;
 using std::sort;
 using std::string;
 using std::unique_ptr;
-using std::unordered_set;
 using std::vector;
 using strings::Substitute;
 
@@ -106,24 +110,57 @@ class MergeIterState : public boost::intrusive::list_base_hook<> {
   explicit MergeIterState(IterWithBounds iwb)
       : iwb_(std::move(iwb)),
         arena_(1024),
-        read_block_(&schema(), kMergeRowBuffer, &arena_),
         next_row_idx_(0),
         rows_advanced_(0),
         rows_valid_(0)
   {}
 
-  // Fetch the next row from the iterator. Does not advance the iterator.
-  // IsFullyExhausted() must return false at the time this method is invoked.
+  // Fetches the next row from the iterator's current block, or the iterator's
+  // absolute lower bound if a block has not yet been pulled.
+  //
+  // Does not advance the iterator.
   const RowBlockRow& next_row() const {
-    DCHECK_LT(rows_advanced_, rows_valid_);
-    return next_row_;
+    if (read_block_) {
+      DCHECK_LT(rows_advanced_, rows_valid_);
+      return next_row_;
+    }
+    DCHECK(decoded_bounds_);
+    return decoded_bounds_->lower;
+  }
+
+  // Fetches the last row from the iterator's current block, or the iterator's
+  // absolute upper bound if a block has not yet been pulled.
+  //
+  // Does not advance the iterator.
+  const RowBlockRow& last_row() const {
+    if (read_block_) {
+      return last_row_;
+    }
+    DCHECK(decoded_bounds_);
+    return decoded_bounds_->upper;
   }
 
-  // Initialize the underlying iterator to point to the first valid row, if
-  // any. This method should be called before calling any other methods.
-  Status Init() {
+  // Finishes construction of the MergeIterState by decoding the bounds if they
+  // exist. If not, we have to pull a block immediately: after Init() is
+  // finished it must be safe to call next_row() and last_row().
+  //
+  // Decoded bound allocations are done against 'decoded_bounds_arena'.
+  Status Init(Arena* decoded_bounds_arena) {
     CHECK_EQ(0, rows_valid_);
-    return PullNextBlock();
+
+    if (iwb_.encoded_bounds) {
+      decoded_bounds_.emplace(&schema(), decoded_bounds_arena);
+      decoded_bounds_->lower = decoded_bounds_->block.row(0);
+      decoded_bounds_->upper = decoded_bounds_->block.row(1);
+      RETURN_NOT_OK(schema().DecodeRowKey(
+          iwb_.encoded_bounds->first, &decoded_bounds_->lower, decoded_bounds_arena));
+      RETURN_NOT_OK(schema().DecodeRowKey(
+          iwb_.encoded_bounds->second, &decoded_bounds_->upper, decoded_bounds_arena));
+    } else {
+      RETURN_NOT_OK(PullNextBlock());
+    }
+
+    return Status::OK();
   }
 
   // Returns true if the underlying iterator is fully exhausted.
@@ -132,24 +169,21 @@ class MergeIterState : public boost::intrusive::list_base_hook<> {
   }
 
   // Advance to the next row in the underlying iterator.
-  Status Advance();
+  //
+  // If successful, 'pulled_new_block' is true if this block was exhausted and a
+  // new block was pulled from the underlying iterator.
+  Status Advance(bool* pulled_new_block);
 
   // Add statistics about the underlying iterator to the given vector.
   void AddStats(vector<IteratorStats>* stats) const {
     AddIterStats(*iwb_.iter, stats);
   }
 
-  // Returns the number of valid rows remaining in the current block.
-  size_t remaining_in_block() const {
-    return rows_valid_ - rows_advanced_;
-  }
-
   // Returns the schema from the underlying iterator.
   const Schema& schema() const {
     return iwb_.iter->schema();
   }
 
- private:
   // Pull the next block from the underlying iterator.
   Status PullNextBlock();
 
@@ -158,6 +192,14 @@ class MergeIterState : public boost::intrusive::list_base_hook<> {
     return rows_advanced_ == rows_valid_;
   }
 
+  string ToString() const {
+    return Substitute("[$0,$1]: $2",
+                      schema().DebugRowKey(next_row()),
+                      schema().DebugRowKey(last_row()),
+                      iwb_.iter->ToString());
+  }
+
+ private:
   // The iterator (and optional bounds) whose rows are to be merged with other
   // iterators.
   //
@@ -167,42 +209,61 @@ class MergeIterState : public boost::intrusive::list_base_hook<> {
   // Allocates memory for read_block_.
   Arena arena_;
 
+  // Optional rowset bounds, decoded during Init().
+  struct DecodedBounds {
+    // 'block' must be constructed immediately; the bounds themselves can be
+    // initialized later.
+    DecodedBounds(const Schema* schema, Arena* arena)
+        : block(schema, /*nrows=*/2, arena) {}
+
+    RowBlock block;
+    RowBlockRow lower;
+    RowBlockRow upper;
+  };
+  boost::optional<DecodedBounds> decoded_bounds_;
+
   // Current block of buffered rows from the iterator.
   //
   // The memory backing the rows was allocated out of the arena.
-  RowBlock read_block_;
+  unique_ptr<RowBlock> read_block_;
 
   // The row currently pointed to by the iterator.
   RowBlockRow next_row_;
 
+  // The last row available in read_block_.
+  RowBlockRow last_row_;
+
   // Row index of next_row_ in read_block_.
   size_t next_row_idx_;
 
-  // Number of rows we've advanced past in the current RowBlock.
+  // Number of rows we've advanced past in read_block_.
   size_t rows_advanced_;
 
-  // Number of valid (selected) rows in the current RowBlock.
+  // Number of valid (selected) rows in read_block_.
   size_t rows_valid_;
 
   DISALLOW_COPY_AND_ASSIGN(MergeIterState);
 };
 
-Status MergeIterState::Advance() {
+Status MergeIterState::Advance(bool* pulled_new_block) {
   rows_advanced_++;
   if (IsBlockExhausted()) {
     arena_.Reset();
-    return PullNextBlock();
+    RETURN_NOT_OK(PullNextBlock());
+    *pulled_new_block = true;
+    return Status::OK();
   }
 
   // Seek to the next selected row.
-  SelectionVector *selection = read_block_.selection_vector();
-  for (++next_row_idx_; next_row_idx_ < read_block_.nrows(); next_row_idx_++) {
+  SelectionVector *selection = read_block_->selection_vector();
+  for (++next_row_idx_; next_row_idx_ < read_block_->nrows(); next_row_idx_++) {
     if (selection->IsRowSelected(next_row_idx_)) {
-      next_row_.Reset(&read_block_, next_row_idx_);
+      next_row_.Reset(read_block_.get(), next_row_idx_);
       break;
     }
   }
-  DCHECK_NE(next_row_idx_, read_block_.nrows()+1) << "No selected rows found!";
+  DCHECK_NE(next_row_idx_, read_block_->nrows()) << "No selected rows found!";
+  *pulled_new_block = false;
   return Status::OK();
 }
 
@@ -210,24 +271,40 @@ Status MergeIterState::PullNextBlock() {
   CHECK_EQ(rows_advanced_, rows_valid_)
       << "should not pull next block until current block is exhausted";
 
+  if (!read_block_) {
+    read_block_.reset(new RowBlock(&schema(), kMergeRowBuffer, &arena_));
+  }
   while (iwb_.iter->HasNext()) {
-    RETURN_NOT_OK(iwb_.iter->NextBlock(&read_block_));
+    RETURN_NOT_OK(iwb_.iter->NextBlock(read_block_.get()));
     rows_advanced_ = 0;
     // Honor the selection vector of the read_block_, since not all rows are necessarily selected.
-    SelectionVector *selection = read_block_.selection_vector();
-    DCHECK_EQ(selection->nrows(), read_block_.nrows());
-    DCHECK_LE(selection->CountSelected(), read_block_.nrows());
+    SelectionVector *selection = read_block_->selection_vector();
+    DCHECK_EQ(selection->nrows(), read_block_->nrows());
+    DCHECK_LE(selection->CountSelected(), read_block_->nrows());
     rows_valid_ = selection->CountSelected();
-    VLOG(2) << Substitute("$0/$1 rows selected", rows_valid_, read_block_.nrows());
+    VLOG(2) << Substitute("$0/$1 rows selected", rows_valid_, read_block_->nrows());
     if (rows_valid_ == 0) {
       // Short-circuit: this block is entirely unselected and can be skipped.
       continue;
     }
 
-    // Seek next_row_ to the first selected row.
+    // Seek next_row_ and last_row_ to the first and last selected rows
+    // respectively (which could be identical).
+    //
+    // We use a signed size_t type to avoid underflowing when finding last_row_.
+    //
+    // TODO(adar): this can be simplified if there was a BitmapFindLastSet().
     CHECK(selection->FindFirstRowSelected(&next_row_idx_));
-    next_row_.Reset(&read_block_, next_row_idx_);
-    return Status::OK();
+    next_row_.Reset(read_block_.get(), next_row_idx_);
+    for (ssize_t row_idx = read_block_->nrows() - 1; row_idx >= 0; row_idx--) {
+      if (selection->IsRowSelected(row_idx)) {
+        last_row_.Reset(read_block_.get(), row_idx);
+        VLOG(1) << "Pulled new block: " << ToString();
+        return Status::OK();
+      }
+    }
+
+    LOG(FATAL) << "unreachable code"; // guaranteed by the short-circuit above
   }
 
   // The underlying iterator is fully exhausted.
@@ -239,8 +316,112 @@ Status MergeIterState::PullNextBlock() {
 // An iterator which merges the results of other iterators, comparing
 // based on keys.
 //
-// Note: the sub-iterators must be constructed using a projection that includes
-// key columns; otherwise the merge algorithm's key comparisons won't work.
+// Three different heaps are used to optimize the merge process. To explain how
+// it works, let's start with an explanation of a traditional heap-based merge:
+// there exist N sorted lists of elements and the goal is to produce a single
+// sorted list containing all of the elements.
+//
+// To begin:
+// - For each list, peek the first element into a per-list buffer.
+// - Add all of the lists to a min-heap ordered on the per-list buffers. This
+//   means that the heap's top-most entry (accessible in O(1) time) will be the
+//   list containing the smallest not-yet-consumed element.
+//
+// To perform the merge, loop while the min-heap isn't empty:
+// - Pop the top-most list from the min-heap.
+// - Copy that list's peeked element to the output.
+// - Peek the list's next element. If the list is empty, discard it.
+// - If the list has more elements, push it back into the min-heap.
+//
+// This algorithm runs in O(n log n) time and is generally superior to a naive
+// O(n^2) merge. However, it requires peeked elements to remain resident in
+// memory during the merge.
+//
+// The MergeIterator's sub-iterators operate much like the lists described
+// above: elements correspond to rows and sorting is based on rows' primary
+// keys. However, there are several important differences that open the door for
+// further optimization:
+// 1.  Each sub-iterator corresponds to a Kudu rowset, and DiskRowSets' smallest
+//     and largest possible primary keys (i.e. bounds) are known ahead of time.
+// 2.  When iterating on DiskRowSets, peeking even one row means decoding a page
+//     of columnar data for each projected column. This decoded data remains
+//     resident in memory so that we needn't repeat the decoding for each row,
+//     but it means we have a strong motivation to minimize the set of peeked
+//     sub-iterators in order to minimize memory consumption.
+// 2a. Related to #2, there's little reason not to peek more than row at a time,
+//     since the predominant source of memory usage is in the decoded pages
+//     rather than the buffered rows.
+//
+// The aggressive peeking allows us to tweak the list model slightly: instead
+// of treating sub-iterators as continuous sequences of single rows, we can
+// think of each as a sequence of discrete row "runs". Each run has a lower
+// bound (the key of the first row in the run) and an upper bound (the key of
+// the last row). One run in each sub-iterator is NEXT in that its rows have
+// been peeked and are resident in memory. When we draw rows from the
+// sub-iterator, we'll draw them from this run. We can use the bounds to
+// establish overlapping relationships between runs across sub-iterators. In
+// theory, the less overlap, the fewer runs need to be considered when merging.
+//
+// To exploit this, we need to formally define the concept of a "merge window".
+// The window describes, at any given time, the key space interval where we
+// expect to find the row with the smallest key. A sub-iterator whose NEXT
+// overlaps with the merge window is said to be actively participating in the merge.
+//
+// The merge window is defined as follows:
+// 1.  The window's start is the smallest lower bound of all sub-iterators. We
+//     refer to the sub-iterator that owns this lower bound as LOW.
+// 2.  The window’s end is the smallest upper bound of all sub-iterators whose
+//     lower bounds are less than or equal to LOW's upper bound.
+// 2a. The window's end could be LOW's upper bound itself, if it is the smallest
+//     upper bound, but this isn't necessarily the case.
+// 3.  The merge window's dimensions change as the merge proceeds, though it
+//     only ever moves "to the right" (i.e. the window start/end only increase).
+//
+// Armed with the merge window concept, we can bifurcate the sub-iterators into
+// a set whose NEXTs overlap with the merge window and a set whose NEXTs do not.
+// We store the first set in a HOT min-heap (ordered by each's NEXT's lower
+// bound). Now the merge steady state resembles that of a traditional heap-based
+// merge: the top-most sub-iterator is popped from HOT, the lower bound is
+// copied to the output and advanced, and the sub-iterator is pushed back to
+// HOT. The bifurcation hasn't yielded any algorithmic improvements, but the
+// more non-overlapped the input (i.e. the more compacted the tablet), the fewer
+// sub-iterators will be in HOT and thus the fewer comparisons and heap motion
+// will take place.
+//
+// How do sub-iterators move between the two sets? At the outset, we examine all
+// sub-iterators to find the initial merge window and bifurcate accordingly. In
+// the steady state, we need to move to HOT whenever the end of the merge window
+// moves; that's a sign that the window may now overlap with a NEXT belonging to
+// a sub-iterator in the second set. The end of the merge window moves when a
+// sub-iterator is fully exhausted (i.e. all rows have been copied to the
+// output), or when a sub-iterator finishes its NEXT and needs to peek again.
+//
+// But which sub-iterators should be moved? To answer this question efficiently,
+// we need two more heaps:
+// - COLD: a min-heap for sub-iterators in the second set, ordered by each's
+//   NEXT's lower bound. At any given time, the NEXT belonging to the top-most
+//   sub-iterator in COLD is nearest the merge window.
+// - HOTMAXES: a min-heap for keys. Each entry corresponds to a sub-iterator
+//   present in HOT, and specifically, to its NEXT's upper bound. At any given
+//   time, the top-most key in HOTMAXES represents the end of the merge window.
+//
+// When the merge window's end has moved and we need to refill HOT, the top-most
+// sub-iterator in COLD is the best candidate. To figure out whether it should
+// be moved, we compare its NEXT's lower bound against the top-most key in
+// HOTMAXES: if the lower bound is less than or equal to the key, we move the
+// sub-iterator from COLD to HOT. On the flip side, when a sub-iterator from HOT
+// finishes its NEXT and peeks again, we also need to check whether it has
+// exited the merge window. The approach is similar: if its NEXT's lower bound
+// is greater than the top-most key in HOTMAXES, it's time to move it to COLD.
+//
+// There's one more piece to this puzzle: the absolute bounds that are known
+// ahead of time are used as stand-ins for NEXT's lower and upper bounds. This
+// helps defer peeking for as long as possible, at least until the sub-iterator
+// moves from COLD to HOT. After that, peeked memory must remain resident until
+// the sub-iterator is fully exhausted.
+//
+// For another description of this algorithm including pseudocode, see
+// https://docs.google.com/document/d/1uP0ubjM6ulnKVCRrXtwT_dqrTWjF9tlFSRk0JN2e_O0/edit#
 class MergeIterator : public RowwiseIterator {
  public:
   // Constructs a MergeIterator of the given iterators.
@@ -266,13 +447,18 @@ class MergeIterator : public RowwiseIterator {
 
   virtual Status NextBlock(RowBlock* dst) OVERRIDE;
 
-  int64_t num_comparisons() const { return num_comparisons_; }
-
  private:
   void PrepareBatch(RowBlock* dst);
   Status MaterializeBlock(RowBlock* dst);
   Status InitSubIterators(ScanSpec *spec);
 
+  // Moves sub-iterators from cold_ to hot_ if they now overlap with the merge
+  // window. Should be called whenever the merge window moves.
+  Status RefillHotHeap();
+
+  // Destroys a fully exhausted sub-iterator.
+  void DestroySubIterator(MergeIterState* state);
+
   const MergeIteratorOptions opts_;
 
   // Initialized during Init.
@@ -284,8 +470,9 @@ class MergeIterator : public RowwiseIterator {
   // column exists in the schema.
   int is_deleted_col_index_;
 
-  // Holds the subiterators until Init is called, at which point this is cleared.
-  // This is required because we can't create a MergeIterState of an uninitialized iterator.
+  // Holds the sub-iterators until Init is called, at which point this is
+  // cleared. This is required because we can't create a MergeIterState of an
+  // uninitialized sub-iterator.
   vector<IterWithBounds> orig_iters_;
 
   // See UnionIterator::iters_lock_ for details on locking. This follows the same
@@ -306,8 +493,58 @@ class MergeIterator : public RowwiseIterator {
   // when the UnionIterator goes out of scope.
   ObjectPool<ScanSpec> scan_spec_copies_;
 
-  // The total number of comparisons performed by each call to MaterializeBlock.
-  int64_t num_comparisons_;
+  // Arena dedicated to MergeIterState bounds decoding.
+  //
+  // Each MergeIterState has an arena for buffered row data, but it is reset
+  // every time a new block is pulled. This single arena ensures that a
+  // MergeIterState's decoded bounds remain allocated for its lifetime.
+  Arena decoded_bounds_arena_;
+
+  // Min-heap that orders rows by their keys. A call to top() will yield the row
+  // with the smallest key.
+  struct RowComparator {
+    bool operator()(const RowBlockRow& a, const RowBlockRow& b) const {
+      // This is counter-intuitive, but it's because boost::heap defaults to
+      // a max-heap; the comparator must be inverted to yield a min-heap.
+      return a.schema()->Compare(a, b) > 0;
+    }
+  };
+  typedef boost::heap::skew_heap<
+    RowBlockRow, boost::heap::compare<RowComparator>> RowMinHeap;
+
+  // Min-heap that orders sub-iterators by their next row key. A call to top()
+  // will yield the sub-iterator with the smallest next row key.
+  struct MergeIterStateComparator {
+    bool operator()(const MergeIterState* a, const MergeIterState* b) const {
+      // This is counter-intuitive, but it's because boost::heap defaults to
+      // a max-heap; the comparator must be inverted to yield a min-heap.
+      return a->schema().Compare(a->next_row(), b->next_row()) > 0;
+    }
+  };
+  typedef boost::heap::skew_heap<
+    MergeIterState*, boost::heap::compare<MergeIterStateComparator>> MergeStateMinHeap;
+
+  // The three heaps as described in the algorithm above.
+  //
+  // Note that the heaps do not "own" any of the objects they contain:
+  // - The MergeIterStates in hot_ and cold_ are owned by states_.
+  // - The data backing the rows in hotmaxes_ is owned by all states' read_block_.
+  //
+  // Care must be taken to remove entries from the heaps when the corresponding
+  // objects are destroyed.
+  //
+  // Boost offers a variety of different heap data structures[1]. Perf testing
+  // via generic_iterators-test (TestMerge and TestMergeNonOverlapping with
+  // num_iters=10, num_lists=1000, and num_rows=1000) shows that while all heaps
+  // perform more or less equally well for non-overlapping input, skew heaps
+  // outperform the rest for overlapping input. Basic priority queues (i.e.
+  // boost::heap::priority_queue and std::priority_queue) were excluded as they
+  // do not offer ordered iteration.
+  //
+  // 1. https://www.boost.org/doc/libs/1_69_0/doc/html/heap/data_structures.html
+  MergeStateMinHeap hot_;
+  MergeStateMinHeap cold_;
+  RowMinHeap hotmaxes_;
 };
 
 MergeIterator::MergeIterator(MergeIteratorOptions opts,
@@ -316,7 +553,7 @@ MergeIterator::MergeIterator(MergeIteratorOptions opts,
       initted_(false),
       orig_iters_(std::move(iters)),
       num_orig_iters_(orig_iters_.size()),
-      num_comparisons_(0) {
+      decoded_bounds_arena_(1024) {
   CHECK_GT(orig_iters_.size(), 0);
 }
 
@@ -327,20 +564,12 @@ MergeIterator::~MergeIterator() {
 Status MergeIterator::Init(ScanSpec *spec) {
   CHECK(!initted_);
 
-  // Initialize the iterators and construct the per-iterator merge states.
+  // Initialize the iterators and the per-iterator merge states.
   //
   // When this method finishes, orig_iters_ has been cleared and states_ has
   // been populated.
   RETURN_NOT_OK(InitSubIterators(spec));
 
-  // Retrieve every iterator's first block.
-  //
-  // TODO(adar): establish dominance between iterators and only initialize
-  // non-dominated iterators.
-  for (auto& s : states_) {
-    RETURN_NOT_OK(s.Init());
-  }
-
   // Verify that the schemas match in debug builds.
   //
   // It's important to do the verification after initializing the iterators, as
@@ -372,6 +601,12 @@ Status MergeIterator::Init(ScanSpec *spec) {
       [](const MergeIterState& s) { return PREDICT_FALSE(s.IsFullyExhausted()); },
       [](MergeIterState* s) { delete s; });
 
+  // Establish the merge window and initialize the three heaps.
+  for (auto& s : states_) {
+    cold_.push(&s);
+  }
+  RETURN_NOT_OK(RefillHotHeap());
+
   initted_ = true;
   return Status::OK();
 }
@@ -387,6 +622,7 @@ Status MergeIterator::InitSubIterators(ScanSpec *spec) {
     ScanSpec *spec_copy = spec != nullptr ? scan_spec_copies_.Construct(*spec) : nullptr;
     RETURN_NOT_OK(InitAndMaybeWrap(&i.iter, spec_copy));
     unique_ptr<MergeIterState> state(new MergeIterState(std::move(i)));
+    RETURN_NOT_OK(state->Init(&decoded_bounds_arena_));
     states_.push_back(*state.release());
   }
   orig_iters_.clear();
@@ -399,7 +635,59 @@ Status MergeIterator::InitSubIterators(ScanSpec *spec) {
   return Status::OK();
 }
 
+Status MergeIterator::RefillHotHeap() {
+  VLOG(2) << "Refilling hot heap";
+  while (!cold_.empty() &&
+         (hotmaxes_.empty() ||
+          schema_->Compare(hotmaxes_.top(), cold_.top()->next_row()) >= 0)) {
+    MergeIterState* warmest = cold_.top();
+    cold_.pop();
+
+    // This will only be true once per sub-iterator, when it becomes hot for the
+    // very first time.
+    if (warmest->IsBlockExhausted()) {
+      RETURN_NOT_OK(warmest->PullNextBlock());
+
+      // After pulling a block, we can't just assume 'warmest' overlaps the
+      // merge window: there could have been a huge gap between the pulled block
+      // and the sub-iterator's absolute bounds. In other words, although the
+      // bounds told us that 'warmest' was the best candidate, the block is the
+      // ultimate source of truth.
+      //
+      // To deal with this, we pretend 'warmest' doesn't overlap and restart the
+      // algorithm. In the worst case (little to no gap between the block and
+      // the bounds), we'll pop 'warmest' right back out again.
+      if (warmest->IsFullyExhausted()) {
+        DestroySubIterator(warmest);
+      } else {
+        cold_.push(warmest);
+      }
+      continue;
+    }
+    VLOG(2) << "Became hot: " << warmest->ToString();
+    hot_.push(warmest);
+    hotmaxes_.push(warmest->last_row());
+  }
+  if (VLOG_IS_ON(2)) {
+    VLOG(2) << "Done refilling hot heap";
+    for (const auto* c : cold_) {
+      VLOG(2) << "Still cold: " << c->ToString();
+    }
+  }
+  return Status::OK();
+}
+
+void MergeIterator::DestroySubIterator(MergeIterState* state) {
+  DCHECK(state->IsFullyExhausted());
+
+  std::lock_guard<rw_spinlock> l(states_lock_);
+  state->AddStats(&finished_iter_stats_by_col_);
+  states_.erase_and_dispose(states_.iterator_to(*state),
+                            [](MergeIterState* s) { delete s; });
+}
+
 Status MergeIterator::NextBlock(RowBlock* dst) {
+  VLOG(3) << "Called NextBlock on " << ToString();
   CHECK(initted_);
   DCHECK_SCHEMA_EQ(*dst->schema(), schema());
 
@@ -413,16 +701,6 @@ void MergeIterator::PrepareBatch(RowBlock* dst) {
   if (dst->arena()) {
     dst->arena()->Reset();
   }
-
-  // We can always provide at least as many rows as are remaining
-  // in the currently queued up blocks.
-  size_t available = 0;
-  for (const auto& s : states_) {
-    if (available >= dst->row_capacity()) break;
-    available += s.remaining_in_block();
-  }
-
-  dst->Resize(std::min(dst->row_capacity(), available));
 }
 
 // TODO(todd): this is an obvious spot to add codegen - there's a ton of branching
@@ -433,47 +711,40 @@ Status MergeIterator::MaterializeBlock(RowBlock *dst) {
   // smallest row key at a given moment during the merge because there may be
   // multiple deleted rows with the same row key across multiple rowsets, and
   // up to one live instance, that we have to deduplicate.
-  vector<MergeIterState*> smallest(states_.size());
+  vector<MergeIterState*> smallest(hot_.size());
 
   // Initialize the selection vector.
   // MergeIterState only returns selected rows.
   dst->selection_vector()->SetAllTrue();
   size_t dst_row_idx = 0;
   while (dst_row_idx < dst->nrows()) {
+    // If the hot heap is empty, we must be out of sub-iterators.
+    if (PREDICT_FALSE(hot_.empty())) {
+      DCHECK(states_.empty());
+      break;
+    }
 
-    // Find the sub-iterator that is currently smallest.
+    // TODO(adar): optimize the case where hot_.size == 1.
+
+    // Find the set of sub-iterators whose matching next row keys are the
+    // smallest across all sub-iterators.
+    //
+    // Note: heap ordered iteration isn't the same as a total ordering. For
+    // example, the two absolute smallest keys might be in the same sub-iterator
+    // rather than in the first two sub-iterators yielded via ordered iteration.
+    // However, the goal here is to identify a group of matching keys for the
+    // purpose of deduplication, and we're guaranteed that such matching keys
+    // cannot exist in the same sub-iterator (i.e. the same rowset).
     smallest.clear();
-
-    // Typically the number of states_ is not that large, so using a priority
-    // queue is not worth it.
-    for (auto& iter : states_) {
-      // To merge in row key order, we need to consume the smallest row at any
-      // given time. We locate that row by peeking at the next row in each of
-      // the states_ iterators, which includes all possible candidates for the
-      // next row in key order.
-      int cmp;
-      if (!smallest.empty()) {
-        // If we have a candidate for smallest row, compare it against the
-        // smallest row in each iterator.
-        cmp = schema_->Compare(iter.next_row(), smallest[0]->next_row());
-        num_comparisons_++;
-      }
-      if (smallest.empty() || cmp < 0) {
-        // If we have no candidates for the next row yet, or the row found is
-        // smaller than the previously-smallest, replace the smallest with the
-        // new row found.
-        smallest.clear();
-        smallest.emplace_back(&iter);
-      } else if (!smallest.empty() && cmp == 0) {
-        // If we have found a duplicate of the smallest row, at least one must
-        // be a ghost row. Collect all duplicates in order to merge them later.
-        smallest.emplace_back(&iter);
+    for (auto iter = hot_.ordered_begin(); iter != hot_.ordered_end(); ++iter) {
+      MergeIterState* state = *iter;
+      if (!smallest.empty() &&
+          schema_->Compare(state->next_row(), smallest[0]->next_row()) != 0) {
+        break;
       }
+      smallest.emplace_back(state);
     }
 
-    // If no iterators had any row left, then we're done iterating.
-    if (PREDICT_FALSE(smallest.empty())) break;
-
     MergeIterState* row_to_return_iter = nullptr;
     if (!opts_.include_deleted_rows) {
       // Since deleted rows are not included here, there can only be a single
@@ -511,17 +782,48 @@ Status MergeIterator::MaterializeBlock(RowBlock *dst) {
             << "expected deleted row";
       }
     }
+    VLOG(3) << Substitute("Copying row $0 from $1",
+                          dst_row_idx, row_to_return_iter->ToString());
     RowBlockRow dst_row = dst->row(dst_row_idx++);
     RETURN_NOT_OK(CopyRow(row_to_return_iter->next_row(), &dst_row, dst->arena()));
 
     // Advance all matching sub-iterators and remove any that are exhausted.
-    for (MergeIterState* s : smallest) {
-      RETURN_NOT_OK(s->Advance());
+    for (auto& s : smallest) {
+      bool pulled_new_block;
+      RETURN_NOT_OK(s->Advance(&pulled_new_block));
+      hot_.pop();
+
+      // Note that hotmaxes_ is not yet popped as it's not necessary to do so if
+      // the merge window hasn't changed. Thus, we can avoid some work by
+      // deferring it into the cases below.
+
       if (s->IsFullyExhausted()) {
-        std::lock_guard<rw_spinlock> l(states_lock_);
-        s->AddStats(&finished_iter_stats_by_col_);
-        states_.erase_and_dispose(states_.iterator_to(*s),
-                                  [](MergeIterState* s) { delete s; });
+        hotmaxes_.pop();
+        DestroySubIterator(s);
+
+        // This sub-iterator's removal means the end of the merge window may
+        // have shifted.
+        RETURN_NOT_OK(RefillHotHeap());
+      } else if (pulled_new_block) {
+        hotmaxes_.pop();
+
+        // This sub-iterator has a new block, which means the end of the merge
+        // window may have shifted.
+        if (!hotmaxes_.empty() && schema_->Compare(hotmaxes_.top(), s->next_row()) < 0) {
+          // The new block lies beyond the new end of the merge window.
+          VLOG(2) << "Block finished, became cold: " << s->ToString();
+          cold_.push(s);
+        } else {
+          // The new block is still within the merge window.
+          VLOG(2) << "Block finished, still hot: " << s->ToString();
+          hot_.push(s);
+          hotmaxes_.push(s->last_row());
+        }
+        RETURN_NOT_OK(RefillHotHeap());
+      } else {
+        // The sub-iterator's block's upper bound remains the same; the merge
+        // window has not changed.
+        hot_.push(s);
       }
     }
   }
@@ -560,12 +862,6 @@ unique_ptr<RowwiseIterator> NewMergeIterator(
   return unique_ptr<RowwiseIterator>(new MergeIterator(opts, std::move(iters)));
 }
 
-int64_t GetMergeIteratorNumComparisonsForTests(
-    const unique_ptr<RowwiseIterator>& iter) {
-  MergeIterator* merge = down_cast<MergeIterator*>(iter.get());
-  return merge->num_comparisons();
-}
-
 ////////////////////////////////////////////////////////////
 // UnionIterator
 ////////////////////////////////////////////////////////////
diff --git a/src/kudu/common/generic_iterators.h b/src/kudu/common/generic_iterators.h
index b93a788..92462c2 100644
--- a/src/kudu/common/generic_iterators.h
+++ b/src/kudu/common/generic_iterators.h
@@ -16,7 +16,6 @@
 // under the License.
 #pragma once
 
-#include <cstdint>
 #include <memory>
 #include <string>
 #include <utility>
@@ -85,10 +84,4 @@ Status InitAndMaybeWrap(std::unique_ptr<RowwiseIterator>* base_iter,
 const std::vector<ColumnPredicate>& GetIteratorPredicatesForTests(
     const std::unique_ptr<RowwiseIterator>& iter);
 
-// Gets the number of comparisons performed by a MergeIterator.
-//
-// Only for use by tests.
-int64_t GetMergeIteratorNumComparisonsForTests(
-    const std::unique_ptr<RowwiseIterator>& iter);
-
 } // namespace kudu
diff --git a/src/kudu/common/schema.cc b/src/kudu/common/schema.cc
index 074689a..8a5509e 100644
--- a/src/kudu/common/schema.cc
+++ b/src/kudu/common/schema.cc
@@ -21,6 +21,7 @@
 #include <unordered_set>
 
 #include "kudu/common/row.h"
+#include "kudu/common/rowblock.h" // IWYU pragma: keep
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/strcat.h"
@@ -461,11 +462,10 @@ string Schema::ToString(ToStringMode mode) const {
                 "\n)");
 }
 
+template <class RowType>
 Status Schema::DecodeRowKey(Slice encoded_key,
-                            uint8_t* buffer,
+                            RowType* row,
                             Arena* arena) const {
-  ContiguousRow row(this, buffer);
-
   for (size_t col_idx = 0; col_idx < num_key_columns(); ++col_idx) {
     const ColumnSchema& col = column(col_idx);
     const KeyEncoder<faststring>& key_encoder = GetKeyEncoder<faststring>(col.type_info());
@@ -473,7 +473,7 @@ Status Schema::DecodeRowKey(Slice encoded_key,
     RETURN_NOT_OK_PREPEND(key_encoder.Decode(&encoded_key,
                                              is_last,
                                              arena,
-                                             row.mutable_cell_ptr(col_idx)),
+                                             row->mutable_cell_ptr(col_idx)),
                           Substitute("Error decoding composite key component '$0'",
                                      col.name()));
   }
@@ -490,11 +490,11 @@ string Schema::DebugEncodedRowKey(Slice encoded_key, StartOrEnd start_or_end) co
 
   Arena arena(256);
   uint8_t* buf = reinterpret_cast<uint8_t*>(arena.AllocateBytes(key_byte_size()));
-  Status s = DecodeRowKey(encoded_key, buf, &arena);
+  ContiguousRow row(this, buf);
+  Status s = DecodeRowKey(encoded_key, &row, &arena);
   if (!s.ok()) {
     return "<invalid key: " + s.ToString() + ">";
   }
-  ConstContiguousRow row(this, buf);
   return DebugRowKey(row);
 }
 
@@ -523,6 +523,10 @@ size_t Schema::memory_footprint_including_this() const {
   return kudu_malloc_usable_size(this) + memory_footprint_excluding_this();
 }
 
+// Explicit specialization for callers outside this compilation unit.
+template
+Status Schema::DecodeRowKey(Slice encoded_key, RowBlockRow* row, Arena* arena) const;
+
 // ============================================================================
 //  Schema Builder
 // ============================================================================
diff --git a/src/kudu/common/schema.h b/src/kudu/common/schema.h
index 610897d..307ac3b 100644
--- a/src/kudu/common/schema.h
+++ b/src/kudu/common/schema.h
@@ -662,12 +662,13 @@ class Schema {
     return DebugRowColumns(row, num_key_columns());
   }
 
-  // Decode the specified encoded key into the given 'buffer', which
+  // Decode the specified encoded key into the given 'row', which
   // must be at least as large as this->key_byte_size().
   //
   // 'arena' is used for allocating indirect strings, but is unused
   // for other datatypes.
-  Status DecodeRowKey(Slice encoded_key, uint8_t* buffer,
+  template<class RowType>
+  Status DecodeRowKey(Slice encoded_key, RowType* row,
                       Arena* arena) const WARN_UNUSED_RESULT;
 
   // Decode and stringify the given contiguous encoded row key in