You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/04/17 21:46:39 UTC

[kudu] branch master updated: generic_iterators: MergeIterator whole block copy optimization

This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new ecc292e  generic_iterators: MergeIterator whole block copy optimization
ecc292e is described below

commit ecc292e1b0b506b089acea1a4f5e99e56c711915
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Fri Apr 12 10:06:39 2019 -0700

    generic_iterators: MergeIterator whole block copy optimization
    
    This patch adds the "whole block copy" optimization to the MergeIterator.
    The idea is simple: whenever there's only one sub-iterator in the merge
    window, we can copy the entire block out of that sub-iterator instead of
    copying row-by-row.
    
    The challenging aspect was properly handling the client's SelectionVector:
    - When copying row-by-row, the MergeIterator must skip deselected rows in
      order to always return the next smallest row. The MergeIterState
      bookkeeping helped enforce this invariant.
    - When copying block-by-block, skipping deselected rows is harder (and
      potentially less performant) than the simple bitmap copy we currently do.
      Plus it's not necessary for correctness; the scan endpoint will skip
      deselected rows when serializing the RowBlock.
    
    So I opted to retain deselected rows in the block-by-block case, and updated
    the MergeIterState bookkeeping to cope.
    
    I also changed the default RowBlock sizes in various merge-related paths to
    be a power of 2. This should increase the likelihood of hitting the
    BitmapCopy "fast path" during a RowBlock copy, though it doesn't guarantee
    it (e.g. consider a merge where two sub-iterators overlap for 63/128 rows,
    then one sub-iterator has an additional 65 rows: the BitmapCopy operations
    in the block copy could not be byte-aligned). It yielded a slight
    improvement in microbenchmarks, though not in the macrobenchmark.
    
    Finally, I tweaked the heuristic for whole-block-copying such that it only
    happens when there's more than one row to copy. I don't totally buy the
    rationale, but it did yield an improvement in both the micro and macro
    benchmarks, so I must be onto something.
    
    Below are the micro and macrobenchmark results. The microbenchmark was
    generic_iterators-test's overlapping and non-overlapping MergeIterator tests
    with 10 iterations, 1000 lists, and 10000 rows per list, collecting the
    wallclock time to scan and averaging it across the runs. The macrobenchmark
    was running 'kudu perf tablet_scan' on a representative 40GB tablet six
    times, dropping the first time (to reduce cache effects), and averaging the
    remaining wallclock times.
    
    no whole-block-copy:
    - non-overlapping: 0.9437
    - overlapping: 9.5113
    - representative tablet: 786.732
    
    whole-block-copy, no power-of-2 RowBlocks
    - non-overlapping: 0.6301
    - overlapping: 9.7518
    - representative tablet: 751.297
    
    whole-block-copy, power-of-2 RowBlocks
    - non-overlapping: 0.5316
    - overlapping: 9.5718
    - representative tablet: 754.961
    
    whole-block-copy, power-of-2 RowBlocks, only copy when >1 rows left:
    - non-overlapping: 0.5247
    - overlapping: 9.1287
    - representative tablet: 720.534
    
    Todd and I discussed another possible optimization which I want to note here
    for posterity: rather than copying blocks, we could "attach" the data to the
    client's RowbBlock and serialize it to the wire directly. The attachment
    could be RowBlock-based by allowing RowBlocks to work like a refcounted
    "iovec", or it could be done more deeply to enable direct serialization of
    cached decoder data. Either way, anything that helps us avoid copying data
    is likely to be a performance win.
    
    Change-Id: I3a4b56169644f35f1aa8aef27d4af0ce4ec0792d
    Reviewed-on: http://gerrit.cloudera.org:8080/13011
    Reviewed-by: Mike Percy <mp...@apache.org>
    Tested-by: Adar Dembo <ad...@cloudera.com>
---
 src/kudu/common/generic_iterators-test.cc |   9 +-
 src/kudu/common/generic_iterators.cc      | 171 ++++++++++++++++++++----------
 src/kudu/common/rowblock.h                |   7 +-
 src/kudu/tserver/tablet_service.cc        |   4 +-
 4 files changed, 129 insertions(+), 62 deletions(-)

