You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that did not survive the conversion to plain text.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/07/30 01:11:52 UTC
[2/3] incubator-quickstep git commit: Updates
Updates
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/91e49820
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/91e49820
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/91e49820
Branch: refs/heads/adaptive-bloom-filters
Commit: 91e498202d7b19cb5c7d4d8c61218d112c446b71
Parents: 9cc47e5
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Fri Jul 29 17:47:42 2016 -0500
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Fri Jul 29 17:47:42 2016 -0500
----------------------------------------------------------------------
query_execution/QueryContext.cpp | 11 ++-
query_optimizer/ExecutionGenerator.cpp | 17 ++++
query_optimizer/ExecutionHeuristics.cpp | 36 +++++--
query_optimizer/ExecutionHeuristics.hpp | 51 +++++++---
query_optimizer/physical/Aggregate.hpp | 20 +++-
query_optimizer/physical/HashJoin.hpp | 50 ----------
query_optimizer/physical/Physical.hpp | 50 ++++++++++
query_optimizer/rules/AttachBloomFilters.cpp | 76 ++++++++++-----
.../StarSchemaHashJoinOrderOptimization.cpp | 4 +-
.../StarSchemaHashJoinOrderOptimization.hpp | 25 +++--
storage/AggregationOperationState.cpp | 98 +++++++++++++++++++-
storage/AggregationOperationState.hpp | 10 +-
storage/AggregationOperationState.proto | 6 ++
storage/HashTable.hpp | 27 ++----
storage/HashTable.proto | 6 +-
storage/HashTableFactory.hpp | 9 +-
storage/StorageBlock.cpp | 28 +-----
storage/StorageBlock.hpp | 7 +-
utility/PlanVisualizer.cpp | 24 +++++
19 files changed, 377 insertions(+), 178 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/query_execution/QueryContext.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.cpp b/query_execution/QueryContext.cpp
index 7019b6a..fd0ed08 100644
--- a/query_execution/QueryContext.cpp
+++ b/query_execution/QueryContext.cpp
@@ -61,15 +61,16 @@ QueryContext::QueryContext(const serialization::QueryContext &proto,
<< "Attempted to create QueryContext from an invalid proto description:\n"
<< proto.DebugString();
+ for (int i = 0; i < proto.bloom_filters_size(); ++i) {
+ bloom_filters_.emplace_back(new BloomFilter(proto.bloom_filters(i)));
+ }
+
for (int i = 0; i < proto.aggregation_states_size(); ++i) {
aggregation_states_.emplace_back(
AggregationOperationState::ReconstructFromProto(proto.aggregation_states(i),
database,
- storage_manager));
- }
-
- for (int i = 0; i < proto.bloom_filters_size(); ++i) {
- bloom_filters_.emplace_back(new BloomFilter(proto.bloom_filters(i)));
+ storage_manager,
+ bloom_filters_));
}
for (int i = 0; i < proto.generator_functions_size(); ++i) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index fe6b6e7..e10f991 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1344,6 +1344,16 @@ void ExecutionGenerator::convertAggregate(
findRelationInfoOutputByPhysical(physical_plan->input());
aggr_state_proto->set_relation_id(input_relation_info->relation->getID());
+ const P::BloomFilterConfig &bloom_filter_config =
+ physical_plan->bloom_filter_config();
+ std::vector<attribute_id> bloom_filter_attribute_ids;
+
+ for (const auto &bf : bloom_filter_config.probe_side_bloom_filters) {
+ const CatalogAttribute *bf_catalog_attribute
+ = attribute_substitution_map_[bf.attribute->id()];
+ bloom_filter_attribute_ids.emplace_back(bf_catalog_attribute->getID());
+ }
+
std::vector<const Type*> group_by_types;
for (const E::NamedExpressionPtr &grouping_expression : physical_plan->grouping_expressions()) {
unique_ptr<const Scalar> execution_group_by_expression;
@@ -1458,6 +1468,13 @@ void ExecutionGenerator::convertAggregate(
std::forward_as_tuple(finalize_aggregation_operator_index, output_relation));
temporary_relation_info_vec_.emplace_back(finalize_aggregation_operator_index,
output_relation);
+
+ if (FLAGS_optimize_joins) {
+ execution_heuristics_->addAggregateInfo(aggregation_operator_index,
+ bloom_filter_config,
+ std::move(bloom_filter_attribute_ids),
+ aggr_state_index);
+ }
}
void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/query_optimizer/ExecutionHeuristics.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionHeuristics.cpp b/query_optimizer/ExecutionHeuristics.cpp
index b407453..26c4378 100644
--- a/query_optimizer/ExecutionHeuristics.cpp
+++ b/query_optimizer/ExecutionHeuristics.cpp
@@ -65,10 +65,7 @@ void ExecutionHeuristics::optimizeExecutionPlan(QueryPlan *query_plan,
bloom_filter_config.builder),
std::make_pair(bloom_filter_id, info.build_operator_index_));
- auto *build_side_bloom_filter = hash_table_proto->add_build_side_bloom_filters();
- build_side_bloom_filter->set_bloom_filter_id(bloom_filter_id);
- build_side_bloom_filter->set_attr_id(info.build_side_bloom_filter_ids_[i]);
-
+ hash_table_proto->add_build_side_bloom_filter_id(bloom_filter_id);
std::cout << "Build " << build_side_bf.attribute->toString()
<< " @" << bloom_filter_config.builder << "\n";
}
@@ -83,7 +80,7 @@ void ExecutionHeuristics::optimizeExecutionPlan(QueryPlan *query_plan,
auto *probe_side_bloom_filter = hash_table_proto->add_probe_side_bloom_filters();
const auto &probe_side_bf =
bloom_filter_config.probe_side_bloom_filters[i];
- std::cout << "Probe " << probe_side_bf.attribute->toString()
+ std::cout << "HashJoin probe " << probe_side_bf.attribute->toString()
<< " @" << probe_side_bf.builder << "\n";
const auto &build_side_info =
@@ -92,13 +89,40 @@ void ExecutionHeuristics::optimizeExecutionPlan(QueryPlan *query_plan,
probe_side_bf.builder));
probe_side_bloom_filter->set_bloom_filter_id(build_side_info.first);
probe_side_bloom_filter->set_attr_id(info.probe_side_bloom_filter_ids_[i]);
- std::cout << "Probe attr_id = " << info.probe_side_bloom_filter_ids_[i] << "\n";
+ std::cout << "HashJoin probe attr_id = " << info.probe_side_bloom_filter_ids_[i] << "\n";
query_plan->addDirectDependency(info.join_operator_index_,
build_side_info.second,
true /* is_pipeline_breaker */);
}
}
+
+ for (const auto &info : aggregates_) {
+ auto *aggregate_proto =
+ query_context_proto->mutable_aggregation_states(info.aggregate_state_id_);
+ const auto &bloom_filter_config = info.bloom_filter_config_;
+
+ for (std::size_t i = 0; i < info.bloom_filter_ids_.size(); ++i) {
+ auto *bloom_filter = aggregate_proto->add_bloom_filters();
+ const auto &bf =
+ bloom_filter_config.probe_side_bloom_filters[i];
+ std::cout << "Aggregate probe " << bf.attribute->toString()
+ << " @" << bf.builder << "\n";
+
+ const auto &build_side_info =
+ bloom_filter_map.at(
+ std::make_pair(bf.source_attribute->id(),
+ bf.builder));
+ bloom_filter->set_bloom_filter_id(build_side_info.first);
+ bloom_filter->set_attr_id(info.bloom_filter_ids_[i]);
+ std::cout << "Aggregate probe attr_id = "
+ << info.bloom_filter_ids_[i] << "\n";
+
+ query_plan->addDirectDependency(info.aggregate_operator_index_,
+ build_side_info.second,
+ true /* is_pipeline_breaker */);
+ }
+ }
}
void ExecutionHeuristics::setBloomFilterProperties(serialization::BloomFilter *bloom_filter_proto,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/query_optimizer/ExecutionHeuristics.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionHeuristics.hpp b/query_optimizer/ExecutionHeuristics.hpp
index 8af1b4a..0755124 100644
--- a/query_optimizer/ExecutionHeuristics.hpp
+++ b/query_optimizer/ExecutionHeuristics.hpp
@@ -93,6 +93,23 @@ class ExecutionHeuristics {
const std::size_t estimated_build_relation_cardinality_;
};
+ struct AggregateInfo {
+ AggregateInfo(const QueryPlan::DAGNodeIndex aggregate_operator_index,
+ const physical::BloomFilterConfig &bloom_filter_config,
+ std::vector<attribute_id> &&bloom_filter_ids,
+ const QueryContext::aggregation_state_id aggregate_state_id)
+ : aggregate_operator_index_(aggregate_operator_index),
+ bloom_filter_config_(bloom_filter_config),
+ bloom_filter_ids_(bloom_filter_ids),
+ aggregate_state_id_(aggregate_state_id) {
+ }
+
+ const QueryPlan::DAGNodeIndex aggregate_operator_index_;
+ const physical::BloomFilterConfig &bloom_filter_config_;
+ const std::vector<attribute_id> bloom_filter_ids_;
+ const QueryContext::aggregation_state_id aggregate_state_id_;
+ };
+
/**
* @brief Constructor.
@@ -121,15 +138,25 @@ class ExecutionHeuristics {
std::vector<attribute_id> &&probe_side_bloom_filter_ids,
const QueryContext::join_hash_table_id join_hash_table_id,
const std::size_t estimated_build_relation_cardinality) {
- hash_joins_.push_back(HashJoinInfo(build_operator_index,
- join_operator_index,
- referenced_stored_build_relation,
- referenced_stored_probe_relation,
- bloom_filter_config,
- std::move(build_side_bloom_filter_ids),
- std::move(probe_side_bloom_filter_ids),
- join_hash_table_id,
- estimated_build_relation_cardinality));
+ hash_joins_.emplace_back(build_operator_index,
+ join_operator_index,
+ referenced_stored_build_relation,
+ referenced_stored_probe_relation,
+ bloom_filter_config,
+ std::move(build_side_bloom_filter_ids),
+ std::move(probe_side_bloom_filter_ids),
+ join_hash_table_id,
+ estimated_build_relation_cardinality);
+ }
+
+ inline void addAggregateInfo(const QueryPlan::DAGNodeIndex aggregate_operator_index,
+ const physical::BloomFilterConfig &bloom_filter_config,
+ std::vector<attribute_id> &&bloom_filter_ids,
+ const QueryContext::aggregation_state_id aggregate_state_id) {
+ aggregates_.emplace_back(aggregate_operator_index,
+ bloom_filter_config,
+ std::move(bloom_filter_ids),
+ aggregate_state_id);
}
/**
@@ -152,13 +179,9 @@ class ExecutionHeuristics {
void setBloomFilterProperties(serialization::BloomFilter *bloom_filter_proto,
const std::size_t cardinality);
- std::size_t estimated_build_relation_cardinality() const {
- return estimated_build_relation_cardinality_;
- }
-
private:
std::vector<HashJoinInfo> hash_joins_;
- std::size_t estimated_build_relation_cardinality_;
+ std::vector<AggregateInfo> aggregates_;
DISALLOW_COPY_AND_ASSIGN(ExecutionHeuristics);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/query_optimizer/physical/Aggregate.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/Aggregate.hpp b/query_optimizer/physical/Aggregate.hpp
index e40d894..b40997c 100644
--- a/query_optimizer/physical/Aggregate.hpp
+++ b/query_optimizer/physical/Aggregate.hpp
@@ -101,6 +101,10 @@ class Aggregate : public Physical {
bool impliesUniqueAttributes(
const std::vector<expressions::AttributeReferencePtr> &attributes) const override;
+ const BloomFilterConfig &bloom_filter_config() const {
+ return bloom_filter_config_;
+ }
+
/**
* @brief Creates an Aggregate physical node.
*
@@ -114,9 +118,14 @@ class Aggregate : public Physical {
PhysicalPtr input,
const std::vector<expressions::NamedExpressionPtr> &grouping_expressions,
const std::vector<expressions::AliasPtr> &aggregate_expressions,
- const expressions::PredicatePtr &filter_predicate) {
+ const expressions::PredicatePtr &filter_predicate,
+ const BloomFilterConfig bloom_filter_config = BloomFilterConfig()) {
return AggregatePtr(
- new Aggregate(input, grouping_expressions, aggregate_expressions, filter_predicate));
+ new Aggregate(input,
+ grouping_expressions,
+ aggregate_expressions,
+ filter_predicate,
+ bloom_filter_config));
}
protected:
@@ -133,11 +142,13 @@ class Aggregate : public Physical {
PhysicalPtr input,
const std::vector<expressions::NamedExpressionPtr> &grouping_expressions,
const std::vector<expressions::AliasPtr> &aggregate_expressions,
- const expressions::PredicatePtr &filter_predicate)
+ const expressions::PredicatePtr &filter_predicate,
+ const BloomFilterConfig &bloom_filter_config)
: input_(input),
grouping_expressions_(grouping_expressions),
aggregate_expressions_(aggregate_expressions),
- filter_predicate_(filter_predicate) {
+ filter_predicate_(filter_predicate),
+ bloom_filter_config_(bloom_filter_config) {
addChild(input_);
}
@@ -145,6 +156,7 @@ class Aggregate : public Physical {
std::vector<expressions::NamedExpressionPtr> grouping_expressions_;
std::vector<expressions::AliasPtr> aggregate_expressions_;
expressions::PredicatePtr filter_predicate_;
+ BloomFilterConfig bloom_filter_config_;
DISALLOW_COPY_AND_ASSIGN(Aggregate);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/query_optimizer/physical/HashJoin.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/HashJoin.hpp b/query_optimizer/physical/HashJoin.hpp
index cacb08b..104cb52 100644
--- a/query_optimizer/physical/HashJoin.hpp
+++ b/query_optimizer/physical/HashJoin.hpp
@@ -48,56 +48,6 @@ namespace physical {
class HashJoin;
typedef std::shared_ptr<const HashJoin> HashJoinPtr;
-struct BloomFilterConfig {
- struct BuildSide {
- BuildSide(const expressions::AttributeReferencePtr &attribute_in)
- : attribute(attribute_in) {
- }
- expressions::AttributeReferencePtr attribute;
- };
- struct ProbeSide {
- ProbeSide(const expressions::AttributeReferencePtr &attribute_in,
- const expressions::AttributeReferencePtr &source_attribute_in,
- const physical::PhysicalPtr &builder_in)
- : attribute(attribute_in),
- source_attribute(source_attribute_in),
- builder(builder_in) {
- }
- expressions::AttributeReferencePtr attribute;
- expressions::AttributeReferencePtr source_attribute;
- PhysicalPtr builder;
- };
- BloomFilterConfig() {}
- BloomFilterConfig(const PhysicalPtr &builder_in)
- : builder(builder_in) {
- }
- BloomFilterConfig(const PhysicalPtr &builder_in,
- const std::vector<BuildSide> &build_side_bloom_filters_in,
- const std::vector<ProbeSide> &probe_side_bloom_filters_in)
- : builder(builder_in),
- build_side_bloom_filters(build_side_bloom_filters_in),
- probe_side_bloom_filters(probe_side_bloom_filters_in) {
- }
- void addBuildSideBloomFilter(const expressions::AttributeReferencePtr &attribute_in) {
- for (const auto &build_bf : build_side_bloom_filters) {
- if (attribute_in == build_bf.attribute) {
- return;
- }
- }
- build_side_bloom_filters.emplace_back(attribute_in);
- }
- void addProbeSideBloomFilter(const expressions::AttributeReferencePtr &attribute_in,
- const expressions::AttributeReferencePtr &source_attribute_in,
- const physical::PhysicalPtr &builder_in) {
- probe_side_bloom_filters.emplace_back(attribute_in,
- source_attribute_in,
- builder_in);
- }
- PhysicalPtr builder;
- std::vector<BuildSide> build_side_bloom_filters;
- std::vector<ProbeSide> probe_side_bloom_filters;
-};
-
/**
* @brief Physical hash join node.
*/
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/query_optimizer/physical/Physical.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/Physical.hpp b/query_optimizer/physical/Physical.hpp
index 721b987..389cd05 100644
--- a/query_optimizer/physical/Physical.hpp
+++ b/query_optimizer/physical/Physical.hpp
@@ -39,6 +39,56 @@ namespace physical {
class Physical;
typedef std::shared_ptr<const Physical> PhysicalPtr;
+struct BloomFilterConfig {
+ struct BuildSide {
+ BuildSide(const expressions::AttributeReferencePtr &attribute_in)
+ : attribute(attribute_in) {
+ }
+ expressions::AttributeReferencePtr attribute;
+ };
+ struct ProbeSide {
+ ProbeSide(const expressions::AttributeReferencePtr &attribute_in,
+ const expressions::AttributeReferencePtr &source_attribute_in,
+ const physical::PhysicalPtr &builder_in)
+ : attribute(attribute_in),
+ source_attribute(source_attribute_in),
+ builder(builder_in) {
+ }
+ expressions::AttributeReferencePtr attribute;
+ expressions::AttributeReferencePtr source_attribute;
+ PhysicalPtr builder;
+ };
+ BloomFilterConfig() {}
+ BloomFilterConfig(const PhysicalPtr &builder_in)
+ : builder(builder_in) {
+ }
+ BloomFilterConfig(const PhysicalPtr &builder_in,
+ const std::vector<BuildSide> &build_side_bloom_filters_in,
+ const std::vector<ProbeSide> &probe_side_bloom_filters_in)
+ : builder(builder_in),
+ build_side_bloom_filters(build_side_bloom_filters_in),
+ probe_side_bloom_filters(probe_side_bloom_filters_in) {
+ }
+ void addBuildSideBloomFilter(const expressions::AttributeReferencePtr &attribute_in) {
+ for (const auto &build_bf : build_side_bloom_filters) {
+ if (attribute_in == build_bf.attribute) {
+ return;
+ }
+ }
+ build_side_bloom_filters.emplace_back(attribute_in);
+ }
+ void addProbeSideBloomFilter(const expressions::AttributeReferencePtr &attribute_in,
+ const expressions::AttributeReferencePtr &source_attribute_in,
+ const physical::PhysicalPtr &builder_in) {
+ probe_side_bloom_filters.emplace_back(attribute_in,
+ source_attribute_in,
+ builder_in);
+ }
+ PhysicalPtr builder;
+ std::vector<BuildSide> build_side_bloom_filters;
+ std::vector<ProbeSide> probe_side_bloom_filters;
+};
+
/**
* @brief Base class for physical plan nodes.
*/
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/query_optimizer/rules/AttachBloomFilters.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/AttachBloomFilters.cpp b/query_optimizer/rules/AttachBloomFilters.cpp
index e3bdc36..f6602b8 100644
--- a/query_optimizer/rules/AttachBloomFilters.cpp
+++ b/query_optimizer/rules/AttachBloomFilters.cpp
@@ -51,24 +51,24 @@ P::PhysicalPtr AttachBloomFilters::apply(const P::PhysicalPtr &input) {
visitProducer(input, 0);
visitConsumer(input);
- for (const auto &info_vec_pair : consumers_) {
- std::cerr << "--------\n"
- << "Node " << info_vec_pair.first->getName()
- << " " << info_vec_pair.first << "\n";
-
- for (const auto &info : info_vec_pair.second) {
- std::cerr << info.attribute->attribute_alias();
- if (info.attribute->id() != info.source_attribute->id()) {
- std::cerr << "{FROM " << info.source_attribute->attribute_alias() << "}";
- }
- if (info.from_sibling) {
- std::cerr << " sibling";
- }
- std::cerr << " @" << info.source << "[" << info.depth << "]"
- << ": " << info.selectivity << "\n";
- }
- std::cerr << "********\n";
- }
+// for (const auto &info_vec_pair : consumers_) {
+// std::cerr << "--------\n"
+// << "Node " << info_vec_pair.first->getName()
+// << " " << info_vec_pair.first << "\n";
+//
+// for (const auto &info : info_vec_pair.second) {
+// std::cerr << info.attribute->attribute_alias();
+// if (info.attribute->id() != info.source_attribute->id()) {
+// std::cerr << "{FROM " << info.source_attribute->attribute_alias() << "}";
+// }
+// if (info.from_sibling) {
+// std::cerr << " sibling";
+// }
+// std::cerr << " @" << info.source << "[" << info.depth << "]"
+// << ": " << info.selectivity << "\n";
+// }
+// std::cerr << "********\n";
+// }
return visitAndAttach(input);
}
@@ -192,9 +192,20 @@ void AttachBloomFilters::visitConsumer(const P::PhysicalPtr &node) {
info.attribute);
}
}
+ }
+ P::PhysicalPtr consumer_child = nullptr;
+ if (node->getPhysicalType() == P::PhysicalType::kHashJoin) {
+ consumer_child = std::static_pointer_cast<const P::HashJoin>(node)->left();
+ }
+ if (node->getPhysicalType() == P::PhysicalType::kAggregate) {
+ consumer_child = std::static_pointer_cast<const P::Aggregate>(node)->input();
+ }
+
+ if (consumer_child != nullptr) {
// Decide attaches
- if (cost_model_->estimateCardinality(consumer_child) > 200000000 &&
+ auto &consumer_bloom_filters = consumers_[consumer_child];
+ if (cost_model_->estimateCardinality(consumer_child) > 10000000 &&
!consumer_bloom_filters.empty()) {
std::map<E::AttributeReferencePtr, const BloomFilterInfo*> filters;
for (const auto &info : consumer_bloom_filters) {
@@ -240,10 +251,10 @@ P::PhysicalPtr AttachBloomFilters::visitAndAttach(const physical::PhysicalPtr &n
if (node->getPhysicalType() == P::PhysicalType::kHashJoin) {
const auto attach_it = attaches_.find(node);
if (attach_it != attaches_.end()) {
- for (const auto& item : attach_it->second.probe_side_bloom_filters) {
- std::cout << "Attach probe from " << item.builder
- << " to " << node << "\n";
- }
+// for (const auto& item : attach_it->second.probe_side_bloom_filters) {
+// std::cout << "Attach probe from " << item.builder
+// << " to " << node << "\n";
+// }
const P::HashJoinPtr hash_join =
std::static_pointer_cast<const P::HashJoin>(node);
@@ -259,6 +270,25 @@ P::PhysicalPtr AttachBloomFilters::visitAndAttach(const physical::PhysicalPtr &n
}
}
+ if (node->getPhysicalType() == P::PhysicalType::kAggregate) {
+ const auto attach_it = attaches_.find(node);
+ if (attach_it != attaches_.end()) {
+// for (const auto& item : attach_it->second.probe_side_bloom_filters) {
+// std::cout << "Attach probe from " << item.builder
+// << " to " << node << "\n";
+// }
+
+ const P::AggregatePtr aggregate =
+ std::static_pointer_cast<const P::Aggregate>(node);
+ return P::Aggregate::Create(
+ aggregate->input(),
+ aggregate->grouping_expressions(),
+ aggregate->aggregate_expressions(),
+ aggregate->filter_predicate(),
+ attach_it->second);
+ }
+ }
+
if (has_changed) {
return node->copyWithNewChildren(new_children);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
index 42a7402..22485b2 100644
--- a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
+++ b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
@@ -150,7 +150,8 @@ physical::PhysicalPtr StarSchemaHashJoinOrderOptimization::generatePlan(
cost_model_->estimateCardinality(tables[i]),
cost_model_->estimateSelectivity(tables[i]),
CountSharedAttributes(join_group.referenced_attributes,
- tables[i]->getOutputAttributes()));
+ tables[i]->getOutputAttributes()),
+ tables[i]->getPhysicalType() == physical::PhysicalType::kAggregate);
}
// Auxiliary mapping info.
@@ -316,6 +317,7 @@ physical::PhysicalPtr StarSchemaHashJoinOrderOptimization::generatePlan(
selected_probe_table_info->estimated_num_output_attributes =
CountSharedAttributes(join_group.referenced_attributes,
output->getOutputAttributes());
+ selected_probe_table_info->is_aggregation = false;
remaining_tables.emplace(selected_probe_table_info);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp
index a0e34ce..7a6fa81 100644
--- a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp
+++ b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp
@@ -75,12 +75,14 @@ class StarSchemaHashJoinOrderOptimization : public Rule<physical::Physical> {
const physical::PhysicalPtr &table_in,
const std::size_t estimated_cardinality_in,
const double estimated_selectivity_in,
- const std::size_t estimated_num_output_attributes_in)
+ const std::size_t estimated_num_output_attributes_in,
+ const bool is_aggregation_in)
: table_info_id(table_info_id_in),
table(table_in),
estimated_cardinality(estimated_cardinality_in),
estimated_selectivity(estimated_selectivity_in),
- estimated_num_output_attributes(estimated_num_output_attributes_in) {
+ estimated_num_output_attributes(estimated_num_output_attributes_in),
+ is_aggregation(is_aggregation_in) {
}
const std::size_t table_info_id;
@@ -88,6 +90,7 @@ class StarSchemaHashJoinOrderOptimization : public Rule<physical::Physical> {
std::size_t estimated_cardinality;
double estimated_selectivity;
std::size_t estimated_num_output_attributes;
+ bool is_aggregation;
};
struct JoinPair {
@@ -107,13 +110,17 @@ class StarSchemaHashJoinOrderOptimization : public Rule<physical::Physical> {
return rhs_has_large_output;
}
-// const bool lhs_has_small_build =
-// !lhs_has_large_output && lhs.build->estimated_cardinality < 0x1000;
-// const bool rhs_has_small_build =
-// !rhs_has_large_output && rhs.build->estimated_cardinality < 0x1000;
-// if (lhs_has_small_build != rhs_has_small_build) {
-// return lhs_has_small_build;
-// }
+ const bool lhs_has_small_build =
+ !lhs_has_large_output && lhs.build->estimated_cardinality < 0x100;
+ const bool rhs_has_small_build =
+ !rhs_has_large_output && rhs.build->estimated_cardinality < 0x100;
+ if (lhs_has_small_build != rhs_has_small_build) {
+ return lhs_has_small_build;
+ }
+
+ if (lhs.probe->is_aggregation != rhs.probe->is_aggregation) {
+ return lhs.probe->is_aggregation;
+ }
if (lhs.probe->estimated_cardinality != rhs.probe->estimated_cardinality) {
return lhs.probe->estimated_cardinality < rhs.probe->estimated_cardinality;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 4878cf1..8c9e8b6 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -46,10 +46,13 @@
#include "storage/StorageBlock.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "storage/StorageManager.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
#include "types/TypedValue.hpp"
#include "types/containers/ColumnVector.hpp"
#include "types/containers/ColumnVectorsValueAccessor.hpp"
#include "types/containers/Tuple.hpp"
+#include "utility/BloomFilterAdapter.hpp"
#include "glog/logging.h"
@@ -57,6 +60,8 @@ using std::unique_ptr;
namespace quickstep {
+DECLARE_int64(bloom_adapter_batch_size);
+
AggregationOperationState::AggregationOperationState(
const CatalogRelationSchema &input_relation,
const std::vector<const AggregateFunction*> &aggregate_functions,
@@ -64,12 +69,16 @@ AggregationOperationState::AggregationOperationState(
std::vector<bool> &&is_distinct,
std::vector<std::unique_ptr<const Scalar>> &&group_by,
const Predicate *predicate,
+ std::vector<const BloomFilter *> &&bloom_filters,
+ std::vector<attribute_id> &&bloom_filter_attribute_ids,
const std::size_t estimated_num_entries,
const HashTableImplType hash_table_impl_type,
const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
StorageManager *storage_manager)
: input_relation_(input_relation),
predicate_(predicate),
+ bloom_filters_(std::move(bloom_filters)),
+ bloom_filter_attribute_ids_(std::move(bloom_filter_attribute_ids)),
group_by_list_(std::move(group_by)),
arguments_(std::move(arguments)),
is_distinct_(std::move(is_distinct)),
@@ -183,7 +192,8 @@ AggregationOperationState::AggregationOperationState(
AggregationOperationState* AggregationOperationState::ReconstructFromProto(
const serialization::AggregationOperationState &proto,
const CatalogDatabaseLite &database,
- StorageManager *storage_manager) {
+ StorageManager *storage_manager,
+ const std::vector<std::unique_ptr<BloomFilter>> &bloom_filters) {
DCHECK(ProtoIsValid(proto, database));
// Rebuild contructor arguments from their representation in 'proto'.
@@ -232,12 +242,25 @@ AggregationOperationState* AggregationOperationState::ReconstructFromProto(
database));
}
+ std::vector<const BloomFilter*> bloom_filter_vector;
+ std::vector<attribute_id> bloom_filter_attribute_ids;
+ for (int i = 0; i < proto.bloom_filters_size(); ++i) {
+ std::cerr << "Add bloom filter " << i << "\n";
+ // Add the pointer to the probe bloom filter within the list of probe bloom filters to use.
+ const auto bloom_filter_proto = proto.bloom_filters(i);
+ bloom_filter_vector.emplace_back(
+ bloom_filters[bloom_filter_proto.bloom_filter_id()].get());
+ bloom_filter_attribute_ids.emplace_back(bloom_filter_proto.attr_id());
+ }
+
return new AggregationOperationState(database.getRelationSchemaById(proto.relation_id()),
aggregate_functions,
std::move(arguments),
std::move(is_distinct),
std::move(group_by_expressions),
predicate.release(),
+ std::move(bloom_filter_vector),
+ std::move(bloom_filter_attribute_ids),
proto.estimated_num_entries(),
HashTableImplTypeFromProto(proto.hash_table_impl_type()),
distinctify_hash_table_impl_types,
@@ -340,6 +363,10 @@ void AggregationOperationState::aggregateBlockSingleState(const block_id input_b
// tuples so that it can be reused across multiple aggregates (i.e. we only
// pay the cost of evaluating the predicate once).
std::unique_ptr<TupleIdSequence> reuse_matches;
+ if (predicate_) {
+ reuse_matches.reset(block->getMatchesForPredicate(predicate_.get()));
+ }
+
for (std::size_t agg_idx = 0;
agg_idx < handles_.size();
++agg_idx) {
@@ -358,7 +385,6 @@ void AggregationOperationState::aggregateBlockSingleState(const block_id input_b
arguments_[agg_idx],
local_arguments_as_attributes,
{}, /* group_by */
- predicate_.get(),
distinctify_hashtables_[agg_idx].get(),
&reuse_matches,
nullptr /* reuse_group_by_vectors */);
@@ -369,7 +395,6 @@ void AggregationOperationState::aggregateBlockSingleState(const block_id input_b
block->aggregate(*handles_[agg_idx],
arguments_[agg_idx],
local_arguments_as_attributes,
- predicate_.get(),
&reuse_matches));
}
}
@@ -391,6 +416,71 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
// GROUP BY expressions once).
std::vector<std::unique_ptr<ColumnVector>> reuse_group_by_vectors;
+ if (predicate_) {
+ reuse_matches.reset(block->getMatchesForPredicate(predicate_.get()));
+ }
+
+ if (bloom_filters_.size() > 0) {
+ const std::size_t num_tuples = block->getNumTuples();
+// std::cerr << "Before: "
+// << (reuse_matches ? reuse_matches->numTuples() : num_tuples)
+// << "\n";
+ std::unique_ptr<ValueAccessor> accessor;
+ if (reuse_matches) {
+ accessor.reset(
+ block->getTupleStorageSubBlock().createValueAccessor(reuse_matches.get()));
+ } else {
+ accessor.reset(
+ block->getTupleStorageSubBlock().createValueAccessor());
+ }
+ InvokeOnAnyValueAccessor(
+ accessor.get(),
+ [&](auto *accessor) -> void { // NOLINT(build/c++11)
+ std::unique_ptr<TupleIdSequence> filtered(new TupleIdSequence(num_tuples));
+
+ std::vector<std::size_t> attr_size_vector;
+ attr_size_vector.reserve(bloom_filter_attribute_ids_.size());
+ for (const auto &attr : bloom_filter_attribute_ids_) {
+ auto val_and_size =
+ accessor->template getUntypedValueAndByteLengthAtAbsolutePosition<false>(0, attr);
+ attr_size_vector.emplace_back(val_and_size.second);
+ }
+
+ std::unique_ptr<BloomFilterAdapter> bloom_filter_adapter;
+ bloom_filter_adapter.reset(new BloomFilterAdapter(
+ bloom_filters_, bloom_filter_attribute_ids_, attr_size_vector));
+
+ std::uint32_t batch_size_try = FLAGS_bloom_adapter_batch_size;
+ std::uint32_t num_tuples_left = accessor->getNumTuples();
+ std::vector<tuple_id> batch(num_tuples_left);
+
+ do {
+ std::uint32_t batch_size =
+ batch_size_try < num_tuples_left ? batch_size_try : num_tuples_left;
+ for (std::size_t i = 0; i < batch_size; ++i) {
+ accessor->next();
+ batch.push_back(accessor->getCurrentPosition());
+ }
+
+ std::size_t num_hits = bloom_filter_adapter->bulkProbe<true>(accessor, batch);
+ for (std::size_t t = 0; t < num_hits; ++t){
+ filtered->set(batch[t], true);
+ }
+
+ batch.clear();
+ num_tuples_left -= batch_size;
+ batch_size_try = batch_size * 2;
+ } while (num_tuples_left > 0);
+
+ if (reuse_matches) {
+ reuse_matches->intersectWith(*filtered);
+ } else {
+ reuse_matches.reset(filtered.release());
+ }
+ });
+// std::cerr << "After: " << reuse_matches->numTuples() << "\n";
+ }
+
for (std::size_t agg_idx = 0;
agg_idx < handles_.size();
++agg_idx) {
@@ -402,7 +492,6 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
arguments_[agg_idx],
nullptr, /* arguments_as_attributes */
group_by_list_,
- predicate_.get(),
distinctify_hashtables_[agg_idx].get(),
&reuse_matches,
&reuse_group_by_vectors);
@@ -416,7 +505,6 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
block->aggregateGroupBy(*handles_[agg_idx],
arguments_[agg_idx],
group_by_list_,
- predicate_.get(),
agg_hash_table,
&reuse_matches,
&reuse_group_by_vectors);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 0199749..5db7325 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -33,6 +33,7 @@
#include "storage/HashTableBase.hpp"
#include "storage/HashTablePool.hpp"
#include "storage/StorageBlockInfo.hpp"
+#include "utility/BloomFilter.hpp"
#include "utility/Macros.hpp"
namespace quickstep {
@@ -108,6 +109,8 @@ class AggregationOperationState {
std::vector<bool> &&is_distinct,
std::vector<std::unique_ptr<const Scalar>> &&group_by,
const Predicate *predicate,
+ std::vector<const BloomFilter *> &&bloom_filters,
+ std::vector<attribute_id> &&bloom_filter_attribute_ids,
const std::size_t estimated_num_entries,
const HashTableImplType hash_table_impl_type,
const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
@@ -131,7 +134,8 @@ class AggregationOperationState {
static AggregationOperationState* ReconstructFromProto(
const serialization::AggregationOperationState &proto,
const CatalogDatabaseLite &database,
- StorageManager *storage_manager);
+ StorageManager *storage_manager,
+ const std::vector<std::unique_ptr<BloomFilter>> &bloom_filters);
/**
* @brief Check whether a serialization::AggregationOperationState is
@@ -181,6 +185,10 @@ class AggregationOperationState {
// filter predicate (if any), and the list of GROUP BY expressions (if any).
const CatalogRelationSchema &input_relation_;
std::unique_ptr<const Predicate> predicate_;
+
+ std::vector<const BloomFilter*> bloom_filters_;
+ std::vector<attribute_id> bloom_filter_attribute_ids_;
+
std::vector<std::unique_ptr<const Scalar>> group_by_list_;
// Each individual aggregate in this operation has an AggregationHandle and
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/storage/AggregationOperationState.proto
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.proto b/storage/AggregationOperationState.proto
index bf78e3a..165148e 100644
--- a/storage/AggregationOperationState.proto
+++ b/storage/AggregationOperationState.proto
@@ -42,4 +42,10 @@ message AggregationOperationState {
// Each DISTINCT aggregation has its distinctify hash table impl type.
repeated HashTableImplType distinctify_hash_table_impl_types = 7;
+
+ message BloomFilter {
+ required uint32 bloom_filter_id = 1;
+ required uint32 attr_id = 2;
+ }
+ repeated BloomFilter bloom_filters = 8;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/storage/HashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTable.hpp b/storage/HashTable.hpp
index 2c526c2..04c2ca8 100644
--- a/storage/HashTable.hpp
+++ b/storage/HashTable.hpp
@@ -41,14 +41,12 @@
#include "types/TypedValue.hpp"
#include "utility/BloomFilter.hpp"
#include "utility/BloomFilterAdapter.hpp"
-#include "utility/EventProfiler.hpp"
#include "utility/HashPair.hpp"
#include "utility/Macros.hpp"
namespace quickstep {
DECLARE_int64(bloom_adapter_batch_size);
-DECLARE_bool(adapt_bloom_filters);
/** \addtogroup Storage
* @{
@@ -1048,7 +1046,7 @@ class HashTable : public HashTableBase<resizable,
* @param probe_attribute_ids The vector of attribute ids to use for probing
* the bloom filter.
**/
- inline void addProbeSideAttributeIds(const attribute_id &probe_attribute_id) {
+ inline void addProbeSideAttributeId(const attribute_id probe_attribute_id) {
probe_attribute_ids_.push_back(probe_attribute_id);
}
@@ -2263,7 +2261,7 @@ void HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_
for (const auto &probe_attr : probe_attribute_ids_) {
auto val_and_size =
accessor->template getUntypedValueAndByteLengthAtAbsolutePosition<false>(0, probe_attr);
- attr_size_vector.push_back(val_and_size.second);
+ attr_size_vector.emplace_back(val_and_size.second);
}
bloom_filter_adapter.reset(new BloomFilterAdapter(
@@ -2280,30 +2278,18 @@ void HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_
std::uint32_t num_tuples_left = accessor->getNumTuples();
std::vector<tuple_id> batch(num_tuples_left);
- auto *container = simple_profiler.getContainer();
- auto *line = container->getEventLine(0);
-
do {
- const std::uint32_t batch_size =
+ std::uint32_t batch_size =
batch_size_try < num_tuples_left ? batch_size_try : num_tuples_left;
for (std::size_t i = 0; i < batch_size; ++i) {
accessor->next();
batch.push_back(accessor->getCurrentPosition());
}
- line->emplace_back();
- std::size_t num_hits;
- if (FLAGS_adapt_bloom_filters) {
- num_hits = bloom_filter_adapter->bulkProbe<true>(accessor, batch);
- } else {
- num_hits = bloom_filter_adapter->bulkProbe<false>(accessor, batch);
- }
- line->back().setPayload(num_hits+0);
- line->back().endEvent();
-// std::size_t num_hits = batch_size;
+ std::size_t num_hits = bloom_filter_adapter->bulkProbe<true>(accessor, batch);
- for (std::size_t i = 0; i < num_hits; ++i){
- const tuple_id probe_tid = batch[i];
+ for (std::size_t t = 0; t < num_hits; ++t){
+ tuple_id probe_tid = batch[t];
TypedValue key = accessor->getTypedValueAtAbsolutePosition(key_attr_id, probe_tid);
if (check_for_null_keys && key.isNull()) {
continue;
@@ -2320,6 +2306,7 @@ void HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_
break;
}
}
+ batch.clear();
num_tuples_left -= batch_size;
batch_size_try = batch_size * 2;
} while (!accessor->iterationFinished());
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/storage/HashTable.proto
----------------------------------------------------------------------
diff --git a/storage/HashTable.proto b/storage/HashTable.proto
index 0cf9f5e..90bc9f7 100644
--- a/storage/HashTable.proto
+++ b/storage/HashTable.proto
@@ -34,10 +34,10 @@ message HashTable {
required HashTableImplType hash_table_impl_type = 1;
repeated Type key_types = 2;
required uint64 estimated_num_entries = 3;
- message BloomFilterReference {
+ repeated uint32 build_side_bloom_filter_id = 4;
+ message ProbeSideBloomFilter {
required uint32 bloom_filter_id = 1;
required uint32 attr_id = 2;
}
- repeated BloomFilterReference build_side_bloom_filters = 4;
- repeated BloomFilterReference probe_side_bloom_filters = 5;
+ repeated ProbeSideBloomFilter probe_side_bloom_filters = 6;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/storage/HashTableFactory.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableFactory.hpp b/storage/HashTableFactory.hpp
index 00a09c1..df2962a 100644
--- a/storage/HashTableFactory.hpp
+++ b/storage/HashTableFactory.hpp
@@ -318,11 +318,9 @@ class HashTableFactory {
// individual implementations of the hash table constructors.
// Check if there are any build side bloom filter defined on the hash table.
- if (proto.build_side_bloom_filters_size() > 0) {
- CHECK_EQ(1u, proto.build_side_bloom_filters_size());
+ if (proto.build_side_bloom_filter_id_size() > 0) {
hash_table->enableBuildSideBloomFilter();
- hash_table->setBuildSideBloomFilter(
- bloom_filters[proto.build_side_bloom_filters(0).bloom_filter_id()].get());
+ hash_table->setBuildSideBloomFilter(bloom_filters[proto.build_side_bloom_filter_id(0)].get());
}
// Check if there are any probe side bloom filters defined on the hash table.
@@ -335,8 +333,7 @@ class HashTableFactory {
hash_table->addProbeSideBloomFilter(
bloom_filters[probe_side_bloom_filter.bloom_filter_id()].get());
- // Add the attribute ids corresponding to this probe bloom filter.
- hash_table->addProbeSideAttributeIds(probe_side_bloom_filter.attr_id());
+ hash_table->addProbeSideAttributeId(probe_side_bloom_filter.attr_id());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index fdd438d..78aba7c 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -389,15 +389,7 @@ AggregationState* StorageBlock::aggregate(
const AggregationHandle &handle,
const std::vector<std::unique_ptr<const Scalar>> &arguments,
const std::vector<attribute_id> *arguments_as_attributes,
- const Predicate *predicate,
std::unique_ptr<TupleIdSequence> *reuse_matches) const {
- // If there is a filter predicate that hasn't already been evaluated,
- // evaluate it now and save the results for other aggregates on this same
- // block.
- if (predicate && !*reuse_matches) {
- reuse_matches->reset(getMatchesForPredicate(predicate));
- }
-
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
// If all the arguments to this aggregate are plain relation attributes,
// aggregate directly on a ValueAccessor from this block to avoid a copy.
@@ -418,7 +410,6 @@ void StorageBlock::aggregateGroupBy(
const AggregationHandle &handle,
const std::vector<std::unique_ptr<const Scalar>> &arguments,
const std::vector<std::unique_ptr<const Scalar>> &group_by,
- const Predicate *predicate,
AggregationStateHashTableBase *hash_table,
std::unique_ptr<TupleIdSequence> *reuse_matches,
std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
@@ -440,14 +431,7 @@ void StorageBlock::aggregateGroupBy(
ColumnVectorsValueAccessor temp_result;
{
std::unique_ptr<ValueAccessor> accessor;
- if (predicate) {
- if (!*reuse_matches) {
- // If there is a filter predicate that hasn't already been evaluated,
- // evaluate it now and save the results for other aggregates on this
- // same block.
- reuse_matches->reset(getMatchesForPredicate(predicate));
- }
-
+ if (reuse_matches) {
// Create a filtered ValueAccessor that only iterates over predicate
// matches.
accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get()));
@@ -499,7 +483,6 @@ void StorageBlock::aggregateDistinct(
const std::vector<std::unique_ptr<const Scalar>> &arguments,
const std::vector<attribute_id> *arguments_as_attributes,
const std::vector<std::unique_ptr<const Scalar>> &group_by,
- const Predicate *predicate,
AggregationStateHashTableBase *distinctify_hash_table,
std::unique_ptr<TupleIdSequence> *reuse_matches,
std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
@@ -514,14 +497,7 @@ void StorageBlock::aggregateDistinct(
ColumnVectorsValueAccessor temp_result;
{
std::unique_ptr<ValueAccessor> accessor;
- if (predicate) {
- if (!*reuse_matches) {
- // If there is a filter predicate that hasn't already been evaluated,
- // evaluate it now and save the results for other aggregates on this
- // same block.
- reuse_matches->reset(getMatchesForPredicate(predicate));
- }
-
+ if (reuse_matches) {
// Create a filtered ValueAccessor that only iterates over predicate
// matches.
accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get()));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/storage/StorageBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp
index 3ae3812..3217fa2 100644
--- a/storage/StorageBlock.hpp
+++ b/storage/StorageBlock.hpp
@@ -410,7 +410,6 @@ class StorageBlock : public StorageBlockBase {
const AggregationHandle &handle,
const std::vector<std::unique_ptr<const Scalar>> &arguments,
const std::vector<attribute_id> *arguments_as_attributes,
- const Predicate *predicate,
std::unique_ptr<TupleIdSequence> *reuse_matches) const;
/**
@@ -460,7 +459,6 @@ class StorageBlock : public StorageBlockBase {
void aggregateGroupBy(const AggregationHandle &handle,
const std::vector<std::unique_ptr<const Scalar>> &arguments,
const std::vector<std::unique_ptr<const Scalar>> &group_by,
- const Predicate *predicate,
AggregationStateHashTableBase *hash_table,
std::unique_ptr<TupleIdSequence> *reuse_matches,
std::vector<std::unique_ptr<ColumnVector>>
@@ -505,7 +503,6 @@ class StorageBlock : public StorageBlockBase {
const std::vector<std::unique_ptr<const Scalar>> &arguments,
const std::vector<attribute_id> *arguments_as_attributes,
const std::vector<std::unique_ptr<const Scalar>> &group_by,
- const Predicate *predicate,
AggregationStateHashTableBase *distinctify_hash_table,
std::unique_ptr<TupleIdSequence> *reuse_matches,
std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const;
@@ -588,6 +585,8 @@ class StorageBlock : public StorageBlockBase {
**/
const std::size_t getNumTuples() const;
+ TupleIdSequence* getMatchesForPredicate(const Predicate *predicate) const;
+
private:
static TupleStorageSubBlock* CreateTupleStorageSubBlock(
const CatalogRelationSchema &relation,
@@ -627,8 +626,6 @@ class StorageBlock : public StorageBlockBase {
// StorageBlock's header.
bool rebuildIndexes(bool short_circuit);
- TupleIdSequence* getMatchesForPredicate(const Predicate *predicate) const;
-
std::unordered_map<attribute_id, TypedValue>* generateUpdatedValues(
const ValueAccessor &accessor,
const tuple_id tuple,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/utility/PlanVisualizer.cpp
----------------------------------------------------------------------
diff --git a/utility/PlanVisualizer.cpp b/utility/PlanVisualizer.cpp
index 37fa790..4cc1b0f 100644
--- a/utility/PlanVisualizer.cpp
+++ b/utility/PlanVisualizer.cpp
@@ -29,6 +29,7 @@
#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/physical/Aggregate.hpp"
#include "query_optimizer/physical/HashJoin.hpp"
#include "query_optimizer/physical/Physical.hpp"
#include "query_optimizer/physical/PhysicalType.hpp"
@@ -155,6 +156,29 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) {
node_info.labels.emplace_back("RIGHT join attrs unique");
}
+ const auto &bf_config = hash_join->bloom_filter_config();
+ for (const auto &bf : bf_config.build_side_bloom_filters) {
+ node_info.labels.emplace_back(
+ std::string("[BF build] ") + bf.attribute->attribute_alias());
+ }
+ for (const auto &bf : bf_config.probe_side_bloom_filters) {
+ node_info.labels.emplace_back(
+ std::string("[BF probe] ") + bf.attribute->attribute_alias());
+ }
+
+ break;
+ }
+ case P::PhysicalType::kAggregate: {
+ const P::AggregatePtr aggregate =
+ std::static_pointer_cast<const P::Aggregate>(input);
+ node_info.labels.emplace_back(input->getName());
+
+ const auto &bf_config = aggregate->bloom_filter_config();
+ for (const auto &bf : bf_config.probe_side_bloom_filters) {
+ node_info.labels.emplace_back(
+ std::string("[BF probe] ") + bf.attribute->attribute_alias());
+ }
+
break;
}
default: {