You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/10/22 19:15:45 UTC

[2/4] incubator-quickstep git commit: ExecutionGenerator and QueryContext support for LIPFilters.

ExecutionGenerator and QueryContext support for LIPFilters.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/8746eed4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/8746eed4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/8746eed4

Branch: refs/heads/partitioned-aggregate-new
Commit: 8746eed4d06ee07d64cbef8ab7851c53c50e95be
Parents: a58a747
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Wed Sep 7 13:20:43 2016 -0500
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Fri Oct 21 12:34:59 2016 -0500

----------------------------------------------------------------------
 query_execution/CMakeLists.txt                  |   7 +-
 query_execution/QueryContext.cpp                |  33 ++-
 query_execution/QueryContext.hpp                |  96 ++++++++
 query_execution/QueryContext.proto              |  25 +-
 query_optimizer/CMakeLists.txt                  |  21 ++
 query_optimizer/ExecutionGenerator.cpp          |  36 ++-
 query_optimizer/ExecutionGenerator.hpp          |   4 +
 query_optimizer/LIPFilterGenerator.cpp          | 194 +++++++++++++++
 query_optimizer/LIPFilterGenerator.hpp          | 194 +++++++++++++++
 query_optimizer/QueryPlan.hpp                   |  32 +++
 relational_operators/RelationalOperator.hpp     |  12 +-
 utility/lip_filter/CMakeLists.txt               |  55 ++++-
 utility/lip_filter/LIPFilter.hpp                |  64 +++++
 utility/lip_filter/LIPFilter.proto              |  58 +++++
 utility/lip_filter/LIPFilterAdaptiveProber.hpp  | 243 +++++++++++++++++++
 utility/lip_filter/LIPFilterBuilder.hpp         | 109 +++++++++
 utility/lip_filter/LIPFilterDeployment.cpp      |  87 +++++++
 utility/lip_filter/LIPFilterDeployment.hpp      | 111 +++++++++
 utility/lip_filter/LIPFilterFactory.cpp         |  74 ++++++
 utility/lip_filter/LIPFilterFactory.hpp         |  69 ++++++
 utility/lip_filter/SingleIdentityHashFilter.hpp | 185 ++++++++++++++
 21 files changed, 1684 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8746eed4/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index dafdea4..b5e07df 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -190,9 +190,11 @@ target_link_libraries(quickstep_queryexecution_QueryContext
                       quickstep_types_TypedValue
                       quickstep_types_containers_Tuple
                       quickstep_utility_Macros
-                      quickstep_utility_SortConfiguration)
+                      quickstep_utility_SortConfiguration
+                      quickstep_utility_lipfilter_LIPFilter
+                      quickstep_utility_lipfilter_LIPFilterDeployment
+                      quickstep_utility_lipfilter_LIPFilterFactory)
 target_link_libraries(quickstep_queryexecution_QueryContext_proto
-                      quickstep_utility_BloomFilter_proto
                       quickstep_expressions_Expressions_proto
                       quickstep_expressions_tablegenerator_GeneratorFunction_proto
                       quickstep_storage_AggregationOperationState_proto
@@ -201,6 +203,7 @@ target_link_libraries(quickstep_queryexecution_QueryContext_proto
                       quickstep_storage_WindowAggregationOperationState_proto
                       quickstep_types_containers_Tuple_proto
                       quickstep_utility_SortConfiguration_proto
+                      quickstep_utility_lipfilter_LIPFilter_proto
                       ${PROTOBUF_LIBRARY})
 target_link_libraries(quickstep_queryexecution_QueryExecutionMessages_proto
                       quickstep_catalog_Catalog_proto

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8746eed4/query_execution/QueryContext.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.cpp b/query_execution/QueryContext.cpp
index 6612611..0e6636d 100644
--- a/query_execution/QueryContext.cpp
+++ b/query_execution/QueryContext.cpp
@@ -40,6 +40,9 @@
 #include "types/TypedValue.hpp"
 #include "types/containers/Tuple.hpp"
 #include "utility/SortConfiguration.hpp"
+#include "utility/lip_filter/LIPFilter.hpp"
+#include "utility/lip_filter/LIPFilterDeployment.hpp"
+#include "utility/lip_filter/LIPFilterFactory.hpp"
 
 #include "glog/logging.h"
 
@@ -92,6 +95,18 @@ QueryContext::QueryContext(const serialization::QueryContext &proto,
         bus));
   }
 
+  for (int i = 0; i < proto.lip_filters_size(); ++i) {
+    lip_filters_.emplace_back(
+        std::unique_ptr<LIPFilter>(
+            LIPFilterFactory::ReconstructFromProto(proto.lip_filters(i))));
+  }
+
+  for (int i = 0; i < proto.lip_filter_deployments_size(); ++i) {
+    lip_deployments_.emplace_back(
+        std::make_unique<LIPFilterDeployment>(
+            proto.lip_filter_deployments(i), lip_filters_));
+  }
+
   for (int i = 0; i < proto.predicates_size(); ++i) {
     predicates_.emplace_back(
         PredicateFactory::ReconstructFromProto(proto.predicates(i), database));
@@ -151,12 +166,6 @@ bool QueryContext::ProtoIsValid(const serialization::QueryContext &proto,
     }
   }
 
-  for (int i = 0; i < proto.bloom_filters_size(); ++i) {
-    if (!BloomFilter::ProtoIsValid(proto.bloom_filters(i))) {
-      return false;
-    }
-  }
-
   // Each GeneratorFunctionHandle object is serialized as a function name with
   // a list of arguments. Here checks that the arguments are valid TypedValue's.
   for (int i = 0; i < proto.generator_functions_size(); ++i) {
@@ -185,6 +194,18 @@ bool QueryContext::ProtoIsValid(const serialization::QueryContext &proto,
     }
   }
 