diff --git a/src/kudu/common/generic_iterators-test.cc b/src/kudu/common/generic_iterators-test.cc
index 1a8ad83..d170ffa 100644
--- a/src/kudu/common/generic_iterators-test.cc
+++ b/src/kudu/common/generic_iterators-test.cc
@@ -405,7 +405,7 @@ void TestMerge(const Schema& schema,
     vector<IterWithBounds> to_merge;
     for (const auto& list : all_ints) {
       unique_ptr<VectorIterator> vec_it(new VectorIterator(list.ints, list.is_deleted, schema));
-      vec_it->set_block_size(10);
+      vec_it->set_block_size(16);
       vec_it->set_selection_vector(list.sv.get());
       unique_ptr<RowwiseIterator> mat_it(NewMaterializingIterator(std::move(vec_it)));
       IterWithBounds mat_iwb;
@@ -431,7 +431,9 @@ void TestMerge(const Schema& schema,
           MergeIteratorOptions(include_deleted_rows), std::move(to_merge)));
       ASSERT_OK(merger->Init(&spec));
 
-      RowBlock dst(&schema, 100, nullptr);
+      // The RowBlock is sized to a power of 2 to improve BitmapCopy performance
+      // when copying another RowBlock into it.
+      RowBlock dst(&schema, 128, nullptr);
       size_t total_idx = 0;
       auto expected_iter = expected.cbegin();
       while (merger->HasNext()) {
@@ -440,6 +442,9 @@ void TestMerge(const Schema& schema,
           "if HasNext() returns true, must return some rows";
 
         for (int i = 0; i < dst.nrows(); i++) {
+          if (!dst.selection_vector()->IsRowSelected(i)) {
+            continue;
+          }
           ASSERT_NE(expected.end(), expected_iter);
           int64_t expected_key = expected_iter->first;
           bool expected_is_deleted = expected_iter->second;
diff --git a/src/kudu/common/generic_iterators.cc b/src/kudu/common/generic_iterators.cc
index 85a463c..8752ab5 100644
--- a/src/kudu/common/generic_iterators.cc
+++ b/src/kudu/common/generic_iterators.cc
@@ -99,20 +99,19 @@ void AddIterStats(const RowwiseIterator& iter,
 // MergeIterator
 ////////////////////////////////////////////////////////////
 
+// This is sized to a power of 2 to improve BitmapCopy performance when copying
+// a RowBlock.
+//
 // TODO(todd): this should be sized by # bytes, not # rows.
-static const int kMergeRowBuffer = 1000;
+static const int kMergeRowBuffer = 1024;
 
 // MergeIterState wraps a RowwiseIterator for use by the MergeIterator.
-// Importantly, it also filters out unselected rows from the wrapped
-// RowwiseIterator such that all returned rows are valid.
 class MergeIterState : public boost::intrusive::list_base_hook<> {
  public:
   explicit MergeIterState(IterWithBounds iwb)
       : iwb_(std::move(iwb)),
         arena_(1024),
-        next_row_idx_(0),
-        rows_advanced_(0),
-        rows_valid_(0)
+        next_row_idx_(0)
   {}
 
   // Fetches the next row from the iterator's current block, or the iterator's
@@ -121,7 +120,7 @@ class MergeIterState : public boost::intrusive::list_base_hook<> {
   // Does not advance the iterator.
   const RowBlockRow& next_row() const {
     if (read_block_) {
-      DCHECK_LT(rows_advanced_, rows_valid_);
+      DCHECK_LT(next_row_idx_, read_block_->nrows());
       return next_row_;
     }
     DCHECK(decoded_bounds_);
@@ -146,7 +145,7 @@ class MergeIterState : public boost::intrusive::list_base_hook<> {
   //
   // Decoded bound allocations are done against 'decoded_bounds_arena'.
   Status Init(Arena* decoded_bounds_arena) {
-    CHECK_EQ(0, rows_valid_);
+    DCHECK(!read_block_);
 
     if (iwb_.encoded_bounds) {
       decoded_bounds_.emplace(&schema(), decoded_bounds_arena);
@@ -165,31 +164,43 @@ class MergeIterState : public boost::intrusive::list_base_hook<> {
 
   // Returns true if the underlying iterator is fully exhausted.
   bool IsFullyExhausted() const {
-    return rows_valid_ == 0 && !iwb_.iter->HasNext();
+    return (!read_block_ || read_block_->nrows() == 0) && !iwb_.iter->HasNext();
   }
 
-  // Advance to the next row in the underlying iterator.
+  // Advances to the next row in the underlying iterator.
   //
   // If successful, 'pulled_new_block' is true if this block was exhausted and a
   // new block was pulled from the underlying iterator.
-  Status Advance(bool* pulled_new_block);
+  Status Advance(size_t num_rows, bool* pulled_new_block);
 
-  // Add statistics about the underlying iterator to the given vector.
+  // Adds statistics about the underlying iterator to the given vector.
   void AddStats(vector<IteratorStats>* stats) const {
     AddIterStats(*iwb_.iter, stats);
   }
 
+  // Returns the number of rows remaining in the current block.
+  size_t remaining_in_block() const {
+    DCHECK(read_block_);
+    return read_block_->nrows() - next_row_idx_;
+  }
+
   // Returns the schema from the underlying iterator.
   const Schema& schema() const {
     return iwb_.iter->schema();
   }
 
-  // Pull the next block from the underlying iterator.
+  // Pulls the next block from the underlying iterator.
   Status PullNextBlock();
 
+  // Copies as many rows as possible from the current block of buffered rows to
+  // 'dst' (starting at 'dst_offset').
+  //
+  // If successful, 'num_rows_copied' will be set to the number of rows copied.
+  Status CopyBlock(RowBlock* dst, size_t dst_offset, size_t* num_rows_copied);
+
   // Returns true if the current block in the underlying iterator is exhausted.
   bool IsBlockExhausted() const {
-    return rows_advanced_ == rows_valid_;
+    return !read_block_ || read_block_->nrows() == next_row_idx_;
   }
 
   string ToString() const {
@@ -233,37 +244,34 @@ class MergeIterState : public boost::intrusive::list_base_hook<> {
   // The last row available in read_block_.
   RowBlockRow last_row_;
 
-  // Row index of next_row_ in read_block_.
+  // The index of the row currently pointed to by the iterator. Guaranteed to be
+  // a selected row.
   size_t next_row_idx_;
 
-  // Number of rows we've advanced past in read_block_.
-  size_t rows_advanced_;
-
-  // Number of valid (selected) rows in read_block_.
-  size_t rows_valid_;
-
   DISALLOW_COPY_AND_ASSIGN(MergeIterState);
 };
 
-Status MergeIterState::Advance(bool* pulled_new_block) {
-  rows_advanced_++;
-  if (IsBlockExhausted()) {
-    arena_.Reset();
-    RETURN_NOT_OK(PullNextBlock());
-    *pulled_new_block = true;
+Status MergeIterState::Advance(size_t num_rows, bool* pulled_new_block) {
+  DCHECK_GE(read_block_->nrows(), next_row_idx_ + num_rows);
+
+  next_row_idx_ += num_rows;
+
+  // If advancing didn't exhaust this block outright, find the next selected row.
+  size_t idx;
+  if (!IsBlockExhausted() &&
+      read_block_->selection_vector()->FindFirstRowSelected(next_row_idx_, &idx)) {
+    next_row_idx_ = idx;
+    next_row_.Reset(read_block_.get(), next_row_idx_);
+    *pulled_new_block = false;
     return Status::OK();
   }
 
-  // Seek to the next selected row.
-  SelectionVector *selection = read_block_->selection_vector();
-  for (++next_row_idx_; next_row_idx_ < read_block_->nrows(); next_row_idx_++) {
-    if (selection->IsRowSelected(next_row_idx_)) {
-      next_row_.Reset(read_block_.get(), next_row_idx_);
-      break;
-    }
-  }
-  DCHECK_NE(next_row_idx_, read_block_->nrows()) << "No selected rows found!";
-  *pulled_new_block = false;
+  // We either exhausted the block outright, or all subsequent rows were
+  // deselected. Either way, we need to pull the next block.
+  next_row_idx_ = read_block_->nrows();
+  arena_.Reset();
+  RETURN_NOT_OK(PullNextBlock());
+  *pulled_new_block = true;
   return Status::OK();
 }
 
@@ -276,26 +284,26 @@ Status MergeIterState::PullNextBlock() {
   }
   while (iwb_.iter->HasNext()) {
     RETURN_NOT_OK(iwb_.iter->NextBlock(read_block_.get()));
-    rows_advanced_ = 0;
-    // Honor the selection vector of the read_block_, since not all rows are necessarily selected.
+
     SelectionVector *selection = read_block_->selection_vector();
     DCHECK_EQ(selection->nrows(), read_block_->nrows());
     DCHECK_LE(selection->CountSelected(), read_block_->nrows());
-    rows_valid_ = selection->CountSelected();
-    VLOG(2) << Substitute("$0/$1 rows selected", rows_valid_, read_block_->nrows());
-    if (rows_valid_ == 0) {
+    size_t rows_valid = selection->CountSelected();
+    VLOG(2) << Substitute("$0/$1 rows selected", rows_valid, read_block_->nrows());
+    if (rows_valid == 0) {
       // Short-circuit: this block is entirely unselected and can be skipped.
       continue;
     }
 
     // Seek next_row_ and last_row_ to the first and last selected rows
     // respectively (which could be identical).
-    //
+
+    CHECK(selection->FindFirstRowSelected(0, &next_row_idx_));
+    next_row_.Reset(read_block_.get(), next_row_idx_);
+
     // We use a signed size_t type to avoid underflowing when finding last_row_.
     //
     // TODO(adar): this can be simplified if there was a BitmapFindLastSet().
-    CHECK(selection->FindFirstRowSelected(&next_row_idx_));
-    next_row_.Reset(read_block_.get(), next_row_idx_);
     for (ssize_t row_idx = read_block_->nrows() - 1; row_idx >= 0; row_idx--) {
       if (selection->IsRowSelected(row_idx)) {
         last_row_.Reset(read_block_.get(), row_idx);
@@ -308,8 +316,25 @@ Status MergeIterState::PullNextBlock() {
   }
 
   // The underlying iterator is fully exhausted.
-  rows_advanced_ = 0;
-  rows_valid_ = 0;
+  next_row_idx_ = 0;
+  read_block_.reset();
+  return Status::OK();
+}
+
+Status MergeIterState::CopyBlock(RowBlock* dst, size_t dst_offset,
+                                 size_t* num_rows_copied) {
+  DCHECK(read_block_);
+  DCHECK(!IsBlockExhausted());
+
+  size_t num_rows_to_copy = std::min(remaining_in_block(),
+                                     dst->nrows() - dst_offset);
+  VLOG(3) << Substitute(
+      "Copying $0 rows from RowBlock (s:$1,o:$2) to RowBlock (s:$3,o:$4): $5",
+      num_rows_to_copy, read_block_->nrows(), next_row_idx_, dst->nrows(),
+      dst_offset, ToString());
+  RETURN_NOT_OK(read_block_->CopyTo(dst, next_row_idx_,
+                                    dst_offset, num_rows_to_copy));
+  *num_rows_copied = num_rows_to_copy;
   return Status::OK();
 }
 
@@ -448,6 +473,13 @@ class MergeIterator : public RowwiseIterator {
   virtual Status NextBlock(RowBlock* dst) OVERRIDE;
 
  private:
+  // Materializes as much of the next block's worth of data into 'dst' at offset
+  // 'dst_row_idx' as possible. Only permitted when there's only one
+  // sub-iterator in the hot heap.
+  //
+  // On success, the selection vector in 'dst' and 'dst_row_idx' are both updated.
+  Status MaterializeBlock(RowBlock* dst, size_t* dst_row_idx);
+
   // Finds the next row and materializes it into 'dst' at offset 'dst_row_idx'.
   //
   // On success, the selection vector in 'dst' and 'dst_row_idx' are both updated.
@@ -457,9 +489,9 @@ class MergeIterator : public RowwiseIterator {
   // iterators if necessary and setting up additional per-iterator bookkeeping.
   Status InitSubIterators(ScanSpec *spec);
 
-  // Advances to the next row in 'state', destroying it and/or updating the
-  // three heaps if necessary.
-  Status AdvanceAndReheap(MergeIterState* state);
+  // Advances 'state' by 'num_rows_to_advance', destroying it and/or updating
+  // the three heaps if necessary.
+  Status AdvanceAndReheap(MergeIterState* state, size_t num_rows_to_advance);
 
   // Moves sub-iterators from cold_ to hot_ if they now overlap with the merge
   // window. Should be called whenever the merge window moves.
@@ -644,9 +676,10 @@ Status MergeIterator::InitSubIterators(ScanSpec *spec) {
   return Status::OK();
 }
 
-Status MergeIterator::AdvanceAndReheap(MergeIterState* state) {
+Status MergeIterator::AdvanceAndReheap(MergeIterState* state,
+                                       size_t num_rows_to_advance) {
   bool pulled_new_block;
-  RETURN_NOT_OK(state->Advance(&pulled_new_block));
+  RETURN_NOT_OK(state->Advance(num_rows_to_advance, &pulled_new_block));
   hot_.pop();
 
   // Note that hotmaxes_ is not yet popped as it's not necessary to do so if the
@@ -736,7 +769,7 @@ void MergeIterator::DestroySubIterator(MergeIterState* state) {
 }
 
 Status MergeIterator::NextBlock(RowBlock* dst) {
-  VLOG(3) << "Called NextBlock on " << ToString();
+  VLOG(3) << "Called NextBlock (" << dst->nrows() << " rows) on " << ToString();
   CHECK(initted_);
   DCHECK_SCHEMA_EQ(*dst->schema(), schema());
 
@@ -751,8 +784,21 @@ Status MergeIterator::NextBlock(RowBlock* dst) {
       DCHECK(states_.empty());
       break;
     }
-    // TODO(adar): When hot_.size() == 1, materialize an entire block.
-    RETURN_NOT_OK(MaterializeOneRow(dst, &dst_row_idx));
+
+    // If there's just one hot sub-iterator, we can copy its entire block
+    // instead of copying row-by-row.
+    //
+    // When N sub-iterators fully overlap, there'll only be one hot sub-iterator
+    // when consuming the very last row. A block copy for this case is more
+    // overhead than just copying out the last row.
+    //
+    // TODO(adar): this can be further optimized by "attaching" data to 'dst'
+    // rather than copying it.
+    if (hot_.size() == 1 && hot_.top()->remaining_in_block() > 1) {
+      RETURN_NOT_OK(MaterializeBlock(dst, &dst_row_idx));
+    } else {
+      RETURN_NOT_OK(MaterializeOneRow(dst, &dst_row_idx));
+    }
   }
 
   if (dst_row_idx < dst->nrows()) {
@@ -762,6 +808,19 @@ Status MergeIterator::NextBlock(RowBlock* dst) {
   return Status::OK();
 }
 
+Status MergeIterator::MaterializeBlock(RowBlock* dst, size_t* dst_row_idx) {
+  DCHECK_EQ(1, hot_.size());
+
+  MergeIterState* state = hot_.top();
+  size_t num_rows_copied;
+  RETURN_NOT_OK(state->CopyBlock(dst, *dst_row_idx, &num_rows_copied));
+  RETURN_NOT_OK(AdvanceAndReheap(state, num_rows_copied));
+
+  // CopyBlock() already updated dst's SelectionVector.
+  *dst_row_idx += num_rows_copied;
+  return Status::OK();
+}
+
 // TODO(todd): this is an obvious spot to add codegen - there's a ton of branching
 // and such around the comparisons. A simple experiment indicated there's some
 // 2x to be gained.
@@ -835,7 +894,7 @@ Status MergeIterator::MaterializeOneRow(RowBlock* dst, size_t* dst_row_idx) {
 
   // Advance all matching sub-iterators and remove any that are exhausted.
   for (auto& s : smallest) {
-    RETURN_NOT_OK(AdvanceAndReheap(s));
+    RETURN_NOT_OK(AdvanceAndReheap(s, /*num_rows_to_advance=*/1));
   }
 
   dst->selection_vector()->SetRowSelected(*dst_row_idx);
diff --git a/src/kudu/common/rowblock.h b/src/kudu/common/rowblock.h
index 660c272..8d3c4ec 100644
--- a/src/kudu/common/rowblock.h
+++ b/src/kudu/common/rowblock.h
@@ -92,13 +92,14 @@ class SelectionVector {
     BitmapClear(&bitmap_[0], row);
   }
 
-  // Finds the first selected row.
+  // Finds the first selected row beginning at 'row_offset'.
   //
   // Returns true if at least one row is selected and writes its index to 'row';
   // returns false otherwise.
-  bool FindFirstRowSelected(size_t* row) {
+  bool FindFirstRowSelected(size_t row_offset, size_t* row) {
+    DCHECK_LT(row_offset, n_rows_);
     DCHECK(row);
-    return BitmapFindFirstSet(&bitmap_[0], 0, n_rows_, row);
+    return BitmapFindFirstSet(&bitmap_[0], row_offset, n_rows_, row);
   }
 
   uint8_t *mutable_bitmap() {
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 1335b5d..f3f78be 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -118,7 +118,9 @@ DEFINE_int32(scanner_max_batch_size_bytes, 8 * 1024 * 1024,
 TAG_FLAG(scanner_max_batch_size_bytes, advanced);
 TAG_FLAG(scanner_max_batch_size_bytes, runtime);
 
-DEFINE_int32(scanner_batch_size_rows, 100,
+// The default value is sized to a power of 2 to improve BitmapCopy performance
+// when copying a RowBlock (in ORDERED scans).
+DEFINE_int32(scanner_batch_size_rows, 128,
              "The number of rows to batch for servicing scan requests.");
 TAG_FLAG(scanner_batch_size_rows, advanced);
 TAG_FLAG(scanner_batch_size_rows, runtime);