You are viewing a plain-text version of this content; the canonical link to it (originally given as a hyperlink here) was not preserved in this text rendering.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/01/24 01:12:01 UTC
[kudu] 04/05: generic_iterators: assorted cleanup
This is an automated email from the ASF dual-hosted git repository.
adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 111f1843084dedf378898310fcf3e85f1e38a7e1
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Tue Nov 27 15:19:33 2018 -0800
generic_iterators: assorted cleanup
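This patch cleans up the MergeIterator and makes it look more like the
UnionIterator:
1. Addressed a long-standing TODO about schema verification: after the
   sub-iterators are initialized, their schemas are now compared against
   each other (in debug builds only, for both MergeIterator and
   UnionIterator).
2. Extracted the schema from the sub-iterators instead of passing it to the
   MergeIterator constructor.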
Change-Id: I7f1fd5f4cd1a8ef621d3ac994e764dfd19e99544
Reviewed-on: http://gerrit.cloudera.org:8080/12156
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mp...@apache.org>
---
src/kudu/common/generic_iterators-test.cc | 4 +-
src/kudu/common/generic_iterators.cc | 133 ++++++++++++++++++------------
src/kudu/common/generic_iterators.h | 33 ++++----
src/kudu/tablet/rowset.cc | 2 +-
src/kudu/tablet/tablet.cc | 2 +-
5 files changed, 100 insertions(+), 74 deletions(-)
diff --git a/src/kudu/common/generic_iterators-test.cc b/src/kudu/common/generic_iterators-test.cc
index 4bf9732..61c4317 100644
--- a/src/kudu/common/generic_iterators-test.cc
+++ b/src/kudu/common/generic_iterators-test.cc
@@ -144,7 +144,7 @@ TEST(TestMergeIterator, TestMergeEmpty) {
shared_ptr<RowwiseIterator> iter(
new MaterializingIterator(
shared_ptr<ColumnwiseIterator>(new VectorIterator({}))));
- MergeIterator merger(kIntSchema, { std::move(iter) });
+ MergeIterator merger({ std::move(iter) });
ASSERT_OK(merger.Init(nullptr));
ASSERT_FALSE(merger.HasNext());
}
@@ -206,7 +206,7 @@ void TestMerge(const TestIntRangePredicate &predicate) {
LOG(INFO) << "Predicate: " << predicate.pred_.ToString();
LOG_TIMING(INFO, "Iterate merged lists") {
- MergeIterator merger(kIntSchema, std::move(to_merge));
+ MergeIterator merger(std::move(to_merge));
ASSERT_OK(merger.Init(&spec));
RowBlock dst(kIntSchema, 100, nullptr);
diff --git a/src/kudu/common/generic_iterators.cc b/src/kudu/common/generic_iterators.cc
index eb6ebe6..06c4f25 100644
--- a/src/kudu/common/generic_iterators.cc
+++ b/src/kudu/common/generic_iterators.cc
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+#include "kudu/common/generic_iterators.h"
+
#include <unistd.h>
#include <algorithm>
@@ -32,13 +34,12 @@
#include "kudu/common/column_materialization_context.h"
#include "kudu/common/column_predicate.h"
#include "kudu/common/columnblock.h"
-#include "kudu/common/generic_iterators.h"
#include "kudu/common/iterator_stats.h"
#include "kudu/common/row.h"
#include "kudu/common/rowblock.h"
-#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/memory/arena.h"
@@ -52,6 +53,7 @@ using std::sort;
using std::string;
using std::unique_ptr;
using std::vector;
+using strings::Substitute;
DEFINE_bool(materializing_iterator_do_pushdown, true,
"Should MaterializingIterator do predicate pushdown");
@@ -127,6 +129,11 @@ class MergeIterState {
return rows_valid_ - rows_advanced_;
}
+ // Returns the schema from the underlying iterator.
+ const Schema& schema() const {
+ return iter_->schema();
+ }
+
private:
// Pull the next block from the underlying iterator.
Status PullNextBlock();
@@ -204,38 +211,58 @@ Status MergeIterState::PullNextBlock() {
return Status::OK();
}
-MergeIterator::MergeIterator(
- const Schema& schema,
- vector<shared_ptr<RowwiseIterator>> iters)
- : schema_(schema),
- initted_(false),
+MergeIterator::MergeIterator(vector<shared_ptr<RowwiseIterator>> iters)
+ : initted_(false),
orig_iters_(std::move(iters)),
- finished_iter_stats_by_col_(schema_.num_columns()),
num_orig_iters_(orig_iters_.size()) {
CHECK_GT(orig_iters_.size(), 0);
- CHECK_GT(schema.num_key_columns(), 0);
}
MergeIterator::~MergeIterator() {}
Status MergeIterator::Init(ScanSpec *spec) {
CHECK(!initted_);
- // TODO(todd): check that schemas match up!
+ // Initialize the iterators and construct the per-iterator merge states.
+ //
+ // When this method finishes, orig_iters_ has been cleared and states_ has
+ // been populated.
RETURN_NOT_OK(InitSubIterators(spec));
- for (unique_ptr<MergeIterState> &state : iters_) {
- RETURN_NOT_OK(state->Init());
+ // Retrieve every iterator's first block.
+ //
+ // TODO(adar): establish dominance between iterators and only initialize
+ // non-dominated iterators.
+ for (auto& s : states_) {
+ RETURN_NOT_OK(s->Init());
+ }
+
+ // Verify that the schemas match in debug builds.
+ //
+ // It's important to do the verification after initializing the iterators, as
+ // they may not know their own schemas until they've been initialized (in the
+ // case of a union of unions).
+ schema_.reset(new Schema(states_.front()->schema()));
+ CHECK_GT(schema_->num_key_columns(), 0);
+ finished_iter_stats_by_col_.resize(schema_->num_columns());
+#ifndef NDEBUG
+ for (const auto& s : states_) {
+ if (!s->schema().Equals(*schema_)) {
+ return Status::InvalidArgument(
+ Substitute("Schemas do not match: $0 vs. $1",
+ schema_->ToString(), s->schema().ToString()));
+ }
}
+#endif
// Before we copy any rows, clean up any iterators which were empty
// to start with. Otherwise, HasNext() won't properly return false
// if we were passed only empty iterators.
- iters_.erase(
- remove_if(iters_.begin(), iters_.end(), [] (const unique_ptr<MergeIterState>& iter) {
+ states_.erase(
+ remove_if(states_.begin(), states_.end(), [] (const unique_ptr<MergeIterState>& iter) {
return PREDICT_FALSE(iter->IsFullyExhausted());
}),
- iters_.end());
+ states_.end());
initted_ = true;
return Status::OK();
@@ -243,15 +270,15 @@ Status MergeIterator::Init(ScanSpec *spec) {
bool MergeIterator::HasNext() const {
CHECK(initted_);
- return !iters_.empty();
+ return !states_.empty();
}
Status MergeIterator::InitSubIterators(ScanSpec *spec) {
// Initialize all the sub iterators.
- for (shared_ptr<RowwiseIterator> &iter : orig_iters_) {
+ for (auto& i : orig_iters_) {
ScanSpec *spec_copy = spec != nullptr ? scan_spec_copies_.Construct(*spec) : nullptr;
- RETURN_NOT_OK(PredicateEvaluatingIterator::InitAndMaybeWrap(&iter, spec_copy));
- iters_.push_back(unique_ptr<MergeIterState>(new MergeIterState(std::move(iter))));
+ RETURN_NOT_OK(PredicateEvaluatingIterator::InitAndMaybeWrap(&i, spec_copy));
+ states_.push_back(unique_ptr<MergeIterState>(new MergeIterState(std::move(i))));
}
orig_iters_.clear();
@@ -281,8 +308,8 @@ void MergeIterator::PrepareBatch(RowBlock* dst) {
// We can always provide at least as many rows as are remaining
// in the currently queued up blocks.
size_t available = 0;
- for (unique_ptr<MergeIterState> &iter : iters_) {
- available += iter->remaining_in_block();
+ for (const auto& s : states_) {
+ available += s->remaining_in_block();
}
dst->Resize(std::min(dst->row_capacity(), available));
@@ -302,13 +329,13 @@ Status MergeIterator::MaterializeBlock(RowBlock *dst) {
MergeIterState *smallest = nullptr;
ssize_t smallest_idx = -1;
- // Typically the number of iters_ is not that large, so using a priority
+ // Typically the number of states_ is not that large, so using a priority
// queue is not worth it
- for (size_t i = 0; i < iters_.size(); i++) {
- unique_ptr<MergeIterState> &state = iters_[i];
+ for (size_t i = 0; i < states_.size(); i++) {
+ unique_ptr<MergeIterState> &state = states_[i];
if (smallest == nullptr ||
- schema_.Compare(state->next_row(), smallest->next_row()) < 0) {
+ schema_->Compare(state->next_row(), smallest->next_row()) < 0) {
smallest = state.get();
smallest_idx = i;
}
@@ -322,9 +349,9 @@ Status MergeIterator::MaterializeBlock(RowBlock *dst) {
RETURN_NOT_OK(smallest->Advance());
if (smallest->IsFullyExhausted()) {
- std::lock_guard<rw_spinlock> l(iters_lock_);
+ std::lock_guard<rw_spinlock> l(states_lock_);
smallest->AddStats(&finished_iter_stats_by_col_);
- iters_.erase(iters_.begin() + smallest_idx);
+ states_.erase(states_.begin() + smallest_idx);
}
}
@@ -332,21 +359,21 @@ Status MergeIterator::MaterializeBlock(RowBlock *dst) {
}
string MergeIterator::ToString() const {
- return strings::Substitute("Merge($0 iters)", num_orig_iters_);
+ return Substitute("Merge($0 iters)", num_orig_iters_);
}
const Schema& MergeIterator::schema() const {
CHECK(initted_);
- return schema_;
+ return *schema_;
}
void MergeIterator::GetIteratorStats(vector<IteratorStats>* stats) const {
- shared_lock<rw_spinlock> l(iters_lock_);
+ shared_lock<rw_spinlock> l(states_lock_);
CHECK(initted_);
*stats = finished_iter_stats_by_col_;
- for (const auto& iter_state : iters_) {
- iter_state->AddStats(stats);
+ for (const auto& s : states_) {
+ s->AddStats(stats);
}
}
@@ -368,19 +395,22 @@ Status UnionIterator::Init(ScanSpec *spec) {
// Initialize the underlying iterators
RETURN_NOT_OK(InitSubIterators(spec));
- // Verify schemas match.
- // Important to do the verification after initializing the
- // sub-iterators, since they may not know their own schemas
- // until they've been initialized (in the case of a union of unions)
+ // Verify that the schemas match in debug builds.
+ //
+ // It's important to do the verification after initializing the iterators, as
+ // they may not know their own schemas until they've been initialized (in the
+ // case of a union of unions).
schema_.reset(new Schema(iters_.front()->schema()));
- for (const shared_ptr<RowwiseIterator> &iter : iters_) {
- if (!iter->schema().Equals(*schema_)) {
+ finished_iter_stats_by_col_.resize(schema_->num_columns());
+#ifndef NDEBUG
+ for (const auto& i : iters_) {
+ if (!i->schema().Equals(*schema_)) {
return Status::InvalidArgument(
- string("Schemas do not match: ") + schema_->ToString()
- + " vs " + iter->schema().ToString());
+ Substitute("Schemas do not match: $0 vs. $1",
+ schema_->ToString(), i->schema().ToString()));
}
}
- finished_iter_stats_by_col_.resize(schema_->num_columns());
+#endif
initted_ = true;
return Status::OK();
@@ -388,9 +418,9 @@ Status UnionIterator::Init(ScanSpec *spec) {
Status UnionIterator::InitSubIterators(ScanSpec *spec) {
- for (shared_ptr<RowwiseIterator> &iter : iters_) {
+ for (auto& i : iters_) {
ScanSpec *spec_copy = spec != nullptr ? scan_spec_copies_.Construct(*spec) : nullptr;
- RETURN_NOT_OK(PredicateEvaluatingIterator::InitAndMaybeWrap(&iter, spec_copy));
+ RETURN_NOT_OK(PredicateEvaluatingIterator::InitAndMaybeWrap(&i, spec_copy));
}
// Since we handle predicates in all the wrapped iterators, we can clear
// them here.
@@ -402,8 +432,8 @@ Status UnionIterator::InitSubIterators(ScanSpec *spec) {
bool UnionIterator::HasNext() const {
CHECK(initted_);
- for (const shared_ptr<RowwiseIterator> &iter : iters_) {
- if (iter->HasNext()) return true;
+ for (const auto& i : iters_) {
+ if (i->HasNext()) return true;
}
return false;
@@ -446,14 +476,9 @@ void UnionIterator::PopFront() {
string UnionIterator::ToString() const {
string s;
s.append("Union(");
- bool first = true;
- for (const shared_ptr<RowwiseIterator> &iter : iters_) {
- if (!first) {
- s.append(", ");
- }
- first = false;
- s.append(iter->ToString());
- }
+ s += JoinMapped(iters_, [](const shared_ptr<RowwiseIterator>& it) {
+ return it->ToString();
+ }, ",");
s.append(")");
return s;
}
@@ -674,7 +699,7 @@ Status PredicateEvaluatingIterator::NextBlock(RowBlock *dst) {
}
string PredicateEvaluatingIterator::ToString() const {
- return strings::Substitute("PredicateEvaluating($0)", base_iter_->ToString());
+ return Substitute("PredicateEvaluating($0)", base_iter_->ToString());
}
} // namespace kudu
diff --git a/src/kudu/common/generic_iterators.h b/src/kudu/common/generic_iterators.h
index 49ed9b5..bc3ac63 100644
--- a/src/kudu/common/generic_iterators.h
+++ b/src/kudu/common/generic_iterators.h
@@ -34,7 +34,6 @@
#include "kudu/common/iterator_stats.h"
#include "kudu/common/scan_spec.h"
#include "kudu/common/schema.h"
-#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/port.h"
#include "kudu/util/locks.h"
#include "kudu/util/object_pool.h"
@@ -49,11 +48,14 @@ class RowBlock;
// based on keys.
class MergeIterator : public RowwiseIterator {
public:
- // TODO: clarify whether schema is just the projection, or must include the merge
- // key columns. It should probably just be the required projection, which must be
- // a subset of the columns in 'iters'.
- MergeIterator(const Schema& schema,
- std::vector<std::shared_ptr<RowwiseIterator>> iters);
+ // Constructs a MergeIterator of the given iterators.
+ //
+ // The iterators must have matching schemas and should not yet be initialized.
+ //
+ // Note: the iterators must be constructed using a projection that includes
+ // all key columns; otherwise a CHECK will fire at initialization time.
+ explicit MergeIterator(std::vector<std::shared_ptr<RowwiseIterator>> iters);
+
virtual ~MergeIterator();
// The passed-in iterators should be already initialized.
@@ -74,7 +76,8 @@ class MergeIterator : public RowwiseIterator {
Status MaterializeBlock(RowBlock* dst);
Status InitSubIterators(ScanSpec *spec);
- const Schema schema_;
+ // Initialized during Init.
+ std::unique_ptr<Schema> schema_;
bool initted_;
@@ -82,10 +85,10 @@ class MergeIterator : public RowwiseIterator {
// This is required because we can't create a MergeIterState of an uninitialized iterator.
std::vector<std::shared_ptr<RowwiseIterator>> orig_iters_;
- // See UnionIterator::iters_lock_ for details on locking. This follows the same
+ // See UnionIterator::states_lock_ for details on locking. This follows the same
// pattern.
- mutable rw_spinlock iters_lock_;
- std::vector<std::unique_ptr<MergeIterState>> iters_;
+ mutable rw_spinlock states_lock_;
+ std::vector<std::unique_ptr<MergeIterState>> states_;
// Statistics (keyed by projection column index) accumulated so far by any
// fully-consumed sub-iterators.
@@ -108,12 +111,9 @@ class MergeIterator : public RowwiseIterator {
// part of the projection.
class UnionIterator : public RowwiseIterator {
public:
- // Construct a union iterator of the given iterators.
- // The iterators must have matching schemas.
- // The passed-in iterators should not yet be initialized.
+ // Constructs a UnionIterator of the given iterators.
//
- // All passed-in iterators must be fully able to evaluate all predicates - i.e.
- // calling iter->Init(spec) should remove all predicates from the spec.
+ // The iterators must have matching schemas and should not yet be initialized.
explicit UnionIterator(std::vector<std::shared_ptr<RowwiseIterator>> iters);
Status Init(ScanSpec *spec) OVERRIDE;
@@ -143,7 +143,8 @@ class UnionIterator : public RowwiseIterator {
void PopFront();
// Schema: initialized during Init()
- gscoped_ptr<Schema> schema_;
+ std::unique_ptr<Schema> schema_;
+
bool initted_;
// Lock protecting 'iters_' and 'finished_iter_stats_by_col_'.
diff --git a/src/kudu/tablet/rowset.cc b/src/kudu/tablet/rowset.cc
index a6dadb5..3f54456 100644
--- a/src/kudu/tablet/rowset.cc
+++ b/src/kudu/tablet/rowset.cc
@@ -102,7 +102,7 @@ Status DuplicatingRowSet::NewRowIterator(const RowIteratorOptions& opts,
switch (opts.order) {
case ORDERED:
- out->reset(new MergeIterator(*opts.projection, std::move(iters)));
+ out->reset(new MergeIterator(std::move(iters)));
break;
case UNORDERED:
out->reset(new UnionIterator(std::move(iters)));
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 67f3853..a6343d6 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -2464,7 +2464,7 @@ Status Tablet::Iterator::Init(ScanSpec *spec) {
switch (opts_.order) {
case ORDERED:
- iter_.reset(new MergeIterator(projection_, std::move(iters)));
+ iter_.reset(new MergeIterator(std::move(iters)));
break;
case UNORDERED:
default: