You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/08/04 23:06:49 UTC

incubator-quickstep git commit: Attach bloom filters to select

Repository: incubator-quickstep
Updated Branches:
  refs/heads/LIP-for-tpch 5e22b396c -> f0311c431


Attach bloom filters to select


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/f0311c43
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/f0311c43
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/f0311c43

Branch: refs/heads/LIP-for-tpch
Commit: f0311c4315aad935487d5a3a8d501544624ff53e
Parents: 5e22b39
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Thu Aug 4 18:05:13 2016 -0500
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Thu Aug 4 18:05:13 2016 -0500

----------------------------------------------------------------------
 expressions/Expressions.proto                   |   9 +-
 expressions/predicate/BloomFiltersPredicate.cpp |  68 +++++++++++
 expressions/predicate/BloomFiltersPredicate.hpp |  94 +++++++++++++++
 query_optimizer/ExecutionGenerator.cpp          |  15 +++
 query_optimizer/ExecutionHeuristics.cpp         |  24 ++++
 query_optimizer/ExecutionHeuristics.hpp         |  28 +++++
 query_optimizer/physical/Selection.hpp          |  16 ++-
 query_optimizer/rules/AttachBloomFilters.cpp    | 117 +++++++++++++++---
 query_optimizer/rules/AttachBloomFilters.hpp    |   6 +-
 .../StarSchemaHashJoinOrderOptimization.cpp     |  34 +++---
 .../StarSchemaHashJoinOrderOptimization.hpp     |  14 ++-
 relational_operators/SelectOperator.cpp         |  57 ++++++++-
 relational_operators/SelectOperator.hpp         |  25 ++++
 storage/AggregationOperationState.cpp           |   2 +
 storage/StorageBlock.cpp                        | 119 ++++++++++++++++++-
 storage/StorageBlock.hpp                        |   8 +-
 utility/PlanVisualizer.cpp                      |  22 +++-
 utility/PlanVisualizer.hpp                      |   1 +
 18 files changed, 604 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f0311c43/expressions/Expressions.proto
----------------------------------------------------------------------
diff --git a/expressions/Expressions.proto b/expressions/Expressions.proto
index ca688b5..29fc978 100644
--- a/expressions/Expressions.proto
+++ b/expressions/Expressions.proto
@@ -28,10 +28,11 @@ message Predicate {
   enum PredicateType {
     TRUE = 0;
     FALSE = 1;
-    COMPARISON = 2;
-    NEGATION = 3;
-    CONJUNCTION = 4;
-    DISJUNCTION = 5;
+    BLOOM_FILTERS = 2;
+    COMPARISON = 3;
+    NEGATION = 4;
+    CONJUNCTION = 5;
+    DISJUNCTION = 6;
   }
 
   required PredicateType predicate_type = 1;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f0311c43/expressions/predicate/BloomFiltersPredicate.cpp
----------------------------------------------------------------------
diff --git a/expressions/predicate/BloomFiltersPredicate.cpp b/expressions/predicate/BloomFiltersPredicate.cpp
new file mode 100644
index 0000000..17ff796
--- /dev/null
+++ b/expressions/predicate/BloomFiltersPredicate.cpp
@@ -0,0 +1,68 @@
+/**
+ *   Copyright 2011-2015 Quickstep Technologies LLC.
+ *   Copyright 2015 Pivotal Software, Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "expressions/predicate/BloomFiltersPredicate.hpp"
+
+#include "expressions/Expressions.pb.h"
+#include "expressions/predicate/Predicate.hpp"
+#include "storage/TupleIdSequence.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class ValueAccessor;
+
+serialization::Predicate BloomFiltersPredicate::getProto() const {  // Builds the serialized form of this predicate.
+  serialization::Predicate proto;
+  proto.set_predicate_type(serialization::Predicate::BLOOM_FILTERS);  // Only the type tag is written; no filter or attribute-id data is set here.
+  return proto;
+}
+
+Predicate* BloomFiltersPredicate::clone() const {  // Unimplemented stub: cloning is not supported yet.
+  LOG(FATAL) << "Not implemented\n";  // glog terminates the process on FATAL, so the return below is only a placeholder.
+  return nullptr;
+}
+
+bool BloomFiltersPredicate::matchesForSingleTuple(const ValueAccessor &accessor,  // Unimplemented stub: both arguments are ignored.
+                                                  const tuple_id tuple) const {
+  LOG(FATAL) << "Not implemented\n";  // glog terminates the process on FATAL, so the return below is only a placeholder.
+  return false;
+}
+
+bool BloomFiltersPredicate::matchesForJoinedTuples(  // Unimplemented stub: all arguments are ignored.
+    const ValueAccessor &left_accessor,
+    const relation_id left_relation_id,
+    const tuple_id left_tuple_id,
+    const ValueAccessor &right_accessor,
+    const relation_id right_relation_id,
+    const tuple_id right_tuple_id) const {
+  LOG(FATAL) << "Not implemented\n";  // glog terminates the process on FATAL, so the return below is only a placeholder.
+  return false;
+}
+
+TupleIdSequence* BloomFiltersPredicate::getAllMatches(  // Unimplemented stub: all arguments are ignored.
+    ValueAccessor *accessor,
+    const SubBlocksReference *sub_blocks_ref,
+    const TupleIdSequence *filter,
+    const TupleIdSequence *existence_map) const {
+  LOG(FATAL) << "Not implemented\n";  // glog terminates the process on FATAL, so the return below is only a placeholder.
+  return nullptr;
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f0311c43/expressions/predicate/BloomFiltersPredicate.hpp
----------------------------------------------------------------------
diff --git a/expressions/predicate/BloomFiltersPredicate.hpp b/expressions/predicate/BloomFiltersPredicate.hpp
new file mode 100644
index 0000000..3c3acf4
--- /dev/null
+++ b/expressions/predicate/BloomFiltersPredicate.hpp
@@ -0,0 +1,94 @@
+/**
+ *   Copyright 2011-2015 Quickstep Technologies LLC.
+ *   Copyright 2015 Pivotal Software, Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_PREDICATE_BLOOM_FILTERS_PREDICATE_HPP_
+#define QUICKSTEP_EXPRESSIONS_PREDICATE_BLOOM_FILTERS_PREDICATE_HPP_
+
+#include <memory>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/Expressions.pb.h"
+#include "expressions/predicate/Predicate.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "utility/BloomFilter.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class TupleIdSequence;
+class ValueAccessor;
+
+struct SubBlocksReference;
+
+/** \addtogroup Expressions
+ *  @{
+ */
+
+class BloomFiltersPredicate : public Predicate {  // Predicate that accumulates bloom filter pointers and probe attribute ids.
+ public:
+  BloomFiltersPredicate() {
+  }
+
+  ~BloomFiltersPredicate() override {  // Empty: the stored BloomFilter pointers are never deleted by this class.
+  }
+
+  serialization::Predicate getProto() const override;
+
+  Predicate* clone() const override;
+
+  PredicateType getPredicateType() const override {
+    return kBloomFilters;
+  }
+
+  bool matchesForSingleTuple(const ValueAccessor &accessor,
+                             const tuple_id tuple) const override;
+
+  bool matchesForJoinedTuples(
+      const ValueAccessor &left_accessor,
+      const relation_id left_relation_id,
+      const tuple_id left_tuple_id,
+      const ValueAccessor &right_accessor,
+      const relation_id right_relation_id,
+      const tuple_id right_tuple_id) const override;
+
+  TupleIdSequence* getAllMatches(ValueAccessor *accessor,
+                                 const SubBlocksReference *sub_blocks_ref,
+                                 const TupleIdSequence *filter,
+                                 const TupleIdSequence *existence_map) const override;
+
+  void addBloomFilter(const BloomFilter *bloom_filter) {  // Appends the raw pointer as given; no copy of the filter is made.
+    bloom_filters_.emplace_back(bloom_filter);
+  }
+
+  void addAttributeId(const attribute_id probe_attribute_id) {  // Appends one probe attribute id.
+    bloom_filter_attribute_ids_.push_back(probe_attribute_id);
+  }
+
+ private:
+  std::vector<const BloomFilter *> bloom_filters_;  // NOTE(review): <vector> is not included directly by this header; it relies on a transitive include.
+  std::vector<attribute_id> bloom_filter_attribute_ids_;  // NOTE(review): presumably index-parallel to bloom_filters_; nothing here enforces equal sizes - confirm with callers.
+
+  friend class PredicateTest;
+
+  DISALLOW_COPY_AND_ASSIGN(BloomFiltersPredicate);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_EXPRESSIONS_PREDICATE_BLOOM_FILTERS_PREDICATE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f0311c43/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index d589f58..04e9734 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -507,6 +507,16 @@ void ExecutionGenerator::convertSelection(
     }
   }
 
+  const P::BloomFilterConfig &bloom_filter_config =
+      physical_selection->bloom_filter_config();
+  std::vector<attribute_id> bloom_filter_attribute_ids;
+
+  for (const auto &bf : bloom_filter_config.probe_side_bloom_filters) {
+    const CatalogAttribute *bf_catalog_attribute
+        = attribute_substitution_map_[bf.attribute->id()];
+    bloom_filter_attribute_ids.emplace_back(bf_catalog_attribute->getID());
+  }
+
   // Convert the project expressions proto.
   const QueryContext::scalar_group_id project_expressions_group_index =
       query_context_proto_->scalar_groups_size();
@@ -572,6 +582,11 @@ void ExecutionGenerator::convertSelection(
       std::forward_as_tuple(select_index,
                             output_relation));
   temporary_relation_info_vec_.emplace_back(select_index, output_relation);
+
+  execution_heuristics_->addSelectInfo(select_index,
+                                       bloom_filter_config,
+                                       std::move(bloom_filter_attribute_ids),
+                                       op);
 }
 
 void ExecutionGenerator::convertSharedSubplanReference(const physical::SharedSubplanReferencePtr &physical_plan) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f0311c43/query_optimizer/ExecutionHeuristics.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionHeuristics.cpp b/query_optimizer/ExecutionHeuristics.cpp