+  for (int i = 0; i < proto.lip_filters_size(); ++i) {
+    if (!LIPFilterFactory::ProtoIsValid(proto.lip_filters(i))) {
+      return false;
+    }
+  }
+
+  for (int i = 0; i < proto.lip_filter_deployments_size(); ++i) {
+    if (!LIPFilterDeployment::ProtoIsValid(proto.lip_filter_deployments(i))) {
+      return false;
+    }
+  }
+
   for (int i = 0; i < proto.predicates_size(); ++i) {
     if (!PredicateFactory::ProtoIsValid(proto.predicates(i), database)) {
       return false;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8746eed4/query_execution/QueryContext.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp
index 7cc44ad..4ebb042 100644
--- a/query_execution/QueryContext.hpp
+++ b/query_execution/QueryContext.hpp
@@ -37,6 +37,8 @@
 #include "types/containers/Tuple.hpp"
 #include "utility/Macros.hpp"
 #include "utility/SortConfiguration.hpp"
+#include "utility/lip_filter/LIPFilter.hpp"
+#include "utility/lip_filter/LIPFilterDeployment.hpp"
 
 #include "glog/logging.h"
 
@@ -84,6 +86,17 @@ class QueryContext {
   typedef std::uint32_t join_hash_table_id;
 
   /**
+   * @brief A unique identifier for a LIPFilterDeployment per query.
+   **/
+  typedef std::uint32_t lip_deployment_id;
+  static constexpr lip_deployment_id kInvalidLIPDeploymentId = static_cast<lip_deployment_id>(-1);
+
+  /**
+   * @brief A unique identifier for a LIPFilter per query.
+   **/
+  typedef std::uint32_t lip_filter_id;
+
+  /**
    * @brief A unique identifier for a Predicate per query.
    *
    * @note A negative value indicates a null Predicate.
@@ -295,6 +308,87 @@ class QueryContext {
   }
 
   /**
+   * @brief Whether the given LIPFilterDeployment id is valid.
+   *
+   * @param id The LIPFilterDeployment id.
+   *
+   * @return True if valid, otherwise false.
+   **/
+  bool isValidLIPDeploymentId(const lip_deployment_id id) const {
+    return id < lip_deployments_.size();
+  }
+
+  /**
+   * @brief Get a constant pointer to the LIPFilterDeployment.
+   *
+   * @param id The LIPFilterDeployment id.
+   *
+   * @return The constant pointer to LIPFilterDeployment that is
+   *         already created in the constructor.
+   **/
+  inline const LIPFilterDeployment* getLIPDeployment(
+      const lip_deployment_id id) const {
+    DCHECK_LT(id, lip_deployments_.size());
+    return lip_deployments_[id].get();
+  }
+
+  /**
+   * @brief Destroy the given LIPFilterDeployment.
+   *
+   * @param id The id of the LIPFilterDeployment to destroy.
+   **/
+  inline void destroyLIPDeployment(const lip_deployment_id id) {
+    DCHECK_LT(id, lip_deployments_.size());
+    lip_deployments_[id].reset();
+  }
+
+  /**
+   * @brief Whether the given LIPFilter id is valid.
+   *
+   * @param id The LIPFilter id.
+   *
+   * @return True if valid, otherwise false.
+   **/
+  bool isValidLIPFilterId(const lip_filter_id id) const {
+    return id < lip_filters_.size();
+  }
+
+  /**
+   * @brief Get a mutable pointer to the LIPFilter.
+   *
+   * @param id The LIPFilter id.
+   *
+   * @return The LIPFilter, already created in the constructor.
+   **/
+  inline LIPFilter* getLIPFilterMutable(const lip_filter_id id) {
+    DCHECK_LT(id, lip_filters_.size());
+    return lip_filters_[id].get();
+  }
+
+  /**
+   * @brief Get a constant pointer to the LIPFilter.
+   *
+   * @param id The LIPFilter id.
+   *
+   * @return The constant pointer to LIPFilter that is
+   *         already created in the constructor.
+   **/
+  inline const LIPFilter* getLIPFilter(const lip_filter_id id) const {
+    DCHECK_LT(id, lip_filters_.size());
+    return lip_filters_[id].get();
+  }
+
+  /**
+   * @brief Destroy the given LIPFilter.
+   *
+   * @param id The id of the LIPFilter to destroy.
+   **/
+  inline void destroyLIPFilter(const lip_filter_id id) {
+    DCHECK_LT(id, lip_filters_.size());
+    lip_filters_[id].reset();
+  }
+
+  /**
    * @brief Whether the given Predicate id is valid or no predicate.
    *
    * @param id The Predicate id.
@@ -472,6 +566,8 @@ class QueryContext {
   std::vector<std::unique_ptr<const GeneratorFunctionHandle>> generator_functions_;
   std::vector<std::unique_ptr<InsertDestination>> insert_destinations_;
   std::vector<std::unique_ptr<JoinHashTable>> join_hash_tables_;
+  std::vector<std::unique_ptr<LIPFilterDeployment>> lip_deployments_;
+  std::vector<std::unique_ptr<LIPFilter>> lip_filters_;
   std::vector<std::unique_ptr<const Predicate>> predicates_;
   std::vector<std::vector<std::unique_ptr<const Scalar>>> scalar_groups_;
   std::vector<std::unique_ptr<const SortConfiguration>> sort_configs_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8746eed4/query_execution/QueryContext.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.proto b/query_execution/QueryContext.proto
index 1a586a4..ab0f520 100644
--- a/query_execution/QueryContext.proto
+++ b/query_execution/QueryContext.proto
@@ -26,8 +26,8 @@ import "storage/HashTable.proto";
 import "storage/InsertDestination.proto";
 import "storage/WindowAggregationOperationState.proto";
 import "types/containers/Tuple.proto";
-import "utility/BloomFilter.proto";
 import "utility/SortConfiguration.proto";
+import "utility/lip_filter/LIPFilter.proto";
 
 message QueryContext {
   message ScalarGroup {
@@ -46,19 +46,20 @@ message QueryContext {
   }
 
   repeated AggregationOperationState aggregation_states = 1;
-  repeated BloomFilter bloom_filters = 2;
-  repeated GeneratorFunctionHandle generator_functions = 3;
-  repeated HashTable join_hash_tables = 4;
-  repeated InsertDestination insert_destinations = 5;
-  repeated Predicate predicates = 6;
-  repeated ScalarGroup scalar_groups = 7;
-  repeated SortConfiguration sort_configs = 8;
-  repeated Tuple tuples = 9;
+  repeated GeneratorFunctionHandle generator_functions = 2;
+  repeated HashTable join_hash_tables = 3;
+  repeated InsertDestination insert_destinations = 4;
+  repeated LIPFilter lip_filters = 5;
+  repeated LIPFilterDeployment lip_filter_deployments = 6;
+  repeated Predicate predicates = 7;
+  repeated ScalarGroup scalar_groups = 8;
+  repeated SortConfiguration sort_configs = 9;
+  repeated Tuple tuples = 10;
 
   // NOTE(zuyu): For UpdateWorkOrder only.
-  repeated UpdateGroup update_groups = 10;
+  repeated UpdateGroup update_groups = 11;
 
-  repeated WindowAggregationOperationState window_aggregation_states = 11;
+  repeated WindowAggregationOperationState window_aggregation_states = 12;
 
-  required uint64 query_id = 12;
+  required uint64 query_id = 13;
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8746eed4/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 8333d4b..00d5163 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -41,6 +41,7 @@ add_subdirectory(tests)
 
 # Declare micro-libs:
 add_library(quickstep_queryoptimizer_ExecutionGenerator ExecutionGenerator.cpp ExecutionGenerator.hpp)
+add_library(quickstep_queryoptimizer_LIPFilterGenerator LIPFilterGenerator.cpp LIPFilterGenerator.hpp)
 add_library(quickstep_queryoptimizer_LogicalGenerator LogicalGenerator.cpp LogicalGenerator.hpp)
 add_library(quickstep_queryoptimizer_LogicalToPhysicalMapper
             ../empty_src.cpp
@@ -72,6 +73,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_QueryContext_proto
+                      quickstep_queryoptimizer_LIPFilterGenerator
                       quickstep_queryoptimizer_OptimizerContext
                       quickstep_queryoptimizer_QueryHandle
                       quickstep_queryoptimizer_QueryPlan
@@ -99,6 +101,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_queryoptimizer_physical_HashJoin
                       quickstep_queryoptimizer_physical_InsertSelection
                       quickstep_queryoptimizer_physical_InsertTuple
+                      quickstep_queryoptimizer_physical_LIPFilterConfiguration
                       quickstep_queryoptimizer_physical_NestedLoopsJoin
                       quickstep_queryoptimizer_physical_PatternMatcher
                       quickstep_queryoptimizer_physical_Physical
@@ -152,6 +155,23 @@ if (ENABLE_DISTRIBUTED)
   target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                         quickstep_catalog_Catalog_proto)
 endif()
+target_link_libraries(quickstep_queryoptimizer_LIPFilterGenerator
+                      glog
+                      quickstep_catalog_CatalogAttribute
+                      quickstep_queryexecution_QueryContext
+                      quickstep_queryexecution_QueryContext_proto
+                      quickstep_queryoptimizer_QueryPlan
+                      quickstep_queryoptimizer_expressions_ExprId
+                      quickstep_queryoptimizer_physical_Aggregate
+                      quickstep_queryoptimizer_physical_HashJoin
+                      quickstep_queryoptimizer_physical_LIPFilterConfiguration
+                      quickstep_queryoptimizer_physical_Physical
+                      quickstep_queryoptimizer_physical_Selection
+                      quickstep_relationaloperators_RelationalOperator
+                      quickstep_types_Type
+                      quickstep_utility_Macros
+                      quickstep_utility_lipfilter_LIPFilter
+                      quickstep_utility_lipfilter_LIPFilter_proto)
 target_link_libraries(quickstep_queryoptimizer_LogicalGenerator
                       glog
                       quickstep_parser_ParseStatement
@@ -225,6 +245,7 @@ target_link_libraries(quickstep_queryoptimizer_Validator
 add_library(quickstep_queryoptimizer ../empty_src.cpp QueryOptimizerModule.hpp)
 target_link_libraries(quickstep_queryoptimizer
                       quickstep_queryoptimizer_ExecutionGenerator
+                      quickstep_queryoptimizer_LIPFilterGenerator
                       quickstep_queryoptimizer_LogicalGenerator
                       quickstep_queryoptimizer_LogicalToPhysicalMapper
                       quickstep_queryoptimizer_Optimizer

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8746eed4/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 5a701b7..2e0d8f3 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -54,6 +54,7 @@
 #include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/QueryContext.pb.h"
+#include "query_optimizer/LIPFilterGenerator.hpp"
 #include "query_optimizer/OptimizerContext.hpp"
 #include "query_optimizer/QueryHandle.hpp"
 #include "query_optimizer/QueryPlan.hpp"
@@ -76,6 +77,7 @@
 #include "query_optimizer/physical/HashJoin.hpp"
 #include "query_optimizer/physical/InsertSelection.hpp"
 #include "query_optimizer/physical/InsertTuple.hpp"
+#include "query_optimizer/physical/LIPFilterConfiguration.hpp"
 #include "query_optimizer/physical/NestedLoopsJoin.hpp"
 #include "query_optimizer/physical/PatternMatcher.hpp"
 #include "query_optimizer/physical/Physical.hpp"
@@ -153,9 +155,6 @@ static const volatile bool aggregate_hashtable_type_dummy
 
 DEFINE_bool(parallelize_load, true, "Parallelize loading data files.");
 
-DEFINE_bool(optimize_joins, false,
-            "Enable post execution plan generation optimizations for joins.");
-
 namespace E = ::quickstep::optimizer::expressions;
 namespace P = ::quickstep::optimizer::physical;
 namespace S = ::quickstep::serialization;
@@ -171,6 +170,12 @@ void ExecutionGenerator::generatePlan(const P::PhysicalPtr &physical_plan) {
   cost_model_for_hash_join_.reset(
       new cost::SimpleCostModel(top_level_physical_plan_->shared_subplans()));
 
+  const auto &lip_filter_configuration =
+      top_level_physical_plan_->lip_filter_configuration();
+  if (lip_filter_configuration != nullptr) {
+    lip_filter_generator_.reset(new LIPFilterGenerator(lip_filter_configuration));
+  }
+
   const CatalogRelation *result_relation = nullptr;
 
   try {
@@ -179,6 +184,11 @@ void ExecutionGenerator::generatePlan(const P::PhysicalPtr &physical_plan) {
     }
     generatePlanInternal(top_level_physical_plan_->plan());
 
+    // Deploy LIPFilters if enabled.
+    if (lip_filter_generator_ != nullptr) {
+      lip_filter_generator_->deployLIPFilters(execution_plan_, query_context_proto_);
+    }
+
     // Set the query result relation if the input plan exists in physical_to_execution_map_,
     // which indicates the plan is the result of a SELECT query.
     const std::unordered_map<P::PhysicalPtr, CatalogRelationInfo>::const_iterator it =
@@ -235,6 +245,11 @@ void ExecutionGenerator::generatePlanInternal(
     generatePlanInternal(child);
   }
 
+  // If enabled, collect attribute substitution map for LIPFilterGenerator.
+  if (lip_filter_generator_ != nullptr) {
+    lip_filter_generator_->registerAttributeMap(physical_plan, attribute_substitution_map_);
+  }
+
   switch (physical_plan->getPhysicalType()) {
     case P::PhysicalType::kAggregate:
       return convertAggregate(
@@ -566,6 +581,10 @@ void ExecutionGenerator::convertSelection(
       std::forward_as_tuple(select_index,
                             output_relation));
   temporary_relation_info_vec_.emplace_back(select_index, output_relation);
+
+  if (lip_filter_generator_ != nullptr) {
+    lip_filter_generator_->addSelectionInfo(physical_selection, select_index);
+  }
 }
 
 void ExecutionGenerator::convertSharedSubplanReference(const physical::SharedSubplanReferencePtr &physical_plan) {
@@ -794,6 +813,12 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
       std::forward_as_tuple(join_operator_index,
                             output_relation));
   temporary_relation_info_vec_.emplace_back(join_operator_index, output_relation);
+
+  if (lip_filter_generator_ != nullptr) {
+    lip_filter_generator_->addHashJoinInfo(physical_plan,
+                                           build_operator_index,
+                                           join_operator_index);
+  }
 }
 
 void ExecutionGenerator::convertNestedLoopsJoin(
@@ -1421,6 +1446,11 @@ void ExecutionGenerator::convertAggregate(
   execution_plan_->addDirectDependency(destroy_aggregation_state_operator_index,
                                        finalize_aggregation_operator_index,
                                        true);
+
+  if (lip_filter_generator_ != nullptr) {
+    lip_filter_generator_->addAggregateInfo(physical_plan,
+                                            aggregation_operator_index);
+  }
 }
 
 void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8746eed4/query_optimizer/ExecutionGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp
index b7d8ef9..55197c9 100644
--- a/query_optimizer/ExecutionGenerator.hpp
+++ b/query_optimizer/ExecutionGenerator.hpp
@@ -33,6 +33,7 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/QueryContext.pb.h"
+#include "query_optimizer/LIPFilterGenerator.hpp"
 #include "query_optimizer/QueryHandle.hpp"
 #include "query_optimizer/QueryPlan.hpp"
 #include "query_optimizer/cost_model/CostModel.hpp"
@@ -427,6 +428,9 @@ class ExecutionGenerator {
 
   physical::TopLevelPlanPtr top_level_physical_plan_;
 
+  // Sub-generator for deploying LIP (lookahead information passing) filters.
+  std::unique_ptr<LIPFilterGenerator> lip_filter_generator_;
+
   DISALLOW_COPY_AND_ASSIGN(ExecutionGenerator);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8746eed4/query_optimizer/LIPFilterGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/LIPFilterGenerator.cpp b/query_optimizer/LIPFilterGenerator.cpp
new file mode 100644
index 0000000..404037e
--- /dev/null
+++ b/query_optimizer/LIPFilterGenerator.cpp
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_optimizer/LIPFilterGenerator.hpp"
+
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogAttribute.hpp"
+#include "query_execution/QueryContext.pb.h"
+#include "relational_operators/RelationalOperator.hpp"
+#include "types/Type.hpp"
+#include "utility/lip_filter/LIPFilter.hpp"
+#include "utility/lip_filter/LIPFilter.pb.h"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+
+namespace E = ::quickstep::optimizer::expressions;
+namespace P = ::quickstep::optimizer::physical;
+
+void LIPFilterGenerator::registerAttributeMap(
+    const P::PhysicalPtr &node,
+    const std::unordered_map<E::ExprId, const CatalogAttribute *> &attribute_substitution_map) {
+  // Check if a builder is attached to node.
+  const auto &build_info_map = lip_filter_configuration_->getBuildInfoMap();
+  const auto build_it = build_info_map.find(node);
+  if (build_it != build_info_map.end()) {
+    auto &map_entry = attribute_map_[node];
+    for (const auto &info : build_it->second) {
+      E::ExprId attr_id = info.build_attribute->id();
+      map_entry.emplace(attr_id, attribute_substitution_map.at(attr_id));
+    }
+  }
+  // Check if a prober is attached to node.
+  const auto &probe_info_map = lip_filter_configuration_->getProbeInfoMap();
+  const auto probe_it = probe_info_map.find(node);
+  if (probe_it != probe_info_map.end()) {
+    auto &map_entry = attribute_map_[node];
+    for (const auto &info : probe_it->second) {
+      E::ExprId attr_id = info.probe_attribute->id();
+      map_entry.emplace(attr_id, attribute_substitution_map.at(attr_id));
+    }
+  }
+}
+
+void LIPFilterGenerator::deployLIPFilters(QueryPlan *execution_plan,
+                                          serialization::QueryContext *query_context_proto) const {
+  LIPFilterBuilderMap lip_filter_builder_map;
+
+  // Deploy builders
+  const auto &build_info_map = lip_filter_configuration_->getBuildInfoMap();
+  for (const auto &info : builder_infos_) {
+    const auto build_it = build_info_map.find(info.builder_node);
+    if (build_it != build_info_map.end()) {
+      deployBuilderInternal(execution_plan,
+                            query_context_proto,
+                            info.builder_node,
+                            info.builder_operator_index,
+                            build_it->second,
+                            &lip_filter_builder_map);
+    }
+  }
+
+  // Deploy probers
+  const auto &probe_info_map = lip_filter_configuration_->getProbeInfoMap();
+  for (const auto &info : prober_infos_) {
+    const auto probe_it = probe_info_map.find(info.prober_node);
+    if (probe_it != probe_info_map.end()) {
+      deployProberInteral(execution_plan,
+                          query_context_proto,
+                          info.prober_node,
+                          info.prober_operator_index,
+                          probe_it->second,
+                          lip_filter_builder_map);
+    }
+  }
+}
+
+void LIPFilterGenerator::deployBuilderInternal(
+    QueryPlan *execution_plan,
+    serialization::QueryContext *query_context_proto,
+    const physical::PhysicalPtr &builder_node,
+    const QueryPlan::DAGNodeIndex builder_operator_index,
+    const std::vector<physical::LIPFilterBuildInfo> &build_info_vec,
+    LIPFilterBuilderMap *lip_filter_builder_map) const {
+  const auto lip_deployment_index = query_context_proto->lip_filter_deployments_size();
+  auto *lip_filter_deployment_info_proto =
+      query_context_proto->add_lip_filter_deployments();
+  lip_filter_deployment_info_proto->set_action_type(serialization::LIPFilterActionType::BUILD);
+
+  const auto &builder_attribute_map = attribute_map_.at(builder_node);
+  for (const auto &info : build_info_vec) {
+    // Add the LIPFilter information into query context.
+    const QueryContext::lip_filter_id lip_filter_id = query_context_proto->lip_filters_size();
+    serialization::LIPFilter *lip_filter_proto = query_context_proto->add_lip_filters();
+    const CatalogAttribute *target_attr = builder_attribute_map.at(info.build_attribute->id());
+    const Type &attr_type = target_attr->getType();
+
+    switch (info.filter_type) {
+      case LIPFilterType::kSingleIdentityHashFilter: {
+        DCHECK(!attr_type.isVariableLength());
+        lip_filter_proto->set_lip_filter_type(
+            serialization::LIPFilterType::SINGLE_IDENTITY_HASH_FILTER);
+        lip_filter_proto->SetExtension(
+            serialization::SingleIdentityHashFilter::filter_cardinality, info.filter_cardinality);
+        lip_filter_proto->SetExtension(
+            serialization::SingleIdentityHashFilter::attribute_size, attr_type.minimumByteLength());
+        break;
+      }
+      default:
+        LOG(FATAL) << "Unsupported LIPFilter type";
+        break;
+    }
+
+    // Register the builder information which is needed later by the probers.
+    lip_filter_builder_map->emplace(
+        std::make_pair(info.build_attribute->id(), builder_node),
+        std::make_pair(lip_filter_id, builder_operator_index));
+
+    // Add the builder deployment information into query context.
+    auto *lip_filter_entry_proto = lip_filter_deployment_info_proto->add_entries();
+    lip_filter_entry_proto->set_lip_filter_id(lip_filter_id);
+    lip_filter_entry_proto->set_attribute_id(target_attr->getID());
+    lip_filter_entry_proto->mutable_attribute_type()->CopyFrom(attr_type.getProto());
+  }
+
+  // Attach the LIPFilterDeployment information to the RelationalOperator.
+  RelationalOperator *relop =
+      execution_plan->getQueryPlanDAGMutable()->getNodePayloadMutable(builder_operator_index);
+  relop->deployLIPFilters(lip_deployment_index);
+}
+
+void LIPFilterGenerator::deployProberInteral(
+    QueryPlan *execution_plan,
+    serialization::QueryContext *query_context_proto,
+    const physical::PhysicalPtr &prober_node,
+    const QueryPlan::DAGNodeIndex prober_operator_index,
+    const std::vector<physical::LIPFilterProbeInfo> &probe_info_vec,
+    const LIPFilterBuilderMap &lip_filter_builder_map) const {
+  const auto lip_deployment_index = query_context_proto->lip_filter_deployments_size();
+  auto *lip_filter_deployment_info_proto =
+      query_context_proto->add_lip_filter_deployments();
+  lip_filter_deployment_info_proto->set_action_type(serialization::LIPFilterActionType::PROBE);
+
+  const auto &prober_attribute_map = attribute_map_.at(prober_node);
+  for (const auto &info : probe_info_vec) {
+    // Find the corresponding builder for the to-be-probed LIPFilter.
+    const auto &builder_info =
+        lip_filter_builder_map.at(
+            std::make_pair(info.build_attribute->id(), info.builder));
+    const CatalogAttribute *target_attr = prober_attribute_map.at(info.probe_attribute->id());
+
+    // Add the prober deployment information into query context.
+    auto *lip_filter_entry_proto = lip_filter_deployment_info_proto->add_entries();
+    lip_filter_entry_proto->set_lip_filter_id(builder_info.first);
+    lip_filter_entry_proto->set_attribute_id(target_attr->getID());
+    lip_filter_entry_proto->mutable_attribute_type()->CopyFrom(
+        target_attr->getType().getProto());
+
+    // A prober must wait until the corresponding builder has completed building
+    // the LIPFilter.
+    execution_plan->addOrUpgradeDirectDependency(prober_operator_index,
+                                                 builder_info.second,
+                                                 true /* is_pipeline_breaker */);
+  }
+
+  // Attach the LIPFilterDeployment information to the RelationalOperator.
+  RelationalOperator *relop =
+      execution_plan->getQueryPlanDAGMutable()->getNodePayloadMutable(prober_operator_index);
+  relop->deployLIPFilters(lip_deployment_index);
+}
+
+}  // namespace optimizer
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8746eed4/query_optimizer/LIPFilterGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/LIPFilterGenerator.hpp b/query_optimizer/LIPFilterGenerator.hpp
new file mode 100644
index 0000000..9d191a1
--- /dev/null
+++ b/query_optimizer/LIPFilterGenerator.hpp
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_LIP_FILTER_GENERATOR_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_LIP_FILTER_GENERATOR_HPP_
+
+#include <map>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "query_execution/QueryContext.hpp"
+#include "query_optimizer/QueryPlan.hpp"
+#include "query_optimizer/expressions/ExprId.hpp"
+#include "query_optimizer/physical/LIPFilterConfiguration.hpp"
+#include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/HashJoin.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/Selection.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+namespace serialization { class QueryContext; }
+
+class CatalogAttribute;
+
+namespace optimizer {
+
+/** \addtogroup QueryOptimizer
+ *  @{
+ */
+
+/**
+ * @brief Generates backend LIPFilter deployments from physical plan's information.
+ *
+ * Typical usage, as far as this header shows: the caller registers attribute
+ * maps and physical-to-execution operator mappings through the add*Info()
+ * methods while the execution plan is generated, then calls
+ * deployLIPFilters() once to emit the deployments.
+ */
+class LIPFilterGenerator {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param lip_filter_configuration The LIPFilter configuration information
+   *        generated by physical optimizer. Must not be null (DCHECK-ed).
+   */
+  explicit LIPFilterGenerator(
+      const physical::LIPFilterConfigurationPtr &lip_filter_configuration)
+      : lip_filter_configuration_(lip_filter_configuration) {
+    DCHECK(lip_filter_configuration_ != nullptr);
+  }
+
+  /**
+   * @brief Collect the ExprId to CatalogAttribute mapping information for the
+   *        given physical node.
+   *
+   * @param node A physical plan node.
+   * @param attribute_substitution_map A map that maps each ExprId to the
+   *        backend relation's CatalogAttribute's.
+   */
+  void registerAttributeMap(
+      const physical::PhysicalPtr &node,
+      const std::unordered_map<expressions::ExprId, const CatalogAttribute *> &attribute_substitution_map);
+
+  /**
+   * @brief Add physical-to-execution mapping information for deploying LIPFilters
+   *        to an aggregation.
+   *
+   * The aggregation is recorded as a potential LIPFilter prober only.
+   *
+   * @param aggregate A physical Aggregate node.
+   * @param aggregate_operator_index The index of the AggregationOperator that
+   *        corresponds to \p aggregate in the execution plan.
+   */
+  void addAggregateInfo(const physical::AggregatePtr &aggregate,
+                        const QueryPlan::DAGNodeIndex aggregate_operator_index) {
+    prober_infos_.emplace_back(aggregate, aggregate_operator_index);
+  }
+
+  /**
+   * @brief Add physical-to-execution mapping information for deploying LIPFilters
+   *        to a hash-join.
+   *
+   * The hash-join is recorded both as a potential builder (through its
+   * BuildHashOperator) and as a potential prober (through its
+   * HashJoinOperator).
+   *
+   * @param hash_join A physical HashJoin node.
+   * @param build_operator_index The index of the BuildHashOperator that corresponds
+   *        to \p hash_join in the execution plan.
+   * @param join_operator_index The index of the HashJoinOperator that corresponds
+   *        to \p hash_join in the execution plan.
+   */
+  void addHashJoinInfo(const physical::HashJoinPtr &hash_join,
+                       const QueryPlan::DAGNodeIndex build_operator_index,
+                       const QueryPlan::DAGNodeIndex join_operator_index) {
+    builder_infos_.emplace_back(hash_join, build_operator_index);
+    prober_infos_.emplace_back(hash_join, join_operator_index);
+  }
+
+  /**
+   * @brief Add physical-to-execution mapping information for deploying LIPFilters
+   *        to a selection.
+   *
+   * The selection is recorded as a potential LIPFilter prober only.
+   *
+   * @param selection A physical Selection node.
+   * @param select_operator_index The index of the SelectOperator that corresponds
+   *        to \p selection in the execution plan.
+   */
+  void addSelectionInfo(const physical::SelectionPtr &selection,
+                        const QueryPlan::DAGNodeIndex select_operator_index) {
+    prober_infos_.emplace_back(selection, select_operator_index);
+  }
+
+  /**
+   * @brief Deploy the LIPFilters to the execution plan.
+   *
+   * @param execution_plan The execution plan.
+   * @param query_context_proto QueryContext protobuf for the execution plan.
+   */
+  void deployLIPFilters(QueryPlan *execution_plan,
+                        serialization::QueryContext *query_context_proto) const;
+
+ private:
+  /**
+   * @brief Internal data structure for representing a LIPFilter builder.
+   */
+  struct BuilderInfo {
+    BuilderInfo(const physical::PhysicalPtr &builder_node_in,
+                const QueryPlan::DAGNodeIndex builder_operator_index_in)
+        : builder_node(builder_node_in),
+          builder_operator_index(builder_operator_index_in) {
+    }
+    // The physical plan node that builds LIPFilters.
+    const physical::PhysicalPtr builder_node;
+    // Index of the corresponding relational operator in the execution plan.
+    const QueryPlan::DAGNodeIndex builder_operator_index;
+  };
+
+  /**
+   * @brief Internal data structure for representing a LIPFilter prober.
+   */
+  struct ProberInfo {
+    ProberInfo(const physical::PhysicalPtr &prober_node_in,
+               const QueryPlan::DAGNodeIndex prober_operator_index_in)
+        : prober_node(prober_node_in),
+          prober_operator_index(prober_operator_index_in) {
+    }
+    // The physical plan node that probes LIPFilters.
+    const physical::PhysicalPtr prober_node;
+    // Index of the corresponding relational operator in the execution plan.
+    const QueryPlan::DAGNodeIndex prober_operator_index;
+  };
+
+  // Maps each LIPFilter's building attribute to the LIPFilter's id in QueryContext
+  // as well as the LIPFilter's building relational operator's index.
+  // The key is the pair (building attribute's ExprId, builder physical node).
+  typedef std::map<std::pair<expressions::ExprId, physical::PhysicalPtr>,
+                   std::pair<QueryContext::lip_filter_id, QueryPlan::DAGNodeIndex>> LIPFilterBuilderMap;
+
+  /**
+   * @brief Deploy the LIPFilter builders for a single builder node.
+   *
+   * NOTE(review): the definition is not visible from this header. Judging by
+   * the signature, it is expected to record every deployed builder into
+   * \p lip_filter_builder_map so that probers can find it later -- confirm
+   * against LIPFilterGenerator.cpp.
+   *
+   * @param execution_plan The execution plan.
+   * @param query_context_proto QueryContext protobuf for the execution plan.
+   * @param builder_node The physical node that builds the LIPFilters.
+   * @param builder_operator_index The index of the relational operator that
+   *        corresponds to \p builder_node in the execution plan.
+   * @param build_info_vec The LIPFilter build information for \p builder_node.
+   * @param lip_filter_builder_map Output map of deployed builders.
+   */
+  void deployBuilderInternal(QueryPlan *execution_plan,
+                             serialization::QueryContext *query_context_proto,
+                             const physical::PhysicalPtr &builder_node,
+                             const QueryPlan::DAGNodeIndex builder_operator_index,
+                             const std::vector<physical::LIPFilterBuildInfo> &build_info_vec,
+                             LIPFilterBuilderMap *lip_filter_builder_map) const;
+
+  /**
+   * @brief Deploy the LIPFilter probers for a single prober node.
+   *
+   * Adds a PROBE-type LIPFilterDeployment to \p query_context_proto. For each
+   * entry of \p probe_info_vec, the matching builder is looked up in
+   * \p lip_filter_builder_map by (build attribute's ExprId, builder node), a
+   * deployment entry (filter id, probe attribute id and type) is recorded,
+   * and the prober operator is made to depend on the builder operator with a
+   * pipeline-breaking link. Finally the deployment is attached to the prober's
+   * RelationalOperator.
+   *
+   * NOTE(review): "Interal" looks like a typo for "Internal"; renaming it
+   * requires changing the out-of-line definition at the same time.
+   *
+   * @param execution_plan The execution plan.
+   * @param query_context_proto QueryContext protobuf for the execution plan.
+   * @param prober_node The physical node that probes the LIPFilters.
+   * @param prober_operator_index The index of the relational operator that
+   *        corresponds to \p prober_node in the execution plan.
+   * @param probe_info_vec The LIPFilter probe information for \p prober_node.
+   * @param lip_filter_builder_map Map of the already deployed builders.
+   */
+  void deployProberInteral(QueryPlan *execution_plan,
+                           serialization::QueryContext *query_context_proto,
+                           const physical::PhysicalPtr &prober_node,
+                           const QueryPlan::DAGNodeIndex prober_operator_index,
+                           const std::vector<physical::LIPFilterProbeInfo> &probe_info_vec,
+                           const LIPFilterBuilderMap &lip_filter_builder_map) const;
+
+  // The LIPFilter configuration produced by the physical optimizer.
+  const physical::LIPFilterConfigurationPtr lip_filter_configuration_;
+
+  // Builder candidates, appended by addHashJoinInfo().
+  std::vector<BuilderInfo> builder_infos_;
+  // Prober candidates, appended by addAggregateInfo(), addHashJoinInfo() and
+  // addSelectionInfo().
+  std::vector<ProberInfo> prober_infos_;
+
+  // For each physical node, the ExprId -> CatalogAttribute mapping of that
+  // node (presumably filled in by registerAttributeMap() -- its definition is
+  // not visible here).
+  std::map<physical::PhysicalPtr, std::map<expressions::ExprId, const CatalogAttribute *>> attribute_map_;
+
+  DISALLOW_COPY_AND_ASSIGN(LIPFilterGenerator);
+};
+
+/** @} */
+
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif /* QUICKSTEP_QUERY_OPTIMIZER_LIP_FILTER_GENERATOR_HPP_ */

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8746eed4/query_optimizer/QueryPlan.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/QueryPlan.hpp b/query_optimizer/QueryPlan.hpp
index 5cd174c..ef6dff4 100644
--- a/query_optimizer/QueryPlan.hpp
+++ b/query_optimizer/QueryPlan.hpp
@@ -74,6 +74,38 @@ class QueryPlan {
   }
 
   /**
+   * @brief Makes sure a link from \p producer_operator_index to
+   *        \p consumer_operator_index exists in the DAG, creating it or
+   *        upgrading the existing one.
+   *
+   * Two situations are distinguished:
+   *   - The producer already links to the consumer with metadata \p m:
+   *         the metadata is replaced by (m | is_pipeline_breaker), so a link
+   *         that is already marked as a pipeline breaker stays one.
+   *   - There is no such link yet:
+   *         a new link is created with metadata set to is_pipeline_breaker.
+   *
+   * @param consumer_operator_index The index of the consumer operator.
+   * @param producer_operator_index The index of the producer operator.
+   * @param is_pipeline_breaker True if the result from the producer cannot be
+   *                            pipelined to the consumer, otherwise false.
+   */
+  inline void addOrUpgradeDirectDependency(DAGNodeIndex consumer_operator_index,
+                                           DAGNodeIndex producer_operator_index,
+                                           bool is_pipeline_breaker) {
+    const auto &producer_dependents =
+        dag_operators_.getDependents(producer_operator_index);
+    const auto existing_link = producer_dependents.find(consumer_operator_index);
+
+    if (existing_link != producer_dependents.end()) {
+      // Upgrade path: merge the requested flag into the current metadata.
+      dag_operators_.setLinkMetadata(producer_operator_index,
+                                     consumer_operator_index,
+                                     existing_link->second | is_pipeline_breaker);
+      return;
+    }
+
+    // Creation path: no link between the two operators so far.
+    dag_operators_.createLink(producer_operator_index,
+                              consumer_operator_index,
+                              is_pipeline_breaker);
+  }
+
+  /**
    * @brief Creates dependencies for a DropTable operator with index
    *        \p drop_operator_index. If \p producer_operator_index
    *        has any dependent, creates a link from \p drop_operator_index

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8746eed4/relational_operators/RelationalOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/RelationalOperator.hpp b/relational_operators/RelationalOperator.hpp
index f0303e5..3eea189 100644
--- a/relational_operators/RelationalOperator.hpp
+++ b/relational_operators/RelationalOperator.hpp
@@ -245,6 +245,13 @@ class RelationalOperator {
     return op_index_;
   }
 
+  /**
+   * @brief Deploy a group of LIPFilters to this operator.
+   *
+   * Only stores the given id in lip_deployment_index_, overwriting whatever
+   * was stored before (the constructor initializes it to
+   * QueryContext::kInvalidLIPDeploymentId).
+   *
+   * @param lip_deployment_index The id of the LIPFilter deployment to attach
+   *        (presumably an index into the QueryContext's LIPFilter
+   *        deployments -- confirm against QueryContext.hpp).
+   */
+  void deployLIPFilters(const QueryContext::lip_deployment_id lip_deployment_index) {
+    lip_deployment_index_ = lip_deployment_index;
+  }
+
  protected:
   /**
    * @brief Constructor
@@ -257,7 +264,8 @@ class RelationalOperator {
                               const bool blocking_dependencies_met = false)
       : query_id_(query_id),
         blocking_dependencies_met_(blocking_dependencies_met),
-        done_feeding_input_relation_(false) {}
+        done_feeding_input_relation_(false),
+        lip_deployment_index_(QueryContext::kInvalidLIPDeploymentId) {}
 
   const std::size_t query_id_;
 
@@ -265,6 +273,8 @@ class RelationalOperator {
   bool done_feeding_input_relation_;
   std::size_t op_index_;
 
+  QueryContext::lip_deployment_id lip_deployment_index_;
+
  private:
   DISALLOW_COPY_AND_ASSIGN(RelationalOperator);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8746eed4/utility/lip_filter/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/lip_filter/CMakeLists.txt b/utility/lip_filter/CMakeLists.txt
index 2232abe..b7224d2 100644
--- a/utility/lip_filter/CMakeLists.txt
+++ b/utility/lip_filter/CMakeLists.txt
@@ -15,5 +15,58 @@
 # specific language governing permissions and limitations
 # under the License.
 
+QS_PROTOBUF_GENERATE_CPP(utility_lipfilter_LIPFilter_proto_srcs
+                         utility_lipfilter_LIPFilter_proto_hdrs
+                         LIPFilter.proto)
+
 # Declare micro-libs:
-add_library(quickstep_utility_lipfilter_LIPFilter ../../empty_src.cpp LIPFilter.hpp)
\ No newline at end of file
+add_library(quickstep_utility_lipfilter_LIPFilter ../../empty_src.cpp LIPFilter.hpp)
+add_library(quickstep_utility_lipfilter_LIPFilterAdaptiveProber ../../empty_src.cpp LIPFilterAdaptiveProber.hpp)
+add_library(quickstep_utility_lipfilter_LIPFilterBuilder ../../empty_src.cpp LIPFilterBuilder.hpp)
+add_library(quickstep_utility_lipfilter_LIPFilterDeployment LIPFilterDeployment.cpp LIPFilterDeployment.hpp)
+add_library(quickstep_utility_lipfilter_LIPFilterFactory LIPFilterFactory.cpp LIPFilterFactory.hpp)
+add_library(quickstep_utility_lipfilter_LIPFilter_proto
+            ${utility_lipfilter_LIPFilter_proto_srcs})
+add_library(quickstep_utility_lipfilter_SingleIdentityHashFilter ../../empty_src.cpp SingleIdentityHashFilter.hpp)
+
+# Link dependencies:
+target_link_libraries(quickstep_utility_lipfilter_LIPFilter
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_utility_Macros)
+target_link_libraries(quickstep_utility_lipfilter_LIPFilterAdaptiveProber
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_storage_TupleIdSequence
+                      quickstep_storage_ValueAccessor
+                      quickstep_types_Type
+                      quickstep_utility_Macros
+                      quickstep_utility_lipfilter_LIPFilter)
+target_link_libraries(quickstep_utility_lipfilter_LIPFilterBuilder
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_utility_Macros
+                      quickstep_utility_lipfilter_LIPFilter)
+target_link_libraries(quickstep_utility_lipfilter_LIPFilterDeployment
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_types_TypeFactory
+                      quickstep_utility_Macros
+                      quickstep_utility_lipfilter_LIPFilter
+                      quickstep_utility_lipfilter_LIPFilterAdaptiveProber
+                      quickstep_utility_lipfilter_LIPFilterBuilder
+                      quickstep_utility_lipfilter_LIPFilter_proto)
+target_link_libraries(quickstep_utility_lipfilter_LIPFilterFactory
+                      quickstep_utility_lipfilter_LIPFilter_proto
+                      quickstep_utility_lipfilter_SingleIdentityHashFilter
+                      quickstep_utility_Macros)
+target_link_libraries(quickstep_utility_lipfilter_LIPFilter_proto
+                      ${PROTOBUF_LIBRARY}
+                      quickstep_types_Type_proto)
+target_link_libraries(quickstep_utility_lipfilter_SingleIdentityHashFilter
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_storage_StorageConstants
+                      quickstep_storage_ValueAccessor
+                      quickstep_storage_ValueAccessorUtil
+                      quickstep_types_Type
+                      quickstep_utility_lipfilter_LIPFilter
+                      quickstep_utility_Macros)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8746eed4/utility/lip_filter/LIPFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilter.hpp b/utility/lip_filter/LIPFilter.hpp
index 33165ed..682d69f 100644
--- a/utility/lip_filter/LIPFilter.hpp
+++ b/utility/lip_filter/LIPFilter.hpp
@@ -20,8 +20,18 @@
 #ifndef QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_HPP_
 #define QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_HPP_
 
+#include <cstddef>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "utility/Macros.hpp"
+
 namespace quickstep {
 
+class Type;
+class ValueAccessor;
+
 /** \addtogroup Utility
  *  @{
  */
@@ -32,6 +42,60 @@ enum class LIPFilterType {
   kSingleIdentityHashFilter
 };
 
+/**
+ * @brief Base class for LIP (Lookahead Information Passing) filters.
+ *
+ * Concrete filters implement insertValueAccessor() to populate the filter and
+ * filterBatch() to probe it. Instances are handled through base-class
+ * pointers, hence the virtual destructor.
+ */
+class LIPFilter {
+ public:
+  /**
+   * @brief Virtual destructor, so that deleting a concrete filter through a
+   *        LIPFilter pointer (e.g. std::unique_ptr<LIPFilter>) is well defined.
+   */
+  virtual ~LIPFilter() {}
+
+  /**
+   * @brief Get the type of this LIPFilter.
+   *
+   * @return The type of this LIPFilter.
+   */
+  LIPFilterType getType() const {
+    return type_;
+  }
+
+  /**
+   * @brief Insert the values drawn from a ValueAccessor into this LIPFilter.
+   *
+   * @param accessor A ValueAccessor which will be used to access the values.
+   * @param attr_id The attribute id of the values to be read from accessor.
+   * @param attr_type The type of the values.
+   */
+  virtual void insertValueAccessor(ValueAccessor *accessor,
+                                   const attribute_id attr_id,
+                                   const Type *attr_type) = 0;
+
+  /**
+   * @brief Filter the given batch of tuples from a ValueAccessor. Remove any
+   *        tuple in the batch that does not have a hit in this filter.
+   *
+   * @param accessor A ValueAccessor which will be used to access the tuples.
+   * @param attr_id The attribute id of the values to be filtered.
+   * @param is_attr_nullable Whether the values can be NULL.
+   * @param batch The batch of tuple ids to be filtered. This vector will also
+   *        be updated in place in this method to hold the output tuple ids.
+   * @param batch_size The input batch size.
+   *
+   * @return The output batch size.
+   */
+  virtual std::size_t filterBatch(ValueAccessor *accessor,
+                                  const attribute_id attr_id,
+                                  const bool is_attr_nullable,
+                                  std::vector<tuple_id> *batch,
+                                  const std::size_t batch_size) const = 0;
+
+ protected:
+  /**
+   * @brief Constructor, only callable from concrete filter classes.
+   *
+   * @param type The type of the concrete LIPFilter being constructed.
+   */
+  explicit LIPFilter(const LIPFilterType &type)
+      : type_(type) {}
+
+ private:
+  // The concrete filter type, reported by getType().
+  LIPFilterType type_;
+
+  DISALLOW_COPY_AND_ASSIGN(LIPFilter);
+};
+
 /** @} */
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8746eed4/utility/lip_filter/LIPFilter.proto
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilter.proto b/utility/lip_filter/LIPFilter.proto
new file mode 100644
index 0000000..def13dd
--- /dev/null
+++ b/utility/lip_filter/LIPFilter.proto
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto2";
+
+package quickstep.serialization;
+
+import "types/Type.proto";
+
+enum LIPFilterType {
+  BLOOM_FILTER = 1;
+  EXACT_FILTER = 2;
+  SINGLE_IDENTITY_HASH_FILTER = 3;
+}
+
+message LIPFilter {
+  required LIPFilterType lip_filter_type = 1;
+
+  extensions 16 to max;
+}
+
+message SingleIdentityHashFilter {
+  extend LIPFilter {
+    // All required
+    optional uint64 filter_cardinality = 16;
+    optional uint64 attribute_size = 17;
+  }
+}
+
+enum LIPFilterActionType {
+  BUILD = 1;
+  PROBE = 2;
+}
+
+message LIPFilterDeployment {
+  message Entry {
+    required uint32 lip_filter_id = 1;
+    required int32 attribute_id = 2;
+    required Type attribute_type = 3;
+  }
+
+  required LIPFilterActionType action_type = 1;
+  repeated Entry entries = 2;
+}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8746eed4/utility/lip_filter/LIPFilterAdaptiveProber.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterAdaptiveProber.hpp b/utility/lip_filter/LIPFilterAdaptiveProber.hpp
new file mode 100644
index 0000000..e1a75d6
--- /dev/null
+++ b/utility/lip_filter/LIPFilterAdaptiveProber.hpp
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_ADAPTIVE_PROBER_HPP_
+#define QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_ADAPTIVE_PROBER_HPP_
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/TupleIdSequence.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "types/Type.hpp"
+#include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilter.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/** \addtogroup Utility
+ *  @{
+ */
+
+/**
+ * @brief Helper class for adaptively applying a group of LIPFilters to a
+ *        ValueAccessor. Here "adaptive" means that the application ordering
+ *        of the filters will be adjusted on the fly based on the filters' miss
+ *        rates.
+ */
+class LIPFilterAdaptiveProber {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param lip_filters The LIPFilters that will be probed. Every pointer must
+   *        be non-null (DCHECK-ed); the filters are not owned by this class.
+   * @param attr_ids The target attribute ids for the LIPFilters.
+   * @param attr_types The target attribute types for the LIPFilters.
+   */
+  LIPFilterAdaptiveProber(const std::vector<LIPFilter *> &lip_filters,
+                          const std::vector<attribute_id> &attr_ids,
+                          const std::vector<const Type *> &attr_types) {
+    DCHECK_EQ(lip_filters.size(), attr_ids.size());
+    DCHECK_EQ(lip_filters.size(), attr_types.size());
+
+    probe_entries_.reserve(lip_filters.size());
+    for (std::size_t i = 0; i < lip_filters.size(); ++i) {
+      DCHECK(lip_filters[i] != nullptr);
+      probe_entries_.emplace_back(
+          new ProbeEntry(lip_filters[i], attr_ids[i], attr_types[i]));
+    }
+  }
+
+  /**
+   * @brief Destructor. Releases the internally allocated probe entries (the
+   *        LIPFilters themselves are left untouched).
+   */
+  ~LIPFilterAdaptiveProber() {
+    for (ProbeEntry *entry : probe_entries_) {
+      delete entry;
+    }
+  }
+
+  /**
+   * @brief Apply this group of LIPFilters to the given ValueAccessor.
+   *
+   * @param accessor A ValueAccessor to be filtered.
+   * @return A TupleIdSequence for the hit tuples in the ValueAccessor. The
+   *         caller takes ownership of the returned object.
+   */
+  TupleIdSequence* filterValueAccessor(ValueAccessor *accessor) {
+    const TupleIdSequence *existence_map = accessor->getTupleIdSequenceVirtual();
+    if (existence_map == nullptr) {
+      return filterValueAccessorNoExistenceMap(accessor);
+    } else {
+      return filterValueAccessorWithExistenceMap(accessor, existence_map);
+    }
+  }
+
+ private:
+  /**
+   * @brief Internal data structure for representing each LIPFilter probing entry.
+   */
+  struct ProbeEntry {
+    ProbeEntry(const LIPFilter *lip_filter_in,
+               const attribute_id attr_id_in,
+               const Type *attr_type_in)
+        : lip_filter(lip_filter_in),
+          attr_id(attr_id_in),
+          attr_type(attr_type_in),
+          miss(0),
+          cnt(0),
+          miss_rate(0) {
+    }
+
+    /**
+     * @brief Whether a LIPFilter is more selective than the other, i.e. has a
+     *        strictly higher miss rate.
+     */
+    static bool isBetterThan(const ProbeEntry *a,
+                             const ProbeEntry *b) {
+      return a->miss_rate > b->miss_rate;
+    }
+
+    const LIPFilter *lip_filter;
+    const attribute_id attr_id;
+    const Type *attr_type;
+    // Number of tuples rejected by this filter so far.
+    std::uint32_t miss;
+    // Number of tuples handed to this filter so far.
+    std::uint32_t cnt;
+    // miss / cnt, refreshed by adaptEntryOrder(); 0 while cnt is still 0.
+    float miss_rate;
+  };
+
+  /**
+   * @brief Specialized filterValueAccessor implementation where the given
+   *        ValueAccessor has no existence map.
+   */
+  inline TupleIdSequence* filterValueAccessorNoExistenceMap(ValueAccessor *accessor) {
+    const std::uint32_t num_tuples = accessor->getNumTuplesVirtual();
+    std::unique_ptr<TupleIdSequence> matches(new TupleIdSequence(num_tuples));
+    std::uint32_t next_batch_size = 64u;
+    std::vector<tuple_id> batch(num_tuples);
+
+    // Apply the filters in a batched manner. The batch size doubles after each
+    // round. Note that the loop body runs once even when num_tuples is 0, in
+    // which case the filters are invoked on an empty batch.
+    std::uint32_t batch_start = 0;
+    do {
+      const std::uint32_t batch_size =
+          std::min(next_batch_size, num_tuples - batch_start);
+      for (std::uint32_t i = 0; i < batch_size; ++i) {
+        batch[i] = batch_start + i;
+      }
+
+      const std::uint32_t num_hits = filterBatch(accessor, &batch, batch_size);
+      for (std::uint32_t i = 0; i < num_hits; ++i) {
+        matches->set(batch[i], true);
+      }
+
+      batch_start += batch_size;
+      next_batch_size *= 2;
+    } while (batch_start < num_tuples);
+
+    return matches.release();
+  }
+
+  /**
+   * @brief Specialized filterValueAccessor implementation where the given
+   *        ValueAccessor has an existence map.
+   */
+  inline TupleIdSequence* filterValueAccessorWithExistenceMap(ValueAccessor *accessor,
+                                                              const TupleIdSequence *existence_map) {
+    std::unique_ptr<TupleIdSequence> matches(
+        new TupleIdSequence(existence_map->length()));
+    std::uint32_t next_batch_size = 64u;
+    std::uint32_t num_tuples_left = existence_map->numTuples();
+    std::vector<tuple_id> batch(num_tuples_left);
+
+    // Apply the filters in a batched manner. The batch size doubles after each
+    // round; only tuple ids present in the existence map are visited.
+    TupleIdSequence::const_iterator tuple_it = existence_map->before_begin();
+    do {
+      const std::uint32_t batch_size =
+          next_batch_size < num_tuples_left ? next_batch_size : num_tuples_left;
+      for (std::uint32_t i = 0; i < batch_size; ++i) {
+        ++tuple_it;
+        batch[i] = *tuple_it;
+      }
+
+      const std::uint32_t num_hits = filterBatch(accessor, &batch, batch_size);
+      for (std::uint32_t i = 0; i < num_hits; ++i) {
+        matches->set(batch[i], true);
+      }
+
+      num_tuples_left -= batch_size;
+      next_batch_size *= 2;
+    } while (num_tuples_left > 0);
+
+    return matches.release();
+  }
+
+  /**
+   * @brief Filter the given batch of tuples from the ValueAccessor. Remove any
+   *        tuple in the batch that misses any filter.
+   *
+   * @param accessor The ValueAccessor the tuple ids refer to.
+   * @param batch The tuple ids to be filtered; updated in place so that its
+   *        leading elements are the surviving tuple ids.
+   * @param batch_size The number of valid tuple ids at the front of \p batch.
+   * @return The number of surviving tuple ids.
+   */
+  inline std::size_t filterBatch(ValueAccessor *accessor,
+                                 std::vector<tuple_id> *batch,
+                                 std::uint32_t batch_size) {
+    // Apply the LIPFilters one by one to the batch and update corresponding
+    // cnt/miss statistics. Each filter only sees what the previous ones let
+    // through, so later filters may receive an empty batch.
+    for (auto *entry : probe_entries_) {
+      const std::uint32_t out_size =
+          entry->lip_filter->filterBatch(accessor,
+                                         entry->attr_id,
+                                         entry->attr_type->isNullable(),
+                                         batch,
+                                         batch_size);
+      entry->cnt += batch_size;
+      entry->miss += batch_size - out_size;
+      batch_size = out_size;
+    }
+
+    // Adaptively adjust the application ordering after each batch.
+    adaptEntryOrder();
+
+    return batch_size;
+  }
+
+  /**
+   * @brief Adjust LIPFilter application ordering with regard to their miss
+   *        rates (i.e. selectivities). Filters with higher miss rates are
+   *        moved to the front.
+   */
+  inline void adaptEntryOrder() {
+    for (auto &entry : probe_entries_) {
+      // A filter that has not been handed any tuple yet (cnt == 0) has no
+      // measurable miss rate. Dividing 0 by 0 would produce NaN, which breaks
+      // the strict weak ordering that std::sort requires from its comparator,
+      // so such a filter is treated as having no misses.
+      entry->miss_rate =
+          entry->cnt == 0 ? 0.0f
+                          : static_cast<float>(entry->miss) / entry->cnt;
+    }
+    std::sort(probe_entries_.begin(),
+              probe_entries_.end(),
+              ProbeEntry::isBetterThan);
+  }
+
+  // Probe entries in their current application order; owned by this object.
+  std::vector<ProbeEntry *> probe_entries_;
+
+  DISALLOW_COPY_AND_ASSIGN(LIPFilterAdaptiveProber);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_ADAPTIVE_PROBER_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8746eed4/utility/lip_filter/LIPFilterBuilder.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterBuilder.hpp b/utility/lip_filter/LIPFilterBuilder.hpp
new file mode 100644
index 0000000..deb8f66
--- /dev/null
+++ b/utility/lip_filter/LIPFilterBuilder.hpp
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_BUILDER_HPP_
+#define QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_BUILDER_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilter.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class Type;
+class ValueAccessor;
+
+/** \addtogroup Utility
+ *  @{
+ */
+
+class LIPFilterBuilder;
+typedef std::shared_ptr<LIPFilterBuilder> LIPFilterBuilderPtr;
+
+/**
+ * @brief Helper class that feeds the values of a relation (accessed through a
+ *        ValueAccessor) into a group of LIPFilters.
+ */
+class LIPFilterBuilder {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * The three vectors are parallel: element i of each describes one filter to
+   * build. Their sizes must agree (DCHECK-ed).
+   *
+   * @param lip_filters The LIPFilters that will be built.
+   * @param attr_ids The target attribute ids for the LIPFilters.
+   * @param attr_types The target attribute types for the LIPFilters.
+   */
+  LIPFilterBuilder(const std::vector<LIPFilter *> &lip_filters,
+                   const std::vector<attribute_id> &attr_ids,
+                   const std::vector<const Type *> &attr_types) {
+    DCHECK_EQ(lip_filters.size(), attr_ids.size());
+    DCHECK_EQ(lip_filters.size(), attr_types.size());
+
+    const std::size_t num_filters = lip_filters.size();
+    build_entries_.reserve(num_filters);
+    for (std::size_t idx = 0; idx != num_filters; ++idx) {
+      build_entries_.emplace_back(lip_filters[idx],
+                                  attr_ids[idx],
+                                  attr_types[idx]);
+    }
+  }
+
+  /**
+   * @brief Insert all the values from the given ValueAccessor into every
+   *        attached LIPFilter, each with its own target attribute id and type.
+   *
+   * @param accessor The ValueAccessor which will be used to access the values.
+   */
+  void insertValueAccessor(ValueAccessor *accessor) {
+    for (const BuildEntry &build_entry : build_entries_) {
+      LIPFilter *target_filter = build_entry.lip_filter;
+      target_filter->insertValueAccessor(accessor,
+                                         build_entry.attr_id,
+                                         build_entry.attr_type);
+    }
+  }
+
+ private:
+  /**
+   * @brief Internal record tying one LIPFilter to the attribute it is built on.
+   */
+  struct BuildEntry {
+    BuildEntry(LIPFilter *lip_filter_in,
+               const attribute_id attr_id_in,
+               const Type *attr_type_in)
+        : lip_filter(lip_filter_in),
+          attr_id(attr_id_in),
+          attr_type(attr_type_in) {
+    }
+    // The filter to insert values into (not owned).
+    LIPFilter *lip_filter;
+    // The attribute whose values are inserted.
+    const attribute_id attr_id;
+    // The type of that attribute.
+    const Type *attr_type;
+  };
+
+  // One entry per filter, in the order given to the constructor.
+  std::vector<BuildEntry> build_entries_;
+
+  DISALLOW_COPY_AND_ASSIGN(LIPFilterBuilder);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_BUILDER_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8746eed4/utility/lip_filter/LIPFilterDeployment.cpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterDeployment.cpp b/utility/lip_filter/LIPFilterDeployment.cpp
new file mode 100644
index 0000000..cd4d90f
--- /dev/null
+++ b/utility/lip_filter/LIPFilterDeployment.cpp
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "utility/lip_filter/LIPFilterDeployment.hpp"
+
+#include <memory>
+#include <vector>
+
+#include "types/TypeFactory.hpp"
+#include "utility/lip_filter/LIPFilter.hpp"
+#include "utility/lip_filter/LIPFilter.pb.h"
+#include "utility/lip_filter/LIPFilterBuilder.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+LIPFilterDeployment::LIPFilterDeployment(
+    const serialization::LIPFilterDeployment &proto,
+    const std::vector<std::unique_ptr<LIPFilter>> &lip_filters) {
+  // Translate the serialized action type into the in-memory enum; anything
+  // other than BUILD or PROBE is fatal.
+  const auto proto_action_type = proto.action_type();
+  if (proto_action_type == serialization::LIPFilterActionType::BUILD) {
+    action_type_ = LIPFilterActionType::kBuild;
+  } else if (proto_action_type == serialization::LIPFilterActionType::PROBE) {
+    action_type_ = LIPFilterActionType::kProbe;
+  } else {
+    LOG(FATAL) << "Unsupported LIPFilterActionType: "
+               << serialization::LIPFilterActionType_Name(proto_action_type);
+  }
+
+  // Each entry contributes one element to each of the three parallel vectors.
+  // The filter is looked up by id in lip_filters and held as a raw, non-owning
+  // pointer.
+  for (const auto &entry_proto : proto.entries()) {
+    lip_filters_.push_back(lip_filters.at(entry_proto.lip_filter_id()).get());
+    attr_ids_.push_back(entry_proto.attribute_id());
+    attr_types_.push_back(&TypeFactory::ReconstructFromProto(entry_proto.attribute_type()));
+  }
+}
+
+/**
+ * Checks that a serialized LIPFilterDeployment has a supported action type,
+ * at least one entry, and a valid attribute type in every entry.
+ *
+ * Returns false (rather than aborting) for any malformed proto, so that
+ * callers can reject bad input gracefully.
+ **/
+bool LIPFilterDeployment::ProtoIsValid(
+    const serialization::LIPFilterDeployment &proto) {
+  // A validity predicate must report an unsupported action type to its caller
+  // instead of terminating the process, so log it and reject the proto.
+  if (proto.action_type() != serialization::LIPFilterActionType::BUILD &&
+      proto.action_type() != serialization::LIPFilterActionType::PROBE) {
+    LOG(ERROR) << "Unsupported LIPFilterActionType: "
+               << serialization::LIPFilterActionType_Name(proto.action_type());
+    return false;
+  }
+  // A deployment without entries has nothing to build or probe.
+  if (proto.entries_size() == 0) {
+    return false;
+  }
+  for (int i = 0; i < proto.entries_size(); ++i) {
+    const auto &entry_proto = proto.entries(i);
+    if (!TypeFactory::ProtoIsValid(entry_proto.attribute_type())) {
+      return false;
+    }
+  }
+  return true;
+}
+
+LIPFilterBuilder* LIPFilterDeployment::createLIPFilterBuilder() const {
+  // Only a deployment with the build action may hand out a builder.
+  DCHECK(LIPFilterActionType::kBuild == action_type_);
+  LIPFilterBuilder *builder =
+      new LIPFilterBuilder(lip_filters_, attr_ids_, attr_types_);
+  return builder;
+}
+
+LIPFilterAdaptiveProber* LIPFilterDeployment::createLIPFilterAdaptiveProber() const {
+  // Only a deployment with the probe action may hand out a prober.
+  DCHECK(LIPFilterActionType::kProbe == action_type_);
+  LIPFilterAdaptiveProber *prober =
+      new LIPFilterAdaptiveProber(lip_filters_, attr_ids_, attr_types_);
+  return prober;
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8746eed4/utility/lip_filter/LIPFilterDeployment.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterDeployment.hpp b/utility/lip_filter/LIPFilterDeployment.hpp
new file mode 100644
index 0000000..9b37f88
--- /dev/null
+++ b/utility/lip_filter/LIPFilterDeployment.hpp
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_DEPLOYMENT_HPP_
+#define QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_DEPLOYMENT_HPP_
+
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+namespace serialization { class LIPFilterDeployment; }
+
+class LIPFilter;
+class LIPFilterBuilder;
+class LIPFilterAdaptiveProber;
+class Type;
+
+/** \addtogroup Utility
+ *  @{
+ */
+
+// The action that a LIPFilterDeployment performs on its group of LIPFilters.
+enum class LIPFilterActionType {
+  kBuild = 0,  // The filters are to be built; see createLIPFilterBuilder().
+  kProbe       // The filters are to be probed; see createLIPFilterAdaptiveProber().
+};
+
+/**
+ * @brief Helper class for organizing a group of LIPFilters in the backend.
+ *        Each LIPFilterDeployment object is attached to a RelationalOperator.
+ */
+class LIPFilterDeployment {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param proto The Protocol Buffer representation of a LIPFilterDeployment.
+   * @param lip_filters The LIPFilter objects to be deployed. Each entry of
+   *        \p proto selects one of them, using its lip_filter_id as an index
+   *        into this vector. Only raw pointers to the filters are stored; the
+   *        vector's unique_ptrs keep ownership.
+   */
+  LIPFilterDeployment(const serialization::LIPFilterDeployment &proto,
+                      const std::vector<std::unique_ptr<LIPFilter>> &lip_filters);
+  /**
+   * @brief Determine if a serialized protobuf representation of a
+   *        LIPFilterDeployment is fully-formed and valid.
+   *
+   * @param proto A serialized protobuf representation of a LIPFilterDeployment
+   *        to check for validity.
+   * @return Whether proto is fully-formed and valid.
+   **/
+  static bool ProtoIsValid(const serialization::LIPFilterDeployment &proto);
+
+  /**
+   * @brief Get the action type for this group of LIPFilters (i.e. whether
+   *        to build or probe the filters).
+   *
+   * @return The action type.
+   */
+  LIPFilterActionType getActionType() const {
+    return action_type_;
+  }
+
+  /**
+   * @brief Create a LIPFilterBuilder for this group of LIPFilters.
+   *
+   * @note The action type of this deployment is expected to be kBuild
+   *       (checked with a DCHECK).
+   *
+   * @return A new LIPFilterBuilder object for this group of LIPFilters.
+   *         Caller should take ownership of the returned object.
+   */
+  LIPFilterBuilder* createLIPFilterBuilder() const;
+
+  /**
+   * @brief Create a LIPFilterAdaptiveProber for this group of LIPFilters.
+   *
+   * @note The action type of this deployment is expected to be kProbe
+   *       (checked with a DCHECK).
+   *
+   * @return A new LIPFilterAdaptiveProber object for this group of LIPFilters.
+   *         Caller should take ownership of the returned object.
+   */
+  LIPFilterAdaptiveProber* createLIPFilterAdaptiveProber() const;
+
+ private:
+  // Whether the filters below are built or probed.
+  LIPFilterActionType action_type_;
+
+  // Parallel vectors: the i-th element of each describes the i-th entry of the
+  // proto this deployment was constructed from. The LIPFilter pointers are
+  // not owned by this object.
+  std::vector<LIPFilter *> lip_filters_;
+  std::vector<attribute_id> attr_ids_;
+  std::vector<const Type *> attr_types_;
+
+  DISALLOW_COPY_AND_ASSIGN(LIPFilterDeployment);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_DEPLOYMENT_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8746eed4/utility/lip_filter/LIPFilterFactory.cpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterFactory.cpp b/utility/lip_filter/LIPFilterFactory.cpp
new file mode 100644
index 0000000..ebc4a0e
--- /dev/null
+++ b/utility/lip_filter/LIPFilterFactory.cpp
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "utility/lip_filter/LIPFilterFactory.hpp"
+
+#include <cstddef>
+#include <cstdint>
+
+#include "utility/lip_filter/LIPFilter.pb.h"
+#include "utility/lip_filter/SingleIdentityHashFilter.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+LIPFilter* LIPFilterFactory::ReconstructFromProto(const serialization::LIPFilter &proto) {
+  const auto filter_type = proto.lip_filter_type();
+  if (filter_type != serialization::LIPFilterType::SINGLE_IDENTITY_HASH_FILTER) {
+    // TODO(jianqiao): handle the BLOOM_FILTER and EXACT_FILTER implementations.
+    LOG(FATAL) << "Unsupported LIP filter type: "
+               << serialization::LIPFilterType_Name(filter_type);
+    return nullptr;
+  }
+
+  const std::size_t attr_size =
+      proto.GetExtension(serialization::SingleIdentityHashFilter::attribute_size);
+  const std::size_t filter_cardinality =
+      proto.GetExtension(serialization::SingleIdentityHashFilter::filter_cardinality);
+
+  // Instantiate the filter with the widest unsigned integer type whose size
+  // does not exceed attr_size; sizes below 2 use a single byte.
+  if (attr_size < 2) {
+    return new SingleIdentityHashFilter<std::uint8_t>(filter_cardinality);
+  }
+  if (attr_size < 4) {
+    return new SingleIdentityHashFilter<std::uint16_t>(filter_cardinality);
+  }
+  if (attr_size < 8) {
+    return new SingleIdentityHashFilter<std::uint32_t>(filter_cardinality);
+  }
+  return new SingleIdentityHashFilter<std::uint64_t>(filter_cardinality);
+}
+
+/**
+ * Checks whether a serialized LIPFilter describes a filter that
+ * ReconstructFromProto() can build.
+ *
+ * Returns false (rather than aborting) for filter types that are not
+ * supported, so that callers can reject such protos gracefully.
+ **/
+bool LIPFilterFactory::ProtoIsValid(const serialization::LIPFilter &proto) {
+  switch (proto.lip_filter_type()) {
+    case serialization::LIPFilterType::SINGLE_IDENTITY_HASH_FILTER: {
+      const std::size_t attr_size =
+          proto.GetExtension(serialization::SingleIdentityHashFilter::attribute_size);
+      const std::size_t filter_cardinality =
+          proto.GetExtension(serialization::SingleIdentityHashFilter::filter_cardinality);
+      // Both the attribute size and the filter cardinality must be non-zero.
+      return (attr_size != 0 && filter_cardinality != 0);
+    }
+    default:
+      // A validity predicate must not terminate the process on bad input:
+      // report the unsupported type and let the caller handle the rejection.
+      LOG(ERROR) << "Unsupported LIP filter type: "
+                 << serialization::LIPFilterType_Name(proto.lip_filter_type());
+      return false;
+  }
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8746eed4/utility/lip_filter/LIPFilterFactory.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterFactory.hpp b/utility/lip_filter/LIPFilterFactory.hpp
new file mode 100644
index 0000000..b8301b8
--- /dev/null
+++ b/utility/lip_filter/LIPFilterFactory.hpp
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_FACTORY_HPP_
+#define QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_FACTORY_HPP_
+
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+namespace serialization { class LIPFilter; }
+
+class LIPFilter;
+
+/** \addtogroup Utility
+ *  @{
+ */
+
+/**
+ * @brief All-static factory object that provides access to various implementations
+ *        of LIPFilters.
+ */
+class LIPFilterFactory {
+ public:
+  /**
+   * @brief Reconstruct a LIPFilter from its serialized Protocol Buffer form.
+   *
+   * @param proto The Protocol Buffer representation of a LIPFilter object.
+   * @return A new LIPFilter reconstructed from the supplied Protocol Buffer.
+   *         Caller should take ownership of the returned object.
+   */
+  static LIPFilter* ReconstructFromProto(const serialization::LIPFilter &proto);
+
+  /**
+   * @brief Check whether a serialization::LIPFilter is fully-formed and
+   *        all parts are valid.
+   *
+   * @param proto A serialized Protocol Buffer representation of a LIPFilter.
+   * @return Whether proto is fully-formed and valid.
+   **/
+  static bool ProtoIsValid(const serialization::LIPFilter &proto);
+
+ private:
+  // Private constructor: this class only exposes static methods and is not
+  // meant to be instantiated.
+  LIPFilterFactory() {}
+
+  DISALLOW_COPY_AND_ASSIGN(LIPFilterFactory);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_FACTORY_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8746eed4/utility/lip_filter/SingleIdentityHashFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/SingleIdentityHashFilter.hpp b/utility/lip_filter/SingleIdentityHashFilter.hpp
new file mode 100644
index 0000000..2823818
--- /dev/null
+++ b/utility/lip_filter/SingleIdentityHashFilter.hpp
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_LIP_FILTER_SINGLE_IDENTITY_HASH_FILTER_HPP_
+#define QUICKSTEP_UTILITY_LIP_FILTER_SINGLE_IDENTITY_HASH_FILTER_HPP_
+
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageConstants.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/Type.hpp"
+#include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilter.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/** \addtogroup Utility
+ *  @{
+ */
+
+/**
+ * @brief Specialized bloom filter that uses only one hash function. The hash
+ *        function is an "identity" function: it simply reinterprets the
+ *        leading sizeof(CppType) bytes of a value as a CppType and uses the
+ *        result (modulo the filter cardinality) as the value's hash code.
+ *
+ * @note SingleIdentityHashFilter is most effective when applied to fixed-length
+ *       integer values. It cannot be applied to variable-length values unless
+ *       the corresponding value Type has its minimumByteLength() at least
+ *       sizeof(CppType).
+ */
+template <typename CppType>
+class SingleIdentityHashFilter : public LIPFilter {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param filter_cardinality The cardinality of this hash filter, i.e. the
+   *        number of bits it tracks. Must be greater than zero, because hash
+   *        codes are computed modulo this value.
+   */
+  explicit SingleIdentityHashFilter(const std::size_t filter_cardinality)
+      : LIPFilter(LIPFilterType::kSingleIdentityHashFilter),
+        filter_cardinality_(filter_cardinality),
+        bit_array_(GetByteSize(filter_cardinality)) {
+    // A zero cardinality would make every insert()/contains() divide by zero.
+    DCHECK_GT(filter_cardinality, 0u);
+    std::memset(bit_array_.data(),
+                0x0,
+                sizeof(std::atomic<std::uint8_t>) * GetByteSize(filter_cardinality));
+  }
+
+  /**
+   * @brief Insert the non-null values of attribute \p attr_id from every tuple
+   *        of \p accessor into this filter.
+   *
+   * @param accessor The ValueAccessor to iterate over.
+   * @param attr_id The attribute whose values are inserted.
+   * @param attr_type The attribute's type; only its nullability is consulted.
+   */
+  void insertValueAccessor(ValueAccessor *accessor,
+                           const attribute_id attr_id,
+                           const Type *attr_type) override {
+    InvokeOnAnyValueAccessor(
+        accessor,
+        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+      if (attr_type->isNullable()) {
+        this->insertValueAccessorInternal<true>(accessor, attr_id);
+      } else {
+        this->insertValueAccessorInternal<false>(accessor, attr_id);
+      }
+    });
+  }
+
+  /**
+   * @brief Filter the first \p batch_size tuple ids in \p batch, keeping only
+   *        those whose value of attribute \p attr_id may be in this filter.
+   *
+   * @param accessor The ValueAccessor that the tuple ids refer to.
+   * @param attr_id The attribute whose values are tested.
+   * @param is_attr_nullable Whether the attribute may be null; null values
+   *        never pass the filter.
+   * @param batch In/out: the tuple ids to test. The surviving tuple ids are
+   *        written back to the front of the vector.
+   * @param batch_size The number of leading tuple ids in \p batch to test.
+   * @return The number of surviving tuple ids.
+   */
+  std::size_t filterBatch(ValueAccessor *accessor,
+                          const attribute_id attr_id,
+                          const bool is_attr_nullable,
+                          std::vector<tuple_id> *batch,
+                          const std::size_t batch_size) const override {
+    DCHECK(batch != nullptr);
+    DCHECK_LE(batch_size, batch->size());
+
+    return InvokeOnAnyValueAccessor(
+        accessor,
+        [&](auto *accessor) -> std::size_t {  // NOLINT(build/c++11)
+      if (is_attr_nullable) {
+        return this->filterBatchInternal<true>(accessor, attr_id, batch, batch_size);
+      } else {
+        return this->filterBatchInternal<false>(accessor, attr_id, batch, batch_size);
+      }
+    });
+  }
+
+ private:
+  /**
+   * @brief Get the number of bytes needed to hold bit_size bits (i.e. round
+   *        bit_size up to a multiple of 8 and divide by 8).
+   */
+  inline static std::size_t GetByteSize(const std::size_t bit_size) {
+    return (bit_size + 7) / 8;
+  }
+
+  /**
+   * @brief Compute the bit position of a value: its leading sizeof(CppType)
+   *        bytes taken as a CppType, modulo the filter cardinality.
+   */
+  inline CppType getBitPosition(const void *key_begin) const {
+    // Copy the bytes out instead of dereferencing a casted pointer: the value
+    // is not guaranteed to be suitably aligned for CppType.
+    CppType key;
+    std::memcpy(&key, key_begin, sizeof(CppType));
+    return static_cast<CppType>(key % filter_cardinality_);
+  }
+
+  /**
+   * @brief Iterate through the accessor and hash values into the internal bit
+   *        array. Null values are skipped.
+   */
+  template <bool is_attr_nullable, typename ValueAccessorT>
+  inline void insertValueAccessorInternal(ValueAccessorT *accessor,
+                                          const attribute_id attr_id) {
+    accessor->beginIteration();
+    while (accessor->next()) {
+      const void *value = accessor->template getUntypedValue<is_attr_nullable>(attr_id);
+      if (!is_attr_nullable || value != nullptr) {
+        insert(value);
+      }
+    }
+  }
+
+  /**
+   * @brief Filter the given batch of tuples from a ValueAccessor. Write the
+   *        tuple ids which survive in the filtering back to \p batch.
+   */
+  template <bool is_attr_nullable, typename ValueAccessorT>
+  inline std::size_t filterBatchInternal(const ValueAccessorT *accessor,
+                                         const attribute_id attr_id,
+                                         std::vector<tuple_id> *batch,
+                                         const std::size_t batch_size) const {
+    std::size_t out_size = 0;
+    for (std::size_t i = 0; i < batch_size; ++i) {
+      const tuple_id tid = batch->at(i);
+      // No "template" disambiguator here: it is only allowed in front of a
+      // name that is followed by an explicit template argument list.
+      const void *value =
+          accessor->getUntypedValueAtAbsolutePosition(attr_id, tid);
+      if (is_attr_nullable && value == nullptr) {
+        continue;
+      }
+      if (contains(value)) {
+        // out_size <= i always holds, so survivors are compacted in place
+        // without overwriting tuple ids that have not been examined yet.
+        batch->at(out_size) = tid;
+        ++out_size;
+      }
+    }
+    return out_size;
+  }
+
+  /**
+   * @brief Inserts a given value into the hash filter.
+   */
+  inline void insert(const void *key_begin) {
+    const CppType hash = getBitPosition(key_begin);
+    bit_array_[hash >> 3u].fetch_or(1u << (hash & 7u), std::memory_order_relaxed);
+  }
+
+  /**
+   * @brief Test membership of a given value in the hash filter.
+   *        If true is returned, then a value may or may not be present in the hash filter.
+   *        If false is returned, a value is certainly not present in the hash filter.
+   */
+  inline bool contains(const void *key_begin) const {
+    const CppType hash = getBitPosition(key_begin);
+    return (bit_array_[hash >> 3u].load(std::memory_order_relaxed) & (1u << (hash & 7u)));
+  }
+
+  // Number of bits tracked by the filter; hash codes are reduced modulo this.
+  std::size_t filter_cardinality_;
+  // One bit per possible hash code, packed 8 bits per atomic byte.
+  alignas(kCacheLineBytes) std::vector<std::atomic<std::uint8_t>> bit_array_;
+
+  DISALLOW_COPY_AND_ASSIGN(SingleIdentityHashFilter);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_LIP_FILTER_SINGLE_IDENTITY_HASH_FILTER_HPP_