You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/01/14 01:51:19 UTC

[2/2] incubator-quickstep git commit: Initial commit

Initial commit


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/372411e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/372411e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/372411e8

Branch: refs/heads/reorder-attrs
Commit: 372411e8de219c6ce4a458615cfbb77efaa7bd08
Parents: 066723f
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Fri Jan 13 19:51:04 2017 -0600
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Fri Jan 13 19:51:04 2017 -0600

----------------------------------------------------------------------
 query_optimizer/rules/ReorderColumns.cpp  | 57 ++++++++++------
 query_optimizer/rules/ReorderColumns.hpp  |  5 +-
 relational_operators/HashJoinOperator.cpp | 93 ++++++++++++++++----------
 3 files changed, 99 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/372411e8/query_optimizer/rules/ReorderColumns.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/ReorderColumns.cpp b/query_optimizer/rules/ReorderColumns.cpp
index 8f139eb..f9420bc 100644
--- a/query_optimizer/rules/ReorderColumns.cpp
+++ b/query_optimizer/rules/ReorderColumns.cpp
@@ -42,24 +42,31 @@ namespace P = ::quickstep::optimizer::physical;
 P::PhysicalPtr ReorderColumns::apply(const P::PhysicalPtr &input) {
   DCHECK(input->getPhysicalType() == P::PhysicalType::kTopLevelPlan);
 
-  return applyInternal(input);
+  return applyInternal(input, true);
 }
 
