You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/04/17 21:10:18 UTC

[kudu] 05/05: generic_iterators: refactor MergeIterator for whole-block-copy optimization

This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit a4f37461dbb9655f13bbc8447d6c46b2d27e0d03
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Thu Apr 11 20:51:15 2019 -0700

    generic_iterators: refactor MergeIterator for whole-block-copy optimization
    
    This patch refactors MergeIterator::MaterializeBlock in preparation for the
    whole-block-copy optimization. Specifically, the "copy next row" and
    "advance iter and rebalance heaps" operations have been decomposed into
    standalone functions so that it'll be easier to add "copy whole block" as
    an equivalent function in the future.
    
    Also of note: we no longer set the entire client selection vector up front,
    as this won't necessarily be the case when we copy a block.
    
    Change-Id: I050116edc51bb3e91852e6286c221175770e6382
    Reviewed-on: http://gerrit.cloudera.org:8080/13010
    Reviewed-by: Mike Percy <mp...@apache.org>
    Tested-by: Adar Dembo <ad...@cloudera.com>
---
 src/kudu/common/generic_iterators.cc | 251 ++++++++++++++++++-----------------
 1 file changed, 128 insertions(+), 123 deletions(-)

diff --git a/src/kudu/common/generic_iterators.cc b/src/kudu/common/generic_iterators.cc
index ed501a2..85a463c 100644
--- a/src/kudu/common/generic_iterators.cc
+++ b/src/kudu/common/generic_iterators.cc
@@ -268,7 +268,7 @@ Status MergeIterState::Advance(bool* pulled_new_block) {
 }
 
 Status MergeIterState::PullNextBlock() {
-  CHECK_EQ(rows_advanced_, rows_valid_)
+  CHECK(IsBlockExhausted())
       << "should not pull next block until current block is exhausted";
 
   if (!read_block_) {
@@ -448,10 +448,19 @@ class MergeIterator : public RowwiseIterator {
   virtual Status NextBlock(RowBlock* dst) OVERRIDE;
 
  private:
-  void PrepareBatch(RowBlock* dst);
-  Status MaterializeBlock(RowBlock* dst);
+  // Finds the next row and materializes it into 'dst' at offset 'dst_row_idx'.
+  //
+  // On success, the selection vector in 'dst' and 'dst_row_idx' are both updated.
+  Status MaterializeOneRow(RowBlock* dst, size_t* dst_row_idx);
+
+  // Calls Init() on all of sub-iterators, wrapping them in predicate evaluating
+  // iterators if necessary and setting up additional per-iterator bookkeeping.
   Status InitSubIterators(ScanSpec *spec);
 
+  // Advances to the next row in 'state', destroying it and/or updating the
+  // three heaps if necessary.
+  Status AdvanceAndReheap(MergeIterState* state);
+
   // Moves sub-iterators from cold_ to hot_ if they now overlap with the merge
   // window. Should be called whenever the merge window moves.
   Status RefillHotHeap();
@@ -635,6 +644,46 @@ Status MergeIterator::InitSubIterators(ScanSpec *spec) {
   return Status::OK();
 }
 
+Status MergeIterator::AdvanceAndReheap(MergeIterState* state) {
+  bool pulled_new_block;
+  RETURN_NOT_OK(state->Advance(&pulled_new_block));
+  hot_.pop();
+
+  // Note that hotmaxes_ is not yet popped as it's not necessary to do so if the
+  // merge window hasn't changed. Thus, we can avoid some work by deferring it
+  // into the cases below.
+
+  if (state->IsFullyExhausted()) {
+    hotmaxes_.pop();
+    DestroySubIterator(state);
+
+    // This sub-iterator's removal means the end of the merge window may have shifted.
+    RETURN_NOT_OK(RefillHotHeap());
+  } else if (pulled_new_block) {
+    hotmaxes_.pop();
+
+    // This sub-iterator has a new block, which means the end of the merge window
+    //may have shifted.
+    if (!hotmaxes_.empty() &&
+        schema_->Compare(hotmaxes_.top(), state->next_row()) < 0) {
+      // The new block lies beyond the new end of the merge window.
+      VLOG(2) << "Block finished, became cold: " << state->ToString();
+      cold_.push(state);
+    } else {
+      // The new block is still within the merge window.
+      VLOG(2) << "Block finished, still hot: " << state->ToString();
+      hot_.push(state);
+      hotmaxes_.push(state->last_row());
+    }
+    RETURN_NOT_OK(RefillHotHeap());
+  } else {
+    // The sub-iterator's block's upper bound remains the same; the merge window
+    // has not changed.
+    hot_.push(state);
+  }
+  return Status::OK();
+}
+
 Status MergeIterator::RefillHotHeap() {
   VLOG(2) << "Refilling hot heap";
   while (!cold_.empty() &&
@@ -691,31 +740,10 @@ Status MergeIterator::NextBlock(RowBlock* dst) {
   CHECK(initted_);
   DCHECK_SCHEMA_EQ(*dst->schema(), schema());
 
-  PrepareBatch(dst);
-  RETURN_NOT_OK(MaterializeBlock(dst));
-
-  return Status::OK();
-}
-
-void MergeIterator::PrepareBatch(RowBlock* dst) {
   if (dst->arena()) {
     dst->arena()->Reset();
   }
-}
 
-// TODO(todd): this is an obvious spot to add codegen - there's a ton of branching
-// and such around the comparisons. A simple experiment indicated there's some
-// 2x to be gained.
-Status MergeIterator::MaterializeBlock(RowBlock *dst) {
-  // We need a vector to track the iterators whose next_row() contains the
-  // smallest row key at a given moment during the merge because there may be
-  // multiple deleted rows with the same row key across multiple rowsets, and
-  // up to one live instance, that we have to deduplicate.
-  vector<MergeIterState*> smallest(hot_.size());
-
-  // Initialize the selection vector.
-  // MergeIterState only returns selected rows.
-  dst->selection_vector()->SetAllTrue();
   size_t dst_row_idx = 0;
   while (dst_row_idx < dst->nrows()) {
     // If the hot heap is empty, we must be out of sub-iterators.
@@ -723,118 +751,95 @@ Status MergeIterator::MaterializeBlock(RowBlock *dst) {
       DCHECK(states_.empty());
       break;
     }
+    // TODO(adar): When hot_.size() == 1, materialize an entire block.
+    RETURN_NOT_OK(MaterializeOneRow(dst, &dst_row_idx));
+  }
+
+  if (dst_row_idx < dst->nrows()) {
+    dst->Resize(dst_row_idx);
+  }
 
-    // TODO(adar): optimize the case where hot_.size == 1.
+  return Status::OK();
+}
 
-    // Find the set of sub-iterators whose matching next row keys are the
-    // smallest across all sub-iterators.
-    //
-    // Note: heap ordered iteration isn't the same as a total ordering. For
-    // example, the two absolute smallest keys might be in the same sub-iterator
-    // rather than in the first two sub-iterators yielded via ordered iteration.
-    // However, the goal here is to identify a group of matching keys for the
-    // purpose of deduplication, and we're guaranteed that such matching keys
-    // cannot exist in the same sub-iterator (i.e. the same rowset).
-    smallest.clear();
-    for (auto iter = hot_.ordered_begin(); iter != hot_.ordered_end(); ++iter) {
-      MergeIterState* state = *iter;
-      if (!smallest.empty() &&
-          schema_->Compare(state->next_row(), smallest[0]->next_row()) != 0) {
-        break;
-      }
-      smallest.emplace_back(state);
+// TODO(todd): this is an obvious spot to add codegen - there's a ton of branching
+// and such around the comparisons. A simple experiment indicated there's some
+// 2x to be gained.
+Status MergeIterator::MaterializeOneRow(RowBlock* dst, size_t* dst_row_idx) {
+  // We need a vector to track the iterators whose next_row() contains the
+  // smallest row key at a given moment during the merge because there may be
+  // multiple deleted rows with the same row key across multiple rowsets, and up
+  // to one live instance, that we have to deduplicate.
+  vector<MergeIterState*> smallest;
+  smallest.reserve(hot_.size());
+
+  // Find the set of sub-iterators whose matching next row keys are the smallest
+  // across all sub-iterators.
+  //
+  // Note: heap ordered iteration isn't the same as a total ordering. For
+  // example, the two absolute smallest keys might be in the same sub-iterator
+  // rather than in the first two sub-iterators yielded via ordered iteration.
+  // However, the goal here is to identify a group of matching keys for the
+  // purpose of deduplication, and we're guaranteed that such matching keys
+  // cannot exist in the same sub-iterator (i.e. the same rowset).
+  for (auto iter = hot_.ordered_begin(); iter != hot_.ordered_end(); ++iter) {
+    MergeIterState* state = *iter;
+    if (!smallest.empty() &&
+        schema_->Compare(state->next_row(), smallest[0]->next_row()) != 0) {
+      break;
     }
+    smallest.emplace_back(state);
+  }
 
-    MergeIterState* row_to_return_iter = nullptr;
-    if (!opts_.include_deleted_rows) {
-      // Since deleted rows are not included here, there can only be a single
-      // instance of any given row key in 'smallest'.
-      CHECK_EQ(1, smallest.size()) << "expected only a single smallest row";
-      row_to_return_iter = smallest[0];
-    } else {
-      // Deduplicate any deleted rows. Row instance de-duplication criteria:
-      // 1. If there is a non-deleted instance, return that instance.
-      // 2. If all rows are deleted, any instance will suffice because we
-      //    don't guarantee that we will return valid field values for deleted
-      //    rows.
-      int live_rows_found = 0;
-      for (const auto& s : smallest) {
-        bool is_deleted =
-            *schema_->ExtractColumnFromRow<IS_DELETED>(s->next_row(), is_deleted_col_index_);
-        if (!is_deleted) {
-          // We found the single live instance of the row.
-          row_to_return_iter = s;
+  MergeIterState* row_to_return_iter = nullptr;
+  if (!opts_.include_deleted_rows) {
+    // Since deleted rows are not included here, there can only be a single
+    // instance of any given row key in 'smallest'.
+    CHECK_EQ(1, smallest.size()) << "expected only a single smallest row";
+    row_to_return_iter = smallest[0];
+  } else {
+    // Deduplicate any deleted rows. Row instance de-duplication criteria:
+    // 1. If there is a non-deleted instance, return that instance.
+    // 2. If all rows are deleted, any instance will suffice because we
+    //    don't guarantee that we will return valid field values for deleted
+    //    rows.
+    int live_rows_found = 0;
+    for (const auto& s : smallest) {
+      bool is_deleted =
+          *schema_->ExtractColumnFromRow<IS_DELETED>(s->next_row(), is_deleted_col_index_);
+      if (!is_deleted) {
+        // We found the single live instance of the row.
+        row_to_return_iter = s;
 #ifndef NDEBUG
-          live_rows_found++; // In DEBUG mode, do a sanity-check count of the live rows.
+        live_rows_found++; // In DEBUG mode, do a sanity-check count of the live rows.
 #else
-          break; // In RELEASE mode, short-circuit the loop.
+        break; // In RELEASE mode, short-circuit the loop.
 #endif
-        }
-      }
-      DCHECK_LE(live_rows_found, 1) << "expected at most one live row";
-
-      // If all instances of a given row are deleted then return an arbitrary
-      // deleted instance.
-      if (row_to_return_iter == nullptr) {
-        row_to_return_iter = smallest[0];
-        DCHECK(*schema_->ExtractColumnFromRow<IS_DELETED>(row_to_return_iter->next_row(),
-                                                          is_deleted_col_index_))
-            << "expected deleted row";
       }
     }
-    VLOG(3) << Substitute("Copying row $0 from $1",
-                          dst_row_idx, row_to_return_iter->ToString());
-    RowBlockRow dst_row = dst->row(dst_row_idx++);
-    RETURN_NOT_OK(CopyRow(row_to_return_iter->next_row(), &dst_row, dst->arena()));
-
-    // Advance all matching sub-iterators and remove any that are exhausted.
-    for (auto& s : smallest) {
-      bool pulled_new_block;
-      RETURN_NOT_OK(s->Advance(&pulled_new_block));
-      hot_.pop();
-
-      // Note that hotmaxes_ is not yet popped as it's not necessary to do so if
-      // the merge window hasn't changed. Thus, we can avoid some work by
-      // deferring it into the cases below.
-
-      if (s->IsFullyExhausted()) {
-        hotmaxes_.pop();
-        DestroySubIterator(s);
-
-        // This sub-iterator's removal means the end of the merge window may
-        // have shifted.
-        RETURN_NOT_OK(RefillHotHeap());
-      } else if (pulled_new_block) {
-        hotmaxes_.pop();
-
-        // This sub-iterator has a new block, which means the end of the merge
-        // window may have shifted.
-        if (!hotmaxes_.empty() && schema_->Compare(hotmaxes_.top(), s->next_row()) < 0) {
-          // The new block lies beyond the new end of the merge window.
-          VLOG(2) << "Block finished, became cold: " << s->ToString();
-          cold_.push(s);
-        } else {
-          // The new block is still within the merge window.
-          VLOG(2) << "Block finished, still hot: " << s->ToString();
-          hot_.push(s);
-          hotmaxes_.push(s->last_row());
-        }
-        RETURN_NOT_OK(RefillHotHeap());
-      } else {
-        // The sub-iterator's block's upper bound remains the same; the merge
-        // window has not changed.
-        hot_.push(s);
-      }
+    DCHECK_LE(live_rows_found, 1) << "expected at most one live row";
+
+    // If all instances of a given row are deleted then return an arbitrary
+    // deleted instance.
+    if (row_to_return_iter == nullptr) {
+      row_to_return_iter = smallest[0];
+      DCHECK(*schema_->ExtractColumnFromRow<IS_DELETED>(row_to_return_iter->next_row(),
+                                                        is_deleted_col_index_))
+          << "expected deleted row";
     }
   }
-
-  // The number of rows actually copied to the destination RowBlock may be less
-  // than its original capacity due to deduplication of ghost rows.
-  DCHECK_LE(dst_row_idx, dst->nrows());
-  if (dst_row_idx < dst->nrows()) {
-    dst->Resize(dst_row_idx);
+  VLOG(3) << Substitute("Copying row $0 from $1",
+                        *dst_row_idx, row_to_return_iter->ToString());
+  RowBlockRow dst_row = dst->row(*dst_row_idx);
+  RETURN_NOT_OK(CopyRow(row_to_return_iter->next_row(), &dst_row, dst->arena()));
+
+  // Advance all matching sub-iterators and remove any that are exhausted.
+  for (auto& s : smallest) {
+    RETURN_NOT_OK(AdvanceAndReheap(s));
   }
 
+  dst->selection_vector()->SetRowSelected(*dst_row_idx);
+  (*dst_row_idx)++;
   return Status::OK();
 }