You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/01/24 01:12:01 UTC

[kudu] 04/05: generic_iterators: assorted cleanup

This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 111f1843084dedf378898310fcf3e85f1e38a7e1
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Tue Nov 27 15:19:33 2018 -0800

    generic_iterators: assorted cleanup
    
    This patch cleans up the MergeIterator and makes it look more like the
    UnionIterator:
    1. Addresses a long-standing TODO about schema verification.
    2. Derives the schema from the sub-iterators instead of taking it as a
       constructor argument.
    
    Change-Id: I7f1fd5f4cd1a8ef621d3ac994e764dfd19e99544
    Reviewed-on: http://gerrit.cloudera.org:8080/12156
    Tested-by: Kudu Jenkins
    Reviewed-by: Mike Percy <mp...@apache.org>
---
 src/kudu/common/generic_iterators-test.cc |   4 +-
 src/kudu/common/generic_iterators.cc      | 133 ++++++++++++++++++------------
 src/kudu/common/generic_iterators.h       |  33 ++++----
 src/kudu/tablet/rowset.cc                 |   2 +-
 src/kudu/tablet/tablet.cc                 |   2 +-
 5 files changed, 100 insertions(+), 74 deletions(-)

diff --git a/src/kudu/common/generic_iterators-test.cc b/src/kudu/common/generic_iterators-test.cc
index 4bf9732..61c4317 100644
--- a/src/kudu/common/generic_iterators-test.cc
+++ b/src/kudu/common/generic_iterators-test.cc
@@ -144,7 +144,7 @@ TEST(TestMergeIterator, TestMergeEmpty) {
   shared_ptr<RowwiseIterator> iter(
     new MaterializingIterator(
         shared_ptr<ColumnwiseIterator>(new VectorIterator({}))));
-  MergeIterator merger(kIntSchema, { std::move(iter) });
+  MergeIterator merger({ std::move(iter) });
   ASSERT_OK(merger.Init(nullptr));
   ASSERT_FALSE(merger.HasNext());
 }
