You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/04/10 03:46:54 UTC
[kudu] branch master updated: generic_iterators: switch
MergeIterator to intrusive list of states
This is an automated email from the ASF dual-hosted git repository.
adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 28f7dac generic_iterators: switch MergeIterator to intrusive list of states
28f7dac is described below
commit 28f7dacd9d07f68a7bd7dbe40dda365de8b26eb8
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Thu Mar 14 22:48:13 2019 -0700
generic_iterators: switch MergeIterator to intrusive list of states
I originally wanted this for iterator dominance because, in that world, a
state is either linked to the main list or to another state's dominated list.
We're going to take a different approach, but an intrusive list still makes
sense because:
1. We don't need random access into states_, and
2. An intrusive list will yield O(1) erase without increasing the total
number of allocations.
I also changed the MergeIterator tests to use int64_t instead of uint32_t
for values, as the latter reliably overflowed in the non-overlapping input
test (TestMergeNonOverlapping) when num_lists >= 30.
Change-Id: I1a165858e37c3e0a6ef85e46e20078c264fa8a65
Reviewed-on: http://gerrit.cloudera.org:8080/12944
Reviewed-by: Mike Percy <mp...@apache.org>
Tested-by: Kudu Jenkins
---
src/kudu/common/generic_iterators-test.cc | 76 +++++++++++++++++++----------
src/kudu/common/generic_iterators.cc | 80 +++++++++++++++----------------
2 files changed, 89 insertions(+), 67 deletions(-)
diff --git a/src/kudu/common/generic_iterators-test.cc b/src/kudu/common/generic_iterators-test.cc
index 10592d2..3662299 100644
--- a/src/kudu/common/generic_iterators-test.cc
+++ b/src/kudu/common/generic_iterators-test.cc
@@ -72,10 +72,10 @@ namespace kudu {
struct IteratorStats;
static const int kValColIdx = 0; // Index of 'val' column in these test schemas.
-static const Schema kIntSchema({ ColumnSchema("val", UINT32) },
+static const Schema kIntSchema({ ColumnSchema("val", INT64) },
/*key_columns=*/1);
static const bool kIsDeletedReadDefault = false;
-static const Schema kIntSchemaWithVCol({ ColumnSchema("val", UINT32),
+static const Schema kIntSchemaWithVCol({ ColumnSchema("val", INT64),
ColumnSchema("is_deleted", IS_DELETED,
/*is_nullable=*/false,
/*read_default=*/&kIsDeletedReadDefault) },
@@ -85,7 +85,7 @@ static const Schema kIntSchemaWithVCol({ ColumnSchema("val", UINT32),
// vector.
class VectorIterator : public ColumnwiseIterator {
public:
- VectorIterator(vector<uint32_t> ints, vector<uint8_t> is_deleted, Schema schema)
+ VectorIterator(vector<int64_t> ints, vector<uint8_t> is_deleted, Schema schema)
: ints_(std::move(ints)),
is_deleted_(std::move(is_deleted)),
schema_(std::move(schema)),
@@ -95,7 +95,7 @@ class VectorIterator : public ColumnwiseIterator {
CHECK_EQ(ints_.size(), is_deleted_.size());
}
- explicit VectorIterator(const vector<uint32_t>& ints)
+ explicit VectorIterator(const vector<int64_t>& ints)
: VectorIterator(ints, vector<uint8_t>(ints.size()), kIntSchema) {
}
@@ -144,7 +144,7 @@ class VectorIterator : public ColumnwiseIterator {
DCHECK_LE(prepared_, ctx->block()->nrows());
switch (ctx->block()->type_info()->physical_type()) {
- case UINT32:
+ case INT64:
for (size_t i = 0; i < prepared_; i++) {
ctx->block()->SetCellValue(i, &(ints_[cur_idx_ + i]));
}
@@ -184,7 +184,7 @@ class VectorIterator : public ColumnwiseIterator {
}
private:
- vector<uint32_t> ints_;
+ vector<int64_t> ints_;
// We use vector<uint8_t> instead of vector<bool> to represent the IS_DELETED
// column so we can call ColumnBlock::SetCellValue() in MaterializeColumn(),
// whose API requires taking an address to a non-temporary for the value.
@@ -225,24 +225,50 @@ TEST(TestMergeIterator, TestMergeEmptyViaSelectionVector) {
ASSERT_FALSE(merger->HasNext());
}
+// Tests that if we stop using a MergeIterator with several elements remaining,
+// it is cleaned up properly.
+TEST(TestMergeIterator, TestNotConsumedCleanup) {
+ unique_ptr<VectorIterator> vec1(new VectorIterator({ 1 }));
+ unique_ptr<VectorIterator> vec2(new VectorIterator({ 2 }));
+ unique_ptr<VectorIterator> vec3(new VectorIterator({ 3 }));
+
+ vector<unique_ptr<RowwiseIterator>> input;
+ input.emplace_back(NewMaterializingIterator(std::move(vec1)));
+ input.emplace_back(NewMaterializingIterator(std::move(vec2)));
+ input.emplace_back(NewMaterializingIterator(std::move(vec3)));
+ unique_ptr<RowwiseIterator> merger(NewMergeIterator(
+ MergeIteratorOptions(/*include_deleted_rows=*/false), std::move(input)));
+ ASSERT_OK(merger->Init(nullptr));
+
+ ASSERT_TRUE(merger->HasNext());
+ RowBlock dst(&kIntSchema, 1, nullptr);
+ ASSERT_OK(merger->NextBlock(&dst));
+ ASSERT_EQ(1, dst.nrows());
+ ASSERT_TRUE(merger->HasNext());
+
+ // Let the MergeIterator go out of scope with some remaining elements.
+}
+
class TestIntRangePredicate {
public:
- TestIntRangePredicate(uint32_t lower, uint32_t upper, const ColumnSchema& column)
+ TestIntRangePredicate(int64_t lower, int64_t upper, const ColumnSchema& column)
: lower_(lower),
upper_(upper),
pred_(ColumnPredicate::Range(column, &lower_, &upper_)) {
}
- TestIntRangePredicate(uint32_t lower, uint32_t upper)
+ TestIntRangePredicate(int64_t lower, int64_t upper)
: TestIntRangePredicate(lower, upper, kIntSchema.column(0)) {
}
- uint32_t lower_, upper_;
+ int64_t lower_, upper_;
ColumnPredicate pred_;
};
-void TestMerge(const Schema& schema, const TestIntRangePredicate &predicate,
- bool overlapping_ranges = true, bool include_deleted_rows = false) {
+void TestMerge(const Schema& schema,
+ const TestIntRangePredicate &predicate,
+ bool overlapping_ranges = true,
+ bool include_deleted_rows = false) {
struct List {
explicit List(int num_rows)
: sv(new SelectionVector(num_rows)) {
@@ -250,26 +276,26 @@ void TestMerge(const Schema& schema, const TestIntRangePredicate &predicate,
is_deleted.reserve(num_rows);
}
- vector<uint32_t> ints;
+ vector<int64_t> ints;
vector<uint8_t> is_deleted;
unique_ptr<SelectionVector> sv;
};
vector<List> all_ints;
- map<uint32_t, bool> expected;
- unordered_set<uint32_t> seen_live;
+ map<int64_t, bool> expected;
+ unordered_set<int64_t> seen_live;
Random prng(SeedRandom());
- uint32_t entry = 0;
+ int64_t entry = 0;
for (int i = 0; i < FLAGS_num_lists; i++) {
List list(FLAGS_num_rows);
- unordered_set<uint32_t> seen_this_list;
+ unordered_set<int64_t> seen_this_list;
if (overlapping_ranges) {
entry = 0;
}
for (int j = 0; j < FLAGS_num_rows; j++) {
- uint32_t potential;
+ int64_t potential;
bool is_deleted = false;
// The merge iterator does not support duplicate non-deleted keys.
while (true) {
@@ -374,9 +400,9 @@ void TestMerge(const Schema& schema, const TestIntRangePredicate &predicate,
for (int i = 0; i < dst.nrows(); i++) {
ASSERT_NE(expected.end(), expected_iter);
- uint32_t expected_key = expected_iter->first;
+ int64_t expected_key = expected_iter->first;
bool expected_is_deleted = expected_iter->second;
- uint32_t row_key = *schema.ExtractColumnFromRow<UINT32>(dst.row(i), kValColIdx);
+ int64_t row_key = *schema.ExtractColumnFromRow<INT64>(dst.row(i), kValColIdx);
ASSERT_GE(row_key, predicate.lower_) << "Yielded integer excluded by predicate";
ASSERT_LT(row_key, predicate.upper_) << "Yielded integer excluded by predicate";
EXPECT_EQ(expected_key, row_key) << "Yielded out of order at idx " << total_idx;
@@ -403,12 +429,12 @@ void TestMerge(const Schema& schema, const TestIntRangePredicate &predicate,
}
TEST(TestMergeIterator, TestMerge) {
- TestIntRangePredicate predicate(0, MathLimits<uint32_t>::kMax);
+ TestIntRangePredicate predicate(0, MathLimits<int64_t>::kMax);
NO_FATALS(TestMerge(kIntSchema, predicate));
}
TEST(TestMergeIterator, TestMergeNonOverlapping) {
- TestIntRangePredicate predicate(0, MathLimits<uint32_t>::kMax);
+ TestIntRangePredicate predicate(0, MathLimits<int64_t>::kMax);
NO_FATALS(TestMerge(kIntSchema, predicate, /*overlapping_ranges=*/false));
}
@@ -422,12 +448,12 @@ TEST(TestMergeIterator, TestMergePredicate) {
// This predicate excludes the first half of the rows but accepts the
// second half.
TEST(TestMergeIterator, TestMergePredicate2) {
- TestIntRangePredicate predicate(FLAGS_num_rows / 2, MathLimits<uint32_t>::kMax);
+ TestIntRangePredicate predicate(FLAGS_num_rows / 2, MathLimits<int64_t>::kMax);
NO_FATALS(TestMerge(kIntSchema, predicate));
}
TEST(TestMergeIterator, TestDeDupGhostRows) {
- TestIntRangePredicate match_all_pred(0, MathLimits<uint32_t>::kMax);
+ TestIntRangePredicate match_all_pred(0, MathLimits<int64_t>::kMax);
NO_FATALS(TestMerge(kIntSchemaWithVCol, match_all_pred,
/*overlapping_ranges=*/true,
/*include_deleted_rows=*/true));
@@ -441,7 +467,7 @@ TEST(TestMaterializingIterator, TestMaterializingPredicatePushdown) {
spec.AddPredicate(pred1.pred_);
LOG(INFO) << "Predicate: " << pred1.pred_.ToString();
- vector<uint32_t> ints(100);
+ vector<int64_t> ints(100);
for (int i = 0; i < 100; i++) {
ints[i] = i;
}
@@ -472,7 +498,7 @@ TEST(TestPredicateEvaluatingIterator, TestPredicateEvaluation) {
spec.AddPredicate(pred1.pred_);
LOG(INFO) << "Predicate: " << pred1.pred_.ToString();
- vector<uint32_t> ints(100);
+ vector<int64_t> ints(100);
for (int i = 0; i < 100; i++) {
ints[i] = i;
}
diff --git a/src/kudu/common/generic_iterators.cc b/src/kudu/common/generic_iterators.cc
index 0230fc4..8659bc2 100644
--- a/src/kudu/common/generic_iterators.cc
+++ b/src/kudu/common/generic_iterators.cc
@@ -30,6 +30,9 @@
#include <unordered_set>
#include <utility>
+#include <boost/intrusive/list.hpp>
+#include <boost/intrusive/list_hook.hpp>
+
#include <gflags/gflags.h>
#include <glog/logging.h>
@@ -96,9 +99,9 @@ void AddIterStats(const RowwiseIterator& iter,
static const int kMergeRowBuffer = 1000;
// MergeIterState wraps a RowwiseIterator for use by the MergeIterator.
-// Importantly, it also filters out unselected rows from the wrapped RowwiseIterator,
-// such that all returned rows are valid.
-class MergeIterState {
+// Importantly, it also filters out unselected rows from the wrapped
+// RowwiseIterator such that all returned rows are valid.
+class MergeIterState : public boost::intrusive::list_base_hook<> {
public:
explicit MergeIterState(unique_ptr<RowwiseIterator> iter) :
iter_(std::move(iter)),
@@ -155,8 +158,15 @@ class MergeIterState {
return rows_advanced_ == rows_valid_;
}
+ // Underlying iterator.
unique_ptr<RowwiseIterator> iter_;
+
+ // Allocates memory for read_block_.
Arena arena_;
+
+ // Current block of buffered rows from the iterator.
+ //
+ // The memory backing the rows was allocated out of the arena.
RowBlock read_block_;
// The row currently pointed to by the iterator.
@@ -275,10 +285,10 @@ class MergeIterator : public RowwiseIterator {
// This is required because we can't create a MergeIterState of an uninitialized iterator.
vector<unique_ptr<RowwiseIterator>> orig_iters_;
- // See UnionIterator::states_lock_ for details on locking. This follows the same
+ // See UnionIterator::iters_lock_ for details on locking. This follows the same
// pattern.
mutable rw_spinlock states_lock_;
- vector<unique_ptr<MergeIterState>> states_;
+ boost::intrusive::list<MergeIterState> states_; // NOLINT(*)
// Statistics (keyed by projection column index) accumulated so far by any
// fully-consumed sub-iterators.
@@ -307,7 +317,9 @@ MergeIterator::MergeIterator(MergeIteratorOptions opts,
CHECK_GT(orig_iters_.size(), 0);
}
-MergeIterator::~MergeIterator() {}
+MergeIterator::~MergeIterator() {
+ states_.clear_and_dispose([](MergeIterState* s) { delete s; });
+}
Status MergeIterator::Init(ScanSpec *spec) {
CHECK(!initted_);
@@ -323,7 +335,7 @@ Status MergeIterator::Init(ScanSpec *spec) {
// TODO(adar): establish dominance between iterators and only initialize
// non-dominated iterators.
for (auto& s : states_) {
- RETURN_NOT_OK(s->Init());
+ RETURN_NOT_OK(s.Init());
}
// Verify that the schemas match in debug builds.
@@ -331,15 +343,15 @@ Status MergeIterator::Init(ScanSpec *spec) {
// It's important to do the verification after initializing the iterators, as
// they may not know their own schemas until they've been initialized (in the
// case of a union of unions).
- schema_.reset(new Schema(states_.front()->schema()));
+ schema_.reset(new Schema(states_.front().schema()));
CHECK_GT(schema_->num_key_columns(), 0);
finished_iter_stats_by_col_.resize(schema_->num_columns());
#ifndef NDEBUG
for (const auto& s : states_) {
- if (!s->schema().Equals(*schema_)) {
+ if (!s.schema().Equals(*schema_)) {
return Status::InvalidArgument(
Substitute("Schemas do not match: $0 vs. $1",
- schema_->ToString(), s->schema().ToString()));
+ schema_->ToString(), s.schema().ToString()));
}
}
#endif
@@ -353,11 +365,9 @@ Status MergeIterator::Init(ScanSpec *spec) {
// Before we copy any rows, clean up any iterators which were empty
// to start with. Otherwise, HasNext() won't properly return false
// if we were passed only empty iterators.
- states_.erase(
- remove_if(states_.begin(), states_.end(), [] (const unique_ptr<MergeIterState>& iter) {
- return PREDICT_FALSE(iter->IsFullyExhausted());
- }),
- states_.end());
+ states_.remove_and_dispose_if(
+ [](const MergeIterState& s) { return PREDICT_FALSE(s.IsFullyExhausted()); },
+ [](MergeIterState* s) { delete s; });
initted_ = true;
return Status::OK();
@@ -373,7 +383,8 @@ Status MergeIterator::InitSubIterators(ScanSpec *spec) {
for (auto& i : orig_iters_) {
ScanSpec *spec_copy = spec != nullptr ? scan_spec_copies_.Construct(*spec) : nullptr;
RETURN_NOT_OK(InitAndMaybeWrap(&i, spec_copy));
- states_.emplace_back(unique_ptr<MergeIterState>(new MergeIterState(std::move(i))));
+ unique_ptr<MergeIterState> state(new MergeIterState(std::move(i)));
+ states_.push_back(*state.release());
}
orig_iters_.clear();
@@ -404,7 +415,8 @@ void MergeIterator::PrepareBatch(RowBlock* dst) {
// in the currently queued up blocks.
size_t available = 0;
for (const auto& s : states_) {
- available += s->remaining_in_block();
+ if (available >= dst->row_capacity()) break;
+ available += s.remaining_in_block();
}
dst->Resize(std::min(dst->row_capacity(), available));
@@ -431,7 +443,7 @@ Status MergeIterator::MaterializeBlock(RowBlock *dst) {
// Typically the number of states_ is not that large, so using a priority
// queue is not worth it.
- for (const auto& iter : states_) {
+ for (auto& iter : states_) {
// To merge in row key order, we need to consume the smallest row at any
// given time. We locate that row by peeking at the next row in each of
// the states_ iterators, which includes all possible candidates for the
@@ -440,7 +452,7 @@ Status MergeIterator::MaterializeBlock(RowBlock *dst) {
if (!smallest.empty()) {
// If we have a candidate for smallest row, compare it against the
// smallest row in each iterator.
- cmp = schema_->Compare(iter->next_row(), smallest[0]->next_row());
+ cmp = schema_->Compare(iter.next_row(), smallest[0]->next_row());
num_comparisons_++;
}
if (smallest.empty() || cmp < 0) {
@@ -448,11 +460,11 @@ Status MergeIterator::MaterializeBlock(RowBlock *dst) {
// smaller than the previously-smallest, replace the smallest with the
// new row found.
smallest.clear();
- smallest.emplace_back(iter.get());
+ smallest.emplace_back(&iter);
} else if (!smallest.empty() && cmp == 0) {
// If we have found a duplicate of the smallest row, at least one must
// be a ghost row. Collect all duplicates in order to merge them later.
- smallest.emplace_back(iter.get());
+ smallest.emplace_back(&iter);
}
}
@@ -499,31 +511,15 @@ Status MergeIterator::MaterializeBlock(RowBlock *dst) {
RowBlockRow dst_row = dst->row(dst_row_idx++);
RETURN_NOT_OK(CopyRow(row_to_return_iter->next_row(), &dst_row, dst->arena()));
- // Advance all matching sub-iterators and mark exhausted sub-iterators for
- // removal.
- unordered_set<MergeIterState*> exhausted;
+ // Advance all matching sub-iterators and remove any that are exhausted.
for (MergeIterState* s : smallest) {
RETURN_NOT_OK(s->Advance());
if (s->IsFullyExhausted()) {
- InsertOrDie(&exhausted, s);
- }
- }
-
- // Remove the exhausted sub-iterators.
- if (!exhausted.empty()) {
- std::lock_guard<rw_spinlock> l(states_lock_);
- for (MergeIterState* s : exhausted) {
+ std::lock_guard<rw_spinlock> l(states_lock_);
s->AddStats(&finished_iter_stats_by_col_);
+ states_.erase_and_dispose(states_.iterator_to(*s),
+ [](MergeIterState* s) { delete s; });
}
- // TODO(mpercy): Consider making removal O(1) per element to remove by
- // using a different data structure for 'states_'. The below
- // erase-remove idiom gives us O(n) removal on a vector for an
- // arbitrary number of elements to remove at once.
- states_.erase(std::remove_if(states_.begin(), states_.end(),
- [&exhausted](const unique_ptr<MergeIterState>& state) {
- return ContainsKey(exhausted, state.get());
- }),
- states_.end());
}
}
@@ -552,7 +548,7 @@ void MergeIterator::GetIteratorStats(vector<IteratorStats>* stats) const {
*stats = finished_iter_stats_by_col_;
for (const auto& s : states_) {
- s->AddStats(stats);
+ s.AddStats(stats);
}
}