You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/10/18 17:21:19 UTC

[11/12] incubator-quickstep git commit: Add LIPFilter feature.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index cbbfc22..a80bcb0 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -41,6 +41,7 @@ class AggregateFunction;
 class CatalogDatabaseLite;
 class CatalogRelationSchema;
 class InsertDestination;
+class LIPFilterAdaptiveProber;
 class StorageManager;
 
 /** \addtogroup Storage
@@ -156,7 +157,8 @@ class AggregationOperationState {
    * @param input_block The block ID of the storage block where the aggreates
    *        are going to be computed.
    **/
-  void aggregateBlock(const block_id input_block);
+  void aggregateBlock(const block_id input_block,
+                      LIPFilterAdaptiveProber *lip_filter_adaptive_prober);
 
   /**
    * @brief Generate the final results for the aggregates managed by this
@@ -179,8 +181,10 @@ class AggregationOperationState {
       const std::vector<std::unique_ptr<AggregationState>> &local_state);
 
   // Aggregate on input block.
-  void aggregateBlockSingleState(const block_id input_block);
-  void aggregateBlockHashTable(const block_id input_block);
+  void aggregateBlockSingleState(const block_id input_block,
+                                 LIPFilterAdaptiveProber *lip_filter_adaptive_prober);
+  void aggregateBlockHashTable(const block_id input_block,
+                               LIPFilterAdaptiveProber *lip_filter_adaptive_prober);
 
   void finalizeSingleState(InsertDestination *output_destination);
   void finalizeHashTable(InsertDestination *output_destination);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index f05cc46..e85e005 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -643,7 +643,6 @@ target_link_libraries(quickstep_storage_FastHashTable
                       quickstep_threading_SpinSharedMutex
                       quickstep_types_Type
                       quickstep_types_TypedValue
-                      quickstep_utility_BloomFilter
                       quickstep_utility_HashPair
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_FastHashTableFactory
@@ -659,7 +658,6 @@ target_link_libraries(quickstep_storage_FastHashTableFactory
                       quickstep_storage_SimpleScalarSeparateChainingHashTable
                       quickstep_storage_TupleReference
                       quickstep_types_TypeFactory
-                      quickstep_utility_BloomFilter
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_FastSeparateChainingHashTable
                       quickstep_storage_FastHashTable

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/storage/FastHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/FastHashTable.hpp b/storage/FastHashTable.hpp
index 4a95cd9..74d9ee3 100644
--- a/storage/FastHashTable.hpp
+++ b/storage/FastHashTable.hpp
@@ -39,7 +39,6 @@
 #include "threading/SpinSharedMutex.hpp"
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
-#include "utility/BloomFilter.hpp"
 #include "utility/HashPair.hpp"
 #include "utility/Macros.hpp"
 
@@ -958,62 +957,6 @@ class FastHashTable : public HashTableBase<resizable,
   template <typename FunctorT>
   std::size_t forEachCompositeKeyFast(FunctorT *functor, int index) const;
 
-  /**
-   * @brief A call to this function will cause a bloom filter to be built
-   *        during the build phase of this hash table.
-   **/
-  inline void enableBuildSideBloomFilter() {
-    has_build_side_bloom_filter_ = true;
-  }
-
-  /**
-   * @brief A call to this function will cause a set of bloom filters to be
-   *        probed during the probe phase of this hash table.
-   **/
-  inline void enableProbeSideBloomFilter() {
-    has_probe_side_bloom_filter_ = true;
-  }
-
-  /**
-   * @brief This function sets the pointer to the bloom filter to be
-   *        used during the build phase of this hash table.
-   * @warning Should call enable_build_side_bloom_filter() first to enable
-   *          bloom filter usage during build phase.
-   * @note The ownership of the bloom filter lies with the caller.
-   *
-   * @param bloom_filter The pointer to the bloom filter.
-   **/
-  inline void setBuildSideBloomFilter(BloomFilter *bloom_filter) {
-    build_bloom_filter_ = bloom_filter;
-  }
-
-  /**
-   * @brief This function adds a pointer to the list of bloom filters to be
-   *        used during the probe phase of this hash table.
-   * @warning Should call enable_probe_side_bloom_filter() first to enable
-   *          bloom filter usage during probe phase.
-   * @note The ownership of the bloom filter lies with the caller.
-   *
-   * @param bloom_filter The pointer to the bloom filter.
-   **/
-  inline void addProbeSideBloomFilter(const BloomFilter *bloom_filter) {
-    probe_bloom_filters_.emplace_back(bloom_filter);
-  }
-
-  /**
-   * @brief This function adds a vector of attribute ids corresponding to a
-   *        bloom filter used during the probe phase of this hash table.
-   * @warning Should call enable_probe_side_bloom_filter() first to enable
-   *          bloom filter usage during probe phase.
-   *
-   * @param probe_attribute_ids The vector of attribute ids to use for probing
-   *        the bloom filter.
-   **/
-  inline void addProbeSideAttributeIds(
-      std::vector<attribute_id> &&probe_attribute_ids) {
-    probe_attribute_ids_.push_back(probe_attribute_ids);
-  }
-
  protected:
   /**
    * @brief Constructor for new resizable hash table.
@@ -1318,12 +1261,6 @@ class FastHashTable : public HashTableBase<resizable,
                                    const attribute_id key_attr_id,
                                    FunctorT *functor) const;
 
-  // Data structures used for bloom filter optimized semi-joins.
-  bool has_build_side_bloom_filter_ = false;
-  bool has_probe_side_bloom_filter_ = false;
-  BloomFilter *build_bloom_filter_;
-  std::vector<const BloomFilter *> probe_bloom_filters_;
-  std::vector<std::vector<attribute_id>> probe_attribute_ids_;
   DISALLOW_COPY_AND_ASSIGN(FastHashTable);
 };
 
@@ -1449,13 +1386,6 @@ FastHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>::
                 total_entries, total_variable_key_size, &prealloc_state);
           }
         }
-        std::unique_ptr<BloomFilter> thread_local_bloom_filter;
-        if (has_build_side_bloom_filter_) {
-          thread_local_bloom_filter.reset(
-              new BloomFilter(build_bloom_filter_->getRandomSeed(),
-                              build_bloom_filter_->getNumberOfHashes(),
-                              build_bloom_filter_->getBitArraySize()));
-        }
         if (resizable) {
           while (result == HashTablePutResult::kOutOfSpace) {
             {
@@ -1474,12 +1404,6 @@ FastHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>::
                     variable_size,
                     (*functor)(*accessor),
                     using_prealloc ? &prealloc_state : nullptr);
-                // Insert into bloom filter, if enabled.
-                if (has_build_side_bloom_filter_) {
-                  thread_local_bloom_filter->insertUnSafe(
-                      static_cast<const std::uint8_t *>(key.getDataPtr()),
-                      key.getDataSize());
-                }
                 if (result == HashTablePutResult::kDuplicateKey) {
                   DEBUG_ASSERT(!using_prealloc);
                   return result;
@@ -1507,22 +1431,11 @@ FastHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>::
                                   variable_size,
                                   (*functor)(*accessor),
                                   using_prealloc ? &prealloc_state : nullptr);
-            // Insert into bloom filter, if enabled.
-            if (has_build_side_bloom_filter_) {
-              thread_local_bloom_filter->insertUnSafe(
-                  static_cast<const std::uint8_t *>(key.getDataPtr()),
-                  key.getDataSize());
-            }
             if (result != HashTablePutResult::kOK) {
               return result;
             }
           }
         }
-        // Update the build side bloom filter with thread local copy, if
-        // available.
-        if (has_build_side_bloom_filter_) {
-          build_bloom_filter_->bitwiseOr(thread_local_bloom_filter.get());
-        }
 
         return HashTablePutResult::kOK;
       });
@@ -2462,52 +2375,27 @@ void FastHashTable<resizable,
   InvokeOnAnyValueAccessor(
       accessor,
       [&](auto *accessor) -> void {  // NOLINT(build/c++11)
-        while (accessor->next()) {
-          // Probe any bloom filters, if enabled.
-          if (has_probe_side_bloom_filter_) {
-            DCHECK_EQ(probe_bloom_filters_.size(), probe_attribute_ids_.size());
-            // Check if the key is contained in the BloomFilters or not.
-            bool bloom_miss = false;
-            for (std::size_t i = 0;
-                 i < probe_bloom_filters_.size() && !bloom_miss;
-                 ++i) {
-              const BloomFilter *bloom_filter = probe_bloom_filters_[i];
-              for (const attribute_id &attr_id : probe_attribute_ids_[i]) {
-                TypedValue bloom_key = accessor->getTypedValue(attr_id);
-                if (!bloom_filter->contains(static_cast<const std::uint8_t *>(
-                                                bloom_key.getDataPtr()),
-                                            bloom_key.getDataSize())) {
-                  bloom_miss = true;
-                  break;
-                }
-              }
-            }
-            if (bloom_miss) {
-              continue;  // On a bloom filter miss, probing the hash table can
-                         // be skipped.
-            }
-          }
-
-          TypedValue key = accessor->getTypedValue(key_attr_id);
-          if (check_for_null_keys && key.isNull()) {
-            continue;
-          }
-          const std::size_t true_hash = use_scalar_literal_hash_template
-                                            ? key.getHashScalarLiteral()
-                                            : key.getHash();
-          const std::size_t adjusted_hash =
-              adjust_hashes_template ? this->AdjustHash(true_hash) : true_hash;
-          std::size_t entry_num = 0;
-          const std::uint8_t *value;
-          while (this->getNextEntryForKey(
-              key, adjusted_hash, &value, &entry_num)) {
-            (*functor)(*accessor, *value);
-            if (!allow_duplicate_keys) {
-              break;
-            }
-          }
+    while (accessor->next()) {
+      TypedValue key = accessor->getTypedValue(key_attr_id);
+      if (check_for_null_keys && key.isNull()) {
+        continue;
+      }
+      const std::size_t true_hash = use_scalar_literal_hash_template
+                                        ? key.getHashScalarLiteral()
+                                        : key.getHash();
+      const std::size_t adjusted_hash =
+          adjust_hashes_template ? this->AdjustHash(true_hash) : true_hash;
+      std::size_t entry_num = 0;
+      const std::uint8_t *value;
+      while (this->getNextEntryForKey(
+          key, adjusted_hash, &value, &entry_num)) {
+        (*functor)(*accessor, *value);
+        if (!allow_duplicate_keys) {
+          break;
         }
-      });
+      }
+    }
+  });
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/storage/FastHashTableFactory.hpp
----------------------------------------------------------------------
diff --git a/storage/FastHashTableFactory.hpp b/storage/FastHashTableFactory.hpp
index 6d0b693..682cc2a 100644
--- a/storage/FastHashTableFactory.hpp
+++ b/storage/FastHashTableFactory.hpp
@@ -32,7 +32,6 @@
 #include "storage/SimpleScalarSeparateChainingHashTable.hpp"
 #include "storage/TupleReference.hpp"
 #include "types/TypeFactory.hpp"
-#include "utility/BloomFilter.hpp"
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
@@ -183,14 +182,11 @@ class FastHashTableFactory {
    * @param proto A protobuf description of a resizable HashTable.
    * @param storage_manager The StorageManager to use (a StorageBlob will be
    *        allocated to hold the HashTable's contents).
-   * @param bloom_filters A vector of pointers to bloom filters that may be used
-   *        during hash table construction in build/probe phase.
    * @return A new resizable HashTable with parameters specified by proto.
    **/
   static FastHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>*
       CreateResizableFromProto(const serialization::HashTable &proto,
-                               StorageManager *storage_manager,
-                               const std::vector<std::unique_ptr<BloomFilter>> &bloom_filters) {
+                               StorageManager *storage_manager) {
     DCHECK(ProtoIsValid(proto))
         << "Attempted to create HashTable from invalid proto description:\n"
         << proto.DebugString();
@@ -204,35 +200,6 @@ class FastHashTableFactory {
                                       key_types,
                                       proto.estimated_num_entries(),
                                       storage_manager);
-
-    // TODO(ssaurabh): These lazy initializations can be moved from here and pushed to the
-    //                 individual implementations of the hash table constructors.
-
-    // Check if there are any build side bloom filter defined on the hash table.
-    if (proto.build_side_bloom_filter_id_size() > 0) {
-      hash_table->enableBuildSideBloomFilter();
-      hash_table->setBuildSideBloomFilter(bloom_filters[proto.build_side_bloom_filter_id(0)].get());
-    }
-
-    // Check if there are any probe side bloom filters defined on the hash table.
-    if (proto.probe_side_bloom_filters_size() > 0) {
-      hash_table->enableProbeSideBloomFilter();
-      // Add as many probe bloom filters as defined by the proto.
-      for (int j = 0; j < proto.probe_side_bloom_filters_size(); ++j) {
-        // Add the pointer to the probe bloom filter within the list of probe bloom filters to use.
-        const auto probe_side_bloom_filter = proto.probe_side_bloom_filters(j);
-        hash_table->addProbeSideBloomFilter(bloom_filters[probe_side_bloom_filter.probe_side_bloom_filter_id()].get());
-
-        // Add the attribute ids corresponding to this probe bloom filter.
-        std::vector<attribute_id> probe_attribute_ids;
-        for (int k = 0; k < probe_side_bloom_filter.probe_side_attr_ids_size(); ++k) {
-          const attribute_id probe_attribute_id = probe_side_bloom_filter.probe_side_attr_ids(k);
-          probe_attribute_ids.push_back(probe_attribute_id);
-        }
-        hash_table->addProbeSideAttributeIds(std::move(probe_attribute_ids));
-      }
-    }
-
     return hash_table;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/storage/HashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTable.hpp b/storage/HashTable.hpp
index f2dcb03..786a9bb 100644
--- a/storage/HashTable.hpp
+++ b/storage/HashTable.hpp
@@ -981,61 +981,6 @@ class HashTable : public HashTableBase<resizable,
   template <typename FunctorT>
   std::size_t forEachCompositeKey(FunctorT *functor) const;
 
-  /**
-   * @brief A call to this function will cause a bloom filter to be built
-   *        during the build phase of this hash table.
-   **/
-  inline void enableBuildSideBloomFilter() {
-    has_build_side_bloom_filter_ = true;
-  }
-
-  /**
-   * @brief A call to this function will cause a set of bloom filters to be
-   *        probed during the probe phase of this hash table.
-   **/
-  inline void enableProbeSideBloomFilter() {
-    has_probe_side_bloom_filter_ = true;
-  }
-
-  /**
-   * @brief This function sets the pointer to the bloom filter to be
-   *        used during the build phase of this hash table.
-   * @warning Should call enable_build_side_bloom_filter() first to enable
-   *          bloom filter usage during build phase.
-   * @note The ownership of the bloom filter lies with the caller.
-   *
-   * @param bloom_filter The pointer to the bloom filter.
-   **/
-  inline void setBuildSideBloomFilter(BloomFilter *bloom_filter) {
-    build_bloom_filter_ = bloom_filter;
-  }
-
-  /**
-   * @brief This function adds a pointer to the list of bloom filters to be
-   *        used during the probe phase of this hash table.
-   * @warning Should call enable_probe_side_bloom_filter() first to enable
-   *          bloom filter usage during probe phase.
-   * @note The ownership of the bloom filter lies with the caller.
-   *
-   * @param bloom_filter The pointer to the bloom filter.
-   **/
-  inline void addProbeSideBloomFilter(const BloomFilter *bloom_filter) {
-    probe_bloom_filters_.emplace_back(bloom_filter);
-  }
-
-  /**
-   * @brief This function adds a vector of attribute ids corresponding to a
-   *        bloom filter used during the probe phase of this hash table.
-   * @warning Should call enable_probe_side_bloom_filter() first to enable
-   *          bloom filter usage during probe phase.
-   *
-   * @param probe_attribute_ids The vector of attribute ids to use for probing
-   *        the bloom filter.
-   **/
-  inline void addProbeSideAttributeIds(std::vector<attribute_id> &&probe_attribute_ids) {
-    probe_attribute_ids_.push_back(probe_attribute_ids);
-  }
-
  protected:
   /**
    * @brief Constructor for new resizable hash table.
@@ -1316,13 +1261,6 @@ class HashTable : public HashTableBase<resizable,
                                    const attribute_id key_attr_id,
                                    FunctorT *functor) const;
 
-  // Data structures used for bloom filter optimized semi-joins.
-  bool has_build_side_bloom_filter_ = false;
-  bool has_probe_side_bloom_filter_ = false;
-  BloomFilter *build_bloom_filter_;
-  std::vector<const BloomFilter*> probe_bloom_filters_;
-  std::vector<std::vector<attribute_id>> probe_attribute_ids_;
-
   DISALLOW_COPY_AND_ASSIGN(HashTable);
 };
 
@@ -1467,12 +1405,6 @@ HashTablePutResult HashTable<ValueT, resizable, serializable, force_key_copy, al
                                                         &prealloc_state);
       }
     }
-    std::unique_ptr<BloomFilter> thread_local_bloom_filter;
-    if (has_build_side_bloom_filter_) {
-      thread_local_bloom_filter.reset(new BloomFilter(build_bloom_filter_->getRandomSeed(),
-                                                      build_bloom_filter_->getNumberOfHashes(),
-                                                      build_bloom_filter_->getBitArraySize()));
-    }
     if (resizable) {
       while (result == HashTablePutResult::kOutOfSpace) {
         {
@@ -1488,11 +1420,6 @@ HashTablePutResult HashTable<ValueT, resizable, serializable, force_key_copy, al
                                        variable_size,
                                        (*functor)(*accessor),
                                        using_prealloc ? &prealloc_state : nullptr);
-            // Insert into bloom filter, if enabled.
-            if (has_build_side_bloom_filter_) {
-              thread_local_bloom_filter->insertUnSafe(static_cast<const std::uint8_t *>(key.getDataPtr()),
-                                                      key.getDataSize());
-            }
             if (result == HashTablePutResult::kDuplicateKey) {
               DEBUG_ASSERT(!using_prealloc);
               return result;
@@ -1518,20 +1445,11 @@ HashTablePutResult HashTable<ValueT, resizable, serializable, force_key_copy, al
                                    variable_size,
                                    (*functor)(*accessor),
                                    using_prealloc ? &prealloc_state : nullptr);
-        // Insert into bloom filter, if enabled.
-        if (has_build_side_bloom_filter_) {
-          thread_local_bloom_filter->insertUnSafe(static_cast<const std::uint8_t *>(key.getDataPtr()),
-                                                  key.getDataSize());
-        }
         if (result != HashTablePutResult::kOK) {
           return result;
         }
       }
     }
-    // Update the build side bloom filter with thread local copy, if available.
-    if (has_build_side_bloom_filter_) {
-      build_bloom_filter_->bitwiseOr(thread_local_bloom_filter.get());
-    }
 
     return HashTablePutResult::kOK;
   });
@@ -2237,27 +2155,6 @@ void HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_
       accessor,
       [&](auto *accessor) -> void {  // NOLINT(build/c++11)
     while (accessor->next()) {
-      // Probe any bloom filters, if enabled.
-      if (has_probe_side_bloom_filter_) {
-        DCHECK_EQ(probe_bloom_filters_.size(), probe_attribute_ids_.size());
-        // Check if the key is contained in the BloomFilters or not.
-        bool bloom_miss = false;
-        for (std::size_t i = 0; i < probe_bloom_filters_.size() && !bloom_miss; ++i) {
-          const BloomFilter *bloom_filter = probe_bloom_filters_[i];
-          for (const attribute_id &attr_id : probe_attribute_ids_[i]) {
-            TypedValue bloom_key = accessor->getTypedValue(attr_id);
-            if (!bloom_filter->contains(static_cast<const std::uint8_t*>(bloom_key.getDataPtr()),
-                                        bloom_key.getDataSize())) {
-              bloom_miss = true;
-              break;
-            }
-          }
-        }
-        if (bloom_miss) {
-          continue;  // On a bloom filter miss, probing the hash table can be skipped.
-        }
-      }
-
       TypedValue key = accessor->getTypedValue(key_attr_id);
       if (check_for_null_keys && key.isNull()) {
         continue;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/storage/HashTable.proto
----------------------------------------------------------------------
diff --git a/storage/HashTable.proto b/storage/HashTable.proto
index ade30d8..1d4ccb0 100644
--- a/storage/HashTable.proto
+++ b/storage/HashTable.proto
@@ -34,10 +34,4 @@ message HashTable {
   required HashTableImplType hash_table_impl_type = 1;
   repeated Type key_types = 2;
   required uint64 estimated_num_entries = 3;
-  repeated uint32 build_side_bloom_filter_id = 4;
-  message ProbeSideBloomFilter {
-    required uint32 probe_side_bloom_filter_id = 1;
-    repeated uint32 probe_side_attr_ids = 2;
-  }
-  repeated ProbeSideBloomFilter probe_side_bloom_filters = 6;
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/storage/HashTableFactory.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableFactory.hpp b/storage/HashTableFactory.hpp
index 40b39de..d690557 100644
--- a/storage/HashTableFactory.hpp
+++ b/storage/HashTableFactory.hpp
@@ -295,14 +295,11 @@ class HashTableFactory {
    * @param proto A protobuf description of a resizable HashTable.
    * @param storage_manager The StorageManager to use (a StorageBlob will be
    *        allocated to hold the HashTable's contents).
-   * @param bloom_filters A vector of pointers to bloom filters that may be used
-   *        during hash table construction in build/probe phase.
    * @return A new resizable HashTable with parameters specified by proto.
    **/
   static HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_keys>*
       CreateResizableFromProto(const serialization::HashTable &proto,
-                               StorageManager *storage_manager,
-                               const std::vector<std::unique_ptr<BloomFilter>> &bloom_filters) {
+                               StorageManager *storage_manager) {
     DCHECK(ProtoIsValid(proto))
         << "Attempted to create HashTable from invalid proto description:\n"
         << proto.DebugString();
@@ -316,35 +313,6 @@ class HashTableFactory {
                                       key_types,
                                       proto.estimated_num_entries(),
                                       storage_manager);
-
-    // TODO(ssaurabh): These lazy initializations can be moved from here and pushed to the
-    //                 individual implementations of the hash table constructors.
-
-    // Check if there are any build side bloom filter defined on the hash table.
-    if (proto.build_side_bloom_filter_id_size() > 0) {
-      hash_table->enableBuildSideBloomFilter();
-      hash_table->setBuildSideBloomFilter(bloom_filters[proto.build_side_bloom_filter_id(0)].get());
-    }
-
-    // Check if there are any probe side bloom filters defined on the hash table.
-    if (proto.probe_side_bloom_filters_size() > 0) {
-      hash_table->enableProbeSideBloomFilter();
-      // Add as many probe bloom filters as defined by the proto.
-      for (int j = 0; j < proto.probe_side_bloom_filters_size(); ++j) {
-        // Add the pointer to the probe bloom filter within the list of probe bloom filters to use.
-        const auto probe_side_bloom_filter = proto.probe_side_bloom_filters(j);
-        hash_table->addProbeSideBloomFilter(bloom_filters[probe_side_bloom_filter.probe_side_bloom_filter_id()].get());
-
-        // Add the attribute ids corresponding to this probe bloom filter.
-        std::vector<attribute_id> probe_attribute_ids;
-        for (int k = 0; k < probe_side_bloom_filter.probe_side_attr_ids_size(); ++k) {
-          const attribute_id probe_attribute_id = probe_side_bloom_filter.probe_side_attr_ids(k);
-          probe_attribute_ids.push_back(probe_attribute_id);
-        }
-        hash_table->addProbeSideAttributeIds(std::move(probe_attribute_ids));
-      }
-    }
-
     return hash_table;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index ec5990f..7c16c34 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -60,6 +60,7 @@
 #include "types/containers/Tuple.hpp"
 #include "types/operations/comparisons/ComparisonUtil.hpp"
 #include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
 
 #include "glog/logging.h"
 
@@ -341,20 +342,30 @@ void StorageBlock::sample(const bool is_block_sample,
 
 void StorageBlock::select(const vector<unique_ptr<const Scalar>> &selection,
                           const Predicate *predicate,
-                          InsertDestinationInterface *destination) const {
+                          InsertDestinationInterface *destination,
+                          LIPFilterAdaptiveProber *lip_filter_adaptive_prober) const {
   ColumnVectorsValueAccessor temp_result;
   {
     SubBlocksReference sub_blocks_ref(*tuple_store_,
                                       indices_,
                                       indices_consistent_);
 
+    std::unique_ptr<ValueAccessor> base_accessor(tuple_store_->createValueAccessor());
     std::unique_ptr<TupleIdSequence> matches;
+
+    if (lip_filter_adaptive_prober != nullptr) {
+      matches.reset(lip_filter_adaptive_prober->filterValueAccessor(base_accessor.get()));
+    }
+
+    if (predicate != nullptr) {
+      matches.reset(getMatchesForPredicate(predicate));
+    }
+
     std::unique_ptr<ValueAccessor> accessor;
-    if (predicate == nullptr) {
-      accessor.reset(tuple_store_->createValueAccessor());
+    if (matches == nullptr) {
+      accessor.reset(base_accessor.release());
     } else {
-      matches.reset(getMatchesForPredicate(predicate));
-      accessor.reset(tuple_store_->createValueAccessor(matches.get()));
+      accessor.reset(base_accessor->createSharedTupleIdSequenceAdapterVirtual(*matches));
     }
 
     for (vector<unique_ptr<const Scalar>>::const_iterator selection_cit = selection.begin();
@@ -371,14 +382,24 @@ void StorageBlock::select(const vector<unique_ptr<const Scalar>> &selection,
 
 void StorageBlock::selectSimple(const std::vector<attribute_id> &selection,
                                 const Predicate *predicate,
-                                InsertDestinationInterface *destination) const {
-  std::unique_ptr<ValueAccessor> accessor;
+                                InsertDestinationInterface *destination,
+                                LIPFilterAdaptiveProber *lip_filter_adaptive_prober) const {
+  std::unique_ptr<ValueAccessor> base_accessor(tuple_store_->createValueAccessor());
   std::unique_ptr<TupleIdSequence> matches;
-  if (predicate == nullptr) {
-    accessor.reset(tuple_store_->createValueAccessor());
-  } else {
+
+  if (lip_filter_adaptive_prober != nullptr) {
+    matches.reset(lip_filter_adaptive_prober->filterValueAccessor(base_accessor.get()));
+  }
+
+  if (predicate != nullptr) {
     matches.reset(getMatchesForPredicate(predicate));
-    accessor.reset(tuple_store_->createValueAccessor(matches.get()));
+  }
+
+  std::unique_ptr<ValueAccessor> accessor;
+  if (matches == nullptr) {
+    accessor.reset(base_accessor.release());
+  } else {
+    accessor.reset(base_accessor->createSharedTupleIdSequenceAdapterVirtual(*matches));
   }
 
   destination->bulkInsertTuplesWithRemappedAttributes(selection,
@@ -389,37 +410,28 @@ AggregationState* StorageBlock::aggregate(
     const AggregationHandle &handle,
     const std::vector<std::unique_ptr<const Scalar>> &arguments,
     const std::vector<attribute_id> *arguments_as_attributes,
-    const Predicate *predicate,
-    std::unique_ptr<TupleIdSequence> *reuse_matches) const {
-  // If there is a filter predicate that hasn't already been evaluated,
-  // evaluate it now and save the results for other aggregates on this same
-  // block.
-  if (predicate && !*reuse_matches) {
-    reuse_matches->reset(getMatchesForPredicate(predicate));
-  }
-
+    const TupleIdSequence *filter) const {
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
   // If all the arguments to this aggregate are plain relation attributes,
   // aggregate directly on a ValueAccessor from this block to avoid a copy.
   if ((arguments_as_attributes != nullptr) && (!arguments_as_attributes->empty())) {
     DCHECK_EQ(arguments.size(), arguments_as_attributes->size())
         << "Mismatch between number of arguments and number of attribute_ids";
-    return aggregateHelperValueAccessor(handle, *arguments_as_attributes, reuse_matches->get());
+    return aggregateHelperValueAccessor(handle, *arguments_as_attributes, filter);
   }
   // TODO(shoban): We may want to optimize for ScalarLiteral here.
 #endif
 
   // Call aggregateHelperColumnVector() to materialize each argument as a
   // ColumnVector, then aggregate over those.
-  return aggregateHelperColumnVector(handle, arguments, reuse_matches->get());
+  return aggregateHelperColumnVector(handle, arguments, filter);
 }
 
 void StorageBlock::aggregateGroupBy(
     const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
     const std::vector<std::unique_ptr<const Scalar>> &group_by,
-    const Predicate *predicate,
+    const TupleIdSequence *filter,
     AggregationStateHashTableBase *hash_table,
-    std::unique_ptr<TupleIdSequence> *reuse_matches,
     std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
   DCHECK_GT(group_by.size(), 0u)
       << "Called aggregateGroupBy() with zero GROUP BY expressions";
@@ -438,23 +450,7 @@ void StorageBlock::aggregateGroupBy(
   // this aggregate, as well as the GROUP BY expression values.
   ColumnVectorsValueAccessor temp_result;
   {
-    std::unique_ptr<ValueAccessor> accessor;
-    if (predicate) {
-      if (!*reuse_matches) {
-        // If there is a filter predicate that hasn't already been evaluated,
-        // evaluate it now and save the results for other aggregates on this
-        // same block.
-        reuse_matches->reset(getMatchesForPredicate(predicate));
-      }
-
-      // Create a filtered ValueAccessor that only iterates over predicate
-      // matches.
-      accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get()));
-    } else {
-      // Create a ValueAccessor that iterates over all tuples in this block
-      accessor.reset(tuple_store_->createValueAccessor());
-    }
-
+    std::unique_ptr<ValueAccessor> accessor(tuple_store_->createValueAccessor(filter));
     attribute_id attr_id = 0;
 
     // First, put GROUP BY keys into 'temp_result'.
@@ -503,9 +499,8 @@ void StorageBlock::aggregateDistinct(
     const std::vector<std::unique_ptr<const Scalar>> &arguments,
     const std::vector<attribute_id> *arguments_as_attributes,
     const std::vector<std::unique_ptr<const Scalar>> &group_by,
-    const Predicate *predicate,
+    const TupleIdSequence *filter,
     AggregationStateHashTableBase *distinctify_hash_table,
-    std::unique_ptr<TupleIdSequence> *reuse_matches,
     std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
   DCHECK_GT(arguments.size(), 0u)
       << "Called aggregateDistinct() with zero argument expressions";
@@ -517,22 +512,7 @@ void StorageBlock::aggregateDistinct(
   // this aggregate, as well as the GROUP BY expression values.
   ColumnVectorsValueAccessor temp_result;
   {
-    std::unique_ptr<ValueAccessor> accessor;
-    if (predicate) {
-      if (!*reuse_matches) {
-        // If there is a filter predicate that hasn't already been evaluated,
-        // evaluate it now and save the results for other aggregates on this
-        // same block.
-        reuse_matches->reset(getMatchesForPredicate(predicate));
-      }
-
-      // Create a filtered ValueAccessor that only iterates over predicate
-      // matches.
-      accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get()));
-    } else {
-      // Create a ValueAccessor that iterates over all tuples in this block
-      accessor.reset(tuple_store_->createValueAccessor());
-    }
+    std::unique_ptr<ValueAccessor> accessor(tuple_store_->createValueAccessor(filter));
 
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
     // If all the arguments to this aggregate are plain relation attributes,
@@ -1246,23 +1226,36 @@ bool StorageBlock::rebuildIndexes(bool short_circuit) {
   return all_indices_consistent_;
 }
 
-TupleIdSequence* StorageBlock::getMatchesForPredicate(const Predicate *predicate) const {
+TupleIdSequence* StorageBlock::getMatchesForPredicate(const Predicate *predicate,
+                                                      const TupleIdSequence *filter) const {
   if (predicate == nullptr) {
-    return tuple_store_->getExistenceMap();
+    TupleIdSequence *matched = tuple_store_->getExistenceMap();
+    if (filter != nullptr) {
+      matched->intersectWith(*filter);
+    }
+    return matched;
   }
 
   std::unique_ptr<ValueAccessor> value_accessor(tuple_store_->createValueAccessor());
-  std::unique_ptr<TupleIdSequence> existence_map;
-  if (!tuple_store_->isPacked()) {
-    existence_map.reset(tuple_store_->getExistenceMap());
-  }
   SubBlocksReference sub_blocks_ref(*tuple_store_,
                                     indices_,
                                     indices_consistent_);
-  return predicate->getAllMatches(value_accessor.get(),
-                                  &sub_blocks_ref,
-                                  nullptr,
-                                  existence_map.get());
+
+  if (!tuple_store_->isPacked()) {
+    std::unique_ptr<TupleIdSequence> existence_map(tuple_store_->getExistenceMap());
+    if (filter != nullptr) {
+      existence_map->intersectWith(*filter);
+    }
+    return predicate->getAllMatches(value_accessor.get(),
+                                    &sub_blocks_ref,
+                                    nullptr,
+                                    existence_map.get());
+  } else {
+    return predicate->getAllMatches(value_accessor.get(),
+                                    &sub_blocks_ref,
+                                    nullptr,
+                                    filter);
+  }
 }
 
 std::unordered_map<attribute_id, TypedValue>* StorageBlock::generateUpdatedValues(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/storage/StorageBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp
index bab5bab..77fb137 100644
--- a/storage/StorageBlock.hpp
+++ b/storage/StorageBlock.hpp
@@ -44,6 +44,7 @@ class AggregationState;
 class CatalogRelationSchema;
 class ColumnVector;
 class InsertDestinationInterface;
+class LIPFilterAdaptiveProber;
 class Predicate;
 class Scalar;
 class StorageBlockLayout;
@@ -312,6 +313,9 @@ class StorageBlock : public StorageBlockBase {
       const std::vector<attribute_id> &attribute_map,
       ValueAccessor *accessor);
 
+  TupleIdSequence* getMatchesForPredicate(const Predicate *predicate,
+                                          const TupleIdSequence *filter = nullptr) const;
+
   /**
    * @brief Perform a random sampling of data on  the StorageBlock. The number
    *       of records sampled is determined by the sample percentage in case of
@@ -349,7 +353,8 @@ class StorageBlock : public StorageBlockBase {
    **/
   void select(const std::vector<std::unique_ptr<const Scalar>> &selection,
               const Predicate *predicate,
-              InsertDestinationInterface *destination) const;
+              InsertDestinationInterface *destination,
+              LIPFilterAdaptiveProber *lip_filter_adaptive_prober) const;
 
   /**
    * @brief Perform a simple SELECT query on this StorageBlock which only
@@ -372,7 +377,8 @@ class StorageBlock : public StorageBlockBase {
    **/
   void selectSimple(const std::vector<attribute_id> &selection,
                     const Predicate *predicate,
-                    InsertDestinationInterface *destination) const;
+                    InsertDestinationInterface *destination,
+                    LIPFilterAdaptiveProber *lip_filter_adaptive_prober) const;
 
   /**
    * @brief Perform non GROUP BY aggregation on the tuples in the this storage
@@ -412,8 +418,7 @@ class StorageBlock : public StorageBlockBase {
       const AggregationHandle &handle,
       const std::vector<std::unique_ptr<const Scalar>> &arguments,
       const std::vector<attribute_id> *arguments_as_attributes,
-      const Predicate *predicate,
-      std::unique_ptr<TupleIdSequence> *reuse_matches) const;
+      const TupleIdSequence *filter) const;
 
   /**
    * @brief Perform GROUP BY aggregation on the tuples in the this storage
@@ -461,9 +466,8 @@ class StorageBlock : public StorageBlockBase {
   void aggregateGroupBy(
       const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
       const std::vector<std::unique_ptr<const Scalar>> &group_by,
-      const Predicate *predicate,
+      const TupleIdSequence *filter,
       AggregationStateHashTableBase *hash_table,
-      std::unique_ptr<TupleIdSequence> *reuse_matches,
       std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const;
 
   /**
@@ -505,9 +509,8 @@ class StorageBlock : public StorageBlockBase {
                          const std::vector<std::unique_ptr<const Scalar>> &arguments,
                          const std::vector<attribute_id> *arguments_as_attributes,
                          const std::vector<std::unique_ptr<const Scalar>> &group_by,
-                         const Predicate *predicate,
+                         const TupleIdSequence *filter,
                          AggregationStateHashTableBase *distinctify_hash_table,
-                         std::unique_ptr<TupleIdSequence> *reuse_matches,
                          std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const;
 
   /**
@@ -627,8 +630,6 @@ class StorageBlock : public StorageBlockBase {
   // StorageBlock's header.
   bool rebuildIndexes(bool short_circuit);
 
-  TupleIdSequence* getMatchesForPredicate(const Predicate *predicate) const;
-
   std::unordered_map<attribute_id, TypedValue>* generateUpdatedValues(
       const ValueAccessor &accessor,
       const tuple_id tuple,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/utility/DAG.hpp
----------------------------------------------------------------------
diff --git a/utility/DAG.hpp b/utility/DAG.hpp
index a1f2619..b35f2b5 100644
--- a/utility/DAG.hpp
+++ b/utility/DAG.hpp
@@ -293,8 +293,10 @@ class DAG {
      *                      node at node_index.
      **/
      inline void addDependent(const size_type_nodes node_index, const LinkMetadataT &link_metadata) {
-       DCHECK(dependents_with_metadata_.find(node_index) == dependents_with_metadata_.end());
-       dependents_with_metadata_.emplace(node_index, link_metadata);
+//       DCHECK(dependents_with_metadata_.find(node_index) == dependents_with_metadata_.end());
+//       dependents_with_metadata_.emplace(node_index, link_metadata);
+       // TODO(jianqiao): implement upsert
+       dependents_with_metadata_[node_index] = link_metadata;
      }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/utility/lip_filter/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/lip_filter/CMakeLists.txt b/utility/lip_filter/CMakeLists.txt
index 2232abe..d78f5cd 100644
--- a/utility/lip_filter/CMakeLists.txt
+++ b/utility/lip_filter/CMakeLists.txt
@@ -15,5 +15,44 @@
 # specific language governing permissions and limitations
 # under the License.
 
+QS_PROTOBUF_GENERATE_CPP(utility_lipfilter_LIPFilter_proto_srcs
+                         utility_lipfilter_LIPFilter_proto_hdrs
+                         LIPFilter.proto)
+
 # Declare micro-libs:
-add_library(quickstep_utility_lipfilter_LIPFilter ../../empty_src.cpp LIPFilter.hpp)
\ No newline at end of file
+add_library(quickstep_utility_lipfilter_LIPFilter LIPFilter.cpp LIPFilter.hpp)
+add_library(quickstep_utility_lipfilter_LIPFilterAdaptiveProber ../../empty_src.cpp LIPFilterAdaptiveProber.hpp)
+add_library(quickstep_utility_lipfilter_LIPFilterBuilder ../../empty_src.cpp LIPFilterBuilder.hpp)
+add_library(quickstep_utility_lipfilter_LIPFilterDeployment LIPFilterDeployment.cpp LIPFilterDeployment.hpp)
+add_library(quickstep_utility_lipfilter_LIPFilterFactory LIPFilterFactory.cpp LIPFilterFactory.hpp)
+add_library(quickstep_utility_lipfilter_LIPFilter_proto
+            ${utility_lipfilter_LIPFilter_proto_srcs})
+add_library(quickstep_utility_lipfilter_SingleIdentityHashFilter ../../empty_src.cpp SingleIdentityHashFilter.hpp)
+
+# Link dependencies:
+target_link_libraries(quickstep_utility_lipfilter_LIPFilter
+                      quickstep_utility_Macros)
+target_link_libraries(quickstep_utility_lipfilter_LIPFilterAdaptiveProber
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_utility_Macros)
+target_link_libraries(quickstep_utility_lipfilter_LIPFilterBuilder
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_utility_Macros)
+target_link_libraries(quickstep_utility_lipfilter_LIPFilterDeployment
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_types_TypeFactory
+                      quickstep_utility_Macros
+                      quickstep_utility_lipfilter_LIPFilter
+                      quickstep_utility_lipfilter_LIPFilter_proto)
+target_link_libraries(quickstep_utility_lipfilter_LIPFilterFactory
+                      quickstep_utility_lipfilter_LIPFilter
+                      quickstep_utility_lipfilter_LIPFilter_proto
+                      quickstep_utility_lipfilter_SingleIdentityHashFilter
+                      quickstep_utility_Macros)
+target_link_libraries(quickstep_utility_lipfilter_LIPFilter_proto
+                      ${PROTOBUF_LIBRARY}
+                      quickstep_types_Type_proto)
+target_link_libraries(quickstep_utility_lipfilter_SingleIdentityHashFilter
+                      quickstep_storage_StorageConstants
+                      quickstep_utility_lipfilter_LIPFilter
+                      quickstep_utility_Macros)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/utility/lip_filter/LIPFilter.cpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilter.cpp b/utility/lip_filter/LIPFilter.cpp
new file mode 100644
index 0000000..92bfab1
--- /dev/null
+++ b/utility/lip_filter/LIPFilter.cpp
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "utility/lip_filter/LIPFilter.hpp"
+
+namespace quickstep {
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/utility/lip_filter/LIPFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilter.hpp b/utility/lip_filter/LIPFilter.hpp
index 33165ed..0df3c18 100644
--- a/utility/lip_filter/LIPFilter.hpp
+++ b/utility/lip_filter/LIPFilter.hpp
@@ -20,8 +20,20 @@
 #ifndef QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_HPP_
 #define QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_HPP_
 
+#include <cstddef>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
 namespace quickstep {
 
+class Type;
+class ValueAccessor;
+
 /** \addtogroup Utility
  *  @{
  */
@@ -32,6 +44,35 @@ enum class LIPFilterType {
   kSingleIdentityHashFilter
 };
 
+class LIPFilter {
+ public:
+  LIPFilterType getType() const {
+    return type_;
+  }
+
+  virtual void insertValueAccessor(ValueAccessor *accessor,
+                                   const attribute_id attr_id,
+                                   const Type *attr_type) = 0;
+
+  virtual std::size_t filterBatch(ValueAccessor *accessor,
+                                  const attribute_id attr_id,
+                                  const bool is_attr_nullable,
+                                  std::vector<tuple_id> *batch,
+                                  const std::size_t batch_size) const = 0;
+
+  virtual std::size_t onesCount() const = 0;
+
+ protected:
+  LIPFilter(const LIPFilterType &type)
+      : type_(type) {
+  }
+
+ private:
+  LIPFilterType type_;
+
+  DISALLOW_COPY_AND_ASSIGN(LIPFilter);
+};
+
 /** @} */
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/utility/lip_filter/LIPFilter.proto
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilter.proto b/utility/lip_filter/LIPFilter.proto
new file mode 100644
index 0000000..def13dd
--- /dev/null
+++ b/utility/lip_filter/LIPFilter.proto
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto2";
+
+package quickstep.serialization;
+
+import "types/Type.proto";
+
+enum LIPFilterType {
+  BLOOM_FILTER = 1;
+  EXACT_FILTER = 2;
+  SINGLE_IDENTITY_HASH_FILTER = 3;
+}
+
+message LIPFilter {
+  required LIPFilterType lip_filter_type = 1;
+
+  extensions 16 to max;
+}
+
+message SingleIdentityHashFilter {
+  extend LIPFilter {
+    // All required
+    optional uint64 filter_cardinality = 16;
+    optional uint64 attribute_size = 17;
+  }
+}
+
+enum LIPFilterActionType {
+  BUILD = 1;
+  PROBE = 2;
+}
+
+message LIPFilterDeployment {
+  message Entry {
+    required uint32 lip_filter_id = 1;
+    required int32 attribute_id = 2;
+    required Type attribute_type = 3;
+  }
+
+  required LIPFilterActionType action_type = 1;
+  repeated Entry entries = 2;
+}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/utility/lip_filter/LIPFilterAdaptiveProber.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterAdaptiveProber.hpp b/utility/lip_filter/LIPFilterAdaptiveProber.hpp
new file mode 100644
index 0000000..af42446
--- /dev/null
+++ b/utility/lip_filter/LIPFilterAdaptiveProber.hpp
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_ADAPTIVE_PROBER_HPP_
+#define QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_ADAPTIVE_PROBER_HPP_
+
+#include <algorithm>
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/TupleIdSequence.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/Type.hpp"
+#include "utility/Macros.hpp"
+#include "utility/lip_filter/SingleIdentityHashFilter.hpp"
+
+namespace quickstep {
+
+/** \addtogroup Utility
+ *  @{
+ */
+
+class LIPFilterAdaptiveProber {
+ public:
+  LIPFilterAdaptiveProber(const std::vector<LIPFilter *> &lip_filters,
+                          const std::vector<attribute_id> &attr_ids,
+                          const std::vector<const Type *> &attr_types) {
+    DCHECK_EQ(lip_filters.size(), attr_ids.size());
+    DCHECK_EQ(lip_filters.size(), attr_types.size());
+
+    probe_entries_.reserve(lip_filters.size());
+    for (std::size_t i = 0; i < lip_filters.size(); ++i) {
+      probe_entries_.emplace_back(
+          new ProbeEntry(lip_filters[i], attr_ids[i], attr_types[i]));
+    }
+  }
+
+  ~LIPFilterAdaptiveProber() {
+    for (ProbeEntry *entry : probe_entries_) {
+      delete entry;
+    }
+  }
+
+  TupleIdSequence* filterValueAccessor(ValueAccessor *accessor) {
+    const TupleIdSequence *existence_map = accessor->getTupleIdSequenceVirtual();
+    if (existence_map == nullptr) {
+      return filterValueAccessorNoExistenceMap(accessor);
+    } else {
+      return filterValueAccessorWithExistenceMap(accessor, existence_map);
+    }
+  }
+
+ private:
+  struct ProbeEntry {
+    ProbeEntry(const LIPFilter *lip_filter_in,
+               const attribute_id attr_id_in,
+               const Type *attr_type_in)
+        : lip_filter(lip_filter_in),
+          attr_id(attr_id_in),
+          attr_type(attr_type_in),
+          miss(0),
+          cnt(0) {
+    }
+    static bool isBetterThan(const ProbeEntry *a,
+                             const ProbeEntry *b) {
+      return a->miss_rate > b->miss_rate;
+    }
+    const LIPFilter *lip_filter;
+    const attribute_id attr_id;
+    const Type *attr_type;
+    std::uint32_t miss;
+    std::uint32_t cnt;
+    float miss_rate;
+  };
+
+
+  inline TupleIdSequence* filterValueAccessorNoExistenceMap(ValueAccessor *accessor) {
+    const std::uint32_t num_tuples = accessor->getNumTuplesVirtual();
+    std::unique_ptr<TupleIdSequence> matches(new TupleIdSequence(num_tuples));
+    std::uint32_t next_batch_size = 64u;
+    std::vector<tuple_id> batch(num_tuples);
+
+    std::uint32_t batch_start = 0;
+    do {
+      const std::uint32_t batch_size =
+          std::min(next_batch_size, num_tuples - batch_start);
+      for (std::uint32_t i = 0; i < batch_size; ++i) {
+        batch[i] = batch_start + i;
+      }
+
+      const std::uint32_t num_hits = filterBatch(accessor, &batch, batch_size);
+      for (std::uint32_t i = 0; i < num_hits; ++i) {
+        matches->set(batch[i], true);
+      }
+
+      batch_start += batch_size;
+      next_batch_size *= 2;
+    } while (batch_start < num_tuples);
+
+    return matches.release();
+  }
+
+  inline TupleIdSequence* filterValueAccessorWithExistenceMap(ValueAccessor *accessor,
+                                                              const TupleIdSequence *existence_map) {
+    std::unique_ptr<TupleIdSequence> matches(
+        new TupleIdSequence(existence_map->length()));
+    std::uint32_t next_batch_size = 64u;
+    std::uint32_t num_tuples_left = existence_map->numTuples();
+    std::vector<tuple_id> batch(num_tuples_left);
+
+    TupleIdSequence::const_iterator tuple_it = existence_map->before_begin();
+    do {
+      const std::uint32_t batch_size =
+          next_batch_size < num_tuples_left ? next_batch_size : num_tuples_left;
+      for (std::uint32_t i = 0; i < batch_size; ++i) {
+        ++tuple_it;
+        batch[i] = *tuple_it;
+      }
+
+      const std::uint32_t num_hits = filterBatch(accessor, &batch, batch_size);
+      for (std::uint32_t i = 0; i < num_hits; ++i) {
+        matches->set(batch[i], true);
+      }
+
+      num_tuples_left -= batch_size;
+      next_batch_size *= 2;
+    } while (num_tuples_left > 0);
+
+    return matches.release();
+  }
+
+  inline std::size_t filterBatch(ValueAccessor *accessor,
+                                 std::vector<tuple_id> *batch,
+                                 std::uint32_t batch_size) {
+    for (auto *entry : probe_entries_) {
+      const std::uint32_t out_size =
+          entry->lip_filter->filterBatch(accessor,
+                                         entry->attr_id,
+                                         entry->attr_type->isNullable(),
+                                         batch,
+                                         batch_size);
+      entry->cnt += batch_size;
+      entry->miss += batch_size - out_size;
+      batch_size = out_size;
+    }
+    adaptEntryOrder();
+    return batch_size;
+  }
+
+  inline void adaptEntryOrder() {
+    for (auto &entry : probe_entries_) {
+      entry->miss_rate = static_cast<float>(entry->miss) / entry->cnt;
+    }
+    std::sort(probe_entries_.begin(),
+              probe_entries_.end(),
+              ProbeEntry::isBetterThan);
+  }
+
+  std::vector<ProbeEntry *> probe_entries_;
+
+  DISALLOW_COPY_AND_ASSIGN(LIPFilterAdaptiveProber);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_ADAPTIVE_PROBER_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/utility/lip_filter/LIPFilterBuilder.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterBuilder.hpp b/utility/lip_filter/LIPFilterBuilder.hpp
new file mode 100644
index 0000000..0a2d465
--- /dev/null
+++ b/utility/lip_filter/LIPFilterBuilder.hpp
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_BUILDER_HPP_
+#define QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_BUILDER_HPP_
+
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "types/Type.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class ValueAccessor;
+
+/** \addtogroup Utility
+ *  @{
+ */
+
+class LIPFilterBuilder;
+typedef std::shared_ptr<LIPFilterBuilder> LIPFilterBuilderPtr;
+
+class LIPFilterBuilder {
+ public:
+  LIPFilterBuilder(const std::vector<LIPFilter *> &lip_filters,
+                   const std::vector<attribute_id> &attr_ids,
+                   const std::vector<const Type *> &attr_types) {
+    DCHECK_EQ(lip_filters.size(), attr_ids.size());
+    DCHECK_EQ(lip_filters.size(), attr_types.size());
+
+    build_entries_.reserve(lip_filters.size());
+    for (std::size_t i = 0; i < lip_filters.size(); ++i) {
+      build_entries_.emplace_back(lip_filters[i], attr_ids[i], attr_types[i]);
+    }
+  }
+
+  void insertValueAccessor(ValueAccessor *accessor) {
+    for (auto &entry : build_entries_) {
+      entry.lip_filter->insertValueAccessor(accessor, entry.attr_id, entry.attr_type);
+    }
+  }
+
+ private:
+  struct BuildEntry {
+    BuildEntry(LIPFilter *lip_filter_in,
+               const attribute_id attr_id_in,
+               const Type *attr_type_in)
+        : lip_filter(lip_filter_in),
+          attr_id(attr_id_in),
+          attr_type(attr_type_in) {
+    }
+    LIPFilter *lip_filter;
+    const attribute_id attr_id;
+    const Type *attr_type;
+  };
+
+  std::vector<BuildEntry> build_entries_;
+
+  DISALLOW_COPY_AND_ASSIGN(LIPFilterBuilder);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/utility/lip_filter/LIPFilterDeployment.cpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterDeployment.cpp b/utility/lip_filter/LIPFilterDeployment.cpp
new file mode 100644
index 0000000..0ac396b
--- /dev/null
+++ b/utility/lip_filter/LIPFilterDeployment.cpp
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "utility/lip_filter/LIPFilterDeployment.hpp"
+
+#include "types/TypeFactory.hpp"
+#include "utility/lip_filter/LIPFilter.pb.h"
+#include "utility/lip_filter/LIPFilterBuilder.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+LIPFilterDeployment::LIPFilterDeployment(
+    const serialization::LIPFilterDeployment &proto,
+    const std::vector<std::unique_ptr<LIPFilter>> &lip_filters) {
+  switch (proto.action_type()) {
+    case serialization::LIPFilterActionType::BUILD:
+      action_type_ = LIPFilterActionType::kBuild;
+      break;
+    case serialization::LIPFilterActionType::PROBE:
+      action_type_ = LIPFilterActionType::kProbe;
+      break;
+    default:
+      LOG(FATAL) << "Unsupported LIPFilterActionType: "
+                 << serialization::LIPFilterActionType_Name(proto.action_type());
+  }
+
+  for (int i = 0; i < proto.entries_size(); ++i) {
+    const auto &entry_proto = proto.entries(i);
+    lip_filters_.emplace_back(lip_filters.at(entry_proto.lip_filter_id()).get());
+    attr_ids_.emplace_back(entry_proto.attribute_id());
+    attr_types_.emplace_back(&TypeFactory::ReconstructFromProto(entry_proto.attribute_type()));
+  }
+}
+
+bool LIPFilterDeployment::ProtoIsValid(
+    const serialization::LIPFilterDeployment &proto) {
+  return true;
+}
+
+LIPFilterBuilder* LIPFilterDeployment::createLIPFilterBuilder() const {
+  DCHECK(action_type_ == LIPFilterActionType::kBuild);
+  return new LIPFilterBuilder(lip_filters_, attr_ids_, attr_types_);
+}
+
+LIPFilterAdaptiveProber* LIPFilterDeployment::createLIPFilterAdaptiveProber() const {
+  DCHECK(action_type_ == LIPFilterActionType::kProbe);
+  return new LIPFilterAdaptiveProber(lip_filters_, attr_ids_, attr_types_);
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/utility/lip_filter/LIPFilterDeployment.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterDeployment.hpp b/utility/lip_filter/LIPFilterDeployment.hpp
new file mode 100644
index 0000000..d939e85
--- /dev/null
+++ b/utility/lip_filter/LIPFilterDeployment.hpp
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_DEPLOYMENT_HPP_
+#define QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_DEPLOYMENT_HPP_
+
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilter.hpp"
+#include "utility/lip_filter/LIPFilter.pb.h"
+
+namespace quickstep {
+
+class LIPFilterBuilder;
+class LIPFilterAdaptiveProber;
+class Type;
+
+/** \addtogroup Utility
+ *  @{
+ */
+
+enum class LIPFilterActionType {
+  kBuild = 0,
+  kProbe
+};
+
+class LIPFilterDeployment {
+ public:
+  LIPFilterDeployment(const serialization::LIPFilterDeployment &proto,
+                      const std::vector<std::unique_ptr<LIPFilter>> &lip_filters);
+
+  static bool ProtoIsValid(const serialization::LIPFilterDeployment &proto);
+
+  LIPFilterActionType getActionType() const {
+    return action_type_;
+  }
+
+  LIPFilterBuilder* createLIPFilterBuilder() const;
+
+  LIPFilterAdaptiveProber* createLIPFilterAdaptiveProber() const;
+
+ private:
+  LIPFilterActionType action_type_;
+  std::vector<LIPFilter *> lip_filters_;
+  std::vector<attribute_id> attr_ids_;
+  std::vector<const Type *> attr_types_;
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_DEPLOYMENT_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/utility/lip_filter/LIPFilterFactory.cpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterFactory.cpp b/utility/lip_filter/LIPFilterFactory.cpp
new file mode 100644
index 0000000..f0e7725
--- /dev/null
+++ b/utility/lip_filter/LIPFilterFactory.cpp
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "utility/lip_filter/LIPFilterFactory.hpp"
+
+#include "utility/lip_filter/LIPFilter.hpp"
+#include "utility/lip_filter/SingleIdentityHashFilter.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+LIPFilter* LIPFilterFactory::ReconstructFromProto(const serialization::LIPFilter &proto) {
+  switch (proto.lip_filter_type()) {
+    case serialization::LIPFilterType::SINGLE_IDENTITY_HASH_FILTER: {
+      const std::size_t attr_size =
+          proto.GetExtension(serialization::SingleIdentityHashFilter::attribute_size);
+      const std::size_t filter_cardinality =
+          proto.GetExtension(serialization::SingleIdentityHashFilter::filter_cardinality);
+
+      if (attr_size >= 8) {
+        return new SingleIdentityHashFilter<std::uint64_t>(filter_cardinality);
+      } else if (attr_size >= 4) {
+        return new SingleIdentityHashFilter<std::uint32_t>(filter_cardinality);
+      } else if (attr_size >= 2) {
+        return new SingleIdentityHashFilter<std::uint16_t>(filter_cardinality);
+      } else {
+        return new SingleIdentityHashFilter<std::uint8_t>(filter_cardinality);
+      }
+    }
+    default:
+      LOG(FATAL) << "Unsupported LIP filter type: "
+                 << serialization::LIPFilterType_Name(proto.lip_filter_type());
+  }
+}
+
+bool LIPFilterFactory::ProtoIsValid(const serialization::LIPFilter &proto) {
+  return true;
+}
+
+}  // namespace quickstep
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/utility/lip_filter/LIPFilterFactory.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterFactory.hpp b/utility/lip_filter/LIPFilterFactory.hpp
new file mode 100644
index 0000000..6a94ae4
--- /dev/null
+++ b/utility/lip_filter/LIPFilterFactory.hpp
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_FACTORY_HPP_
+#define QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_FACTORY_HPP_
+
+#include <vector>
+
+#include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilter.pb.h"
+
+namespace quickstep {
+
+class LIPFilter;
+
+/** \addtogroup Utility
+ *  @{
+ */
+
+class LIPFilterFactory {
+ public:
+  static LIPFilter* ReconstructFromProto(const serialization::LIPFilter &proto);
+
+  static bool ProtoIsValid(const serialization::LIPFilter &proto);
+
+ private:
+
+  DISALLOW_COPY_AND_ASSIGN(LIPFilterFactory);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_FACTORY_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/utility/lip_filter/SingleIdentityHashFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/SingleIdentityHashFilter.hpp b/utility/lip_filter/SingleIdentityHashFilter.hpp
new file mode 100644
index 0000000..0258c24
--- /dev/null
+++ b/utility/lip_filter/SingleIdentityHashFilter.hpp
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_LIP_FILTER_SINGLE_IDENTITY_HASH_FILTER_HPP_
+#define QUICKSTEP_UTILITY_LIP_FILTER_SINGLE_IDENTITY_HASH_FILTER_HPP_
+
+#include <vector>
+
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
+#include <utility>
+#include <vector>
+
+#include "storage/StorageConstants.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/Type.hpp"
+#include "utility/BitManipulation.hpp"
+#include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilter.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/** \addtogroup Utility
+ *  @{
+ */
+
+template <typename CppType>
+class SingleIdentityHashFilter : public LIPFilter {
+ public:
+  SingleIdentityHashFilter(const std::size_t filter_cardinality)
+      : LIPFilter(LIPFilterType::kSingleIdentityHashFilter),
+        filter_cardinality_(filter_cardinality),
+        bit_array_(GetByteSize(filter_cardinality)) {
+    std::memset(bit_array_.data(),
+                0x0,
+                sizeof(std::atomic<std::uint8_t>) * GetByteSize(filter_cardinality));
+  }
+
+  void insertValueAccessor(ValueAccessor *accessor,
+                           const attribute_id attr_id,
+                           const Type *attr_type) override {
+    InvokeOnAnyValueAccessor(
+        accessor,
+        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+      if (attr_type->isNullable()) {
+        insertValueAccessorInternal<true>(accessor, attr_id);
+      } else {
+        insertValueAccessorInternal<false>(accessor, attr_id);
+      }
+    });
+  }
+
+  std::size_t filterBatch(ValueAccessor *accessor,
+                          const attribute_id attr_id,
+                          const bool is_attr_nullable,
+                          std::vector<tuple_id> *batch,
+                          const std::size_t batch_size) const override {
+    return InvokeOnAnyValueAccessor(
+        accessor,
+        [&](auto *accessor) -> std::size_t {  // NOLINT(build/c++11)
+      if (is_attr_nullable) {
+        return filterBatchInternal<true>(accessor, attr_id, batch, batch_size);
+      } else {
+        return filterBatchInternal<false>(accessor, attr_id, batch, batch_size);
+      }
+    });
+  }
+
+  std::size_t onesCount() const override {
+    std::size_t count = 0;
+    for (std::size_t i = 0; i < bit_array_.size(); ++i) {
+      count += population_count<std::uint8_t>(bit_array_[i].load(std::memory_order_relaxed));
+    }
+    return count;
+  }
+
+  /**
+   * @brief Inserts a given value into the hash filter.
+   *
+   * @param key_begin A pointer to the value being inserted.
+   */
+  inline void insert(const void *key_begin) {
+    const CppType hash = *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_;
+    bit_array_[hash >> 3].fetch_or(1 << (hash & 0x7), std::memory_order_relaxed);
+  }
+
+  /**
+   * @brief Test membership of a given value in the hash filter.
+   *        If true is returned, then a value may or may not be present in the hash filter.
+   *        If false is returned, a value is certainly not present in the hash filter.
+   *
+   * @param key_begin A pointer to the value being tested for membership.
+   */
+  inline bool contains(const void *key_begin) const {
+    const CppType hash = *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_;
+    return ((bit_array_[hash >> 3].load(std::memory_order_relaxed) & (1 << (hash & 0x7))) > 0);
+  }
+
+ private:
+  inline static std::size_t GetByteSize(const std::size_t bit_size) {
+    return (bit_size + 7) / 8;
+  }
+
+  template <bool is_attr_nullable, typename ValueAccessorT>
+  inline void insertValueAccessorInternal(ValueAccessorT *accessor,
+                                          const attribute_id attr_id) {
+    accessor->beginIteration();
+    while (accessor->next()) {
+      const void *value = accessor->template getUntypedValue<is_attr_nullable>(attr_id);
+      if (!is_attr_nullable || value != nullptr) {
+        insert(value);
+      }
+    }
+  }
+
+  template <bool is_attr_nullable, typename ValueAccessorT>
+  inline std::size_t filterBatchInternal(const ValueAccessorT *accessor,
+                                         const attribute_id attr_id,
+                                         std::vector<tuple_id> *batch,
+                                         const std::size_t batch_size) const {
+    std::size_t out_size = 0;
+    for (std::size_t i = 0; i < batch_size; ++i) {
+      const tuple_id tid = batch->at(i);
+      const void *value =
+          accessor->template getUntypedValueAtAbsolutePosition(attr_id, tid);
+      if (is_attr_nullable && value == nullptr) {
+        continue;
+      }
+      if (contains(value)) {
+        batch->at(out_size) = tid;
+        ++out_size;
+      }
+    }
+    return out_size;
+  }
+
+  std::size_t filter_cardinality_;
+  alignas(kCacheLineBytes) std::vector<std::atomic<std::uint8_t>> bit_array_;
+
+  DISALLOW_COPY_AND_ASSIGN(SingleIdentityHashFilter);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_LIP_FILTER_SINGLE_IDENTITY_HASH_FILTER_HPP_