You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/10/07 18:22:18 UTC

incubator-quickstep git commit: Backend updates

Repository: incubator-quickstep
Updated Branches:
  refs/heads/lip-refactor 396f8576a -> 6a0f68f0f


Backend updates


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/6a0f68f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/6a0f68f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/6a0f68f0

Branch: refs/heads/lip-refactor
Commit: 6a0f68f0fb13b74542c3a0bb86eb8f6d568cb121
Parents: 396f857
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Fri Oct 7 13:22:10 2016 -0500
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Fri Oct 7 13:22:10 2016 -0500

----------------------------------------------------------------------
 query_execution/CMakeLists.txt                  |   4 +-
 query_execution/QueryContext.cpp                |  27 ++++
 query_execution/QueryContext.hpp                |  31 ++--
 query_execution/QueryContext.proto              |   2 +-
 query_optimizer/CMakeLists.txt                  |   3 +
 query_optimizer/ExecutionGenerator.cpp          |   3 +-
 query_optimizer/LIPFilterGenerator.cpp          |  89 +++++++++--
 query_optimizer/LIPFilterGenerator.hpp          |  66 ++++----
 .../physical/LIPFilterConfiguration.hpp         |   6 +-
 relational_operators/BuildHashOperator.cpp      |  22 ++-
 relational_operators/BuildHashOperator.hpp      |  18 ++-
 relational_operators/HashJoinOperator.cpp       |  31 +++-
 relational_operators/HashJoinOperator.hpp       |  43 +++--
 relational_operators/RelationalOperator.hpp     |   9 ++
 storage/CMakeLists.txt                          |   2 -
 storage/FastHashTable.hpp                       | 152 +++---------------
 storage/FastHashTableFactory.hpp                |  35 +---
 utility/lip_filter/CMakeLists.txt               |  15 +-
 utility/lip_filter/LIPFilter.hpp                |  21 +++
 utility/lip_filter/LIPFilter.proto              |  15 +-
 utility/lip_filter/LIPFilterAdaptiveProber.hpp  | 115 +++++++++++++-
 utility/lip_filter/LIPFilterBuilder.hpp         |  28 +++-
 utility/lip_filter/LIPFilterDeployment.cpp      |  69 ++++++++
 utility/lip_filter/LIPFilterDeployment.hpp      |  71 +++++++++
 utility/lip_filter/LIPFilterDeploymentInfo.hpp  |  69 --------
 utility/lip_filter/LIPFilterFactory.cpp         |  57 +++++++
 utility/lip_filter/LIPFilterFactory.hpp         |   8 +-
 utility/lip_filter/SingleIdentityHashFilter.hpp | 158 +++++++++++++++++++
 28 files changed, 808 insertions(+), 361 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 58e5761..78ffb33 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -192,7 +192,9 @@ target_link_libraries(quickstep_queryexecution_QueryContext
                       quickstep_utility_Macros
                       quickstep_utility_SortConfiguration
                       quickstep_utility_lipfilter_LIPFilter
-                      quickstep_utility_lipfilter_LIPFilterDeploymentInfo)
+                      quickstep_utility_lipfilter_LIPFilterDeployment
+                      quickstep_utility_lipfilter_LIPFilterFactory
+                      quickstep_utility_lipfilter_LIPFilter_proto)
 target_link_libraries(quickstep_queryexecution_QueryContext_proto
                       quickstep_expressions_Expressions_proto
                       quickstep_expressions_tablegenerator_GeneratorFunction_proto

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/query_execution/QueryContext.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.cpp b/query_execution/QueryContext.cpp
index 47408d4..57d200a 100644
--- a/query_execution/QueryContext.cpp
+++ b/query_execution/QueryContext.cpp
@@ -40,6 +40,9 @@
 #include "types/TypedValue.hpp"
 #include "types/containers/Tuple.hpp"
 #include "utility/lip_filter/LIPFilter.hpp"
+#include "utility/lip_filter/LIPFilter.pb.h"
+#include "utility/lip_filter/LIPFilterDeployment.hpp"
+#include "utility/lip_filter/LIPFilterFactory.hpp"
 #include "utility/SortConfiguration.hpp"
 
 #include "glog/logging.h"
@@ -93,6 +96,18 @@ QueryContext::QueryContext(const serialization::QueryContext &proto,
         bus));
   }
 
+  for (int i = 0; i < proto.lip_filters_size(); ++i) {
+    lip_filters_.emplace_back(
+        std::unique_ptr<LIPFilter>(
+            LIPFilterFactory::ReconstructFromProto(proto.lip_filters(i))));
+  }
+
+  for (int i = 0; i < proto.lip_filter_deployments_size(); ++i) {
+    lip_deployments_.emplace_back(
+        std::make_unique<LIPFilterDeployment>(
+            proto.lip_filter_deployments(i), lip_filters_));
+  }
+
   for (int i = 0; i < proto.predicates_size(); ++i) {
     predicates_.emplace_back(
         PredicateFactory::ReconstructFromProto(proto.predicates(i), database));
@@ -180,6 +195,18 @@ bool QueryContext::ProtoIsValid(const serialization::QueryContext &proto,
     }
   }
 
