You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/08/08 17:17:38 UTC

[35/39] incubator-quickstep git commit: Updates to build bloom filters

Updates to build bloom filters


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/abc9c023
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/abc9c023
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/abc9c023

Branch: refs/heads/LIP-for-tpch
Commit: abc9c023856b8c45afdebce77fb02a19edd2d645
Parents: 1602a8d
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Mon Aug 8 12:08:32 2016 -0500
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Mon Aug 8 12:14:36 2016 -0500

----------------------------------------------------------------------
 query_execution/QueryManagerBase.cpp            |  2 +-
 query_optimizer/ExecutionGenerator.cpp          | 26 ++++---
 query_optimizer/ExecutionHeuristics.cpp         |  4 +-
 query_optimizer/cost_model/SimpleCostModel.cpp  |  2 +-
 query_optimizer/rules/AttachBloomFilters.cpp    | 41 ++++++-----
 .../StarSchemaHashJoinOrderOptimization.cpp     |  6 +-
 relational_operators/BuildHashOperator.cpp      | 12 +++-
 relational_operators/BuildHashOperator.hpp      |  4 ++
 relational_operators/CreateIndexOperator.cpp    |  2 +-
 relational_operators/CreateIndexOperator.hpp    |  2 +-
 relational_operators/CreateTableOperator.cpp    |  2 +-
 relational_operators/CreateTableOperator.hpp    |  2 +-
 relational_operators/DropTableOperator.cpp      |  2 +-
 relational_operators/DropTableOperator.hpp      |  2 +-
 relational_operators/RelationalOperator.hpp     |  4 +-
 storage/HashTable.hpp                           | 76 ++++++++++++++------
 storage/StorageBlock.cpp                        | 12 ++--
 storage/StorageBlock.hpp                        |  2 +-
 utility/DAG.hpp                                 |  4 +-
 19 files changed, 129 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/abc9c023/query_execution/QueryManagerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.cpp b/query_execution/QueryManagerBase.cpp