index 7d12745..ce9127f 100644
--- a/query_optimizer/ExecutionHeuristics.cpp
+++ b/query_optimizer/ExecutionHeuristics.cpp
@@ -125,6 +125,30 @@ void ExecutionHeuristics::optimizeExecutionPlan(QueryPlan *query_plan,
                                       true /* is_pipeline_breaker */);
     }
   }
+
+  for (const auto &info : selects_) {
+    const auto &bloom_filter_config = info.bloom_filter_config_;
+
+    for (std::size_t i = 0; i < info.bloom_filter_ids_.size(); ++i) {
+      const auto &bf =
+          bloom_filter_config.probe_side_bloom_filters[i];
+      std::cerr << "Select probe " << bf.attribute->toString()
+                << " @" << bf.builder << "\n";
+
+      const auto &build_side_info =
+           bloom_filter_map.at(
+               std::make_pair(bf.source_attribute->id(),
+                              bf.builder));
+      info.select_operator_->addBloomFilter(
+          build_side_info.first, info.bloom_filter_ids_[i]);
+//      std::cerr << "Select probe attr_id = "
+//                << info.bloom_filter_ids_[i] << "\n";
+
+      query_plan->addDirectDependency(info.select_operator_index_,
+                                      build_side_info.second,
+                                      true /* is_pipeline_breaker */);
+    }
+  }
 }
 
 void ExecutionHeuristics::setBloomFilterProperties(serialization::BloomFilter *bloom_filter_proto,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f0311c43/query_optimizer/ExecutionHeuristics.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionHeuristics.hpp b/query_optimizer/ExecutionHeuristics.hpp
index 0755124..ed0eeda 100644
--- a/query_optimizer/ExecutionHeuristics.hpp
+++ b/query_optimizer/ExecutionHeuristics.hpp
@@ -26,6 +26,7 @@
 #include "query_execution/QueryContext.pb.h"
 #include "query_optimizer/QueryPlan.hpp"
 #include "query_optimizer/physical/HashJoin.hpp"
+#include "relational_operators/SelectOperator.hpp"
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
@@ -110,6 +111,22 @@ class ExecutionHeuristics {
     const QueryContext::aggregation_state_id aggregate_state_id_;
   };
 
+  struct SelectInfo {  // Bookkeeping for one select operator: DAG index, bloom filter config, probe attribute ids, operator pointer.
+    SelectInfo(const QueryPlan::DAGNodeIndex select_operator_index,
+               const physical::BloomFilterConfig &bloom_filter_config,
+               std::vector<attribute_id> &&bloom_filter_ids,
+               SelectOperator *select_operator)
+        : select_operator_index_(select_operator_index),
+          bloom_filter_config_(bloom_filter_config),
+          bloom_filter_ids_(std::move(bloom_filter_ids)),  // A named rvalue reference is an lvalue; without std::move the vector is copied.
+          select_operator_(select_operator) {
+    }
+
+    const QueryPlan::DAGNodeIndex select_operator_index_;
+    const physical::BloomFilterConfig &bloom_filter_config_;  // NOTE(review): held by reference, so the config must outlive this SelectInfo - confirm at the addSelectInfo() call site.
+    const std::vector<attribute_id> bloom_filter_ids_;
+    SelectOperator *select_operator_;  // Raw pointer stored as given; this struct never deletes it.
+  };
 
   /**
    * @brief Constructor.
@@ -159,6 +176,16 @@ class ExecutionHeuristics {
                              aggregate_state_id);
   }
 
+  inline void addSelectInfo(const QueryPlan::DAGNodeIndex select_operator_index,
+                            const physical::BloomFilterConfig &bloom_filter_config,
+                            std::vector<attribute_id> &&bloom_filter_ids,
+                            SelectOperator *select_operator) {
+    selects_.emplace_back(select_operator_index,  // Constructs a SelectInfo in place at the end of selects_.
+                          bloom_filter_config,  // Forwarded as a reference, not copied.
+                          std::move(bloom_filter_ids),
+                          select_operator);
+  }
+
   /**
    * @brief Optimize the execution plan based on heuristics generated
    *        during physical plan to execution plan conversion.
@@ -182,6 +209,7 @@ class ExecutionHeuristics {
  private:
   std::vector<HashJoinInfo> hash_joins_;
   std::vector<AggregateInfo> aggregates_;
+  std::vector<SelectInfo> selects_;
 
   DISALLOW_COPY_AND_ASSIGN(ExecutionHeuristics);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f0311c43/query_optimizer/physical/Selection.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/Selection.hpp b/query_optimizer/physical/Selection.hpp
index 68cae65..767c0b3 100644
--- a/query_optimizer/physical/Selection.hpp
+++ b/query_optimizer/physical/Selection.hpp
@@ -87,6 +87,10 @@ class Selection : public Physical {
   bool impliesUniqueAttributes(
       const std::vector<expressions::AttributeReferencePtr> &attributes) const override;
 
+  const BloomFilterConfig &bloom_filter_config() const {  // Returns a const reference to this node's bloom_filter_config_ member.
+    return bloom_filter_config_;
+  }
+
   /**
    * @brief Creates a Selection.
    *
@@ -98,9 +102,10 @@ class Selection : public Physical {
   static SelectionPtr Create(
       const PhysicalPtr &input,
       const std::vector<expressions::NamedExpressionPtr> &project_expressions,
-      const expressions::PredicatePtr &filter_predicate) {
+      const expressions::PredicatePtr &filter_predicate,
+      const BloomFilterConfig bloom_filter_config = BloomFilterConfig()) {
     return SelectionPtr(
-        new Selection(input, project_expressions, filter_predicate));
+        new Selection(input, project_expressions, filter_predicate, bloom_filter_config));
   }
 
   /**
@@ -141,15 +146,18 @@ class Selection : public Physical {
   Selection(
       const PhysicalPtr &input,
       const std::vector<expressions::NamedExpressionPtr> &project_expressions,
-      const expressions::PredicatePtr &filter_predicate)
+      const expressions::PredicatePtr &filter_predicate,
+      const BloomFilterConfig &bloom_filter_config)
       : project_expressions_(project_expressions),
-        filter_predicate_(filter_predicate) {
+        filter_predicate_(filter_predicate),
+        bloom_filter_config_(bloom_filter_config) {
     addChild(input);
   }
 
   std::vector<expressions::NamedExpressionPtr> project_expressions_;
   // Can be NULL. If NULL, the filter predicate is treated as the literal true.
   expressions::PredicatePtr filter_predicate_;
+  BloomFilterConfig bloom_filter_config_;
 
   DISALLOW_COPY_AND_ASSIGN(Selection);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f0311c43/query_optimizer/rules/AttachBloomFilters.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/AttachBloomFilters.cpp b/query_optimizer/rules/AttachBloomFilters.cpp
index 03a42a0..10ed512 100644
--- a/query_optimizer/rules/AttachBloomFilters.cpp
+++ b/query_optimizer/rules/AttachBloomFilters.cpp
@@ -17,6 +17,8 @@
 
 #include "query_optimizer/rules/AttachBloomFilters.hpp"
 
+#include <algorithm>
+#include <iterator>
 #include <memory>
 #include <set>
 #include <unordered_set>
@@ -34,8 +36,6 @@
 #include "query_optimizer/physical/PhysicalType.hpp"
 #include "query_optimizer/physical/TopLevelPlan.hpp"
 
-#include "glog/logging.h"
-
 namespace quickstep {
 namespace optimizer {
 
@@ -70,7 +70,10 @@ P::PhysicalPtr AttachBloomFilters::apply(const P::PhysicalPtr &input) {
 //    std::cerr << "********\n";
 //  }
 
-  return visitAndAttach(input);
+  std::set<E::ExprId> used_bloom_filters;
+  decideAttach(input, &used_bloom_filters);
+
+  return performAttach(input);
 }
 
 void AttachBloomFilters::visitProducer(const P::PhysicalPtr &node, const int depth) {
@@ -140,6 +143,52 @@ void AttachBloomFilters::visitConsumer(const P::PhysicalPtr &node) {
   // Bloom filters from parent
   const auto &parent_bloom_filters = consumers_[node];
   if (!parent_bloom_filters.empty()) {
+//    if (node->getPhysicalType() == P::PhysicalType::kHashJoin) {
+//      const P::HashJoinPtr hash_join =
+//          std::static_pointer_cast<const P::HashJoin>(node);
+//      const std::vector<const std::vector<E::AttributeReferencePtr>*> join_attributes =
+//          { &hash_join->left_join_attributes(), &hash_join->right_join_attributes() };
+//
+//      for (std::size_t i = 0; i < 2; ++i) {
+//        const auto child = hash_join->children()[i];
+//        std::unordered_set<E::ExprId> child_output_attribute_ids;
+//        for (const auto &attr : child->getOutputAttributes()) {
+//          child_output_attribute_ids.emplace(attr->id());
+//        }
+//
+//        std::unordered_map<E::ExprId, E::AttributeReferencePtr> join_attribute_map;
+//        for (std::size_t k = 0; k < hash_join->left_join_attributes().size(); ++k) {
+//          join_attribute_map.emplace(
+//              join_attributes[1-i]->at(k)->id(),
+//              join_attributes[i]->at(k));
+//        }
+//
+//        std::vector<BloomFilterInfo> bloom_filters;
+//        for (const auto &info : parent_bloom_filters) {
+//          if (child_output_attribute_ids.find(info.attribute->id())
+//                  != child_output_attribute_ids.end()) {
+//            bloom_filters.emplace_back(info.source,
+//                                       info.attribute,
+//                                       info.depth,
+//                                       info.selectivity,
+//                                       false,
+//                                       info.source_attribute);
+//          } else {
+//            auto attr_it = join_attribute_map.find(info.attribute->id());
+//            if (attr_it != join_attribute_map.end()) {
+//              bloom_filters.emplace_back(info.source,
+//                                         attr_it->second,
+//                                         info.depth,
+//                                         info.selectivity,
+//                                         false,
+//                                         info.source_attribute);
+//
+//            }
+//          }
+//        }
+//        consumers_.emplace(child, std::move(bloom_filters));
+//      }
+//    }
     for (const auto &child : node->children()) {
       std::unordered_set<E::ExprId> child_output_attribute_ids;
       for (const auto &attr : child->getOutputAttributes()) {
@@ -195,6 +244,21 @@ void AttachBloomFilters::visitConsumer(const P::PhysicalPtr &node) {
     }
   }
 
+  for (const auto &child : node->children()) {
+    visitConsumer(child);
+  }
+}
+
+void AttachBloomFilters::decideAttach(
+    const P::PhysicalPtr &node,
+    std::set<E::ExprId> *used_bloom_filters) {
+  for (const auto &child : node->children()) {
+    std::set<E::ExprId> child_bloom_filters;
+    decideAttach(child, &child_bloom_filters);
+    used_bloom_filters->insert(child_bloom_filters.begin(),
+                               child_bloom_filters.end());
+  }
+
   P::PhysicalPtr consumer_child = nullptr;
   if (node->getPhysicalType() == P::PhysicalType::kHashJoin) {
     consumer_child = std::static_pointer_cast<const P::HashJoin>(node)->left();
@@ -202,6 +266,9 @@ void AttachBloomFilters::visitConsumer(const P::PhysicalPtr &node) {
   if (node->getPhysicalType() == P::PhysicalType::kAggregate) {
     consumer_child = std::static_pointer_cast<const P::Aggregate>(node)->input();
   }
+  if (node->getPhysicalType() == P::PhysicalType::kSelection) {
+    consumer_child = std::static_pointer_cast<const P::Selection>(node)->input();
+  }
 
   if (consumer_child != nullptr) {
     // Decide attaches
@@ -222,27 +289,27 @@ void AttachBloomFilters::visitConsumer(const P::PhysicalPtr &node) {
 
       auto &probe_attaches = getBloomFilterConfig(node);
       for (const auto &pair : filters) {
-        auto &build_attaches = getBloomFilterConfig(pair.second->source);
-        build_attaches.addBuildSideBloomFilter(
-            pair.second->source_attribute);
-        probe_attaches.addProbeSideBloomFilter(
-            pair.first,
-            pair.second->source_attribute,
-            pair.second->source);
+        const E::ExprId source_attr_id = pair.second->source_attribute->id();
+        if (used_bloom_filters->find(source_attr_id) == used_bloom_filters->end()) {
+          auto &build_attaches = getBloomFilterConfig(pair.second->source);
+          build_attaches.addBuildSideBloomFilter(
+              pair.second->source_attribute);
+          probe_attaches.addProbeSideBloomFilter(
+              pair.first,
+              pair.second->source_attribute,
+              pair.second->source);
+          used_bloom_filters->emplace(source_attr_id);
+        }
       }
     }
   }
-
-  for (const auto &child : node->children()) {
-    visitConsumer(child);
-  }
 }
 
-P::PhysicalPtr AttachBloomFilters::visitAndAttach(const physical::PhysicalPtr &node) {
+P::PhysicalPtr AttachBloomFilters::performAttach(const physical::PhysicalPtr &node) {
   std::vector<P::PhysicalPtr> new_children;
   bool has_changed = false;
   for (const auto &child : node->children()) {
-    P::PhysicalPtr new_child = visitAndAttach(child);
+    P::PhysicalPtr new_child = performAttach(child);
     if (new_child != child) {
       has_changed = true;
     }
@@ -290,6 +357,24 @@ P::PhysicalPtr AttachBloomFilters::visitAndAttach(const physical::PhysicalPtr &n
     }
   }
 
+  if (node->getPhysicalType() == P::PhysicalType::kSelection) {
+    const auto attach_it = attaches_.find(node);
+    if (attach_it != attaches_.end()) {
+//      for (const auto& item : attach_it->second.probe_side_bloom_filters) {
+//        std::cout << "Attach probe from " << item.builder
+//                  << " to " << node << "\n";
+//      }
+
+      const P::SelectionPtr selection =
+          std::static_pointer_cast<const P::Selection>(node);
+      return P::Selection::Create(
+          selection->input(),
+          selection->project_expressions(),
+          selection->filter_predicate(),
+          attach_it->second);
+    }
+  }
+
   if (has_changed) {
     return node->copyWithNewChildren(new_children);
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f0311c43/query_optimizer/rules/AttachBloomFilters.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/AttachBloomFilters.hpp b/query_optimizer/rules/AttachBloomFilters.hpp
index e4437f7..5131bdd 100644
--- a/query_optimizer/rules/AttachBloomFilters.hpp
+++ b/query_optimizer/rules/AttachBloomFilters.hpp
@@ -21,6 +21,7 @@
 #include <algorithm>
 #include <cstddef>
 #include <memory>
+#include <set>
 #include <string>
 #include <unordered_map>
 #include <unordered_set>
@@ -97,7 +98,10 @@ class AttachBloomFilters : public Rule<physical::Physical> {
 
   void visitConsumer(const physical::PhysicalPtr &node);
 
-  physical::PhysicalPtr visitAndAttach(const physical::PhysicalPtr &node);
+  void decideAttach(const physical::PhysicalPtr &node,
+                    std::set<expressions::ExprId> *used_bloom_filters);
+
+  physical::PhysicalPtr performAttach(const physical::PhysicalPtr &node);
 
   physical::BloomFilterConfig &getBloomFilterConfig(const physical::PhysicalPtr &node);
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f0311c43/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
index cfbb5d1..b7e7910 100644
--- a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
+++ b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
@@ -219,11 +219,14 @@ physical::PhysicalPtr StarSchemaHashJoinOrderOptimization::generatePlan(
                   attribute_id_to_reference_map.at(build_it->second));
             }
           }
-          if (!build_attrs.empty()
-              && build_table_info->table->impliesUniqueAttributes(build_attrs)) {
-            std::unique_ptr<JoinPair> new_join(
-                new JoinPair(probe_table_info, build_table_info));
-            if (best_join == nullptr || new_join->isBetterThan(*best_join)) {
+
+          const bool build_side_unique =
+              !build_attrs.empty() && build_table_info->table->impliesUniqueAttributes(build_attrs);
+          std::unique_ptr<JoinPair> new_join(
+              new JoinPair(probe_table_info,
+                           build_table_info,
+                           build_side_unique));
+          if (best_join == nullptr || new_join->isBetterThan(*best_join)) {
 //              if (best_join != nullptr) {
 //                std::cerr << "(" << best_join->probe->estimated_selectivity
 //                          << ", " << best_join->probe->estimated_cardinality << ")"
@@ -239,25 +242,24 @@ physical::PhysicalPtr StarSchemaHashJoinOrderOptimization::generatePlan(
 //                        << "(" << new_join->build->estimated_selectivity
 //                        << ", " << new_join->build->estimated_cardinality << ")"
 //                        << "\n****\n";
-              best_join.reset(new_join.release());
-            }
+            best_join.reset(new_join.release());
           }
         }
       }
     }
 
-    TableInfo *selected_probe_table_info = nullptr;
-    TableInfo *selected_build_table_info = nullptr;
+    CHECK(best_join != nullptr);
 
-    if (best_join != nullptr) {
-      selected_probe_table_info = best_join->probe;
-      selected_build_table_info = best_join->build;
+    TableInfo *selected_probe_table_info = best_join->probe;
+    TableInfo *selected_build_table_info = best_join->build;
+    std::cerr << "card: " << selected_probe_table_info->estimated_cardinality << "\n";
+    std::cerr << "card: " << selected_build_table_info->estimated_cardinality << "\n";
+    std::cerr << "--------\n";
+    if (!best_join->build_side_unique &&
+        selected_probe_table_info->estimated_cardinality < selected_build_table_info->estimated_cardinality) {
+      std::swap(selected_probe_table_info, selected_build_table_info);
     }
 
-    // TODO(jianqiao): Handle the case when there is no primary key-foreign key information available.
-    CHECK(selected_probe_table_info != nullptr);
-    CHECK(selected_build_table_info != nullptr);
-
 //    std::cerr << selected_probe_table_info->estimated_selectivity
 //              << " -- "
 //              << selected_build_table_info->estimated_selectivity

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f0311c43/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp
index 33d95a5..6be57cc 100644
--- a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp
+++ b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp
@@ -94,12 +94,21 @@ class StarSchemaHashJoinOrderOptimization : public Rule<physical::Physical> {
   };
 
   struct JoinPair {
-    JoinPair(TableInfo *probe_in, TableInfo *build_in)
-        : probe(probe_in), build(build_in) {
+    JoinPair(TableInfo *probe_in,
+             TableInfo *build_in,
+             const bool build_side_unique_in)
+        : probe(probe_in),
+          build(build_in),
+          build_side_unique(build_side_unique_in) {
     }
 
     inline bool isBetterThan(const JoinPair &rhs) const {
       const auto &lhs = *this;
+
+      if (lhs.build_side_unique != rhs.build_side_unique) {
+        return lhs.build_side_unique;
+      }
+
       const bool lhs_has_large_output =
           lhs.build->estimated_num_output_attributes
               + lhs.probe->estimated_num_output_attributes > 5;
@@ -149,6 +158,7 @@ class StarSchemaHashJoinOrderOptimization : public Rule<physical::Physical> {
 
     TableInfo *probe;
     TableInfo *build;
+    const bool build_side_unique;
   };
 
   physical::PhysicalPtr applyInternal(const physical::PhysicalPtr &input,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f0311c43/relational_operators/SelectOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/SelectOperator.cpp b/relational_operators/SelectOperator.cpp
index eb6277e..5b55cfe 100644
--- a/relational_operators/SelectOperator.cpp
+++ b/relational_operators/SelectOperator.cpp
@@ -40,6 +40,8 @@ class Predicate;
 void SelectOperator::addWorkOrders(WorkOrdersContainer *container,
                                    StorageManager *storage_manager,
                                    const Predicate *predicate,
+                                   const std::vector<const BloomFilter *> &bloom_filters,
+                                   const std::vector<attribute_id> &bloom_filter_attribute_ids,
                                    const std::vector<std::unique_ptr<const Scalar>> *selection,
                                    InsertDestination *output_destination) {
   if (input_relation_is_stored_) {
@@ -48,6 +50,8 @@ void SelectOperator::addWorkOrders(WorkOrdersContainer *container,
                                                         input_relation_,
                                                         input_block_id,
                                                         predicate,
+                                                        bloom_filters,
+                                                        bloom_filter_attribute_ids,
                                                         simple_projection_,
                                                         simple_selection_,
                                                         selection,
@@ -63,6 +67,8 @@ void SelectOperator::addWorkOrders(WorkOrdersContainer *container,
               input_relation_,
               input_relation_block_ids_[num_workorders_generated_],
               predicate,
+              bloom_filters,
+              bloom_filter_attribute_ids,
               simple_projection_,
               simple_selection_,
               selection,
@@ -78,6 +84,8 @@ void SelectOperator::addWorkOrders(WorkOrdersContainer *container,
 void SelectOperator::addPartitionAwareWorkOrders(WorkOrdersContainer *container,
                                                  StorageManager *storage_manager,
                                                  const Predicate *predicate,
+                                                 const std::vector<const BloomFilter *> &bloom_filters,
+                                                 const std::vector<attribute_id> &bloom_filter_attribute_ids,
                                                  const std::vector<std::unique_ptr<const Scalar>> *selection,
                                                  InsertDestination *output_destination) {
   DCHECK(placement_scheme_ != nullptr);
@@ -92,6 +100,8 @@ void SelectOperator::addPartitionAwareWorkOrders(WorkOrdersContainer *container,
                 input_relation_,
                 input_block_id,
                 predicate,
+                bloom_filters,
+                bloom_filter_attribute_ids,
                 simple_projection_,
                 simple_selection_,
                 selection,
@@ -113,6 +123,8 @@ void SelectOperator::addPartitionAwareWorkOrders(WorkOrdersContainer *container,
                 input_relation_,
                 block_in_partition,
                 predicate,
+                bloom_filters,
+                bloom_filter_attribute_ids,
                 simple_projection_,
                 simple_selection_,
                 selection,
@@ -135,6 +147,15 @@ bool SelectOperator::getAllWorkOrders(
     tmb::MessageBus *bus) {
   DCHECK(query_context != nullptr);
 
+  if (bloom_filters_ == nullptr) {
+    bloom_filters_.reset(new std::vector<const BloomFilter*>());
+    for (const auto bloom_filter_id : bloom_filter_ids_) {
+      // Add the pointer to the probe bloom filter within the list of probe bloom filters to use.
+      bloom_filters_->emplace_back(
+          query_context->getBloomFilter(bloom_filter_id));
+    }
+  }
+
   const Predicate *predicate =
       query_context->getPredicate(predicate_index_);
   const std::vector<std::unique_ptr<const Scalar>> *selection =
@@ -149,11 +170,23 @@ bool SelectOperator::getAllWorkOrders(
       if (input_relation_.hasPartitionScheme()) {
 #ifdef QUICKSTEP_HAVE_LIBNUMA
         if (input_relation_.hasNUMAPlacementScheme()) {
-          addPartitionAwareWorkOrders(container, storage_manager, predicate, selection, output_destination);
+          addPartitionAwareWorkOrders(container,
+                                      storage_manager,
+                                      predicate,
+                                      *bloom_filters_,
+                                      bloom_filter_attribute_ids_,
+                                      selection,
+                                      output_destination);
         }
 #endif
       } else {
-        addWorkOrders(container, storage_manager, predicate, selection, output_destination);
+        addWorkOrders(container,
+                      storage_manager,
+                      predicate,
+                      *bloom_filters_,
+                      bloom_filter_attribute_ids_,
+                      selection,
+                      output_destination);
       }
       started_ = true;
     }
@@ -162,11 +195,23 @@ bool SelectOperator::getAllWorkOrders(
     if (input_relation_.hasPartitionScheme()) {
 #ifdef QUICKSTEP_HAVE_LIBNUMA
         if (input_relation_.hasNUMAPlacementScheme()) {
-          addPartitionAwareWorkOrders(container, storage_manager, predicate, selection, output_destination);
+          addPartitionAwareWorkOrders(container,
+                                      storage_manager,
+                                      predicate,
+                                      *bloom_filters_,
+                                      bloom_filter_attribute_ids_,
+                                      selection,
+                                      output_destination);
         }
 #endif
     } else {
-        addWorkOrders(container, storage_manager, predicate, selection, output_destination);
+        addWorkOrders(container,
+                      storage_manager,
+                      predicate,
+                      *bloom_filters_,
+                      bloom_filter_attribute_ids_,
+                      selection,
+                      output_destination);
     }
     return done_feeding_input_relation_;
   }
@@ -220,10 +265,14 @@ void SelectWorkOrder::execute() {
   if (simple_projection_) {
     block->selectSimple(simple_selection_,
                         predicate_,
+                        bloom_filters_,
+                        bloom_filter_attribute_ids_,
                         output_destination_);
   } else {
     block->select(*DCHECK_NOTNULL(selection_),
                   predicate_,
+                  bloom_filters_,
+                  bloom_filter_attribute_ids_,
                   output_destination_);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f0311c43/relational_operators/SelectOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SelectOperator.hpp b/relational_operators/SelectOperator.hpp
index 764dfa3..1528b10 100644
--- a/relational_operators/SelectOperator.hpp
+++ b/relational_operators/SelectOperator.hpp
@@ -35,6 +35,7 @@
 #include "relational_operators/RelationalOperator.hpp"
 #include "relational_operators/WorkOrder.hpp"
 #include "storage/StorageBlockInfo.hpp"
+#include "utility/BloomFilterAdapter.hpp"
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
@@ -45,6 +46,7 @@ namespace tmb { class MessageBus; }
 
 namespace quickstep {
 
+class BloomFilter;
 class CatalogRelationSchema;
 class InsertDestination;
 class Predicate;
@@ -244,15 +246,25 @@ class SelectOperator : public RelationalOperator {
     return output_relation_.getID();
   }
 
+  void addBloomFilter(const QueryContext::bloom_filter_id bloom_filter_id,
+                      const attribute_id bloom_filter_attribute_id) {
+    bloom_filter_ids_.emplace_back(bloom_filter_id);
+    bloom_filter_attribute_ids_.emplace_back(bloom_filter_attribute_id);
+  }
+
   void addWorkOrders(WorkOrdersContainer *container,
                      StorageManager *storage_manager,
                      const Predicate *predicate,
+                     const std::vector<const BloomFilter *> &bloom_filters,
+                     const std::vector<attribute_id> &bloom_filter_attribute_ids,
                      const std::vector<std::unique_ptr<const Scalar>> *selection,
                      InsertDestination *output_destination);
 
   void addPartitionAwareWorkOrders(WorkOrdersContainer *container,
                                    StorageManager *storage_manager,
                                    const Predicate *predicate,
+                                   const std::vector<const BloomFilter *> &bloom_filters,
+                                   const std::vector<attribute_id> &bloom_filter_attribute_ids,
                                    const std::vector<std::unique_ptr<const Scalar>> *selection,
                                    InsertDestination *output_destination);
 
@@ -268,6 +280,9 @@ class SelectOperator : public RelationalOperator {
   const CatalogRelation &output_relation_;
   const QueryContext::insert_destination_id output_destination_index_;
   const QueryContext::predicate_id predicate_index_;
+  std::vector<QueryContext::bloom_filter_id> bloom_filter_ids_;
+  std::vector<attribute_id> bloom_filter_attribute_ids_;
+  std::unique_ptr<std::vector<const BloomFilter*>> bloom_filters_;
 
   const QueryContext::scalar_group_id selection_index_;
   const std::vector<attribute_id> simple_selection_;
@@ -321,6 +336,8 @@ class SelectWorkOrder : public WorkOrder {
                   const CatalogRelationSchema &input_relation,
                   const block_id input_block_id,
                   const Predicate *predicate,
+                  const std::vector<const BloomFilter *> &bloom_filters,
+                  const std::vector<attribute_id> &bloom_filter_attribute_ids,
                   const bool simple_projection,
                   const std::vector<attribute_id> &simple_selection,
                   const std::vector<std::unique_ptr<const Scalar>> *selection,
@@ -331,6 +348,8 @@ class SelectWorkOrder : public WorkOrder {
         input_relation_(input_relation),
         input_block_id_(input_block_id),
         predicate_(predicate),
+        bloom_filters_(bloom_filters),
+        bloom_filter_attribute_ids_(bloom_filter_attribute_ids),
         simple_projection_(simple_projection),
         simple_selection_(simple_selection),
         selection_(selection),
@@ -363,6 +382,8 @@ class SelectWorkOrder : public WorkOrder {
                   const CatalogRelationSchema &input_relation,
                   const block_id input_block_id,
                   const Predicate *predicate,
+                  const std::vector<const BloomFilter *> &bloom_filters,
+                  const std::vector<attribute_id> &bloom_filter_attribute_ids,
                   const bool simple_projection,
                   std::vector<attribute_id> &&simple_selection,
                   const std::vector<std::unique_ptr<const Scalar>> *selection,
@@ -373,6 +394,8 @@ class SelectWorkOrder : public WorkOrder {
         input_relation_(input_relation),
         input_block_id_(input_block_id),
         predicate_(predicate),
+        bloom_filters_(bloom_filters),
+        bloom_filter_attribute_ids_(bloom_filter_attribute_ids),
         simple_projection_(simple_projection),
         simple_selection_(std::move(simple_selection)),
         selection_(selection),
@@ -397,6 +420,8 @@ class SelectWorkOrder : public WorkOrder {
   const CatalogRelationSchema &input_relation_;
   const block_id input_block_id_;
   const Predicate *predicate_;
+  const std::vector<const BloomFilter *> &bloom_filters_;
+  const std::vector<attribute_id> &bloom_filter_attribute_ids_;
 
   const bool simple_projection_;
   const std::vector<attribute_id> simple_selection_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f0311c43/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 668164c..079d0ea 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -54,6 +54,8 @@
 #include "types/containers/Tuple.hpp"
 #include "utility/BloomFilterAdapter.hpp"
 
+#include "gflags/gflags.h"
+
 #include "glog/logging.h"
 
 using std::unique_ptr;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f0311c43/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index 78aba7c..75cde68 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -59,8 +59,12 @@
 #include "types/containers/ColumnVectorsValueAccessor.hpp"
 #include "types/containers/Tuple.hpp"
 #include "types/operations/comparisons/ComparisonUtil.hpp"
+#include "utility/BloomFilter.hpp"
+#include "utility/BloomFilterAdapter.hpp"
 #include "utility/Macros.hpp"
 
+#include "gflags/gflags.h"
+
 #include "glog/logging.h"
 
 #ifdef QUICKSTEP_HAVE_BITWEAVING
@@ -78,6 +82,8 @@ using std::vector;
 
 namespace quickstep {
 
+DECLARE_int64(bloom_adapter_batch_size);
+
 class Type;
 
 StorageBlock::StorageBlock(const CatalogRelationSchema &relation,
@@ -341,6 +347,8 @@ void StorageBlock::sample(const bool is_block_sample,
 
 void StorageBlock::select(const vector<unique_ptr<const Scalar>> &selection,
                           const Predicate *predicate,
+                          const std::vector<const BloomFilter *> &bloom_filters,
+                          const std::vector<attribute_id> &bloom_filter_attribute_ids,
                           InsertDestinationInterface *destination) const {
   ColumnVectorsValueAccessor temp_result;
   {
@@ -350,10 +358,58 @@ void StorageBlock::select(const vector<unique_ptr<const Scalar>> &selection,
 
     std::unique_ptr<TupleIdSequence> matches;
     std::unique_ptr<ValueAccessor> accessor;
-    if (predicate == nullptr) {
+
+    if (bloom_filters.size() > 0) {
+      // Pre-filter with the bloom filters: tuples reported as hits are marked in 'matches'.
+      matches.reset(new TupleIdSequence(tuple_store_->numTuples()));
       accessor.reset(tuple_store_->createValueAccessor());
+      InvokeOnAnyValueAccessor(
+          accessor.get(),
+          [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+        std::vector<std::size_t> attr_size_vector;
+        attr_size_vector.reserve(bloom_filter_attribute_ids.size());
+        for (const auto &attr : bloom_filter_attribute_ids) {
+          auto val_and_size =
+              accessor->template getUntypedValueAndByteLengthAtAbsolutePosition<false>(0, attr);
+          attr_size_vector.emplace_back(val_and_size.second);
+        }
+
+        std::unique_ptr<BloomFilterAdapter> bloom_filter_adapter(new BloomFilterAdapter(
+            bloom_filters, bloom_filter_attribute_ids, attr_size_vector));
+
+        std::uint32_t batch_size_try = FLAGS_bloom_adapter_batch_size;
+        std::uint32_t num_tuples_left = accessor->getNumTuples();
+        // Start empty and only reserve capacity: ids are appended with push_back(), so a
+        // vector pre-sized with N zero entries would place the real ids behind the zeros.
+        std::vector<tuple_id> batch;
+        batch.reserve(num_tuples_left);
+
+        do {
+          std::uint32_t batch_size =
+              batch_size_try < num_tuples_left ? batch_size_try : num_tuples_left;
+          for (std::size_t i = 0; i < batch_size; ++i) {
+            accessor->next();
+            batch.push_back(accessor->getCurrentPosition());
+          }
+
+          std::size_t num_hits =
+              bloom_filter_adapter->bulkProbe<true>(accessor, batch, batch_size);
+          for (std::size_t t = 0; t < num_hits; ++t) {
+            matches->set(batch[t], true);
+          }
+
+          batch.clear();
+          num_tuples_left -= batch_size;
+          batch_size_try = batch_size * 2;
+        } while (num_tuples_left > 0);
+      });
+    }
+
+    if (predicate == nullptr) {
+      accessor.reset(tuple_store_->createValueAccessor(matches.get()));
     } else {
-      matches.reset(getMatchesForPredicate(predicate));
+      auto *new_matches = getMatchesForPredicate(predicate, matches.get());
+      matches.reset(new_matches);
       accessor.reset(tuple_store_->createValueAccessor(matches.get()));
     }
 
@@ -371,13 +427,63 @@ void StorageBlock::select(const vector<unique_ptr<const Scalar>> &selection,
 
 void StorageBlock::selectSimple(const std::vector<attribute_id> &selection,
                                 const Predicate *predicate,
+                                const std::vector<const BloomFilter *> &bloom_filters,
+                                const std::vector<attribute_id> &bloom_filter_attribute_ids,
                                 InsertDestinationInterface *destination) const {
   std::unique_ptr<ValueAccessor> accessor;
   std::unique_ptr<TupleIdSequence> matches;
-  if (predicate == nullptr) {
+
+  if (bloom_filters.size() > 0) {
+    // Pre-filter with the bloom filters: tuples reported as hits are marked in 'matches'.
+    matches.reset(new TupleIdSequence(tuple_store_->numTuples()));
     accessor.reset(tuple_store_->createValueAccessor());
+    InvokeOnAnyValueAccessor(
+        accessor.get(),
+        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+      std::vector<std::size_t> attr_size_vector;
+      attr_size_vector.reserve(bloom_filter_attribute_ids.size());
+      for (const auto &attr : bloom_filter_attribute_ids) {
+        auto val_and_size =
+            accessor->template getUntypedValueAndByteLengthAtAbsolutePosition<false>(0, attr);
+        attr_size_vector.emplace_back(val_and_size.second);
+      }
+
+      std::unique_ptr<BloomFilterAdapter> bloom_filter_adapter(new BloomFilterAdapter(
+          bloom_filters, bloom_filter_attribute_ids, attr_size_vector));
+
+      std::uint32_t batch_size_try = FLAGS_bloom_adapter_batch_size;
+      std::uint32_t num_tuples_left = accessor->getNumTuples();
+      // Start empty and only reserve capacity: ids are appended with push_back(), so a
+      // vector pre-sized with N zero entries would place the real ids behind the zeros.
+      std::vector<tuple_id> batch;
+      batch.reserve(num_tuples_left);
+
+      do {
+        std::uint32_t batch_size =
+            batch_size_try < num_tuples_left ? batch_size_try : num_tuples_left;
+        for (std::size_t i = 0; i < batch_size; ++i) {
+          accessor->next();
+          batch.push_back(accessor->getCurrentPosition());
+        }
+
+        std::size_t num_hits =
+            bloom_filter_adapter->bulkProbe<true>(accessor, batch, batch_size);
+        for (std::size_t t = 0; t < num_hits; ++t) {
+          matches->set(batch[t], true);
+        }
+
+        batch.clear();
+        num_tuples_left -= batch_size;
+        batch_size_try = batch_size * 2;
+      } while (num_tuples_left > 0);
+    });
+  }
+
+  if (predicate == nullptr) {
+    accessor.reset(tuple_store_->createValueAccessor(matches.get()));
   } else {
-    matches.reset(getMatchesForPredicate(predicate));
+    auto *new_matches = getMatchesForPredicate(predicate, matches.get());
+    matches.reset(new_matches);
     accessor.reset(tuple_store_->createValueAccessor(matches.get()));
   }
 
@@ -1219,12 +1325,13 @@ bool StorageBlock::rebuildIndexes(bool short_circuit) {
   return all_indices_consistent_;
 }
 
-TupleIdSequence* StorageBlock::getMatchesForPredicate(const Predicate *predicate) const {
+TupleIdSequence* StorageBlock::getMatchesForPredicate(const Predicate *predicate,
+                                                      const TupleIdSequence *sequence) const {
   if (predicate == nullptr) {
     return tuple_store_->getExistenceMap();
   }
 
-  std::unique_ptr<ValueAccessor> value_accessor(tuple_store_->createValueAccessor());
+  std::unique_ptr<ValueAccessor> value_accessor(tuple_store_->createValueAccessor(sequence));
   std::unique_ptr<TupleIdSequence> existence_map;
   if (!tuple_store_->isPacked()) {
     existence_map.reset(tuple_store_->getExistenceMap());

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f0311c43/storage/StorageBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp
index 3217fa2..4948828 100644
--- a/storage/StorageBlock.hpp
+++ b/storage/StorageBlock.hpp
@@ -32,6 +32,7 @@
 #include "storage/StorageBlockLayout.pb.h"
 #include "storage/TupleIdSequence.hpp"
 #include "storage/TupleStorageSubBlock.hpp"
+#include "utility/BloomFilter.hpp"
 #include "utility/Macros.hpp"
 #include "utility/PtrVector.hpp"
 
@@ -347,6 +348,8 @@ class StorageBlock : public StorageBlockBase {
    **/
   void select(const std::vector<std::unique_ptr<const Scalar>> &selection,
               const Predicate *predicate,
+              const std::vector<const BloomFilter *> &bloom_filters,
+              const std::vector<attribute_id> &bloom_filter_attribute_ids,
               InsertDestinationInterface *destination) const;
 
   /**
@@ -370,6 +373,8 @@ class StorageBlock : public StorageBlockBase {
    **/
   void selectSimple(const std::vector<attribute_id> &selection,
                     const Predicate *predicate,
+                    const std::vector<const BloomFilter *> &bloom_filters,
+                    const std::vector<attribute_id> &bloom_filter_attribute_ids,
                     InsertDestinationInterface *destination) const;
 
   /**
@@ -585,7 +590,8 @@ class StorageBlock : public StorageBlockBase {
    **/
   const std::size_t getNumTuples() const;
 
-  TupleIdSequence* getMatchesForPredicate(const Predicate *predicate) const;
+  TupleIdSequence* getMatchesForPredicate(const Predicate *predicate,
+                                          const TupleIdSequence *sequence = nullptr) const;
 
  private:
   static TupleStorageSubBlock* CreateTupleStorageSubBlock(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f0311c43/utility/PlanVisualizer.cpp
----------------------------------------------------------------------
diff --git a/utility/PlanVisualizer.cpp b/utility/PlanVisualizer.cpp
index 4cc1b0f..04cb7a3 100644
--- a/utility/PlanVisualizer.cpp
+++ b/utility/PlanVisualizer.cpp
@@ -33,6 +33,7 @@
 #include "query_optimizer/physical/HashJoin.hpp"
 #include "query_optimizer/physical/Physical.hpp"
 #include "query_optimizer/physical/PhysicalType.hpp"
+#include "query_optimizer/physical/Selection.hpp"
 #include "query_optimizer/physical/TableReference.hpp"
 #include "query_optimizer/physical/TopLevelPlan.hpp"
 #include "utility/StringUtil.hpp"
@@ -86,6 +87,9 @@ std::string PlanVisualizer::visualize(const P::PhysicalPtr &input) {
   for (const EdgeInfo &edge_info : edges_) {
     graph_oss << "  " << edge_info.src_node_id << " -> "
               << edge_info.dst_node_id << " [";
+    if (edge_info.dashed) {
+      graph_oss << "style=dashed ";
+    }
     if (!edge_info.labels.empty()) {
       graph_oss << "label=\""
                 << EscapeSpecialChars(JoinToString(edge_info.labels, "&#10;"))
@@ -116,6 +120,12 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) {
     EdgeInfo &edge_info = edges_.back();
     edge_info.src_node_id = child_id;
     edge_info.dst_node_id = node_id;
+    edge_info.dashed = false;
+
+    if (input->getPhysicalType() == P::PhysicalType::kHashJoin &&
+        child == input->children()[1]) {
+      edge_info.dashed = true;
+    }
 
     for (const auto &attr : child->getOutputAttributes()) {
       if (referenced_ids.find(attr->id()) != referenced_ids.end()) {
@@ -165,7 +175,6 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) {
         node_info.labels.emplace_back(
             std::string("[BF probe] ") + bf.attribute->attribute_alias());
       }
-
       break;
     }
     case P::PhysicalType::kAggregate: {
@@ -178,7 +187,18 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) {
         node_info.labels.emplace_back(
             std::string("[BF probe] ") + bf.attribute->attribute_alias());
       }
+      break;
+    }
+    case P::PhysicalType::kSelection: {
+      const P::SelectionPtr selection =
+        std::static_pointer_cast<const P::Selection>(input);
+      node_info.labels.emplace_back(input->getName());
 
+      const auto &bf_config = selection->bloom_filter_config();
+      for (const auto &bf : bf_config.probe_side_bloom_filters) {
+        node_info.labels.emplace_back(
+            std::string("[BF probe] ") + bf.attribute->attribute_alias());
+      }
       break;
     }
     default: {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f0311c43/utility/PlanVisualizer.hpp
----------------------------------------------------------------------
diff --git a/utility/PlanVisualizer.hpp b/utility/PlanVisualizer.hpp
index 080b7de..a35863f 100644
--- a/utility/PlanVisualizer.hpp
+++ b/utility/PlanVisualizer.hpp
@@ -71,6 +71,7 @@ class PlanVisualizer {
     int src_node_id;
     int dst_node_id;
     std::vector<std::string> labels;
+    bool dashed;
   };
 
   void visit(const optimizer::physical::PhysicalPtr &input);