You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/08/11 20:27:49 UTC

[09/16] incubator-quickstep git commit: Initial commit

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/query_optimizer/rules/AttachBloomFilters.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/AttachBloomFilters.cpp b/query_optimizer/rules/AttachBloomFilters.cpp
new file mode 100644
index 0000000..03a42a0
--- /dev/null
+++ b/query_optimizer/rules/AttachBloomFilters.cpp
@@ -0,0 +1,308 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin\u2014Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "query_optimizer/rules/AttachBloomFilters.hpp"
+
+#include <memory>
+#include <set>
+#include <unordered_set>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/NamedExpression.hpp"
+#include "query_optimizer/expressions/PatternMatcher.hpp"
+#include "query_optimizer/physical/HashJoin.hpp"
+#include "query_optimizer/physical/PatternMatcher.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/PhysicalType.hpp"
+#include "query_optimizer/physical/TopLevelPlan.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+
+namespace E = ::quickstep::optimizer::expressions;
+namespace P = ::quickstep::optimizer::physical;
+
+P::PhysicalPtr AttachBloomFilters::apply(const P::PhysicalPtr &input) {
+  DCHECK(input->getPhysicalType() == P::PhysicalType::kTopLevelPlan);
+  cost_model_.reset(
+      new cost::StarSchemaSimpleCostModel(
+          std::static_pointer_cast<const P::TopLevelPlan>(input)->shared_subplans()));
+
+  visitProducer(input, 0);
+  visitConsumer(input);
+
+//  for (const auto &info_vec_pair : consumers_) {
+//    std::cerr << "--------\n"
+//              << "Node " << info_vec_pair.first->getName()
+//              << " " << info_vec_pair.first << "\n";
+//
+//    for (const auto &info : info_vec_pair.second) {
+//      std::cerr << info.attribute->attribute_alias();
+//      if (info.attribute->id() != info.source_attribute->id()) {
+//        std::cerr << "{FROM " << info.source_attribute->attribute_alias() << "}";
+//      }
+//      if (info.from_sibling) {
+//        std::cerr << " sibling";
+//      }
+//      std::cerr << " @" << info.source << "[" << info.depth << "]"
+//                << ": " << info.selectivity << "\n";
+//    }
+//    std::cerr << "********\n";
+//  }
+
+  return visitAndAttach(input);
+}
+
+void AttachBloomFilters::visitProducer(const P::PhysicalPtr &node, const int depth) {
+  for (const P::PhysicalPtr &child : node->children()) {
+    visitProducer(child, depth+1);
+  }
+
+  std::vector<BloomFilterInfo> bloom_filters;
+
+  if (node->getPhysicalType() == P::PhysicalType::kHashJoin) {
+    const P::HashJoinPtr &hash_join =
+        std::static_pointer_cast<const P::HashJoin>(node);
+    const P::PhysicalPtr &build_node = hash_join->right();
+    double selectivity = cost_model_->estimateSelectivity(build_node);
+    if (selectivity < 1.0) {
+      auto &build_node_info = producers_[build_node];
+      for (const auto &attr : hash_join->right_join_attributes()) {
+        build_node_info.emplace_back(node, attr, depth, selectivity, false);
+      }
+    }
+  }
+
+  const std::vector<E::AttributeReferencePtr> output_attributes(
+      node->getOutputAttributes());
+  std::unordered_set<E::ExprId> output_attribute_ids;
+  for (const auto &attr : output_attributes) {
+    output_attribute_ids.emplace(attr->id());
+  }
+
+  // First check inherited bloom filters
+  std::vector<const BloomFilterInfo*> candidates;
+  switch (node->getPhysicalType()) {
+    case P::PhysicalType::kAggregate:
+    case P::PhysicalType::kSelection:
+    case P::PhysicalType::kHashJoin: {
+      for (const P::PhysicalPtr &child : node->children()) {
+        for (const BloomFilterInfo &info : producers_[child]) {
+          candidates.emplace_back(&info);
+        }
+      }
+    }
+    default:
+      break;
+  }
+
+  for (const BloomFilterInfo *info : candidates) {
+    if (output_attribute_ids.find(info->attribute->id()) != output_attribute_ids.end()) {
+      bloom_filters.emplace_back(
+          info->source, info->attribute, info->depth, info->selectivity, false);
+    }
+  }
+
+  // Self-produced bloom filters
+//  double selectivity = cost_model_->estimateSelectivity(node);
+//  if (selectivity < 1.0) {
+//    for (const auto &attr : output_attributes) {
+//      bloom_filters.emplace_back(node, attr, depth, selectivity, false);
+//    }
+//  }
+
+  producers_.emplace(node, std::move(bloom_filters));
+}
+
+void AttachBloomFilters::visitConsumer(const P::PhysicalPtr &node) {
+  std::vector<BloomFilterInfo> bloom_filters;
+
+  // Bloom filters from parent
+  const auto &parent_bloom_filters = consumers_[node];
+  if (!parent_bloom_filters.empty()) {
+    for (const auto &child : node->children()) {
+      std::unordered_set<E::ExprId> child_output_attribute_ids;
+      for (const auto &attr : child->getOutputAttributes()) {
+        child_output_attribute_ids.emplace(attr->id());
+      }
+
+      std::vector<BloomFilterInfo> bloom_filters;
+      for (const auto &info : parent_bloom_filters) {
+        if (child_output_attribute_ids.find(info.attribute->id())
+                != child_output_attribute_ids.end()) {
+          bloom_filters.emplace_back(info.source,
+                                     info.attribute,
+                                     info.depth,
+                                     info.selectivity,
+                                     false,
+                                     info.source_attribute);
+        }
+      }
+      consumers_.emplace(child, std::move(bloom_filters));
+    }
+  }
+
+  // Bloom filters from build side to probe side via HashJoin
+  if (node->getPhysicalType() == P::PhysicalType::kHashJoin) {
+    const P::HashJoinPtr hash_join =
+        std::static_pointer_cast<const P::HashJoin>(node);
+    if (hash_join->join_type() == P::HashJoin::JoinType::kInnerJoin ||
+        hash_join->join_type() == P::HashJoin::JoinType::kLeftSemiJoin) {
+      const P::PhysicalPtr &producer_child = hash_join->right();
+      const P::PhysicalPtr &consumer_child = hash_join->left();
+      std::unordered_map<E::ExprId, E::AttributeReferencePtr> join_attribute_pairs;
+      for (std::size_t i = 0; i < hash_join->left_join_attributes().size(); ++i) {
+        const E::AttributeReferencePtr probe_join_attribute =
+            hash_join->left_join_attributes()[i];
+        const E::AttributeReferencePtr build_join_attribute =
+            hash_join->right_join_attributes()[i];
+        join_attribute_pairs.emplace(build_join_attribute->id(),
+                                     probe_join_attribute);
+      }
+
+      auto &consumer_bloom_filters = consumers_[consumer_child];
+      for (const auto &info : producers_[producer_child]) {
+        const auto pair_it = join_attribute_pairs.find(info.attribute->id());
+        if (pair_it != join_attribute_pairs.end()) {
+          consumer_bloom_filters.emplace_back(info.source,
+                                              pair_it->second,
+                                              info.depth,
+                                              info.selectivity,
+                                              true,
+                                              info.attribute);
+        }
+      }
+    }
+  }
+
+  P::PhysicalPtr consumer_child = nullptr;
+  if (node->getPhysicalType() == P::PhysicalType::kHashJoin) {
+    consumer_child = std::static_pointer_cast<const P::HashJoin>(node)->left();
+  }
+  if (node->getPhysicalType() == P::PhysicalType::kAggregate) {
+    consumer_child = std::static_pointer_cast<const P::Aggregate>(node)->input();
+  }
+
+  if (consumer_child != nullptr) {
+    // Decide attaches
+    auto &consumer_bloom_filters = consumers_[consumer_child];
+    if (cost_model_->estimateCardinality(consumer_child) > 10000000 &&
+        !consumer_bloom_filters.empty()) {
+      std::map<E::AttributeReferencePtr, const BloomFilterInfo*> filters;
+      for (const auto &info : consumer_bloom_filters) {
+        auto it = filters.find(info.attribute);
+        if (it == filters.end()) {
+          filters.emplace(info.attribute, &info);
+        } else {
+          if (BloomFilterInfo::isBetterThan(&info, it->second)) {
+            it->second = &info;
+          }
+        }
+      }
+
+      auto &probe_attaches = getBloomFilterConfig(node);
+      for (const auto &pair : filters) {
+        auto &build_attaches = getBloomFilterConfig(pair.second->source);
+        build_attaches.addBuildSideBloomFilter(
+            pair.second->source_attribute);
+        probe_attaches.addProbeSideBloomFilter(
+            pair.first,
+            pair.second->source_attribute,
+            pair.second->source);
+      }
+    }
+  }
+
+  for (const auto &child : node->children()) {
+    visitConsumer(child);
+  }
+}
+
+P::PhysicalPtr AttachBloomFilters::visitAndAttach(const physical::PhysicalPtr &node) {
+  std::vector<P::PhysicalPtr> new_children;
+  bool has_changed = false;
+  for (const auto &child : node->children()) {
+    P::PhysicalPtr new_child = visitAndAttach(child);
+    if (new_child != child) {
+      has_changed = true;
+    }
+    new_children.emplace_back(new_child);
+  }
+
+  if (node->getPhysicalType() == P::PhysicalType::kHashJoin) {
+    const auto attach_it = attaches_.find(node);
+    if (attach_it != attaches_.end()) {
+//      for (const auto& item : attach_it->second.probe_side_bloom_filters) {
+//        std::cout << "Attach probe from " << item.builder
+//                  << " to " << node << "\n";
+//      }
+
+      const P::HashJoinPtr hash_join =
+          std::static_pointer_cast<const P::HashJoin>(node);
+      return P::HashJoin::Create(
+          new_children[0],
+          new_children[1],
+          hash_join->left_join_attributes(),
+          hash_join->right_join_attributes(),
+          hash_join->residual_predicate(),
+          hash_join->project_expressions(),
+          hash_join->join_type(),
+          attach_it->second);
+    }
+  }
+
+  if (node->getPhysicalType() == P::PhysicalType::kAggregate) {
+    const auto attach_it = attaches_.find(node);
+    if (attach_it != attaches_.end()) {
+//      for (const auto& item : attach_it->second.probe_side_bloom_filters) {
+//        std::cout << "Attach probe from " << item.builder
+//                  << " to " << node << "\n";
+//      }
+
+      const P::AggregatePtr aggregate =
+          std::static_pointer_cast<const P::Aggregate>(node);
+      return P::Aggregate::Create(
+          aggregate->input(),
+          aggregate->grouping_expressions(),
+          aggregate->aggregate_expressions(),
+          aggregate->filter_predicate(),
+          attach_it->second);
+    }
+  }
+
+  if (has_changed) {
+    return node->copyWithNewChildren(new_children);
+  }
+
+  return node;
+}
+
+P::BloomFilterConfig& AttachBloomFilters::getBloomFilterConfig(const physical::PhysicalPtr &node) {
+  if (attaches_.find(node) == attaches_.end()) {
+    attaches_.emplace(node, node);
+  }
+  return attaches_[node];
+}
+
+}  // namespace optimizer
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/query_optimizer/rules/AttachBloomFilters.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/AttachBloomFilters.hpp b/query_optimizer/rules/AttachBloomFilters.hpp
new file mode 100644
index 0000000..e4437f7
--- /dev/null
+++ b/query_optimizer/rules/AttachBloomFilters.hpp
@@ -0,0 +1,118 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin\u2014Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_ATTACH_BLOOM_FILTERS_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_RULES_ATTACH_BLOOM_FILTERS_HPP_
+
+#include <algorithm>
+#include <cstddef>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/expressions/ExprId.hpp"
+#include "query_optimizer/expressions/NamedExpression.hpp"
+#include "query_optimizer/expressions/Predicate.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/HashJoin.hpp"
+#include "query_optimizer/rules/Rule.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+/** \addtogroup OptimizerRules
+ *  @{
+ */
+
+/**
+ * @brief TODO
+ */
+class AttachBloomFilters : public Rule<physical::Physical> {
+ public:
+  AttachBloomFilters() {}
+
+  ~AttachBloomFilters() override {}
+
+  std::string getName() const override {
+    return "AttachBloomFilters";
+  }
+
+  physical::PhysicalPtr apply(const physical::PhysicalPtr &input) override;
+
+ private:
+  struct BloomFilterInfo {
+    BloomFilterInfo(const physical::PhysicalPtr &source_in,
+                    const expressions::AttributeReferencePtr &attribute_in,
+                    const int depth_in,
+                    const double selectivity_in,
+                    const bool from_sibling_in,
+                    const expressions::AttributeReferencePtr &source_attribute_in = nullptr)
+        : source(source_in),
+          attribute(attribute_in),
+          depth(depth_in),
+          selectivity(selectivity_in),
+          from_sibling(from_sibling_in),
+          source_attribute(
+              source_attribute_in == nullptr
+                  ? attribute_in
+                  : source_attribute_in) {
+
+    }
+    static bool isBetterThan(const BloomFilterInfo *a,
+                             const BloomFilterInfo *b) {
+      if (a->selectivity == b->selectivity) {
+        return a->depth > b->depth;
+      } else {
+        return a->selectivity < b->selectivity;
+      }
+    }
+    physical::PhysicalPtr source;
+    expressions::AttributeReferencePtr attribute;
+    int depth;
+    double selectivity;
+    bool from_sibling;
+    expressions::AttributeReferencePtr source_attribute;
+  };
+
+  void visitProducer(const physical::PhysicalPtr &node, const int depth);
+
+  void visitConsumer(const physical::PhysicalPtr &node);
+
+  physical::PhysicalPtr visitAndAttach(const physical::PhysicalPtr &node);
+
+  physical::BloomFilterConfig &getBloomFilterConfig(const physical::PhysicalPtr &node);
+
+  std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_;
+
+  std::map<physical::PhysicalPtr, std::vector<BloomFilterInfo>> producers_;
+  std::map<physical::PhysicalPtr, std::vector<BloomFilterInfo>> consumers_;
+  std::map<physical::PhysicalPtr, physical::BloomFilterConfig> attaches_;
+
+  DISALLOW_COPY_AND_ASSIGN(AttachBloomFilters);
+};
+
+/** @} */
+
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif /* QUICKSTEP_QUERY_OPTIMIZER_RULES_ATTACH_BLOOM_FILTERS_HPP_ */

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/query_optimizer/rules/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/CMakeLists.txt b/query_optimizer/rules/CMakeLists.txt
index d9709ce..54b1e59 100644
--- a/query_optimizer/rules/CMakeLists.txt
+++ b/query_optimizer/rules/CMakeLists.txt
@@ -18,6 +18,7 @@
 add_subdirectory(tests)
 
 # Declare micro-libs:
+add_library(quickstep_queryoptimizer_rules_AttachBloomFilters AttachBloomFilters.cpp AttachBloomFilters.hpp)
 add_library(quickstep_queryoptimizer_rules_BottomUpRule ../../empty_src.cpp BottomUpRule.hpp)
 add_library(quickstep_queryoptimizer_rules_CollapseProject CollapseProject.cpp CollapseProject.hpp)
 add_library(quickstep_queryoptimizer_rules_GenerateJoins GenerateJoins.cpp GenerateJoins.hpp)
@@ -36,6 +37,20 @@ add_library(quickstep_queryoptimizer_rules_UnnestSubqueries UnnestSubqueries.cpp
 
 
 # Link dependencies:
+target_link_libraries(quickstep_queryoptimizer_rules_AttachBloomFilters
+                      quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostModel
+                      quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_expressions_ExprId
+                      quickstep_queryoptimizer_expressions_NamedExpression
+                      quickstep_queryoptimizer_expressions_PatternMatcher
+                      quickstep_queryoptimizer_expressions_Predicate
+                      quickstep_queryoptimizer_physical_HashJoin
+                      quickstep_queryoptimizer_physical_PatternMatcher
+                      quickstep_queryoptimizer_physical_Physical
+                      quickstep_queryoptimizer_physical_PhysicalType
+                      quickstep_queryoptimizer_physical_TopLevelPlan
+                      quickstep_queryoptimizer_rules_Rule
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_rules_BottomUpRule
                       glog
                       quickstep_queryoptimizer_rules_Rule
@@ -127,6 +142,7 @@ target_link_libraries(quickstep_queryoptimizer_rules_StarSchemaHashJoinOrderOpti
                       quickstep_queryoptimizer_physical_PhysicalType
                       quickstep_queryoptimizer_physical_TopLevelPlan
                       quickstep_queryoptimizer_rules_Rule
+                      quickstep_utility_DisjointTreeForest
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_rules_SwapProbeBuild
                       quickstep_queryoptimizer_costmodel_SimpleCostModel
@@ -187,6 +203,7 @@ target_link_libraries(quickstep_queryoptimizer_rules_UpdateExpression
 # Module all-in-one library:
 add_library(quickstep_queryoptimizer_rules ../../empty_src.cpp OptimizerRulesModule.hpp)
 target_link_libraries(quickstep_queryoptimizer_rules
+                      quickstep_queryoptimizer_rules_AttachBloomFilters
                       quickstep_queryoptimizer_rules_BottomUpRule
                       quickstep_queryoptimizer_rules_CollapseProject
                       quickstep_queryoptimizer_rules_GenerateJoins

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
index 946d316..15a7154 100644
--- a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
+++ b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
@@ -33,6 +33,7 @@
 #include "query_optimizer/physical/Physical.hpp"
 #include "query_optimizer/physical/PhysicalType.hpp"
 #include "query_optimizer/physical/TopLevelPlan.hpp"
+#include "utility/DisjointTreeForest.hpp"
 
 #include "glog/logging.h"
 
@@ -74,6 +75,9 @@ P::PhysicalPtr StarSchemaHashJoinOrderOptimization::applyInternal(const P::Physi
     JoinGroupInfo *join_group = nullptr;
     if (parent_join_group == nullptr || !is_valid_cascading_hash_join) {
       new_join_group.reset(new JoinGroupInfo());
+      for (const auto &attr : input->getReferencedAttributes()) {
+        new_join_group->referenced_attributes.emplace(attr->id());
+      }
       join_group = new_join_group.get();
     } else {
       join_group = parent_join_group;
@@ -146,7 +150,10 @@ physical::PhysicalPtr StarSchemaHashJoinOrderOptimization::generatePlan(
         i,
         tables[i],
         cost_model_->estimateCardinality(tables[i]),
-        cost_model_->estimateSelectivity(tables[i]));
+        cost_model_->estimateSelectivity(tables[i]),
+        CountSharedAttributes(join_group.referenced_attributes,
+                              tables[i]->getOutputAttributes()),
+        tables[i]->getPhysicalType() == physical::PhysicalType::kAggregate);
   }
 
   // Auxiliary mapping info.
@@ -163,9 +170,19 @@ physical::PhysicalPtr StarSchemaHashJoinOrderOptimization::generatePlan(
     }
   }
 
-  // Create a join graph where tables are vertices, and add an edge between vertices
-  // t1 and t2 for each join predicate t1.x = t2.y
-  std::vector<std::unordered_set<std::size_t>> join_graph(table_info_storage.size());
+  std::set<TableInfo*> remaining_tables;
+  for (auto &table_info : table_info_storage) {
+    remaining_tables.emplace(&table_info);
+  }
+
+  DisjointTreeForest<E::ExprId> join_attribute_forest;
+  for (const auto &attr_id_pair : join_group.join_attribute_pairs) {
+    join_attribute_forest.makeSet(attr_id_pair.first);
+    join_attribute_forest.makeSet(attr_id_pair.second);
+    join_attribute_forest.merge(attr_id_pair.first, attr_id_pair.second);
+  }
+
+  std::map<std::size_t, std::map<std::size_t, E::ExprId>> join_attribute_groups;
   for (const auto &attr_id_pair : join_group.join_attribute_pairs) {
     DCHECK(attribute_id_to_table_info_index_map.find(attr_id_pair.first)
                != attribute_id_to_table_info_index_map.end());
@@ -178,128 +195,169 @@ physical::PhysicalPtr StarSchemaHashJoinOrderOptimization::generatePlan(
         attribute_id_to_table_info_index_map[attr_id_pair.second];
     DCHECK_NE(first_table_idx, second_table_idx);
 
-    table_info_storage[first_table_idx].join_attribute_pairs.emplace(
-        attr_id_pair.first, attr_id_pair.second);
-    table_info_storage[second_table_idx].join_attribute_pairs.emplace(
-        attr_id_pair.second, attr_id_pair.first);
-
-    join_graph[first_table_idx].emplace(second_table_idx);
-    join_graph[second_table_idx].emplace(first_table_idx);
-  }
-
-  std::set<TableInfo*, TableInfoPtrLessComparator> table_info_ordered_by_priority;
-  for (std::size_t i = 0; i < table_info_storage.size(); ++i) {
-    table_info_ordered_by_priority.emplace(&table_info_storage[i]);
+    DCHECK_EQ(join_attribute_forest.find(attr_id_pair.first),
+              join_attribute_forest.find(attr_id_pair.second));
+    const std::size_t attr_group_id = join_attribute_forest.find(attr_id_pair.first);
+    auto &attr_group = join_attribute_groups[attr_group_id];
+    attr_group.emplace(first_table_idx, attr_id_pair.first);
+    attr_group.emplace(second_table_idx, attr_id_pair.second);
   }
 
-  // Contruct hash join tree.
   while (true) {
-    TableInfo *first_table_info = *table_info_ordered_by_priority.begin();
-    table_info_ordered_by_priority.erase(
-        table_info_ordered_by_priority.begin());
-    const std::size_t first_table_info_id = first_table_info->table_info_id;
-
-    TableInfo *second_table_info = nullptr;
-    std::set<TableInfo*, TableInfoPtrLessComparator>::iterator second_table_info_it;
-    for (auto candidate_table_info_it = table_info_ordered_by_priority.begin();
-         candidate_table_info_it != table_info_ordered_by_priority.end();
-         ++candidate_table_info_it) {
-      TableInfo *candidate_table_info = *candidate_table_info_it;
-      const std::size_t candidate_table_info_id = candidate_table_info->table_info_id;
-
-      if (join_graph[first_table_info_id].find(candidate_table_info_id)
-              == join_graph[first_table_info_id].end() &&
-          join_graph[candidate_table_info_id].find(first_table_info_id)
-              == join_graph[candidate_table_info_id].end()) {
-        continue;
-      } else if (second_table_info == nullptr) {
-        second_table_info = candidate_table_info;
-        second_table_info_it = candidate_table_info_it;
-      }
-
-      bool is_likely_many_to_many_join = false;
-      for (const auto join_attr_pair : first_table_info->join_attribute_pairs) {
-        if (candidate_table_info->joined_attribute_set.find(join_attr_pair.second)
-                != candidate_table_info->joined_attribute_set.end()) {
-          is_likely_many_to_many_join = true;
-          break;
-        }
-      }
-      for (const auto join_attr_pair : candidate_table_info->join_attribute_pairs) {
-        if (first_table_info->joined_attribute_set.find(join_attr_pair.second)
-                != first_table_info->joined_attribute_set.end()) {
-          is_likely_many_to_many_join = true;
-          break;
+    // TODO(jianqiao): design better data structure to improve efficiency here.
+    std::unique_ptr<JoinPair> best_join = nullptr;
+    for (TableInfo *probe_table_info : remaining_tables) {
+      for (TableInfo *build_table_info : remaining_tables) {
+        if (probe_table_info != build_table_info) {
+          std::vector<E::AttributeReferencePtr> build_attrs;
+          const std::size_t probe_table_id = probe_table_info->table_info_id;
+          const std::size_t build_table_id = build_table_info->table_info_id;
+          for (const auto &attr_group_pair : join_attribute_groups) {
+            const auto &attr_group = attr_group_pair.second;
+            auto probe_it = attr_group.find(probe_table_id);
+            auto build_it = attr_group.find(build_table_id);
+            if (probe_it != attr_group.end() && build_it != attr_group.end()) {
+              build_attrs.emplace_back(
+                  attribute_id_to_reference_map.at(build_it->second));
+            }
+          }
+          if (!build_attrs.empty()
+              && build_table_info->table->impliesUniqueAttributes(build_attrs)) {
+            std::unique_ptr<JoinPair> new_join(
+                new JoinPair(probe_table_info, build_table_info));
+            if (best_join == nullptr || new_join->isBetterThan(*best_join)) {
+//              if (best_join != nullptr) {
+//                std::cerr << "(" << best_join->probe->estimated_selectivity
+//                          << ", " << best_join->probe->estimated_cardinality << ")"
+//                          << " -- "
+//                          << "(" << best_join->build->estimated_selectivity
+//                          << ", " << best_join->build->estimated_cardinality << ")"
+//                          << "\n";
+//                std::cerr << "REPLACED WITH\n";
+//              }
+//              std::cerr << "(" << new_join->probe->estimated_selectivity
+//                        << ", " << new_join->probe->estimated_cardinality << ")"
+//                        << " -- "
+//                        << "(" << new_join->build->estimated_selectivity
+//                        << ", " << new_join->build->estimated_cardinality << ")"
+//                        << "\n****\n";
+              best_join.reset(new_join.release());
+            }
+          }
         }
       }
-      if (!is_likely_many_to_many_join) {
-        second_table_info = candidate_table_info;
-        second_table_info_it = candidate_table_info_it;
-        break;
-      }
     }
-    DCHECK(second_table_info != nullptr);
-    table_info_ordered_by_priority.erase(second_table_info_it);
 
-    const P::PhysicalPtr &left_child = first_table_info->table;
-    const P::PhysicalPtr &right_child = second_table_info->table;
+    TableInfo *selected_probe_table_info = nullptr;
+    TableInfo *selected_build_table_info = nullptr;
+
+    if (best_join != nullptr) {
+      selected_probe_table_info = best_join->probe;
+      selected_build_table_info = best_join->build;
+    }
+
+    // TODO(jianqiao): Handle the case when there is no primary key-foreign key information available.
+    CHECK(selected_probe_table_info != nullptr);
+    CHECK(selected_build_table_info != nullptr);
+
+//    std::cerr << selected_probe_table_info->estimated_selectivity
+//              << " -- "
+//              << selected_build_table_info->estimated_selectivity
+//              << "\n";
+
+//    std::cerr << selected_probe_table_info->estimated_num_output_attributes
+//              << " -- "
+//              << selected_build_table_info->estimated_num_output_attributes
+//              << "\n";
+
+    remaining_tables.erase(selected_probe_table_info);
+    remaining_tables.erase(selected_build_table_info);
+
+    const P::PhysicalPtr &probe_child = selected_probe_table_info->table;
+    const P::PhysicalPtr &build_child = selected_build_table_info->table;
     std::vector<E::NamedExpressionPtr> output_attributes;
-    for (const E::AttributeReferencePtr &left_attr : left_child->getOutputAttributes()) {
-      output_attributes.emplace_back(left_attr);
+    for (const E::AttributeReferencePtr &probe_attr : probe_child->getOutputAttributes()) {
+      output_attributes.emplace_back(probe_attr);
     }
-    for (const E::AttributeReferencePtr &right_attr : right_child->getOutputAttributes()) {
-      output_attributes.emplace_back(right_attr);
+    for (const E::AttributeReferencePtr &build_attr : build_child->getOutputAttributes()) {
+      output_attributes.emplace_back(build_attr);
     }
 
-    std::vector<E::AttributeReferencePtr> left_join_attributes;
-    std::vector<E::AttributeReferencePtr> right_join_attributes;
-    std::unordered_set<expressions::ExprId> new_joined_attribute_set;
-    for (const auto &join_attr_pair : first_table_info->join_attribute_pairs) {
-      if (second_table_info->join_attribute_pairs.find(join_attr_pair.second)
-              != second_table_info->join_attribute_pairs.end()) {
-        left_join_attributes.emplace_back(
-            attribute_id_to_reference_map[join_attr_pair.first]);
-        right_join_attributes.emplace_back(
-            attribute_id_to_reference_map[join_attr_pair.second]);
-
-        new_joined_attribute_set.emplace(join_attr_pair.first);
-        new_joined_attribute_set.emplace(join_attr_pair.second);
+    std::vector<E::AttributeReferencePtr> probe_attributes;
+    std::vector<E::AttributeReferencePtr> build_attributes;
+    const std::size_t probe_table_id = selected_probe_table_info->table_info_id;
+    const std::size_t build_table_id = selected_build_table_info->table_info_id;
+    for (const auto &attr_group_pair : join_attribute_groups) {
+      const auto &attr_group = attr_group_pair.second;
+      auto probe_it = attr_group.find(probe_table_id);
+      auto build_it = attr_group.find(build_table_id);
+      if (probe_it != attr_group.end() && build_it != attr_group.end()) {
+        probe_attributes.emplace_back(
+            attribute_id_to_reference_map.at(probe_it->second));
+        build_attributes.emplace_back(
+            attribute_id_to_reference_map.at(build_it->second));
       }
     }
-    DCHECK_GE(left_join_attributes.size(), static_cast<std::size_t>(1));
 
-    if (table_info_ordered_by_priority.size() > 0) {
+    if (remaining_tables.size() > 0) {
       P::PhysicalPtr output =
-          P::HashJoin::Create(left_child,
-                              right_child,
-                              left_join_attributes,
-                              right_join_attributes,
+          P::HashJoin::Create(probe_child,
+                              build_child,
+                              probe_attributes,
+                              build_attributes,
                               nullptr,
                               output_attributes,
                               P::HashJoin::JoinType::kInnerJoin);
 
-      second_table_info->table = output;
+//      P::PhysicalPtr output;
+//      if (selected_build_table_info->estimated_num_output_attributes >= 4 &&
+//          selected_probe_table_info->estimated_num_output_attributes < 4) {
+//        output = P::HashJoin::Create(build_child,
+//                                     probe_child,
+//                                     build_attributes,
+//                                     probe_attributes,
+//                                     nullptr,
+//                                     output_attributes,
+//                                     P::HashJoin::JoinType::kInnerJoin);
+//      } else {
+//        output = P::HashJoin::Create(probe_child,
+//                                     build_child,
+//                                     probe_attributes,
+//                                     build_attributes,
+//                                     nullptr,
+//                                     output_attributes,
+//                                     P::HashJoin::JoinType::kInnerJoin);
+//      }
+
+      selected_probe_table_info->table = output;
 
       // TODO(jianqiao): Cache the estimated cardinality for each plan in cost
       // model to avoid duplicated estimation.
-      second_table_info->estimated_cardinality = cost_model_->estimateCardinality(output);
-
-      second_table_info->join_attribute_pairs.insert(first_table_info->join_attribute_pairs.begin(),
-                                                     first_table_info->join_attribute_pairs.end());
-      second_table_info->joined_attribute_set.insert(first_table_info->joined_attribute_set.begin(),
-                                                     first_table_info->joined_attribute_set.end());
-      second_table_info->joined_attribute_set.insert(new_joined_attribute_set.begin(),
-                                                     new_joined_attribute_set.end());
-      table_info_ordered_by_priority.emplace(second_table_info);
-
-      join_graph[second_table_info->table_info_id].insert(join_graph[first_table_info_id].begin(),
-                                                          join_graph[first_table_info_id].end());
-
+      selected_probe_table_info->estimated_cardinality = cost_model_->estimateCardinality(output);
+      selected_probe_table_info->estimated_selectivity = cost_model_->estimateSelectivity(output);
+
+      selected_probe_table_info->estimated_num_output_attributes =
+          CountSharedAttributes(join_group.referenced_attributes,
+                                output->getOutputAttributes());
+      selected_probe_table_info->is_aggregation = false;
+
+      remaining_tables.emplace(selected_probe_table_info);
+
+      // Update join attribute groups.
+      for (auto &attr_group_pair : join_attribute_groups) {
+        auto &attr_group = attr_group_pair.second;
+        auto build_it = attr_group.find(build_table_id);
+        if (build_it != attr_group.end()) {
+          const E::ExprId attr_id = build_it->second;
+          attr_group.erase(build_it);
+          attr_group.emplace(probe_table_id, attr_id);
+        }
+      }
     } else {
-      return P::HashJoin::Create(left_child,
-                                 right_child,
-                                 left_join_attributes,
-                                 right_join_attributes,
+      return P::HashJoin::Create(probe_child,
+                                 build_child,
+                                 probe_attributes,
+                                 build_attributes,
                                  residual_predicate,
                                  project_expressions,
                                  P::HashJoin::JoinType::kInnerJoin);
@@ -307,5 +365,18 @@ physical::PhysicalPtr StarSchemaHashJoinOrderOptimization::generatePlan(
   }
 }
 
+std::size_t StarSchemaHashJoinOrderOptimization::CountSharedAttributes(
+    const std::unordered_set<expressions::ExprId> &attr_set1,
+    const std::vector<expressions::AttributeReferencePtr> &attr_set2) {
+  std::size_t cnt = 0;
+  for (const auto &attr : attr_set2) {
+    if (attr_set1.find(attr->id()) != attr_set1.end()) {
+      ++cnt;
+    }
+  }
+  return cnt;
+}
+
+
 }  // namespace optimizer
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp
index 4d6765c..eb21d03 100644
--- a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp
+++ b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp
@@ -64,6 +64,7 @@ class StarSchemaHashJoinOrderOptimization : public Rule<physical::Physical> {
    * @brief A group of tables to form a hash join tree.
    */
   struct JoinGroupInfo {
+    std::unordered_set<expressions::ExprId> referenced_attributes;
     std::vector<physical::PhysicalPtr> tables;
     std::vector<std::pair<expressions::ExprId, expressions::ExprId>> join_attribute_pairs;
   };
@@ -72,49 +73,84 @@ class StarSchemaHashJoinOrderOptimization : public Rule<physical::Physical> {
    * @brief Auxiliary information of a table for the optimizer.
    */
   struct TableInfo {
-    TableInfo(const std::size_t in_table_info_id,
-              const physical::PhysicalPtr &in_table,
-              const std::size_t in_estimated_cardinality,
-              const double in_estimated_selectivity)
-        : table_info_id(in_table_info_id),
-          table(in_table),
-          estimated_cardinality(in_estimated_cardinality),
-          estimated_selectivity(in_estimated_selectivity) {
+    TableInfo(const std::size_t table_info_id_in,
+              const physical::PhysicalPtr &table_in,
+              const std::size_t estimated_cardinality_in,
+              const double estimated_selectivity_in,
+              const std::size_t estimated_num_output_attributes_in,
+              const bool is_aggregation_in)
+        : table_info_id(table_info_id_in),
+          table(table_in),
+          estimated_cardinality(estimated_cardinality_in),
+          estimated_selectivity(estimated_selectivity_in),
+          estimated_num_output_attributes(estimated_num_output_attributes_in),
+          is_aggregation(is_aggregation_in) {
     }
 
     const std::size_t table_info_id;
     physical::PhysicalPtr table;
     std::size_t estimated_cardinality;
     double estimated_selectivity;
-    std::unordered_multimap<expressions::ExprId, expressions::ExprId> join_attribute_pairs;
-    std::unordered_set<expressions::ExprId> joined_attribute_set;
+    std::size_t estimated_num_output_attributes;
+    bool is_aggregation;
   };
 
-  /**
-   * @brief Comparator that compares the join priorities between two tables.
-   */
-  struct TableInfoPtrLessComparator {
-    inline bool operator() (const TableInfo *lhs, const TableInfo *rhs) {
-      bool swapped = false;
-      if (lhs->estimated_cardinality > rhs->estimated_cardinality) {
-        std::swap(lhs, rhs);
-        swapped = true;
+  struct JoinPair {
+    JoinPair(TableInfo *probe_in, TableInfo *build_in)
+        : probe(probe_in), build(build_in) {
+    }
+
+    inline bool isBetterThan(const JoinPair &rhs) const {
+      const auto &lhs = *this;
+      const bool lhs_has_large_output =
+          lhs.build->estimated_num_output_attributes
+              + lhs.probe->estimated_num_output_attributes > 5;
+      const bool rhs_has_large_output =
+          rhs.build->estimated_num_output_attributes
+              + rhs.probe->estimated_num_output_attributes > 5;
+      if (lhs_has_large_output || rhs_has_large_output) {
+        if (lhs_has_large_output != rhs_has_large_output) {
+          return rhs_has_large_output;
+        }
+        double lhs_selectivity =
+            lhs.build->estimated_selectivity * lhs.probe->estimated_selectivity;
+        double rhs_selectivity =
+            rhs.build->estimated_selectivity * rhs.probe->estimated_selectivity;
+        if (lhs_selectivity != rhs_selectivity) {
+          return lhs_selectivity < rhs_selectivity;
+        }
       }
 
-      if (lhs->estimated_selectivity < rhs->estimated_selectivity) {
-        return !swapped;
-      } else if (lhs->estimated_cardinality < 1000u &&
-                 rhs->estimated_cardinality > 10000u &&
-                 lhs->estimated_selectivity < rhs->estimated_selectivity * 1.5) {
-        return !swapped;
-      } else if (lhs->estimated_selectivity > rhs->estimated_selectivity) {
-        return swapped;
-      } else if (lhs->estimated_cardinality != rhs->estimated_cardinality) {
-        return !swapped;
+      const bool lhs_has_small_build =
+          !lhs_has_large_output && lhs.build->estimated_cardinality < 0x100;
+      const bool rhs_has_small_build =
+          !rhs_has_large_output && rhs.build->estimated_cardinality < 0x100;
+      if (lhs_has_small_build != rhs_has_small_build) {
+        return lhs_has_small_build;
+      }
+
+      if (lhs.probe->is_aggregation != rhs.probe->is_aggregation) {
+        return lhs.probe->is_aggregation;
+      }
+
+      if (lhs.probe->estimated_cardinality != rhs.probe->estimated_cardinality) {
+        return lhs.probe->estimated_cardinality < rhs.probe->estimated_cardinality;
+      }
+      if (lhs.build->estimated_selectivity != rhs.build->estimated_selectivity) {
+        return lhs.build->estimated_selectivity < rhs.build->estimated_selectivity;
+      }
+      if (lhs.build->estimated_cardinality != rhs.build->estimated_cardinality) {
+        return lhs.build->estimated_cardinality < rhs.build->estimated_cardinality;
+      }
+      if (lhs.probe->table != rhs.probe->table) {
+        return lhs.probe->table < rhs.probe->table;
       } else {
-        return swapped ^ (lhs->table < rhs->table);
+        return lhs.build->table < rhs.build->table;
       }
     }
+
+    TableInfo *probe;
+    TableInfo *build;
   };
 
   physical::PhysicalPtr applyInternal(const physical::PhysicalPtr &input,
@@ -125,6 +161,10 @@ class StarSchemaHashJoinOrderOptimization : public Rule<physical::Physical> {
       const expressions::PredicatePtr &residual_predicate,
       const std::vector<expressions::NamedExpressionPtr> &project_expressions);
 
+  static std::size_t CountSharedAttributes(
+      const std::unordered_set<expressions::ExprId> &attr_set1,
+      const std::vector<expressions::AttributeReferencePtr> &attr_set2);
+
   std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_;
 
   DISALLOW_COPY_AND_ASSIGN(StarSchemaHashJoinOrderOptimization);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/query_optimizer/tests/ExecutionHeuristics_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/ExecutionHeuristics_unittest.cpp b/query_optimizer/tests/ExecutionHeuristics_unittest.cpp
index 73b3e84..efea423 100644
--- a/query_optimizer/tests/ExecutionHeuristics_unittest.cpp
+++ b/query_optimizer/tests/ExecutionHeuristics_unittest.cpp
@@ -72,7 +72,8 @@ class ExecutionHeuristicsTest : public ::testing::Test {
                                           probe_relation,
                                           std::move(build_attribute_ids),
                                           std::move(probe_attribute_ids),
-                                          join_hash_table_id);
+                                          join_hash_table_id,
+                                          build_relation->estimateTupleCardinality());
   }
 
   QueryPlan::DAGNodeIndex createDummyBuildHashOperator(QueryPlan *query_plan,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index 779c0fe..9b573ac 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -59,6 +59,11 @@ using std::vector;
 
 namespace quickstep {
 
+DEFINE_int64(bloom_adapter_batch_size, 64,
+             "Number of tuples to probe in bulk in Bloom filter adapter.");
+DEFINE_bool(adapt_bloom_filters, true,
+            "Whether to adaptively adjust the ordering of bloom filters.");
+
 namespace {
 
 // Functor passed to HashTable::getAllFromValueAccessor() to collect matching
@@ -75,6 +80,11 @@ class MapBasedJoinedTupleCollector {
     joined_tuples_[tref.block].emplace_back(tref.tuple, accessor.getCurrentPosition());
   }
 
+  inline void operator()(const tuple_id probe_tid,
+                         const TupleReference &build_tref) {
+    joined_tuples_[build_tref.block].emplace_back(build_tref.tuple, probe_tid);
+  }
+
   // Get a mutable pointer to the collected map of joined tuple ID pairs. The
   // key is inner block_id, values are vectors of joined tuple ID pairs with
   // tuple ID from the inner block on the left and the outer block on the

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/relational_operators/HashJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp
index fa393b6..30571a1 100644
--- a/relational_operators/HashJoinOperator.hpp
+++ b/relational_operators/HashJoinOperator.hpp
@@ -307,8 +307,9 @@ class HashInnerJoinWorkOrder : public WorkOrder {
       const std::vector<std::unique_ptr<const Scalar>> &selection,
       const JoinHashTable &hash_table,
       InsertDestination *output_destination,
-      StorageManager *storage_manager)
-      : WorkOrder(query_id),
+      StorageManager *storage_manager,
+      const int op_index = -1)
+      : WorkOrder(query_id, op_index),
         build_relation_(build_relation),
         probe_relation_(probe_relation),
         join_key_attributes_(join_key_attributes),
@@ -354,8 +355,9 @@ class HashInnerJoinWorkOrder : public WorkOrder {
       const std::vector<std::unique_ptr<const Scalar>> &selection,
       const JoinHashTable &hash_table,
       InsertDestination *output_destination,
-      StorageManager *storage_manager)
-      : WorkOrder(query_id),
+      StorageManager *storage_manager,
+      const int op_index = -1)
+      : WorkOrder(query_id, op_index),
         build_relation_(build_relation),
         probe_relation_(probe_relation),
         join_key_attributes_(std::move(join_key_attributes)),
@@ -435,8 +437,9 @@ class HashSemiJoinWorkOrder : public WorkOrder {
       const std::vector<std::unique_ptr<const Scalar>> &selection,
       const JoinHashTable &hash_table,
       InsertDestination *output_destination,
-      StorageManager *storage_manager)
-      : WorkOrder(query_id),
+      StorageManager *storage_manager,
+      const int op_index = -1)
+      : WorkOrder(query_id, op_index),
         build_relation_(build_relation),
         probe_relation_(probe_relation),
         join_key_attributes_(join_key_attributes),
@@ -482,8 +485,9 @@ class HashSemiJoinWorkOrder : public WorkOrder {
       const std::vector<std::unique_ptr<const Scalar>> &selection,
       const JoinHashTable &hash_table,
       InsertDestination *output_destination,
-      StorageManager *storage_manager)
-      : WorkOrder(query_id),
+      StorageManager *storage_manager,
+      const int op_index = -1)
+      : WorkOrder(query_id, op_index),
         build_relation_(build_relation),
         probe_relation_(probe_relation),
         join_key_attributes_(std::move(join_key_attributes)),
@@ -559,8 +563,9 @@ class HashAntiJoinWorkOrder : public WorkOrder {
       const std::vector<std::unique_ptr<const Scalar>> &selection,
       const JoinHashTable &hash_table,
       InsertDestination *output_destination,
-      StorageManager *storage_manager)
-      : WorkOrder(query_id),
+      StorageManager *storage_manager,
+      const int op_index = -1)
+      : WorkOrder(query_id, op_index),
         build_relation_(build_relation),
         probe_relation_(probe_relation),
         join_key_attributes_(join_key_attributes),

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/relational_operators/WorkOrder.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.hpp b/relational_operators/WorkOrder.hpp
index c1b9b68..2eeca06 100644
--- a/relational_operators/WorkOrder.hpp
+++ b/relational_operators/WorkOrder.hpp
@@ -300,16 +300,23 @@ class WorkOrder {
     return query_id_;
   }
 
+  inline const int getOperatorIndex() const {
+    return op_index_;
+  }
+
  protected:
   /**
    * @brief Constructor.
    *
    * @param query_id The ID of the query to which this WorkOrder belongs.
    **/
-  explicit WorkOrder(const std::size_t query_id)
-      : query_id_(query_id) {}
+  explicit WorkOrder(const std::size_t query_id,
+                     const int op_index = -1)
+      : query_id_(query_id),
+        op_index_(op_index) {}
 
   const std::size_t query_id_;
+  const int op_index_;
   // A vector of preferred NUMA node IDs where this workorder should be executed.
   // These node IDs typically indicate the NUMA node IDs of the input(s) of the
   // workorder. Derived classes should ensure that there are no duplicate entries

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 3f6e23a..d225258 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -46,10 +46,13 @@
 #include "storage/StorageBlock.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "storage/StorageManager.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
 #include "types/TypedValue.hpp"
 #include "types/containers/ColumnVector.hpp"
 #include "types/containers/ColumnVectorsValueAccessor.hpp"
 #include "types/containers/Tuple.hpp"
+#include "utility/BloomFilterAdapter.hpp"
 
 #include "glog/logging.h"
 
@@ -57,6 +60,8 @@ using std::unique_ptr;
 
 namespace quickstep {
 
+DECLARE_int64(bloom_adapter_batch_size);
+
 AggregationOperationState::AggregationOperationState(
     const CatalogRelationSchema &input_relation,
     const std::vector<const AggregateFunction*> &aggregate_functions,
@@ -64,12 +69,16 @@ AggregationOperationState::AggregationOperationState(
     std::vector<bool> &&is_distinct,
     std::vector<std::unique_ptr<const Scalar>> &&group_by,
     const Predicate *predicate,
+    std::vector<const BloomFilter *> &&bloom_filters,
+    std::vector<attribute_id> &&bloom_filter_attribute_ids,
     const std::size_t estimated_num_entries,
     const HashTableImplType hash_table_impl_type,
     const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
     StorageManager *storage_manager)
     : input_relation_(input_relation),
       predicate_(predicate),
+      bloom_filters_(std::move(bloom_filters)),
+      bloom_filter_attribute_ids_(std::move(bloom_filter_attribute_ids)),
       group_by_list_(std::move(group_by)),
       arguments_(std::move(arguments)),
       is_distinct_(std::move(is_distinct)),
@@ -183,7 +192,8 @@ AggregationOperationState::AggregationOperationState(
 AggregationOperationState* AggregationOperationState::ReconstructFromProto(
     const serialization::AggregationOperationState &proto,
     const CatalogDatabaseLite &database,
-    StorageManager *storage_manager) {
+    StorageManager *storage_manager,
+    const std::vector<std::unique_ptr<BloomFilter>> &bloom_filters) {
   DCHECK(ProtoIsValid(proto, database));
 
   // Rebuild contructor arguments from their representation in 'proto'.
@@ -232,12 +242,24 @@ AggregationOperationState* AggregationOperationState::ReconstructFromProto(
                                                database));
   }
 
+  std::vector<const BloomFilter*> bloom_filter_vector;
+  std::vector<attribute_id> bloom_filter_attribute_ids;
+  for (int i = 0; i < proto.bloom_filters_size(); ++i) {
+    // Add the pointer to the probe bloom filter within the list of probe bloom filters to use.
+    const auto bloom_filter_proto = proto.bloom_filters(i);
+    bloom_filter_vector.emplace_back(
+        bloom_filters[bloom_filter_proto.bloom_filter_id()].get());
+    bloom_filter_attribute_ids.emplace_back(bloom_filter_proto.attr_id());
+  }
+
   return new AggregationOperationState(database.getRelationSchemaById(proto.relation_id()),
                                        aggregate_functions,
                                        std::move(arguments),
                                        std::move(is_distinct),
                                        std::move(group_by_expressions),
                                        predicate.release(),
+                                       std::move(bloom_filter_vector),
+                                       std::move(bloom_filter_attribute_ids),
                                        proto.estimated_num_entries(),
                                        HashTableImplTypeFromProto(proto.hash_table_impl_type()),
                                        distinctify_hash_table_impl_types,
@@ -340,6 +362,10 @@ void AggregationOperationState::aggregateBlockSingleState(const block_id input_b
   // tuples so that it can be reused across multiple aggregates (i.e. we only
   // pay the cost of evaluating the predicate once).
   std::unique_ptr<TupleIdSequence> reuse_matches;
+  if (predicate_) {
+    reuse_matches.reset(block->getMatchesForPredicate(predicate_.get()));
+  }
+
   for (std::size_t agg_idx = 0;
        agg_idx < handles_.size();
        ++agg_idx) {
@@ -358,7 +384,6 @@ void AggregationOperationState::aggregateBlockSingleState(const block_id input_b
                                arguments_[agg_idx],
                                local_arguments_as_attributes,
                                {}, /* group_by */
-                               predicate_.get(),
                                distinctify_hashtables_[agg_idx].get(),
                                &reuse_matches,
                                nullptr /* reuse_group_by_vectors */);
@@ -369,7 +394,6 @@ void AggregationOperationState::aggregateBlockSingleState(const block_id input_b
           block->aggregate(*handles_[agg_idx],
                            arguments_[agg_idx],
                            local_arguments_as_attributes,
-                           predicate_.get(),
                            &reuse_matches));
     }
   }
@@ -391,6 +415,72 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
   // GROUP BY expressions once).
   std::vector<std::unique_ptr<ColumnVector>> reuse_group_by_vectors;
 
+  if (predicate_) {
+    reuse_matches.reset(block->getMatchesForPredicate(predicate_.get()));
+  }
+
+  if (bloom_filters_.size() > 0) {
+    const std::size_t num_tuples = block->getNumTuples();
+//    std::cerr << "Before: " << num_tuples << " -- "
+//              << (reuse_matches ? reuse_matches->numTuples() : num_tuples)
+//              << "\n";
+    std::unique_ptr<ValueAccessor> accessor;
+    if (reuse_matches) {
+      accessor.reset(
+          block->getTupleStorageSubBlock().createValueAccessor(reuse_matches.get()));
+    } else {
+      accessor.reset(
+          block->getTupleStorageSubBlock().createValueAccessor());
+    }
+    InvokeOnAnyValueAccessor(
+        accessor.get(),
+        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+      std::unique_ptr<TupleIdSequence> filtered(new TupleIdSequence(num_tuples));
+
+      std::vector<std::size_t> attr_size_vector;
+      attr_size_vector.reserve(bloom_filter_attribute_ids_.size());
+      for (const auto &attr : bloom_filter_attribute_ids_) {
+        auto val_and_size =
+            accessor->template getUntypedValueAndByteLengthAtAbsolutePosition<false>(0, attr);
+        attr_size_vector.emplace_back(val_and_size.second);
+      }
+
+      std::unique_ptr<BloomFilterAdapter> bloom_filter_adapter;
+      bloom_filter_adapter.reset(new BloomFilterAdapter(
+          bloom_filters_, bloom_filter_attribute_ids_, attr_size_vector));
+
+      std::uint32_t batch_size_try = FLAGS_bloom_adapter_batch_size;
+      std::uint32_t num_tuples_left = accessor->getNumTuples();
+      std::vector<tuple_id> batch(num_tuples_left);
+
+      do {
+        std::uint32_t batch_size =
+            batch_size_try < num_tuples_left ? batch_size_try : num_tuples_left;
+        for (std::size_t i = 0; i < batch_size; ++i) {
+          accessor->next();
+          batch.push_back(accessor->getCurrentPosition());
+        }
+
+        std::size_t num_hits =
+            bloom_filter_adapter->bulkProbe<true>(accessor, batch, batch_size);
+        for (std::size_t t = 0; t < num_hits; ++t){
+          filtered->set(batch[t], true);
+        }
+
+        batch.clear();
+        num_tuples_left -= batch_size;
+        batch_size_try = batch_size * 2;
+      } while (num_tuples_left > 0);
+
+      if (reuse_matches) {
+        reuse_matches->intersectWith(*filtered);
+      } else {
+        reuse_matches.reset(filtered.release());
+      }
+    });
+//    std::cerr << "After: " << reuse_matches->numTuples() << "\n";
+  }
+
   for (std::size_t agg_idx = 0;
        agg_idx < handles_.size();
        ++agg_idx) {
@@ -402,7 +492,6 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
                                arguments_[agg_idx],
                                nullptr, /* arguments_as_attributes */
                                group_by_list_,
-                               predicate_.get(),
                                distinctify_hashtables_[agg_idx].get(),
                                &reuse_matches,
                                &reuse_group_by_vectors);
@@ -416,7 +505,6 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
       block->aggregateGroupBy(*handles_[agg_idx],
                               arguments_[agg_idx],
                               group_by_list_,
-                              predicate_.get(),
                               agg_hash_table,
                               &reuse_matches,
                               &reuse_group_by_vectors);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index ecd116b..30b8f76 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -33,6 +33,7 @@
 #include "storage/HashTableBase.hpp"
 #include "storage/HashTablePool.hpp"
 #include "storage/StorageBlockInfo.hpp"
+#include "utility/BloomFilter.hpp"
 #include "utility/Macros.hpp"
 
 namespace quickstep {
@@ -108,6 +109,8 @@ class AggregationOperationState {
                             std::vector<bool> &&is_distinct,
                             std::vector<std::unique_ptr<const Scalar>> &&group_by,
                             const Predicate *predicate,
+                            std::vector<const BloomFilter *> &&bloom_filters,
+                            std::vector<attribute_id> &&bloom_filter_attribute_ids,
                             const std::size_t estimated_num_entries,
                             const HashTableImplType hash_table_impl_type,
                             const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
@@ -131,7 +134,8 @@ class AggregationOperationState {
   static AggregationOperationState* ReconstructFromProto(
       const serialization::AggregationOperationState &proto,
       const CatalogDatabaseLite &database,
-      StorageManager *storage_manager);
+      StorageManager *storage_manager,
+      const std::vector<std::unique_ptr<BloomFilter>> &bloom_filters);
 
   /**
    * @brief Check whether a serialization::AggregationOperationState is
@@ -181,6 +185,10 @@ class AggregationOperationState {
   // filter predicate (if any), and the list of GROUP BY expressions (if any).
   const CatalogRelationSchema &input_relation_;
   std::unique_ptr<const Predicate> predicate_;
+
+  std::vector<const BloomFilter*> bloom_filters_;
+  std::vector<attribute_id> bloom_filter_attribute_ids_;
+
   std::vector<std::unique_ptr<const Scalar>> group_by_list_;
 
   // Each individual aggregate in this operation has an AggregationHandle and

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/storage/AggregationOperationState.proto
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.proto b/storage/AggregationOperationState.proto
index 7521d73..04b661c 100644
--- a/storage/AggregationOperationState.proto
+++ b/storage/AggregationOperationState.proto
@@ -42,4 +42,10 @@ message AggregationOperationState {
 
   // Each DISTINCT aggregation has its distinctify hash table impl type.
   repeated HashTableImplType distinctify_hash_table_impl_types = 7;
+
+  message BloomFilter {
+    required uint32 bloom_filter_id = 1;
+    required uint32 attr_id = 2;
+  }
+  repeated BloomFilter bloom_filters = 8;
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/storage/BasicColumnStoreValueAccessor.hpp
----------------------------------------------------------------------
diff --git a/storage/BasicColumnStoreValueAccessor.hpp b/storage/BasicColumnStoreValueAccessor.hpp
index 0560d99..f4c6cd9 100644
--- a/storage/BasicColumnStoreValueAccessor.hpp
+++ b/storage/BasicColumnStoreValueAccessor.hpp
@@ -20,6 +20,8 @@
 #ifndef QUICKSTEP_STORAGE_BASIC_COLUMN_STORE_VALUE_ACCESSOR_HPP_
 #define QUICKSTEP_STORAGE_BASIC_COLUMN_STORE_VALUE_ACCESSOR_HPP_
 
+#include <cstddef>
+#include <utility>
 #include <vector>
 
 #include "catalog/CatalogRelationSchema.hpp"
@@ -45,7 +47,8 @@ class BasicColumnStoreValueAccessorHelper {
       : relation_(relation),
         num_tuples_(num_tuples),
         column_stripes_(column_stripes),
-        column_null_bitmaps_(column_null_bitmaps) {
+        column_null_bitmaps_(column_null_bitmaps),
+        attr_max_lengths_(relation.getMaximumAttributeByteLengths()) {
   }
 
   inline tuple_id numPackedTuples() const {
@@ -63,9 +66,23 @@ class BasicColumnStoreValueAccessorHelper {
       return nullptr;
     }
 
-    // TODO(chasseur): Consider cacheing the byte lengths of attributes.
-    return static_cast<const char*>(column_stripes_[attr])
-           + (tuple * relation_.getAttributeById(attr)->getType().maximumByteLength());
+    return static_cast<const char*>(column_stripes_[attr]) + (tuple * attr_max_lengths_[attr]);
+  }
+
+  template <bool check_null>
+  inline std::pair<const void*, std::size_t> getAttributeValueAndByteLength(const tuple_id tuple,
+                                                                            const attribute_id attr) const {
+    DEBUG_ASSERT(tuple < num_tuples_);
+    DEBUG_ASSERT(relation_.hasAttributeWithId(attr));
+    if (check_null
+        && (!column_null_bitmaps_.elementIsNull(attr))
+        && column_null_bitmaps_[attr].getBit(tuple)) {
+      return std::make_pair(nullptr, 0);
+    }
+
+    const std::size_t attr_length = attr_max_lengths_[attr];
+    return std::make_pair(static_cast<const char*>(column_stripes_[attr]) + (tuple * attr_length),
+                          attr_length);
   }
 
   inline TypedValue getAttributeValueTyped(const tuple_id tuple,
@@ -82,6 +99,7 @@ class BasicColumnStoreValueAccessorHelper {
   const tuple_id num_tuples_;
   const std::vector<void*> &column_stripes_;
   const PtrVector<BitVector<false>, true> &column_null_bitmaps_;
+  const std::vector<std::size_t> &attr_max_lengths_;
 
   DISALLOW_COPY_AND_ASSIGN(BasicColumnStoreValueAccessorHelper);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/storage/BloomFilterIndexSubBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/BloomFilterIndexSubBlock.cpp b/storage/BloomFilterIndexSubBlock.cpp
index 4351c05..6ba466e 100644
--- a/storage/BloomFilterIndexSubBlock.cpp
+++ b/storage/BloomFilterIndexSubBlock.cpp
@@ -57,7 +57,6 @@ BloomFilterIndexSubBlock::BloomFilterIndexSubBlock(const TupleStorageSubBlock &t
                     sub_block_memory_size),
       is_initialized_(false),
       is_consistent_(false),
-      random_seed_(kBloomFilterSeed),
       bit_array_size_in_bytes_(description.GetExtension(
                                    BloomFilterIndexSubBlockDescription::bloom_filter_size)) {
   CHECK(DescriptionIsValid(relation_, description_))
@@ -76,8 +75,7 @@ BloomFilterIndexSubBlock::BloomFilterIndexSubBlock(const TupleStorageSubBlock &t
   const std::uint32_t salt_count = description.GetExtension(BloomFilterIndexSubBlockDescription::number_of_hashes);
 
   // Initialize the bloom_filter_ data structure to operate on bit_array.
-  bloom_filter_.reset(new BloomFilter(random_seed_,
-                                      salt_count,
+  bloom_filter_.reset(new BloomFilter(salt_count,
                                       bit_array_size_in_bytes_,
                                       bit_array_.get(),
                                       is_bloom_filter_initialized));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/storage/BloomFilterIndexSubBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/BloomFilterIndexSubBlock.hpp b/storage/BloomFilterIndexSubBlock.hpp
index 09d6225..4ffbc2c 100644
--- a/storage/BloomFilterIndexSubBlock.hpp
+++ b/storage/BloomFilterIndexSubBlock.hpp
@@ -67,11 +67,6 @@ class BloomFilterIndexSubBlock : public IndexSubBlock {
     kSelectivityNone
   };
 
-  /**
-   * @brief A random seed to initialize the bloom filter hash functions.
-   **/
-  static const std::uint64_t kBloomFilterSeed = 0xA5A5A5A55A5A5A5AULL;
-
   BloomFilterIndexSubBlock(const TupleStorageSubBlock &tuple_store,
                            const IndexSubBlockDescription &description,
                            const bool new_block,
@@ -181,7 +176,6 @@ class BloomFilterIndexSubBlock : public IndexSubBlock {
  private:
   bool is_initialized_;
   bool is_consistent_;
-  const std::uint64_t random_seed_;
   const std::uint64_t bit_array_size_in_bytes_;
   std::vector<attribute_id> indexed_attribute_ids_;
   std::unique_ptr<unsigned char> bit_array_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 65a7975..8fbe7de 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -679,6 +679,8 @@ target_link_libraries(quickstep_storage_HashTable
                       quickstep_types_Type
                       quickstep_types_TypedValue
                       quickstep_utility_BloomFilter
+                      quickstep_utility_BloomFilterAdapter
+                      quickstep_utility_EventProfiler
                       quickstep_utility_HashPair
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_HashTableBase

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/storage/CompressedColumnStoreValueAccessor.hpp
----------------------------------------------------------------------
diff --git a/storage/CompressedColumnStoreValueAccessor.hpp b/storage/CompressedColumnStoreValueAccessor.hpp
index 25e5eed..31ddef5 100644
--- a/storage/CompressedColumnStoreValueAccessor.hpp
+++ b/storage/CompressedColumnStoreValueAccessor.hpp
@@ -54,6 +54,7 @@ class CompressedColumnStoreValueAccessorHelper {
       const PtrVector<BitVector<false>, true> &uncompressed_column_null_bitmaps)
       : relation_(relation),
         num_tuples_(num_tuples),
+        attr_max_lengths_(relation.getMaximumAttributeByteLengths()),
         compression_info_(compression_info),
         dictionary_coded_attributes_(dictionary_coded_attributes),
         truncated_attributes_(truncated_attributes),
@@ -86,6 +87,26 @@ class CompressedColumnStoreValueAccessorHelper {
     }
   }
 
+  template <bool check_null>
+  inline std::pair<const void*, std::size_t> getAttributeValueAndByteLength(const tuple_id tuple,
+                                                                            const attribute_id attr) const {
+    if (dictionary_coded_attributes_[attr]) {
+      return dictionaries_.atUnchecked(attr).getUntypedValueAndByteLengthForCode<check_null>(
+          getCode(tuple, attr));
+    } else if (truncated_attributes_[attr]) {
+      if (truncated_attribute_is_int_[attr]) {
+        int_buffer_ = getCode(tuple, attr);
+        return std::make_pair(&int_buffer_, sizeof(int_buffer_));
+      } else {
+        long_buffer_ = getCode(tuple, attr);
+        return std::make_pair(&long_buffer_, sizeof(long_buffer_));
+      }
+    } else {
+      return std::make_pair(getAttributePtr<check_null>(tuple, attr),
+                            attr_max_lengths_[attr]);
+    }
+  }
+
   inline TypedValue getAttributeValueTyped(const tuple_id tuple,
                                            const attribute_id attr) const {
     if (dictionary_coded_attributes_[attr]) {
@@ -140,6 +161,7 @@ class CompressedColumnStoreValueAccessorHelper {
   const CatalogRelationSchema &relation_;
 
   const tuple_id num_tuples_;
+  const std::vector<std::size_t> &attr_max_lengths_;
 
   const CompressedBlockInfo &compression_info_;
   const std::vector<bool> &dictionary_coded_attributes_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/storage/CompressedPackedRowStoreValueAccessor.hpp
----------------------------------------------------------------------
diff --git a/storage/CompressedPackedRowStoreValueAccessor.hpp b/storage/CompressedPackedRowStoreValueAccessor.hpp
index 8858175..66b2f5f 100644
--- a/storage/CompressedPackedRowStoreValueAccessor.hpp
+++ b/storage/CompressedPackedRowStoreValueAccessor.hpp
@@ -60,6 +60,7 @@ class CompressedPackedRowStoreValueAccessorHelper {
         num_tuples_(num_tuples),
         tuple_length_bytes_(tuple_length_bytes),
         attribute_offsets_(attribute_offsets),
+        attr_max_lengths_(relation.getMaximumAttributeByteLengths()),
         compression_info_(compression_info),
         dictionary_coded_attributes_(dictionary_coded_attributes),
         truncated_attributes_(truncated_attributes),
@@ -94,6 +95,26 @@ class CompressedPackedRowStoreValueAccessorHelper {
     }
   }
 
+  template <bool check_null>
+  inline std::pair<const void*, std::size_t> getAttributeValueAndByteLength(const tuple_id tuple,
+                                                                            const attribute_id attr) const {
+    if (dictionary_coded_attributes_[attr]) {
+      return dictionaries_.atUnchecked(attr).getUntypedValueAndByteLengthForCode<check_null>(
+          getCode(tuple, attr));
+    } else if (truncated_attributes_[attr]) {
+      if (truncated_attribute_is_int_[attr]) {
+        int_buffer_ = getCode(tuple, attr);
+        return std::make_pair(&int_buffer_, sizeof(int_buffer_));
+      } else {
+        long_buffer_ = getCode(tuple, attr);
+        return std::make_pair(&long_buffer_, sizeof(long_buffer_));
+      }
+    } else {
+      return std::make_pair(getAttributePtr<check_null>(tuple, attr),
+                            attr_max_lengths_[attr]);
+    }
+  }
+
   inline TypedValue getAttributeValueTyped(const tuple_id tuple,
                                            const attribute_id attr) const {
     if (dictionary_coded_attributes_[attr]) {
@@ -152,6 +173,7 @@ class CompressedPackedRowStoreValueAccessorHelper {
   const tuple_id num_tuples_;
   const std::size_t tuple_length_bytes_;
   const std::vector<std::size_t> &attribute_offsets_;
+  const std::vector<std::size_t> &attr_max_lengths_;
 
   const CompressedBlockInfo &compression_info_;
   const std::vector<bool> &dictionary_coded_attributes_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/storage/HashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTable.hpp b/storage/HashTable.hpp
index 9fa41a2..3538181 100644
--- a/storage/HashTable.hpp
+++ b/storage/HashTable.hpp
@@ -23,6 +23,7 @@
 #include <atomic>
 #include <cstddef>
 #include <cstdlib>
+#include <memory>
 #include <type_traits>
 #include <vector>
 
@@ -39,11 +40,14 @@
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
 #include "utility/BloomFilter.hpp"
+#include "utility/BloomFilterAdapter.hpp"
 #include "utility/HashPair.hpp"
 #include "utility/Macros.hpp"
 
 namespace quickstep {
 
+DECLARE_int64(bloom_adapter_batch_size);
+
 /** \addtogroup Storage
  *  @{
  */
@@ -1016,8 +1020,12 @@ class HashTable : public HashTableBase<resizable,
    *
    * @param bloom_filter The pointer to the bloom filter.
    **/
-  inline void setBuildSideBloomFilter(BloomFilter *bloom_filter) {
-    build_bloom_filter_ = bloom_filter;
+  inline void addBuildSideBloomFilter(BloomFilter *bloom_filter) {
+    build_bloom_filters_.emplace_back(bloom_filter);
+  }
+
+  inline void addBuildSideAttributeId(const attribute_id build_attribute_id) {
+    build_attribute_ids_.push_back(build_attribute_id);
   }
 
   /**
@@ -1042,8 +1050,8 @@ class HashTable : public HashTableBase<resizable,
    * @param probe_attribute_ids The vector of attribute ids to use for probing
    *        the bloom filter.
    **/
-  inline void addProbeSideAttributeIds(std::vector<attribute_id> &&probe_attribute_ids) {
-    probe_attribute_ids_.push_back(probe_attribute_ids);
+  inline void addProbeSideAttributeId(const attribute_id probe_attribute_id) {
+    probe_attribute_ids_.push_back(probe_attribute_id);
   }
 
  protected:
@@ -1329,9 +1337,10 @@ class HashTable : public HashTableBase<resizable,
   // Data structures used for bloom filter optimized semi-joins.
   bool has_build_side_bloom_filter_ = false;
   bool has_probe_side_bloom_filter_ = false;
-  BloomFilter *build_bloom_filter_;
+  std::vector<BloomFilter *> build_bloom_filters_;
+  std::vector<attribute_id> build_attribute_ids_;
   std::vector<const BloomFilter*> probe_bloom_filters_;
-  std::vector<std::vector<attribute_id>> probe_attribute_ids_;
+  std::vector<attribute_id> probe_attribute_ids_;
 
   DISALLOW_COPY_AND_ASSIGN(HashTable);
 };
@@ -1477,12 +1486,26 @@ HashTablePutResult HashTable<ValueT, resizable, serializable, force_key_copy, al
                                                         &prealloc_state);
       }
     }
-    std::unique_ptr<BloomFilter> thread_local_bloom_filter;
+
     if (has_build_side_bloom_filter_) {
-      thread_local_bloom_filter.reset(new BloomFilter(build_bloom_filter_->getRandomSeed(),
-                                                      build_bloom_filter_->getNumberOfHashes(),
-                                                      build_bloom_filter_->getBitArraySize()));
+      for (std::size_t i = 0; i < build_bloom_filters_.size(); ++i) {
+        auto *build_bloom_filter = build_bloom_filters_[i];
+        std::unique_ptr<BloomFilter> thread_local_bloom_filter(
+            new BloomFilter(build_bloom_filter->getNumberOfHashes(),
+                            build_bloom_filter->getBitArraySize()));
+        const auto &build_attr = build_attribute_ids_[i];
+        const std::size_t attr_size =
+            accessor->template getUntypedValueAndByteLengthAtAbsolutePosition<false>(0, build_attr).second;
+        while (accessor->next()) {
+          thread_local_bloom_filter->insertUnSafe(
+              static_cast<const std::uint8_t *>(accessor->getUntypedValue(build_attr)),
+              attr_size);
+        }
+        build_bloom_filter->bitwiseOr(thread_local_bloom_filter.get());
+        accessor->beginIteration();
+      }
     }
+
     if (resizable) {
       while (result == HashTablePutResult::kOutOfSpace) {
         {
@@ -1498,11 +1521,6 @@ HashTablePutResult HashTable<ValueT, resizable, serializable, force_key_copy, al
                                        variable_size,
                                        (*functor)(*accessor),
                                        using_prealloc ? &prealloc_state : nullptr);
-            // Insert into bloom filter, if enabled.
-            if (has_build_side_bloom_filter_) {
-              thread_local_bloom_filter->insertUnSafe(static_cast<const std::uint8_t *>(key.getDataPtr()),
-                                                      key.getDataSize());
-            }
             if (result == HashTablePutResult::kDuplicateKey) {
               DEBUG_ASSERT(!using_prealloc);
               return result;
@@ -1528,20 +1546,11 @@ HashTablePutResult HashTable<ValueT, resizable, serializable, force_key_copy, al
                                    variable_size,
                                    (*functor)(*accessor),
                                    using_prealloc ? &prealloc_state : nullptr);
-        // Insert into bloom filter, if enabled.
-        if (has_build_side_bloom_filter_) {
-          thread_local_bloom_filter->insertUnSafe(static_cast<const std::uint8_t *>(key.getDataPtr()),
-                                                  key.getDataSize());
-        }
         if (result != HashTablePutResult::kOK) {
           return result;
         }
       }
     }
-    // Update the build side bloom filter with thread local copy, if available.
-    if (has_build_side_bloom_filter_) {
-      build_bloom_filter_->bitwiseOr(thread_local_bloom_filter.get());
-    }
 
     return HashTablePutResult::kOK;
   });
@@ -1607,6 +1616,26 @@ HashTablePutResult HashTable<ValueT, resizable, serializable, force_key_copy, al
                                                         &prealloc_state);
       }
     }
+
+    if (has_build_side_bloom_filter_) {
+      for (std::size_t i = 0; i < build_bloom_filters_.size(); ++i) {
+        auto *build_bloom_filter = build_bloom_filters_[i];
+        std::unique_ptr<BloomFilter> thread_local_bloom_filter(
+            new BloomFilter(build_bloom_filter->getNumberOfHashes(),
+                            build_bloom_filter->getBitArraySize()));
+        const auto &build_attr = build_attribute_ids_[i];
+        const std::size_t attr_size =
+            accessor->template getUntypedValueAndByteLengthAtAbsolutePosition<false>(0, build_attr).second;
+        while (accessor->next()) {
+          thread_local_bloom_filter->insertUnSafe(
+              static_cast<const std::uint8_t *>(accessor->getUntypedValue(build_attr)),
+              attr_size);
+        }
+        build_bloom_filter->bitwiseOr(thread_local_bloom_filter.get());
+        accessor->beginIteration();
+      }
+    }
+
     if (resizable) {
       while (result == HashTablePutResult::kOutOfSpace) {
         {
@@ -2229,6 +2258,7 @@ inline std::size_t HashTable<ValueT, resizable, serializable, force_key_copy, al
   }
 }
 
+
 template <typename ValueT,
           bool resizable,
           bool serializable,
@@ -2246,42 +2276,85 @@ void HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_
   InvokeOnAnyValueAccessor(
       accessor,
       [&](auto *accessor) -> void {  // NOLINT(build/c++11)
-    while (accessor->next()) {
-      // Probe any bloom filters, if enabled.
-      if (has_probe_side_bloom_filter_) {
-        DCHECK_EQ(probe_bloom_filters_.size(), probe_attribute_ids_.size());
-        // Check if the key is contained in the BloomFilters or not.
-        bool bloom_miss = false;
-        for (std::size_t i = 0; i < probe_bloom_filters_.size() && !bloom_miss; ++i) {
-          const BloomFilter *bloom_filter = probe_bloom_filters_[i];
-          for (const attribute_id &attr_id : probe_attribute_ids_[i]) {
-            TypedValue bloom_key = accessor->getTypedValue(attr_id);
-            if (!bloom_filter->contains(static_cast<const std::uint8_t*>(bloom_key.getDataPtr()),
-                                        bloom_key.getDataSize())) {
-              bloom_miss = true;
+    std::unique_ptr<BloomFilterAdapter> bloom_filter_adapter;
+    if (has_probe_side_bloom_filter_) {
+      // Find (and cache) the size of each attribute in the probe lists.
+      // NOTE(nav): This code uses the accessor to get the size,
+      // and hence only works if there's at least one tuple.
+      std::vector<std::size_t> attr_size_vector;
+      attr_size_vector.reserve(probe_attribute_ids_.size());
+      for (const auto &probe_attr : probe_attribute_ids_) {
+        auto val_and_size =
+            accessor->template getUntypedValueAndByteLengthAtAbsolutePosition<false>(0, probe_attr);
+//        std::cerr << "BF attr size = " << val_and_size.second << "\n";
+        attr_size_vector.emplace_back(val_and_size.second);
+      }
+
+      bloom_filter_adapter.reset(new BloomFilterAdapter(
+              probe_bloom_filters_, probe_attribute_ids_, attr_size_vector));
+
+      // We want to have large batch sizes for cache efficiency while probeing,
+      // but small batch sizes to ensure that the adaptation logic kicks in
+      // (and does early). We use exponentially increasing batch sizes to
+      // achieve a balance between the two.
+      //
+      // We also keep track of num_tuples_left in the block, to ensure that
+      // we don't reserve an unnecessarily large vector.
+      std::uint32_t batch_size_try = FLAGS_bloom_adapter_batch_size;
+      std::uint32_t num_tuples_left = accessor->getNumTuples();
+      std::vector<tuple_id> batch(num_tuples_left);
+
+      do {
+        std::uint32_t batch_size =
+            batch_size_try < num_tuples_left ? batch_size_try : num_tuples_left;
+        for (std::size_t i = 0; i < batch_size; ++i) {
+          accessor->next();
+          batch.push_back(accessor->getCurrentPosition());
+        }
+
+        std::size_t num_hits =
+            bloom_filter_adapter->bulkProbe<true>(accessor, batch, batch_size);
+
+        for (std::size_t t = 0; t < num_hits; ++t){
+          tuple_id probe_tid = batch[t];
+          TypedValue key = accessor->getTypedValueAtAbsolutePosition(key_attr_id, probe_tid);
+          if (check_for_null_keys && key.isNull()) {
+            continue;
+          }
+          const std::size_t true_hash = use_scalar_literal_hash_template ? key.getHashScalarLiteral()
+                                                                        : key.getHash();
+          const std::size_t adjusted_hash = adjust_hashes_template ? this->AdjustHash(true_hash)
+                                                                  : true_hash;
+          std::size_t entry_num = 0;
+          const ValueT *value;
+          while (this->getNextEntryForKey(key, adjusted_hash, &value, &entry_num)) {
+            (*functor)(probe_tid, *value);
+            if (!allow_duplicate_keys)
               break;
-            }
           }
         }
-        if (bloom_miss) {
-          continue;  // On a bloom filter miss, probing the hash table can be skipped.
-        }
-      }
+        batch.clear();
+        num_tuples_left -= batch_size;
+        batch_size_try = batch_size * 2;
+      } while (!accessor->iterationFinished());
+    }
 
-      TypedValue key = accessor->getTypedValue(key_attr_id);
-      if (check_for_null_keys && key.isNull()) {
-        continue;
-      }
-      const std::size_t true_hash = use_scalar_literal_hash_template ? key.getHashScalarLiteral()
-                                                                     : key.getHash();
-      const std::size_t adjusted_hash = adjust_hashes_template ? this->AdjustHash(true_hash)
-                                                               : true_hash;
-      std::size_t entry_num = 0;
-      const ValueT *value;
-      while (this->getNextEntryForKey(key, adjusted_hash, &value, &entry_num)) {
-        (*functor)(*accessor, *value);
-        if (!allow_duplicate_keys) {
-          break;
+    else { // no Bloom filters to probe
+      while(accessor->next()) {
+        TypedValue key = accessor->getTypedValue(key_attr_id);
+        if (check_for_null_keys && key.isNull()) {
+          continue;
+        }
+        const std::size_t true_hash = use_scalar_literal_hash_template ? key.getHashScalarLiteral()
+                                      : key.getHash();
+        const std::size_t adjusted_hash = adjust_hashes_template ? this->AdjustHash(true_hash)
+                                          : true_hash;
+        std::size_t entry_num = 0;
+        const ValueT *value;
+        while (this->getNextEntryForKey(key, adjusted_hash, &value, &entry_num)) {
+          (*functor)(*accessor, *value);
+          if (!allow_duplicate_keys)
+            break;
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/storage/HashTable.proto
----------------------------------------------------------------------
diff --git a/storage/HashTable.proto b/storage/HashTable.proto
index ade30d8..209144d 100644
--- a/storage/HashTable.proto
+++ b/storage/HashTable.proto
@@ -34,10 +34,10 @@ message HashTable {
   required HashTableImplType hash_table_impl_type = 1;
   repeated Type key_types = 2;
   required uint64 estimated_num_entries = 3;
-  repeated uint32 build_side_bloom_filter_id = 4;
-  message ProbeSideBloomFilter {
-    required uint32 probe_side_bloom_filter_id = 1;
-    repeated uint32 probe_side_attr_ids = 2;
+  message BloomFilter {
+    required uint32 bloom_filter_id = 1;
+    required uint32 attr_id = 2;
   }
-  repeated ProbeSideBloomFilter probe_side_bloom_filters = 6;
+  repeated BloomFilter probe_side_bloom_filters = 4;
+  repeated BloomFilter build_side_bloom_filters = 5;  
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/storage/HashTableFactory.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableFactory.hpp b/storage/HashTableFactory.hpp
index 40b39de..1b60c53 100644
--- a/storage/HashTableFactory.hpp
+++ b/storage/HashTableFactory.hpp
@@ -321,9 +321,15 @@ class HashTableFactory {
     //                 individual implementations of the hash table constructors.
 
     // Check if there are any build side bloom filter defined on the hash table.
-    if (proto.build_side_bloom_filter_id_size() > 0) {
+    if (proto.build_side_bloom_filters_size() > 0) {
       hash_table->enableBuildSideBloomFilter();
-      hash_table->setBuildSideBloomFilter(bloom_filters[proto.build_side_bloom_filter_id(0)].get());
+      for (int j = 0; j < proto.build_side_bloom_filters_size(); ++j) {
+        const auto build_side_bloom_filter = proto.build_side_bloom_filters(j);
+        hash_table->addBuildSideBloomFilter(
+            bloom_filters[build_side_bloom_filter.bloom_filter_id()].get());
+
+        hash_table->addBuildSideAttributeId(build_side_bloom_filter.attr_id());
+      }
     }
 
     // Check if there are any probe side bloom filters defined on the hash table.
@@ -333,15 +339,10 @@ class HashTableFactory {
       for (int j = 0; j < proto.probe_side_bloom_filters_size(); ++j) {
         // Add the pointer to the probe bloom filter within the list of probe bloom filters to use.
         const auto probe_side_bloom_filter = proto.probe_side_bloom_filters(j);
-        hash_table->addProbeSideBloomFilter(bloom_filters[probe_side_bloom_filter.probe_side_bloom_filter_id()].get());
-
-        // Add the attribute ids corresponding to this probe bloom filter.
-        std::vector<attribute_id> probe_attribute_ids;
-        for (int k = 0; k < probe_side_bloom_filter.probe_side_attr_ids_size(); ++k) {
-          const attribute_id probe_attribute_id = probe_side_bloom_filter.probe_side_attr_ids(k);
-          probe_attribute_ids.push_back(probe_attribute_id);
-        }
-        hash_table->addProbeSideAttributeIds(std::move(probe_attribute_ids));
+        hash_table->addProbeSideBloomFilter(
+            bloom_filters[probe_side_bloom_filter.bloom_filter_id()].get());
+
+        hash_table->addProbeSideAttributeId(probe_side_bloom_filter.attr_id());
       }
     }