+  for (int i = 0; i < proto.lip_filters_size(); ++i) {
+    if (!LIPFilterFactory::ProtoIsValid(proto.lip_filters(i))) {
+      return false;
+    }
+  }
+
+  for (int i = 0; i < proto.lip_filter_deployments_size(); ++i) {
+    if (!LIPFilterDeployment::ProtoIsValid(proto.lip_filter_deployments(i))) {
+      return false;
+    }
+  }
+
   for (int i = 0; i < proto.predicates_size(); ++i) {
     if (!PredicateFactory::ProtoIsValid(proto.predicates(i), database)) {
       return false;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/query_execution/QueryContext.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp
index 3e287c0..66476f4 100644
--- a/query_execution/QueryContext.hpp
+++ b/query_execution/QueryContext.hpp
@@ -36,7 +36,7 @@
 #include "storage/WindowAggregationOperationState.hpp"
 #include "types/containers/Tuple.hpp"
 #include "utility/lip_filter/LIPFilter.hpp"
-#include "utility/lip_filter/LIPFilterDeploymentInfo.hpp"
+#include "utility/lip_filter/LIPFilterDeployment.hpp"
 #include "utility/Macros.hpp"
 #include "utility/SortConfiguration.hpp"
 
@@ -93,7 +93,8 @@ class QueryContext {
   /**
    * @brief A unique identifier for a LIPFilterDeploymentInfo per query.
    **/
-  typedef std::uint32_t lip_filter_deployment_info_id;
+  typedef std::uint32_t lip_deployment_id;
+  static constexpr lip_deployment_id kInvalidILIPDeploymentId = static_cast<lip_deployment_id>(-1);
 
   /**
    * @brief A unique identifier for a Predicate per query.
@@ -345,22 +346,22 @@ class QueryContext {
    *
    * @return True if valid, otherwise false.
    **/
-  bool isValidLIPFilterDeploymentInfoId(const lip_filter_deployment_info_id id) const {
-    return id < lip_filter_deployment_infos_.size();
+  bool isValidLIPDeploymentId(const lip_deployment_id id) const {
+    return id < lip_deployments_.size();
   }
 
   /**
-   * @brief Get a constant pointer to the LIPFilterDeploymentInfo.
+   * @brief Get a constant pointer to the LIPFilterDeployment.
    *
-   * @param id The LIPFilterDeploymentInfo id.
+   * @param id The LIPFilterDeployment id.
    *
-   * @return The constant pointer to LIPFilterDeploymentInfo that is
+   * @return The constant pointer to LIPFilterDeployment that is
    *         already created in the constructor.
    **/
-  inline const LIPFilterDeploymentInfo* getLIPFilterDeploymentInfo(
-      const lip_filter_deployment_info_id id) const {
-    DCHECK_LT(id, lip_filter_deployment_infos_.size());
-    return lip_filter_deployment_infos_[id].get();
+  inline const LIPFilterDeployment* getLIPDeployment(
+      const lip_deployment_id id) const {
+    DCHECK_LT(id, lip_deployments_.size());
+    return lip_deployments_[id].get();
   }
 
   /**
@@ -368,9 +369,9 @@ class QueryContext {
    *
    * @param id The id of the LIPFilterDeploymentInfo to destroy.
    **/
-  inline void destroyLIPFilterDeploymentInfo(const lip_filter_deployment_info_id id) {
-    DCHECK_LT(id, lip_filter_deployment_infos_.size());
-    lip_filter_deployment_infos_[id].reset();
+  inline void destroyLIPDeployment(const lip_deployment_id id) {
+    DCHECK_LT(id, lip_deployments_.size());
+    lip_deployments_[id].reset();
   }
 
   /**
@@ -552,7 +553,7 @@ class QueryContext {
   std::vector<std::unique_ptr<InsertDestination>> insert_destinations_;
   std::vector<std::unique_ptr<JoinHashTable>> join_hash_tables_;
   std::vector<std::unique_ptr<LIPFilter>> lip_filters_;
-  std::vector<std::unique_ptr<LIPFilterDeploymentInfo>> lip_filter_deployment_infos_;
+  std::vector<std::unique_ptr<LIPFilterDeployment>> lip_deployments_;
   std::vector<std::unique_ptr<const Predicate>> predicates_;
   std::vector<std::vector<std::unique_ptr<const Scalar>>> scalar_groups_;
   std::vector<std::unique_ptr<const SortConfiguration>> sort_configs_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/query_execution/QueryContext.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.proto b/query_execution/QueryContext.proto
index d79b990..ab0f520 100644
--- a/query_execution/QueryContext.proto
+++ b/query_execution/QueryContext.proto
@@ -50,7 +50,7 @@ message QueryContext {
   repeated HashTable join_hash_tables = 3;
   repeated InsertDestination insert_destinations = 4;
   repeated LIPFilter lip_filters = 5;
-  repeated LIPFilterDeploymentInfo lip_filter_deployment_infos = 6;
+  repeated LIPFilterDeployment lip_filter_deployments = 6;
   repeated Predicate predicates = 7;
   repeated ScalarGroup scalar_groups = 8;
   repeated SortConfiguration sort_configs = 9;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 4013e3e..16e3934 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -165,7 +165,10 @@ target_link_libraries(quickstep_queryoptimizer_LIPFilterGenerator
                       quickstep_queryoptimizer_physical_LIPFilterConfiguration
                       quickstep_queryoptimizer_physical_Physical
                       quickstep_queryoptimizer_physical_Selection
+                      quickstep_relationaloperators_RelationalOperator
+                      quickstep_types_Type
                       quickstep_utility_lipfilter_LIPFilter
+                      quickstep_utility_lipfilter_LIPFilterDeployment
                       quickstep_utility_lipfilter_LIPFilter_proto)
 target_link_libraries(quickstep_queryoptimizer_LogicalGenerator
                       glog

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 559de57..ce25841 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1460,8 +1460,7 @@ void ExecutionGenerator::convertAggregate(
                                        true);
 
   lip_filter_generator_->addAggregateInfo(physical_plan,
-                                          aggregation_operator_index,
-                                          aggr_state_index);
+                                          aggregation_operator_index);
 }
 
 void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/query_optimizer/LIPFilterGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/LIPFilterGenerator.cpp b/query_optimizer/LIPFilterGenerator.cpp
index 4e24740..f7394e0 100644
--- a/query_optimizer/LIPFilterGenerator.cpp
+++ b/query_optimizer/LIPFilterGenerator.cpp
@@ -25,6 +25,8 @@
 #include "catalog/CatalogAttribute.hpp"
 #include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/QueryContext.pb.h"
+#include "relational_operators/RelationalOperator.hpp"
+#include "types/Type.hpp"
 #include "utility/lip_filter/LIPFilter.hpp"
 #include "utility/lip_filter/LIPFilter.pb.h"
 
@@ -65,21 +67,31 @@ void LIPFilterGenerator::deployLIPFilters(QueryPlan *execution_plan,
 
   // Deploy builders
   const auto &build_info_map = lip_filter_configuration_->getBuildInfoMap();
-  for (const auto &hash_join_info : hash_join_infos_) {
-    const P::PhysicalPtr &builder_node = hash_join_info.hash_join;
-    const auto build_it = build_info_map.find(builder_node);
+  for (const auto &info : builder_infos_) {
+    const auto build_it = build_info_map.find(info.builder_node);
     if (build_it != build_info_map.end()) {
       deployBuilderInternal(execution_plan,
                             query_context_proto,
-                            builder_node,
-                            hash_join_info.build_operator_index,
+                            info.builder_node,
+                            info.builder_operator_index,
                             build_it->second,
                             &lip_filter_builder_map);
     }
   }
 
   // Deploy probers
-  // const auto &probe_info_map = lip_filter_configuration_->getProbeInfoMap();
+  const auto &probe_info_map = lip_filter_configuration_->getProbeInfoMap();
+  for (const auto &info : prober_infos_) {
+    const auto probe_it = probe_info_map.find(info.prober_node);
+    if (probe_it != probe_info_map.end()) {
+      deployProberInteral(execution_plan,
+                          query_context_proto,
+                          info.prober_node,
+                          info.prober_operator_index,
+                          probe_it->second,
+                          lip_filter_builder_map);
+    }
+  }
 }
 
 void LIPFilterGenerator::deployBuilderInternal(
@@ -89,18 +101,29 @@ void LIPFilterGenerator::deployBuilderInternal(
     const QueryPlan::DAGNodeIndex builder_operator_index,
     const std::vector<physical::LIPFilterBuildInfo> &build_info_vec,
     LIPFilterBuilderMap *lip_filter_builder_map) const {
+  const auto lip_deployment_index = query_context_proto->lip_filter_deployments_size();
+  auto *lip_filter_deployment_info_proto =
+      query_context_proto->add_lip_filter_deployments();
+  lip_filter_deployment_info_proto->set_action_type(serialization::LIPFilterActionType::BUILD);
+
   const auto &builder_attribute_map = attribute_map_.at(builder_node);
   for (const auto &info : build_info_vec) {
     const QueryContext::lip_filter_id lip_filter_id = query_context_proto->lip_filters_size();
     serialization::LIPFilter *lip_filter_proto = query_context_proto->add_lip_filters();
+    const CatalogAttribute *target_attr = builder_attribute_map.at(info.build_attribute->id());
+    const Type &attr_type = target_attr->getType();
 
     switch (info.filter_type) {
-      case LIPFilterType::kSingleIdentityHashFilter:
+      case LIPFilterType::kSingleIdentityHashFilter: {
+        DCHECK(!attr_type.isVariableLength());
         lip_filter_proto->set_lip_filter_type(
             serialization::LIPFilterType::SINGLE_IDENTITY_HASH_FILTER);
         lip_filter_proto->SetExtension(
-            serialization::SingleIdentityHashFilter::num_bits, info.filter_size);
+            serialization::SingleIdentityHashFilter::filter_cardinality, info.filter_cardinality);
+        lip_filter_proto->SetExtension(
+            serialization::SingleIdentityHashFilter::attribute_size, attr_type.minimumByteLength());
         break;
+      }
       default:
         LOG(FATAL) << "Unsupported LIPFilter type";
         break;
@@ -110,21 +133,53 @@ void LIPFilterGenerator::deployBuilderInternal(
         std::make_pair(info.build_attribute->id(), builder_node),
         std::make_pair(lip_filter_id, builder_operator_index));
 
-    auto *lip_filter_deployment_info_proto =
-        query_context_proto->add_lip_filter_deployment_infos();
-    lip_filter_deployment_info_proto->set_action_type(serialization::LIPFilterActionType::BUILD);
-    lip_filter_deployment_info_proto->set_lip_filter_id(lip_filter_id);
-
-    const CatalogAttribute *target_attr = builder_attribute_map.at(info.build_attribute->id());
-    lip_filter_deployment_info_proto->set_attribute_id(target_attr->getID());
-    lip_filter_deployment_info_proto->mutable_attribute_type()->CopyFrom(
-        target_attr->getType().getProto());
+    auto *lip_filter_entry_proto = lip_filter_deployment_info_proto->add_entries();
+    lip_filter_entry_proto->set_lip_filter_id(lip_filter_id);
+    lip_filter_entry_proto->set_attribute_id(target_attr->getID());
+    lip_filter_entry_proto->mutable_attribute_type()->CopyFrom(attr_type.getProto());
 
     std::cerr << "Build " << info.build_attribute->toString()
               << " @" << builder_node << "\n";
   }
+
+  RelationalOperator *relop =
+      execution_plan->getQueryPlanDAGMutable()->getNodePayloadMutable(builder_operator_index);
+  relop->deployLIPFilter(lip_deployment_index);
 }
 
+void LIPFilterGenerator::deployProberInteral(
+    QueryPlan *execution_plan,
+    serialization::QueryContext *query_context_proto,
+    const physical::PhysicalPtr &prober_node,
+    const QueryPlan::DAGNodeIndex prober_operator_index,
+    const std::vector<physical::LIPFilterProbeInfo> &probe_info_vec,
+    const LIPFilterBuilderMap &lip_filter_builder_map) const {
+  const auto lip_deployment_index = query_context_proto->lip_filter_deployments_size();
+  auto *lip_filter_deployment_info_proto =
+      query_context_proto->add_lip_filter_deployments();
+  lip_filter_deployment_info_proto->set_action_type(serialization::LIPFilterActionType::PROBE);
+
+  const auto &prober_attribute_map = attribute_map_.at(prober_node);
+  for (const auto &info : probe_info_vec) {
+    const auto &builder_info =
+        lip_filter_builder_map.at(
+            std::make_pair(info.build_attribute->id(), info.builder));
+    const CatalogAttribute *target_attr = prober_attribute_map.at(info.probe_attribute->id());
+
+    auto *lip_filter_entry_proto = lip_filter_deployment_info_proto->add_entries();
+    lip_filter_entry_proto->set_lip_filter_id(builder_info.first);
+    lip_filter_entry_proto->set_attribute_id(target_attr->getID());
+    lip_filter_entry_proto->mutable_attribute_type()->CopyFrom(
+        target_attr->getType().getProto());
+
+    std::cerr << "Probe " << info.probe_attribute->toString()
+              << " @" << prober_node << "\n";
+  }
+
+  RelationalOperator *relop =
+      execution_plan->getQueryPlanDAGMutable()->getNodePayloadMutable(prober_operator_index);
+  relop->deployLIPFilter(lip_deployment_index);
+}
 
 }  // namespace optimizer
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/query_optimizer/LIPFilterGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/LIPFilterGenerator.hpp b/query_optimizer/LIPFilterGenerator.hpp
index 05270c9..b5925d4 100644
--- a/query_optimizer/LIPFilterGenerator.hpp
+++ b/query_optimizer/LIPFilterGenerator.hpp
@@ -55,60 +55,44 @@ class LIPFilterGenerator {
       const std::unordered_map<expressions::ExprId, const CatalogAttribute *> &attribute_substitution_map);
 
   void addAggregateInfo(const physical::AggregatePtr &aggregate,
-                        const QueryPlan::DAGNodeIndex aggregate_operator_index,
-                        const QueryContext::aggregation_state_id aggregation_state_id) {
-    aggregate_infos_.emplace_back(aggregate, aggregate_operator_index, aggregation_state_id);
+                        const QueryPlan::DAGNodeIndex aggregate_operator_index) {
+    prober_infos_.emplace_back(aggregate, aggregate_operator_index);
   }
 
   void addHashJoinInfo(const physical::HashJoinPtr &hash_join,
                        const QueryPlan::DAGNodeIndex build_operator_index,
                        const QueryPlan::DAGNodeIndex join_operator_index) {
-    hash_join_infos_.emplace_back(hash_join, build_operator_index, join_operator_index);
+    builder_infos_.emplace_back(hash_join, build_operator_index);
+    prober_infos_.emplace_back(hash_join, join_operator_index);
   }
 
   void addSelectionInfo(const physical::SelectionPtr &selection,
                         const QueryPlan::DAGNodeIndex select_operator_index) {
-    selection_infos_.emplace_back(selection, select_operator_index);
+    prober_infos_.emplace_back(selection, select_operator_index);
   }
 
   void deployLIPFilters(QueryPlan *execution_plan,
                         serialization::QueryContext *query_context_proto) const;
 
  private:
-  struct AggregateInfo {
-    AggregateInfo(const physical::AggregatePtr &aggregate_in,
-                  const QueryPlan::DAGNodeIndex aggregate_operator_index_in,
-                  const QueryContext::aggregation_state_id aggregation_state_id_in)
-        : aggregate(aggregate_in),
-          aggregate_operator_index(aggregate_operator_index_in),
-          aggregation_state_id(aggregation_state_id_in) {
+  struct BuilderInfo {
+    BuilderInfo(const physical::PhysicalPtr &builder_node_in,
+                const QueryPlan::DAGNodeIndex builder_operator_index_in)
+        : builder_node(builder_node_in),
+          builder_operator_index(builder_operator_index_in) {
     }
-    const physical::AggregatePtr aggregate;
-    const QueryPlan::DAGNodeIndex aggregate_operator_index;
-    const QueryContext::aggregation_state_id aggregation_state_id;
+    const physical::PhysicalPtr builder_node;
+    const QueryPlan::DAGNodeIndex builder_operator_index;
   };
 
-  struct HashJoinInfo {
-    HashJoinInfo(const physical::HashJoinPtr &hash_join_in,
-                 const QueryPlan::DAGNodeIndex build_operator_index_in,
-                 const QueryPlan::DAGNodeIndex join_operator_index_in)
-        : hash_join(hash_join_in),
-          build_operator_index(build_operator_index_in),
-          join_operator_index(join_operator_index_in) {
+  struct ProberInfo {
+    ProberInfo(const physical::PhysicalPtr &prober_node_in,
+               const QueryPlan::DAGNodeIndex prober_operator_index_in)
+        : prober_node(prober_node_in),
+          prober_operator_index(prober_operator_index_in) {
     }
-    const physical::HashJoinPtr hash_join;
-    const QueryPlan::DAGNodeIndex build_operator_index;
-    const QueryPlan::DAGNodeIndex join_operator_index;
-  };
-
-  struct SelectionInfo {
-    SelectionInfo(const physical::SelectionPtr &selection_in,
-                  const QueryPlan::DAGNodeIndex select_operator_index_in)
-        : selection(selection_in),
-          select_operator_index(select_operator_index_in) {
-    }
-    const physical::SelectionPtr selection;
-    const QueryPlan::DAGNodeIndex select_operator_index;
+    const physical::PhysicalPtr prober_node;
+    const QueryPlan::DAGNodeIndex prober_operator_index;
   };
 
   typedef std::map<std::pair<expressions::ExprId, physical::PhysicalPtr>,
@@ -121,13 +105,17 @@ class LIPFilterGenerator {
                              const std::vector<physical::LIPFilterBuildInfo> &build_info_vec,
                              LIPFilterBuilderMap *lip_filter_builder_map) const;
 
-  void deployProberInteral();
+  void deployProberInteral(QueryPlan *execution_plan,
+                           serialization::QueryContext *query_context_proto,
+                           const physical::PhysicalPtr &prober_node,
+                           const QueryPlan::DAGNodeIndex prober_operator_index,
+                           const std::vector<physical::LIPFilterProbeInfo> &probe_info_vec,
+                           const LIPFilterBuilderMap &lip_filter_builder_map) const;
 
   const physical::LIPFilterConfigurationPtr lip_filter_configuration_;
   std::map<physical::PhysicalPtr, std::map<expressions::ExprId, const CatalogAttribute *>> attribute_map_;
-  std::vector<AggregateInfo> aggregate_infos_;
-  std::vector<HashJoinInfo> hash_join_infos_;
-  std::vector<SelectionInfo> selection_infos_;
+  std::vector<BuilderInfo> builder_infos_;
+  std::vector<ProberInfo> prober_infos_;
 };
 
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/query_optimizer/physical/LIPFilterConfiguration.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/LIPFilterConfiguration.hpp b/query_optimizer/physical/LIPFilterConfiguration.hpp
index 9b028ad..f9236e5 100644
--- a/query_optimizer/physical/LIPFilterConfiguration.hpp
+++ b/query_optimizer/physical/LIPFilterConfiguration.hpp
@@ -44,14 +44,14 @@ typedef std::shared_ptr<const Physical> PhysicalPtr;
 
 struct LIPFilterBuildInfo {
   LIPFilterBuildInfo(const expressions::AttributeReferencePtr &build_attribute_in,
-                     const std::size_t filter_size_in,
+                     const std::size_t filter_cardinality_in,
                      const LIPFilterType &filter_type_in)
       : build_attribute(build_attribute_in),
-        filter_size(filter_size_in),
+        filter_cardinality(filter_cardinality_in),
         filter_type(filter_type_in) {
   }
   expressions::AttributeReferencePtr build_attribute;
-  std::size_t filter_size;
+  std::size_t filter_cardinality;
   LIPFilterType filter_type;
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/relational_operators/BuildHashOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildHashOperator.cpp b/relational_operators/BuildHashOperator.cpp
index 465621c..eaf3259 100644
--- a/relational_operators/BuildHashOperator.cpp
+++ b/relational_operators/BuildHashOperator.cpp
@@ -34,6 +34,7 @@
 #include "storage/TupleReference.hpp"
 #include "storage/TupleStorageSubBlock.hpp"
 #include "storage/ValueAccessor.hpp"
+#include "utility/lip_filter/LIPFilterBuilder.hpp"
 
 #include "glog/logging.h"
 
@@ -68,6 +69,14 @@ bool BuildHashOperator::getAllWorkOrders(
     tmb::MessageBus *bus) {
   DCHECK(query_context != nullptr);
 
+  LIPFilterBuilderPtr lip_filter_builder = nullptr;
+  if (lip_deployment_index_ != QueryContext::kInvalidILIPDeploymentId) {
+    const LIPFilterDeployment *lip_filter_deployment =
+        query_context->getLIPDeployment(lip_deployment_index_);
+    lip_filter_builder = std::shared_ptr<LIPFilterBuilder>(
+        lip_filter_deployment->createLIPFilterBuilder());
+  }
+
   JoinHashTable *hash_table = query_context->getJoinHashTable(hash_table_index_);
   if (input_relation_is_stored_) {
     if (!started_) {
@@ -79,7 +88,8 @@ bool BuildHashOperator::getAllWorkOrders(
                                    any_join_key_attributes_nullable_,
                                    input_block_id,
                                    hash_table,
-                                   storage_manager),
+                                   storage_manager,
+                                   lip_filter_builder),
             op_index_);
       }
       started_ = true;
@@ -95,7 +105,8 @@ bool BuildHashOperator::getAllWorkOrders(
               any_join_key_attributes_nullable_,
               input_relation_block_ids_[num_workorders_generated_],
               hash_table,
-              storage_manager),
+              storage_manager,
+              lip_filter_builder),
           op_index_);
       ++num_workorders_generated_;
     }
@@ -136,17 +147,22 @@ serialization::WorkOrder* BuildHashOperator::createWorkOrderProto(const block_id
                       any_join_key_attributes_nullable_);
   proto->SetExtension(serialization::BuildHashWorkOrder::join_hash_table_index, hash_table_index_);
   proto->SetExtension(serialization::BuildHashWorkOrder::block_id, block);
+  // TODO(jianqiao): serialize the LIP filter deployment info into the work order proto.
 
   return proto;
 }
 
-
 void BuildHashWorkOrder::execute() {
   BlockReference block(
       storage_manager_->getBlock(build_block_id_, input_relation_));
 
   TupleReferenceGenerator generator(build_block_id_);
   std::unique_ptr<ValueAccessor> accessor(block->getTupleStorageSubBlock().createValueAccessor());
+
+  if (lip_filter_builder_ != nullptr) {
+    lip_filter_builder_->insertValueAccessor(accessor.get());
+  }
+
   HashTablePutResult result;
   if (join_key_attributes_.size() == 1) {
     result = hash_table_->putValueAccessor(accessor.get(),

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/relational_operators/BuildHashOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildHashOperator.hpp b/relational_operators/BuildHashOperator.hpp
index 4a80a8a..940298c 100644
--- a/relational_operators/BuildHashOperator.hpp
+++ b/relational_operators/BuildHashOperator.hpp
@@ -20,6 +20,7 @@
 #ifndef QUICKSTEP_RELATIONAL_OPERATORS_BUILD_HASH_OPERATOR_HPP_
 #define QUICKSTEP_RELATIONAL_OPERATORS_BUILD_HASH_OPERATOR_HPP_
 
+#include <memory>
 #include <string>
 #include <utility>
 #include <vector>
@@ -31,6 +32,7 @@
 #include "relational_operators/WorkOrder.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilterBuilder.hpp"
 
 #include "glog/logging.h"
 
@@ -162,6 +164,7 @@ class BuildHashWorkOrder : public WorkOrder {
    * @param build_block_id The block id.
    * @param hash_table The JoinHashTable to use.
    * @param storage_manager The StorageManager to use.
+   * @param lip_filter_builder The attached builder for building LIP filters.
    **/
   BuildHashWorkOrder(const std::size_t query_id,
                      const CatalogRelationSchema &input_relation,
@@ -169,14 +172,16 @@ class BuildHashWorkOrder : public WorkOrder {
                      const bool any_join_key_attributes_nullable,
                      const block_id build_block_id,
                      JoinHashTable *hash_table,
-                     StorageManager *storage_manager)
+                     StorageManager *storage_manager,
+                     LIPFilterBuilderPtr lip_filter_builder = nullptr)
       : WorkOrder(query_id),
         input_relation_(input_relation),
         join_key_attributes_(join_key_attributes),
         any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
         build_block_id_(build_block_id),
         hash_table_(DCHECK_NOTNULL(hash_table)),
-        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+        storage_manager_(DCHECK_NOTNULL(storage_manager)),
+        lip_filter_builder_(lip_filter_builder) {}
 
   /**
    * @brief Constructor for the distributed version.
@@ -189,6 +194,7 @@ class BuildHashWorkOrder : public WorkOrder {
    * @param build_block_id The block id.
    * @param hash_table The JoinHashTable to use.
    * @param storage_manager The StorageManager to use.
+   * @param lip_filter_builder The attached builder for building LIP filters.
    **/
   BuildHashWorkOrder(const std::size_t query_id,
                      const CatalogRelationSchema &input_relation,
@@ -196,14 +202,16 @@ class BuildHashWorkOrder : public WorkOrder {
                      const bool any_join_key_attributes_nullable,
                      const block_id build_block_id,
                      JoinHashTable *hash_table,
-                     StorageManager *storage_manager)
+                     StorageManager *storage_manager,
+                     LIPFilterBuilderPtr lip_filter_builder = nullptr)
       : WorkOrder(query_id),
         input_relation_(input_relation),
         join_key_attributes_(std::move(join_key_attributes)),
         any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
         build_block_id_(build_block_id),
         hash_table_(DCHECK_NOTNULL(hash_table)),
-        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+        storage_manager_(DCHECK_NOTNULL(storage_manager)),
+        lip_filter_builder_(lip_filter_builder) {}
 
   ~BuildHashWorkOrder() override {}
 
@@ -222,6 +230,8 @@ class BuildHashWorkOrder : public WorkOrder {
   JoinHashTable *hash_table_;
   StorageManager *storage_manager_;
 
+  LIPFilterBuilderPtr lip_filter_builder_;
+
   DISALLOW_COPY_AND_ASSIGN(BuildHashWorkOrder);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index 779c0fe..f8916df 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -48,6 +48,7 @@
 #include "types/TypedValue.hpp"
 #include "types/containers/ColumnVector.hpp"
 #include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
 
 #include "gflags/gflags.h"
 #include "glog/logging.h"
@@ -180,6 +181,11 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
   if (blocking_dependencies_met_) {
     DCHECK(query_context != nullptr);
 
+    const LIPFilterDeployment *lip_filter_deployment = nullptr;
+    if (lip_deployment_index_ != QueryContext::kInvalidILIPDeploymentId) {
+      lip_filter_deployment = query_context->getLIPDeployment(lip_deployment_index_);
+    }
+
     const Predicate *residual_predicate =
         query_context->getPredicate(residual_predicate_index_);
     const vector<unique_ptr<const Scalar>> &selection =
@@ -192,6 +198,10 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
     if (probe_relation_is_stored_) {
       if (!started_) {
         for (const block_id probe_block_id : probe_relation_block_ids_) {
+          LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr;
+          if (lip_filter_deployment != nullptr) {
+            lip_filter_adaptive_prober = lip_filter_deployment->createLIPFilterAdaptiveProber();
+          }
           container->addNormalWorkOrder(
               new JoinWorkOrderClass(query_id_,
                                      build_relation_,
@@ -203,7 +213,8 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
                                      selection,
                                      hash_table,
                                      output_destination,
-                                     storage_manager),
+                                     storage_manager,
+                                     lip_filter_adaptive_prober),
               op_index_);
         }
         started_ = true;
@@ -211,6 +222,10 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
       return started_;
     } else {
       while (num_workorders_generated_ < probe_relation_block_ids_.size()) {
+        LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr;
+        if (lip_filter_deployment != nullptr) {
+          lip_filter_adaptive_prober = lip_filter_deployment->createLIPFilterAdaptiveProber();
+        }
         container->addNormalWorkOrder(
             new JoinWorkOrderClass(
                 query_id_,
@@ -223,7 +238,8 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
                 selection,
                 hash_table,
                 output_destination,
-                storage_manager),
+                storage_manager,
+                lip_filter_adaptive_prober),
             op_index_);
         ++num_workorders_generated_;
       }  // end while
@@ -423,6 +439,17 @@ void HashInnerJoinWorkOrder::execute() {
   const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
 
   std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
+
+  std::unique_ptr<TupleIdSequence> lip_filter_existence_map;
+  std::unique_ptr<ValueAccessor> base_accessor;
+  if (lip_filter_adaptive_prober_ != nullptr) {
+    base_accessor.reset(probe_accessor.release());
+    lip_filter_existence_map.reset(
+        lip_filter_adaptive_prober_->filterValueAccessor(base_accessor.get()));
+    probe_accessor.reset(
+        base_accessor->createSharedTupleIdSequenceAdapterVirtual(*lip_filter_existence_map));
+  }
+
   MapBasedJoinedTupleCollector collector;
   if (join_key_attributes_.size() == 1) {
     hash_table_.getAllFromValueAccessor(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/relational_operators/HashJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp
index fa393b6..29d6eba 100644
--- a/relational_operators/HashJoinOperator.hpp
+++ b/relational_operators/HashJoinOperator.hpp
@@ -35,6 +35,7 @@
 #include "storage/HashTable.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
 
 #include "glog/logging.h"
 
@@ -307,7 +308,8 @@ class HashInnerJoinWorkOrder : public WorkOrder {
       const std::vector<std::unique_ptr<const Scalar>> &selection,
       const JoinHashTable &hash_table,
       InsertDestination *output_destination,
-      StorageManager *storage_manager)
+      StorageManager *storage_manager,
+      LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr)
       : WorkOrder(query_id),
         build_relation_(build_relation),
         probe_relation_(probe_relation),
@@ -318,7 +320,8 @@ class HashInnerJoinWorkOrder : public WorkOrder {
         selection_(selection),
         hash_table_(hash_table),
         output_destination_(DCHECK_NOTNULL(output_destination)),
-        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+        storage_manager_(DCHECK_NOTNULL(storage_manager)),
+        lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
 
   /**
    * @brief Constructor for the distributed version.
@@ -354,7 +357,8 @@ class HashInnerJoinWorkOrder : public WorkOrder {
       const std::vector<std::unique_ptr<const Scalar>> &selection,
       const JoinHashTable &hash_table,
       InsertDestination *output_destination,
-      StorageManager *storage_manager)
+      StorageManager *storage_manager,
+      LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr)
       : WorkOrder(query_id),
         build_relation_(build_relation),
         probe_relation_(probe_relation),
@@ -365,7 +369,8 @@ class HashInnerJoinWorkOrder : public WorkOrder {
         selection_(selection),
         hash_table_(hash_table),
         output_destination_(DCHECK_NOTNULL(output_destination)),
-        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+        storage_manager_(DCHECK_NOTNULL(storage_manager)),
+        lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
 
   ~HashInnerJoinWorkOrder() override {}
 
@@ -392,6 +397,8 @@ class HashInnerJoinWorkOrder : public WorkOrder {
   InsertDestination *output_destination_;
   StorageManager *storage_manager_;
 
+  std::unique_ptr<LIPFilterAdaptiveProber> lip_filter_adaptive_prober_;
+
   DISALLOW_COPY_AND_ASSIGN(HashInnerJoinWorkOrder);
 };
 
@@ -435,7 +442,8 @@ class HashSemiJoinWorkOrder : public WorkOrder {
       const std::vector<std::unique_ptr<const Scalar>> &selection,
       const JoinHashTable &hash_table,
       InsertDestination *output_destination,
-      StorageManager *storage_manager)
+      StorageManager *storage_manager,
+      LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr)
       : WorkOrder(query_id),
         build_relation_(build_relation),
         probe_relation_(probe_relation),
@@ -446,7 +454,8 @@ class HashSemiJoinWorkOrder : public WorkOrder {
         selection_(selection),
         hash_table_(hash_table),
         output_destination_(DCHECK_NOTNULL(output_destination)),
-        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+        storage_manager_(DCHECK_NOTNULL(storage_manager)),
+        lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
 
   /**
    * @brief Constructor for the distributed version.
@@ -482,7 +491,8 @@ class HashSemiJoinWorkOrder : public WorkOrder {
       const std::vector<std::unique_ptr<const Scalar>> &selection,
       const JoinHashTable &hash_table,
       InsertDestination *output_destination,
-      StorageManager *storage_manager)
+      StorageManager *storage_manager,
+      LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr)
       : WorkOrder(query_id),
         build_relation_(build_relation),
         probe_relation_(probe_relation),
@@ -493,7 +503,8 @@ class HashSemiJoinWorkOrder : public WorkOrder {
         selection_(selection),
         hash_table_(hash_table),
         output_destination_(DCHECK_NOTNULL(output_destination)),
-        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+        storage_manager_(DCHECK_NOTNULL(storage_manager)),
+        lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
 
   ~HashSemiJoinWorkOrder() override {}
 
@@ -516,6 +527,8 @@ class HashSemiJoinWorkOrder : public WorkOrder {
   InsertDestination *output_destination_;
   StorageManager *storage_manager_;
 
+  std::unique_ptr<LIPFilterAdaptiveProber> lip_filter_adaptive_prober_;
+
   DISALLOW_COPY_AND_ASSIGN(HashSemiJoinWorkOrder);
 };
 
@@ -559,7 +572,8 @@ class HashAntiJoinWorkOrder : public WorkOrder {
       const std::vector<std::unique_ptr<const Scalar>> &selection,
       const JoinHashTable &hash_table,
       InsertDestination *output_destination,
-      StorageManager *storage_manager)
+      StorageManager *storage_manager,
+      LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr)
       : WorkOrder(query_id),
         build_relation_(build_relation),
         probe_relation_(probe_relation),
@@ -570,7 +584,8 @@ class HashAntiJoinWorkOrder : public WorkOrder {
         selection_(selection),
         hash_table_(hash_table),
         output_destination_(DCHECK_NOTNULL(output_destination)),
-        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+        storage_manager_(DCHECK_NOTNULL(storage_manager)),
+        lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
 
   /**
    * @brief Constructor for the distributed version.
@@ -606,7 +621,8 @@ class HashAntiJoinWorkOrder : public WorkOrder {
       const std::vector<std::unique_ptr<const Scalar>> &selection,
       const JoinHashTable &hash_table,
       InsertDestination *output_destination,
-      StorageManager *storage_manager)
+      StorageManager *storage_manager,
+      LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr)
       : WorkOrder(query_id),
         build_relation_(build_relation),
         probe_relation_(probe_relation),
@@ -617,7 +633,8 @@ class HashAntiJoinWorkOrder : public WorkOrder {
         selection_(selection),
         hash_table_(hash_table),
         output_destination_(DCHECK_NOTNULL(output_destination)),
-        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+        storage_manager_(DCHECK_NOTNULL(storage_manager)),
+        lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
 
   ~HashAntiJoinWorkOrder() override {}
 
@@ -646,6 +663,8 @@ class HashAntiJoinWorkOrder : public WorkOrder {
   InsertDestination *output_destination_;
   StorageManager *storage_manager_;
 
+  std::unique_ptr<LIPFilterAdaptiveProber> lip_filter_adaptive_prober_;
+
   DISALLOW_COPY_AND_ASSIGN(HashAntiJoinWorkOrder);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/relational_operators/RelationalOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/RelationalOperator.hpp b/relational_operators/RelationalOperator.hpp
index f0303e5..fb05e9e 100644
--- a/relational_operators/RelationalOperator.hpp
+++ b/relational_operators/RelationalOperator.hpp
@@ -245,6 +245,13 @@ class RelationalOperator {
     return op_index_;
   }
 
+  /**
+   * @brief Store the id of the QueryContext LIP filter deployment attached to this operator.
+   */
+  void deployLIPFilter(const QueryContext::lip_deployment_id lip_deployment_index) {
+    lip_deployment_index_ = lip_deployment_index;
+  }
+
  protected:
   /**
    * @brief Constructor
@@ -265,6 +272,8 @@ class RelationalOperator {
   bool done_feeding_input_relation_;
   std::size_t op_index_;
 
+  QueryContext::lip_deployment_id lip_deployment_index_ = QueryContext::kInvalidILIPDeploymentId;
+
  private:
   DISALLOW_COPY_AND_ASSIGN(RelationalOperator);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index f05cc46..e85e005 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -643,7 +643,6 @@ target_link_libraries(quickstep_storage_FastHashTable
                       quickstep_threading_SpinSharedMutex
                       quickstep_types_Type
                       quickstep_types_TypedValue
-                      quickstep_utility_BloomFilter
                       quickstep_utility_HashPair
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_FastHashTableFactory
@@ -659,7 +658,6 @@ target_link_libraries(quickstep_storage_FastHashTableFactory
                       quickstep_storage_SimpleScalarSeparateChainingHashTable
                       quickstep_storage_TupleReference
                       quickstep_types_TypeFactory
-                      quickstep_utility_BloomFilter
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_FastSeparateChainingHashTable
                       quickstep_storage_FastHashTable

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/storage/FastHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/FastHashTable.hpp b/storage/FastHashTable.hpp
index 4a95cd9..74d9ee3 100644
--- a/storage/FastHashTable.hpp
+++ b/storage/FastHashTable.hpp
@@ -39,7 +39,6 @@
 #include "threading/SpinSharedMutex.hpp"
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
-#include "utility/BloomFilter.hpp"
 #include "utility/HashPair.hpp"
 #include "utility/Macros.hpp"
 
@@ -958,62 +957,6 @@ class FastHashTable : public HashTableBase<resizable,
   template <typename FunctorT>
   std::size_t forEachCompositeKeyFast(FunctorT *functor, int index) const;
 
-  /**
-   * @brief A call to this function will cause a bloom filter to be built
-   *        during the build phase of this hash table.
-   **/
-  inline void enableBuildSideBloomFilter() {
-    has_build_side_bloom_filter_ = true;
-  }
-
-  /**
-   * @brief A call to this function will cause a set of bloom filters to be
-   *        probed during the probe phase of this hash table.
-   **/
-  inline void enableProbeSideBloomFilter() {
-    has_probe_side_bloom_filter_ = true;
-  }
-
-  /**
-   * @brief This function sets the pointer to the bloom filter to be
-   *        used during the build phase of this hash table.
-   * @warning Should call enable_build_side_bloom_filter() first to enable
-   *          bloom filter usage during build phase.
-   * @note The ownership of the bloom filter lies with the caller.
-   *
-   * @param bloom_filter The pointer to the bloom filter.
-   **/
-  inline void setBuildSideBloomFilter(BloomFilter *bloom_filter) {
-    build_bloom_filter_ = bloom_filter;
-  }
-
-  /**
-   * @brief This function adds a pointer to the list of bloom filters to be
-   *        used during the probe phase of this hash table.
-   * @warning Should call enable_probe_side_bloom_filter() first to enable
-   *          bloom filter usage during probe phase.
-   * @note The ownership of the bloom filter lies with the caller.
-   *
-   * @param bloom_filter The pointer to the bloom filter.
-   **/
-  inline void addProbeSideBloomFilter(const BloomFilter *bloom_filter) {
-    probe_bloom_filters_.emplace_back(bloom_filter);
-  }
-
-  /**
-   * @brief This function adds a vector of attribute ids corresponding to a
-   *        bloom filter used during the probe phase of this hash table.
-   * @warning Should call enable_probe_side_bloom_filter() first to enable
-   *          bloom filter usage during probe phase.
-   *
-   * @param probe_attribute_ids The vector of attribute ids to use for probing
-   *        the bloom filter.
-   **/
-  inline void addProbeSideAttributeIds(
-      std::vector<attribute_id> &&probe_attribute_ids) {
-    probe_attribute_ids_.push_back(probe_attribute_ids);
-  }
-
  protected:
   /**
    * @brief Constructor for new resizable hash table.
@@ -1318,12 +1261,6 @@ class FastHashTable : public HashTableBase<resizable,
                                    const attribute_id key_attr_id,
                                    FunctorT *functor) const;
 
-  // Data structures used for bloom filter optimized semi-joins.
-  bool has_build_side_bloom_filter_ = false;
-  bool has_probe_side_bloom_filter_ = false;
-  BloomFilter *build_bloom_filter_;
-  std::vector<const BloomFilter *> probe_bloom_filters_;
-  std::vector<std::vector<attribute_id>> probe_attribute_ids_;
   DISALLOW_COPY_AND_ASSIGN(FastHashTable);
 };
 
@@ -1449,13 +1386,6 @@ FastHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>::
                 total_entries, total_variable_key_size, &prealloc_state);
           }
         }
-        std::unique_ptr<BloomFilter> thread_local_bloom_filter;
-        if (has_build_side_bloom_filter_) {
-          thread_local_bloom_filter.reset(
-              new BloomFilter(build_bloom_filter_->getRandomSeed(),
-                              build_bloom_filter_->getNumberOfHashes(),
-                              build_bloom_filter_->getBitArraySize()));
-        }
         if (resizable) {
           while (result == HashTablePutResult::kOutOfSpace) {
             {
@@ -1474,12 +1404,6 @@ FastHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>::
                     variable_size,
                     (*functor)(*accessor),
                     using_prealloc ? &prealloc_state : nullptr);
-                // Insert into bloom filter, if enabled.
-                if (has_build_side_bloom_filter_) {
-                  thread_local_bloom_filter->insertUnSafe(
-                      static_cast<const std::uint8_t *>(key.getDataPtr()),
-                      key.getDataSize());
-                }
                 if (result == HashTablePutResult::kDuplicateKey) {
                   DEBUG_ASSERT(!using_prealloc);
                   return result;
@@ -1507,22 +1431,11 @@ FastHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>::
                                   variable_size,
                                   (*functor)(*accessor),
                                   using_prealloc ? &prealloc_state : nullptr);
-            // Insert into bloom filter, if enabled.
-            if (has_build_side_bloom_filter_) {
-              thread_local_bloom_filter->insertUnSafe(
-                  static_cast<const std::uint8_t *>(key.getDataPtr()),
-                  key.getDataSize());
-            }
             if (result != HashTablePutResult::kOK) {
               return result;
             }
           }
         }
-        // Update the build side bloom filter with thread local copy, if
-        // available.
-        if (has_build_side_bloom_filter_) {
-          build_bloom_filter_->bitwiseOr(thread_local_bloom_filter.get());
-        }
 
         return HashTablePutResult::kOK;
       });
@@ -2462,52 +2375,27 @@ void FastHashTable<resizable,
   InvokeOnAnyValueAccessor(
       accessor,
       [&](auto *accessor) -> void {  // NOLINT(build/c++11)
-        while (accessor->next()) {
-          // Probe any bloom filters, if enabled.
-          if (has_probe_side_bloom_filter_) {
-            DCHECK_EQ(probe_bloom_filters_.size(), probe_attribute_ids_.size());
-            // Check if the key is contained in the BloomFilters or not.
-            bool bloom_miss = false;
-            for (std::size_t i = 0;
-                 i < probe_bloom_filters_.size() && !bloom_miss;
-                 ++i) {
-              const BloomFilter *bloom_filter = probe_bloom_filters_[i];
-              for (const attribute_id &attr_id : probe_attribute_ids_[i]) {
-                TypedValue bloom_key = accessor->getTypedValue(attr_id);
-                if (!bloom_filter->contains(static_cast<const std::uint8_t *>(
-                                                bloom_key.getDataPtr()),
-                                            bloom_key.getDataSize())) {
-                  bloom_miss = true;
-                  break;
-                }
-              }
-            }
-            if (bloom_miss) {
-              continue;  // On a bloom filter miss, probing the hash table can
-                         // be skipped.
-            }
-          }
-
-          TypedValue key = accessor->getTypedValue(key_attr_id);
-          if (check_for_null_keys && key.isNull()) {
-            continue;
-          }
-          const std::size_t true_hash = use_scalar_literal_hash_template
-                                            ? key.getHashScalarLiteral()
-                                            : key.getHash();
-          const std::size_t adjusted_hash =
-              adjust_hashes_template ? this->AdjustHash(true_hash) : true_hash;
-          std::size_t entry_num = 0;
-          const std::uint8_t *value;
-          while (this->getNextEntryForKey(
-              key, adjusted_hash, &value, &entry_num)) {
-            (*functor)(*accessor, *value);
-            if (!allow_duplicate_keys) {
-              break;
-            }
-          }
+    while (accessor->next()) {
+      TypedValue key = accessor->getTypedValue(key_attr_id);
+      if (check_for_null_keys && key.isNull()) {
+        continue;
+      }
+      const std::size_t true_hash = use_scalar_literal_hash_template
+                                        ? key.getHashScalarLiteral()
+                                        : key.getHash();
+      const std::size_t adjusted_hash =
+          adjust_hashes_template ? this->AdjustHash(true_hash) : true_hash;
+      std::size_t entry_num = 0;
+      const std::uint8_t *value;
+      while (this->getNextEntryForKey(
+          key, adjusted_hash, &value, &entry_num)) {
+        (*functor)(*accessor, *value);
+        if (!allow_duplicate_keys) {
+          break;
         }
-      });
+      }
+    }
+  });
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/storage/FastHashTableFactory.hpp
----------------------------------------------------------------------
diff --git a/storage/FastHashTableFactory.hpp b/storage/FastHashTableFactory.hpp
index 6d0b693..682cc2a 100644
--- a/storage/FastHashTableFactory.hpp
+++ b/storage/FastHashTableFactory.hpp
@@ -32,7 +32,6 @@
 #include "storage/SimpleScalarSeparateChainingHashTable.hpp"
 #include "storage/TupleReference.hpp"
 #include "types/TypeFactory.hpp"
-#include "utility/BloomFilter.hpp"
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
@@ -183,14 +182,11 @@ class FastHashTableFactory {
    * @param proto A protobuf description of a resizable HashTable.
    * @param storage_manager The StorageManager to use (a StorageBlob will be
    *        allocated to hold the HashTable's contents).
-   * @param bloom_filters A vector of pointers to bloom filters that may be used
-   *        during hash table construction in build/probe phase.
    * @return A new resizable HashTable with parameters specified by proto.
    **/
   static FastHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>*
       CreateResizableFromProto(const serialization::HashTable &proto,
-                               StorageManager *storage_manager,
-                               const std::vector<std::unique_ptr<BloomFilter>> &bloom_filters) {
+                               StorageManager *storage_manager) {
     DCHECK(ProtoIsValid(proto))
         << "Attempted to create HashTable from invalid proto description:\n"
         << proto.DebugString();
@@ -204,35 +200,6 @@ class FastHashTableFactory {
                                       key_types,
                                       proto.estimated_num_entries(),
                                       storage_manager);
-
-    // TODO(ssaurabh): These lazy initializations can be moved from here and pushed to the
-    //                 individual implementations of the hash table constructors.
-
-    // Check if there are any build side bloom filter defined on the hash table.
-    if (proto.build_side_bloom_filter_id_size() > 0) {
-      hash_table->enableBuildSideBloomFilter();
-      hash_table->setBuildSideBloomFilter(bloom_filters[proto.build_side_bloom_filter_id(0)].get());
-    }
-
-    // Check if there are any probe side bloom filters defined on the hash table.
-    if (proto.probe_side_bloom_filters_size() > 0) {
-      hash_table->enableProbeSideBloomFilter();
-      // Add as many probe bloom filters as defined by the proto.
-      for (int j = 0; j < proto.probe_side_bloom_filters_size(); ++j) {
-        // Add the pointer to the probe bloom filter within the list of probe bloom filters to use.
-        const auto probe_side_bloom_filter = proto.probe_side_bloom_filters(j);
-        hash_table->addProbeSideBloomFilter(bloom_filters[probe_side_bloom_filter.probe_side_bloom_filter_id()].get());
-
-        // Add the attribute ids corresponding to this probe bloom filter.
-        std::vector<attribute_id> probe_attribute_ids;
-        for (int k = 0; k < probe_side_bloom_filter.probe_side_attr_ids_size(); ++k) {
-          const attribute_id probe_attribute_id = probe_side_bloom_filter.probe_side_attr_ids(k);
-          probe_attribute_ids.push_back(probe_attribute_id);
-        }
-        hash_table->addProbeSideAttributeIds(std::move(probe_attribute_ids));
-      }
-    }
-
     return hash_table;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/utility/lip_filter/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/lip_filter/CMakeLists.txt b/utility/lip_filter/CMakeLists.txt
index df6a5ec..d78f5cd 100644
--- a/utility/lip_filter/CMakeLists.txt
+++ b/utility/lip_filter/CMakeLists.txt
@@ -23,10 +23,11 @@ QS_PROTOBUF_GENERATE_CPP(utility_lipfilter_LIPFilter_proto_srcs
 add_library(quickstep_utility_lipfilter_LIPFilter LIPFilter.cpp LIPFilter.hpp)
 add_library(quickstep_utility_lipfilter_LIPFilterAdaptiveProber ../../empty_src.cpp LIPFilterAdaptiveProber.hpp)
 add_library(quickstep_utility_lipfilter_LIPFilterBuilder ../../empty_src.cpp LIPFilterBuilder.hpp)
-add_library(quickstep_utility_lipfilter_LIPFilterDeploymentInfo ../../empty_src.cpp LIPFilterDeploymentInfo.hpp)
+add_library(quickstep_utility_lipfilter_LIPFilterDeployment LIPFilterDeployment.cpp LIPFilterDeployment.hpp)
 add_library(quickstep_utility_lipfilter_LIPFilterFactory LIPFilterFactory.cpp LIPFilterFactory.hpp)
 add_library(quickstep_utility_lipfilter_LIPFilter_proto
             ${utility_lipfilter_LIPFilter_proto_srcs})
+add_library(quickstep_utility_lipfilter_SingleIdentityHashFilter ../../empty_src.cpp SingleIdentityHashFilter.hpp)
 
 # Link dependencies:
 target_link_libraries(quickstep_utility_lipfilter_LIPFilter
@@ -37,13 +38,21 @@ target_link_libraries(quickstep_utility_lipfilter_LIPFilterAdaptiveProber
 target_link_libraries(quickstep_utility_lipfilter_LIPFilterBuilder
                       quickstep_catalog_CatalogTypedefs
                       quickstep_utility_Macros)
-target_link_libraries(quickstep_utility_lipfilter_LIPFilterDeploymentInfo
+target_link_libraries(quickstep_utility_lipfilter_LIPFilterDeployment
                       quickstep_catalog_CatalogTypedefs
+                      quickstep_types_TypeFactory
                       quickstep_utility_Macros
-                      quickstep_utility_lipfilter_LIPFilter)
+                      quickstep_utility_lipfilter_LIPFilter
+                      quickstep_utility_lipfilter_LIPFilter_proto)
 target_link_libraries(quickstep_utility_lipfilter_LIPFilterFactory
+                      quickstep_utility_lipfilter_LIPFilter
                       quickstep_utility_lipfilter_LIPFilter_proto
+                      quickstep_utility_lipfilter_SingleIdentityHashFilter
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_utility_lipfilter_LIPFilter_proto
                       ${PROTOBUF_LIBRARY}
                       quickstep_types_Type_proto)
+target_link_libraries(quickstep_utility_lipfilter_SingleIdentityHashFilter
+                      quickstep_storage_StorageConstants
+                      quickstep_utility_lipfilter_LIPFilter
+                      quickstep_utility_Macros)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/utility/lip_filter/LIPFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilter.hpp b/utility/lip_filter/LIPFilter.hpp
index c14b526..dd72e48 100644
--- a/utility/lip_filter/LIPFilter.hpp
+++ b/utility/lip_filter/LIPFilter.hpp
@@ -20,14 +20,20 @@
 #ifndef QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_HPP_
 #define QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_HPP_
 
+#include <cstddef>
 #include <vector>
 
+#include "catalog/CatalogTypedefs.hpp"
+#include "storage/StorageBlockInfo.hpp"
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
 
 namespace quickstep {
 
+class Type;
+class ValueAccessor;
+
 /** \addtogroup Utility
  *  @{
  */
@@ -44,6 +50,21 @@ class LIPFilter {
     return type_;
   }
 
+  virtual void insertValueAccessor(ValueAccessor *accessor,
+                                   const attribute_id attr_id,
+                                   const Type *attr_type) = 0;
+
+  virtual std::size_t filterBatch(ValueAccessor *accessor,
+                                  const attribute_id attr_id,
+                                  const bool is_attr_nullable,
+                                  std::vector<tuple_id> *batch,
+                                  const std::size_t batch_size) const = 0;
+
+ protected:
+  // Protected: the base is only constructed by concrete filter subclasses.
+  explicit LIPFilter(const LIPFilterType &type)
+      : type_(type) {}
+
  private:
   LIPFilterType type_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/utility/lip_filter/LIPFilter.proto
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilter.proto b/utility/lip_filter/LIPFilter.proto
index 897a86e..def13dd 100644
--- a/utility/lip_filter/LIPFilter.proto
+++ b/utility/lip_filter/LIPFilter.proto
@@ -36,7 +36,8 @@ message LIPFilter {
 message SingleIdentityHashFilter {
   extend LIPFilter {
     // All required
-    optional uint64 num_bits = 16;
+    optional uint64 filter_cardinality = 16;
+    optional uint64 attribute_size = 17;
   }
 }
 
@@ -45,9 +46,13 @@ enum LIPFilterActionType {
   PROBE = 2;
 }
 
-message LIPFilterDeploymentInfo {
+message LIPFilterDeployment {
+  message Entry {
+    required uint32 lip_filter_id = 1;
+    required int32 attribute_id = 2;
+    required Type attribute_type = 3;
+  }
+
   required LIPFilterActionType action_type = 1;
-  required uint32 lip_filter_id = 2;
-  required int32 attribute_id = 3;
-  required Type attribute_type = 4;
+  repeated Entry entries = 2;
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/utility/lip_filter/LIPFilterAdaptiveProber.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterAdaptiveProber.hpp b/utility/lip_filter/LIPFilterAdaptiveProber.hpp
index 6005690..af42446 100644
--- a/utility/lip_filter/LIPFilterAdaptiveProber.hpp
+++ b/utility/lip_filter/LIPFilterAdaptiveProber.hpp
@@ -20,10 +20,19 @@
 #ifndef QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_ADAPTIVE_PROBER_HPP_
 #define QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_ADAPTIVE_PROBER_HPP_
 
+#include <algorithm>
+#include <cstdint>
+#include <memory>
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/TupleIdSequence.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/Type.hpp"
 #include "utility/Macros.hpp"
+#include "utility/lip_filter/SingleIdentityHashFilter.hpp"
 
 namespace quickstep {
 
@@ -33,16 +42,16 @@ namespace quickstep {
 
 class LIPFilterAdaptiveProber {
  public:
-  LIPFilterAdaptiveProber(const std::vector<const LIPFilter *> &lip_filters,
+  LIPFilterAdaptiveProber(const std::vector<LIPFilter *> &lip_filters,
                           const std::vector<attribute_id> &attr_ids,
-                          const std::vector<std::size_t> &attr_sizes) {
+                          const std::vector<const Type *> &attr_types) {
     DCHECK_EQ(lip_filters.size(), attr_ids.size());
-    DCHECK_EQ(lip_filters.size(), attr_sizes.size());
+    DCHECK_EQ(lip_filters.size(), attr_types.size());
 
     probe_entries_.reserve(lip_filters.size());
     for (std::size_t i = 0; i < lip_filters.size(); ++i) {
       probe_entries_.emplace_back(
-          new ProbeEntry(lip_filters[i], attr_ids[i], attr_sizes[i]));
+          new ProbeEntry(lip_filters[i], attr_ids[i], attr_types[i]));
     }
   }
 
@@ -52,14 +61,23 @@ class LIPFilterAdaptiveProber {
     }
   }
 
+  // Returns the tuples of accessor that pass every filter; the result is released to the caller.
+  TupleIdSequence* filterValueAccessor(ValueAccessor *accessor) {
+    const TupleIdSequence *existence_map = accessor->getTupleIdSequenceVirtual();
+    if (existence_map != nullptr) {
+      return filterValueAccessorWithExistenceMap(accessor, existence_map);
+    }
+    return filterValueAccessorNoExistenceMap(accessor);
+  }
+
  private:
   struct ProbeEntry {
     ProbeEntry(const LIPFilter *lip_filter_in,
                const attribute_id attr_id_in,
-               const std::size_t attr_size_in)
+               const Type *attr_type_in)
         : lip_filter(lip_filter_in),
           attr_id(attr_id_in),
-          attr_size(attr_size_in),
+          attr_type(attr_type_in),
           miss(0),
           cnt(0) {
     }
@@ -69,12 +87,95 @@ class LIPFilterAdaptiveProber {
     }
     const LIPFilter *lip_filter;
     const attribute_id attr_id;
-    const std::size_t attr_size;
+    const Type *attr_type;
     std::uint32_t miss;
     std::uint32_t cnt;
     float miss_rate;
   };
 
+
+  // Probes every tuple of an accessor that has no existence map, in batches whose
+  // size cap starts at 64 and doubles each round; the caller owns the result.
+  inline TupleIdSequence* filterValueAccessorNoExistenceMap(ValueAccessor *accessor) {
+    const std::uint32_t num_tuples = accessor->getNumTuplesVirtual();
+    std::unique_ptr<TupleIdSequence> matches(new TupleIdSequence(num_tuples));
+    std::vector<tuple_id> batch(num_tuples);
+
+    std::uint32_t batch_cap = 64u;
+    std::uint32_t num_done = 0;
+    do {
+      const std::uint32_t batch_size = std::min(batch_cap, num_tuples - num_done);
+      for (std::uint32_t offset = 0; offset < batch_size; ++offset) {
+        batch[offset] = num_done + offset;
+      }
+
+      // Only the first num_hits slots of batch are read back after filtering.
+      const std::uint32_t num_hits = filterBatch(accessor, &batch, batch_size);
+      for (std::uint32_t hit = 0; hit < num_hits; ++hit) {
+        matches->set(batch[hit], true);
+      }
+      num_done += batch_size;
+      batch_cap *= 2;
+    } while (num_done < num_tuples);
+    return matches.release();
+  }
+
+  // Probes only the tuples listed in existence_map, in batches whose size cap
+  // starts at 64 and doubles each round; the caller owns the returned sequence.
+  inline TupleIdSequence* filterValueAccessorWithExistenceMap(ValueAccessor *accessor,
+                                                              const TupleIdSequence *existence_map) {
+    std::unique_ptr<TupleIdSequence> matches(new TupleIdSequence(existence_map->length()));
+    std::uint32_t remaining = existence_map->numTuples();
+    std::vector<tuple_id> batch(remaining);
+
+    std::uint32_t batch_cap = 64u;
+    TupleIdSequence::const_iterator cursor = existence_map->before_begin();
+    do {
+      const std::uint32_t batch_size = std::min(batch_cap, remaining);
+      for (std::uint32_t slot = 0; slot < batch_size; ++slot) {
+        // The iterator starts at before_begin(), so it is advanced before each read.
+        ++cursor;
+        batch[slot] = *cursor;
+      }
+
+      const std::uint32_t num_hits = filterBatch(accessor, &batch, batch_size);
+      for (std::uint32_t hit = 0; hit < num_hits; ++hit) {
+        matches->set(batch[hit], true);
+      }
+
+      remaining -= batch_size;
+      batch_cap *= 2;
+    } while (remaining > 0);
+    return matches.release();
+  }
+
+  // Runs the batch through every filter in the current order, shrinking it to the
+  // survivors after each one, then re-sorts the filters; returns the final size.
+  inline std::size_t filterBatch(ValueAccessor *accessor,
+                                 std::vector<tuple_id> *batch,
+                                 std::uint32_t batch_size) {
+    for (ProbeEntry *probe : probe_entries_) {
+      const std::uint32_t num_in = batch_size;
+      batch_size = probe->lip_filter->filterBatch(
+          accessor, probe->attr_id, probe->attr_type->isNullable(), batch, num_in);
+      // Per-filter statistics consumed by adaptEntryOrder().
+      probe->cnt += num_in;
+      probe->miss += num_in - batch_size;
+    }
+
+    adaptEntryOrder();
+    return batch_size;
+  }
+
+  // Refreshes every miss rate, then re-sorts by ProbeEntry::isBetterThan (presumably keyed on miss_rate).
+  inline void adaptEntryOrder() {
+    for (auto &entry : probe_entries_) {
+      // Avoid 0/0 = NaN for entries that have seen no tuples: NaN keys break std::sort's strict weak ordering.
+      entry->miss_rate = entry->cnt == 0 ? 0.0f : static_cast<float>(entry->miss) / entry->cnt;
+    }
+    std::sort(probe_entries_.begin(), probe_entries_.end(), ProbeEntry::isBetterThan);
+  }
+
   std::vector<ProbeEntry *> probe_entries_;
 
   DISALLOW_COPY_AND_ASSIGN(LIPFilterAdaptiveProber);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/utility/lip_filter/LIPFilterBuilder.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterBuilder.hpp b/utility/lip_filter/LIPFilterBuilder.hpp
index 07b26da..0a2d465 100644
--- a/utility/lip_filter/LIPFilterBuilder.hpp
+++ b/utility/lip_filter/LIPFilterBuilder.hpp
@@ -20,29 +20,41 @@
 #ifndef QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_BUILDER_HPP_
 #define QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_BUILDER_HPP_
 
+#include <memory>
 #include <vector>
 
-#include "utility/Macros.hpp"
-
 #include "catalog/CatalogTypedefs.hpp"
+#include "types/Type.hpp"
+#include "utility/Macros.hpp"
 
 namespace quickstep {
 
+class ValueAccessor;
+
 /** \addtogroup Utility
  *  @{
  */
 
+class LIPFilterBuilder;
+typedef std::shared_ptr<LIPFilterBuilder> LIPFilterBuilderPtr;
+
 class LIPFilterBuilder {
  public:
   LIPFilterBuilder(const std::vector<LIPFilter *> &lip_filters,
                    const std::vector<attribute_id> &attr_ids,
-                   const std::vector<std::size_t> &attr_sizes) {
+                   const std::vector<const Type *> &attr_types) {
     DCHECK_EQ(lip_filters.size(), attr_ids.size());
-    DCHECK_EQ(lip_filters.size(), attr_sizes.size());
+    DCHECK_EQ(lip_filters.size(), attr_types.size());
 
     build_entries_.reserve(lip_filters.size());
     for (std::size_t i = 0; i < lip_filters.size(); ++i) {
-      build_entries_.emplace_back(lip_filters[i], attr_ids[i], attr_sizes[i]);
+      build_entries_.emplace_back(lip_filters[i], attr_ids[i], attr_types[i]);
+    }
+  }
+
+  void insertValueAccessor(ValueAccessor *accessor) {  // Feeds accessor to every filter of this builder.
+    for (BuildEntry &item : build_entries_) {
+      item.lip_filter->insertValueAccessor(accessor, item.attr_id, item.attr_type);
     }
   }
 
@@ -50,14 +62,14 @@ class LIPFilterBuilder {
   struct BuildEntry {
     BuildEntry(LIPFilter *lip_filter_in,
                const attribute_id attr_id_in,
-               const std::size_t attr_size_in)
+               const Type *attr_type_in)
         : lip_filter(lip_filter_in),
           attr_id(attr_id_in),
-          attr_size(attr_size_in) {
+          attr_type(attr_type_in) {
     }
     LIPFilter *lip_filter;
     const attribute_id attr_id;
-    const std::size_t attr_size;
+    const Type *attr_type;
   };
 
   std::vector<BuildEntry> build_entries_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/utility/lip_filter/LIPFilterDeployment.cpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterDeployment.cpp b/utility/lip_filter/LIPFilterDeployment.cpp
new file mode 100644
index 0000000..0ac396b
--- /dev/null
+++ b/utility/lip_filter/LIPFilterDeployment.cpp
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "utility/lip_filter/LIPFilterDeployment.hpp"
+
+#include "types/TypeFactory.hpp"
+#include "utility/lip_filter/LIPFilter.pb.h"
+#include "utility/lip_filter/LIPFilterBuilder.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+// Decodes the action type and resolves each entry's filter id, attribute id and
+// attribute type; the LIPFilter objects stay owned by the lip_filters vector.
+LIPFilterDeployment::LIPFilterDeployment(
+    const serialization::LIPFilterDeployment &proto,
+    const std::vector<std::unique_ptr<LIPFilter>> &lip_filters) {
+  const auto action = proto.action_type();
+  if (action == serialization::LIPFilterActionType::BUILD) {
+    action_type_ = LIPFilterActionType::kBuild;
+  } else if (action == serialization::LIPFilterActionType::PROBE) {
+    action_type_ = LIPFilterActionType::kProbe;
+  } else {
+    LOG(FATAL) << "Unsupported LIPFilterActionType: "
+               << serialization::LIPFilterActionType_Name(proto.action_type());
+  }
+
+  for (const auto &entry : proto.entries()) {
+    // at() bounds-checks the serialized filter id against the supplied filters.
+    lip_filters_.push_back(lip_filters.at(entry.lip_filter_id()).get());
+    attr_ids_.push_back(entry.attribute_id());
+    attr_types_.push_back(&TypeFactory::ReconstructFromProto(entry.attribute_type()));
+  }
+}
+
+bool LIPFilterDeployment::ProtoIsValid(
+    const serialization::LIPFilterDeployment &proto) {
+  return true;  // Placeholder: every proto is accepted; no field is inspected.
+}
+
+LIPFilterBuilder* LIPFilterDeployment::createLIPFilterBuilder() const {
+  DCHECK(action_type_ == LIPFilterActionType::kBuild);  // Debug-only check: deployment must be build-side.
+  return new LIPFilterBuilder(lip_filters_, attr_ids_, attr_types_);  // Heap-allocated; returned as a raw pointer.
+}
+
+LIPFilterAdaptiveProber* LIPFilterDeployment::createLIPFilterAdaptiveProber() const {
+  DCHECK(action_type_ == LIPFilterActionType::kProbe);  // Debug-only check: deployment must be probe-side.
+  return new LIPFilterAdaptiveProber(lip_filters_, attr_ids_, attr_types_);  // Heap-allocated; returned as a raw pointer.
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/utility/lip_filter/LIPFilterDeployment.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterDeployment.hpp b/utility/lip_filter/LIPFilterDeployment.hpp
new file mode 100644
index 0000000..60de14e
--- /dev/null
+++ b/utility/lip_filter/LIPFilterDeployment.hpp
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_DEPLOYMENT_HPP_
+#define QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_DEPLOYMENT_HPP_
+
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilter.hpp"
+#include "utility/lip_filter/LIPFilter.pb.h"
+
+namespace quickstep {
+
+class LIPFilterBuilder;
+class LIPFilterAdaptiveProber;
+class Type;
+
+/** \addtogroup Utility
+ *  @{
+ */
+
+enum class LIPFilterActionType {
+  kBuild = 0,  // Deployment yields a LIPFilterBuilder (values are inserted into the filters).
+  kProbe  // Deployment yields a LIPFilterAdaptiveProber (values are tested against the filters).
+};
+
+class LIPFilterDeployment {  // Pairs LIP filters with the attributes they are built from or probed with.
+ public:
+  LIPFilterDeployment(const serialization::LIPFilterDeployment &proto,
+                      const std::vector<std::unique_ptr<LIPFilter>> &lip_filters);
+  // Intended validity check for a serialized deployment.
+  static bool ProtoIsValid(const serialization::LIPFilterDeployment &proto);
+  // Tells whether the filters of this deployment are to be built or probed.
+  LIPFilterActionType getActionType() const {
+    return action_type_;
+  }
+  // Creates a builder over the deployed filters.
+  LIPFilterBuilder* createLIPFilterBuilder() const;
+  // Creates an adaptive prober over the deployed filters.
+  LIPFilterAdaptiveProber* createLIPFilterAdaptiveProber() const;
+
+ private:
+  LIPFilterActionType action_type_;
+  std::vector<LIPFilter *> lip_filters_;  // Non-owning pointers.
+  std::vector<attribute_id> attr_ids_;  // attr_ids_[i] and attr_types_[i] describe lip_filters_[i].
+  std::vector<const Type *> attr_types_;
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_DEPLOYMENT_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/utility/lip_filter/LIPFilterDeploymentInfo.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterDeploymentInfo.hpp b/utility/lip_filter/LIPFilterDeploymentInfo.hpp
deleted file mode 100644
index db75021..0000000
--- a/utility/lip_filter/LIPFilterDeploymentInfo.hpp
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_DEPLOYMENT_INFO_HPP_
-#define QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_DEPLOYMENT_INFO_HPP_
-
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "utility/Macros.hpp"
-#include "utility/lip_filter/LIPFilter.hpp"
-
-namespace quickstep {
-
-/** \addtogroup Utility
- *  @{
- */
-
-enum class LIPFilterActionType {
-  kBuild = 0,
-  kProbe
-};
-
-class LIPFilterDeploymentInfo {
- public:
-  const LIPFilterActionType getActionType() const {
-    return action_type_;
-  }
-
-  const std::vector<LIPFilter*>& lip_filters() const {
-    return lip_filters_;
-  }
-
-  const std::vector<attribute_id>& attr_ids() const {
-    return attr_ids_;
-  }
-
-  const std::vector<const Type*>& attr_types() const {
-    return attr_types_;
-  }
-
- private:
-  LIPFilterActionType action_type_;
-  std::vector<LIPFilter*> lip_filters_;
-  std::vector<attribute_id> attr_ids_;
-  std::vector<const Type*> attr_types_;
-};
-
-/** @} */
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_DEPLOYMENT_INFO_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/utility/lip_filter/LIPFilterFactory.cpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterFactory.cpp b/utility/lip_filter/LIPFilterFactory.cpp
index e69de29..f0e7725 100644
--- a/utility/lip_filter/LIPFilterFactory.cpp
+++ b/utility/lip_filter/LIPFilterFactory.cpp
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "utility/lip_filter/LIPFilterFactory.hpp"
+
+#include "utility/lip_filter/LIPFilter.hpp"
+#include "utility/lip_filter/SingleIdentityHashFilter.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+LIPFilter* LIPFilterFactory::ReconstructFromProto(const serialization::LIPFilter &proto) {
+  // Only one filter type is handled; any other type aborts through LOG(FATAL).
+  switch (proto.lip_filter_type()) {
+    case serialization::LIPFilterType::SINGLE_IDENTITY_HASH_FILTER: {
+      const std::size_t num_bits =
+          proto.GetExtension(serialization::SingleIdentityHashFilter::filter_cardinality);
+      const std::size_t attr_bytes =
+          proto.GetExtension(serialization::SingleIdentityHashFilter::attribute_size);
+      // Use the widest of uint8/16/32/64 that fits in the attribute (uint8 at minimum).
+      if (attr_bytes < 2) {
+        return new SingleIdentityHashFilter<std::uint8_t>(num_bits);
+      } else if (attr_bytes < 4) {
+        return new SingleIdentityHashFilter<std::uint16_t>(num_bits);
+      } else if (attr_bytes < 8) {
+        return new SingleIdentityHashFilter<std::uint32_t>(num_bits);
+      }
+      return new SingleIdentityHashFilter<std::uint64_t>(num_bits);
+    }
+    default:
+      LOG(FATAL) << "Unsupported LIP filter type: "
+                 << serialization::LIPFilterType_Name(proto.lip_filter_type());
+  }
+}
+
+bool LIPFilterFactory::ProtoIsValid(const serialization::LIPFilter &proto) {
+  return true;  // Placeholder: every proto is accepted; no field or extension is inspected.
+}
+
+}  // namespace quickstep
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/utility/lip_filter/LIPFilterFactory.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterFactory.hpp b/utility/lip_filter/LIPFilterFactory.hpp
index 0567093..6a94ae4 100644
--- a/utility/lip_filter/LIPFilterFactory.hpp
+++ b/utility/lip_filter/LIPFilterFactory.hpp
@@ -23,17 +23,21 @@
 #include <vector>
 
 #include "utility/Macros.hpp"
-
-#include "glog/logging.h"
+#include "utility/lip_filter/LIPFilter.pb.h"
 
 namespace quickstep {
 
+class LIPFilter;
+
 /** \addtogroup Utility
  *  @{
  */
 
 class LIPFilterFactory {
  public:
+  static LIPFilter* ReconstructFromProto(const serialization::LIPFilter &proto);
+
+  static bool ProtoIsValid(const serialization::LIPFilter &proto);
 
  private:
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/utility/lip_filter/SingleIdentityHashFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/SingleIdentityHashFilter.hpp b/utility/lip_filter/SingleIdentityHashFilter.hpp
new file mode 100644
index 0000000..40ef14b
--- /dev/null
+++ b/utility/lip_filter/SingleIdentityHashFilter.hpp
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_LIP_FILTER_SINGLE_IDENTITY_HASH_FILTER_HPP_
+#define QUICKSTEP_UTILITY_LIP_FILTER_SINGLE_IDENTITY_HASH_FILTER_HPP_
+
+#include <vector>
+
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
+#include <utility>
+#include <vector>
+
+#include "storage/StorageConstants.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/Type.hpp"
+#include "utility/lip_filter/LIPFilter.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/** \addtogroup Utility
+ *  @{
+ */
+
+/**
+ * @brief LIPFilter that records a value by setting bit (value mod filter_cardinality).
+ **/
+template <typename CppType>
+class SingleIdentityHashFilter : public LIPFilter {
+ public:
+  explicit SingleIdentityHashFilter(const std::size_t filter_cardinality)
+      : LIPFilter(LIPFilterType::kSingleIdentityHashFilter),
+        filter_cardinality_(filter_cardinality),
+        bit_array_(GetByteSize(filter_cardinality)) {
+    DCHECK_GT(filter_cardinality, 0u);  // Used as the modulus in insert()/contains().
+    // Clear with atomic stores: memset() over std::atomic objects is not well-defined.
+    for (auto &byte : bit_array_) {
+      byte.store(0, std::memory_order_relaxed);
+    }
+  }
+
+  // Inserts every non-NULL value of attribute attr_id found through accessor.
+  void insertValueAccessor(ValueAccessor *accessor,
+                           const attribute_id attr_id,
+                           const Type *attr_type) override {
+    InvokeOnAnyValueAccessor(accessor, [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+      if (attr_type->isNullable()) {
+        insertValueAccessorInternal<true>(accessor, attr_id);
+      } else {
+        insertValueAccessorInternal<false>(accessor, attr_id);
+      }
+    });
+  }
+
+  // Moves the ids among the first batch_size entries of *batch whose attr_id value
+  // may be in the filter to the front of *batch, and returns how many there are.
+  std::size_t filterBatch(ValueAccessor *accessor,
+                          const attribute_id attr_id,
+                          const bool is_attr_nullable,
+                          std::vector<tuple_id> *batch,
+                          const std::size_t batch_size) const override {
+    return InvokeOnAnyValueAccessor(accessor, [&](auto *accessor) -> std::size_t {  // NOLINT(build/c++11)
+      return is_attr_nullable
+          ? filterBatchInternal<true>(accessor, attr_id, batch, batch_size)
+          : filterBatchInternal<false>(accessor, attr_id, batch, batch_size);
+    });
+  }
+
+  /**
+   * @brief Inserts a given value into the hash filter.
+   *
+   * @param key_begin A pointer to the value being inserted; it is read as a CppType.
+   */
+  inline void insert(const void *key_begin) {
+    const CppType hash = *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_;
+    bit_array_[hash >> 3].fetch_or(1 << (hash & 0x7), std::memory_order_relaxed);
+  }
+
+  /**
+   * @brief Test membership of a given value in the hash filter.
+   *        If true is returned, then a value may or may not be present in the hash filter.
+   *        If false is returned, a value is certainly not present in the hash filter.
+   *
+   * @param key_begin A pointer to the value being tested for membership; must not be NULL.
+   */
+  inline bool contains(const void *key_begin) const {
+    const CppType hash = *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_;
+    return ((bit_array_[hash >> 3].load(std::memory_order_relaxed) & (1 << (hash & 0x7))) > 0);
+  }
+
+ private:
+  inline static std::size_t GetByteSize(const std::size_t bit_size) {
+    return (bit_size + 7) / 8;
+  }
+  // Inserts each value of attr_id, skipping NULLs when the attribute is nullable.
+  template <bool is_attr_nullable, typename ValueAccessorT>
+  inline void insertValueAccessorInternal(ValueAccessorT *accessor,
+                                          const attribute_id attr_id) {
+    accessor->beginIteration();
+    while (accessor->next()) {
+      const void *value = accessor->template getUntypedValue<is_attr_nullable>(attr_id);
+      if (!is_attr_nullable || value != nullptr) {
+        insert(value);
+      }
+    }
+  }
+
+  // Filters the first batch_size entries of *batch in place; see filterBatch() for the contract.
+  template <bool is_attr_nullable, typename ValueAccessorT>
+  inline std::size_t filterBatchInternal(const ValueAccessorT *accessor,
+                                         const attribute_id attr_id,
+                                         std::vector<tuple_id> *batch,
+                                         const std::size_t batch_size) const {
+    std::size_t out_size = 0;
+    for (std::size_t i = 0; i < batch_size; ++i) {
+      const tuple_id tid = batch->at(i);
+      const void *value = accessor->template getUntypedValueAtAbsolutePosition(attr_id, tid);
+      // NULLs are never inserted, so reject them here rather than dereference nullptr in contains().
+      if ((!is_attr_nullable || value != nullptr) && contains(value)) {
+        batch->at(out_size++) = tid;
+      }
+    }
+    return out_size;
+  }
+
+  std::size_t filter_cardinality_;  // Number of filter bits; also the hash modulus.
+  alignas(kCacheLineBytes) std::vector<std::atomic<std::uint8_t>> bit_array_;
+  // NOTE(review): the alignas on bit_array_ applies to the vector object, not to its heap buffer.
+  DISALLOW_COPY_AND_ASSIGN(SingleIdentityHashFilter);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_LIP_FILTER_SINGLE_IDENTITY_HASH_FILTER_HPP_