You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/05/30 23:21:06 UTC

[07/33] incubator-quickstep git commit: Enable semi-join optimization for left-deep trees through bloom filters (#195)

Enable semi-join optimization for left-deep trees through bloom filters (#195)

Link: https://github.com/pivotalsoftware/quickstep/pull/195


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/21b85088
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/21b85088
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/21b85088

Branch: refs/heads/travis-grpc
Commit: 21b85088f5f2b177ade45d3bc86612501b7a92d4
Parents: 6f18495
Author: Saket Saurabh <sa...@users.noreply.github.com>
Authored: Mon May 9 12:29:47 2016 -0500
Committer: Zuyu Zhang <zz...@pivotal.io>
Committed: Mon May 30 15:46:32 2016 -0700

----------------------------------------------------------------------
 query_execution/CMakeLists.txt                  |   2 +
 query_execution/QueryContext.cpp                |  16 +-
 query_execution/QueryContext.hpp                |  57 +++-
 query_execution/QueryContext.proto              |  18 +-
 query_optimizer/CMakeLists.txt                  |  11 +
 query_optimizer/ExecutionGenerator.cpp          |  55 +++-
 query_optimizer/ExecutionGenerator.hpp          |   5 +-
 query_optimizer/ExecutionHeuristics.cpp         | 127 ++++++++
 query_optimizer/ExecutionHeuristics.hpp         | 155 ++++++++++
 query_optimizer/tests/CMakeLists.txt            |  16 +
 .../tests/ExecutionHeuristics_unittest.cpp      | 301 +++++++++++++++++++
 storage/CMakeLists.txt                          |   4 +-
 storage/HashTable.hpp                           | 104 +++++++
 storage/HashTable.proto                         |   8 +
 storage/HashTableFactory.hpp                    |  44 ++-
 utility/BloomFilter.hpp                         | 198 +++++++++++-
 utility/BloomFilter.proto                       |  30 ++
 utility/CMakeLists.txt                          |  15 +
 18 files changed, 1135 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21b85088/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 5887237..04a0348 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -90,9 +90,11 @@ target_link_libraries(quickstep_queryexecution_QueryContext
                       quickstep_storage_InsertDestination_proto
                       quickstep_types_TypedValue
                       quickstep_types_containers_Tuple
+                      quickstep_utility_BloomFilter
                       quickstep_utility_Macros
                       quickstep_utility_SortConfiguration)
 target_link_libraries(quickstep_queryexecution_QueryContext_proto
+                      quickstep_utility_BloomFilter_proto
                       quickstep_expressions_Expressions_proto
                       quickstep_expressions_tablegenerator_GeneratorFunction_proto
                       quickstep_storage_AggregationOperationState_proto

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21b85088/query_execution/QueryContext.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.cpp b/query_execution/QueryContext.cpp
index b0e9cae..3bfce17 100644
--- a/query_execution/QueryContext.cpp
+++ b/query_execution/QueryContext.cpp
@@ -1,6 +1,8 @@
 /**
  *   Copyright 2011-2015 Quickstep Technologies LLC.
  *   Copyright 2015-2016 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
  *   you may not use this file except in compliance with the License.
@@ -37,6 +39,7 @@
 #include "storage/InsertDestination.pb.h"
 #include "types/TypedValue.hpp"
 #include "types/containers/Tuple.hpp"
+#include "utility/BloomFilter.hpp"
 #include "utility/SortConfiguration.hpp"
 
 #include "glog/logging.h"
@@ -65,6 +68,10 @@ QueryContext::QueryContext(const serialization::QueryContext &proto,
                                                         storage_manager));
   }
 
+  for (int i = 0; i < proto.bloom_filters_size(); ++i) {
+    bloom_filters_.emplace_back(new BloomFilter(proto.bloom_filters(i)));
+  }
+
   for (int i = 0; i < proto.generator_functions_size(); ++i) {
     const GeneratorFunctionHandle *func_handle =
         GeneratorFunctionFactory::Instance().reconstructFromProto(proto.generator_functions(i));
@@ -76,7 +83,8 @@ QueryContext::QueryContext(const serialization::QueryContext &proto,
   for (int i = 0; i < proto.join_hash_tables_size(); ++i) {
     join_hash_tables_.emplace_back(
         JoinHashTableFactory::CreateResizableFromProto(proto.join_hash_tables(i),
-                                                       storage_manager));
+                                                       storage_manager,
+                                                       bloom_filters_));
   }
 
   for (int i = 0; i < proto.insert_destinations_size(); ++i) {
@@ -142,6 +150,12 @@ bool QueryContext::ProtoIsValid(const serialization::QueryContext &proto,
     }
   }
 
+  for (int i = 0; i < proto.bloom_filters_size(); ++i) {
+    if (!BloomFilter::ProtoIsValid(proto.bloom_filters(i))) {
+      return false;
+    }
+  }
+
   // Each GeneratorFunctionHandle object is serialized as a function name with
   // a list of arguments. Here checks that the arguments are valid TypedValue's.
   for (int i = 0; i < proto.generator_functions_size(); ++i) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21b85088/query_execution/QueryContext.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp
index 0e9e21c..9440fae 100644
--- a/query_execution/QueryContext.hpp
+++ b/query_execution/QueryContext.hpp
@@ -1,6 +1,8 @@
 /**
  *   Copyright 2011-2015 Quickstep Technologies LLC.
  *   Copyright 2015-2016 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
  *   you may not use this file except in compliance with the License.
@@ -32,6 +34,7 @@
 #include "storage/HashTable.hpp"
 #include "storage/InsertDestination.hpp"
 #include "types/containers/Tuple.hpp"
+#include "utility/BloomFilter.hpp"
 #include "utility/Macros.hpp"
 #include "utility/SortConfiguration.hpp"
 
@@ -63,6 +66,11 @@ class QueryContext {
   typedef std::uint32_t aggregation_state_id;
 
   /**
+   * @brief A unique identifier for a BloomFilter per query.
+   **/
+  typedef std::uint32_t bloom_filter_id;
+
+  /**
    * @brief A unique identifier for a GeneratorFunctionHandle per query.
    **/
   typedef std::uint32_t generator_function_id;
@@ -181,6 +189,52 @@ class QueryContext {
   }
 
   /**
+   * @brief Whether the given BloomFilter id is valid.
+   *
+   * @note Only the range of the id is checked: an id stays valid after
+   *       destroyBloomFilter() has released the filter stored under it.
+   *
+   * @param id The BloomFilter id.
+   *
+   * @return True if id is less than the number of BloomFilter slots held by
+   *         this QueryContext, otherwise false.
+   **/
+  bool isValidBloomFilterId(const bloom_filter_id id) const {
+    return id < bloom_filters_.size();
+  }
+
+  /**
+   * @brief Get a mutable pointer to the BloomFilter.
+   *
+   * @param id The BloomFilter id. It must be less than the number of
+   *        BloomFilter slots (checked with DCHECK_LT).
+   *
+   * @return The BloomFilter, already created in the constructor, or nullptr
+   *         if it has since been released by destroyBloomFilter(). Ownership
+   *         stays with this QueryContext.
+   **/
+  inline BloomFilter* getBloomFilterMutable(const bloom_filter_id id) {
+    DCHECK_LT(id, bloom_filters_.size());
+    return bloom_filters_[id].get();
+  }
+
+  /**
+   * @brief Get a constant pointer to the BloomFilter.
+   *
+   * @param id The BloomFilter id. It must be less than the number of
+   *        BloomFilter slots (checked with DCHECK_LT).
+   *
+   * @return The constant pointer to BloomFilter that is
+   *         already created in the constructor, or nullptr if it has since
+   *         been released by destroyBloomFilter(). Ownership stays with this
+   *         QueryContext.
+   **/
+  inline const BloomFilter* getBloomFilter(const bloom_filter_id id) const {
+    DCHECK_LT(id, bloom_filters_.size());
+    return bloom_filters_[id].get();
+  }
+
+  /**
+   * @brief Destroy the given BloomFilter.
+   *
+   * @note Only the owned BloomFilter object is released. Its slot is kept, so
+   *       the ids of the other BloomFilters do not change.
+   *
+   * @param id The id of the BloomFilter to destroy. It must be less than the
+   *        number of BloomFilter slots (checked with DCHECK_LT).
+   **/
+  inline void destroyBloomFilter(const bloom_filter_id id) {
+    DCHECK_LT(id, bloom_filters_.size());
+    bloom_filters_[id].reset();
+  }
+
+  /**
    * @brief Whether the given GeneratorFunctionHandle id is valid.
    *
    * @param id The GeneratorFunctionHandle id.
@@ -257,7 +311,7 @@ class QueryContext {
    *
    * @param id The JoinHashTable id in the query.
    *
-   * @return The JoinHashTable, alreadly created in the constructor.
+   * @return The JoinHashTable, already created in the constructor.
    **/
   inline JoinHashTable* getJoinHashTable(const join_hash_table_id id) {
     DCHECK_LT(id, join_hash_tables_.size());
@@ -408,6 +462,7 @@ class QueryContext {
 
  private:
   std::vector<std::unique_ptr<AggregationOperationState>> aggregation_states_;
+  std::vector<std::unique_ptr<BloomFilter>> bloom_filters_;
   std::vector<std::unique_ptr<const GeneratorFunctionHandle>> generator_functions_;
   std::vector<std::unique_ptr<InsertDestination>> insert_destinations_;
   std::vector<std::unique_ptr<JoinHashTable>> join_hash_tables_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21b85088/query_execution/QueryContext.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.proto b/query_execution/QueryContext.proto
index a7c2a25..b37286c 100644
--- a/query_execution/QueryContext.proto
+++ b/query_execution/QueryContext.proto
@@ -23,6 +23,7 @@ import "storage/AggregationOperationState.proto";
 import "storage/HashTable.proto";
 import "storage/InsertDestination.proto";
 import "types/containers/Tuple.proto";
+import "utility/BloomFilter.proto";
 import "utility/SortConfiguration.proto";
 
 message QueryContext {
@@ -42,14 +43,15 @@ message QueryContext {
   }
 
   repeated AggregationOperationState aggregation_states = 1;
-  repeated HashTable join_hash_tables = 2;
-  repeated InsertDestination insert_destinations = 3;
-  repeated Predicate predicates = 4;
-  repeated ScalarGroup scalar_groups = 5;
-  repeated SortConfiguration sort_configs = 6;
-  repeated Tuple tuples = 7;
-  repeated GeneratorFunctionHandle generator_functions = 8;
+  repeated BloomFilter bloom_filters = 2;
+  repeated GeneratorFunctionHandle generator_functions = 3;
+  repeated HashTable join_hash_tables = 4;
+  repeated InsertDestination insert_destinations = 5;
+  repeated Predicate predicates = 6;
+  repeated ScalarGroup scalar_groups = 7;
+  repeated SortConfiguration sort_configs = 8;
+  repeated Tuple tuples = 9;
 
   // NOTE(zuyu): For UpdateWorkOrder only.
-  repeated UpdateGroup update_groups = 9;
+  repeated UpdateGroup update_groups = 10;
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21b85088/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 2d09bee..feaecb3 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -35,6 +35,7 @@ add_subdirectory(tests)
 
 # Declare micro-libs:
 add_library(quickstep_queryoptimizer_ExecutionGenerator ExecutionGenerator.cpp ExecutionGenerator.hpp)
+add_library(quickstep_queryoptimizer_ExecutionHeuristics ExecutionHeuristics.cpp ExecutionHeuristics.hpp)
 add_library(quickstep_queryoptimizer_LogicalGenerator LogicalGenerator.cpp LogicalGenerator.hpp)
 add_library(quickstep_queryoptimizer_LogicalToPhysicalMapper
             ../empty_src.cpp
@@ -64,6 +65,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_expressions_scalar_ScalarAttribute
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_QueryContext_proto
+                      quickstep_queryoptimizer_ExecutionHeuristics
                       quickstep_queryoptimizer_OptimizerContext
                       quickstep_queryoptimizer_QueryHandle
                       quickstep_queryoptimizer_QueryPlan
@@ -139,6 +141,14 @@ if (ENABLE_DISTRIBUTED)
   target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                         quickstep_catalog_Catalog_proto)
 endif()
+target_link_libraries(quickstep_queryoptimizer_ExecutionHeuristics
+                      glog
+                      quickstep_catalog_CatalogRelation
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_queryexecution_QueryContext
+                      quickstep_queryexecution_QueryContext_proto
+                      quickstep_queryoptimizer_QueryPlan
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_LogicalGenerator
                       glog
                       quickstep_parser_ParseStatement
@@ -211,6 +221,7 @@ target_link_libraries(quickstep_queryoptimizer_Validator
 add_library(quickstep_queryoptimizer ../empty_src.cpp QueryOptimizerModule.hpp)
 target_link_libraries(quickstep_queryoptimizer
                       quickstep_queryoptimizer_ExecutionGenerator
+                      quickstep_queryoptimizer_ExecutionHeuristics
                       quickstep_queryoptimizer_LogicalGenerator
                       quickstep_queryoptimizer_LogicalToPhysicalMapper
                       quickstep_queryoptimizer_Optimizer

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21b85088/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 077d35d..7f26e85 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -50,6 +50,7 @@
 #include "expressions/scalar/ScalarAttribute.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/QueryContext.pb.h"
+#include "query_optimizer/ExecutionHeuristics.hpp"
 #include "query_optimizer/OptimizerContext.hpp"
 #include "query_optimizer/QueryHandle.hpp"
 #include "query_optimizer/QueryPlan.hpp"
@@ -144,6 +145,9 @@ static const volatile bool aggregate_hashtable_type_dummy
 
 DEFINE_bool(parallelize_load, true, "Parallelize loading data files.");
 
+DEFINE_bool(optimize_joins, false,
+            "Enable post execution plan generation optimizations for joins.");
+
 namespace E = ::quickstep::optimizer::expressions;
 namespace P = ::quickstep::optimizer::physical;
 namespace S = ::quickstep::serialization;
@@ -198,6 +202,11 @@ void ExecutionGenerator::generatePlan(const P::PhysicalPtr &physical_plan) {
         temporary_relation_info.producer_operator_index);
   }
 
+  // Optimize execution plan based on heuristics captured during execution plan generation, if enabled.
+  if (FLAGS_optimize_joins) {
+    execution_heuristics_->optimizeExecutionPlan(execution_plan_, query_context_proto_);
+  }
+
 #ifdef QUICKSTEP_DISTRIBUTED
   catalog_database_cache_proto_->set_name(optimizer_context_->catalog_database()->getName());
 
@@ -576,12 +585,32 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
   std::vector<attribute_id> probe_attribute_ids;
   std::vector<attribute_id> build_attribute_ids;
 
+  std::vector<attribute_id> probe_original_attribute_ids;
+  std::vector<attribute_id> build_original_attribute_ids;
+
+  const CatalogRelation *referenced_stored_probe_relation;
+  const CatalogRelation *referenced_stored_build_relation;
+
   bool any_probe_attributes_nullable = false;
   bool any_build_attributes_nullable = false;
 
+  bool skip_hash_join_optimization = false;
+
   const std::vector<E::AttributeReferencePtr> &left_join_attributes =
       physical_plan->left_join_attributes();
   for (const E::AttributeReferencePtr &left_join_attribute : left_join_attributes) {
+    // Try to determine the original stored relation referenced in the Hash Join.
+    referenced_stored_probe_relation =
+        optimizer_context_->catalog_database()->getRelationByName(left_join_attribute->relation_name());
+    if (referenced_stored_probe_relation == nullptr) {
+      // Hash Join optimizations are not possible, if the referenced relation cannot be determined.
+      skip_hash_join_optimization = true;
+    } else {
+      const attribute_id probe_operator_attribute_id =
+          referenced_stored_probe_relation->getAttributeByName(left_join_attribute->attribute_name())->getID();
+      probe_original_attribute_ids.emplace_back(probe_operator_attribute_id);
+    }
+
     const CatalogAttribute *probe_catalog_attribute
         = attribute_substitution_map_[left_join_attribute->id()];
     probe_attribute_ids.emplace_back(probe_catalog_attribute->getID());
@@ -594,6 +623,18 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
   const std::vector<E::AttributeReferencePtr> &right_join_attributes =
       physical_plan->right_join_attributes();
   for (const E::AttributeReferencePtr &right_join_attribute : right_join_attributes) {
+    // Try to determine the original stored relation referenced in the Hash Join.
+    referenced_stored_build_relation =
+        optimizer_context_->catalog_database()->getRelationByName(right_join_attribute->relation_name());
+    if (referenced_stored_build_relation == nullptr) {
+      // Hash Join optimizations are not possible, if the referenced relation cannot be determined.
+      skip_hash_join_optimization = true;
+    } else {
+      const attribute_id build_operator_attribute_id =
+          referenced_stored_build_relation->getAttributeByName(right_join_attribute->attribute_name())->getID();
+      build_original_attribute_ids.emplace_back(build_operator_attribute_id);
+    }
+
     const CatalogAttribute *build_catalog_attribute
         = attribute_substitution_map_[right_join_attribute->id()];
     build_attribute_ids.emplace_back(build_catalog_attribute->getID());
@@ -629,6 +670,8 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
       std::swap(probe_cardinality, build_cardinality);
       std::swap(probe_attribute_ids, build_attribute_ids);
       std::swap(any_probe_attributes_nullable, any_build_attributes_nullable);
+      std::swap(probe_original_attribute_ids, build_original_attribute_ids);
+      std::swap(referenced_stored_probe_relation, referenced_stored_build_relation);
     }
   }
 
@@ -783,6 +826,17 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
       std::forward_as_tuple(join_operator_index,
                             output_relation));
   temporary_relation_info_vec_.emplace_back(join_operator_index, output_relation);
+
+  // Add heuristics for the Hash Join, if enabled.
+  if (FLAGS_optimize_joins && !skip_hash_join_optimization) {
+    execution_heuristics_->addHashJoinInfo(build_operator_index,
+                                           join_operator_index,
+                                           referenced_stored_build_relation,
+                                           referenced_stored_probe_relation,
+                                           std::move(build_original_attribute_ids),
+                                           std::move(probe_original_attribute_ids),
+                                           join_hash_table_index);
+  }
 }
 
 void ExecutionGenerator::convertNestedLoopsJoin(
@@ -895,7 +949,6 @@ void ExecutionGenerator::convertCopyFrom(
                                        false /* is_pipeline_breaker */);
 }
 
-
 void ExecutionGenerator::convertCreateIndex(
   const P::CreateIndexPtr &physical_plan) {
   // CreateIndex is converted to a CreateIndex operator.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21b85088/query_optimizer/ExecutionGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp
index 7c563d4..0630bca 100644
--- a/query_optimizer/ExecutionGenerator.hpp
+++ b/query_optimizer/ExecutionGenerator.hpp
@@ -33,6 +33,7 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/QueryContext.pb.h"
+#include "query_optimizer/ExecutionHeuristics.hpp"
 #include "query_optimizer/QueryHandle.hpp"
 #include "query_optimizer/QueryPlan.hpp"
 #include "query_optimizer/cost_model/CostModel.hpp"
@@ -102,7 +103,8 @@ class ExecutionGenerator {
       : optimizer_context_(DCHECK_NOTNULL(optimizer_context)),
         query_handle_(DCHECK_NOTNULL(query_handle)),
         execution_plan_(DCHECK_NOTNULL(query_handle->getQueryPlanMutable())),
-        query_context_proto_(DCHECK_NOTNULL(query_handle->getQueryContextProtoMutable())) {
+        query_context_proto_(DCHECK_NOTNULL(query_handle->getQueryContextProtoMutable())),
+        execution_heuristics_(new ExecutionHeuristics()) {
 #ifdef QUICKSTEP_DISTRIBUTED
     catalog_database_cache_proto_ = DCHECK_NOTNULL(query_handle->getCatalogDatabaseCacheProtoMutable());
 #endif
@@ -376,6 +378,7 @@ class ExecutionGenerator {
   QueryHandle *query_handle_;
   QueryPlan *execution_plan_;  // A part of QueryHandle.
   serialization::QueryContext *query_context_proto_;  // A part of QueryHandle.
+  std::unique_ptr<ExecutionHeuristics> execution_heuristics_;
 
 #ifdef QUICKSTEP_DISTRIBUTED
   serialization::CatalogDatabase *catalog_database_cache_proto_;  // A part of QueryHandle.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21b85088/query_optimizer/ExecutionHeuristics.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionHeuristics.cpp b/query_optimizer/ExecutionHeuristics.cpp
new file mode 100644
index 0000000..fc31c53
--- /dev/null
+++ b/query_optimizer/ExecutionHeuristics.cpp
@@ -0,0 +1,127 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "query_optimizer/ExecutionHeuristics.hpp"
+
+#include <cstddef>
+#include <utility>
+#include <unordered_map>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryContext.pb.h"
+#include "query_optimizer/QueryPlan.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+
+// Wires bloom filters into left-deep chains of hash joins (semi-join optimization).
+//
+// A chain is discovered with a simple scan: starting from an origin hash join,
+// the following recorded hash joins are added for as long as they probe the
+// same stored relation as the origin. For every chain of two or more joins:
+//   - each build operator in the chain gets a new bloom filter, which is added
+//     to query_context_proto and registered on its hash table proto as a
+//     build-side filter;
+//   - the hash table proto of the origin join is told to consult every one of
+//     those filters on its probe side, using the probe attributes of the join
+//     that builds the filter;
+//   - the origin join operator is made dependent (pipeline breaking) on each
+//     later build operator of the chain.
+//
+// @param query_plan The plan that receives the added dependencies.
+// @param query_context_proto The context proto that receives the bloom filters
+//        and whose join hash table protos are updated.
+void ExecutionHeuristics::optimizeExecutionPlan(QueryPlan *query_plan,
+                                                serialization::QueryContext *query_context_proto) {
+  const std::size_t num_hash_joins = hash_joins_.size();
+
+  // A chain needs at least two joins. The bound is written as
+  // 'origin_node + 1 < n' rather than 'origin_node < n - 1' so that the
+  // unsigned arithmetic cannot wrap around when no hash join was recorded.
+  std::size_t origin_node = 0;
+  while (origin_node + 1 < num_hash_joins) {
+    // Collect the run of consecutive joins that probe the origin's relation.
+    const relation_id origin_relation_id =
+        hash_joins_[origin_node].referenced_stored_probe_relation_->getID();
+    std::vector<std::size_t> chained_nodes;
+    chained_nodes.push_back(origin_node);
+    for (std::size_t i = origin_node + 1; i < num_hash_joins; ++i) {
+      if (hash_joins_[i].referenced_stored_probe_relation_->getID() != origin_relation_id) {
+        break;
+      }
+      chained_nodes.push_back(i);
+    }
+
+    // Only chains of length greater than one are suitable candidates for semi-join optimization.
+    if (chained_nodes.size() > 1) {
+      // Every probe-side filter of the chain is attached to the origin's hash table.
+      auto *origin_hash_table_proto =
+          query_context_proto->mutable_join_hash_tables(hash_joins_[origin_node].join_hash_table_id_);
+
+      for (const std::size_t node : chained_nodes) {
+        // Provision a new bloom filter; its id is its index in the proto's list.
+        const QueryContext::bloom_filter_id bloom_filter_id = query_context_proto->bloom_filters_size();
+        serialization::BloomFilter *bloom_filter_proto = query_context_proto->add_bloom_filters();
+
+        // Modify the bloom filter properties based on the statistics of the relation.
+        setBloomFilterProperties(bloom_filter_proto, hash_joins_[node].referenced_stored_build_relation_);
+
+        // Add build-side bloom filter information to the corresponding hash table proto.
+        query_context_proto->mutable_join_hash_tables(hash_joins_[node].join_hash_table_id_)
+            ->add_build_side_bloom_filter_id(bloom_filter_id);
+
+        // Add the matching probe-side information to the origin's hash table
+        // proto. Doing it here keeps the filters in chain order.
+        auto *probe_side_bloom_filter = origin_hash_table_proto->add_probe_side_bloom_filters();
+        probe_side_bloom_filter->set_probe_side_bloom_filter_id(bloom_filter_id);
+        for (const attribute_id probe_attribute_id : hash_joins_[node].probe_attributes_) {
+          probe_side_bloom_filter->add_probe_side_attr_ids(probe_attribute_id);
+        }
+      }
+
+      // Add node dependencies from chained build nodes to origin node probe.
+      // The origin's own build node (index 0) is skipped.
+      for (std::size_t i = 1; i < chained_nodes.size(); ++i) {
+        query_plan->addDirectDependency(hash_joins_[origin_node].join_operator_index_,
+                                        hash_joins_[chained_nodes[i]].build_operator_index_,
+                                        true /* is_pipeline_breaker */);
+      }
+    }
+
+    // Continue the scan right after the last join of this chain.
+    origin_node = chained_nodes.back() + 1;
+  }
+}
+
+// Sizes a bloom filter proto from the estimated cardinality of a relation.
+//
+// The relation is placed in the first bucket whose bound its estimated tuple
+// count stays below (kOneThousand, kTenThousand, kHundredThousand); anything
+// larger uses the kMillion bucket. The filter size is the bucket bound divided
+// by kCompressionFactor, and each bucket has its own number of hashes.
+//
+// @param bloom_filter_proto The proto whose size and hash count are set.
+// @param relation The relation whose estimated cardinality picks the bucket.
+void ExecutionHeuristics::setBloomFilterProperties(serialization::BloomFilter *bloom_filter_proto,
+                                                   const CatalogRelation *relation) {
+  const std::size_t num_tuples = relation->estimateTupleCardinality();
+
+  // Start from the largest bucket and narrow it down.
+  std::size_t bucket_bound = kMillion;
+  std::size_t num_hashes = kHighSparsityHash;
+  if (num_tuples < kOneThousand) {
+    bucket_bound = kOneThousand;
+    num_hashes = kVeryLowSparsityHash;
+  } else if (num_tuples < kTenThousand) {
+    bucket_bound = kTenThousand;
+    num_hashes = kLowSparsityHash;
+  } else if (num_tuples < kHundredThousand) {
+    bucket_bound = kHundredThousand;
+    num_hashes = kMediumSparsityHash;
+  }
+
+  bloom_filter_proto->set_bloom_filter_size(bucket_bound / kCompressionFactor);
+  bloom_filter_proto->set_number_of_hashes(num_hashes);
+}
+
+}  // namespace optimizer
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21b85088/query_optimizer/ExecutionHeuristics.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionHeuristics.hpp b/query_optimizer/ExecutionHeuristics.hpp
new file mode 100644
index 0000000..92a7fe8
--- /dev/null
+++ b/query_optimizer/ExecutionHeuristics.hpp
@@ -0,0 +1,155 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_EXECUTION_HEURISTICS_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_EXECUTION_HEURISTICS_HPP_
+
+#include <vector>
+
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/QueryContext.pb.h"
+#include "query_optimizer/QueryPlan.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+
+/** \addtogroup QueryOptimizer
+ *  @{
+ */
+
+/**
+ * @brief The ExecutionHeuristics compiles certain heuristics for an execution plan
+ *        as it is being converted to a physical plan. These heuristics can then be
+ *        used to optimize the execution plan after it has been generated.
+ **/
+class ExecutionHeuristics {
+ public:
+  static const std::size_t kOneHundred = 100;
+  static const std::size_t kOneThousand = 1000;
+  static const std::size_t kTenThousand = 10000;
+  static const std::size_t kHundredThousand = 100000;
+  static const std::size_t kMillion = 1000000;
+
+  static const std::size_t kCompressionFactor = 10;  // Divisor applied to the cardinality bucket bound.
+
+  static const std::size_t kVeryLowSparsityHash = 1;  // Hash count for the < kOneThousand bucket.
+  static const std::size_t kLowSparsityHash = 2;  // Hash count for the < kTenThousand bucket.
+  static const std::size_t kMediumSparsityHash = 5;  // Hash count for the < kHundredThousand bucket.
+  static const std::size_t kHighSparsityHash = 10;  // Hash count for all larger cardinalities.
+
+  /**
+   * @brief A simple internal class that holds information about various
+   *        hash joins within the execution plan for a query.
+   **/
+  struct HashJoinInfo {
+    HashJoinInfo(const QueryPlan::DAGNodeIndex build_operator_index,
+                 const QueryPlan::DAGNodeIndex join_operator_index,
+                 const CatalogRelation *referenced_stored_build_relation,
+                 const CatalogRelation *referenced_stored_probe_relation,
+                 std::vector<attribute_id> &&build_attributes,
+                 std::vector<attribute_id> &&probe_attributes,
+                 const QueryContext::join_hash_table_id join_hash_table_id)
+        : build_operator_index_(build_operator_index),
+          join_operator_index_(join_operator_index),
+          referenced_stored_build_relation_(referenced_stored_build_relation),
+          referenced_stored_probe_relation_(referenced_stored_probe_relation),
+          build_attributes_(std::move(build_attributes)),
+          probe_attributes_(std::move(probe_attributes)),
+          join_hash_table_id_(join_hash_table_id) {
+    }
+
+    const QueryPlan::DAGNodeIndex build_operator_index_;
+    const QueryPlan::DAGNodeIndex join_operator_index_;
+    const CatalogRelation *referenced_stored_build_relation_;
+    const CatalogRelation *referenced_stored_probe_relation_;
+    const std::vector<attribute_id> build_attributes_;
+    const std::vector<attribute_id> probe_attributes_;
+    const QueryContext::join_hash_table_id join_hash_table_id_;
+  };
+
+
+  /**
+   * @brief Constructor.
+   **/
+  ExecutionHeuristics() {}
+
+  /**
+   * @brief Saves information about a hash join used within the execution plan
+   *        for a query.
+   *
+   * @param build_operator_index Index of the build operator of the hash join.
+   * @param join_operator_index Index of the join operator of the hash join.
+   * @param referenced_stored_build_relation Catalog relation referenced by the build side.
+   * @param referenced_stored_probe_relation Catalog relation referenced by the probe side.
+   * @param build_attributes List of attributes on which hash table is being built (moved from).
+   * @param probe_attributes List of attributes on which hash table is being probed (moved from).
+   * @param join_hash_table_id Id of the hash table which refers to the actual hash
+   *        table within the query context.
+   **/
+  inline void addHashJoinInfo(const QueryPlan::DAGNodeIndex build_operator_index,
+                              const QueryPlan::DAGNodeIndex join_operator_index,
+                              const CatalogRelation *referenced_stored_build_relation,
+                              const CatalogRelation *referenced_stored_probe_relation,
+                              std::vector<attribute_id> &&build_attributes,
+                              std::vector<attribute_id> &&probe_attributes,
+                              const QueryContext::join_hash_table_id join_hash_table_id) {
+    hash_joins_.emplace_back(build_operator_index,  // In-place: no temporary whose const vectors get copied.
+                             join_operator_index,
+                             referenced_stored_build_relation,
+                             referenced_stored_probe_relation,
+                             std::move(build_attributes),
+                             std::move(probe_attributes),
+                             join_hash_table_id);
+  }
+
+  /**
+   * @brief Optimize the execution plan based on heuristics generated
+   *        during physical plan to execution plan conversion.
+   *
+   * @param query_plan A mutable pointer to the query execution plan.
+   * @param query_context_proto A mutable pointer to the protobuf representation
+   *        of the query context.
+   **/
+  void optimizeExecutionPlan(QueryPlan *query_plan, serialization::QueryContext *query_context_proto);
+
+  /**
+   * @brief Set the properties of the bloom filter proto based on the statistics
+   *        of the given relation.
+   *
+   * @param bloom_filter_proto A mutable pointer to the bloom filter protobuf representation.
+   * @param relation The catalog relation on which bloom filter is being built.
+   **/
+  void setBloomFilterProperties(serialization::BloomFilter *bloom_filter_proto,
+                                const CatalogRelation *relation);
+
+ private:
+  std::vector<HashJoinInfo> hash_joins_;
+
+  DISALLOW_COPY_AND_ASSIGN(ExecutionHeuristics);
+};
+
+/** @} */
+
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif /* QUICKSTEP_QUERY_OPTIMIZER_EXECUTION_HEURISTICS_HPP_ */

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21b85088/query_optimizer/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/CMakeLists.txt b/query_optimizer/tests/CMakeLists.txt
index 1d2fa10..5647bfd 100644
--- a/query_optimizer/tests/CMakeLists.txt
+++ b/query_optimizer/tests/CMakeLists.txt
@@ -78,6 +78,22 @@ add_executable(quickstep_queryoptimizer_tests_ExecutionGeneratorTest
                ExecutionGeneratorTestRunner.hpp
                "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.cpp"
                "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.hpp")
+add_executable(ExecutionHeuristics_unittest ExecutionHeuristics_unittest.cpp)
+target_link_libraries(ExecutionHeuristics_unittest
+                      gtest
+                      gtest_main
+                      quickstep_catalog_Catalog
+                      quickstep_catalog_CatalogDatabase
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_queryexecution_QueryContext
+                      quickstep_queryexecution_QueryContext_proto
+                      quickstep_queryoptimizer_ExecutionHeuristics
+                      quickstep_queryoptimizer_QueryPlan
+                      quickstep_relationaloperators_BuildHashOperator
+                      quickstep_relationaloperators_HashJoinOperator
+                      quickstep_utility_Macros)
+add_test(ExecutionHeuristics_unittest ExecutionHeuristics_unittest)
+
 add_executable(quickstep_queryoptimizer_tests_OptimizerTextTest
                OptimizerTextTest.cpp
                OptimizerTextTestRunner.cpp

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21b85088/query_optimizer/tests/ExecutionHeuristics_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/ExecutionHeuristics_unittest.cpp b/query_optimizer/tests/ExecutionHeuristics_unittest.cpp
new file mode 100644
index 0000000..12acaff
--- /dev/null
+++ b/query_optimizer/tests/ExecutionHeuristics_unittest.cpp
@@ -0,0 +1,301 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "catalog/Catalog.hpp"
+#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/QueryContext.pb.h"
+#include "query_optimizer/ExecutionHeuristics.hpp"
+#include "query_optimizer/QueryPlan.hpp"
+#include "relational_operators/BuildHashOperator.hpp"
+#include "relational_operators/HashJoinOperator.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+#include "gtest/gtest.h"
+
+namespace quickstep {
+namespace optimizer {
+
+class ExecutionHeuristicsTest : public ::testing::Test {
+ protected:
+  virtual void SetUp() {  // Creates a fresh database, heuristics object, plan and context proto.
+    db_ = cat_.getDatabaseByIdMutable(cat_.addDatabase(new CatalogDatabase(nullptr, "db")));
+    execution_heuristics_.reset(new ExecutionHeuristics());
+    query_plan_.reset(new QueryPlan());
+    query_context_proto_.reset(new serialization::QueryContext());
+  }
+
+  CatalogRelation* createCatalogRelation(const std::string &name, bool temporary = false) {  // Adds to db_.
+    return db_->getRelationByIdMutable(db_->addRelation(new CatalogRelation(nullptr, name, -1, temporary)));
+  }
+
+  void addDummyHashJoinInfo(ExecutionHeuristics *execution_heuristics,
+                            const QueryPlan::DAGNodeIndex build_operator_index,
+                            const QueryPlan::DAGNodeIndex join_operator_index,
+                            const CatalogRelation *build_relation,
+                            const CatalogRelation *probe_relation,
+                            const attribute_id build_attribute_id,
+                            const attribute_id probe_attribute_id,
+                            const QueryContext::join_hash_table_id join_hash_table_id) {
+    std::vector<attribute_id> build_attribute_ids(1, build_attribute_id);  // Single-attribute join key.
+    std::vector<attribute_id> probe_attribute_ids(1, probe_attribute_id);  // Single-attribute join key.
+    execution_heuristics->addHashJoinInfo(build_operator_index,
+                                          join_operator_index,
+                                          build_relation,
+                                          probe_relation,
+                                          std::move(build_attribute_ids),
+                                          std::move(probe_attribute_ids),
+                                          join_hash_table_id);
+  }
+
+  QueryPlan::DAGNodeIndex createDummyBuildHashOperator(QueryPlan *query_plan,
+                                                       const CatalogRelation *build_relation,
+                                                       const attribute_id build_attribute_id,
+                                                       const QueryContext::join_hash_table_id join_hash_table_index) {
+    std::vector<attribute_id> build_attribute_ids;
+    build_attribute_ids.push_back(build_attribute_id);
+    QueryPlan::DAGNodeIndex build_operator_index =  // Index of the new operator within the plan.
+        query_plan->addRelationalOperator(new BuildHashOperator(*build_relation,
+                                                                true,
+                                                                build_attribute_ids,
+                                                                false,
+                                                                join_hash_table_index));
+    return build_operator_index;
+  }
+
+  QueryPlan::DAGNodeIndex createDummyHashJoinOperator(QueryPlan *query_plan,
+                                                      const CatalogRelation *build_relation,
+                                                      const CatalogRelation *probe_relation,
+                                                      const attribute_id probe_attribute_id,
+                                                      const QueryContext::join_hash_table_id join_hash_table_index) {
+    std::vector<attribute_id> probe_attribute_ids;
+    probe_attribute_ids.push_back(probe_attribute_id);
+    QueryPlan::DAGNodeIndex join_operator_index =  // Index of the new operator within the plan.
+        query_plan->addRelationalOperator(new HashJoinOperator(*build_relation,
+                                                               *probe_relation,
+                                                               true,
+                                                               probe_attribute_ids,
+                                                               false,
+                                                               *probe_relation,
+                                                               0,
+                                                               join_hash_table_index,
+                                                               0,
+                                                               0));
+    return join_operator_index;
+  }
+
+  Catalog cat_;
+  CatalogDatabase *db_;  // db_ is owned by cat_.
+  std::unique_ptr<QueryPlan> query_plan_;
+  std::unique_ptr<serialization::QueryContext> query_context_proto_;
+  std::unique_ptr<ExecutionHeuristics> execution_heuristics_;
+};
+
+TEST_F(ExecutionHeuristicsTest, HashJoinOptimizedTest) {
+  // This test case creates three hash joins, all of which are being probed on the same relation.
+  // Since the probes are all made on the same relation, ExecutionHeuristics should optimize
+  // these hash joins using bloom filters.
+
+  const CatalogRelation *build_relation_1 = createCatalogRelation("build_relation_1");
+  const CatalogRelation *build_relation_2 = createCatalogRelation("build_relation_2");
+  const CatalogRelation *build_relation_3 = createCatalogRelation("build_relation_3");
+  const CatalogRelation *probe_relation_1 = createCatalogRelation("probe_relation_1");
+
+  const attribute_id build_attribute_id_1 = 0;
+  const attribute_id build_attribute_id_2 = 0;
+  const attribute_id build_attribute_id_3 = 0;
+  const attribute_id probe_attribute_id_1 = 1;
+  const attribute_id probe_attribute_id_2 = 2;
+  const attribute_id probe_attribute_id_3 = 3;
+
+  const QueryContext::join_hash_table_id join_hash_table_index_1 = 0;
+  const QueryContext::join_hash_table_id join_hash_table_index_2 = 1;
+  const QueryContext::join_hash_table_id join_hash_table_index_3 = 2;
+  query_context_proto_->add_join_hash_tables();
+  query_context_proto_->add_join_hash_tables();
+  query_context_proto_->add_join_hash_tables();
+
+  const QueryPlan::DAGNodeIndex build_operator_index_1 = createDummyBuildHashOperator(query_plan_.get(),
+                                                                                      build_relation_1,
+                                                                                      build_attribute_id_1,
+                                                                                      join_hash_table_index_1);
+  const QueryPlan::DAGNodeIndex probe_operator_index_1 = createDummyHashJoinOperator(query_plan_.get(),
+                                                                                     build_relation_1,
+                                                                                     probe_relation_1,
+                                                                                     probe_attribute_id_1,
+                                                                                     join_hash_table_index_1);
+  const QueryPlan::DAGNodeIndex build_operator_index_2 = createDummyBuildHashOperator(query_plan_.get(),
+                                                                                      build_relation_2,
+                                                                                      build_attribute_id_2,
+                                                                                      join_hash_table_index_2);
+  const QueryPlan::DAGNodeIndex probe_operator_index_2 = createDummyHashJoinOperator(query_plan_.get(),
+                                                                                     build_relation_2,
+                                                                                     probe_relation_1,
+                                                                                     probe_attribute_id_2,
+                                                                                     join_hash_table_index_2);
+  const QueryPlan::DAGNodeIndex build_operator_index_3 = createDummyBuildHashOperator(query_plan_.get(),
+                                                                                      build_relation_3,
+                                                                                      build_attribute_id_3,
+                                                                                      join_hash_table_index_3);
+  const QueryPlan::DAGNodeIndex probe_operator_index_3 = createDummyHashJoinOperator(query_plan_.get(),
+                                                                                     build_relation_3,
+                                                                                     probe_relation_1,
+                                                                                     probe_attribute_id_3,
+                                                                                     join_hash_table_index_3);
+
+  addDummyHashJoinInfo(execution_heuristics_.get(),
+                       build_operator_index_1,
+                       probe_operator_index_1,
+                       build_relation_1,
+                       probe_relation_1,
+                       build_attribute_id_1,
+                       probe_attribute_id_1,
+                       join_hash_table_index_1);
+  addDummyHashJoinInfo(execution_heuristics_.get(),
+                       build_operator_index_2,
+                       probe_operator_index_2,
+                       build_relation_2,
+                       probe_relation_1,
+                       build_attribute_id_2,
+                       probe_attribute_id_2,
+                       join_hash_table_index_2);
+  addDummyHashJoinInfo(execution_heuristics_.get(),
+                       build_operator_index_3,
+                       probe_operator_index_3,
+                       build_relation_3,
+                       probe_relation_1,
+                       build_attribute_id_3,
+                       probe_attribute_id_3,
+                       join_hash_table_index_3);
+
+  execution_heuristics_->optimizeExecutionPlan(query_plan_.get(), query_context_proto_.get());
+
+  // Test whether correct number of bloom filters were added.
+  EXPECT_EQ(1, query_context_proto_->join_hash_tables(0).build_side_bloom_filter_id_size());
+  EXPECT_EQ(1, query_context_proto_->join_hash_tables(1).build_side_bloom_filter_id_size());
+  EXPECT_EQ(1, query_context_proto_->join_hash_tables(2).build_side_bloom_filter_id_size());
+  EXPECT_EQ(3, query_context_proto_->join_hash_tables(0).probe_side_bloom_filters_size());
+
+  // Test that the DAG was modified correctly.
+  // Probe operator 1 should now have build operator 2 and build operator 3 added as dependencies.
+  auto const probe_node_dependencies = query_plan_->getQueryPlanDAG().getDependencies(probe_operator_index_1);
+  EXPECT_EQ(1u, probe_node_dependencies.count(build_operator_index_2));
+  EXPECT_EQ(1u, probe_node_dependencies.count(build_operator_index_3));
+}
+
+TEST_F(ExecutionHeuristicsTest, HashJoinNotOptimizedTest) {
+  // This test case creates three hash joins, all of which are being probed on different relations.
+  // Since the probes are made on different relations, ExecutionHeuristics should not optimize
+  // these hash joins using bloom filters.
+
+  const CatalogRelation *build_relation_1 = createCatalogRelation("build_relation_1");
+  const CatalogRelation *build_relation_2 = createCatalogRelation("build_relation_2");
+  const CatalogRelation *build_relation_3 = createCatalogRelation("build_relation_3");
+  const CatalogRelation *probe_relation_1 = createCatalogRelation("probe_relation_1");
+  const CatalogRelation *probe_relation_2 = createCatalogRelation("probe_relation_2");
+  const CatalogRelation *probe_relation_3 = createCatalogRelation("probe_relation_3");
+
+  const attribute_id build_attribute_id_1 = 0;
+  const attribute_id build_attribute_id_2 = 0;
+  const attribute_id build_attribute_id_3 = 0;
+  const attribute_id probe_attribute_id_1 = 1;
+  const attribute_id probe_attribute_id_2 = 2;
+  const attribute_id probe_attribute_id_3 = 3;
+
+  const QueryContext::join_hash_table_id join_hash_table_index_1 = 0;
+  const QueryContext::join_hash_table_id join_hash_table_index_2 = 1;
+  const QueryContext::join_hash_table_id join_hash_table_index_3 = 2;
+  query_context_proto_->add_join_hash_tables();
+  query_context_proto_->add_join_hash_tables();
+  query_context_proto_->add_join_hash_tables();
+
+  const QueryPlan::DAGNodeIndex build_operator_index_1 = createDummyBuildHashOperator(query_plan_.get(),
+                                                                                      build_relation_1,
+                                                                                      build_attribute_id_1,
+                                                                                      join_hash_table_index_1);
+  const QueryPlan::DAGNodeIndex probe_operator_index_1 = createDummyHashJoinOperator(query_plan_.get(),
+                                                                                     build_relation_1,
+                                                                                     probe_relation_1,
+                                                                                     probe_attribute_id_1,
+                                                                                     join_hash_table_index_1);
+  const QueryPlan::DAGNodeIndex build_operator_index_2 = createDummyBuildHashOperator(query_plan_.get(),
+                                                                                      build_relation_2,
+                                                                                      build_attribute_id_2,
+                                                                                      join_hash_table_index_2);
+  const QueryPlan::DAGNodeIndex probe_operator_index_2 = createDummyHashJoinOperator(query_plan_.get(),
+                                                                                     build_relation_2,
+                                                                                     probe_relation_2,
+                                                                                     probe_attribute_id_2,
+                                                                                     join_hash_table_index_2);
+  const QueryPlan::DAGNodeIndex build_operator_index_3 = createDummyBuildHashOperator(query_plan_.get(),
+                                                                                      build_relation_3,
+                                                                                      build_attribute_id_3,
+                                                                                      join_hash_table_index_3);
+  const QueryPlan::DAGNodeIndex probe_operator_index_3 = createDummyHashJoinOperator(query_plan_.get(),
+                                                                                     build_relation_3,
+                                                                                     probe_relation_3,
+                                                                                     probe_attribute_id_3,
+                                                                                     join_hash_table_index_3);
+
+  addDummyHashJoinInfo(execution_heuristics_.get(),
+                       build_operator_index_1,
+                       probe_operator_index_1,
+                       build_relation_1,
+                       probe_relation_1,
+                       build_attribute_id_1,
+                       probe_attribute_id_1,
+                       join_hash_table_index_1);
+  addDummyHashJoinInfo(execution_heuristics_.get(),
+                       build_operator_index_2,
+                       probe_operator_index_2,
+                       build_relation_2,
+                       probe_relation_2,
+                       build_attribute_id_2,
+                       probe_attribute_id_2,
+                       join_hash_table_index_2);
+  addDummyHashJoinInfo(execution_heuristics_.get(),
+                       build_operator_index_3,
+                       probe_operator_index_3,
+                       build_relation_3,
+                       probe_relation_3,
+                       build_attribute_id_3,
+                       probe_attribute_id_3,
+                       join_hash_table_index_3);
+
+  execution_heuristics_->optimizeExecutionPlan(query_plan_.get(), query_context_proto_.get());
+
+  // Test that no bloom filters were added.
+  EXPECT_EQ(0, query_context_proto_->join_hash_tables(0).build_side_bloom_filter_id_size());
+  EXPECT_EQ(0, query_context_proto_->join_hash_tables(1).build_side_bloom_filter_id_size());
+  EXPECT_EQ(0, query_context_proto_->join_hash_tables(2).build_side_bloom_filter_id_size());
+  EXPECT_EQ(0, query_context_proto_->join_hash_tables(0).probe_side_bloom_filters_size());
+
+  // Test that the DAG was not modified at all.
+  // Probe operator 1 should not have build operator 2 and build operator 3 added as dependencies.
+  auto probe_node_dependencies = query_plan_->getQueryPlanDAG().getDependencies(probe_operator_index_1);
+  EXPECT_EQ(0u, probe_node_dependencies.count(build_operator_index_2));
+  EXPECT_EQ(0u, probe_node_dependencies.count(build_operator_index_3));
+}
+
+}  // namespace optimizer
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21b85088/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index dacacfa..115248c 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -629,6 +629,7 @@ target_link_libraries(quickstep_storage_HashTable
                       quickstep_threading_SpinSharedMutex
                       quickstep_types_Type
                       quickstep_types_TypedValue
+                      quickstep_utility_BloomFilter
                       quickstep_utility_HashPair
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_HashTableBase
@@ -648,6 +649,7 @@ target_link_libraries(quickstep_storage_HashTableFactory
                       quickstep_types_Type
                       quickstep_types_TypeFactory
                       quickstep_types_TypedValue
+                      quickstep_utility_BloomFilter
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_HashTableKeyManager
                       glog
@@ -1196,7 +1198,7 @@ target_link_libraries(BloomFilterIndexSubBlock_unittest
 add_test(BloomFilterIndexSubBlock_unittest BloomFilterIndexSubBlock_unittest)
 
 if(QUICKSTEP_HAVE_BITWEAVING)
-  add_executable(BitWeavingIndexSubBlock_unittest 
+  add_executable(BitWeavingIndexSubBlock_unittest
                  "${CMAKE_CURRENT_SOURCE_DIR}/bitweaving/tests/BitWeavingIndexSubBlock_unittest.cpp")
   target_link_libraries(BitWeavingIndexSubBlock_unittest
                         glog

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21b85088/storage/HashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTable.hpp b/storage/HashTable.hpp
index 667848e..be31fd9 100644
--- a/storage/HashTable.hpp
+++ b/storage/HashTable.hpp
@@ -38,6 +38,7 @@
 #include "threading/SpinSharedMutex.hpp"
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
+#include "utility/BloomFilter.hpp"
 #include "utility/HashPair.hpp"
 #include "utility/Macros.hpp"
 
@@ -990,6 +991,61 @@ class HashTable : public HashTableBase<resizable,
   template <typename FunctorT>
   std::size_t forEachCompositeKey(FunctorT *functor) const;
 
+  /**
+   * @brief Enable building a bloom filter during the build phase of this
+   *        hash table; supply the filter itself via setBuildSideBloomFilter().
+   **/
+  inline void enableBuildSideBloomFilter() {
+    has_build_side_bloom_filter_ = true;
+  }
+
+  /**
+   * @brief Enable probing of the registered probe-side bloom filters during
+   *        the probe phase of this hash table (see addProbeSideBloomFilter()).
+   **/
+  inline void enableProbeSideBloomFilter() {
+    has_probe_side_bloom_filter_ = true;
+  }
+
+  /**
+   * @brief This function sets the pointer to the bloom filter to be
+   *        used during the build phase of this hash table.
+   * @warning Should call enableBuildSideBloomFilter() first to enable
+   *          bloom filter usage during build phase.
+   * @note The ownership of the bloom filter lies with the caller.
+   *
+   * @param bloom_filter The pointer to the bloom filter.
+   **/
+  inline void setBuildSideBloomFilter(BloomFilter *bloom_filter) {
+    build_bloom_filter_ = bloom_filter;
+  }
+
+  /**
+   * @brief This function adds a pointer to the list of bloom filters to be
+   *        used during the probe phase of this hash table.
+   * @warning Should call enableProbeSideBloomFilter() first to enable
+   *          bloom filter usage during probe phase.
+   * @note The ownership of the bloom filter lies with the caller.
+   *
+   * @param bloom_filter The pointer to the bloom filter.
+   **/
+  inline void addProbeSideBloomFilter(const BloomFilter *bloom_filter) {
+    probe_bloom_filters_.emplace_back(bloom_filter);
+  }
+
+  /**
+   * @brief This function adds a vector of attribute ids corresponding to a
+   *        bloom filter used during the probe phase of this hash table.
+   * @warning Should call enableProbeSideBloomFilter() first to enable
+   *          bloom filter usage during probe phase.
+   *
+   * @param probe_attribute_ids The vector of attribute ids to use for probing
+   *        the bloom filter; its contents are moved into this hash table.
+   **/
+  inline void addProbeSideAttributeIds(std::vector<attribute_id> &&probe_attribute_ids) {
+    probe_attribute_ids_.push_back(std::move(probe_attribute_ids));  // Named rvalue ref is an lvalue: move it.
+  }
+
  protected:
   /**
    * @brief Constructor for new resizable hash table.
@@ -1270,6 +1326,13 @@ class HashTable : public HashTableBase<resizable,
                                    const attribute_id key_attr_id,
                                    FunctorT *functor) const;
 
+  // Bloom filter state for semi-joins; probe_bloom_filters_[i] pairs with probe_attribute_ids_[i].
+  bool has_build_side_bloom_filter_ = false;
+  bool has_probe_side_bloom_filter_ = false;
+  BloomFilter *build_bloom_filter_ = nullptr;  // Not owned; set via setBuildSideBloomFilter().
+  std::vector<const BloomFilter*> probe_bloom_filters_;
+  std::vector<std::vector<attribute_id>> probe_attribute_ids_;
+
   DISALLOW_COPY_AND_ASSIGN(HashTable);
 };
 
@@ -1414,6 +1477,12 @@ HashTablePutResult HashTable<ValueT, resizable, serializable, force_key_copy, al
                                                         &prealloc_state);
       }
     }
+    std::unique_ptr<BloomFilter> thread_local_bloom_filter;
+    if (has_build_side_bloom_filter_) {
+      thread_local_bloom_filter.reset(new BloomFilter(build_bloom_filter_->getRandomSeed(),
+                                                      build_bloom_filter_->getNumberOfHashes(),
+                                                      build_bloom_filter_->getBitArraySize()));
+    }
     if (resizable) {
       while (result == HashTablePutResult::kOutOfSpace) {
         {
@@ -1429,6 +1498,11 @@ HashTablePutResult HashTable<ValueT, resizable, serializable, force_key_copy, al
                                        variable_size,
                                        (*functor)(*accessor),
                                        using_prealloc ? &prealloc_state : nullptr);
+            // Insert into bloom filter, if enabled.
+            if (has_build_side_bloom_filter_) {
+              thread_local_bloom_filter->insertUnSafe(static_cast<const std::uint8_t *>(key.getDataPtr()),
+                                                      key.getDataSize());
+            }
             if (result == HashTablePutResult::kDuplicateKey) {
               DEBUG_ASSERT(!using_prealloc);
               return result;
@@ -1454,11 +1528,20 @@ HashTablePutResult HashTable<ValueT, resizable, serializable, force_key_copy, al
                                    variable_size,
                                    (*functor)(*accessor),
                                    using_prealloc ? &prealloc_state : nullptr);
+        // Insert into bloom filter, if enabled.
+        if (has_build_side_bloom_filter_) {
+          thread_local_bloom_filter->insertUnSafe(static_cast<const std::uint8_t *>(key.getDataPtr()),
+                                                  key.getDataSize());
+        }
         if (result != HashTablePutResult::kOK) {
           return result;
         }
       }
     }
+    // Update the build side bloom filter with thread local copy, if available.
+    if (has_build_side_bloom_filter_) {
+      build_bloom_filter_->bitwiseOr(thread_local_bloom_filter.get());
+    }
 
     return HashTablePutResult::kOK;
   });
@@ -2164,6 +2247,27 @@ void HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_
       accessor,
       [&](auto *accessor) -> void {  // NOLINT(build/c++11)
     while (accessor->next()) {
+      // Probe any bloom filters, if enabled.
+      if (has_probe_side_bloom_filter_) {
+        DCHECK_EQ(probe_bloom_filters_.size(), probe_attribute_ids_.size());
+        // Check if the key is contained in the BloomFilters or not.
+        bool bloom_miss = false;
+        for (std::size_t i = 0; i < probe_bloom_filters_.size() && !bloom_miss; ++i) {
+          const BloomFilter *bloom_filter = probe_bloom_filters_[i];
+          for (const attribute_id &attr_id : probe_attribute_ids_[i]) {
+            TypedValue bloom_key = accessor->getTypedValue(attr_id);
+            if (!bloom_filter->contains(static_cast<const std::uint8_t*>(bloom_key.getDataPtr()),
+                                        bloom_key.getDataSize())) {
+              bloom_miss = true;
+              break;
+            }
+          }
+        }
+        if (bloom_miss) {
+          continue;  // On a bloom filter miss, probing the hash table can be skipped.
+        }
+      }
+
       TypedValue key = accessor->getTypedValue(key_attr_id);
       if (check_for_null_keys && key.isNull()) {
         continue;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21b85088/storage/HashTable.proto
----------------------------------------------------------------------
diff --git a/storage/HashTable.proto b/storage/HashTable.proto
index 653c3a7..7f00f29 100644
--- a/storage/HashTable.proto
+++ b/storage/HashTable.proto
@@ -1,5 +1,7 @@
 //   Copyright 2011-2015 Quickstep Technologies LLC.
 //   Copyright 2015-2016 Pivotal Software, Inc.
+//   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+//    University of Wisconsin—Madison.
 //
 //   Licensed under the Apache License, Version 2.0 (the "License");
 //   you may not use this file except in compliance with the License.
@@ -32,4 +34,10 @@ message HashTable {
   required HashTableImplType hash_table_impl_type = 1;
   repeated Type key_types = 2;
   required uint64 estimated_num_entries = 3;
+  repeated uint32 build_side_bloom_filter_id = 4;
+  message ProbeSideBloomFilter {
+    required uint32 probe_side_bloom_filter_id = 1;
+    repeated uint32 probe_side_attr_ids = 2;
+  }
+  repeated ProbeSideBloomFilter probe_side_bloom_filters = 6;
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21b85088/storage/HashTableFactory.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableFactory.hpp b/storage/HashTableFactory.hpp
index 94a0721..34baaeb 100644
--- a/storage/HashTableFactory.hpp
+++ b/storage/HashTableFactory.hpp
@@ -29,6 +29,7 @@
 #include "storage/SimpleScalarSeparateChainingHashTable.hpp"
 #include "storage/TupleReference.hpp"
 #include "types/TypeFactory.hpp"
+#include "utility/BloomFilter.hpp"
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
@@ -291,11 +292,14 @@ class HashTableFactory {
    * @param proto A protobuf description of a resizable HashTable.
    * @param storage_manager The StorageManager to use (a StorageBlob will be
    *        allocated to hold the HashTable's contents).
+   * @param bloom_filters A vector of pointers to bloom filters that may be used
+   *        during hash table construction in build/probe phase.
    * @return A new resizable HashTable with parameters specified by proto.
    **/
   static HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_keys>*
       CreateResizableFromProto(const serialization::HashTable &proto,
-                               StorageManager *storage_manager) {
+                               StorageManager *storage_manager,
+                               const std::vector<std::unique_ptr<BloomFilter>> &bloom_filters) {
     DCHECK(ProtoIsValid(proto))
         << "Attempted to create HashTable from invalid proto description:\n"
         << proto.DebugString();
@@ -305,10 +309,40 @@ class HashTableFactory {
       key_types.emplace_back(&TypeFactory::ReconstructFromProto(proto.key_types(i)));
     }
 
-    return CreateResizable(HashTableImplTypeFromProto(proto.hash_table_impl_type()),
-                           key_types,
-                           proto.estimated_num_entries(),
-                           storage_manager);
+    auto hash_table = CreateResizable(HashTableImplTypeFromProto(proto.hash_table_impl_type()),
+                                      key_types,
+                                      proto.estimated_num_entries(),
+                                      storage_manager);
+
+    // TODO(ssaurabh): These lazy initializations can be moved from here and pushed to the
+    //                 individual implementations of the hash table constructors.
+
+    // Attach the build side bloom filter, if any. Only the first id in the proto is used.
+    if (proto.build_side_bloom_filter_id_size() > 0) {
+      DCHECK_LT(proto.build_side_bloom_filter_id(0), bloom_filters.size());
+      hash_table->enableBuildSideBloomFilter();
+      hash_table->setBuildSideBloomFilter(bloom_filters[proto.build_side_bloom_filter_id(0)].get());
+    }
+
+    // Attach every probe side bloom filter described by the proto, if any.
+    if (proto.probe_side_bloom_filters_size() > 0) {
+      hash_table->enableProbeSideBloomFilter();
+      for (int j = 0; j < proto.probe_side_bloom_filters_size(); ++j) {
+        // Bind by reference: a by-value 'auto' would copy the proto message on every iteration.
+        const auto &probe_side_bloom_filter = proto.probe_side_bloom_filters(j);
+        DCHECK_LT(probe_side_bloom_filter.probe_side_bloom_filter_id(), bloom_filters.size());
+        hash_table->addProbeSideBloomFilter(bloom_filters[probe_side_bloom_filter.probe_side_bloom_filter_id()].get());
+        // Collect the attribute ids that are probed against this bloom filter.
+        std::vector<attribute_id> probe_attribute_ids;
+        probe_attribute_ids.reserve(probe_side_bloom_filter.probe_side_attr_ids_size());
+        for (int k = 0; k < probe_side_bloom_filter.probe_side_attr_ids_size(); ++k) {
+          probe_attribute_ids.push_back(probe_side_bloom_filter.probe_side_attr_ids(k));
+        }
+        hash_table->addProbeSideAttributeIds(std::move(probe_attribute_ids));
+      }
+    }
+
+    return hash_table;
   }
 
  private:

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21b85088/utility/BloomFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/BloomFilter.hpp b/utility/BloomFilter.hpp
index 1d4fdc7..b93df84 100644
--- a/utility/BloomFilter.hpp
+++ b/utility/BloomFilter.hpp
@@ -26,8 +26,15 @@
 #include <algorithm>
 #include <cstddef>
 #include <cstdint>
+#include <memory>
+#include <utility>
 #include <vector>
 
+#include "storage/StorageConstants.hpp"
+#include "threading/Mutex.hpp"
+#include "threading/SharedMutex.hpp"
+#include "threading/SpinSharedMutex.hpp"
+#include "utility/BloomFilter.pb.h"
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
@@ -47,7 +54,30 @@ class BloomFilter {
 
   /**
    * @brief Constructor.
-   * @note The ownership of the bit array lies with the caller.
+   * @note When no bit_array is being passed to the constructor,
+   *       then the bit_array is owned and managed by this class.
+   *
+   * @param random_seed A random_seed that generates unique hash functions.
+   * @param hash_fn_count The number of hash functions used by this bloom filter.
+   * @param bit_array_size_in_bytes Size of the bit array.
+   **/
+  BloomFilter(const std::uint64_t random_seed,
+              const std::size_t hash_fn_count,
+              const std::uint64_t bit_array_size_in_bytes)
+      : random_seed_(random_seed),
+        hash_fn_count_(hash_fn_count),
+        array_size_in_bytes_(bit_array_size_in_bytes),
+        array_size_(array_size_in_bytes_ * kNumBitsPerByte),  // Size of the bit array in bits.
+        bit_array_(new std::uint8_t[array_size_in_bytes_]),  // Allocated here, hence owned here.
+        is_bit_array_owner_(true) {
+    reset();  // Zero the freshly allocated bit array and the inserted element count.
+    generate_unique_hash_fn();
+  }
+
+  /**
+   * @brief Constructor.
+   * @note When a bit_array is passed as an argument to the constructor,
+   *       then the ownership of the bit array lies with the caller.
    *
    * @param random_seed A random_seed that generates unique hash functions.
    * @param hash_fn_count The number of hash functions used by this bloom filter.
@@ -61,11 +91,12 @@ class BloomFilter {
               const std::uint64_t bit_array_size_in_bytes,
               std::uint8_t *bit_array,
               const bool is_initialized)
-      : hash_fn_count_(hash_fn_count),
-        random_seed_(random_seed) {
-    array_size_ = bit_array_size_in_bytes * kNumBitsPerByte;
-    array_size_in_bytes_ = bit_array_size_in_bytes;
-    bit_array_  = bit_array;  // Owned by the calling method.
+      : random_seed_(random_seed),
+        hash_fn_count_(hash_fn_count),
+        array_size_in_bytes_(bit_array_size_in_bytes),
+        array_size_(bit_array_size_in_bytes * kNumBitsPerByte),
+        bit_array_(bit_array),  // Owned by the calling method.
+        is_bit_array_owner_(false) {
     if (!is_initialized) {
       reset();
     }
@@ -73,27 +104,149 @@ class BloomFilter {
   }
 
   /**
+   * @brief Constructor.
+   * @note When a bloom filter proto is passed as an initializer,
+   *       then the bit_array is owned and managed by this class.
+   *
+   * @param bloom_filter_proto The protobuf representation of a bloom filter
+   *        configuration (seed, bit array size in bytes, number of hashes).
+   **/
+  explicit BloomFilter(const serialization::BloomFilter &bloom_filter_proto)
+      : random_seed_(bloom_filter_proto.bloom_filter_seed()),
+        hash_fn_count_(bloom_filter_proto.number_of_hashes()),
+        array_size_in_bytes_(bloom_filter_proto.bloom_filter_size()),
+        array_size_(array_size_in_bytes_ * kNumBitsPerByte),
+        bit_array_(new std::uint8_t[array_size_in_bytes_]),
+        is_bit_array_owner_(true) {
+    reset();  // Zero the freshly allocated bit array and the inserted element count.
+    generate_unique_hash_fn();
+  }
+
+  /**
+   * @brief Destructor. Frees the bit array only if it is owned by this object.
+   **/
+  ~BloomFilter() {
+    // Owned arrays come from new[] and need delete[]; caller-owned arrays are only detached.
+    std::uint8_t *raw_bit_array = bit_array_.release();
+    if (is_bit_array_owner_) {
+      delete[] raw_bit_array;
+    }
+  }
+
+  static bool ProtoIsValid(const serialization::BloomFilter &bloom_filter_proto) {
+    return bloom_filter_proto.IsInitialized();  // True when all required proto fields are set.
+  }
+
+  /**
    * @brief Zeros out the contents of the bit array.
    **/
   inline void reset() {
     // Initialize the bit_array with all zeros.
-    std::fill_n(bit_array_, array_size_in_bytes_, 0x00);
+    std::fill_n(bit_array_.get(), array_size_in_bytes_, 0x00);
     inserted_element_count_ = 0;
   }
 
   /**
+   * @brief Get the random seed that was used to initialize this bloom filter.
+   *
+   * @return The random seed supplied at construction time.
+   **/
+  inline std::uint64_t getRandomSeed() const {
+    return random_seed_;
+  }
+
+  /**
+   * @brief Get the number of hash functions used in this bloom filter.
+   *
+   * @return The number of hash functions, as fixed at construction time.
+   **/
+  inline std::uint32_t getNumberOfHashes() const {
+    return hash_fn_count_;
+  }
+
+  /**
+   * @brief Get the size of the bit array in bytes for this bloom filter.
+   *
+   * @return The size of the bit array in bytes (not in bits).
+   **/
+  inline std::uint64_t getBitArraySize() const {
+    return array_size_in_bytes_;
+  }
+
+  /**
+   * @brief Get the constant pointer to the bit array for this bloom filter.
+   *
+   * @return A read-only pointer to the first byte of the bit array.
+   **/
+  inline const std::uint8_t* getBitArray() const {
+    return bit_array_.get();
+  }
+
+  /**
+   * @brief Inserts a given value into the bloom filter in a thread-safe manner.
+   *
+   * @param key_begin A pointer to the value being inserted.
+   * @param length Size of the value being inserted in bytes.
+   */
+  inline void insert(const std::uint8_t *key_begin, const std::size_t length) {
+    // Locks are needed during insertion, when multiple workers may be modifying the
+    // bloom filter concurrently. However, locks are not required during membership test.
+    std::size_t bit_index = 0;
+    std::size_t bit = 0;
+    std::vector<std::pair<std::size_t, std::size_t>> modified_bit_positions;
+    std::vector<bool> is_bit_position_correct;
+    modified_bit_positions.reserve(hash_fn_count_);
+    is_bit_position_correct.reserve(hash_fn_count_);
+
+    // Determine all the bit positions that are required to be set.
+    for (std::size_t i = 0; i < hash_fn_count_; ++i) {
+      compute_indices(hash_ap(key_begin, length, hash_fn_[i]), &bit_index, &bit);
+      modified_bit_positions.push_back(std::make_pair(bit_index, bit));
+    }
+
+    // Acquire a reader lock and check which of the bit positions are already set.
+    {
+      SpinSharedMutexSharedLock<false> shared_reader_lock(bloom_filter_insert_mutex_);
+      for (std::size_t i = 0; i < hash_fn_count_; ++i) {
+        bit_index = modified_bit_positions[i].first;
+        bit = modified_bit_positions[i].second;
+        is_bit_position_correct.push_back(
+            ((bit_array_.get())[bit_index / kNumBitsPerByte] & (1 << bit)) == (1 << bit));
+      }
+    }
+
+    // Acquire a writer lock and set the bit positions which are not set yet.
+    {
+      SpinSharedMutexExclusiveLock<false> exclusive_writer_lock(bloom_filter_insert_mutex_);
+      for (std::size_t i = 0; i < hash_fn_count_; ++i) {
+        if (!is_bit_position_correct[i]) {
+          bit_index = modified_bit_positions[i].first;
+          bit = modified_bit_positions[i].second;
+          (bit_array_.get())[bit_index / kNumBitsPerByte] |= (1 << bit);
+        }
+      }
+      // The count is shared state too: update it before the writer lock is released.
+      ++inserted_element_count_;
+    }
+  }
+
+  /**
    * @brief Inserts a given value into the bloom filter.
+   * @warning This is a faster thread-unsafe version of the insert() function.
+   *          The caller needs to ensure the thread safety.
    *
    * @param key_begin A pointer to the value being inserted.
    * @param length Size of the value being inserted in bytes.
    */
-  inline void insert(const std::uint8_t *key_begin, const std::size_t &length) {
+  inline void insertUnSafe(const std::uint8_t *key_begin, const std::size_t length) {
     std::size_t bit_index = 0;
     std::size_t bit = 0;
+
     for (std::size_t i = 0; i < hash_fn_count_; ++i) {
       compute_indices(hash_ap(key_begin, length, hash_fn_[i]), &bit_index, &bit);
-      bit_array_[bit_index / kNumBitsPerByte] |= (1 << bit);
+      (bit_array_.get())[bit_index / kNumBitsPerByte] |= (1 << bit);
     }
+
     ++inserted_element_count_;
   }
 
@@ -102,6 +255,9 @@ class BloomFilter {
    *        If true is returned, then a value may or may not be present in the bloom filter.
    *        If false is returned, a value is certainly not present in the bloom filter.
    *
+   * @note The membership test does not require any locks, because the assumption is that
+   *       the bloom filter will only be used after it has been built.
+   *
    * @param key_begin A pointer to the value being tested for membership.
    * @param length Size of the value being inserted in bytes.
    */
@@ -110,7 +266,7 @@ class BloomFilter {
     std::size_t bit = 0;
     for (std::size_t i = 0; i < hash_fn_count_; ++i) {
       compute_indices(hash_ap(key_begin, length, hash_fn_[i]), &bit_index, &bit);
-      if ((bit_array_[bit_index / kNumBitsPerByte] & (1 << bit)) != (1 << bit)) {
+      if (((bit_array_.get())[bit_index / kNumBitsPerByte] & (1 << bit)) != (1 << bit)) {
         return false;
       }
     }
@@ -118,6 +274,19 @@ class BloomFilter {
   }
 
   /**
+   * @brief Union the given bloom filter into this one (byte-wise OR of the bit arrays).
+   *
+   * @param bloom_filter The bloom filter to merge in; its bit array must be as large as this one's.
+   */
+  inline void bitwiseOr(const BloomFilter *bloom_filter) {
+    DCHECK_EQ(array_size_in_bytes_, bloom_filter->getBitArraySize());
+    SpinSharedMutexExclusiveLock<false> exclusive_writer_lock(bloom_filter_insert_mutex_);
+    for (std::size_t byte_index = 0; byte_index < array_size_in_bytes_; ++byte_index) {
+      (bit_array_.get())[byte_index] |= bloom_filter->getBitArray()[byte_index];
+    }
+  }
+
+  /**
    * @brief Return the number of elements currently inserted into bloom filter.
    *
    * @return The number of elements inserted into bloom filter.
@@ -219,13 +388,16 @@ class BloomFilter {
   }
 
  private:
+  const std::uint64_t random_seed_;
   std::vector<std::uint32_t> hash_fn_;
-  std::uint8_t *bit_array_;
   const std::uint32_t hash_fn_count_;
-  std::uint64_t array_size_;
   std::uint64_t array_size_in_bytes_;
+  std::uint64_t array_size_;
+  std::unique_ptr<std::uint8_t> bit_array_;
   std::uint32_t inserted_element_count_;
-  const std::uint64_t random_seed_;
+  const bool is_bit_array_owner_;
+
+  alignas(kCacheLineBytes) mutable SpinSharedMutex<false> bloom_filter_insert_mutex_;
 
   DISALLOW_COPY_AND_ASSIGN(BloomFilter);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21b85088/utility/BloomFilter.proto
----------------------------------------------------------------------
diff --git a/utility/BloomFilter.proto b/utility/BloomFilter.proto
new file mode 100644
index 0000000..8dd9163
--- /dev/null
+++ b/utility/BloomFilter.proto
@@ -0,0 +1,30 @@
+//   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+//     University of Wisconsin—Madison.
+//
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//
+//       http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+
+syntax = "proto2";
+
+package quickstep.serialization;
+
+message BloomFilter {
+  // The default values were determined from empirical experiments.
+  // These values control the false positive rate that
+  // is expected from Bloom Filter.
+  // - Default seed for initializing family of hashes = 0xA5A5A5A55A5A5A5A.
+  // - Default bloom filter size (in bytes) = 10000, i.e. 10 KB.
+  // - Default number of hash functions used in bloom filter = 5.
+  optional fixed64 bloom_filter_seed = 1 [default = 0xA5A5A5A55A5A5A5A];
+  optional uint32 bloom_filter_size = 2 [default = 10000];
+  optional uint32 number_of_hashes = 3 [default = 5];
+}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21b85088/utility/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/CMakeLists.txt b/utility/CMakeLists.txt
index bb59f65..6d1eeab 100644
--- a/utility/CMakeLists.txt
+++ b/utility/CMakeLists.txt
@@ -146,6 +146,10 @@ configure_file (
   "${CMAKE_CURRENT_BINARY_DIR}/UtilityConfig.h"
 )
 
+QS_PROTOBUF_GENERATE_CPP(quickstep_utility_BloomFilter_proto_srcs
+                         quickstep_utility_BloomFilter_proto_hdrs
+                         BloomFilter.proto)
+
 QS_PROTOBUF_GENERATE_CPP(quickstep_utility_SortConfiguration_proto_srcs
                          quickstep_utility_SortConfiguration_proto_hdrs
                          SortConfiguration.proto)
@@ -155,6 +159,9 @@ add_library(quickstep_utility_Alignment ../empty_src.cpp Alignment.hpp)
 add_library(quickstep_utility_BitManipulation ../empty_src.cpp BitManipulation.hpp)
 add_library(quickstep_utility_BitVector ../empty_src.cpp BitVector.hpp)
 add_library(quickstep_utility_BloomFilter ../empty_src.cpp BloomFilter.hpp)
+add_library(quickstep_utility_BloomFilter_proto
+            ${quickstep_utility_BloomFilter_proto_srcs}
+            ${quickstep_utility_BloomFilter_proto_hdrs})
 add_library(quickstep_utility_CalculateInstalledMemory CalculateInstalledMemory.cpp CalculateInstalledMemory.hpp)
 add_library(quickstep_utility_Cast ../empty_src.cpp Cast.hpp)
 add_library(quickstep_utility_CheckSnprintf ../empty_src.cpp CheckSnprintf.hpp)
@@ -202,7 +209,14 @@ target_link_libraries(quickstep_utility_BitVector
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_utility_BloomFilter
                       glog
+                      quickstep_storage_StorageConstants
+                      quickstep_threading_Mutex
+                      quickstep_threading_SharedMutex
+                      quickstep_threading_SpinSharedMutex
+                      quickstep_utility_BloomFilter_proto
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_utility_BloomFilter_proto
+                      ${PROTOBUF_LIBRARY})
 target_link_libraries(quickstep_utility_CalculateInstalledMemory
                       glog)
 target_link_libraries(quickstep_utility_CheckSnprintf
@@ -271,6 +285,7 @@ target_link_libraries(quickstep_utility
                       quickstep_utility_BitManipulation
                       quickstep_utility_BitVector
                       quickstep_utility_BloomFilter
+                      quickstep_utility_BloomFilter_proto
                       quickstep_utility_CalculateInstalledMemory
                       quickstep_utility_Cast
                       quickstep_utility_CheckSnprintf