You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/08/04 01:56:40 UTC
[11/13] incubator-quickstep git commit: Initial commit
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/query_optimizer/rules/AttachBloomFilters.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/AttachBloomFilters.cpp b/query_optimizer/rules/AttachBloomFilters.cpp
new file mode 100644
index 0000000..03a42a0
--- /dev/null
+++ b/query_optimizer/rules/AttachBloomFilters.cpp
@@ -0,0 +1,308 @@
+/**
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "query_optimizer/rules/AttachBloomFilters.hpp"
+
+#include <map>
+#include <memory>
+#include <set>
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/NamedExpression.hpp"
+#include "query_optimizer/expressions/PatternMatcher.hpp"
+#include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/HashJoin.hpp"
+#include "query_optimizer/physical/PatternMatcher.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/PhysicalType.hpp"
+#include "query_optimizer/physical/TopLevelPlan.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+
+namespace E = ::quickstep::optimizer::expressions;
+namespace P = ::quickstep::optimizer::physical;
+
+// Entry point of the rule.  Runs three passes over the plan rooted at
+// `input`: a bottom-up pass collecting the bloom filters each node can
+// produce (visitProducer), a top-down pass propagating consumable filters
+// and deciding attachments (visitConsumer), and a final rewrite pass that
+// materializes the decisions into new plan nodes (visitAndAttach).
+P::PhysicalPtr AttachBloomFilters::apply(const P::PhysicalPtr &input) {
+  DCHECK(input->getPhysicalType() == P::PhysicalType::kTopLevelPlan);
+  // The cost model drives all selectivity/cardinality estimates used below
+  // when deciding which filters are worth creating and attaching.
+  cost_model_.reset(
+      new cost::StarSchemaSimpleCostModel(
+          std::static_pointer_cast<const P::TopLevelPlan>(input)->shared_subplans()));
+
+  visitProducer(input, 0);
+  visitConsumer(input);
+
+// for (const auto &info_vec_pair : consumers_) {
+//   std::cerr << "--------\n"
+//             << "Node " << info_vec_pair.first->getName()
+//             << " " << info_vec_pair.first << "\n";
+//
+//   for (const auto &info : info_vec_pair.second) {
+//     std::cerr << info.attribute->attribute_alias();
+//     if (info.attribute->id() != info.source_attribute->id()) {
+//       std::cerr << "{FROM " << info.source_attribute->attribute_alias() << "}";
+//     }
+//     if (info.from_sibling) {
+//       std::cerr << " sibling";
+//     }
+//     std::cerr << " @" << info.source << "[" << info.depth << "]"
+//               << ": " << info.selectivity << "\n";
+//   }
+//   std::cerr << "********\n";
+// }
+
+  return visitAndAttach(input);
+}
+
+// Bottom-up pass that records in producers_ the bloom filters available at
+// each node.  New filters originate only from hash-join build sides whose
+// estimated selectivity is below 1.0; Aggregate/Selection/HashJoin nodes
+// additionally inherit from their children any filter whose attribute
+// survives into this node's output.
+void AttachBloomFilters::visitProducer(const P::PhysicalPtr &node, const int depth) {
+  // Visit children first so their producers_ entries exist when read below.
+  for (const P::PhysicalPtr &child : node->children()) {
+    visitProducer(child, depth+1);
+  }
+
+  std::vector<BloomFilterInfo> bloom_filters;
+
+  // A selective build side produces one filter per build join attribute.
+  // The filters are recorded against the build child, with the join node
+  // itself as their source.
+  if (node->getPhysicalType() == P::PhysicalType::kHashJoin) {
+    const P::HashJoinPtr &hash_join =
+        std::static_pointer_cast<const P::HashJoin>(node);
+    const P::PhysicalPtr &build_node = hash_join->right();
+    double selectivity = cost_model_->estimateSelectivity(build_node);
+    if (selectivity < 1.0) {
+      auto &build_node_info = producers_[build_node];
+      for (const auto &attr : hash_join->right_join_attributes()) {
+        build_node_info.emplace_back(node, attr, depth, selectivity, false);
+      }
+    }
+  }
+
+  const std::vector<E::AttributeReferencePtr> output_attributes(
+      node->getOutputAttributes());
+  std::unordered_set<E::ExprId> output_attribute_ids;
+  for (const auto &attr : output_attributes) {
+    output_attribute_ids.emplace(attr->id());
+  }
+
+  // First check inherited bloom filters
+  // NOTE: `candidates` holds pointers into the vectors owned by producers_;
+  // producers_ is a std::map, so those references stay valid across the
+  // emplace at the end of this function.
+  std::vector<const BloomFilterInfo*> candidates;
+  switch (node->getPhysicalType()) {
+    case P::PhysicalType::kAggregate:
+    case P::PhysicalType::kSelection:
+    case P::PhysicalType::kHashJoin: {
+      // Intentionally falls through to default (which only breaks).
+      for (const P::PhysicalPtr &child : node->children()) {
+        for (const BloomFilterInfo &info : producers_[child]) {
+          candidates.emplace_back(&info);
+        }
+      }
+    }
+    default:
+      break;
+  }
+
+  // Keep only the inherited filters whose attribute is still visible in
+  // this node's output.
+  for (const BloomFilterInfo *info : candidates) {
+    if (output_attribute_ids.find(info->attribute->id()) != output_attribute_ids.end()) {
+      bloom_filters.emplace_back(
+          info->source, info->attribute, info->depth, info->selectivity, false);
+    }
+  }
+
+  // Self-produced bloom filters
+// double selectivity = cost_model_->estimateSelectivity(node);
+// if (selectivity < 1.0) {
+//   for (const auto &attr : output_attributes) {
+//     bloom_filters.emplace_back(node, attr, depth, selectivity, false);
+//   }
+// }
+
+  producers_.emplace(node, std::move(bloom_filters));
+}
+
+// Top-down pass.  For each node this (1) forwards the filters the node may
+// consume down to the children that still expose the filtered attribute,
+// (2) translates hash-join build-side filters into probe-side filters via
+// the join-attribute pairs, and (3) for hash joins and aggregations over
+// large inputs, picks the best filter per attribute and records the
+// build/probe attachment decisions via getBloomFilterConfig().
+void AttachBloomFilters::visitConsumer(const P::PhysicalPtr &node) {
+  // Bloom filters from parent
+  const auto &parent_bloom_filters = consumers_[node];
+  if (!parent_bloom_filters.empty()) {
+    for (const auto &child : node->children()) {
+      std::unordered_set<E::ExprId> child_output_attribute_ids;
+      for (const auto &attr : child->getOutputAttributes()) {
+        child_output_attribute_ids.emplace(attr->id());
+      }
+
+      // Forward only the filters whose attribute the child outputs.
+      std::vector<BloomFilterInfo> bloom_filters;
+      for (const auto &info : parent_bloom_filters) {
+        if (child_output_attribute_ids.find(info.attribute->id())
+                != child_output_attribute_ids.end()) {
+          bloom_filters.emplace_back(info.source,
+                                     info.attribute,
+                                     info.depth,
+                                     info.selectivity,
+                                     false,
+                                     info.source_attribute);
+        }
+      }
+      consumers_.emplace(child, std::move(bloom_filters));
+    }
+  }
+
+  // Bloom filters from build side to probe side via HashJoin
+  if (node->getPhysicalType() == P::PhysicalType::kHashJoin) {
+    const P::HashJoinPtr hash_join =
+        std::static_pointer_cast<const P::HashJoin>(node);
+    if (hash_join->join_type() == P::HashJoin::JoinType::kInnerJoin ||
+        hash_join->join_type() == P::HashJoin::JoinType::kLeftSemiJoin) {
+      const P::PhysicalPtr &producer_child = hash_join->right();
+      const P::PhysicalPtr &consumer_child = hash_join->left();
+
+      // Map each build-side join attribute id to its probe-side counterpart.
+      std::unordered_map<E::ExprId, E::AttributeReferencePtr> join_attribute_pairs;
+      for (std::size_t i = 0; i < hash_join->left_join_attributes().size(); ++i) {
+        const E::AttributeReferencePtr probe_join_attribute =
+            hash_join->left_join_attributes()[i];
+        const E::AttributeReferencePtr build_join_attribute =
+            hash_join->right_join_attributes()[i];
+        join_attribute_pairs.emplace(build_join_attribute->id(),
+                                     probe_join_attribute);
+      }
+
+      // A filter built on a build-side attribute can be probed with the
+      // paired probe-side attribute, so rewrite it for the probe child.
+      auto &consumer_bloom_filters = consumers_[consumer_child];
+      for (const auto &info : producers_[producer_child]) {
+        const auto pair_it = join_attribute_pairs.find(info.attribute->id());
+        if (pair_it != join_attribute_pairs.end()) {
+          consumer_bloom_filters.emplace_back(info.source,
+                                              pair_it->second,
+                                              info.depth,
+                                              info.selectivity,
+                                              true,
+                                              info.attribute);
+        }
+      }
+    }
+  }
+
+  // Determine the child whose input we may filter: a hash join's probe side
+  // or an aggregation's input.
+  P::PhysicalPtr consumer_child = nullptr;
+  if (node->getPhysicalType() == P::PhysicalType::kHashJoin) {
+    consumer_child = std::static_pointer_cast<const P::HashJoin>(node)->left();
+  }
+  if (node->getPhysicalType() == P::PhysicalType::kAggregate) {
+    consumer_child = std::static_pointer_cast<const P::Aggregate>(node)->input();
+  }
+
+  if (consumer_child != nullptr) {
+    // Decide attaches.  Filtering only pays off on large inputs.
+    constexpr std::size_t kMinConsumerCardinality = 10000000;
+    auto &consumer_bloom_filters = consumers_[consumer_child];
+    if (cost_model_->estimateCardinality(consumer_child) > kMinConsumerCardinality &&
+        !consumer_bloom_filters.empty()) {
+      // Keep only the best candidate filter for each probe attribute.
+      std::map<E::AttributeReferencePtr, const BloomFilterInfo*> filters;
+      for (const auto &info : consumer_bloom_filters) {
+        auto it = filters.find(info.attribute);
+        if (it == filters.end()) {
+          filters.emplace(info.attribute, &info);
+        } else {
+          if (BloomFilterInfo::isBetterThan(&info, it->second)) {
+            it->second = &info;
+          }
+        }
+      }
+
+      // Record each decision on both sides: the source join builds the
+      // filter, and this node probes with it.
+      auto &probe_attaches = getBloomFilterConfig(node);
+      for (const auto &pair : filters) {
+        auto &build_attaches = getBloomFilterConfig(pair.second->source);
+        build_attaches.addBuildSideBloomFilter(
+            pair.second->source_attribute);
+        probe_attaches.addProbeSideBloomFilter(
+            pair.first,
+            pair.second->source_attribute,
+            pair.second->source);
+      }
+    }
+  }
+
+  for (const auto &child : node->children()) {
+    visitConsumer(child);
+  }
+}
+
+// Rebuilds the plan bottom-up, replacing every HashJoin/Aggregate that has
+// an entry in attaches_ with an equivalent node carrying its
+// BloomFilterConfig, and propagating rewritten children everywhere else.
+P::PhysicalPtr AttachBloomFilters::visitAndAttach(const physical::PhysicalPtr &node) {
+  std::vector<P::PhysicalPtr> new_children;
+  bool has_changed = false;
+  for (const auto &child : node->children()) {
+    P::PhysicalPtr new_child = visitAndAttach(child);
+    if (new_child != child) {
+      has_changed = true;
+    }
+    new_children.emplace_back(new_child);
+  }
+
+  if (node->getPhysicalType() == P::PhysicalType::kHashJoin) {
+    const auto attach_it = attaches_.find(node);
+    if (attach_it != attaches_.end()) {
+      const P::HashJoinPtr hash_join =
+          std::static_pointer_cast<const P::HashJoin>(node);
+      return P::HashJoin::Create(
+          new_children[0],
+          new_children[1],
+          hash_join->left_join_attributes(),
+          hash_join->right_join_attributes(),
+          hash_join->residual_predicate(),
+          hash_join->project_expressions(),
+          hash_join->join_type(),
+          attach_it->second);
+    }
+  }
+
+  if (node->getPhysicalType() == P::PhysicalType::kAggregate) {
+    const auto attach_it = attaches_.find(node);
+    if (attach_it != attaches_.end()) {
+      const P::AggregatePtr aggregate =
+          std::static_pointer_cast<const P::Aggregate>(node);
+      // Use the rewritten child here.  Passing aggregate->input() (as the
+      // original code did) silently discarded any bloom filters that were
+      // attached within the aggregation's subtree.
+      return P::Aggregate::Create(
+          new_children[0],
+          aggregate->grouping_expressions(),
+          aggregate->aggregate_expressions(),
+          aggregate->filter_predicate(),
+          attach_it->second);
+    }
+  }
+
+  if (has_changed) {
+    return node->copyWithNewChildren(new_children);
+  }
+
+  return node;
+}
+
+// Returns the BloomFilterConfig for `node`, creating one (constructed from
+// the node itself) on first access.
+P::BloomFilterConfig& AttachBloomFilters::getBloomFilterConfig(const physical::PhysicalPtr &node) {
+  auto it = attaches_.find(node);
+  if (it == attaches_.end()) {
+    // Single probe instead of the original find + emplace + operator[]
+    // sequence, which looked the key up three times and required
+    // BloomFilterConfig to be default-constructible.
+    it = attaches_.emplace(node, node).first;
+  }
+  return it->second;
+}
+
+} // namespace optimizer
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/query_optimizer/rules/AttachBloomFilters.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/AttachBloomFilters.hpp b/query_optimizer/rules/AttachBloomFilters.hpp
new file mode 100644
index 0000000..e4437f7
--- /dev/null
+++ b/query_optimizer/rules/AttachBloomFilters.hpp
@@ -0,0 +1,118 @@
+/**
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_ATTACH_BLOOM_FILTERS_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_RULES_ATTACH_BLOOM_FILTERS_HPP_
+
+#include <algorithm>
+#include <cstddef>
+#include <map>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/expressions/ExprId.hpp"
+#include "query_optimizer/expressions/NamedExpression.hpp"
+#include "query_optimizer/expressions/Predicate.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/HashJoin.hpp"
+#include "query_optimizer/rules/Rule.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+/** \addtogroup OptimizerRules
+ * @{
+ */
+
+/**
+ * @brief Rule that attaches bloom filters to hash joins and aggregations in
+ *        a physical plan, so that selective build-side relations can
+ *        pre-filter large probe-side inputs at run time.
+ */
+class AttachBloomFilters : public Rule<physical::Physical> {
+ public:
+  AttachBloomFilters() {}
+
+  ~AttachBloomFilters() override {}
+
+  std::string getName() const override {
+    return "AttachBloomFilters";
+  }
+
+  physical::PhysicalPtr apply(const physical::PhysicalPtr &input) override;
+
+ private:
+  // Bookkeeping record for one candidate bloom filter while the rule is
+  // deciding what to attach.
+  struct BloomFilterInfo {
+    // `source_attribute_in` defaults to `attribute_in` (i.e. the filter is
+    // consumed on the same attribute it is built on) when not supplied.
+    BloomFilterInfo(const physical::PhysicalPtr &source_in,
+                    const expressions::AttributeReferencePtr &attribute_in,
+                    const int depth_in,
+                    const double selectivity_in,
+                    const bool from_sibling_in,
+                    const expressions::AttributeReferencePtr &source_attribute_in = nullptr)
+        : source(source_in),
+          attribute(attribute_in),
+          depth(depth_in),
+          selectivity(selectivity_in),
+          from_sibling(from_sibling_in),
+          source_attribute(
+              source_attribute_in == nullptr
+                  ? attribute_in
+                  : source_attribute_in) {
+
+    }
+    // A filter is "better" if it is more selective; ties prefer the deeper
+    // source (depth grows away from the root).
+    // NOTE(review): the tie check uses exact double equality, so
+    // nearly-equal selectivities are decided by selectivity alone.
+    static bool isBetterThan(const BloomFilterInfo *a,
+                             const BloomFilterInfo *b) {
+      if (a->selectivity == b->selectivity) {
+        return a->depth > b->depth;
+      } else {
+        return a->selectivity < b->selectivity;
+      }
+    }
+    physical::PhysicalPtr source;  // Plan node that builds the filter.
+    expressions::AttributeReferencePtr attribute;  // Attribute probed with the filter.
+    int depth;  // Depth of the source node in the plan tree.
+    double selectivity;  // Estimated selectivity of the filter's origin.
+    bool from_sibling;  // True if propagated build-side -> probe-side across a join.
+    expressions::AttributeReferencePtr source_attribute;  // Attribute the filter is built on.
+  };
+
+  // Bottom-up pass: fills producers_ with the filters each node can supply.
+  void visitProducer(const physical::PhysicalPtr &node, const int depth);
+
+  // Top-down pass: fills consumers_ and records attachment decisions.
+  void visitConsumer(const physical::PhysicalPtr &node);
+
+  // Final pass: rewrites the plan with the decided BloomFilterConfigs.
+  physical::PhysicalPtr visitAndAttach(const physical::PhysicalPtr &node);
+
+  // Returns (creating on demand) the config for `node` in attaches_.
+  physical::BloomFilterConfig &getBloomFilterConfig(const physical::PhysicalPtr &node);
+
+  std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_;
+
+  // Per-node candidate filters (std::map keeps element references stable,
+  // which visitProducer relies on).
+  std::map<physical::PhysicalPtr, std::vector<BloomFilterInfo>> producers_;
+  std::map<physical::PhysicalPtr, std::vector<BloomFilterInfo>> consumers_;
+  // Final attachment decisions, keyed by the node that receives the config.
+  std::map<physical::PhysicalPtr, physical::BloomFilterConfig> attaches_;
+
+  DISALLOW_COPY_AND_ASSIGN(AttachBloomFilters);
+};
+
+/** @} */
+
+} // namespace optimizer
+} // namespace quickstep
+
+#endif /* QUICKSTEP_QUERY_OPTIMIZER_RULES_ATTACH_BLOOM_FILTERS_HPP_ */
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/query_optimizer/rules/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/CMakeLists.txt b/query_optimizer/rules/CMakeLists.txt
index 04a9814..6b248f4 100644
--- a/query_optimizer/rules/CMakeLists.txt
+++ b/query_optimizer/rules/CMakeLists.txt
@@ -18,6 +18,7 @@
add_subdirectory(tests)
# Declare micro-libs:
+add_library(quickstep_queryoptimizer_rules_AttachBloomFilters AttachBloomFilters.cpp AttachBloomFilters.hpp)
add_library(quickstep_queryoptimizer_rules_BottomUpRule ../../empty_src.cpp BottomUpRule.hpp)
add_library(quickstep_queryoptimizer_rules_CollapseProject CollapseProject.cpp CollapseProject.hpp)
add_library(quickstep_queryoptimizer_rules_GenerateJoins GenerateJoins.cpp GenerateJoins.hpp)
@@ -36,6 +37,20 @@ add_library(quickstep_queryoptimizer_rules_UnnestSubqueries UnnestSubqueries.cpp
# Link dependencies:
+target_link_libraries(quickstep_queryoptimizer_rules_AttachBloomFilters
+ quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostModel
+ quickstep_queryoptimizer_expressions_AttributeReference
+ quickstep_queryoptimizer_expressions_ExprId
+ quickstep_queryoptimizer_expressions_NamedExpression
+ quickstep_queryoptimizer_expressions_PatternMatcher
+ quickstep_queryoptimizer_expressions_Predicate
+ quickstep_queryoptimizer_physical_HashJoin
+ quickstep_queryoptimizer_physical_PatternMatcher
+ quickstep_queryoptimizer_physical_Physical
+ quickstep_queryoptimizer_physical_PhysicalType
+ quickstep_queryoptimizer_physical_TopLevelPlan
+ quickstep_queryoptimizer_rules_Rule
+ quickstep_utility_Macros)
target_link_libraries(quickstep_queryoptimizer_rules_BottomUpRule
glog
quickstep_queryoptimizer_rules_Rule
@@ -127,6 +142,7 @@ target_link_libraries(quickstep_queryoptimizer_rules_StarSchemaHashJoinOrderOpti
quickstep_queryoptimizer_physical_PhysicalType
quickstep_queryoptimizer_physical_TopLevelPlan
quickstep_queryoptimizer_rules_Rule
+ quickstep_utility_DisjointTreeForest
quickstep_utility_Macros)
target_link_libraries(quickstep_queryoptimizer_rules_SwapProbeBuild
quickstep_queryoptimizer_costmodel_SimpleCostModel
@@ -187,6 +203,7 @@ target_link_libraries(quickstep_queryoptimizer_rules_UpdateExpression
# Module all-in-one library:
add_library(quickstep_queryoptimizer_rules ../../empty_src.cpp OptimizerRulesModule.hpp)
target_link_libraries(quickstep_queryoptimizer_rules
+ quickstep_queryoptimizer_rules_AttachBloomFilters
quickstep_queryoptimizer_rules_BottomUpRule
quickstep_queryoptimizer_rules_CollapseProject
quickstep_queryoptimizer_rules_GenerateJoins
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
index 9770606..cfbb5d1 100644
--- a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
+++ b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
@@ -31,6 +31,7 @@
#include "query_optimizer/physical/Physical.hpp"
#include "query_optimizer/physical/PhysicalType.hpp"
#include "query_optimizer/physical/TopLevelPlan.hpp"
+#include "utility/DisjointTreeForest.hpp"
#include "glog/logging.h"
@@ -72,6 +73,9 @@ P::PhysicalPtr StarSchemaHashJoinOrderOptimization::applyInternal(const P::Physi
JoinGroupInfo *join_group = nullptr;
if (parent_join_group == nullptr || !is_valid_cascading_hash_join) {
new_join_group.reset(new JoinGroupInfo());
+ for (const auto &attr : input->getReferencedAttributes()) {
+ new_join_group->referenced_attributes.emplace(attr->id());
+ }
join_group = new_join_group.get();
} else {
join_group = parent_join_group;
@@ -144,7 +148,10 @@ physical::PhysicalPtr StarSchemaHashJoinOrderOptimization::generatePlan(
i,
tables[i],
cost_model_->estimateCardinality(tables[i]),
- cost_model_->estimateSelectivity(tables[i]));
+ cost_model_->estimateSelectivity(tables[i]),
+ CountSharedAttributes(join_group.referenced_attributes,
+ tables[i]->getOutputAttributes()),
+ tables[i]->getPhysicalType() == physical::PhysicalType::kAggregate);
}
// Auxiliary mapping info.
@@ -161,9 +168,19 @@ physical::PhysicalPtr StarSchemaHashJoinOrderOptimization::generatePlan(
}
}
- // Create a join graph where tables are vertices, and add an edge between vertices
- // t1 and t2 for each join predicate t1.x = t2.y
- std::vector<std::unordered_set<std::size_t>> join_graph(table_info_storage.size());
+ std::set<TableInfo*> remaining_tables;
+ for (auto &table_info : table_info_storage) {
+ remaining_tables.emplace(&table_info);
+ }
+
+ DisjointTreeForest<E::ExprId> join_attribute_forest;
+ for (const auto &attr_id_pair : join_group.join_attribute_pairs) {
+ join_attribute_forest.makeSet(attr_id_pair.first);
+ join_attribute_forest.makeSet(attr_id_pair.second);
+ join_attribute_forest.merge(attr_id_pair.first, attr_id_pair.second);
+ }
+
+ std::map<std::size_t, std::map<std::size_t, E::ExprId>> join_attribute_groups;
for (const auto &attr_id_pair : join_group.join_attribute_pairs) {
DCHECK(attribute_id_to_table_info_index_map.find(attr_id_pair.first)
!= attribute_id_to_table_info_index_map.end());
@@ -176,128 +193,169 @@ physical::PhysicalPtr StarSchemaHashJoinOrderOptimization::generatePlan(
attribute_id_to_table_info_index_map[attr_id_pair.second];
DCHECK_NE(first_table_idx, second_table_idx);
- table_info_storage[first_table_idx].join_attribute_pairs.emplace(
- attr_id_pair.first, attr_id_pair.second);
- table_info_storage[second_table_idx].join_attribute_pairs.emplace(
- attr_id_pair.second, attr_id_pair.first);
-
- join_graph[first_table_idx].emplace(second_table_idx);
- join_graph[second_table_idx].emplace(first_table_idx);
- }
-
- std::set<TableInfo*, TableInfoPtrLessComparator> table_info_ordered_by_priority;
- for (std::size_t i = 0; i < table_info_storage.size(); ++i) {
- table_info_ordered_by_priority.emplace(&table_info_storage[i]);
+ DCHECK_EQ(join_attribute_forest.find(attr_id_pair.first),
+ join_attribute_forest.find(attr_id_pair.second));
+ const std::size_t attr_group_id = join_attribute_forest.find(attr_id_pair.first);
+ auto &attr_group = join_attribute_groups[attr_group_id];
+ attr_group.emplace(first_table_idx, attr_id_pair.first);
+ attr_group.emplace(second_table_idx, attr_id_pair.second);
}
- // Contruct hash join tree.
while (true) {
- TableInfo *first_table_info = *table_info_ordered_by_priority.begin();
- table_info_ordered_by_priority.erase(
- table_info_ordered_by_priority.begin());
- const std::size_t first_table_info_id = first_table_info->table_info_id;
-
- TableInfo *second_table_info = nullptr;
- std::set<TableInfo*, TableInfoPtrLessComparator>::iterator second_table_info_it;
- for (auto candidate_table_info_it = table_info_ordered_by_priority.begin();
- candidate_table_info_it != table_info_ordered_by_priority.end();
- ++candidate_table_info_it) {
- TableInfo *candidate_table_info = *candidate_table_info_it;
- const std::size_t candidate_table_info_id = candidate_table_info->table_info_id;
-
- if (join_graph[first_table_info_id].find(candidate_table_info_id)
- == join_graph[first_table_info_id].end() &&
- join_graph[candidate_table_info_id].find(first_table_info_id)
- == join_graph[candidate_table_info_id].end()) {
- continue;
- } else if (second_table_info == nullptr) {
- second_table_info = candidate_table_info;
- second_table_info_it = candidate_table_info_it;
- }
-
- bool is_likely_many_to_many_join = false;
- for (const auto join_attr_pair : first_table_info->join_attribute_pairs) {
- if (candidate_table_info->joined_attribute_set.find(join_attr_pair.second)
- != candidate_table_info->joined_attribute_set.end()) {
- is_likely_many_to_many_join = true;
- break;
- }
- }
- for (const auto join_attr_pair : candidate_table_info->join_attribute_pairs) {
- if (first_table_info->joined_attribute_set.find(join_attr_pair.second)
- != first_table_info->joined_attribute_set.end()) {
- is_likely_many_to_many_join = true;
- break;
+ // TODO(jianqiao): design better data structure to improve efficiency here.
+ std::unique_ptr<JoinPair> best_join = nullptr;
+ for (TableInfo *probe_table_info : remaining_tables) {
+ for (TableInfo *build_table_info : remaining_tables) {
+ if (probe_table_info != build_table_info) {
+ std::vector<E::AttributeReferencePtr> build_attrs;
+ const std::size_t probe_table_id = probe_table_info->table_info_id;
+ const std::size_t build_table_id = build_table_info->table_info_id;
+ for (const auto &attr_group_pair : join_attribute_groups) {
+ const auto &attr_group = attr_group_pair.second;
+ auto probe_it = attr_group.find(probe_table_id);
+ auto build_it = attr_group.find(build_table_id);
+ if (probe_it != attr_group.end() && build_it != attr_group.end()) {
+ build_attrs.emplace_back(
+ attribute_id_to_reference_map.at(build_it->second));
+ }
+ }
+ if (!build_attrs.empty()
+ && build_table_info->table->impliesUniqueAttributes(build_attrs)) {
+ std::unique_ptr<JoinPair> new_join(
+ new JoinPair(probe_table_info, build_table_info));
+ if (best_join == nullptr || new_join->isBetterThan(*best_join)) {
+// if (best_join != nullptr) {
+// std::cerr << "(" << best_join->probe->estimated_selectivity
+// << ", " << best_join->probe->estimated_cardinality << ")"
+// << " -- "
+// << "(" << best_join->build->estimated_selectivity
+// << ", " << best_join->build->estimated_cardinality << ")"
+// << "\n";
+// std::cerr << "REPLACED WITH\n";
+// }
+// std::cerr << "(" << new_join->probe->estimated_selectivity
+// << ", " << new_join->probe->estimated_cardinality << ")"
+// << " -- "
+// << "(" << new_join->build->estimated_selectivity
+// << ", " << new_join->build->estimated_cardinality << ")"
+// << "\n****\n";
+ best_join.reset(new_join.release());
+ }
+ }
}
}
- if (!is_likely_many_to_many_join) {
- second_table_info = candidate_table_info;
- second_table_info_it = candidate_table_info_it;
- break;
- }
}
- DCHECK(second_table_info != nullptr);
- table_info_ordered_by_priority.erase(second_table_info_it);
- const P::PhysicalPtr &left_child = first_table_info->table;
- const P::PhysicalPtr &right_child = second_table_info->table;
+ TableInfo *selected_probe_table_info = nullptr;
+ TableInfo *selected_build_table_info = nullptr;
+
+ if (best_join != nullptr) {
+ selected_probe_table_info = best_join->probe;
+ selected_build_table_info = best_join->build;
+ }
+
+ // TODO(jianqiao): Handle the case when there is no primary key-foreign key information available.
+ CHECK(selected_probe_table_info != nullptr);
+ CHECK(selected_build_table_info != nullptr);
+
+// std::cerr << selected_probe_table_info->estimated_selectivity
+// << " -- "
+// << selected_build_table_info->estimated_selectivity
+// << "\n";
+
+// std::cerr << selected_probe_table_info->estimated_num_output_attributes
+// << " -- "
+// << selected_build_table_info->estimated_num_output_attributes
+// << "\n";
+
+ remaining_tables.erase(selected_probe_table_info);
+ remaining_tables.erase(selected_build_table_info);
+
+ const P::PhysicalPtr &probe_child = selected_probe_table_info->table;
+ const P::PhysicalPtr &build_child = selected_build_table_info->table;
std::vector<E::NamedExpressionPtr> output_attributes;
- for (const E::AttributeReferencePtr &left_attr : left_child->getOutputAttributes()) {
- output_attributes.emplace_back(left_attr);
+ for (const E::AttributeReferencePtr &probe_attr : probe_child->getOutputAttributes()) {
+ output_attributes.emplace_back(probe_attr);
}
- for (const E::AttributeReferencePtr &right_attr : right_child->getOutputAttributes()) {
- output_attributes.emplace_back(right_attr);
+ for (const E::AttributeReferencePtr &build_attr : build_child->getOutputAttributes()) {
+ output_attributes.emplace_back(build_attr);
}
- std::vector<E::AttributeReferencePtr> left_join_attributes;
- std::vector<E::AttributeReferencePtr> right_join_attributes;
- std::unordered_set<expressions::ExprId> new_joined_attribute_set;
- for (const auto &join_attr_pair : first_table_info->join_attribute_pairs) {
- if (second_table_info->join_attribute_pairs.find(join_attr_pair.second)
- != second_table_info->join_attribute_pairs.end()) {
- left_join_attributes.emplace_back(
- attribute_id_to_reference_map[join_attr_pair.first]);
- right_join_attributes.emplace_back(
- attribute_id_to_reference_map[join_attr_pair.second]);
-
- new_joined_attribute_set.emplace(join_attr_pair.first);
- new_joined_attribute_set.emplace(join_attr_pair.second);
+ std::vector<E::AttributeReferencePtr> probe_attributes;
+ std::vector<E::AttributeReferencePtr> build_attributes;
+ const std::size_t probe_table_id = selected_probe_table_info->table_info_id;
+ const std::size_t build_table_id = selected_build_table_info->table_info_id;
+ for (const auto &attr_group_pair : join_attribute_groups) {
+ const auto &attr_group = attr_group_pair.second;
+ auto probe_it = attr_group.find(probe_table_id);
+ auto build_it = attr_group.find(build_table_id);
+ if (probe_it != attr_group.end() && build_it != attr_group.end()) {
+ probe_attributes.emplace_back(
+ attribute_id_to_reference_map.at(probe_it->second));
+ build_attributes.emplace_back(
+ attribute_id_to_reference_map.at(build_it->second));
}
}
- DCHECK_GE(left_join_attributes.size(), static_cast<std::size_t>(1));
- if (table_info_ordered_by_priority.size() > 0) {
+ if (remaining_tables.size() > 0) {
P::PhysicalPtr output =
- P::HashJoin::Create(left_child,
- right_child,
- left_join_attributes,
- right_join_attributes,
+ P::HashJoin::Create(probe_child,
+ build_child,
+ probe_attributes,
+ build_attributes,
nullptr,
output_attributes,
P::HashJoin::JoinType::kInnerJoin);
- second_table_info->table = output;
+// P::PhysicalPtr output;
+// if (selected_build_table_info->estimated_num_output_attributes >= 4 &&
+// selected_probe_table_info->estimated_num_output_attributes < 4) {
+// output = P::HashJoin::Create(build_child,
+// probe_child,
+// build_attributes,
+// probe_attributes,
+// nullptr,
+// output_attributes,
+// P::HashJoin::JoinType::kInnerJoin);
+// } else {
+// output = P::HashJoin::Create(probe_child,
+// build_child,
+// probe_attributes,
+// build_attributes,
+// nullptr,
+// output_attributes,
+// P::HashJoin::JoinType::kInnerJoin);
+// }
+
+ selected_probe_table_info->table = output;
// TODO(jianqiao): Cache the estimated cardinality for each plan in cost
// model to avoid duplicated estimation.
- second_table_info->estimated_cardinality = cost_model_->estimateCardinality(output);
-
- second_table_info->join_attribute_pairs.insert(first_table_info->join_attribute_pairs.begin(),
- first_table_info->join_attribute_pairs.end());
- second_table_info->joined_attribute_set.insert(first_table_info->joined_attribute_set.begin(),
- first_table_info->joined_attribute_set.end());
- second_table_info->joined_attribute_set.insert(new_joined_attribute_set.begin(),
- new_joined_attribute_set.end());
- table_info_ordered_by_priority.emplace(second_table_info);
-
- join_graph[second_table_info->table_info_id].insert(join_graph[first_table_info_id].begin(),
- join_graph[first_table_info_id].end());
-
+ selected_probe_table_info->estimated_cardinality = cost_model_->estimateCardinality(output);
+ selected_probe_table_info->estimated_selectivity = cost_model_->estimateSelectivity(output);
+
+ selected_probe_table_info->estimated_num_output_attributes =
+ CountSharedAttributes(join_group.referenced_attributes,
+ output->getOutputAttributes());
+ selected_probe_table_info->is_aggregation = false;
+
+ remaining_tables.emplace(selected_probe_table_info);
+
+ // Update join attribute groups.
+ for (auto &attr_group_pair : join_attribute_groups) {
+ auto &attr_group = attr_group_pair.second;
+ auto build_it = attr_group.find(build_table_id);
+ if (build_it != attr_group.end()) {
+ const E::ExprId attr_id = build_it->second;
+ attr_group.erase(build_it);
+ attr_group.emplace(probe_table_id, attr_id);
+ }
+ }
} else {
- return P::HashJoin::Create(left_child,
- right_child,
- left_join_attributes,
- right_join_attributes,
+ return P::HashJoin::Create(probe_child,
+ build_child,
+ probe_attributes,
+ build_attributes,
residual_predicate,
project_expressions,
P::HashJoin::JoinType::kInnerJoin);
@@ -305,5 +363,18 @@ physical::PhysicalPtr StarSchemaHashJoinOrderOptimization::generatePlan(
}
}
+// Counts how many attributes in attr_set2 also appear (by ExprId) in
+// attr_set1.
+std::size_t StarSchemaHashJoinOrderOptimization::CountSharedAttributes(
+    const std::unordered_set<expressions::ExprId> &attr_set1,
+    const std::vector<expressions::AttributeReferencePtr> &attr_set2) {
+  std::size_t num_shared = 0;
+  for (const auto &attr_ref : attr_set2) {
+    if (attr_set1.count(attr_ref->id()) != 0) {
+      ++num_shared;
+    }
+  }
+  return num_shared;
+}
+
+
} // namespace optimizer
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp
index deddffd..33d95a5 100644
--- a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp
+++ b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp
@@ -62,6 +62,7 @@ class StarSchemaHashJoinOrderOptimization : public Rule<physical::Physical> {
* @brief A group of tables to form a hash join tree.
*/
struct JoinGroupInfo {
+ std::unordered_set<expressions::ExprId> referenced_attributes;
std::vector<physical::PhysicalPtr> tables;
std::vector<std::pair<expressions::ExprId, expressions::ExprId>> join_attribute_pairs;
};
@@ -70,49 +71,84 @@ class StarSchemaHashJoinOrderOptimization : public Rule<physical::Physical> {
* @brief Auxiliary information of a table for the optimizer.
*/
struct TableInfo {
- TableInfo(const std::size_t in_table_info_id,
- const physical::PhysicalPtr &in_table,
- const std::size_t in_estimated_cardinality,
- const double in_estimated_selectivity)
- : table_info_id(in_table_info_id),
- table(in_table),
- estimated_cardinality(in_estimated_cardinality),
- estimated_selectivity(in_estimated_selectivity) {
+ TableInfo(const std::size_t table_info_id_in,
+ const physical::PhysicalPtr &table_in,
+ const std::size_t estimated_cardinality_in,
+ const double estimated_selectivity_in,
+ const std::size_t estimated_num_output_attributes_in,
+ const bool is_aggregation_in)
+ : table_info_id(table_info_id_in),
+ table(table_in),
+ estimated_cardinality(estimated_cardinality_in),
+ estimated_selectivity(estimated_selectivity_in),
+ estimated_num_output_attributes(estimated_num_output_attributes_in),
+ is_aggregation(is_aggregation_in) {
}
const std::size_t table_info_id;
physical::PhysicalPtr table;
std::size_t estimated_cardinality;
double estimated_selectivity;
- std::unordered_multimap<expressions::ExprId, expressions::ExprId> join_attribute_pairs;
- std::unordered_set<expressions::ExprId> joined_attribute_set;
+ std::size_t estimated_num_output_attributes;
+ bool is_aggregation;
};
- /**
- * @brief Comparator that compares the join priorities between two tables.
- */
- struct TableInfoPtrLessComparator {
- inline bool operator() (const TableInfo *lhs, const TableInfo *rhs) {
- bool swapped = false;
- if (lhs->estimated_cardinality > rhs->estimated_cardinality) {
- std::swap(lhs, rhs);
- swapped = true;
+ struct JoinPair {
+ JoinPair(TableInfo *probe_in, TableInfo *build_in)
+ : probe(probe_in), build(build_in) {
+ }
+
+ inline bool isBetterThan(const JoinPair &rhs) const {
+ const auto &lhs = *this;
+ const bool lhs_has_large_output =
+ lhs.build->estimated_num_output_attributes
+ + lhs.probe->estimated_num_output_attributes > 5;
+ const bool rhs_has_large_output =
+ rhs.build->estimated_num_output_attributes
+ + rhs.probe->estimated_num_output_attributes > 5;
+ if (lhs_has_large_output || rhs_has_large_output) {
+ if (lhs_has_large_output != rhs_has_large_output) {
+ return rhs_has_large_output;
+ }
+ double lhs_selectivity =
+ lhs.build->estimated_selectivity * lhs.probe->estimated_selectivity;
+ double rhs_selectivity =
+ rhs.build->estimated_selectivity * rhs.probe->estimated_selectivity;
+ if (lhs_selectivity != rhs_selectivity) {
+ return lhs_selectivity < rhs_selectivity;
+ }
}
- if (lhs->estimated_selectivity < rhs->estimated_selectivity) {
- return !swapped;
- } else if (lhs->estimated_cardinality < 1000u &&
- rhs->estimated_cardinality > 10000u &&
- lhs->estimated_selectivity < rhs->estimated_selectivity * 1.5) {
- return !swapped;
- } else if (lhs->estimated_selectivity > rhs->estimated_selectivity) {
- return swapped;
- } else if (lhs->estimated_cardinality != rhs->estimated_cardinality) {
- return !swapped;
+ const bool lhs_has_small_build =
+ !lhs_has_large_output && lhs.build->estimated_cardinality < 0x100;
+ const bool rhs_has_small_build =
+ !rhs_has_large_output && rhs.build->estimated_cardinality < 0x100;
+ if (lhs_has_small_build != rhs_has_small_build) {
+ return lhs_has_small_build;
+ }
+
+ if (lhs.probe->is_aggregation != rhs.probe->is_aggregation) {
+ return lhs.probe->is_aggregation;
+ }
+
+ if (lhs.probe->estimated_cardinality != rhs.probe->estimated_cardinality) {
+ return lhs.probe->estimated_cardinality < rhs.probe->estimated_cardinality;
+ }
+ if (lhs.build->estimated_selectivity != rhs.build->estimated_selectivity) {
+ return lhs.build->estimated_selectivity < rhs.build->estimated_selectivity;
+ }
+ if (lhs.build->estimated_cardinality != rhs.build->estimated_cardinality) {
+ return lhs.build->estimated_cardinality < rhs.build->estimated_cardinality;
+ }
+ if (lhs.probe->table != rhs.probe->table) {
+ return lhs.probe->table < rhs.probe->table;
} else {
- return swapped ^ (lhs->table < rhs->table);
+ return lhs.build->table < rhs.build->table;
}
}
+
+ TableInfo *probe;
+ TableInfo *build;
};
physical::PhysicalPtr applyInternal(const physical::PhysicalPtr &input,
@@ -123,6 +159,10 @@ class StarSchemaHashJoinOrderOptimization : public Rule<physical::Physical> {
const expressions::PredicatePtr &residual_predicate,
const std::vector<expressions::NamedExpressionPtr> &project_expressions);
+ static std::size_t CountSharedAttributes(
+ const std::unordered_set<expressions::ExprId> &attr_set1,
+ const std::vector<expressions::AttributeReferencePtr> &attr_set2);
+
std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_;
DISALLOW_COPY_AND_ASSIGN(StarSchemaHashJoinOrderOptimization);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/query_optimizer/tests/ExecutionHeuristics_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/ExecutionHeuristics_unittest.cpp b/query_optimizer/tests/ExecutionHeuristics_unittest.cpp
index 815c13e..ac0adea 100644
--- a/query_optimizer/tests/ExecutionHeuristics_unittest.cpp
+++ b/query_optimizer/tests/ExecutionHeuristics_unittest.cpp
@@ -70,7 +70,8 @@ class ExecutionHeuristicsTest : public ::testing::Test {
probe_relation,
std::move(build_attribute_ids),
std::move(probe_attribute_ids),
- join_hash_table_id);
+ join_hash_table_id,
+ build_relation->estimateTupleCardinality());
}
QueryPlan::DAGNodeIndex createDummyBuildHashOperator(QueryPlan *query_plan,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index 667df1e..16c0d82 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -59,6 +59,11 @@ using std::vector;
namespace quickstep {
+DEFINE_int64(bloom_adapter_batch_size, 64,
+ "Number of tuples to probe in bulk in Bloom filter adapter.");
+DEFINE_bool(adapt_bloom_filters, true,
+ "Whether to adaptively adjust the ordering of bloom filters.");
+
namespace {
// Functor passed to HashTable::getAllFromValueAccessor() to collect matching
@@ -75,6 +80,11 @@ class MapBasedJoinedTupleCollector {
joined_tuples_[tref.block].emplace_back(tref.tuple, accessor.getCurrentPosition());
}
+ inline void operator()(const tuple_id probe_tid,
+ const TupleReference &build_tref) {
+ joined_tuples_[build_tref.block].emplace_back(build_tref.tuple, probe_tid);
+ }
+
// Get a mutable pointer to the collected map of joined tuple ID pairs. The
// key is inner block_id, values are vectors of joined tuple ID pairs with
// tuple ID from the inner block on the left and the outer block on the
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/relational_operators/HashJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp
index 235bfe4..cf680f6 100644
--- a/relational_operators/HashJoinOperator.hpp
+++ b/relational_operators/HashJoinOperator.hpp
@@ -307,8 +307,9 @@ class HashInnerJoinWorkOrder : public WorkOrder {
const std::vector<std::unique_ptr<const Scalar>> &selection,
const JoinHashTable &hash_table,
InsertDestination *output_destination,
- StorageManager *storage_manager)
- : WorkOrder(query_id),
+ StorageManager *storage_manager,
+ const int op_index = -1)
+ : WorkOrder(query_id, op_index),
build_relation_(build_relation),
probe_relation_(probe_relation),
join_key_attributes_(join_key_attributes),
@@ -354,8 +355,9 @@ class HashInnerJoinWorkOrder : public WorkOrder {
const std::vector<std::unique_ptr<const Scalar>> &selection,
const JoinHashTable &hash_table,
InsertDestination *output_destination,
- StorageManager *storage_manager)
- : WorkOrder(query_id),
+ StorageManager *storage_manager,
+ const int op_index = -1)
+ : WorkOrder(query_id, op_index),
build_relation_(build_relation),
probe_relation_(probe_relation),
join_key_attributes_(std::move(join_key_attributes)),
@@ -435,8 +437,9 @@ class HashSemiJoinWorkOrder : public WorkOrder {
const std::vector<std::unique_ptr<const Scalar>> &selection,
const JoinHashTable &hash_table,
InsertDestination *output_destination,
- StorageManager *storage_manager)
- : WorkOrder(query_id),
+ StorageManager *storage_manager,
+ const int op_index = -1)
+ : WorkOrder(query_id, op_index),
build_relation_(build_relation),
probe_relation_(probe_relation),
join_key_attributes_(join_key_attributes),
@@ -482,8 +485,9 @@ class HashSemiJoinWorkOrder : public WorkOrder {
const std::vector<std::unique_ptr<const Scalar>> &selection,
const JoinHashTable &hash_table,
InsertDestination *output_destination,
- StorageManager *storage_manager)
- : WorkOrder(query_id),
+ StorageManager *storage_manager,
+ const int op_index = -1)
+ : WorkOrder(query_id, op_index),
build_relation_(build_relation),
probe_relation_(probe_relation),
join_key_attributes_(std::move(join_key_attributes)),
@@ -559,8 +563,9 @@ class HashAntiJoinWorkOrder : public WorkOrder {
const std::vector<std::unique_ptr<const Scalar>> &selection,
const JoinHashTable &hash_table,
InsertDestination *output_destination,
- StorageManager *storage_manager)
- : WorkOrder(query_id),
+ StorageManager *storage_manager,
+ const int op_index = -1)
+ : WorkOrder(query_id, op_index),
build_relation_(build_relation),
probe_relation_(probe_relation),
join_key_attributes_(join_key_attributes),
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/relational_operators/WorkOrder.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.hpp b/relational_operators/WorkOrder.hpp
index df195cc..4eb6b3a 100644
--- a/relational_operators/WorkOrder.hpp
+++ b/relational_operators/WorkOrder.hpp
@@ -299,16 +299,23 @@ class WorkOrder {
return query_id_;
}
+ inline const int getOperatorIndex() const {
+ return op_index_;
+ }
+
protected:
/**
* @brief Constructor.
*
* @param query_id The ID of the query to which this WorkOrder belongs.
**/
- explicit WorkOrder(const std::size_t query_id)
- : query_id_(query_id) {}
+ explicit WorkOrder(const std::size_t query_id,
+ const int op_index = -1)
+ : query_id_(query_id),
+ op_index_(op_index) {}
const std::size_t query_id_;
+ const int op_index_;
// A vector of preferred NUMA node IDs where this workorder should be executed.
// These node IDs typically indicate the NUMA node IDs of the input(s) of the
// workorder. Derived classes should ensure that there are no duplicate entries
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 4878cf1..668164c 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -46,10 +46,13 @@
#include "storage/StorageBlock.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "storage/StorageManager.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
#include "types/TypedValue.hpp"
#include "types/containers/ColumnVector.hpp"
#include "types/containers/ColumnVectorsValueAccessor.hpp"
#include "types/containers/Tuple.hpp"
+#include "utility/BloomFilterAdapter.hpp"
#include "glog/logging.h"
@@ -57,6 +60,8 @@ using std::unique_ptr;
namespace quickstep {
+DECLARE_int64(bloom_adapter_batch_size);
+
AggregationOperationState::AggregationOperationState(
const CatalogRelationSchema &input_relation,
const std::vector<const AggregateFunction*> &aggregate_functions,
@@ -64,12 +69,16 @@ AggregationOperationState::AggregationOperationState(
std::vector<bool> &&is_distinct,
std::vector<std::unique_ptr<const Scalar>> &&group_by,
const Predicate *predicate,
+ std::vector<const BloomFilter *> &&bloom_filters,
+ std::vector<attribute_id> &&bloom_filter_attribute_ids,
const std::size_t estimated_num_entries,
const HashTableImplType hash_table_impl_type,
const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
StorageManager *storage_manager)
: input_relation_(input_relation),
predicate_(predicate),
+ bloom_filters_(std::move(bloom_filters)),
+ bloom_filter_attribute_ids_(std::move(bloom_filter_attribute_ids)),
group_by_list_(std::move(group_by)),
arguments_(std::move(arguments)),
is_distinct_(std::move(is_distinct)),
@@ -183,7 +192,8 @@ AggregationOperationState::AggregationOperationState(
AggregationOperationState* AggregationOperationState::ReconstructFromProto(
const serialization::AggregationOperationState &proto,
const CatalogDatabaseLite &database,
- StorageManager *storage_manager) {
+ StorageManager *storage_manager,
+ const std::vector<std::unique_ptr<BloomFilter>> &bloom_filters) {
DCHECK(ProtoIsValid(proto, database));
// Rebuild contructor arguments from their representation in 'proto'.
@@ -232,12 +242,24 @@ AggregationOperationState* AggregationOperationState::ReconstructFromProto(
database));
}
+ std::vector<const BloomFilter*> bloom_filter_vector;
+ std::vector<attribute_id> bloom_filter_attribute_ids;
+ for (int i = 0; i < proto.bloom_filters_size(); ++i) {
+ // Add the pointer to the probe bloom filter within the list of probe bloom filters to use.
+ const auto bloom_filter_proto = proto.bloom_filters(i);
+ bloom_filter_vector.emplace_back(
+ bloom_filters[bloom_filter_proto.bloom_filter_id()].get());
+ bloom_filter_attribute_ids.emplace_back(bloom_filter_proto.attr_id());
+ }
+
return new AggregationOperationState(database.getRelationSchemaById(proto.relation_id()),
aggregate_functions,
std::move(arguments),
std::move(is_distinct),
std::move(group_by_expressions),
predicate.release(),
+ std::move(bloom_filter_vector),
+ std::move(bloom_filter_attribute_ids),
proto.estimated_num_entries(),
HashTableImplTypeFromProto(proto.hash_table_impl_type()),
distinctify_hash_table_impl_types,
@@ -340,6 +362,10 @@ void AggregationOperationState::aggregateBlockSingleState(const block_id input_b
// tuples so that it can be reused across multiple aggregates (i.e. we only
// pay the cost of evaluating the predicate once).
std::unique_ptr<TupleIdSequence> reuse_matches;
+ if (predicate_) {
+ reuse_matches.reset(block->getMatchesForPredicate(predicate_.get()));
+ }
+
for (std::size_t agg_idx = 0;
agg_idx < handles_.size();
++agg_idx) {
@@ -358,7 +384,6 @@ void AggregationOperationState::aggregateBlockSingleState(const block_id input_b
arguments_[agg_idx],
local_arguments_as_attributes,
{}, /* group_by */
- predicate_.get(),
distinctify_hashtables_[agg_idx].get(),
&reuse_matches,
nullptr /* reuse_group_by_vectors */);
@@ -369,7 +394,6 @@ void AggregationOperationState::aggregateBlockSingleState(const block_id input_b
block->aggregate(*handles_[agg_idx],
arguments_[agg_idx],
local_arguments_as_attributes,
- predicate_.get(),
&reuse_matches));
}
}
@@ -391,6 +415,72 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
// GROUP BY expressions once).
std::vector<std::unique_ptr<ColumnVector>> reuse_group_by_vectors;
+ if (predicate_) {
+ reuse_matches.reset(block->getMatchesForPredicate(predicate_.get()));
+ }
+
+ if (bloom_filters_.size() > 0) {
+ const std::size_t num_tuples = block->getNumTuples();
+// std::cerr << "Before: " << num_tuples << " -- "
+// << (reuse_matches ? reuse_matches->numTuples() : num_tuples)
+// << "\n";
+ std::unique_ptr<ValueAccessor> accessor;
+ if (reuse_matches) {
+ accessor.reset(
+ block->getTupleStorageSubBlock().createValueAccessor(reuse_matches.get()));
+ } else {
+ accessor.reset(
+ block->getTupleStorageSubBlock().createValueAccessor());
+ }
+ InvokeOnAnyValueAccessor(
+ accessor.get(),
+ [&](auto *accessor) -> void { // NOLINT(build/c++11)
+ std::unique_ptr<TupleIdSequence> filtered(new TupleIdSequence(num_tuples));
+
+ std::vector<std::size_t> attr_size_vector;
+ attr_size_vector.reserve(bloom_filter_attribute_ids_.size());
+ for (const auto &attr : bloom_filter_attribute_ids_) {
+ auto val_and_size =
+ accessor->template getUntypedValueAndByteLengthAtAbsolutePosition<false>(0, attr);
+ attr_size_vector.emplace_back(val_and_size.second);
+ }
+
+ std::unique_ptr<BloomFilterAdapter> bloom_filter_adapter;
+ bloom_filter_adapter.reset(new BloomFilterAdapter(
+ bloom_filters_, bloom_filter_attribute_ids_, attr_size_vector));
+
+ std::uint32_t batch_size_try = FLAGS_bloom_adapter_batch_size;
+ std::uint32_t num_tuples_left = accessor->getNumTuples();
+ std::vector<tuple_id> batch(num_tuples_left);
+
+ do {
+ std::uint32_t batch_size =
+ batch_size_try < num_tuples_left ? batch_size_try : num_tuples_left;
+ for (std::size_t i = 0; i < batch_size; ++i) {
+ accessor->next();
+ batch.push_back(accessor->getCurrentPosition());
+ }
+
+ std::size_t num_hits =
+ bloom_filter_adapter->bulkProbe<true>(accessor, batch, batch_size);
+ for (std::size_t t = 0; t < num_hits; ++t){
+ filtered->set(batch[t], true);
+ }
+
+ batch.clear();
+ num_tuples_left -= batch_size;
+ batch_size_try = batch_size * 2;
+ } while (num_tuples_left > 0);
+
+ if (reuse_matches) {
+ reuse_matches->intersectWith(*filtered);
+ } else {
+ reuse_matches.reset(filtered.release());
+ }
+ });
+// std::cerr << "After: " << reuse_matches->numTuples() << "\n";
+ }
+
for (std::size_t agg_idx = 0;
agg_idx < handles_.size();
++agg_idx) {
@@ -402,7 +492,6 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
arguments_[agg_idx],
nullptr, /* arguments_as_attributes */
group_by_list_,
- predicate_.get(),
distinctify_hashtables_[agg_idx].get(),
&reuse_matches,
&reuse_group_by_vectors);
@@ -416,7 +505,6 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
block->aggregateGroupBy(*handles_[agg_idx],
arguments_[agg_idx],
group_by_list_,
- predicate_.get(),
agg_hash_table,
&reuse_matches,
&reuse_group_by_vectors);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 0199749..5db7325 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -33,6 +33,7 @@
#include "storage/HashTableBase.hpp"
#include "storage/HashTablePool.hpp"
#include "storage/StorageBlockInfo.hpp"
+#include "utility/BloomFilter.hpp"
#include "utility/Macros.hpp"
namespace quickstep {
@@ -108,6 +109,8 @@ class AggregationOperationState {
std::vector<bool> &&is_distinct,
std::vector<std::unique_ptr<const Scalar>> &&group_by,
const Predicate *predicate,
+ std::vector<const BloomFilter *> &&bloom_filters,
+ std::vector<attribute_id> &&bloom_filter_attribute_ids,
const std::size_t estimated_num_entries,
const HashTableImplType hash_table_impl_type,
const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
@@ -131,7 +134,8 @@ class AggregationOperationState {
static AggregationOperationState* ReconstructFromProto(
const serialization::AggregationOperationState &proto,
const CatalogDatabaseLite &database,
- StorageManager *storage_manager);
+ StorageManager *storage_manager,
+ const std::vector<std::unique_ptr<BloomFilter>> &bloom_filters);
/**
* @brief Check whether a serialization::AggregationOperationState is
@@ -181,6 +185,10 @@ class AggregationOperationState {
// filter predicate (if any), and the list of GROUP BY expressions (if any).
const CatalogRelationSchema &input_relation_;
std::unique_ptr<const Predicate> predicate_;
+
+ std::vector<const BloomFilter*> bloom_filters_;
+ std::vector<attribute_id> bloom_filter_attribute_ids_;
+
std::vector<std::unique_ptr<const Scalar>> group_by_list_;
// Each individual aggregate in this operation has an AggregationHandle and
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/storage/AggregationOperationState.proto
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.proto b/storage/AggregationOperationState.proto
index bf78e3a..165148e 100644
--- a/storage/AggregationOperationState.proto
+++ b/storage/AggregationOperationState.proto
@@ -42,4 +42,10 @@ message AggregationOperationState {
// Each DISTINCT aggregation has its distinctify hash table impl type.
repeated HashTableImplType distinctify_hash_table_impl_types = 7;
+
+ message BloomFilter {
+ required uint32 bloom_filter_id = 1;
+ required uint32 attr_id = 2;
+ }
+ repeated BloomFilter bloom_filters = 8;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/storage/BasicColumnStoreValueAccessor.hpp
----------------------------------------------------------------------
diff --git a/storage/BasicColumnStoreValueAccessor.hpp b/storage/BasicColumnStoreValueAccessor.hpp
index 759e187..7907fd5 100644
--- a/storage/BasicColumnStoreValueAccessor.hpp
+++ b/storage/BasicColumnStoreValueAccessor.hpp
@@ -18,6 +18,8 @@
#ifndef QUICKSTEP_STORAGE_BASIC_COLUMN_STORE_VALUE_ACCESSOR_HPP_
#define QUICKSTEP_STORAGE_BASIC_COLUMN_STORE_VALUE_ACCESSOR_HPP_
+#include <cstddef>
+#include <utility>
#include <vector>
#include "catalog/CatalogRelationSchema.hpp"
@@ -43,7 +45,8 @@ class BasicColumnStoreValueAccessorHelper {
: relation_(relation),
num_tuples_(num_tuples),
column_stripes_(column_stripes),
- column_null_bitmaps_(column_null_bitmaps) {
+ column_null_bitmaps_(column_null_bitmaps),
+ attr_max_lengths_(relation.getMaximumAttributeByteLengths()) {
}
inline tuple_id numPackedTuples() const {
@@ -61,9 +64,23 @@ class BasicColumnStoreValueAccessorHelper {
return nullptr;
}
- // TODO(chasseur): Consider cacheing the byte lengths of attributes.
- return static_cast<const char*>(column_stripes_[attr])
- + (tuple * relation_.getAttributeById(attr)->getType().maximumByteLength());
+ return static_cast<const char*>(column_stripes_[attr]) + (tuple * attr_max_lengths_[attr]);
+ }
+
+ template <bool check_null>
+ inline std::pair<const void*, std::size_t> getAttributeValueAndByteLength(const tuple_id tuple,
+ const attribute_id attr) const {
+ DEBUG_ASSERT(tuple < num_tuples_);
+ DEBUG_ASSERT(relation_.hasAttributeWithId(attr));
+ if (check_null
+ && (!column_null_bitmaps_.elementIsNull(attr))
+ && column_null_bitmaps_[attr].getBit(tuple)) {
+ return std::make_pair(nullptr, 0);
+ }
+
+ const std::size_t attr_length = attr_max_lengths_[attr];
+ return std::make_pair(static_cast<const char*>(column_stripes_[attr]) + (tuple * attr_length),
+ attr_length);
}
inline TypedValue getAttributeValueTyped(const tuple_id tuple,
@@ -80,6 +97,7 @@ class BasicColumnStoreValueAccessorHelper {
const tuple_id num_tuples_;
const std::vector<void*> &column_stripes_;
const PtrVector<BitVector<false>, true> &column_null_bitmaps_;
+ const std::vector<std::size_t> &attr_max_lengths_;
DISALLOW_COPY_AND_ASSIGN(BasicColumnStoreValueAccessorHelper);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/storage/BloomFilterIndexSubBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/BloomFilterIndexSubBlock.cpp b/storage/BloomFilterIndexSubBlock.cpp
index e806217..a40f69f 100644
--- a/storage/BloomFilterIndexSubBlock.cpp
+++ b/storage/BloomFilterIndexSubBlock.cpp
@@ -55,7 +55,6 @@ BloomFilterIndexSubBlock::BloomFilterIndexSubBlock(const TupleStorageSubBlock &t
sub_block_memory_size),
is_initialized_(false),
is_consistent_(false),
- random_seed_(kBloomFilterSeed),
bit_array_size_in_bytes_(description.GetExtension(
BloomFilterIndexSubBlockDescription::bloom_filter_size)) {
CHECK(DescriptionIsValid(relation_, description_))
@@ -74,8 +73,7 @@ BloomFilterIndexSubBlock::BloomFilterIndexSubBlock(const TupleStorageSubBlock &t
const std::uint32_t salt_count = description.GetExtension(BloomFilterIndexSubBlockDescription::number_of_hashes);
// Initialize the bloom_filter_ data structure to operate on bit_array.
- bloom_filter_.reset(new BloomFilter(random_seed_,
- salt_count,
+ bloom_filter_.reset(new BloomFilter(salt_count,
bit_array_size_in_bytes_,
bit_array_.get(),
is_bloom_filter_initialized));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/storage/BloomFilterIndexSubBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/BloomFilterIndexSubBlock.hpp b/storage/BloomFilterIndexSubBlock.hpp
index 4925673..8c81156 100644
--- a/storage/BloomFilterIndexSubBlock.hpp
+++ b/storage/BloomFilterIndexSubBlock.hpp
@@ -65,11 +65,6 @@ class BloomFilterIndexSubBlock : public IndexSubBlock {
kSelectivityNone
};
- /**
- * @brief A random seed to initialize the bloom filter hash functions.
- **/
- static const std::uint64_t kBloomFilterSeed = 0xA5A5A5A55A5A5A5AULL;
-
BloomFilterIndexSubBlock(const TupleStorageSubBlock &tuple_store,
const IndexSubBlockDescription &description,
const bool new_block,
@@ -179,7 +174,6 @@ class BloomFilterIndexSubBlock : public IndexSubBlock {
private:
bool is_initialized_;
bool is_consistent_;
- const std::uint64_t random_seed_;
const std::uint64_t bit_array_size_in_bytes_;
std::vector<attribute_id> indexed_attribute_ids_;
std::unique_ptr<unsigned char> bit_array_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 582effd..777a888 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -677,6 +677,8 @@ target_link_libraries(quickstep_storage_HashTable
quickstep_types_Type
quickstep_types_TypedValue
quickstep_utility_BloomFilter
+ quickstep_utility_BloomFilterAdapter
+ quickstep_utility_EventProfiler
quickstep_utility_HashPair
quickstep_utility_Macros)
target_link_libraries(quickstep_storage_HashTableBase
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/storage/CompressedColumnStoreValueAccessor.hpp
----------------------------------------------------------------------
diff --git a/storage/CompressedColumnStoreValueAccessor.hpp b/storage/CompressedColumnStoreValueAccessor.hpp
index 64eb315..984dea3 100644
--- a/storage/CompressedColumnStoreValueAccessor.hpp
+++ b/storage/CompressedColumnStoreValueAccessor.hpp
@@ -52,6 +52,7 @@ class CompressedColumnStoreValueAccessorHelper {
const PtrVector<BitVector<false>, true> &uncompressed_column_null_bitmaps)
: relation_(relation),
num_tuples_(num_tuples),
+ attr_max_lengths_(relation.getMaximumAttributeByteLengths()),
compression_info_(compression_info),
dictionary_coded_attributes_(dictionary_coded_attributes),
truncated_attributes_(truncated_attributes),
@@ -84,6 +85,26 @@ class CompressedColumnStoreValueAccessorHelper {
}
}
+ template <bool check_null>
+ inline std::pair<const void*, std::size_t> getAttributeValueAndByteLength(const tuple_id tuple,
+ const attribute_id attr) const {
+ if (dictionary_coded_attributes_[attr]) {
+ return dictionaries_.atUnchecked(attr).getUntypedValueAndByteLengthForCode<check_null>(
+ getCode(tuple, attr));
+ } else if (truncated_attributes_[attr]) {
+ if (truncated_attribute_is_int_[attr]) {
+ int_buffer_ = getCode(tuple, attr);
+ return std::make_pair(&int_buffer_, sizeof(int_buffer_));
+ } else {
+ long_buffer_ = getCode(tuple, attr);
+ return std::make_pair(&long_buffer_, sizeof(long_buffer_));
+ }
+ } else {
+ return std::make_pair(getAttributePtr<check_null>(tuple, attr),
+ attr_max_lengths_[attr]);
+ }
+ }
+
inline TypedValue getAttributeValueTyped(const tuple_id tuple,
const attribute_id attr) const {
if (dictionary_coded_attributes_[attr]) {
@@ -138,6 +159,7 @@ class CompressedColumnStoreValueAccessorHelper {
const CatalogRelationSchema &relation_;
const tuple_id num_tuples_;
+ const std::vector<std::size_t> &attr_max_lengths_;
const CompressedBlockInfo &compression_info_;
const std::vector<bool> &dictionary_coded_attributes_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/storage/CompressedPackedRowStoreValueAccessor.hpp
----------------------------------------------------------------------
diff --git a/storage/CompressedPackedRowStoreValueAccessor.hpp b/storage/CompressedPackedRowStoreValueAccessor.hpp
index 024b0ec..7058aec 100644
--- a/storage/CompressedPackedRowStoreValueAccessor.hpp
+++ b/storage/CompressedPackedRowStoreValueAccessor.hpp
@@ -58,6 +58,7 @@ class CompressedPackedRowStoreValueAccessorHelper {
num_tuples_(num_tuples),
tuple_length_bytes_(tuple_length_bytes),
attribute_offsets_(attribute_offsets),
+ attr_max_lengths_(relation.getMaximumAttributeByteLengths()),
compression_info_(compression_info),
dictionary_coded_attributes_(dictionary_coded_attributes),
truncated_attributes_(truncated_attributes),
@@ -92,6 +93,26 @@ class CompressedPackedRowStoreValueAccessorHelper {
}
}
+ template <bool check_null>
+ inline std::pair<const void*, std::size_t> getAttributeValueAndByteLength(const tuple_id tuple,
+ const attribute_id attr) const {
+ if (dictionary_coded_attributes_[attr]) {
+ return dictionaries_.atUnchecked(attr).getUntypedValueAndByteLengthForCode<check_null>(
+ getCode(tuple, attr));
+ } else if (truncated_attributes_[attr]) {
+ if (truncated_attribute_is_int_[attr]) {
+ int_buffer_ = getCode(tuple, attr);
+ return std::make_pair(&int_buffer_, sizeof(int_buffer_));
+ } else {
+ long_buffer_ = getCode(tuple, attr);
+ return std::make_pair(&long_buffer_, sizeof(long_buffer_));
+ }
+ } else {
+ return std::make_pair(getAttributePtr<check_null>(tuple, attr),
+ attr_max_lengths_[attr]);
+ }
+ }
+
inline TypedValue getAttributeValueTyped(const tuple_id tuple,
const attribute_id attr) const {
if (dictionary_coded_attributes_[attr]) {
@@ -150,6 +171,7 @@ class CompressedPackedRowStoreValueAccessorHelper {
const tuple_id num_tuples_;
const std::size_t tuple_length_bytes_;
const std::vector<std::size_t> &attribute_offsets_;
+ const std::vector<std::size_t> &attr_max_lengths_;
const CompressedBlockInfo &compression_info_;
const std::vector<bool> &dictionary_coded_attributes_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/storage/HashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTable.hpp b/storage/HashTable.hpp
index be31fd9..6e3dc96 100644
--- a/storage/HashTable.hpp
+++ b/storage/HashTable.hpp
@@ -23,6 +23,7 @@
#include <atomic>
#include <cstddef>
#include <cstdlib>
+#include <memory>
#include <type_traits>
#include <vector>
@@ -39,11 +40,14 @@
#include "types/Type.hpp"
#include "types/TypedValue.hpp"
#include "utility/BloomFilter.hpp"
+#include "utility/BloomFilterAdapter.hpp"
#include "utility/HashPair.hpp"
#include "utility/Macros.hpp"
namespace quickstep {
+DECLARE_int64(bloom_adapter_batch_size);
+
/** \addtogroup Storage
* @{
*/
@@ -1016,8 +1020,12 @@ class HashTable : public HashTableBase<resizable,
*
* @param bloom_filter The pointer to the bloom filter.
**/
- inline void setBuildSideBloomFilter(BloomFilter *bloom_filter) {
- build_bloom_filter_ = bloom_filter;
+ inline void addBuildSideBloomFilter(BloomFilter *bloom_filter) {
+ build_bloom_filters_.emplace_back(bloom_filter);
+ }
+
+ inline void addBuildSideAttributeId(const attribute_id build_attribute_id) {
+ build_attribute_ids_.push_back(build_attribute_id);
}
/**
@@ -1042,8 +1050,8 @@ class HashTable : public HashTableBase<resizable,
* @param probe_attribute_ids The vector of attribute ids to use for probing
* the bloom filter.
**/
- inline void addProbeSideAttributeIds(std::vector<attribute_id> &&probe_attribute_ids) {
- probe_attribute_ids_.push_back(probe_attribute_ids);
+ inline void addProbeSideAttributeId(const attribute_id probe_attribute_id) {
+ probe_attribute_ids_.push_back(probe_attribute_id);
}
protected:
@@ -1329,9 +1337,10 @@ class HashTable : public HashTableBase<resizable,
// Data structures used for bloom filter optimized semi-joins.
bool has_build_side_bloom_filter_ = false;
bool has_probe_side_bloom_filter_ = false;
- BloomFilter *build_bloom_filter_;
+ std::vector<BloomFilter *> build_bloom_filters_;
+ std::vector<attribute_id> build_attribute_ids_;
std::vector<const BloomFilter*> probe_bloom_filters_;
- std::vector<std::vector<attribute_id>> probe_attribute_ids_;
+ std::vector<attribute_id> probe_attribute_ids_;
DISALLOW_COPY_AND_ASSIGN(HashTable);
};
@@ -1477,12 +1486,26 @@ HashTablePutResult HashTable<ValueT, resizable, serializable, force_key_copy, al
&prealloc_state);
}
}
- std::unique_ptr<BloomFilter> thread_local_bloom_filter;
+
if (has_build_side_bloom_filter_) {
- thread_local_bloom_filter.reset(new BloomFilter(build_bloom_filter_->getRandomSeed(),
- build_bloom_filter_->getNumberOfHashes(),
- build_bloom_filter_->getBitArraySize()));
+ for (std::size_t i = 0; i < build_bloom_filters_.size(); ++i) {
+ auto *build_bloom_filter = build_bloom_filters_[i];
+ std::unique_ptr<BloomFilter> thread_local_bloom_filter(
+ new BloomFilter(build_bloom_filter->getNumberOfHashes(),
+ build_bloom_filter->getBitArraySize()));
+ const auto &build_attr = build_attribute_ids_[i];
+ const std::size_t attr_size =
+ accessor->template getUntypedValueAndByteLengthAtAbsolutePosition<false>(0, build_attr).second;
+ while (accessor->next()) {
+ thread_local_bloom_filter->insertUnSafe(
+ static_cast<const std::uint8_t *>(accessor->getUntypedValue(build_attr)),
+ attr_size);
+ }
+ build_bloom_filter->bitwiseOr(thread_local_bloom_filter.get());
+ accessor->beginIteration();
+ }
}
+
if (resizable) {
while (result == HashTablePutResult::kOutOfSpace) {
{
@@ -1498,11 +1521,6 @@ HashTablePutResult HashTable<ValueT, resizable, serializable, force_key_copy, al
variable_size,
(*functor)(*accessor),
using_prealloc ? &prealloc_state : nullptr);
- // Insert into bloom filter, if enabled.
- if (has_build_side_bloom_filter_) {
- thread_local_bloom_filter->insertUnSafe(static_cast<const std::uint8_t *>(key.getDataPtr()),
- key.getDataSize());
- }
if (result == HashTablePutResult::kDuplicateKey) {
DEBUG_ASSERT(!using_prealloc);
return result;
@@ -1528,20 +1546,11 @@ HashTablePutResult HashTable<ValueT, resizable, serializable, force_key_copy, al
variable_size,
(*functor)(*accessor),
using_prealloc ? &prealloc_state : nullptr);
- // Insert into bloom filter, if enabled.
- if (has_build_side_bloom_filter_) {
- thread_local_bloom_filter->insertUnSafe(static_cast<const std::uint8_t *>(key.getDataPtr()),
- key.getDataSize());
- }
if (result != HashTablePutResult::kOK) {
return result;
}
}
}
- // Update the build side bloom filter with thread local copy, if available.
- if (has_build_side_bloom_filter_) {
- build_bloom_filter_->bitwiseOr(thread_local_bloom_filter.get());
- }
return HashTablePutResult::kOK;
});
@@ -1607,6 +1616,26 @@ HashTablePutResult HashTable<ValueT, resizable, serializable, force_key_copy, al
&prealloc_state);
}
}
+
+ if (has_build_side_bloom_filter_) {
+ for (std::size_t i = 0; i < build_bloom_filters_.size(); ++i) {
+ auto *build_bloom_filter = build_bloom_filters_[i];
+ std::unique_ptr<BloomFilter> thread_local_bloom_filter(
+ new BloomFilter(build_bloom_filter->getNumberOfHashes(),
+ build_bloom_filter->getBitArraySize()));
+ const auto &build_attr = build_attribute_ids_[i];
+ const std::size_t attr_size =
+ accessor->template getUntypedValueAndByteLengthAtAbsolutePosition<false>(0, build_attr).second;
+ while (accessor->next()) {
+ thread_local_bloom_filter->insertUnSafe(
+ static_cast<const std::uint8_t *>(accessor->getUntypedValue(build_attr)),
+ attr_size);
+ }
+ build_bloom_filter->bitwiseOr(thread_local_bloom_filter.get());
+ accessor->beginIteration();
+ }
+ }
+
if (resizable) {
while (result == HashTablePutResult::kOutOfSpace) {
{
@@ -2229,6 +2258,7 @@ inline std::size_t HashTable<ValueT, resizable, serializable, force_key_copy, al
}
}
+
template <typename ValueT,
bool resizable,
bool serializable,
@@ -2246,42 +2276,85 @@ void HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_
InvokeOnAnyValueAccessor(
accessor,
[&](auto *accessor) -> void { // NOLINT(build/c++11)
- while (accessor->next()) {
- // Probe any bloom filters, if enabled.
- if (has_probe_side_bloom_filter_) {
- DCHECK_EQ(probe_bloom_filters_.size(), probe_attribute_ids_.size());
- // Check if the key is contained in the BloomFilters or not.
- bool bloom_miss = false;
- for (std::size_t i = 0; i < probe_bloom_filters_.size() && !bloom_miss; ++i) {
- const BloomFilter *bloom_filter = probe_bloom_filters_[i];
- for (const attribute_id &attr_id : probe_attribute_ids_[i]) {
- TypedValue bloom_key = accessor->getTypedValue(attr_id);
- if (!bloom_filter->contains(static_cast<const std::uint8_t*>(bloom_key.getDataPtr()),
- bloom_key.getDataSize())) {
- bloom_miss = true;
+ std::unique_ptr<BloomFilterAdapter> bloom_filter_adapter;
+ if (has_probe_side_bloom_filter_) {
+ // Find (and cache) the size of each attribute in the probe lists.
+ // NOTE(nav): This code uses the accessor to get the size,
+ // and hence only works if there's at least one tuple.
+ std::vector<std::size_t> attr_size_vector;
+ attr_size_vector.reserve(probe_attribute_ids_.size());
+ for (const auto &probe_attr : probe_attribute_ids_) {
+ auto val_and_size =
+ accessor->template getUntypedValueAndByteLengthAtAbsolutePosition<false>(0, probe_attr);
+// std::cerr << "BF attr size = " << val_and_size.second << "\n";
+ attr_size_vector.emplace_back(val_and_size.second);
+ }
+
+ bloom_filter_adapter.reset(new BloomFilterAdapter(
+ probe_bloom_filters_, probe_attribute_ids_, attr_size_vector));
+
+ // We want to have large batch sizes for cache efficiency while probing,
+ // but small batch sizes to ensure that the adaptation logic kicks in
+ // (and does so early). We use exponentially increasing batch sizes to
+ // achieve a balance between the two.
+ //
+ // We also keep track of num_tuples_left in the block, to ensure that
+ // we don't reserve an unnecessarily large vector.
+ std::uint32_t batch_size_try = FLAGS_bloom_adapter_batch_size;
+ std::uint32_t num_tuples_left = accessor->getNumTuples();
+ std::vector<tuple_id> batch(num_tuples_left);
+
+ do {
+ std::uint32_t batch_size =
+ batch_size_try < num_tuples_left ? batch_size_try : num_tuples_left;
+ for (std::size_t i = 0; i < batch_size; ++i) {
+ accessor->next();
+ batch.push_back(accessor->getCurrentPosition());
+ }
+
+ std::size_t num_hits =
+ bloom_filter_adapter->bulkProbe<true>(accessor, batch, batch_size);
+
+ for (std::size_t t = 0; t < num_hits; ++t){
+ tuple_id probe_tid = batch[t];
+ TypedValue key = accessor->getTypedValueAtAbsolutePosition(key_attr_id, probe_tid);
+ if (check_for_null_keys && key.isNull()) {
+ continue;
+ }
+ const std::size_t true_hash = use_scalar_literal_hash_template ? key.getHashScalarLiteral()
+ : key.getHash();
+ const std::size_t adjusted_hash = adjust_hashes_template ? this->AdjustHash(true_hash)
+ : true_hash;
+ std::size_t entry_num = 0;
+ const ValueT *value;
+ while (this->getNextEntryForKey(key, adjusted_hash, &value, &entry_num)) {
+ (*functor)(probe_tid, *value);
+ if (!allow_duplicate_keys)
break;
- }
}
}
- if (bloom_miss) {
- continue; // On a bloom filter miss, probing the hash table can be skipped.
- }
- }
+ batch.clear();
+ num_tuples_left -= batch_size;
+ batch_size_try = batch_size * 2;
+ } while (!accessor->iterationFinished());
+ }
- TypedValue key = accessor->getTypedValue(key_attr_id);
- if (check_for_null_keys && key.isNull()) {
- continue;
- }
- const std::size_t true_hash = use_scalar_literal_hash_template ? key.getHashScalarLiteral()
- : key.getHash();
- const std::size_t adjusted_hash = adjust_hashes_template ? this->AdjustHash(true_hash)
- : true_hash;
- std::size_t entry_num = 0;
- const ValueT *value;
- while (this->getNextEntryForKey(key, adjusted_hash, &value, &entry_num)) {
- (*functor)(*accessor, *value);
- if (!allow_duplicate_keys) {
- break;
+ else { // no Bloom filters to probe
+ while(accessor->next()) {
+ TypedValue key = accessor->getTypedValue(key_attr_id);
+ if (check_for_null_keys && key.isNull()) {
+ continue;
+ }
+ const std::size_t true_hash = use_scalar_literal_hash_template ? key.getHashScalarLiteral()
+ : key.getHash();
+ const std::size_t adjusted_hash = adjust_hashes_template ? this->AdjustHash(true_hash)
+ : true_hash;
+ std::size_t entry_num = 0;
+ const ValueT *value;
+ while (this->getNextEntryForKey(key, adjusted_hash, &value, &entry_num)) {
+ (*functor)(*accessor, *value);
+ if (!allow_duplicate_keys)
+ break;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/storage/HashTable.proto
----------------------------------------------------------------------
diff --git a/storage/HashTable.proto b/storage/HashTable.proto
index 7f00f29..6eabf60 100644
--- a/storage/HashTable.proto
+++ b/storage/HashTable.proto
@@ -34,10 +34,10 @@ message HashTable {
required HashTableImplType hash_table_impl_type = 1;
repeated Type key_types = 2;
required uint64 estimated_num_entries = 3;
- repeated uint32 build_side_bloom_filter_id = 4;
- message ProbeSideBloomFilter {
- required uint32 probe_side_bloom_filter_id = 1;
- repeated uint32 probe_side_attr_ids = 2;
+ message BloomFilter {
+ required uint32 bloom_filter_id = 1;
+ required uint32 attr_id = 2;
}
- repeated ProbeSideBloomFilter probe_side_bloom_filters = 6;
+ repeated BloomFilter probe_side_bloom_filters = 4;
+ repeated BloomFilter build_side_bloom_filters = 5;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/43ed533b/storage/HashTableFactory.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableFactory.hpp b/storage/HashTableFactory.hpp
index 34baaeb..fbb3d41 100644
--- a/storage/HashTableFactory.hpp
+++ b/storage/HashTableFactory.hpp
@@ -318,9 +318,15 @@ class HashTableFactory {
// individual implementations of the hash table constructors.
// Check if there are any build side bloom filter defined on the hash table.
- if (proto.build_side_bloom_filter_id_size() > 0) {
+ if (proto.build_side_bloom_filters_size() > 0) {
hash_table->enableBuildSideBloomFilter();
- hash_table->setBuildSideBloomFilter(bloom_filters[proto.build_side_bloom_filter_id(0)].get());
+ for (int j = 0; j < proto.build_side_bloom_filters_size(); ++j) {
+ const auto build_side_bloom_filter = proto.build_side_bloom_filters(j);
+ hash_table->addBuildSideBloomFilter(
+ bloom_filters[build_side_bloom_filter.bloom_filter_id()].get());
+
+ hash_table->addBuildSideAttributeId(build_side_bloom_filter.attr_id());
+ }
}
// Check if there are any probe side bloom filters defined on the hash table.
@@ -330,15 +336,10 @@ class HashTableFactory {
for (int j = 0; j < proto.probe_side_bloom_filters_size(); ++j) {
// Add the pointer to the probe bloom filter within the list of probe bloom filters to use.
const auto probe_side_bloom_filter = proto.probe_side_bloom_filters(j);
- hash_table->addProbeSideBloomFilter(bloom_filters[probe_side_bloom_filter.probe_side_bloom_filter_id()].get());
-
- // Add the attribute ids corresponding to this probe bloom filter.
- std::vector<attribute_id> probe_attribute_ids;
- for (int k = 0; k < probe_side_bloom_filter.probe_side_attr_ids_size(); ++k) {
- const attribute_id probe_attribute_id = probe_side_bloom_filter.probe_side_attr_ids(k);
- probe_attribute_ids.push_back(probe_attribute_id);
- }
- hash_table->addProbeSideAttributeIds(std::move(probe_attribute_ids));
+ hash_table->addProbeSideBloomFilter(
+ bloom_filters[probe_side_bloom_filter.bloom_filter_id()].get());
+
+ hash_table->addProbeSideAttributeId(probe_side_bloom_filter.attr_id());
}
}