@@ -206,7 +206,7 @@ void TestMerge(const TestIntRangePredicate &predicate) {
     LOG(INFO) << "Predicate: " << predicate.pred_.ToString();
 
     LOG_TIMING(INFO, "Iterate merged lists") {
-      MergeIterator merger(kIntSchema, std::move(to_merge));
+      MergeIterator merger(std::move(to_merge));
       ASSERT_OK(merger.Init(&spec));
 
       RowBlock dst(kIntSchema, 100, nullptr);
diff --git a/src/kudu/common/generic_iterators.cc b/src/kudu/common/generic_iterators.cc
index eb6ebe6..06c4f25 100644
--- a/src/kudu/common/generic_iterators.cc
+++ b/src/kudu/common/generic_iterators.cc
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "kudu/common/generic_iterators.h"
+
 #include <unistd.h>
 
 #include <algorithm>
@@ -32,13 +34,12 @@
 #include "kudu/common/column_materialization_context.h"
 #include "kudu/common/column_predicate.h"
 #include "kudu/common/columnblock.h"
-#include "kudu/common/generic_iterators.h"
 #include "kudu/common/iterator_stats.h"
 #include "kudu/common/row.h"
 #include "kudu/common/rowblock.h"
-#include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/memory/arena.h"
@@ -52,6 +53,7 @@ using std::sort;
 using std::string;
 using std::unique_ptr;
 using std::vector;
+using strings::Substitute;
 
 DEFINE_bool(materializing_iterator_do_pushdown, true,
             "Should MaterializingIterator do predicate pushdown");
@@ -127,6 +129,11 @@ class MergeIterState {
     return rows_valid_ - rows_advanced_;
   }
 
+  // Returns the schema from the underlying iterator.
+  const Schema& schema() const {
+    return iter_->schema();
+  }
+
  private:
   // Pull the next block from the underlying iterator.
   Status PullNextBlock();
@@ -204,38 +211,58 @@ Status MergeIterState::PullNextBlock() {
   return Status::OK();
 }
 
-MergeIterator::MergeIterator(
-    const Schema& schema,
-    vector<shared_ptr<RowwiseIterator>> iters)
-    : schema_(schema),
-      initted_(false),
+MergeIterator::MergeIterator(vector<shared_ptr<RowwiseIterator>> iters)
+    : initted_(false),
       orig_iters_(std::move(iters)),
-      finished_iter_stats_by_col_(schema_.num_columns()),
       num_orig_iters_(orig_iters_.size()) {
   CHECK_GT(orig_iters_.size(), 0);
-  CHECK_GT(schema.num_key_columns(), 0);
 }
 
 MergeIterator::~MergeIterator() {}
 
 Status MergeIterator::Init(ScanSpec *spec) {
   CHECK(!initted_);
-  // TODO(todd): check that schemas match up!
 
+  // Initialize the iterators and construct the per-iterator merge states.
+  //
+  // When this method finishes, orig_iters_ has been cleared and states_ has
+  // been populated.
   RETURN_NOT_OK(InitSubIterators(spec));
 
-  for (unique_ptr<MergeIterState> &state : iters_) {
-    RETURN_NOT_OK(state->Init());
+  // Retrieve every iterator's first block.
+  //
+  // TODO(adar): establish dominance between iterators and only initialize
+  // non-dominated iterators.
+  for (auto& s : states_) {
+    RETURN_NOT_OK(s->Init());
+  }
+
+  // Verify that the schemas match in debug builds.
+  //
+  // It's important to do the verification after initializing the iterators, as
+  // they may not know their own schemas until they've been initialized (in the
+  // case of a union of unions).
+  schema_.reset(new Schema(states_.front()->schema()));
+  CHECK_GT(schema_->num_key_columns(), 0);
+  finished_iter_stats_by_col_.resize(schema_->num_columns());
+#ifndef NDEBUG
+  for (const auto& s : states_) {
+    if (!s->schema().Equals(*schema_)) {
+      return Status::InvalidArgument(
+          Substitute("Schemas do not match: $0 vs. $1",
+                     schema_->ToString(), s->schema().ToString()));
+    }
   }
+#endif
 
   // Before we copy any rows, clean up any iterators which were empty
   // to start with. Otherwise, HasNext() won't properly return false
   // if we were passed only empty iterators.
-  iters_.erase(
-      remove_if(iters_.begin(), iters_.end(), [] (const unique_ptr<MergeIterState>& iter) {
+  states_.erase(
+      remove_if(states_.begin(), states_.end(), [] (const unique_ptr<MergeIterState>& iter) {
         return PREDICT_FALSE(iter->IsFullyExhausted());
       }),
-      iters_.end());
+      states_.end());
 
   initted_ = true;
   return Status::OK();
@@ -243,15 +270,15 @@ Status MergeIterator::Init(ScanSpec *spec) {
 
 bool MergeIterator::HasNext() const {
   CHECK(initted_);
-  return !iters_.empty();
+  return !states_.empty();
 }
 
 Status MergeIterator::InitSubIterators(ScanSpec *spec) {
   // Initialize all the sub iterators.
-  for (shared_ptr<RowwiseIterator> &iter : orig_iters_) {
+  for (auto& i : orig_iters_) {
     ScanSpec *spec_copy = spec != nullptr ? scan_spec_copies_.Construct(*spec) : nullptr;
-    RETURN_NOT_OK(PredicateEvaluatingIterator::InitAndMaybeWrap(&iter, spec_copy));
-    iters_.push_back(unique_ptr<MergeIterState>(new MergeIterState(std::move(iter))));
+    RETURN_NOT_OK(PredicateEvaluatingIterator::InitAndMaybeWrap(&i, spec_copy));
+    states_.push_back(unique_ptr<MergeIterState>(new MergeIterState(std::move(i))));
   }
   orig_iters_.clear();
 
@@ -281,8 +308,8 @@ void MergeIterator::PrepareBatch(RowBlock* dst) {
   // We can always provide at least as many rows as are remaining
   // in the currently queued up blocks.
   size_t available = 0;
-  for (unique_ptr<MergeIterState> &iter : iters_) {
-    available += iter->remaining_in_block();
+  for (const auto& s : states_) {
+    available += s->remaining_in_block();
   }
 
   dst->Resize(std::min(dst->row_capacity(), available));
@@ -302,13 +329,13 @@ Status MergeIterator::MaterializeBlock(RowBlock *dst) {
     MergeIterState *smallest = nullptr;
     ssize_t smallest_idx = -1;
 
-    // Typically the number of iters_ is not that large, so using a priority
+    // Typically the number of states_ is not that large, so using a priority
     // queue is not worth it
-    for (size_t i = 0; i < iters_.size(); i++) {
-      unique_ptr<MergeIterState> &state = iters_[i];
+    for (size_t i = 0; i < states_.size(); i++) {
+      unique_ptr<MergeIterState> &state = states_[i];
 
       if (smallest == nullptr ||
-          schema_.Compare(state->next_row(), smallest->next_row()) < 0) {
+          schema_->Compare(state->next_row(), smallest->next_row()) < 0) {
         smallest = state.get();
         smallest_idx = i;
       }
@@ -322,9 +349,9 @@ Status MergeIterator::MaterializeBlock(RowBlock *dst) {
     RETURN_NOT_OK(smallest->Advance());
 
     if (smallest->IsFullyExhausted()) {
-      std::lock_guard<rw_spinlock> l(iters_lock_);
+      std::lock_guard<rw_spinlock> l(states_lock_);
       smallest->AddStats(&finished_iter_stats_by_col_);
-      iters_.erase(iters_.begin() + smallest_idx);
+      states_.erase(states_.begin() + smallest_idx);
     }
   }
 
@@ -332,21 +359,21 @@ Status MergeIterator::MaterializeBlock(RowBlock *dst) {
 }
 
 string MergeIterator::ToString() const {
-  return strings::Substitute("Merge($0 iters)", num_orig_iters_);
+  return Substitute("Merge($0 iters)", num_orig_iters_);
 }
 
 const Schema& MergeIterator::schema() const {
   CHECK(initted_);
-  return schema_;
+  return *schema_;
 }
 
 void MergeIterator::GetIteratorStats(vector<IteratorStats>* stats) const {
-  shared_lock<rw_spinlock> l(iters_lock_);
+  shared_lock<rw_spinlock> l(states_lock_);
   CHECK(initted_);
   *stats = finished_iter_stats_by_col_;
 
-  for (const auto& iter_state : iters_) {
-    iter_state->AddStats(stats);
+  for (const auto& s : states_) {
+    s->AddStats(stats);
   }
 }
 
@@ -368,19 +395,22 @@ Status UnionIterator::Init(ScanSpec *spec) {
   // Initialize the underlying iterators
   RETURN_NOT_OK(InitSubIterators(spec));
 
-  // Verify schemas match.
-  // Important to do the verification after initializing the
-  // sub-iterators, since they may not know their own schemas
-  // until they've been initialized (in the case of a union of unions)
+  // Verify that the schemas match in debug builds.
+  //
+  // It's important to do the verification after initializing the iterators, as
+  // they may not know their own schemas until they've been initialized (in the
+  // case of a union of unions).
   schema_.reset(new Schema(iters_.front()->schema()));
-  for (const shared_ptr<RowwiseIterator> &iter : iters_) {
-    if (!iter->schema().Equals(*schema_)) {
+  finished_iter_stats_by_col_.resize(schema_->num_columns());
+#ifndef NDEBUG
+  for (const auto& i : iters_) {
+    if (!i->schema().Equals(*schema_)) {
       return Status::InvalidArgument(
-        string("Schemas do not match: ") + schema_->ToString()
-        + " vs " + iter->schema().ToString());
+          Substitute("Schemas do not match: $0 vs. $1",
+                     schema_->ToString(), i->schema().ToString()));
     }
   }
-  finished_iter_stats_by_col_.resize(schema_->num_columns());
+#endif
 
   initted_ = true;
   return Status::OK();
@@ -388,9 +418,9 @@ Status UnionIterator::Init(ScanSpec *spec) {
 
 
 Status UnionIterator::InitSubIterators(ScanSpec *spec) {
-  for (shared_ptr<RowwiseIterator> &iter : iters_) {
+  for (auto& i : iters_) {
     ScanSpec *spec_copy = spec != nullptr ? scan_spec_copies_.Construct(*spec) : nullptr;
-    RETURN_NOT_OK(PredicateEvaluatingIterator::InitAndMaybeWrap(&iter, spec_copy));
+    RETURN_NOT_OK(PredicateEvaluatingIterator::InitAndMaybeWrap(&i, spec_copy));
   }
   // Since we handle predicates in all the wrapped iterators, we can clear
   // them here.
@@ -402,8 +432,8 @@ Status UnionIterator::InitSubIterators(ScanSpec *spec) {
 
 bool UnionIterator::HasNext() const {
   CHECK(initted_);
-  for (const shared_ptr<RowwiseIterator> &iter : iters_) {
-    if (iter->HasNext()) return true;
+  for (const auto& i : iters_) {
+    if (i->HasNext()) return true;
   }
 
   return false;
@@ -446,14 +476,9 @@ void UnionIterator::PopFront() {
 string UnionIterator::ToString() const {
   string s;
   s.append("Union(");
-  bool first = true;
-  for (const shared_ptr<RowwiseIterator> &iter : iters_) {
-    if (!first) {
-      s.append(", ");
-    }
-    first = false;
-    s.append(iter->ToString());
-  }
+  s += JoinMapped(iters_, [](const shared_ptr<RowwiseIterator>& it) {
+      return it->ToString();
+    }, ",");
   s.append(")");
   return s;
 }
@@ -674,7 +699,7 @@ Status PredicateEvaluatingIterator::NextBlock(RowBlock *dst) {
 }
 
 string PredicateEvaluatingIterator::ToString() const {
-  return strings::Substitute("PredicateEvaluating($0)", base_iter_->ToString());
+  return Substitute("PredicateEvaluating($0)", base_iter_->ToString());
 }
 
 } // namespace kudu
diff --git a/src/kudu/common/generic_iterators.h b/src/kudu/common/generic_iterators.h
index 49ed9b5..bc3ac63 100644
--- a/src/kudu/common/generic_iterators.h
+++ b/src/kudu/common/generic_iterators.h
@@ -34,7 +34,6 @@
 #include "kudu/common/iterator_stats.h"
 #include "kudu/common/scan_spec.h"
 #include "kudu/common/schema.h"
-#include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/port.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/object_pool.h"
@@ -49,11 +48,14 @@ class RowBlock;
 // based on keys.
 class MergeIterator : public RowwiseIterator {
  public:
-  // TODO: clarify whether schema is just the projection, or must include the merge
-  // key columns. It should probably just be the required projection, which must be
-  // a subset of the columns in 'iters'.
-  MergeIterator(const Schema& schema,
-                std::vector<std::shared_ptr<RowwiseIterator>> iters);
+  // Constructs a MergeIterator of the given iterators.
+  //
+  // The iterators must have matching schemas and should not yet be initialized.
+  //
+  // Note: the iterators must be constructed using a projection that includes
+  // all key columns; otherwise a CHECK will fire at initialization time.
+  explicit MergeIterator(std::vector<std::shared_ptr<RowwiseIterator>> iters);
+
   virtual ~MergeIterator();
 
   // The passed-in iterators should be already initialized.
@@ -74,7 +76,8 @@ class MergeIterator : public RowwiseIterator {
   Status MaterializeBlock(RowBlock* dst);
   Status InitSubIterators(ScanSpec *spec);
 
-  const Schema schema_;
+  // Initialized during Init.
+  std::unique_ptr<Schema> schema_;
 
   bool initted_;
 
@@ -82,10 +85,10 @@ class MergeIterator : public RowwiseIterator {
   // This is required because we can't create a MergeIterState of an uninitialized iterator.
   std::vector<std::shared_ptr<RowwiseIterator>> orig_iters_;
 
-  // See UnionIterator::iters_lock_ for details on locking. This follows the same
+  // See UnionIterator::iters_lock_ for details on locking. This follows the same
   // pattern.
-  mutable rw_spinlock iters_lock_;
-  std::vector<std::unique_ptr<MergeIterState>> iters_;
+  mutable rw_spinlock states_lock_;
+  std::vector<std::unique_ptr<MergeIterState>> states_;
 
   // Statistics (keyed by projection column index) accumulated so far by any
   // fully-consumed sub-iterators.
@@ -108,12 +111,9 @@ class MergeIterator : public RowwiseIterator {
 // part of the projection.
 class UnionIterator : public RowwiseIterator {
  public:
-  // Construct a union iterator of the given iterators.
-  // The iterators must have matching schemas.
-  // The passed-in iterators should not yet be initialized.
+  // Constructs a UnionIterator of the given iterators.
   //
-  // All passed-in iterators must be fully able to evaluate all predicates - i.e.
-  // calling iter->Init(spec) should remove all predicates from the spec.
+  // The iterators must have matching schemas and should not yet be initialized.
   explicit UnionIterator(std::vector<std::shared_ptr<RowwiseIterator>> iters);
 
   Status Init(ScanSpec *spec) OVERRIDE;
@@ -143,7 +143,8 @@ class UnionIterator : public RowwiseIterator {
   void PopFront();
 
   // Schema: initialized during Init()
-  gscoped_ptr<Schema> schema_;
+  std::unique_ptr<Schema> schema_;
+
   bool initted_;
 
   // Lock protecting 'iters_' and 'finished_iter_stats_by_col_'.
diff --git a/src/kudu/tablet/rowset.cc b/src/kudu/tablet/rowset.cc
index a6dadb5..3f54456 100644
--- a/src/kudu/tablet/rowset.cc
+++ b/src/kudu/tablet/rowset.cc
@@ -102,7 +102,7 @@ Status DuplicatingRowSet::NewRowIterator(const RowIteratorOptions& opts,
 
   switch (opts.order) {
     case ORDERED:
-      out->reset(new MergeIterator(*opts.projection, std::move(iters)));
+      out->reset(new MergeIterator(std::move(iters)));
       break;
     case UNORDERED:
       out->reset(new UnionIterator(std::move(iters)));
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 67f3853..a6343d6 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -2464,7 +2464,7 @@ Status Tablet::Iterator::Init(ScanSpec *spec) {
 
   switch (opts_.order) {
     case ORDERED:
-      iter_.reset(new MergeIterator(projection_, std::move(iters)));
+      iter_.reset(new MergeIterator(std::move(iters)));
       break;
     case UNORDERED:
     default: