You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/04/10 03:46:54 UTC

[kudu] branch master updated: generic_iterators: switch MergeIterator to intrusive list of states

This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 28f7dac  generic_iterators: switch MergeIterator to intrusive list of states
28f7dac is described below

commit 28f7dacd9d07f68a7bd7dbe40dda365de8b26eb8
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Thu Mar 14 22:48:13 2019 -0700

    generic_iterators: switch MergeIterator to intrusive list of states
    
    I originally wanted this for iterator dominance because, in that design, a
    state is linked either to the main list or to another state's dominated
    list. We're going to go in a different direction, but an intrusive list
    still makes sense because:
    1. We don't need random access into states_, and
    2. An intrusive list will yield O(1) erase without increasing the total
       number of allocations.
    
    I also changed the MergeIterator tests to use int64_t instead of uint32_t
    for values, as uint32_t values reliably overflowed in the non-overlapping
    input test (TestMergeNonOverlapping) when num_lists >= 30.
    
    Change-Id: I1a165858e37c3e0a6ef85e46e20078c264fa8a65
    Reviewed-on: http://gerrit.cloudera.org:8080/12944
    Reviewed-by: Mike Percy <mp...@apache.org>
    Tested-by: Kudu Jenkins
---
 src/kudu/common/generic_iterators-test.cc | 76 +++++++++++++++++++----------
 src/kudu/common/generic_iterators.cc      | 80 +++++++++++++++----------------
 2 files changed, 89 insertions(+), 67 deletions(-)