index c60e323..cc0b948 100644
--- a/query_execution/QueryManagerBase.cpp
+++ b/query_execution/QueryManagerBase.cpp
@@ -208,7 +208,7 @@ void QueryManagerBase::markOperatorFinished(const dag_node_index index) {
   query_exec_state_->setExecutionFinished(index);
 
   RelationalOperator *op = query_dag_->getNodePayloadMutable(index);
-  op->updateCatalogOnCompletion();
+  op->actionOnCompletion();
 
   const relation_id output_rel = op->getOutputRelationID();
   for (const pair<dag_node_index, bool> &dependent_link : query_dag_->getDependents(index)) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/abc9c023/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 21c6e30..02deb3a 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -631,8 +631,6 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
   bool any_probe_attributes_nullable = false;
   bool any_build_attributes_nullable = false;
 
-  bool skip_hash_join_optimization = false;
-
   const std::vector<E::AttributeReferencePtr> &left_join_attributes =
       physical_plan->left_join_attributes();
   for (const E::AttributeReferencePtr &left_join_attribute : left_join_attributes) {
@@ -840,17 +838,15 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
   temporary_relation_info_vec_.emplace_back(join_operator_index, output_relation);
 
   // Add heuristics for the Hash Join, if enabled.
-  if (FLAGS_optimize_joins && !skip_hash_join_optimization) {
-    execution_heuristics_->addHashJoinInfo(build_operator_index,
-                                           join_operator_index,
-                                           referenced_stored_build_relation,
-                                           referenced_stored_probe_relation,
-                                           bloom_filter_config,
-                                           std::move(build_side_bloom_filter_attribute_ids),
-                                           std::move(probe_side_bloom_filter_attribute_ids),
-                                           join_hash_table_index,
-                                           star_schema_cost_model_->estimateCardinality(build_physical));
-  }
+  execution_heuristics_->addHashJoinInfo(build_operator_index,
+                                         join_operator_index,
+                                         referenced_stored_build_relation,
+                                         referenced_stored_probe_relation,
+                                         bloom_filter_config,
+                                         std::move(build_side_bloom_filter_attribute_ids),
+                                         std::move(probe_side_bloom_filter_attribute_ids),
+                                         join_hash_table_index,
+                                         star_schema_cost_model_->estimateCardinality(build_physical));
 }
 
 void ExecutionGenerator::convertNestedLoopsJoin(
@@ -1443,7 +1439,9 @@ void ExecutionGenerator::convertAggregate(
     aggr_state_proto->mutable_predicate()->CopyFrom(predicate->getProto());
   }
 
-  aggr_state_proto->set_estimated_num_entries(cost_model_->estimateCardinality(physical_plan));
+//  aggr_state_proto->set_estimated_num_entries(cost_model_->estimateCardinality(physical_plan));
+  aggr_state_proto->set_estimated_num_entries(
+      star_schema_cost_model_->estimateCardinality(physical_plan) * 10);
 
   const QueryPlan::DAGNodeIndex aggregation_operator_index =
       execution_plan_->addRelationalOperator(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/abc9c023/query_optimizer/ExecutionHeuristics.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionHeuristics.cpp b/query_optimizer/ExecutionHeuristics.cpp
index d5d7640..81e7362 100644
--- a/query_optimizer/ExecutionHeuristics.cpp
+++ b/query_optimizer/ExecutionHeuristics.cpp
@@ -70,8 +70,8 @@ void ExecutionHeuristics::optimizeExecutionPlan(QueryPlan *query_plan,
       auto *build_side_bloom_filter = hash_table_proto->add_build_side_bloom_filters();
       build_side_bloom_filter->set_bloom_filter_id(bloom_filter_id);
       build_side_bloom_filter->set_attr_id(info.build_side_bloom_filter_ids_[i]);
-      std::cerr << "Build " << build_side_bf.attribute->toString()
-                << " @" << bloom_filter_config.builder << "\n";
+//      std::cerr << "Build " << build_side_bf.attribute->toString()
+//                << " @" << bloom_filter_config.builder << "\n";
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/abc9c023/query_optimizer/cost_model/SimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/SimpleCostModel.cpp b/query_optimizer/cost_model/SimpleCostModel.cpp
index 45e2f00..f3d4fee 100644
--- a/query_optimizer/cost_model/SimpleCostModel.cpp
+++ b/query_optimizer/cost_model/SimpleCostModel.cpp
@@ -119,7 +119,7 @@ std::size_t SimpleCostModel::estimateCardinalityForAggregate(
     return 1;
   }
   return std::max(static_cast<std::size_t>(1),
-                  estimateCardinality(physical_plan->input()));
+                  estimateCardinality(physical_plan->input()) / 10);
 }
 
 std::size_t SimpleCostModel::estimateCardinalityForWindowAggregate(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/abc9c023/query_optimizer/rules/AttachBloomFilters.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/AttachBloomFilters.cpp b/query_optimizer/rules/AttachBloomFilters.cpp
index 10ed512..898d831 100644
--- a/query_optimizer/rules/AttachBloomFilters.cpp
+++ b/query_optimizer/rules/AttachBloomFilters.cpp
@@ -143,12 +143,11 @@ void AttachBloomFilters::visitConsumer(const P::PhysicalPtr &node) {
   // Bloom filters from parent
   const auto &parent_bloom_filters = consumers_[node];
   if (!parent_bloom_filters.empty()) {
-//    if (node->getPhysicalType() == P::PhysicalType::kHashJoin) {
-//      const P::HashJoinPtr hash_join =
-//          std::static_pointer_cast<const P::HashJoin>(node);
+//    P::HashJoinPtr hash_join;
+//    if (P::SomeHashJoin::MatchesWithConditionalCast(node, &hash_join) &&
+//        hash_join->join_type() == P::HashJoin::JoinType::kInnerJoin) {
 //      const std::vector<const std::vector<E::AttributeReferencePtr>*> join_attributes =
 //          { &hash_join->left_join_attributes(), &hash_join->right_join_attributes() };
-//
 //      for (std::size_t i = 0; i < 2; ++i) {
 //        const auto child = hash_join->children()[i];
 //        std::unordered_set<E::ExprId> child_output_attribute_ids;
@@ -188,7 +187,7 @@ void AttachBloomFilters::visitConsumer(const P::PhysicalPtr &node) {
 //        }
 //        consumers_.emplace(child, std::move(bloom_filters));
 //      }
-//    }
+//    } else {
     for (const auto &child : node->children()) {
       std::unordered_set<E::ExprId> child_output_attribute_ids;
       for (const auto &attr : child->getOutputAttributes()) {
@@ -209,6 +208,7 @@ void AttachBloomFilters::visitConsumer(const P::PhysicalPtr &node) {
       }
       consumers_.emplace(child, std::move(bloom_filters));
     }
+//    }
   }
 
   // Bloom filters from build side to probe side via HashJoin
@@ -260,14 +260,18 @@ void AttachBloomFilters::decideAttach(
   }
 
   P::PhysicalPtr consumer_child = nullptr;
-  if (node->getPhysicalType() == P::PhysicalType::kHashJoin) {
-    consumer_child = std::static_pointer_cast<const P::HashJoin>(node)->left();
-  }
-  if (node->getPhysicalType() == P::PhysicalType::kAggregate) {
-    consumer_child = std::static_pointer_cast<const P::Aggregate>(node)->input();
-  }
-  if (node->getPhysicalType() == P::PhysicalType::kSelection) {
-    consumer_child = std::static_pointer_cast<const P::Selection>(node)->input();
+  switch (node->getPhysicalType()) {
+    case P::PhysicalType::kHashJoin:
+      consumer_child = std::static_pointer_cast<const P::HashJoin>(node)->left();
+      break;
+    case P::PhysicalType::kAggregate:
+      consumer_child = std::static_pointer_cast<const P::Aggregate>(node)->input();
+      break;
+    case P::PhysicalType::kSelection:
+      consumer_child = std::static_pointer_cast<const P::Selection>(node)->input();
+      break;
+    default:
+      break;
   }
 
   if (consumer_child != nullptr) {
@@ -320,7 +324,7 @@ P::PhysicalPtr AttachBloomFilters::performAttach(const physical::PhysicalPtr &no
     const auto attach_it = attaches_.find(node);
     if (attach_it != attaches_.end()) {
 //      for (const auto& item : attach_it->second.probe_side_bloom_filters) {
-//        std::cout << "Attach probe from " << item.builder
+//        std::cerr << "Attach probe from " << item.builder
 //                  << " to " << node << "\n";
 //      }
 
@@ -342,14 +346,14 @@ P::PhysicalPtr AttachBloomFilters::performAttach(const physical::PhysicalPtr &no
     const auto attach_it = attaches_.find(node);
     if (attach_it != attaches_.end()) {
 //      for (const auto& item : attach_it->second.probe_side_bloom_filters) {
-//        std::cout << "Attach probe from " << item.builder
+//        std::cerr << "Attach probe from " << item.builder
 //                  << " to " << node << "\n";
 //      }
 
       const P::AggregatePtr aggregate =
           std::static_pointer_cast<const P::Aggregate>(node);
       return P::Aggregate::Create(
-          aggregate->input(),
+          new_children[0],
           aggregate->grouping_expressions(),
           aggregate->aggregate_expressions(),
           aggregate->filter_predicate(),
@@ -361,14 +365,14 @@ P::PhysicalPtr AttachBloomFilters::performAttach(const physical::PhysicalPtr &no
     const auto attach_it = attaches_.find(node);
     if (attach_it != attaches_.end()) {
 //      for (const auto& item : attach_it->second.probe_side_bloom_filters) {
-//        std::cout << "Attach probe from " << item.builder
+//        std::cerr << "Attach probe from " << item.builder
 //                  << " to " << node << "\n";
 //      }
 
       const P::SelectionPtr selection =
           std::static_pointer_cast<const P::Selection>(node);
       return P::Selection::Create(
-          selection->input(),
+          new_children[0],
           selection->project_expressions(),
           selection->filter_predicate(),
           attach_it->second);
@@ -378,7 +382,6 @@ P::PhysicalPtr AttachBloomFilters::performAttach(const physical::PhysicalPtr &no
   if (has_changed) {
     return node->copyWithNewChildren(new_children);
   }
-
   return node;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/abc9c023/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
index 9e8d794..1e38f63 100644
--- a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
+++ b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
@@ -254,9 +254,9 @@ physical::PhysicalPtr StarSchemaHashJoinOrderOptimization::generatePlan(
 
     TableInfo *selected_probe_table_info = best_join->probe;
     TableInfo *selected_build_table_info = best_join->build;
-    std::cerr << "card: " << selected_probe_table_info->estimated_cardinality << "\n";
-    std::cerr << "card: " << selected_build_table_info->estimated_cardinality << "\n";
-    std::cerr << "--------\n";
+//    std::cerr << "card: " << selected_probe_table_info->estimated_cardinality << "\n";
+//    std::cerr << "card: " << selected_build_table_info->estimated_cardinality << "\n";
+//    std::cerr << "--------\n";
     if (!best_join->build_side_unique &&
         selected_probe_table_info->estimated_cardinality < selected_build_table_info->estimated_cardinality) {
       std::swap(selected_probe_table_info, selected_build_table_info);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/abc9c023/relational_operators/BuildHashOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildHashOperator.cpp b/relational_operators/BuildHashOperator.cpp
index 465621c..b4e20e4 100644
--- a/relational_operators/BuildHashOperator.cpp
+++ b/relational_operators/BuildHashOperator.cpp
@@ -68,7 +68,10 @@ bool BuildHashOperator::getAllWorkOrders(
     tmb::MessageBus *bus) {
   DCHECK(query_context != nullptr);
 
-  JoinHashTable *hash_table = query_context->getJoinHashTable(hash_table_index_);
+  if (hash_table_ == nullptr) {
+    hash_table_ = query_context->getJoinHashTable(hash_table_index_);
+  }
+
   if (input_relation_is_stored_) {
     if (!started_) {
       for (const block_id input_block_id : input_relation_block_ids_) {
@@ -78,7 +81,7 @@ bool BuildHashOperator::getAllWorkOrders(
                                    join_key_attributes_,
                                    any_join_key_attributes_nullable_,
                                    input_block_id,
-                                   hash_table,
+                                   hash_table_,
                                    storage_manager),
             op_index_);
       }
@@ -94,7 +97,7 @@ bool BuildHashOperator::getAllWorkOrders(
               join_key_attributes_,
               any_join_key_attributes_nullable_,
               input_relation_block_ids_[num_workorders_generated_],
-              hash_table,
+              hash_table_,
               storage_manager),
           op_index_);
       ++num_workorders_generated_;
@@ -140,6 +143,9 @@ serialization::WorkOrder* BuildHashOperator::createWorkOrderProto(const block_id
   return proto;
 }
 
+void BuildHashOperator::actionOnCompletion() {
+  hash_table_->finalizeBuildSideThreadLocalBloomFilters();
+}
 
 void BuildHashWorkOrder::execute() {
   BlockReference block(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/abc9c023/relational_operators/BuildHashOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildHashOperator.hpp b/relational_operators/BuildHashOperator.hpp
index 4a80a8a..15b23f5 100644
--- a/relational_operators/BuildHashOperator.hpp
+++ b/relational_operators/BuildHashOperator.hpp
@@ -89,6 +89,7 @@ class BuildHashOperator : public RelationalOperator {
       join_key_attributes_(join_key_attributes),
       any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
       hash_table_index_(hash_table_index),
+      hash_table_(nullptr),
       input_relation_block_ids_(input_relation_is_stored ? input_relation.getBlocksSnapshot()
                                                          : std::vector<block_id>()),
       num_workorders_generated_(0),
@@ -124,6 +125,8 @@ class BuildHashOperator : public RelationalOperator {
                                      partially_filled_blocks->end());
   }
 
+  void actionOnCompletion() override;
+
  private:
   /**
    * @brief Create Work Order proto.
@@ -137,6 +140,7 @@ class BuildHashOperator : public RelationalOperator {
   const std::vector<attribute_id> join_key_attributes_;
   const bool any_join_key_attributes_nullable_;
   const QueryContext::join_hash_table_id hash_table_index_;
+  JoinHashTable *hash_table_;
 
   std::vector<block_id> input_relation_block_ids_;
   std::vector<block_id>::size_type num_workorders_generated_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/abc9c023/relational_operators/CreateIndexOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/CreateIndexOperator.cpp b/relational_operators/CreateIndexOperator.cpp
index ab3624c..dd311bf 100644
--- a/relational_operators/CreateIndexOperator.cpp
+++ b/relational_operators/CreateIndexOperator.cpp
@@ -33,7 +33,7 @@ bool CreateIndexOperator::getAllWorkOrders(WorkOrdersContainer *container,
   return true;
 }
 
-void CreateIndexOperator::updateCatalogOnCompletion() {
+void CreateIndexOperator::actionOnCompletion() {
   relation_->addIndex(index_name_, std::move(index_description_));
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/abc9c023/relational_operators/CreateIndexOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/CreateIndexOperator.hpp b/relational_operators/CreateIndexOperator.hpp
index fa992c9..1f6775a 100644
--- a/relational_operators/CreateIndexOperator.hpp
+++ b/relational_operators/CreateIndexOperator.hpp
@@ -90,7 +90,7 @@ class CreateIndexOperator : public RelationalOperator {
     return true;
   }
 
-  void updateCatalogOnCompletion() override;
+  void actionOnCompletion() override;
 
  private:
   CatalogRelation *relation_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/abc9c023/relational_operators/CreateTableOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/CreateTableOperator.cpp b/relational_operators/CreateTableOperator.cpp
index 261bec1..4cc615b 100644
--- a/relational_operators/CreateTableOperator.cpp
+++ b/relational_operators/CreateTableOperator.cpp
@@ -36,7 +36,7 @@ bool CreateTableOperator::getAllWorkOrders(
   return true;
 }
 
-void CreateTableOperator::updateCatalogOnCompletion() {
+void CreateTableOperator::actionOnCompletion() {
   database_->addRelation(relation_.release());
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/abc9c023/relational_operators/CreateTableOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/CreateTableOperator.hpp b/relational_operators/CreateTableOperator.hpp
index 3a2e29b..9b889ca 100644
--- a/relational_operators/CreateTableOperator.hpp
+++ b/relational_operators/CreateTableOperator.hpp
@@ -89,7 +89,7 @@ class CreateTableOperator : public RelationalOperator {
     return true;
   }
 
-  void updateCatalogOnCompletion() override;
+  void actionOnCompletion() override;
 
  private:
   std::unique_ptr<CatalogRelation> relation_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/abc9c023/relational_operators/DropTableOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/DropTableOperator.cpp b/relational_operators/DropTableOperator.cpp
index 5cd5ebc..a5e3d7c 100644
--- a/relational_operators/DropTableOperator.cpp
+++ b/relational_operators/DropTableOperator.cpp
@@ -80,7 +80,7 @@ bool DropTableOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *containe
   return work_generated_;
 }
 
-void DropTableOperator::updateCatalogOnCompletion() {
+void DropTableOperator::actionOnCompletion() {
   const relation_id rel_id = relation_.getID();
   if (only_drop_blocks_) {
     database_->getRelationByIdMutable(rel_id)->clearBlocks();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/abc9c023/relational_operators/DropTableOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/DropTableOperator.hpp b/relational_operators/DropTableOperator.hpp
index e713bd6..534a266 100644
--- a/relational_operators/DropTableOperator.hpp
+++ b/relational_operators/DropTableOperator.hpp
@@ -89,7 +89,7 @@ class DropTableOperator : public RelationalOperator {
 
   bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
 
-  void updateCatalogOnCompletion() override;
+  void actionOnCompletion() override;
 
  private:
   const CatalogRelation &relation_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/abc9c023/relational_operators/RelationalOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/RelationalOperator.hpp b/relational_operators/RelationalOperator.hpp
index f0303e5..ac5dd54 100644
--- a/relational_operators/RelationalOperator.hpp
+++ b/relational_operators/RelationalOperator.hpp
@@ -114,11 +114,11 @@ class RelationalOperator {
   virtual bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) = 0;
 
   /**
-   * @brief Update Catalog upon the completion of this RelationalOperator, if
+   * @brief Perform action upon the completion of this RelationalOperator, if
    *        necessary.
    *
    **/
-  virtual void updateCatalogOnCompletion() {
+  virtual void actionOnCompletion() {
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/abc9c023/storage/HashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTable.hpp b/storage/HashTable.hpp
index 3538181..2ef9359 100644
--- a/storage/HashTable.hpp
+++ b/storage/HashTable.hpp
@@ -23,7 +23,9 @@
 #include <atomic>
 #include <cstddef>
 #include <cstdlib>
+#include <map>
 #include <memory>
+#include <thread>
 #include <type_traits>
 #include <vector>
 
@@ -37,6 +39,7 @@
 #include "storage/ValueAccessor.hpp"
 #include "storage/ValueAccessorUtil.hpp"
 #include "threading/SpinSharedMutex.hpp"
+#include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
 #include "utility/BloomFilter.hpp"
@@ -1028,6 +1031,17 @@ class HashTable : public HashTableBase<resizable,
     build_attribute_ids_.push_back(build_attribute_id);
   }
 
+  inline void finalizeBuildSideThreadLocalBloomFilters() {
+    if (has_build_side_bloom_filter_) {
+      for (const auto &thread_local_bf_pair : thread_local_bloom_filters_) {
+        for (std::size_t i = 0; i < build_bloom_filters_.size(); ++i) {
+          build_bloom_filters_[i]->bitwiseOr(
+              thread_local_bf_pair.second[i].get());
+        }
+      }
+    }
+  }
+
   /**
    * @brief This function adds a pointer to the list of bloom filters to be
    *        used during the probe phase of this hash table.
@@ -1338,6 +1352,8 @@ class HashTable : public HashTableBase<resizable,
   bool has_build_side_bloom_filter_ = false;
   bool has_probe_side_bloom_filter_ = false;
   std::vector<BloomFilter *> build_bloom_filters_;
+  std::map<std::thread::id, std::vector<std::unique_ptr<BloomFilter>>> thread_local_bloom_filters_;
+  SpinMutex bloom_filter_mutex_;
   std::vector<attribute_id> build_attribute_ids_;
   std::vector<const BloomFilter*> probe_bloom_filters_;
   std::vector<attribute_id> probe_attribute_ids_;
@@ -1487,22 +1503,19 @@ HashTablePutResult HashTable<ValueT, resizable, serializable, force_key_copy, al
       }
     }
 
+    BloomFilter *thread_local_bloom_filter = nullptr;
     if (has_build_side_bloom_filter_) {
-      for (std::size_t i = 0; i < build_bloom_filters_.size(); ++i) {
-        auto *build_bloom_filter = build_bloom_filters_[i];
-        std::unique_ptr<BloomFilter> thread_local_bloom_filter(
-            new BloomFilter(build_bloom_filter->getNumberOfHashes(),
-                            build_bloom_filter->getBitArraySize()));
-        const auto &build_attr = build_attribute_ids_[i];
-        const std::size_t attr_size =
-            accessor->template getUntypedValueAndByteLengthAtAbsolutePosition<false>(0, build_attr).second;
-        while (accessor->next()) {
-          thread_local_bloom_filter->insertUnSafe(
-              static_cast<const std::uint8_t *>(accessor->getUntypedValue(build_attr)),
-              attr_size);
-        }
-        build_bloom_filter->bitwiseOr(thread_local_bloom_filter.get());
-        accessor->beginIteration();
+      const auto tid = std::this_thread::get_id();
+      SpinMutexLock lock(bloom_filter_mutex_);
+      auto bf_it = thread_local_bloom_filters_.find(tid);
+      if (bf_it == thread_local_bloom_filters_.end()) {
+        auto &bf_vector = thread_local_bloom_filters_[tid];
+        bf_vector.emplace_back(
+            std::make_unique<BloomFilter>(build_bloom_filters_[0]->getNumberOfHashes(),
+                                          build_bloom_filters_[0]->getBitArraySize()));
+        thread_local_bloom_filter = bf_vector[0].get();
+      } else {
+        thread_local_bloom_filter = bf_it->second[0].get();
       }
     }
 
@@ -1521,6 +1534,11 @@ HashTablePutResult HashTable<ValueT, resizable, serializable, force_key_copy, al
                                        variable_size,
                                        (*functor)(*accessor),
                                        using_prealloc ? &prealloc_state : nullptr);
+            // Insert into bloom filter, if enabled.
+            if (has_build_side_bloom_filter_) {
+              thread_local_bloom_filter->insertUnSafe(static_cast<const std::uint8_t *>(key.getDataPtr()),
+                                                      key.getDataSize());
+            }
             if (result == HashTablePutResult::kDuplicateKey) {
               DEBUG_ASSERT(!using_prealloc);
               return result;
@@ -1546,6 +1564,11 @@ HashTablePutResult HashTable<ValueT, resizable, serializable, force_key_copy, al
                                    variable_size,
                                    (*functor)(*accessor),
                                    using_prealloc ? &prealloc_state : nullptr);
+        // Insert into bloom filter, if enabled.
+        if (has_build_side_bloom_filter_) {
+          thread_local_bloom_filter->insertUnSafe(static_cast<const std::uint8_t *>(key.getDataPtr()),
+                                                  key.getDataSize());
+        }
         if (result != HashTablePutResult::kOK) {
           return result;
         }
@@ -1618,12 +1641,26 @@ HashTablePutResult HashTable<ValueT, resizable, serializable, force_key_copy, al
     }
 
     if (has_build_side_bloom_filter_) {
+      const auto tid = std::this_thread::get_id();
+      std::vector<std::unique_ptr<BloomFilter>> *thread_local_bf_vector;
+      {
+        SpinMutexLock lock(bloom_filter_mutex_);
+        auto bf_it = thread_local_bloom_filters_.find(tid);
+        if (bf_it == thread_local_bloom_filters_.end()) {
+          thread_local_bf_vector = &thread_local_bloom_filters_[tid];
+          for (const auto &build_side_bf : build_bloom_filters_) {
+            thread_local_bf_vector->emplace_back(
+                std::make_unique<BloomFilter>(build_side_bf->getNumberOfHashes(),
+                                              build_side_bf->getBitArraySize()));
+          }
+        } else {
+          thread_local_bf_vector = &bf_it->second;
+        }
+      }
+
       for (std::size_t i = 0; i < build_bloom_filters_.size(); ++i) {
-        auto *build_bloom_filter = build_bloom_filters_[i];
-        std::unique_ptr<BloomFilter> thread_local_bloom_filter(
-            new BloomFilter(build_bloom_filter->getNumberOfHashes(),
-                            build_bloom_filter->getBitArraySize()));
         const auto &build_attr = build_attribute_ids_[i];
+        BloomFilter *thread_local_bloom_filter = (*thread_local_bf_vector)[i].get();
         const std::size_t attr_size =
             accessor->template getUntypedValueAndByteLengthAtAbsolutePosition<false>(0, build_attr).second;
         while (accessor->next()) {
@@ -1631,7 +1668,6 @@ HashTablePutResult HashTable<ValueT, resizable, serializable, force_key_copy, al
               static_cast<const std::uint8_t *>(accessor->getUntypedValue(build_attr)),
               attr_size);
         }
-        build_bloom_filter->bitwiseOr(thread_local_bloom_filter.get());
         accessor->beginIteration();
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/abc9c023/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index 74ff5b6..7bbba9a 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -1326,12 +1326,16 @@ bool StorageBlock::rebuildIndexes(bool short_circuit) {
 }
 
 TupleIdSequence* StorageBlock::getMatchesForPredicate(const Predicate *predicate,
-                                                      const TupleIdSequence *sequence) const {
+                                                      const TupleIdSequence *filter) const {
   if (predicate == nullptr) {
-    return tuple_store_->getExistenceMap();
+    TupleIdSequence *sequence = tuple_store_->getExistenceMap();
+    if (filter != nullptr) {
+      sequence->intersectWith(*filter);
+    }
+    return sequence;
   }
 
-  std::unique_ptr<ValueAccessor> value_accessor(tuple_store_->createValueAccessor(sequence));
+  std::unique_ptr<ValueAccessor> value_accessor(tuple_store_->createValueAccessor());
   std::unique_ptr<TupleIdSequence> existence_map;
   if (!tuple_store_->isPacked()) {
     existence_map.reset(tuple_store_->getExistenceMap());
@@ -1341,7 +1345,7 @@ TupleIdSequence* StorageBlock::getMatchesForPredicate(const Predicate *predicate
                                     indices_consistent_);
   return predicate->getAllMatches(value_accessor.get(),
                                   &sub_blocks_ref,
-                                  nullptr,
+                                  filter,
                                   existence_map.get());
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/abc9c023/storage/StorageBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp
index 5cca51c..4284ea1 100644
--- a/storage/StorageBlock.hpp
+++ b/storage/StorageBlock.hpp
@@ -593,7 +593,7 @@ class StorageBlock : public StorageBlockBase {
   const std::size_t getNumTuples() const;
 
   TupleIdSequence* getMatchesForPredicate(const Predicate *predicate,
-                                          const TupleIdSequence *sequence = nullptr) const;
+                                          const TupleIdSequence *filter = nullptr) const;
 
  private:
   static TupleStorageSubBlock* CreateTupleStorageSubBlock(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/abc9c023/utility/DAG.hpp
----------------------------------------------------------------------
diff --git a/utility/DAG.hpp b/utility/DAG.hpp
index a1f2619..1d1caa1 100644
--- a/utility/DAG.hpp
+++ b/utility/DAG.hpp
@@ -293,8 +293,8 @@ class DAG {
      *                      node at node_index.
      **/
      inline void addDependent(const size_type_nodes node_index, const LinkMetadataT &link_metadata) {
-       DCHECK(dependents_with_metadata_.find(node_index) == dependents_with_metadata_.end());
-       dependents_with_metadata_.emplace(node_index, link_metadata);
+       // DCHECK(dependents_with_metadata_.find(node_index) == dependents_with_metadata_.end());
+       dependents_with_metadata_[node_index] = link_metadata;
      }
 
     /**