You are viewing a plain-text version of this content; the link to its canonical version was lost in this rendering.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/08/11 20:27:53 UTC
[13/16] incubator-quickstep git commit: Updates to build bloom filters
Updates to build bloom filters
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/6f3129f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/6f3129f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/6f3129f7
Branch: refs/heads/LIP-for-tpch
Commit: 6f3129f723f8b76c818368c0f7c30393db6565af
Parents: ef4cd1e
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Mon Aug 8 12:08:32 2016 -0500
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Thu Aug 11 15:23:00 2016 -0500
----------------------------------------------------------------------
query_execution/QueryManagerBase.cpp | 2 +-
query_optimizer/ExecutionGenerator.cpp | 26 ++++---
query_optimizer/ExecutionHeuristics.cpp | 4 +-
query_optimizer/cost_model/SimpleCostModel.cpp | 2 +-
query_optimizer/rules/AttachBloomFilters.cpp | 41 ++++++-----
.../StarSchemaHashJoinOrderOptimization.cpp | 6 +-
relational_operators/BuildHashOperator.cpp | 12 +++-
relational_operators/BuildHashOperator.hpp | 4 ++
relational_operators/CreateIndexOperator.cpp | 2 +-
relational_operators/CreateIndexOperator.hpp | 2 +-
relational_operators/CreateTableOperator.cpp | 2 +-
relational_operators/CreateTableOperator.hpp | 2 +-
relational_operators/DropTableOperator.cpp | 2 +-
relational_operators/DropTableOperator.hpp | 2 +-
relational_operators/RelationalOperator.hpp | 4 +-
storage/HashTable.hpp | 76 ++++++++++++++------
storage/StorageBlock.cpp | 12 ++--
storage/StorageBlock.hpp | 2 +-
utility/DAG.hpp | 4 +-
19 files changed, 129 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6f3129f7/query_execution/QueryManagerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.cpp b/query_execution/QueryManagerBase.cpp
index 8e37da8..d49ee91 100644
--- a/query_execution/QueryManagerBase.cpp
+++ b/query_execution/QueryManagerBase.cpp
@@ -209,7 +209,7 @@ void QueryManagerBase::markOperatorFinished(const dag_node_index index) {
query_exec_state_->setExecutionFinished(index);
RelationalOperator *op = query_dag_->getNodePayloadMutable(index);
- op->updateCatalogOnCompletion();
+ op->actionOnCompletion();
const relation_id output_rel = op->getOutputRelationID();
for (const pair<dag_node_index, bool> &dependent_link : query_dag_->getDependents(index)) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6f3129f7/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 21c6e30..02deb3a 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -631,8 +631,6 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
bool any_probe_attributes_nullable = false;
bool any_build_attributes_nullable = false;
- bool skip_hash_join_optimization = false;
-
const std::vector<E::AttributeReferencePtr> &left_join_attributes =
physical_plan->left_join_attributes();
for (const E::AttributeReferencePtr &left_join_attribute : left_join_attributes) {
@@ -840,17 +838,15 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
temporary_relation_info_vec_.emplace_back(join_operator_index, output_relation);
// Add heuristics for the Hash Join, if enabled.
- if (FLAGS_optimize_joins && !skip_hash_join_optimization) {
- execution_heuristics_->addHashJoinInfo(build_operator_index,
- join_operator_index,
- referenced_stored_build_relation,
- referenced_stored_probe_relation,
- bloom_filter_config,
- std::move(build_side_bloom_filter_attribute_ids),
- std::move(probe_side_bloom_filter_attribute_ids),
- join_hash_table_index,
- star_schema_cost_model_->estimateCardinality(build_physical));
- }
+ execution_heuristics_->addHashJoinInfo(build_operator_index,
+ join_operator_index,
+ referenced_stored_build_relation,
+ referenced_stored_probe_relation,
+ bloom_filter_config,
+ std::move(build_side_bloom_filter_attribute_ids),
+ std::move(probe_side_bloom_filter_attribute_ids),
+ join_hash_table_index,
+ star_schema_cost_model_->estimateCardinality(build_physical));
}
void ExecutionGenerator::convertNestedLoopsJoin(
@@ -1443,7 +1439,9 @@ void ExecutionGenerator::convertAggregate(
aggr_state_proto->mutable_predicate()->CopyFrom(predicate->getProto());
}
- aggr_state_proto->set_estimated_num_entries(cost_model_->estimateCardinality(physical_plan));
+// aggr_state_proto->set_estimated_num_entries(cost_model_->estimateCardinality(physical_plan));
+ aggr_state_proto->set_estimated_num_entries(
+ star_schema_cost_model_->estimateCardinality(physical_plan) * 10);
const QueryPlan::DAGNodeIndex aggregation_operator_index =
execution_plan_->addRelationalOperator(
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6f3129f7/query_optimizer/ExecutionHeuristics.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionHeuristics.cpp b/query_optimizer/ExecutionHeuristics.cpp
index d5d7640..81e7362 100644
--- a/query_optimizer/ExecutionHeuristics.cpp
+++ b/query_optimizer/ExecutionHeuristics.cpp
@@ -70,8 +70,8 @@ void ExecutionHeuristics::optimizeExecutionPlan(QueryPlan *query_plan,
auto *build_side_bloom_filter = hash_table_proto->add_build_side_bloom_filters();
build_side_bloom_filter->set_bloom_filter_id(bloom_filter_id);
build_side_bloom_filter->set_attr_id(info.build_side_bloom_filter_ids_[i]);
- std::cerr << "Build " << build_side_bf.attribute->toString()
- << " @" << bloom_filter_config.builder << "\n";
+// std::cerr << "Build " << build_side_bf.attribute->toString()
+// << " @" << bloom_filter_config.builder << "\n";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6f3129f7/query_optimizer/cost_model/SimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/SimpleCostModel.cpp b/query_optimizer/cost_model/SimpleCostModel.cpp
index 45e2f00..f3d4fee 100644
--- a/query_optimizer/cost_model/SimpleCostModel.cpp
+++ b/query_optimizer/cost_model/SimpleCostModel.cpp
@@ -119,7 +119,7 @@ std::size_t SimpleCostModel::estimateCardinalityForAggregate(
return 1;
}
return std::max(static_cast<std::size_t>(1),
- estimateCardinality(physical_plan->input()));
+ estimateCardinality(physical_plan->input()) / 10);
}
std::size_t SimpleCostModel::estimateCardinalityForWindowAggregate(
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6f3129f7/query_optimizer/rules/AttachBloomFilters.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/AttachBloomFilters.cpp b/query_optimizer/rules/AttachBloomFilters.cpp
index 10ed512..898d831 100644
--- a/query_optimizer/rules/AttachBloomFilters.cpp
+++ b/query_optimizer/rules/AttachBloomFilters.cpp
@@ -143,12 +143,11 @@ void AttachBloomFilters::visitConsumer(const P::PhysicalPtr &node) {
// Bloom filters from parent
const auto &parent_bloom_filters = consumers_[node];
if (!parent_bloom_filters.empty()) {
-// if (node->getPhysicalType() == P::PhysicalType::kHashJoin) {
-// const P::HashJoinPtr hash_join =
-// std::static_pointer_cast<const P::HashJoin>(node);
+// P::HashJoinPtr hash_join;
+// if (P::SomeHashJoin::MatchesWithConditionalCast(node, &hash_join) &&
+// hash_join->join_type() == P::HashJoin::JoinType::kInnerJoin) {
// const std::vector<const std::vector<E::AttributeReferencePtr>*> join_attributes =
// { &hash_join->left_join_attributes(), &hash_join->right_join_attributes() };
-//
// for (std::size_t i = 0; i < 2; ++i) {
// const auto child = hash_join->children()[i];
// std::unordered_set<E::ExprId> child_output_attribute_ids;
@@ -188,7 +187,7 @@ void AttachBloomFilters::visitConsumer(const P::PhysicalPtr &node) {
// }
// consumers_.emplace(child, std::move(bloom_filters));
// }
-// }
+// } else {
for (const auto &child : node->children()) {
std::unordered_set<E::ExprId> child_output_attribute_ids;
for (const auto &attr : child->getOutputAttributes()) {
@@ -209,6 +208,7 @@ void AttachBloomFilters::visitConsumer(const P::PhysicalPtr &node) {
}
consumers_.emplace(child, std::move(bloom_filters));
}
+// }
}
// Bloom filters from build side to probe side via HashJoin
@@ -260,14 +260,18 @@ void AttachBloomFilters::decideAttach(
}
P::PhysicalPtr consumer_child = nullptr;
- if (node->getPhysicalType() == P::PhysicalType::kHashJoin) {
- consumer_child = std::static_pointer_cast<const P::HashJoin>(node)->left();
- }
- if (node->getPhysicalType() == P::PhysicalType::kAggregate) {
- consumer_child = std::static_pointer_cast<const P::Aggregate>(node)->input();
- }
- if (node->getPhysicalType() == P::PhysicalType::kSelection) {
- consumer_child = std::static_pointer_cast<const P::Selection>(node)->input();
+ switch (node->getPhysicalType()) {
+ case P::PhysicalType::kHashJoin:
+ consumer_child = std::static_pointer_cast<const P::HashJoin>(node)->left();
+ break;
+ case P::PhysicalType::kAggregate:
+ consumer_child = std::static_pointer_cast<const P::Aggregate>(node)->input();
+ break;
+ case P::PhysicalType::kSelection:
+ consumer_child = std::static_pointer_cast<const P::Selection>(node)->input();
+ break;
+ default:
+ break;
}
if (consumer_child != nullptr) {
@@ -320,7 +324,7 @@ P::PhysicalPtr AttachBloomFilters::performAttach(const physical::PhysicalPtr &no
const auto attach_it = attaches_.find(node);
if (attach_it != attaches_.end()) {
// for (const auto& item : attach_it->second.probe_side_bloom_filters) {
-// std::cout << "Attach probe from " << item.builder
+// std::cerr << "Attach probe from " << item.builder
// << " to " << node << "\n";
// }
@@ -342,14 +346,14 @@ P::PhysicalPtr AttachBloomFilters::performAttach(const physical::PhysicalPtr &no
const auto attach_it = attaches_.find(node);
if (attach_it != attaches_.end()) {
// for (const auto& item : attach_it->second.probe_side_bloom_filters) {
-// std::cout << "Attach probe from " << item.builder
+// std::cerr << "Attach probe from " << item.builder
// << " to " << node << "\n";
// }
const P::AggregatePtr aggregate =
std::static_pointer_cast<const P::Aggregate>(node);
return P::Aggregate::Create(
- aggregate->input(),
+ new_children[0],
aggregate->grouping_expressions(),
aggregate->aggregate_expressions(),
aggregate->filter_predicate(),
@@ -361,14 +365,14 @@ P::PhysicalPtr AttachBloomFilters::performAttach(const physical::PhysicalPtr &no
const auto attach_it = attaches_.find(node);
if (attach_it != attaches_.end()) {
// for (const auto& item : attach_it->second.probe_side_bloom_filters) {
-// std::cout << "Attach probe from " << item.builder
+// std::cerr << "Attach probe from " << item.builder
// << " to " << node << "\n";
// }
const P::SelectionPtr selection =
std::static_pointer_cast<const P::Selection>(node);
return P::Selection::Create(
- selection->input(),
+ new_children[0],
selection->project_expressions(),
selection->filter_predicate(),
attach_it->second);
@@ -378,7 +382,6 @@ P::PhysicalPtr AttachBloomFilters::performAttach(const physical::PhysicalPtr &no
if (has_changed) {
return node->copyWithNewChildren(new_children);
}
-
return node;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6f3129f7/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
index 9e8d794..1e38f63 100644
--- a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
+++ b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
@@ -254,9 +254,9 @@ physical::PhysicalPtr StarSchemaHashJoinOrderOptimization::generatePlan(
TableInfo *selected_probe_table_info = best_join->probe;
TableInfo *selected_build_table_info = best_join->build;
- std::cerr << "card: " << selected_probe_table_info->estimated_cardinality << "\n";
- std::cerr << "card: " << selected_build_table_info->estimated_cardinality << "\n";
- std::cerr << "--------\n";
+// std::cerr << "card: " << selected_probe_table_info->estimated_cardinality << "\n";
+// std::cerr << "card: " << selected_build_table_info->estimated_cardinality << "\n";
+// std::cerr << "--------\n";
if (!best_join->build_side_unique &&
selected_probe_table_info->estimated_cardinality < selected_build_table_info->estimated_cardinality) {
std::swap(selected_probe_table_info, selected_build_table_info);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6f3129f7/relational_operators/BuildHashOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildHashOperator.cpp b/relational_operators/BuildHashOperator.cpp
index 465621c..b4e20e4 100644
--- a/relational_operators/BuildHashOperator.cpp
+++ b/relational_operators/BuildHashOperator.cpp
@@ -68,7 +68,10 @@ bool BuildHashOperator::getAllWorkOrders(
tmb::MessageBus *bus) {
DCHECK(query_context != nullptr);
- JoinHashTable *hash_table = query_context->getJoinHashTable(hash_table_index_);
+ if (hash_table_ == nullptr) {
+ hash_table_ = query_context->getJoinHashTable(hash_table_index_);
+ }
+
if (input_relation_is_stored_) {
if (!started_) {
for (const block_id input_block_id : input_relation_block_ids_) {
@@ -78,7 +81,7 @@ bool BuildHashOperator::getAllWorkOrders(
join_key_attributes_,
any_join_key_attributes_nullable_,
input_block_id,
- hash_table,
+ hash_table_,
storage_manager),
op_index_);
}
@@ -94,7 +97,7 @@ bool BuildHashOperator::getAllWorkOrders(
join_key_attributes_,
any_join_key_attributes_nullable_,
input_relation_block_ids_[num_workorders_generated_],
- hash_table,
+ hash_table_,
storage_manager),
op_index_);
++num_workorders_generated_;
@@ -140,6 +143,9 @@ serialization::WorkOrder* BuildHashOperator::createWorkOrderProto(const block_id
return proto;
}
+// Merge the thread-local build-side bloom filters of this operator's join
+// hash table into the shared build-side filters. This is invoked through
+// RelationalOperator::actionOnCompletion() when the operator is marked
+// finished.
+//
+// hash_table_ starts out as nullptr and is only resolved lazily inside
+// getAllWorkOrders(). If that never happened (presumably when only
+// work-order protos were generated -- TODO confirm), there is no local hash
+// table to finalize, so skip instead of dereferencing a null pointer.
+void BuildHashOperator::actionOnCompletion() {
+  if (hash_table_ != nullptr) {
+    hash_table_->finalizeBuildSideThreadLocalBloomFilters();
+  }
+}
void BuildHashWorkOrder::execute() {
BlockReference block(
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6f3129f7/relational_operators/BuildHashOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildHashOperator.hpp b/relational_operators/BuildHashOperator.hpp
index 4a80a8a..15b23f5 100644
--- a/relational_operators/BuildHashOperator.hpp
+++ b/relational_operators/BuildHashOperator.hpp
@@ -89,6 +89,7 @@ class BuildHashOperator : public RelationalOperator {
join_key_attributes_(join_key_attributes),
any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
hash_table_index_(hash_table_index),
+ hash_table_(nullptr),
input_relation_block_ids_(input_relation_is_stored ? input_relation.getBlocksSnapshot()
: std::vector<block_id>()),
num_workorders_generated_(0),
@@ -124,6 +125,8 @@ class BuildHashOperator : public RelationalOperator {
partially_filled_blocks->end());
}
+ void actionOnCompletion() override;
+
private:
/**
* @brief Create Work Order proto.
@@ -137,6 +140,7 @@ class BuildHashOperator : public RelationalOperator {
const std::vector<attribute_id> join_key_attributes_;
const bool any_join_key_attributes_nullable_;
const QueryContext::join_hash_table_id hash_table_index_;
+ JoinHashTable *hash_table_;
std::vector<block_id> input_relation_block_ids_;
std::vector<block_id>::size_type num_workorders_generated_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6f3129f7/relational_operators/CreateIndexOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/CreateIndexOperator.cpp b/relational_operators/CreateIndexOperator.cpp
index ab3624c..dd311bf 100644
--- a/relational_operators/CreateIndexOperator.cpp
+++ b/relational_operators/CreateIndexOperator.cpp
@@ -33,7 +33,7 @@ bool CreateIndexOperator::getAllWorkOrders(WorkOrdersContainer *container,
return true;
}
-void CreateIndexOperator::updateCatalogOnCompletion() {
+void CreateIndexOperator::actionOnCompletion() {
relation_->addIndex(index_name_, std::move(index_description_));
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6f3129f7/relational_operators/CreateIndexOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/CreateIndexOperator.hpp b/relational_operators/CreateIndexOperator.hpp
index fa992c9..1f6775a 100644
--- a/relational_operators/CreateIndexOperator.hpp
+++ b/relational_operators/CreateIndexOperator.hpp
@@ -90,7 +90,7 @@ class CreateIndexOperator : public RelationalOperator {
return true;
}
- void updateCatalogOnCompletion() override;
+ void actionOnCompletion() override;
private:
CatalogRelation *relation_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6f3129f7/relational_operators/CreateTableOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/CreateTableOperator.cpp b/relational_operators/CreateTableOperator.cpp
index 261bec1..4cc615b 100644
--- a/relational_operators/CreateTableOperator.cpp
+++ b/relational_operators/CreateTableOperator.cpp
@@ -36,7 +36,7 @@ bool CreateTableOperator::getAllWorkOrders(
return true;
}
-void CreateTableOperator::updateCatalogOnCompletion() {
+void CreateTableOperator::actionOnCompletion() {
database_->addRelation(relation_.release());
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6f3129f7/relational_operators/CreateTableOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/CreateTableOperator.hpp b/relational_operators/CreateTableOperator.hpp
index 3a2e29b..9b889ca 100644
--- a/relational_operators/CreateTableOperator.hpp
+++ b/relational_operators/CreateTableOperator.hpp
@@ -89,7 +89,7 @@ class CreateTableOperator : public RelationalOperator {
return true;
}
- void updateCatalogOnCompletion() override;
+ void actionOnCompletion() override;
private:
std::unique_ptr<CatalogRelation> relation_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6f3129f7/relational_operators/DropTableOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/DropTableOperator.cpp b/relational_operators/DropTableOperator.cpp
index 5cd5ebc..a5e3d7c 100644
--- a/relational_operators/DropTableOperator.cpp
+++ b/relational_operators/DropTableOperator.cpp
@@ -80,7 +80,7 @@ bool DropTableOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *containe
return work_generated_;
}
-void DropTableOperator::updateCatalogOnCompletion() {
+void DropTableOperator::actionOnCompletion() {
const relation_id rel_id = relation_.getID();
if (only_drop_blocks_) {
database_->getRelationByIdMutable(rel_id)->clearBlocks();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6f3129f7/relational_operators/DropTableOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/DropTableOperator.hpp b/relational_operators/DropTableOperator.hpp
index e713bd6..534a266 100644
--- a/relational_operators/DropTableOperator.hpp
+++ b/relational_operators/DropTableOperator.hpp
@@ -89,7 +89,7 @@ class DropTableOperator : public RelationalOperator {
bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
- void updateCatalogOnCompletion() override;
+ void actionOnCompletion() override;
private:
const CatalogRelation &relation_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6f3129f7/relational_operators/RelationalOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/RelationalOperator.hpp b/relational_operators/RelationalOperator.hpp
index f0303e5..ac5dd54 100644
--- a/relational_operators/RelationalOperator.hpp
+++ b/relational_operators/RelationalOperator.hpp
@@ -114,11 +114,11 @@ class RelationalOperator {
virtual bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) = 0;
/**
- * @brief Update Catalog upon the completion of this RelationalOperator, if
+ * @brief Perform action upon the completion of this RelationalOperator, if
* necessary.
*
**/
- virtual void updateCatalogOnCompletion() {
+ virtual void actionOnCompletion() {
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6f3129f7/storage/HashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTable.hpp b/storage/HashTable.hpp
index 3538181..2ef9359 100644
--- a/storage/HashTable.hpp
+++ b/storage/HashTable.hpp
@@ -23,7 +23,9 @@
#include <atomic>
#include <cstddef>
#include <cstdlib>
+#include <map>
#include <memory>
+#include <thread>
#include <type_traits>
#include <vector>
@@ -37,6 +39,7 @@
#include "storage/ValueAccessor.hpp"
#include "storage/ValueAccessorUtil.hpp"
#include "threading/SpinSharedMutex.hpp"
+#include "threading/SpinMutex.hpp"
#include "types/Type.hpp"
#include "types/TypedValue.hpp"
#include "utility/BloomFilter.hpp"
@@ -1028,6 +1031,17 @@ class HashTable : public HashTableBase<resizable,
build_attribute_ids_.push_back(build_attribute_id);
}
+ /**
+  * @brief OR every thread-local build-side bloom filter into the shared
+  *        build-side bloom filter at the same index.
+  *
+  * @note Performs no locking on thread_local_bloom_filters_; callers must
+  *       ensure no build-side insertion is running concurrently.
+  * @note A thread-local vector may hold fewer filters than
+  *       build_bloom_filters_ (not every insertion path allocates one filter
+  *       per build-side bloom filter), so only filters that exist on both
+  *       sides are merged instead of indexing past the end of the vector.
+  **/
+ inline void finalizeBuildSideThreadLocalBloomFilters() {
+   if (!has_build_side_bloom_filter_) {
+     return;
+   }
+   for (const auto &thread_local_bf_pair : thread_local_bloom_filters_) {
+     const std::vector<std::unique_ptr<BloomFilter>> &thread_local_bfs =
+         thread_local_bf_pair.second;
+     for (std::size_t i = 0;
+          i < build_bloom_filters_.size() && i < thread_local_bfs.size();
+          ++i) {
+       build_bloom_filters_[i]->bitwiseOr(thread_local_bfs[i].get());
+     }
+   }
+ }
+
/**
* @brief This function adds a pointer to the list of bloom filters to be
* used during the probe phase of this hash table.
@@ -1338,6 +1352,8 @@ class HashTable : public HashTableBase<resizable,
bool has_build_side_bloom_filter_ = false;
bool has_probe_side_bloom_filter_ = false;
std::vector<BloomFilter *> build_bloom_filters_;
+ std::map<std::thread::id, std::vector<std::unique_ptr<BloomFilter>>> thread_local_bloom_filters_;
+ SpinMutex bloom_filter_mutex_;
std::vector<attribute_id> build_attribute_ids_;
std::vector<const BloomFilter*> probe_bloom_filters_;
std::vector<attribute_id> probe_attribute_ids_;
@@ -1487,22 +1503,19 @@ HashTablePutResult HashTable<ValueT, resizable, serializable, force_key_copy, al
}
}
+ BloomFilter *thread_local_bloom_filter = nullptr;
if (has_build_side_bloom_filter_) {
- for (std::size_t i = 0; i < build_bloom_filters_.size(); ++i) {
- auto *build_bloom_filter = build_bloom_filters_[i];
- std::unique_ptr<BloomFilter> thread_local_bloom_filter(
- new BloomFilter(build_bloom_filter->getNumberOfHashes(),
- build_bloom_filter->getBitArraySize()));
- const auto &build_attr = build_attribute_ids_[i];
- const std::size_t attr_size =
- accessor->template getUntypedValueAndByteLengthAtAbsolutePosition<false>(0, build_attr).second;
- while (accessor->next()) {
- thread_local_bloom_filter->insertUnSafe(
- static_cast<const std::uint8_t *>(accessor->getUntypedValue(build_attr)),
- attr_size);
- }
- build_bloom_filter->bitwiseOr(thread_local_bloom_filter.get());
- accessor->beginIteration();
+ const auto tid = std::this_thread::get_id();
+ SpinMutexLock lock(bloom_filter_mutex_);
+ auto bf_it = thread_local_bloom_filters_.find(tid);
+ if (bf_it == thread_local_bloom_filters_.end()) {
+   // Allocate one thread-local filter per build-side bloom filter, the same
+   // way the other build path allocates its thread-local filters. Callers
+   // that later index element [i] for i < build_bloom_filters_.size() then
+   // stay in bounds, no matter which path created this thread's entry first.
+   auto &bf_vector = thread_local_bloom_filters_[tid];
+   for (const auto &build_side_bf : build_bloom_filters_) {
+     bf_vector.emplace_back(
+         std::make_unique<BloomFilter>(build_side_bf->getNumberOfHashes(),
+                                       build_side_bf->getBitArraySize()));
+   }
+   // On this path keys are inserted only into the first filter.
+   thread_local_bloom_filter = bf_vector[0].get();
+ } else {
+   thread_local_bloom_filter = bf_it->second[0].get();
}
}
@@ -1521,6 +1534,11 @@ HashTablePutResult HashTable<ValueT, resizable, serializable, force_key_copy, al
variable_size,
(*functor)(*accessor),
using_prealloc ? &prealloc_state : nullptr);
+ // Insert into bloom filter, if enabled.
+ if (has_build_side_bloom_filter_) {
+ thread_local_bloom_filter->insertUnSafe(static_cast<const std::uint8_t *>(key.getDataPtr()),
+ key.getDataSize());
+ }
if (result == HashTablePutResult::kDuplicateKey) {
DEBUG_ASSERT(!using_prealloc);
return result;
@@ -1546,6 +1564,11 @@ HashTablePutResult HashTable<ValueT, resizable, serializable, force_key_copy, al
variable_size,
(*functor)(*accessor),
using_prealloc ? &prealloc_state : nullptr);
+ // Insert into bloom filter, if enabled.
+ if (has_build_side_bloom_filter_) {
+ thread_local_bloom_filter->insertUnSafe(static_cast<const std::uint8_t *>(key.getDataPtr()),
+ key.getDataSize());
+ }
if (result != HashTablePutResult::kOK) {
return result;
}
@@ -1618,12 +1641,26 @@ HashTablePutResult HashTable<ValueT, resizable, serializable, force_key_copy, al
}
if (has_build_side_bloom_filter_) {
+ const auto tid = std::this_thread::get_id();
+ std::vector<std::unique_ptr<BloomFilter>> *thread_local_bf_vector;
+ {
+ SpinMutexLock lock(bloom_filter_mutex_);
+ auto bf_it = thread_local_bloom_filters_.find(tid);
+ if (bf_it == thread_local_bloom_filters_.end()) {
+ thread_local_bf_vector = &thread_local_bloom_filters_[tid];
+ for (const auto &build_side_bf : build_bloom_filters_) {
+ thread_local_bf_vector->emplace_back(
+ std::make_unique<BloomFilter>(build_side_bf->getNumberOfHashes(),
+ build_side_bf->getBitArraySize()));
+ }
+ } else {
+ thread_local_bf_vector = &bf_it->second;
+ }
+ }
+
for (std::size_t i = 0; i < build_bloom_filters_.size(); ++i) {
- auto *build_bloom_filter = build_bloom_filters_[i];
- std::unique_ptr<BloomFilter> thread_local_bloom_filter(
- new BloomFilter(build_bloom_filter->getNumberOfHashes(),
- build_bloom_filter->getBitArraySize()));
const auto &build_attr = build_attribute_ids_[i];
+ BloomFilter *thread_local_bloom_filter = (*thread_local_bf_vector)[i].get();
const std::size_t attr_size =
accessor->template getUntypedValueAndByteLengthAtAbsolutePosition<false>(0, build_attr).second;
while (accessor->next()) {
@@ -1631,7 +1668,6 @@ HashTablePutResult HashTable<ValueT, resizable, serializable, force_key_copy, al
static_cast<const std::uint8_t *>(accessor->getUntypedValue(build_attr)),
attr_size);
}
- build_bloom_filter->bitwiseOr(thread_local_bloom_filter.get());
accessor->beginIteration();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6f3129f7/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index 74ff5b6..7bbba9a 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -1326,12 +1326,16 @@ bool StorageBlock::rebuildIndexes(bool short_circuit) {
}
TupleIdSequence* StorageBlock::getMatchesForPredicate(const Predicate *predicate,
- const TupleIdSequence *sequence) const {
+ const TupleIdSequence *filter) const {
if (predicate == nullptr) {
- return tuple_store_->getExistenceMap();
+ TupleIdSequence *sequence = tuple_store_->getExistenceMap();
+ if (filter != nullptr) {
+ sequence->intersectWith(*filter);
+ }
+ return sequence;
}
- std::unique_ptr<ValueAccessor> value_accessor(tuple_store_->createValueAccessor(sequence));
+ std::unique_ptr<ValueAccessor> value_accessor(tuple_store_->createValueAccessor());
std::unique_ptr<TupleIdSequence> existence_map;
if (!tuple_store_->isPacked()) {
existence_map.reset(tuple_store_->getExistenceMap());
@@ -1341,7 +1345,7 @@ TupleIdSequence* StorageBlock::getMatchesForPredicate(const Predicate *predicate
indices_consistent_);
return predicate->getAllMatches(value_accessor.get(),
&sub_blocks_ref,
- nullptr,
+ filter,
existence_map.get());
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6f3129f7/storage/StorageBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp
index 5cca51c..4284ea1 100644
--- a/storage/StorageBlock.hpp
+++ b/storage/StorageBlock.hpp
@@ -593,7 +593,7 @@ class StorageBlock : public StorageBlockBase {
const std::size_t getNumTuples() const;
TupleIdSequence* getMatchesForPredicate(const Predicate *predicate,
- const TupleIdSequence *sequence = nullptr) const;
+ const TupleIdSequence *filter = nullptr) const;
private:
static TupleStorageSubBlock* CreateTupleStorageSubBlock(
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6f3129f7/utility/DAG.hpp
----------------------------------------------------------------------
diff --git a/utility/DAG.hpp b/utility/DAG.hpp
index a1f2619..1d1caa1 100644
--- a/utility/DAG.hpp
+++ b/utility/DAG.hpp
@@ -293,8 +293,8 @@ class DAG {
* node at node_index.
**/
inline void addDependent(const size_type_nodes node_index, const LinkMetadataT &link_metadata) {
- DCHECK(dependents_with_metadata_.find(node_index) == dependents_with_metadata_.end());
- dependents_with_metadata_.emplace(node_index, link_metadata);
+ // DCHECK(dependents_with_metadata_.find(node_index) == dependents_with_metadata_.end());
+ dependents_with_metadata_[node_index] = link_metadata;
}
/**