diff --git a/src/kudu/common/generic_iterators-test.cc b/src/kudu/common/generic_iterators-test.cc
index 10592d2..3662299 100644
--- a/src/kudu/common/generic_iterators-test.cc
+++ b/src/kudu/common/generic_iterators-test.cc
@@ -72,10 +72,10 @@ namespace kudu {
 struct IteratorStats;
 
 static const int kValColIdx = 0; // Index of 'val' column in these test schemas.
-static const Schema kIntSchema({ ColumnSchema("val", UINT32) },
+static const Schema kIntSchema({ ColumnSchema("val", INT64) },
                                /*key_columns=*/1);
 static const bool kIsDeletedReadDefault = false;
-static const Schema kIntSchemaWithVCol({ ColumnSchema("val", UINT32),
+static const Schema kIntSchemaWithVCol({ ColumnSchema("val", INT64),
                                          ColumnSchema("is_deleted", IS_DELETED,
                                                       /*is_nullable=*/false,
                                                       /*read_default=*/&kIsDeletedReadDefault) },
@@ -85,7 +85,7 @@ static const Schema kIntSchemaWithVCol({ ColumnSchema("val", UINT32),
 // vector.
 class VectorIterator : public ColumnwiseIterator {
  public:
-  VectorIterator(vector<uint32_t> ints, vector<uint8_t> is_deleted, Schema schema)
+  VectorIterator(vector<int64_t> ints, vector<uint8_t> is_deleted, Schema schema)
       : ints_(std::move(ints)),
         is_deleted_(std::move(is_deleted)),
         schema_(std::move(schema)),
@@ -95,7 +95,7 @@ class VectorIterator : public ColumnwiseIterator {
     CHECK_EQ(ints_.size(), is_deleted_.size());
   }
 
-  explicit VectorIterator(const vector<uint32_t>& ints)
+  explicit VectorIterator(const vector<int64_t>& ints)
       : VectorIterator(ints, vector<uint8_t>(ints.size()), kIntSchema) {
   }
 
@@ -144,7 +144,7 @@ class VectorIterator : public ColumnwiseIterator {
     DCHECK_LE(prepared_, ctx->block()->nrows());
 
     switch (ctx->block()->type_info()->physical_type()) {
-      case UINT32:
+      case INT64:
         for (size_t i = 0; i < prepared_; i++) {
           ctx->block()->SetCellValue(i, &(ints_[cur_idx_ + i]));
         }
@@ -184,7 +184,7 @@ class VectorIterator : public ColumnwiseIterator {
   }
 
  private:
-  vector<uint32_t> ints_;
+  vector<int64_t> ints_;
   // We use vector<uint8_t> instead of vector<bool> to represent the IS_DELETED
   // column so we can call ColumnBlock::SetCellValue() in MaterializeColumn(),
   // whose API requires taking an address to a non-temporary for the value.
@@ -225,24 +225,50 @@ TEST(TestMergeIterator, TestMergeEmptyViaSelectionVector) {
   ASSERT_FALSE(merger->HasNext());
 }
 
+// Tests that if we stop using a MergeIterator with several elements remaining,
+// it is cleaned up properly.
+TEST(TestMergeIterator, TestNotConsumedCleanup) {
+  unique_ptr<VectorIterator> vec1(new VectorIterator({ 1 }));
+  unique_ptr<VectorIterator> vec2(new VectorIterator({ 2 }));
+  unique_ptr<VectorIterator> vec3(new VectorIterator({ 3 }));
+
+  vector<unique_ptr<RowwiseIterator>> input;
+  input.emplace_back(NewMaterializingIterator(std::move(vec1)));
+  input.emplace_back(NewMaterializingIterator(std::move(vec2)));
+  input.emplace_back(NewMaterializingIterator(std::move(vec3)));
+  unique_ptr<RowwiseIterator> merger(NewMergeIterator(
+      MergeIteratorOptions(/*include_deleted_rows=*/false), std::move(input)));
+  ASSERT_OK(merger->Init(nullptr));
+
+  ASSERT_TRUE(merger->HasNext());
+  RowBlock dst(&kIntSchema, 1, nullptr);
+  ASSERT_OK(merger->NextBlock(&dst));
+  ASSERT_EQ(1, dst.nrows());
+  ASSERT_TRUE(merger->HasNext());
+
+  // Let the MergeIterator go out of scope with some remaining elements.
+}
+
 class TestIntRangePredicate {
  public:
-  TestIntRangePredicate(uint32_t lower, uint32_t upper, const ColumnSchema& column)
+  TestIntRangePredicate(int64_t lower, int64_t upper, const ColumnSchema& column)
       : lower_(lower),
         upper_(upper),
         pred_(ColumnPredicate::Range(column, &lower_, &upper_)) {
   }
 
-  TestIntRangePredicate(uint32_t lower, uint32_t upper)
+  TestIntRangePredicate(int64_t lower, int64_t upper)
       : TestIntRangePredicate(lower, upper, kIntSchema.column(0)) {
   }
 
-  uint32_t lower_, upper_;
+  int64_t lower_, upper_;
   ColumnPredicate pred_;
 };
 
-void TestMerge(const Schema& schema, const TestIntRangePredicate &predicate,
-               bool overlapping_ranges = true, bool include_deleted_rows = false) {
+void TestMerge(const Schema& schema,
+               const TestIntRangePredicate &predicate,
+               bool overlapping_ranges = true,
+               bool include_deleted_rows = false) {
   struct List {
     explicit List(int num_rows)
         : sv(new SelectionVector(num_rows)) {
@@ -250,26 +276,26 @@ void TestMerge(const Schema& schema, const TestIntRangePredicate &predicate,
       is_deleted.reserve(num_rows);
     }
 
-    vector<uint32_t> ints;
+    vector<int64_t> ints;
     vector<uint8_t> is_deleted;
     unique_ptr<SelectionVector> sv;
   };
 
   vector<List> all_ints;
-  map<uint32_t, bool> expected;
-  unordered_set<uint32_t> seen_live;
+  map<int64_t, bool> expected;
+  unordered_set<int64_t> seen_live;
   Random prng(SeedRandom());
 
-  uint32_t entry = 0;
+  int64_t entry = 0;
   for (int i = 0; i < FLAGS_num_lists; i++) {
     List list(FLAGS_num_rows);
-    unordered_set<uint32_t> seen_this_list;
+    unordered_set<int64_t> seen_this_list;
 
     if (overlapping_ranges) {
       entry = 0;
     }
     for (int j = 0; j < FLAGS_num_rows; j++) {
-      uint32_t potential;
+      int64_t potential;
       bool is_deleted = false;
       // The merge iterator does not support duplicate non-deleted keys.
       while (true) {
@@ -374,9 +400,9 @@ void TestMerge(const Schema& schema, const TestIntRangePredicate &predicate,
 
         for (int i = 0; i < dst.nrows(); i++) {
           ASSERT_NE(expected.end(), expected_iter);
-          uint32_t expected_key = expected_iter->first;
+          int64_t expected_key = expected_iter->first;
           bool expected_is_deleted = expected_iter->second;
-          uint32_t row_key = *schema.ExtractColumnFromRow<UINT32>(dst.row(i), kValColIdx);
+          int64_t row_key = *schema.ExtractColumnFromRow<INT64>(dst.row(i), kValColIdx);
           ASSERT_GE(row_key, predicate.lower_) << "Yielded integer excluded by predicate";
           ASSERT_LT(row_key, predicate.upper_) << "Yielded integer excluded by predicate";
           EXPECT_EQ(expected_key, row_key) << "Yielded out of order at idx " << total_idx;
@@ -403,12 +429,12 @@ void TestMerge(const Schema& schema, const TestIntRangePredicate &predicate,
 }
 
 TEST(TestMergeIterator, TestMerge) {
-  TestIntRangePredicate predicate(0, MathLimits<uint32_t>::kMax);
+  TestIntRangePredicate predicate(0, MathLimits<int64_t>::kMax);
   NO_FATALS(TestMerge(kIntSchema, predicate));
 }
 
 TEST(TestMergeIterator, TestMergeNonOverlapping) {
-  TestIntRangePredicate predicate(0, MathLimits<uint32_t>::kMax);
+  TestIntRangePredicate predicate(0, MathLimits<int64_t>::kMax);
   NO_FATALS(TestMerge(kIntSchema, predicate, /*overlapping_ranges=*/false));
 }
 
@@ -422,12 +448,12 @@ TEST(TestMergeIterator, TestMergePredicate) {
 // This predicate excludes the first half of the rows but accepts the
 // second half.
 TEST(TestMergeIterator, TestMergePredicate2) {
-  TestIntRangePredicate predicate(FLAGS_num_rows / 2, MathLimits<uint32_t>::kMax);
+  TestIntRangePredicate predicate(FLAGS_num_rows / 2, MathLimits<int64_t>::kMax);
   NO_FATALS(TestMerge(kIntSchema, predicate));
 }
 
 TEST(TestMergeIterator, TestDeDupGhostRows) {
-  TestIntRangePredicate match_all_pred(0, MathLimits<uint32_t>::kMax);
+  TestIntRangePredicate match_all_pred(0, MathLimits<int64_t>::kMax);
   NO_FATALS(TestMerge(kIntSchemaWithVCol, match_all_pred,
                       /*overlapping_ranges=*/true,
                       /*include_deleted_rows=*/true));
@@ -441,7 +467,7 @@ TEST(TestMaterializingIterator, TestMaterializingPredicatePushdown) {
   spec.AddPredicate(pred1.pred_);
   LOG(INFO) << "Predicate: " << pred1.pred_.ToString();
 
-  vector<uint32_t> ints(100);
+  vector<int64_t> ints(100);
   for (int i = 0; i < 100; i++) {
     ints[i] = i;
   }
@@ -472,7 +498,7 @@ TEST(TestPredicateEvaluatingIterator, TestPredicateEvaluation) {
   spec.AddPredicate(pred1.pred_);
   LOG(INFO) << "Predicate: " << pred1.pred_.ToString();
 
-  vector<uint32_t> ints(100);
+  vector<int64_t> ints(100);
   for (int i = 0; i < 100; i++) {
     ints[i] = i;
   }
diff --git a/src/kudu/common/generic_iterators.cc b/src/kudu/common/generic_iterators.cc
index 0230fc4..8659bc2 100644
--- a/src/kudu/common/generic_iterators.cc
+++ b/src/kudu/common/generic_iterators.cc
@@ -30,6 +30,9 @@
 #include <unordered_set>
 #include <utility>
 
+#include <boost/intrusive/list.hpp>
+#include <boost/intrusive/list_hook.hpp>
+
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 
@@ -96,9 +99,9 @@ void AddIterStats(const RowwiseIterator& iter,
 static const int kMergeRowBuffer = 1000;
 
 // MergeIterState wraps a RowwiseIterator for use by the MergeIterator.
-// Importantly, it also filters out unselected rows from the wrapped RowwiseIterator,
-// such that all returned rows are valid.
-class MergeIterState {
+// Importantly, it also filters out unselected rows from the wrapped
+// RowwiseIterator such that all returned rows are valid.
+class MergeIterState : public boost::intrusive::list_base_hook<> {
  public:
   explicit MergeIterState(unique_ptr<RowwiseIterator> iter) :
       iter_(std::move(iter)),
@@ -155,8 +158,15 @@ class MergeIterState {
     return rows_advanced_ == rows_valid_;
   }
 
+  // Underlying iterator.
   unique_ptr<RowwiseIterator> iter_;
+
+  // Allocates memory for read_block_.
   Arena arena_;
+
+  // Current block of buffered rows from the iterator.
+  //
+  // The memory backing the rows was allocated out of the arena.
   RowBlock read_block_;
 
   // The row currently pointed to by the iterator.
@@ -275,10 +285,10 @@ class MergeIterator : public RowwiseIterator {
   // This is required because we can't create a MergeIterState of an uninitialized iterator.
   vector<unique_ptr<RowwiseIterator>> orig_iters_;
 
-  // See UnionIterator::states_lock_ for details on locking. This follows the same
+  // See UnionIterator::iters_lock_ for details on locking. This follows the same
   // pattern.
   mutable rw_spinlock states_lock_;
-  vector<unique_ptr<MergeIterState>> states_;
+  boost::intrusive::list<MergeIterState> states_; // NOLINT(*)
 
   // Statistics (keyed by projection column index) accumulated so far by any
   // fully-consumed sub-iterators.
@@ -307,7 +317,9 @@ MergeIterator::MergeIterator(MergeIteratorOptions opts,
   CHECK_GT(orig_iters_.size(), 0);
 }
 
-MergeIterator::~MergeIterator() {}
+MergeIterator::~MergeIterator() {
+  states_.clear_and_dispose([](MergeIterState* s) { delete s; });
+}
 
 Status MergeIterator::Init(ScanSpec *spec) {
   CHECK(!initted_);
@@ -323,7 +335,7 @@ Status MergeIterator::Init(ScanSpec *spec) {
   // TODO(adar): establish dominance between iterators and only initialize
   // non-dominated iterators.
   for (auto& s : states_) {
-    RETURN_NOT_OK(s->Init());
+    RETURN_NOT_OK(s.Init());
   }
 
   // Verify that the schemas match in debug builds.
@@ -331,15 +343,15 @@ Status MergeIterator::Init(ScanSpec *spec) {
   // It's important to do the verification after initializing the iterators, as
   // they may not know their own schemas until they've been initialized (in the
   // case of a union of unions).
-  schema_.reset(new Schema(states_.front()->schema()));
+  schema_.reset(new Schema(states_.front().schema()));
   CHECK_GT(schema_->num_key_columns(), 0);
   finished_iter_stats_by_col_.resize(schema_->num_columns());
 #ifndef NDEBUG
   for (const auto& s : states_) {
-    if (!s->schema().Equals(*schema_)) {
+    if (!s.schema().Equals(*schema_)) {
       return Status::InvalidArgument(
           Substitute("Schemas do not match: $0 vs. $1",
-                     schema_->ToString(), s->schema().ToString()));
+                     schema_->ToString(), s.schema().ToString()));
     }
   }
 #endif
@@ -353,11 +365,9 @@ Status MergeIterator::Init(ScanSpec *spec) {
   // Before we copy any rows, clean up any iterators which were empty
   // to start with. Otherwise, HasNext() won't properly return false
   // if we were passed only empty iterators.
-  states_.erase(
-      remove_if(states_.begin(), states_.end(), [] (const unique_ptr<MergeIterState>& iter) {
-        return PREDICT_FALSE(iter->IsFullyExhausted());
-      }),
-      states_.end());
+  states_.remove_and_dispose_if(
+      [](const MergeIterState& s) { return PREDICT_FALSE(s.IsFullyExhausted()); },
+      [](MergeIterState* s) { delete s; });
 
   initted_ = true;
   return Status::OK();
@@ -373,7 +383,8 @@ Status MergeIterator::InitSubIterators(ScanSpec *spec) {
   for (auto& i : orig_iters_) {
     ScanSpec *spec_copy = spec != nullptr ? scan_spec_copies_.Construct(*spec) : nullptr;
     RETURN_NOT_OK(InitAndMaybeWrap(&i, spec_copy));
-    states_.emplace_back(unique_ptr<MergeIterState>(new MergeIterState(std::move(i))));
+    unique_ptr<MergeIterState> state(new MergeIterState(std::move(i)));
+    states_.push_back(*state.release());
   }
   orig_iters_.clear();
 
@@ -404,7 +415,8 @@ void MergeIterator::PrepareBatch(RowBlock* dst) {
   // in the currently queued up blocks.
   size_t available = 0;
   for (const auto& s : states_) {
-    available += s->remaining_in_block();
+    if (available >= dst->row_capacity()) break;
+    available += s.remaining_in_block();
   }
 
   dst->Resize(std::min(dst->row_capacity(), available));
@@ -431,7 +443,7 @@ Status MergeIterator::MaterializeBlock(RowBlock *dst) {
 
     // Typically the number of states_ is not that large, so using a priority
     // queue is not worth it.
-    for (const auto& iter : states_) {
+    for (auto& iter : states_) {
       // To merge in row key order, we need to consume the smallest row at any
       // given time. We locate that row by peeking at the next row in each of
       // the states_ iterators, which includes all possible candidates for the
@@ -440,7 +452,7 @@ Status MergeIterator::MaterializeBlock(RowBlock *dst) {
       if (!smallest.empty()) {
         // If we have a candidate for smallest row, compare it against the
         // smallest row in each iterator.
-        cmp = schema_->Compare(iter->next_row(), smallest[0]->next_row());
+        cmp = schema_->Compare(iter.next_row(), smallest[0]->next_row());
         num_comparisons_++;
       }
       if (smallest.empty() || cmp < 0) {
@@ -448,11 +460,11 @@ Status MergeIterator::MaterializeBlock(RowBlock *dst) {
         // smaller than the previously-smallest, replace the smallest with the
         // new row found.
         smallest.clear();
-        smallest.emplace_back(iter.get());
+        smallest.emplace_back(&iter);
       } else if (!smallest.empty() && cmp == 0) {
         // If we have found a duplicate of the smallest row, at least one must
         // be a ghost row. Collect all duplicates in order to merge them later.
-        smallest.emplace_back(iter.get());
+        smallest.emplace_back(&iter);
       }
     }
 
@@ -499,31 +511,15 @@ Status MergeIterator::MaterializeBlock(RowBlock *dst) {
     RowBlockRow dst_row = dst->row(dst_row_idx++);
     RETURN_NOT_OK(CopyRow(row_to_return_iter->next_row(), &dst_row, dst->arena()));
 
-    // Advance all matching sub-iterators and mark exhausted sub-iterators for
-    // removal.
-    unordered_set<MergeIterState*> exhausted;
+    // Advance all matching sub-iterators and remove any that are exhausted.
     for (MergeIterState* s : smallest) {
       RETURN_NOT_OK(s->Advance());
       if (s->IsFullyExhausted()) {
-        InsertOrDie(&exhausted, s);
-      }
-    }
-
-    // Remove the exhausted sub-iterators.
-    if (!exhausted.empty()) {
-      std::lock_guard<rw_spinlock> l(states_lock_);
-      for (MergeIterState* s : exhausted) {
+        std::lock_guard<rw_spinlock> l(states_lock_);
         s->AddStats(&finished_iter_stats_by_col_);
+        states_.erase_and_dispose(states_.iterator_to(*s),
+                                  [](MergeIterState* s) { delete s; });
       }
-      // TODO(mpercy): Consider making removal O(1) per element to remove by
-      // using a different data structure for 'states_'. The below
-      // erase-remove idiom gives us O(n) removal on a vector for an
-      // arbitrary number of elements to remove at once.
-      states_.erase(std::remove_if(states_.begin(), states_.end(),
-                                   [&exhausted](const unique_ptr<MergeIterState>& state) {
-                                     return ContainsKey(exhausted, state.get());
-                                   }),
-                    states_.end());
     }
   }
 
@@ -552,7 +548,7 @@ void MergeIterator::GetIteratorStats(vector<IteratorStats>* stats) const {
   *stats = finished_iter_stats_by_col_;
 
   for (const auto& s : states_) {
-    s->AddStats(stats);
+    s.AddStats(stats);
   }
 }