-bool ReorderColumns::IsApplicable(const physical::PhysicalPtr &input) {
-  switch (input->getPhysicalType()) {
-    case P::PhysicalType::kHashJoin:  // Fall through
-    case P::PhysicalType::kSelection:
-      return true;
-    default:
-      return false;
+P::PhysicalPtr ReorderColumns::applyInternal(const P::PhysicalPtr &input,
+                                             bool lock_ordering) {
+  // We have to guarantee that the top-level ordering of the columns remains
+  // unchanged so that the output columns are ordered as specified by the user.
+  // So here we use the flag "lock_ordering" to skip the first transformable
+  // node (i.e. the first Selection or HashJoin).
+  bool skip_transform;
+  if (IsTransformable(input)) {
+    if (lock_ordering) {
+      skip_transform = true;
+      lock_ordering = false;
+    } else {
+      skip_transform = false;
+    }
+  } else {
+    skip_transform = true;
   }
-}
 
-P::PhysicalPtr ReorderColumns::applyInternal(const P::PhysicalPtr &input) {
-  if (!IsApplicable(input)) {
+  if (skip_transform) {
     std::vector<P::PhysicalPtr> new_children;
     for (const P::PhysicalPtr &child : input->children()) {
-      new_children.emplace_back(applyInternal(child));
+      new_children.emplace_back(applyInternal(child, lock_ordering));
     }
 
     if (new_children != input->children()) {
@@ -69,16 +76,18 @@ P::PhysicalPtr ReorderColumns::applyInternal(const P::PhysicalPtr &input) {
     }
   }
 
+  // Collect the maximal chain of transformable nodes.
   std::vector<P::PhysicalPtr> nodes;
-  std::unordered_map<E::ExprId, std::size_t> gen;
-  std::unordered_map<E::ExprId, std::size_t> kill;
-  std::unordered_map<E::ExprId, std::size_t> base;
-
-  for (P::PhysicalPtr node = input; IsApplicable(node); node = node->children().front()) {
+  for (P::PhysicalPtr node = input; IsTransformable(node); node = node->children().front()) {
     nodes.emplace_back(node);
   }
   std::reverse(nodes.begin(), nodes.end());
 
+  // Analyze the attributes in the nodes.
+  std::unordered_map<E::ExprId, std::size_t> base, gen, kill;
+
+  const P::PhysicalPtr base_node =
+      applyInternal(nodes.front()->children().front(), false);
   const std::vector<E::AttributeReferencePtr> base_attrs =
       nodes.front()->children().front()->getOutputAttributes();
   for (std::size_t i = 0; i < base_attrs.size(); ++i) {
@@ -137,7 +146,7 @@ P::PhysicalPtr ReorderColumns::applyInternal(const P::PhysicalPtr &input) {
     return lhs_id < rhs_id;
   };
 
-  P::PhysicalPtr output = applyInternal(nodes.front()->children().front());
+  P::PhysicalPtr output = base_node;
 
   for (const auto &node : nodes) {
     std::vector<E::NamedExpressionPtr> project_expressions;
@@ -163,7 +172,7 @@ P::PhysicalPtr ReorderColumns::applyInternal(const P::PhysicalPtr &input) {
         const P::HashJoinPtr old_node =
             std::static_pointer_cast<const P::HashJoin>(node);
         output = P::HashJoin::Create(output,
-                                     applyInternal(old_node->right()),
+                                     applyInternal(old_node->right(), false),
                                      old_node->left_join_attributes(),
                                      old_node->right_join_attributes(),
                                      old_node->residual_predicate(),
@@ -187,5 +196,15 @@ P::PhysicalPtr ReorderColumns::applyInternal(const P::PhysicalPtr &input) {
   return output;
 }
 
+bool ReorderColumns::IsTransformable(const physical::PhysicalPtr &input) {
+  switch (input->getPhysicalType()) {
+    case P::PhysicalType::kHashJoin:  // Fall through
+    case P::PhysicalType::kSelection:
+      return true;
+    default:
+      return false;
+  }
+}
+
 }  // namespace optimizer
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/372411e8/query_optimizer/rules/ReorderColumns.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/ReorderColumns.hpp b/query_optimizer/rules/ReorderColumns.hpp
index dd0cf33..ad699aa 100644
--- a/query_optimizer/rules/ReorderColumns.hpp
+++ b/query_optimizer/rules/ReorderColumns.hpp
@@ -48,9 +48,10 @@ class ReorderColumns : public Rule<physical::Physical> {
   physical::PhysicalPtr apply(const physical::PhysicalPtr &input) override;
 
  private:
-  physical::PhysicalPtr applyInternal(const physical::PhysicalPtr &input);
+  physical::PhysicalPtr applyInternal(const physical::PhysicalPtr &input,
+                                      bool lock_ordering);
 
-  inline static bool IsApplicable(const physical::PhysicalPtr &input);
+  inline static bool IsTransformable(const physical::PhysicalPtr &input);
 
   DISALLOW_COPY_AND_ASSIGN(ReorderColumns);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/372411e8/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index fb7fae7..7f61ecc 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -465,33 +465,32 @@ void HashInnerJoinWorkOrder::execute() {
         base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
   }
 
-  VectorsOfPairsJoinedTuplesCollector collector;
-  if (join_key_attributes_.size() == 1) {
-    hash_table_.getAllFromValueAccessor(
-        probe_accessor.get(),
-        join_key_attributes_.front(),
-        any_join_key_attributes_nullable_,
-        &collector);
-  } else {
-    hash_table_.getAllFromValueAccessorCompositeKey(
-        probe_accessor.get(),
-        join_key_attributes_,
-        any_join_key_attributes_nullable_,
-        &collector);
-  }
-
   const relation_id build_relation_id = build_relation_.getID();
   const relation_id probe_relation_id = probe_relation_.getID();
 
   if (probe_accessor->getImplementationType() == ValueAccessor::Implementation::kSplitRowStore) {
-    std::vector<tuple_id> build_tids;
-    std::vector<tuple_id> probe_tids;
+    PairsOfVectorsJoinedTuplesCollector collector;
+    if (join_key_attributes_.size() == 1) {
+      hash_table_.getAllFromValueAccessor(
+          probe_accessor.get(),
+          join_key_attributes_.front(),
+          any_join_key_attributes_nullable_,
+          &collector);
+    } else {
+      hash_table_.getAllFromValueAccessorCompositeKey(
+          probe_accessor.get(),
+          join_key_attributes_,
+          any_join_key_attributes_nullable_,
+          &collector);
+    }
 
     for (auto &build_block_entry : *collector.getJoinedTuples()) {
       BlockReference build_block =
           storage_manager_->getBlock(build_block_entry.first, build_relation_);
       const TupleStorageSubBlock &build_store = build_block->getTupleStorageSubBlock();
       std::unique_ptr<ValueAccessor> build_accessor(build_store.createValueAccessor());
+      const std::vector<tuple_id> &build_tids = build_block_entry.second.first;
+      const std::vector<tuple_id> &probe_tids = build_block_entry.second.second;
 
       // Evaluate '*residual_predicate_', if any.
       //
@@ -504,29 +503,22 @@ void HashInnerJoinWorkOrder::execute() {
       // hash join is below a reasonable threshold so that we don't blow up
       // temporary memory requirements to an unreasonable degree.
       if (residual_predicate_ != nullptr) {
-        VectorOfPairs filtered_matches;
-        for (const std::pair<tuple_id, tuple_id> &hash_match
-             : build_block_entry.second) {
+        std::pair<std::vector<tuple_id>, std::vector<tuple_id>> filtered_matches;
+        for (std::size_t i = 0; i < build_tids.size(); ++i) {
           if (residual_predicate_->matchesForJoinedTuples(*build_accessor,
                                                           build_relation_id,
-                                                          hash_match.first,
+                                                          build_tids[i],
                                                           *probe_accessor,
                                                           probe_relation_id,
-                                                          hash_match.second)) {
-            filtered_matches.emplace_back(hash_match);
+                                                          probe_tids[i])) {
+            filtered_matches.first.push_back(build_tids[i]);
+            filtered_matches.second.push_back(probe_tids[i]);
           }
         }
 
         build_block_entry.second = std::move(filtered_matches);
       }
 
-      build_tids.clear();
-      probe_tids.clear();
-      for (const auto &hash_match : build_block_entry.second) {
-        build_tids.emplace_back(hash_match.first);
-        probe_tids.emplace_back(hash_match.second);
-      }
-
       // TODO(chasseur): If all the output expressions are ScalarAttributes,
       // we could implement a similar fast-path to StorageBlock::selectSimple()
       // that avoids a copy.
@@ -588,6 +580,8 @@ void HashInnerJoinWorkOrder::execute() {
       }
 
       attribute_id dest_attr = 0;
+      std::vector<std::pair<tuple_id, tuple_id>> zipped_joined_tuple_ids;
+
       for (auto &selection_cit : selection_) {
         // If the Scalar (column) is not an attribute in build/probe blocks, then
         // insert it into a ColumnVectorsValueAccessor.
@@ -595,12 +589,25 @@ void HashInnerJoinWorkOrder::execute() {
           // Current destination attribute maps to the column we'll create now.
           accessor_attribute_map[temp_index].second[dest_attr] = temp_result.getNumColumns();
 
+          if (temp_result.getNumColumns() == 0) {
+            // The getAllValuesForJoin function below needs joined tuple IDs as
+            // a vector of (build-tuple-ID, probe-tuple-ID) pairs, and we have
+            // a pair of (build-tuple-IDs-vector, probe-tuple-IDs-vector). So
+            // we'll have to zip our two vectors together. We do this inside
+            // the loop because most queries don't exercise this code since
+            // they don't have scalar expressions with attributes from both
+            // build and probe relations (other expressions would have been
+            // pushed down to before the join).
+            zipped_joined_tuple_ids.reserve(build_tids.size());
+            for (std::size_t i = 0; i < build_tids.size(); ++i) {
+              zipped_joined_tuple_ids.push_back(std::make_pair(build_tids[i], probe_tids[i]));
+            }
+          }
           temp_result.addColumn(
-              selection_cit->getAllValuesForJoin(build_relation_id,
-                                                 build_accessor.get(),
-                                                 probe_relation_id,
-                                                 probe_accessor.get(),
-                                                 build_block_entry.second));
+              selection_cit
+                  ->getAllValuesForJoin(build_relation_id, build_accessor.get(),
+                                        probe_relation_id, probe_accessor.get(),
+                                        zipped_joined_tuple_ids));
         } else {
           auto scalar_attr = static_cast<const ScalarAttribute *>(selection_cit.get());
           const attribute_id attr_id = scalar_attr->getAttribute().getID();
@@ -620,6 +627,21 @@ void HashInnerJoinWorkOrder::execute() {
       output_destination_->bulkInsertTuplesFromValueAccessors(accessor_attribute_map);
     }
   } else {
+    VectorsOfPairsJoinedTuplesCollector collector;
+    if (join_key_attributes_.size() == 1) {
+      hash_table_.getAllFromValueAccessor(
+          probe_accessor.get(),
+          join_key_attributes_.front(),
+          any_join_key_attributes_nullable_,
+          &collector);
+    } else {
+      hash_table_.getAllFromValueAccessorCompositeKey(
+          probe_accessor.get(),
+          join_key_attributes_,
+          any_join_key_attributes_nullable_,
+          &collector);
+    }
+
     for (std::pair<const block_id, VectorOfPairs>
              &build_block_entry : *collector.getJoinedTuples()) {
       BlockReference build_block =
@@ -639,6 +661,7 @@ void HashInnerJoinWorkOrder::execute() {
       // temporary memory requirements to an unreasonable degree.
       if (residual_predicate_ != nullptr) {
         VectorOfPairs filtered_matches;
+
         for (const std::pair<tuple_id, tuple_id> &hash_match
              : build_block_entry.second) {
           if (residual_predicate_->matchesForJoinedTuples(*build_accessor,