Posted to dev@quickstep.apache.org by jianqiao <gi...@git.apache.org> on 2017/02/09 06:47:06 UTC

[GitHub] incubator-quickstep pull request #185: Fuse Aggregate with HashLeftOuterJoin...

GitHub user jianqiao opened a pull request:

    https://github.com/apache/incubator-quickstep/pull/185

    Fuse Aggregate with HashLeftOuterJoin to accelerate evaluation

    This PR fuses `Aggregate` with `HashJoin` (left outer) into the `CrossReferenceCoalesceAggregate` node to enable fast-path evaluation for a class of queries.
    
    Here we briefly describe the semantics of `CrossReferenceCoalesceAggregate`: Let `L` be a table with PRIMARY KEY `u`. Let `R` be a table with FOREIGN KEY `x` referring to `L(u)`. Then `CrossReferenceCoalesceAggregate` represents a common class of analytical queries of the following kind:
    > For each u in L, COUNT/SUM the records in R that correspond to u (i.e. those records satisfying R.x = L.u). In the case that there is no record for u in R, use 0 as the result value.
    
    That is, `CrossReferenceCoalesceAggregate` represents queries in the following forms:
    ```
    -- Form 1 --
    SELECT u, COALESCE(count, 0), COALESCE(sum, 0)
    FROM L LEFT OUTER JOIN (
      SELECT x, COUNT(*) AS count, SUM(y) AS sum
      FROM R
      GROUP BY x) Rt ON L.u = Rt.x;
    ```
    or (COUNT only) 
    ```
    -- Form 2 --
    SELECT u, COUNT(x) AS count
    FROM L LEFT OUTER JOIN R ON L.u = R.x
    GROUP BY u;
    ```
    where `L`, `R`, `u`, `x` and `SUM(...)/COUNT(...)` are arguments to `CrossReferenceCoalesceAggregate`.
    
    The fast path works as follows: we first use `L.u` to build the `existence_map` of a `CollisionFreeVectorTable`, which is then used to aggregate `R` grouped by `R.x`. When the aggregation is finalized, any `existence_map` entry that was set by `L.u` but never updated by a record of `R` keeps the default result `0`, which implicitly gives the left outer join + COALESCE(..., 0) semantics.
    
    This optimization improves TPC-H Q13's performance from ~15.5s to ~3s on a CloudLab machine at scale factor 100.
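
The fast-path evaluation described above can be sketched in a few lines of C++. This is only an illustration under stated assumptions, not the code in the PR: `VectorTableSketch`, `ResultRow` and `FusedLeftOuterJoinAggregate` are hypothetical names, and the join keys are assumed to be already mapped to dense integers in `[0, key_range)` (the role played by `group_by_key_value_range` in the physical node).

```cpp
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical stand-in for a collision-free vector table: one slot per
// possible key value, plus a bit recording whether the key exists in L.
struct VectorTableSketch {
  explicit VectorTableSketch(const std::size_t key_range)
      : existence_map(key_range, false),
        count(key_range, 0),
        sum(key_range, 0) {}

  std::vector<bool> existence_map;
  std::vector<std::int64_t> count;
  std::vector<std::int64_t> sum;
};

struct ResultRow {
  std::size_t u;
  std::int64_t count;
  std::int64_t sum;
};

// l_keys holds the values of L.u; r_rows holds (R.x, R.y) pairs.
std::vector<ResultRow> FusedLeftOuterJoinAggregate(
    const std::vector<std::size_t> &l_keys,
    const std::vector<std::pair<std::size_t, std::int64_t>> &r_rows,
    const std::size_t key_range) {
  VectorTableSketch table(key_range);

  // Step 1: mark every key of L in the existence map.
  for (const std::size_t u : l_keys) {
    if (u < key_range) {
      table.existence_map[u] = true;
    }
  }

  // Step 2: aggregate R grouped by x, ignoring keys that are absent from L.
  for (const auto &row : r_rows) {
    const std::size_t x = row.first;
    if (x < key_range && table.existence_map[x]) {
      table.count[x] += 1;
      table.sum[x] += row.second;
    }
  }

  // Step 3: finalize. Keys marked by L but never updated by R still hold the
  // initial 0, which is the left outer join + COALESCE(..., 0) result.
  std::vector<ResultRow> result;
  for (std::size_t u = 0; u < key_range; ++u) {
    if (table.existence_map[u]) {
      result.push_back(ResultRow{u, table.count[u], table.sum[u]});
    }
  }
  return result;
}
```

For example, with `L.u = {0, 2, 3}` and `R = {(2, 10), (2, 5), (3, 7)}`, key `0` appears in the output with count and sum `0` even though no row of `R` refers to it, and no hash table or separate join pass is needed.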

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/apache/incubator-quickstep aggregate-on-left-outer-join

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-quickstep/pull/185.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #185
    
----
commit a28b1e4d77ee12466b0801a5a7c5185f7a83e7f8
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Date:   2017-01-30T20:46:39Z

    Fuse Aggregate with LeftOuterJoin to accelerate evaluation.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #185: Fuse Aggregate with HashLeftOuterJoin...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/185#discussion_r100365695
  
    --- Diff: query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp ---
    @@ -0,0 +1,232 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + **/
    +
    +#ifndef QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_CROSS_REFERENCE_COALESCE_AGGREGATE_HPP_
    +#define QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_CROSS_REFERENCE_COALESCE_AGGREGATE_HPP_
    +
    +#include <cstddef>
    +#include <memory>
    +#include <string>
    +#include <vector>
    +
    +#include "query_optimizer/OptimizerTree.hpp"
    +#include "query_optimizer/expressions/Alias.hpp"
    +#include "query_optimizer/expressions/AttributeReference.hpp"
    +#include "query_optimizer/expressions/ExpressionUtil.hpp"
    +#include "query_optimizer/expressions/Predicate.hpp"
    +#include "query_optimizer/physical/Physical.hpp"
    +#include "query_optimizer/physical/PhysicalType.hpp"
    +#include "utility/Macros.hpp"
    +
    +#include "glog/logging.h"
    +
    +namespace quickstep {
    +namespace optimizer {
    +namespace physical {
    +
    +/** \addtogroup OptimizerLogical
    + *  @{
    + */
    +
    +class CrossReferenceCoalesceAggregate;
    +typedef std::shared_ptr<const CrossReferenceCoalesceAggregate> CrossReferenceCoalesceAggregatePtr;
    +
    +/**
    + * @brief A physical node that fuses a HashJoin with an Aggregate to enable
    + *        fast-path execution.
    + *
    + * Below we briefly describe the semantics of this physical node.
    + *
    + * Let L be a table with PRIMARY KEY u. Let R be a table with FOREIGN KEY x
    + * referring to L(u). Then CrossReferenceCoalesceAggregate represents a common
    + * class of analytical queries that
    + * - For each u in L, COUNT/SUM the records in R that correspond to u (i.e.
    + *   those records satisfying R.x = L.u).
    + *   In the case that there is no record for u in R, use 0 as the result value.
    + *
    + * And we have the mapping:
    + *   L -> left_child_
    + *   R -> right_child_
    + *   u -> left_join_attributes_
    + *   x -> right_join_attributes_
    + *   COUNT/SUM -> aggregate_expressions_
    + */
    +class CrossReferenceCoalesceAggregate : public Physical {
    + public:
    +  PhysicalType getPhysicalType() const override {
    +    return PhysicalType::kCrossReferenceCoalesceAggregate;
    +  }
    +
    +  std::string getName() const override {
    +    return "CrossReferenceCoalesceAggregate";
    +  }
    +
    +  /**
    +   * @return The left physical child.
    +   */
    +  const PhysicalPtr& left_child() const {
    +    return left_child_;
    +  }
    +
    +  /**
    +   * @return The right physical child.
    +   */
    +  const PhysicalPtr& right_child() const {
    +    return right_child_;
    +  }
    +
    +  /**
    +   * @return The left join attributes.
    +   */
    +  const std::vector<expressions::AttributeReferencePtr>& left_join_attributes() const {
    +    return left_join_attributes_;
    +  }
    +
    +  /**
    +   * @return The right join attributes.
    +   */
    +  const std::vector<expressions::AttributeReferencePtr>& right_join_attributes() const {
    +    return right_join_attributes_;
    +  }
    +
    +  /**
    +   * @return The predicate to be applied to the right child before aggregation.
    +   */
    +  const expressions::PredicatePtr& right_filter_predicate() const {
    +    return right_filter_predicate_;
    +  }
    +
    +  /**
    +   * @return Aggregate expressions.
    +   */
    +  const std::vector<expressions::AliasPtr>& aggregate_expressions() const {
    +    return aggregate_expressions_;
    +  }
    +
    +  /**
    +   * @return The maximum possible value of the group-by keys when mapped to
    +   *         integer.
    +   */
    +  std::size_t group_by_key_value_range() const {
    +    return group_by_key_value_range_;
    +  }
    +
    +  PhysicalPtr copyWithNewChildren(
    +      const std::vector<PhysicalPtr> &new_children) const override {
    +    DCHECK_EQ(getNumChildren(), new_children.size());
    +    return Create(new_children[0],
    +                  new_children[1],
    +                  left_join_attributes_,
    +                  right_join_attributes_,
    +                  right_filter_predicate_,
    +                  aggregate_expressions_,
    +                  group_by_key_value_range_);
    +  }
    +
    +  std::vector<expressions::AttributeReferencePtr> getOutputAttributes() const override;
    +
    +  std::vector<expressions::AttributeReferencePtr> getReferencedAttributes() const override;
    +
    +  bool maybeCopyWithPrunedExpressions(
    +      const expressions::UnorderedNamedExpressionSet &referenced_expressions,
    +      PhysicalPtr *output) const override {
    +    return false;
    +  }
    +
    +  /**
    +   * @brief Creates a physical CrossReferenceCoalesceAggregate.
    +   *
    +   * @param left_child The left child.
    +   * @param right_child The right child.
    +   * @param left_join_attributes The join attributes of the left child.
    +   * @param right_join_attributes The join attributes of the right child.
    +   * @param right_filter_predicate Optional filtering predicate evaluated on
    +   *        the right child before aggregation.
    +   * @param aggregate_expressions The aggregate expressions.
    +   * @param group_by_key_value_range The maximum possible value of the group-by
    +   *        keys when mapped to integer.
    +   * @return An immutable physical CrossReferenceCoalesceAggregate.
    +   */
    +  static CrossReferenceCoalesceAggregatePtr Create(
    +      const PhysicalPtr &left_child,
    +      const PhysicalPtr &right_child,
    +      const std::vector<expressions::AttributeReferencePtr> &left_join_attributes,
    +      const std::vector<expressions::AttributeReferencePtr> &right_join_attributes,
    +      const expressions::PredicatePtr right_filter_predicate,
    +      const std::vector<expressions::AliasPtr> &aggregate_expressions,
    +      const std::size_t group_by_key_value_range) {
    +    return CrossReferenceCoalesceAggregatePtr(
    +        new CrossReferenceCoalesceAggregate(left_child,
    +                                            right_child,
    +                                            left_join_attributes,
    +                                            right_join_attributes,
    +                                            right_filter_predicate,
    +                                            aggregate_expressions,
    +                                            group_by_key_value_range));
    +  }
    +
    + protected:
    +  void getFieldStringItems(
    +      std::vector<std::string> *inline_field_names,
    +      std::vector<std::string> *inline_field_values,
    +      std::vector<std::string> *non_container_child_field_names,
    +      std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
    +      std::vector<std::string> *container_child_field_names,
    +      std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const override;
    +
    + private:
    +  CrossReferenceCoalesceAggregate(
    +      const PhysicalPtr &left_child,
    +      const PhysicalPtr &right_child,
    +      const std::vector<expressions::AttributeReferencePtr> &left_join_attributes,
    +      const std::vector<expressions::AttributeReferencePtr> &right_join_attributes,
    +      const expressions::PredicatePtr right_filter_predicate,
    +      const std::vector<expressions::AliasPtr> &aggregate_expressions,
    +      const std::size_t group_by_key_value_range)
    +      : left_child_(left_child),
    +        right_child_(right_child),
    +        left_join_attributes_(left_join_attributes),
    +        right_join_attributes_(right_join_attributes),
    +        right_filter_predicate_(right_filter_predicate),
    +        aggregate_expressions_(aggregate_expressions),
    +        group_by_key_value_range_(group_by_key_value_range) {
    +    addChild(left_child_);
    +    addChild(right_child_);
    +  }
    +
    +  // TODO(jianqiao): For the left child, support filter predicate fusing and
    +  // attachment of LIPFilters.
    +  PhysicalPtr left_child_;
    +  PhysicalPtr right_child_;
    +  std::vector<expressions::AttributeReferencePtr> left_join_attributes_;
    +  std::vector<expressions::AttributeReferencePtr> right_join_attributes_;
    +  expressions::PredicatePtr right_filter_predicate_;
    +  std::vector<expressions::AliasPtr> aggregate_expressions_;
    +  std::size_t group_by_key_value_range_;
    --- End diff --
    
    Would the data members above ever change? If not, we could mark them `const`.
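
A minimal sketch of what the suggestion amounts to, assuming the node is only ever built through its factory and never mutated afterwards. `FusedNodeSketch` is a hypothetical, heavily simplified class, not the real `CrossReferenceCoalesceAggregate`; it only shows that `const` data members still work with a `Create()`-style factory and that they make the class non-assignable.

```cpp
#include <cstddef>
#include <memory>
#include <string>
#include <type_traits>
#include <vector>

// Hypothetical simplified node with const data members.
class FusedNodeSketch {
 public:
  static std::shared_ptr<const FusedNodeSketch> Create(
      const std::vector<std::string> &join_attributes,
      const std::size_t group_by_key_value_range) {
    return std::shared_ptr<const FusedNodeSketch>(
        new FusedNodeSketch(join_attributes, group_by_key_value_range));
  }

  const std::vector<std::string>& join_attributes() const {
    return join_attributes_;
  }

  std::size_t group_by_key_value_range() const {
    return group_by_key_value_range_;
  }

 private:
  FusedNodeSketch(const std::vector<std::string> &join_attributes,
                  const std::size_t group_by_key_value_range)
      : join_attributes_(join_attributes),
        group_by_key_value_range_(group_by_key_value_range) {}

  // Set once in the constructor, never modified afterwards.
  const std::vector<std::string> join_attributes_;
  const std::size_t group_by_key_value_range_;
};

// const members delete the implicit copy assignment operator, so accidental
// reassignment of a node is rejected at compile time.
static_assert(!std::is_copy_assignable<FusedNodeSketch>::value,
              "const data members make the node non-assignable");
```

The trade-off is that a class with `const` members can no longer be assigned or moved-from efficiently, which is acceptable only if nodes are always handled through shared pointers to immutable objects.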



[GitHub] incubator-quickstep pull request #185: Fuse Aggregate with HashLeftOuterJoin...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/185#discussion_r100369492
  
    --- Diff: relational_operators/WorkOrder.proto ---
    @@ -44,6 +44,7 @@ enum WorkOrderType {
       UPDATE = 20;
       WINDOW_AGGREGATION = 21;
       DESTROY_AGGREGATION_STATE = 22;
    +  BUILD_AGGREGATION_EXISTENCE_MAP = 23;
    --- End diff --
    
    Please sort the types in alphabetical order (as with `BuildAggregationExistenceMapWorkOrder` below), and add the following comment above `enum WorkOrderType {`:
    
    ```
    // Next tag: 24.
    enum WorkOrderType {
    ```
    
    This is the convention Google uses for evolving proto definitions: `Next tag` is the next available number to assign to a new type, and after adding a new type we increase `Next tag` by one.



[GitHub] incubator-quickstep issue #185: Fuse Aggregate with HashLeftOuterJoin to acc...

Posted by pateljm <gi...@git.apache.org>.
Github user pateljm commented on the issue:

    https://github.com/apache/incubator-quickstep/pull/185
  
    LGTM. Merging.



[GitHub] incubator-quickstep pull request #185: Fuse Aggregate with HashLeftOuterJoin...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/185#discussion_r100368210
  
    --- Diff: relational_operators/BuildAggregationExistenceMapOperator.hpp ---
    @@ -0,0 +1,177 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + **/
    +
    +#ifndef QUICKSTEP_RELATIONAL_OPERATORS_BUILD_AGGREGATION_EXISTENCE_MAP_OPERATOR_HPP_
    +#define QUICKSTEP_RELATIONAL_OPERATORS_BUILD_AGGREGATION_EXISTENCE_MAP_OPERATOR_HPP_
    +
    +#include <cstddef>
    +
    +#include <string>
    +#include <vector>
    +
    +#include "catalog/CatalogRelation.hpp"
    +#include "catalog/CatalogTypedefs.hpp"
    +#include "query_execution/QueryContext.hpp"
    +#include "relational_operators/RelationalOperator.hpp"
    +#include "relational_operators/WorkOrder.hpp"
    +#include "storage/StorageBlockInfo.hpp"
    +#include "utility/Macros.hpp"
    +
    +#include "glog/logging.h"
    +
    +#include "tmb/id_typedefs.h"
    +
    +namespace tmb { class MessageBus; }
    +
    +namespace quickstep {
    +
    +class AggregationOperationState;
    +class CatalogRelationSchema;
    +class StorageManager;
    +class WorkOrderProtosContainer;
    +class WorkOrdersContainer;
    +
    +namespace serialization { class WorkOrder; }
    +
    +/** \addtogroup RelationalOperators
    + *  @{
    + */
    +
    +/**
    + * @brief An operator which builds a bit vector on the input relation's one
    + *        attribute where the bit vector serves as the existence map for an
    + *        AggregationOperationState's CollisionFreeVectorTable.
    + **/
    +class BuildAggregationExistenceMapOperator : public RelationalOperator {
    + public:
    +  /**
    +   * @brief Constructor.
    +   *
    +   * @param query_id The ID of the query to which this operator belongs.
    +   * @param input_relation The relation to build the existence map on.
    +   * @param build_attribute The ID of the attribute to build the existence map on.
    +   * @param input_relation_is_stored If input_relation is a stored relation and
    +   *        is fully available to the operator before it can start generating
    +   *        workorders.
    +   * @param aggr_state_index The index of the AggregationState in QueryContext.
    +   **/
    +  BuildAggregationExistenceMapOperator(const std::size_t query_id,
    +                                       const CatalogRelation &input_relation,
    +                                       const attribute_id build_attribute,
    +                                       const bool input_relation_is_stored,
    +                                       const QueryContext::aggregation_state_id aggr_state_index)
    +      : RelationalOperator(query_id),
    +        input_relation_(input_relation),
    +        build_attribute_(build_attribute),
    +        input_relation_is_stored_(input_relation_is_stored),
    +        input_relation_block_ids_(input_relation_is_stored ? input_relation.getBlocksSnapshot()
    +                                                           : std::vector<block_id>()),
    +        aggr_state_index_(aggr_state_index),
    +        num_workorders_generated_(0),
    +        started_(false) {}
    +
    +  ~BuildAggregationExistenceMapOperator() override {}
    +
    +  std::string getName() const override {
    +    return "BuildAggregationExistenceMapOperator";
    +  }
    +
    +  /**
    +   * @return The input relation.
    +   */
    +  const CatalogRelation& input_relation() const {
    +    return input_relation_;
    +  }
    +
    +  bool getAllWorkOrders(WorkOrdersContainer *container,
    +                        QueryContext *query_context,
    +                        StorageManager *storage_manager,
    +                        const tmb::client_id scheduler_client_id,
    +                        tmb::MessageBus *bus) override;
    +
    +  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
    +
    +  void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
    +                      const partition_id part_id) override {
    +    input_relation_block_ids_.push_back(input_block_id);
    +  }
    +
    + private:
    +  serialization::WorkOrder* createWorkOrderProto(const block_id block);
    +
    +  const CatalogRelation &input_relation_;
    +  const attribute_id build_attribute_;
    +  const bool input_relation_is_stored_;
    +  std::vector<block_id> input_relation_block_ids_;
    +  const QueryContext::aggregation_state_id aggr_state_index_;
    +
    +  std::vector<block_id>::size_type num_workorders_generated_;
    +  bool started_;
    +
    +  DISALLOW_COPY_AND_ASSIGN(BuildAggregationExistenceMapOperator);
    +};
    +
    +/**
    + * @brief A WorkOrder produced by BuildAggregationExistenceMapOperator.
    + **/
    +class BuildAggregationExistenceMapWorkOrder : public WorkOrder {
    + public:
    +  /**
    +   * @brief Constructor
    +   *
    +   * @param query_id The ID of this query.
    +   * @param input_relation The relation to build the existence map on.
    +   * @param build_block_id The block id.
    +   * @param build_attribute The ID of the attribute to build on.
    +   * @param state The AggregationState to use.
    +   * @param storage_manager The StorageManager to use.
    +   **/
    +  BuildAggregationExistenceMapWorkOrder(const std::size_t query_id,
    +                                        const CatalogRelationSchema &input_relation,
    +                                        const block_id build_block_id,
    +                                        const attribute_id build_attribute,
    +                                        AggregationOperationState *state,
    +                                        StorageManager *storage_manager)
    +      : WorkOrder(query_id),
    +        input_relation_(input_relation),
    +        build_block_id_(build_block_id),
    +        build_attribute_(build_attribute),
    +        state_(DCHECK_NOTNULL(state)),
    +        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
    +
    +  ~BuildAggregationExistenceMapWorkOrder() override {}
    +
    +  void execute() override;
    +
    + private:
    +  const CatalogRelationSchema &input_relation_;
    +  const block_id build_block_id_;
    +  const attribute_id build_attribute_;
    +  AggregationOperationState *state_;
    +
    --- End diff --
    
    Move this empty line before `state_`, so that all constants are together.



[GitHub] incubator-quickstep pull request #185: Fuse Aggregate with HashLeftOuterJoin...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/185#discussion_r100251764
  
    --- Diff: query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp ---
    @@ -49,15 +55,24 @@
     #include "query_optimizer/physical/TableGenerator.hpp"
     #include "query_optimizer/physical/TableReference.hpp"
     #include "query_optimizer/physical/TopLevelPlan.hpp"
    +#include "types/Type.hpp"
    +#include "types/TypeID.hpp"
     #include "types/TypedValue.hpp"
     #include "types/NullType.hpp"
    +#include "utility/EqualsAnyConstant.hpp"
    +
    +#include "gflags/gflags.h"
     
     #include "glog/logging.h"
     
     namespace quickstep {
     namespace optimizer {
     namespace cost {
     
    +DEFINE_int64(collision_free_vector_table_max_size, 1000000000,
    --- End diff --
    
    Either use `uint64`, or add a static validation function to check for negative values.
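
The second option could look roughly like the sketch below. The function name `ValidateNonNegative` is hypothetical, and the registration line is shown only as a comment because it needs the gflags headers; it assumes a gflags version that provides `DEFINE_validator` (older versions expose `RegisterFlagValidator` instead) and that the library's `int64` type is compatible with `std::int64_t` on the target platform.

```cpp
#include <cstdint>
#include <cstdio>

// Validator in the shape gflags expects for an int64 flag: it receives the
// flag name and the candidate value and returns whether the value is valid.
static bool ValidateNonNegative(const char *flagname, std::int64_t value) {
  if (value >= 0) {
    return true;
  }
  std::fprintf(stderr, "--%s must be non-negative, got %lld\n",
               flagname, static_cast<long long>(value));
  return false;
}

// Assumed registration, placed right after the DEFINE_int64 in the .cpp file:
//   DEFINE_validator(collision_free_vector_table_max_size,
//                    &ValidateNonNegative);
```

Compared with switching the flag to `uint64`, a validator keeps the flag type unchanged and reports a readable error for a negative value instead of silently accepting a huge unsigned number.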



[GitHub] incubator-quickstep pull request #185: Fuse Aggregate with HashLeftOuterJoin...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/185#discussion_r100365438
  
    --- Diff: query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp ---
    @@ -0,0 +1,232 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + **/
    +
    +#ifndef QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_CROSS_REFERENCE_COALESCE_AGGREGATE_HPP_
    +#define QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_CROSS_REFERENCE_COALESCE_AGGREGATE_HPP_
    +
    +#include <cstddef>
    +#include <memory>
    +#include <string>
    +#include <vector>
    +
    +#include "query_optimizer/OptimizerTree.hpp"
    +#include "query_optimizer/expressions/Alias.hpp"
    +#include "query_optimizer/expressions/AttributeReference.hpp"
    +#include "query_optimizer/expressions/ExpressionUtil.hpp"
    +#include "query_optimizer/expressions/Predicate.hpp"
    +#include "query_optimizer/physical/Physical.hpp"
    +#include "query_optimizer/physical/PhysicalType.hpp"
    +#include "utility/Macros.hpp"
    +
    +#include "glog/logging.h"
    +
    +namespace quickstep {
    +namespace optimizer {
    +namespace physical {
    +
    +/** \addtogroup OptimizerLogical
    + *  @{
    + */
    +
    +class CrossReferenceCoalesceAggregate;
    +typedef std::shared_ptr<const CrossReferenceCoalesceAggregate> CrossReferenceCoalesceAggregatePtr;
    +
    +/**
    + * @brief A physical node that fuses a HashJoin with an Aggregate to enable
    + *        fast-path execution.
    + *
    + * Below we briefly describe the semantics of this physical node.
    + *
    + * Let L be a table with PRIMARY KEY u. Let R be a table with FOREIGN KEY x
    + * referring to L(u). Then CrossReferenceCoalesceAggregate represents a common
    --- End diff --
    
    Should be `L.u`?



[GitHub] incubator-quickstep pull request #185: Fuse Aggregate with HashLeftOuterJoin...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/185#discussion_r100368014
  
    --- Diff: relational_operators/BuildAggregationExistenceMapOperator.hpp ---
    @@ -0,0 +1,177 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + **/
    +
    +#ifndef QUICKSTEP_RELATIONAL_OPERATORS_BUILD_AGGREGATION_EXISTENCE_MAP_OPERATOR_HPP_
    +#define QUICKSTEP_RELATIONAL_OPERATORS_BUILD_AGGREGATION_EXISTENCE_MAP_OPERATOR_HPP_
    +
    +#include <cstddef>
    +
    +#include <string>
    +#include <vector>
    +
    +#include "catalog/CatalogRelation.hpp"
    +#include "catalog/CatalogTypedefs.hpp"
    +#include "query_execution/QueryContext.hpp"
    +#include "relational_operators/RelationalOperator.hpp"
    +#include "relational_operators/WorkOrder.hpp"
    +#include "storage/StorageBlockInfo.hpp"
    +#include "utility/Macros.hpp"
    +
    +#include "glog/logging.h"
    +
    +#include "tmb/id_typedefs.h"
    +
    +namespace tmb { class MessageBus; }
    +
    +namespace quickstep {
    +
    +class AggregationOperationState;
    +class CatalogRelationSchema;
    +class StorageManager;
    +class WorkOrderProtosContainer;
    +class WorkOrdersContainer;
    +
    +namespace serialization { class WorkOrder; }
    +
    +/** \addtogroup RelationalOperators
    + *  @{
    + */
    +
    +/**
    + * @brief An operator which builds a bit vector on the input relation's one
    + *        attribute where the bit vector serves as the existence map for an
    + *        AggregationOperationState's CollisionFreeVectorTable.
    + **/
    +class BuildAggregationExistenceMapOperator : public RelationalOperator {
    + public:
    +  /**
    +   * @brief Constructor.
    +   *
    +   * @param query_id The ID of the query to which this operator belongs.
    +   * @param input_relation The relation to build the existence map on.
    +   * @param build_attribute The ID of the attribute to build the existence map on.
    +   * @param input_relation_is_stored If input_relation is a stored relation and
    +   *        is fully available to the operator before it can start generating
    +   *        workorders.
    +   * @param aggr_state_index The index of the AggregationState in QueryContext.
    +   **/
    +  BuildAggregationExistenceMapOperator(const std::size_t query_id,
    +                                       const CatalogRelation &input_relation,
    +                                       const attribute_id build_attribute,
    +                                       const bool input_relation_is_stored,
    +                                       const QueryContext::aggregation_state_id aggr_state_index)
    +      : RelationalOperator(query_id),
    +        input_relation_(input_relation),
    +        build_attribute_(build_attribute),
    +        input_relation_is_stored_(input_relation_is_stored),
    +        input_relation_block_ids_(input_relation_is_stored ? input_relation.getBlocksSnapshot()
    +                                                           : std::vector<block_id>()),
    +        aggr_state_index_(aggr_state_index),
    +        num_workorders_generated_(0),
    +        started_(false) {}
    +
    +  ~BuildAggregationExistenceMapOperator() override {}
    +
    +  std::string getName() const override {
    +    return "BuildAggregationExistenceMapOperator";
    +  }
    +
    +  /**
    +   * @return The input relation.
    +   */
    +  const CatalogRelation& input_relation() const {
    +    return input_relation_;
    +  }
    +
    +  bool getAllWorkOrders(WorkOrdersContainer *container,
    +                        QueryContext *query_context,
    +                        StorageManager *storage_manager,
    +                        const tmb::client_id scheduler_client_id,
    +                        tmb::MessageBus *bus) override;
    +
    +  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
    +
    +  void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
    +                      const partition_id part_id) override {
    +    input_relation_block_ids_.push_back(input_block_id);
    +  }
    +
    + private:
    +  serialization::WorkOrder* createWorkOrderProto(const block_id block);
    +
    +  const CatalogRelation &input_relation_;
    +  const attribute_id build_attribute_;
    +  const bool input_relation_is_stored_;
    +  std::vector<block_id> input_relation_block_ids_;
    --- End diff --
    
    Please move this line to just before `num_workorders_generated_`, so that all the `const` members are grouped together.
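
A minimal sketch of the requested layout, not the actual Quickstep class: the `const` members are declared first and the mutable bookkeeping members after them, with the constructor's initializer list kept in the same order (C++ initializes non-static members in declaration order, regardless of how the initializer list is written). The class name `ExistenceMapOperatorSketch`, the simplified typedefs, and the `numPendingBlocks()` helper are assumptions made for illustration.

```cpp
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Simplified stand-ins for Quickstep's typedefs (assumptions for illustration).
using block_id = std::uint64_t;
using attribute_id = int;

class ExistenceMapOperatorSketch {
 public:
  ExistenceMapOperatorSketch(const attribute_id build_attribute,
                             const bool input_relation_is_stored,
                             const std::size_t aggr_state_index,
                             std::vector<block_id> stored_blocks)
      : build_attribute_(build_attribute),
        input_relation_is_stored_(input_relation_is_stored),
        aggr_state_index_(aggr_state_index),
        input_relation_block_ids_(input_relation_is_stored ? std::move(stored_blocks)
                                                           : std::vector<block_id>()),
        num_workorders_generated_(0),
        started_(false) {}

  // Streams in one more block, as feedInputBlock() does in the diff above.
  void feedInputBlock(const block_id input_block_id) {
    input_relation_block_ids_.push_back(input_block_id);
  }

  // Number of known blocks for which no work order has been generated yet.
  std::size_t numPendingBlocks() const {
    return input_relation_block_ids_.size() - num_workorders_generated_;
  }

  attribute_id build_attribute() const { return build_attribute_; }
  bool input_relation_is_stored() const { return input_relation_is_stored_; }
  std::size_t aggr_state_index() const { return aggr_state_index_; }
  bool started() const { return started_; }

 private:
  // Immutable configuration, grouped together.
  const attribute_id build_attribute_;
  const bool input_relation_is_stored_;
  const std::size_t aggr_state_index_;

  // Mutable bookkeeping state.
  std::vector<block_id> input_relation_block_ids_;
  std::size_t num_workorders_generated_;
  bool started_;
};
```

Keeping the declaration order and the initializer list in step also avoids `-Wreorder` warnings if a later initializer ever comes to depend on an earlier member.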


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #185: Fuse Aggregate with HashLeftOuterJoin...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/185#discussion_r100369933
  
    --- Diff: relational_operators/WorkOrderFactory.cpp ---
    @@ -91,6 +92,19 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               CreateLIPFilterAdaptiveProberHelper(
                   proto.GetExtension(serialization::AggregationWorkOrder::lip_deployment_index), query_context));
         }
    +    case serialization::BUILD_AGGREGATION_EXISTENCE_MAP: {
    +      LOG(INFO) << "Creating BuildAggregationExistenceMapWorkOrder in Shiftboss " << shiftboss_index;
    +
    +      return new BuildAggregationExistenceMapWorkOrder(
    +          proto.query_id(),
    +          catalog_database->getRelationSchemaById(
    +              proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::relation_id)),
    +          proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::build_block_id),
    +          proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::build_attribute),
    +          query_context->getAggregationState(
    +              proto.GetExtension(serialization::AggregationWorkOrder::aggr_state_index)),
    --- End diff --
    
    This is a bug: `aggr_state_index` is read from the `AggregationWorkOrder` extension. Replace `AggregationWorkOrder` with `BuildAggregationExistenceMapWorkOrder`.
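
To illustrate why this kind of mix-up is easy to miss, here is a toy model of extension lookup, not the protobuf API itself. It assumes the operator serializes the index under the `BuildAggregationExistenceMapWorkOrder` extension, and it mirrors the protobuf behavior that reading an unset extension returns the field's default value instead of failing. The class `ToyWorkOrderProto`, the helper `MakeExistenceMapProto`, and both field numbers are hypothetical.

```cpp
#include <cstdint>
#include <map>

// Made-up extension field numbers, for illustration only.
constexpr int kAggregationAggrStateIndex = 17;
constexpr int kExistenceMapAggrStateIndex = 372;

// Toy message whose extensions are stored by field number.
class ToyWorkOrderProto {
 public:
  void SetExtension(const int field_number, const std::uint32_t value) {
    extensions_[field_number] = value;
  }

  // Returns the default value (0) when the extension was never set.
  std::uint32_t GetExtension(const int field_number) const {
    const auto it = extensions_.find(field_number);
    return it == extensions_.end() ? 0u : it->second;
  }

  bool HasExtension(const int field_number) const {
    return extensions_.count(field_number) != 0;
  }

 private:
  std::map<int, std::uint32_t> extensions_;
};

// Serializes the index under the existence-map extension (assumed behavior).
inline ToyWorkOrderProto MakeExistenceMapProto(const std::uint32_t aggr_state_index) {
  ToyWorkOrderProto proto;
  proto.SetExtension(kExistenceMapAggrStateIndex, aggr_state_index);
  return proto;
}
```

In this model, reading `kAggregationAggrStateIndex` from a proto built by `MakeExistenceMapProto(5)` yields `0` without any error, so the wrong aggregation state would be picked up silently.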



[GitHub] incubator-quickstep pull request #185: Fuse Aggregate with HashLeftOuterJoin...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-quickstep/pull/185



[GitHub] incubator-quickstep pull request #185: Fuse Aggregate with HashLeftOuterJoin...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/185#discussion_r100367721
  
    --- Diff: relational_operators/BuildAggregationExistenceMapOperator.cpp ---
    @@ -0,0 +1,196 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + **/
    +
    +#include "relational_operators/BuildAggregationExistenceMapOperator.hpp"
    +
    +#include <memory>
    +#include <vector>
    +
    +#include "catalog/CatalogAttribute.hpp"
    +#include "catalog/CatalogRelationSchema.hpp"
    +#include "catalog/CatalogTypedefs.hpp"
    +#include "query_execution/QueryContext.hpp"
    +#include "query_execution/WorkOrderProtosContainer.hpp"
    +#include "query_execution/WorkOrdersContainer.hpp"
    +#include "relational_operators/WorkOrder.pb.h"
    +#include "storage/AggregationOperationState.hpp"
    +#include "storage/CollisionFreeVectorTable.hpp"
    +#include "storage/StorageBlock.hpp"
    +#include "storage/StorageBlockInfo.hpp"
    +#include "storage/StorageManager.hpp"
    +#include "storage/TupleStorageSubBlock.hpp"
    +#include "storage/ValueAccessor.hpp"
    +#include "storage/ValueAccessorUtil.hpp"
    +#include "types/Type.hpp"
    +#include "types/TypeID.hpp"
    +#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
    +
    +#include "glog/logging.h"
    +
    +#include "tmb/id_typedefs.h"
    +
    +namespace quickstep {
    +
    +namespace {
    +
    +template <typename CppType, bool is_attr_nullable>
    +void ExecuteBuild(const attribute_id attr_id,
    +                  ValueAccessor *accessor,
    +                  BarrieredReadWriteConcurrentBitVector *existence_map) {
    +  InvokeOnAnyValueAccessor(
    +      accessor,
    +      [&](auto *accessor) -> void {  // NOLINT(build/c++11)
    +    accessor->beginIteration();
    +    while (accessor->next()) {
    +      const void *value = accessor->template getUntypedValue<is_attr_nullable>(attr_id);
    +      if (!is_attr_nullable || value != nullptr) {
    +        existence_map->setBit(*reinterpret_cast<const CppType *>(value));
    +      }
    +    }
    +  });
    +}
    +
    +// Dispatch helper.
    +template <typename CppType>
    +void ExecuteHelper(const attribute_id attr_id,
    +                   const bool is_attr_nullable,
    +                   ValueAccessor *accessor,
    +                   BarrieredReadWriteConcurrentBitVector *existence_map)  {
    +  if (is_attr_nullable) {
    +    ExecuteBuild<CppType, true>(attr_id, accessor, existence_map);
    +  } else {
    +    ExecuteBuild<CppType, false>(attr_id, accessor, existence_map);
    +  }
    +}
    +
    +}  // namespace
    +
    +bool BuildAggregationExistenceMapOperator::getAllWorkOrders(
    +    WorkOrdersContainer *container,
    +    QueryContext *query_context,
    +    StorageManager *storage_manager,
    +    const tmb::client_id scheduler_client_id,
    +    tmb::MessageBus *bus) {
    +  if (input_relation_is_stored_) {
    +    if (!started_) {
    +      for (const block_id input_block_id : input_relation_block_ids_) {
    +        container->addNormalWorkOrder(
    +            new BuildAggregationExistenceMapWorkOrder(
    +                query_id_,
    +                input_relation_,
    +                input_block_id,
    +                build_attribute_,
    +                query_context->getAggregationState(aggr_state_index_),
    +                storage_manager),
    +            op_index_);
    +      }
    +      started_ = true;
    +    }
    +    return true;
    +  } else {
    +    while (num_workorders_generated_ < input_relation_block_ids_.size()) {
    +      container->addNormalWorkOrder(
    +          new BuildAggregationExistenceMapWorkOrder(
    +                query_id_,
    +                input_relation_,
    +                input_relation_block_ids_[num_workorders_generated_],
    +                build_attribute_,
    +                query_context->getAggregationState(aggr_state_index_),
    +                storage_manager),
    +          op_index_);
    +      ++num_workorders_generated_;
    +    }
    +    return done_feeding_input_relation_;
    +  }
    +}
    +
    +bool BuildAggregationExistenceMapOperator
    +    ::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
    +  if (input_relation_is_stored_) {
    +    if (!started_) {
    +      for (const block_id block : input_relation_block_ids_) {
    +        container->addWorkOrderProto(createWorkOrderProto(block), op_index_);
    +      }
    +      started_ = true;
    +    }
    +    return true;
    +  } else {
    +    while (num_workorders_generated_ < input_relation_block_ids_.size()) {
    +      container->addWorkOrderProto(
    +          createWorkOrderProto(input_relation_block_ids_[num_workorders_generated_]),
    +          op_index_);
    +      ++num_workorders_generated_;
    +    }
    +    return done_feeding_input_relation_;
    +  }
    +}
    +
    +serialization::WorkOrder* BuildAggregationExistenceMapOperator
    +    ::createWorkOrderProto(const block_id block) {
    +  serialization::WorkOrder *proto = new serialization::WorkOrder;
    +  proto->set_work_order_type(serialization::BUILD_LIP_FILTER);
    --- End diff --
    
    This is a bug: the work order type here should be the new `BUILD_AGGREGATION_EXISTENCE_MAP`, not `BUILD_LIP_FILTER`.
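
A small sketch of why the type tag matters: the receiving side dispatches on it, so a proto tagged with the wrong type is reconstructed as a different work order even if all its other fields are correct. The enum `WorkOrderType`, the struct `ToyProto`, and the string labels returned here are simplified assumptions; the real factory constructs work order objects rather than returning names.

```cpp
#include <string>

// Hypothetical subset of the work order type enum.
enum class WorkOrderType {
  kBuildLipFilter,
  kBuildAggregationExistenceMap,
};

// Toy stand-in for serialization::WorkOrder, carrying only the type tag.
struct ToyProto {
  WorkOrderType type;
};

// Operator side: tags the proto with the given type.
inline ToyProto Serialize(const WorkOrderType type) {
  return ToyProto{type};
}

// Factory side: the tag alone decides which work order is reconstructed.
inline std::string ReconstructFromProto(const ToyProto &proto) {
  switch (proto.type) {
    case WorkOrderType::kBuildLipFilter:
      return "BuildLIPFilterWorkOrder";
    case WorkOrderType::kBuildAggregationExistenceMap:
      return "BuildAggregationExistenceMapWorkOrder";
  }
  return "unknown";
}
```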



[GitHub] incubator-quickstep pull request #185: Fuse Aggregate with HashLeftOuterJoin...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/185#discussion_r100244520
  
    --- Diff: query_optimizer/ExecutionGenerator.cpp ---
    @@ -1730,6 +1631,148 @@ void ExecutionGenerator::convertAggregate(
       }
     }
     
    +void ExecutionGenerator::convertCrossReferenceCoalesceAggregate(
    +    const P::CrossReferenceCoalesceAggregatePtr &physical_plan) {
    +  DCHECK_EQ(1u, physical_plan->left_join_attributes().size());
    +  DCHECK_EQ(1u, physical_plan->right_join_attributes().size());
    +
    +  const CatalogRelationInfo *left_relation_info =
    +      findRelationInfoOutputByPhysical(physical_plan->left_child());
    +  const CatalogRelationInfo *right_relation_info =
    +      findRelationInfoOutputByPhysical(physical_plan->right_child());
    +
    +  // Create aggr state proto.
    +  const QueryContext::aggregation_state_id aggr_state_index =
    +      query_context_proto_->aggregation_states_size();
    +  S::AggregationOperationState *aggr_state_proto = query_context_proto_->add_aggregation_states();
    +
    +  aggr_state_proto->set_relation_id(right_relation_info->relation->getID());
    +
    +  // Group by the right join attribute.
    +  std::unique_ptr<const Scalar> execution_group_by_expression(
    +      physical_plan->right_join_attributes().front()->concretize(
    +          attribute_substitution_map_));
    +  aggr_state_proto->add_group_by_expressions()->CopyFrom(
    --- End diff --
    
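    Use `MergeFrom` instead of `CopyFrom` to avoid the unnecessary zero-out: the newly added message is already empty.
    
    The same applies to the `CopyFrom` calls below.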
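
For context, protobuf's `CopyFrom` is documented as `Clear()` followed by `MergeFrom()`, so on a message that was just added the two calls give the same result and `MergeFrom` skips the clearing step. The toy struct below models that relationship. It is not the generated protobuf class, and the name `ToyScalarProto` and its two fields are assumptions for illustration.

```cpp
#include <string>
#include <vector>

// Toy message with one singular and one repeated field.
struct ToyScalarProto {
  std::string data_source;         // singular field; empty means unset
  std::vector<int> attribute_ids;  // repeated field

  void Clear() {
    data_source.clear();
    attribute_ids.clear();
  }

  // Set singular fields overwrite the target; repeated fields are appended.
  void MergeFrom(const ToyScalarProto &from) {
    if (&from == this) return;
    if (!from.data_source.empty()) data_source = from.data_source;
    attribute_ids.insert(attribute_ids.end(),
                         from.attribute_ids.begin(),
                         from.attribute_ids.end());
  }

  // Modeled as Clear() followed by MergeFrom().
  void CopyFrom(const ToyScalarProto &from) {
    if (&from == this) return;
    Clear();
    MergeFrom(from);
  }
};
```

The two calls only differ when the target already holds data: `MergeFrom` then appends to repeated fields, while `CopyFrom` discards the old contents first.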

