You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/07/26 19:37:54 UTC

[2/2] incubator-quickstep git commit: AttachBloomFilters

AttachBloomFilters


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/319d557b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/319d557b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/319d557b

Branch: refs/heads/adaptive-bloom-filters
Commit: 319d557bfab925dd0f2ca3ed6a90152ef17fcc23
Parents: 34e8935
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Tue Jul 26 15:37:47 2016 -0400
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Tue Jul 26 15:37:47 2016 -0400

----------------------------------------------------------------------
 relational_operators/HashJoinOperator.cpp |  23 +-
 storage/BloomFilterIndexSubBlock.cpp      |   4 +-
 storage/BloomFilterIndexSubBlock.hpp      |   6 -
 storage/HashTable.hpp                     | 128 +++++--
 utility/BloomFilter.hpp                   | 502 +++++++++++++++++++------
 utility/BloomFilter.proto                 |   6 +-
 utility/BloomFilterAdapter.hpp            | 120 +++---
 7 files changed, 561 insertions(+), 228 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/319d557b/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index b4de108..16c0d82 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -48,7 +48,6 @@
 #include "types/TypedValue.hpp"
 #include "types/containers/ColumnVector.hpp"
 #include "types/containers/ColumnVectorsValueAccessor.hpp"
-#include "utility/EventProfiler.hpp"
 
 #include "gflags/gflags.h"
 #include "glog/logging.h"
@@ -60,6 +59,11 @@ using std::vector;
 
 namespace quickstep {
 
+DEFINE_int64(bloom_adapter_batch_size, 64,
+             "Number of tuples to probe in bulk in Bloom filter adapter.");
+DEFINE_bool(adapt_bloom_filters, true,
+            "Whether to adaptively adjust the ordering of bloom filters.");
+
 namespace {
 
 // Functor passed to HashTable::getAllFromValueAccessor() to collect matching
@@ -76,6 +80,11 @@ class MapBasedJoinedTupleCollector {
     joined_tuples_[tref.block].emplace_back(tref.tuple, accessor.getCurrentPosition());
   }
 
+  inline void operator()(const tuple_id probe_tid,
+                         const TupleReference &build_tref) {
+    joined_tuples_[build_tref.block].emplace_back(build_tref.tuple, probe_tid);
+  }
+
   // Get a mutable pointer to the collected map of joined tuple ID pairs. The
   // key is inner block_id, values are vectors of joined tuple ID pairs with
   // tuple ID from the inner block on the left and the outer block on the
@@ -204,8 +213,7 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
                                      selection,
                                      hash_table,
                                      output_destination,
-                                     storage_manager,
-                                     op_index_),
+                                     storage_manager),
               op_index_);
         }
         started_ = true;
@@ -225,8 +233,7 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
                 selection,
                 hash_table,
                 output_destination,
-                storage_manager,
-                op_index_),
+                storage_manager),
             op_index_);
         ++num_workorders_generated_;
       }  // end while
@@ -423,8 +430,6 @@ void HashInnerJoinWorkOrder::execute() {
   BlockReference probe_block(
       storage_manager_->getBlock(block_id_, probe_relation_));
   const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
-  auto *container = simple_profiler.getContainer();
-  container->setContext(getOperatorIndex());
 
   std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
   MapBasedJoinedTupleCollector collector;
@@ -445,7 +450,6 @@ void HashInnerJoinWorkOrder::execute() {
   const relation_id build_relation_id = build_relation_.getID();
   const relation_id probe_relation_id = probe_relation_.getID();
 
-  auto *output_line = container->getEventLine(getOperatorIndex());
   for (std::pair<const block_id, std::vector<std::pair<tuple_id, tuple_id>>>
            &build_block_entry : *collector.getJoinedTuples()) {
     BlockReference build_block =
@@ -507,9 +511,6 @@ void HashInnerJoinWorkOrder::execute() {
                                                                   probe_accessor.get(),
                                                                   build_block_entry.second));
     }
-    output_line->emplace_back();
-    output_line->back().endEvent();
-    output_line->back().setPayload(temp_result.getNumTuples());
 
     // NOTE(chasseur): calling the bulk-insert method of InsertDestination once
     // for each pair of joined blocks incurs some extra overhead that could be

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/319d557b/storage/BloomFilterIndexSubBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/BloomFilterIndexSubBlock.cpp b/storage/BloomFilterIndexSubBlock.cpp
index e806217..a40f69f 100644
--- a/storage/BloomFilterIndexSubBlock.cpp
+++ b/storage/BloomFilterIndexSubBlock.cpp
@@ -55,7 +55,6 @@ BloomFilterIndexSubBlock::BloomFilterIndexSubBlock(const TupleStorageSubBlock &t
                     sub_block_memory_size),
       is_initialized_(false),
       is_consistent_(false),
-      random_seed_(kBloomFilterSeed),
       bit_array_size_in_bytes_(description.GetExtension(
                                    BloomFilterIndexSubBlockDescription::bloom_filter_size)) {
   CHECK(DescriptionIsValid(relation_, description_))
@@ -74,8 +73,7 @@ BloomFilterIndexSubBlock::BloomFilterIndexSubBlock(const TupleStorageSubBlock &t
   const std::uint32_t salt_count = description.GetExtension(BloomFilterIndexSubBlockDescription::number_of_hashes);
 
   // Initialize the bloom_filter_ data structure to operate on bit_array.
-  bloom_filter_.reset(new BloomFilter(random_seed_,
-                                      salt_count,
+  bloom_filter_.reset(new BloomFilter(salt_count,
                                       bit_array_size_in_bytes_,
                                       bit_array_.get(),
                                       is_bloom_filter_initialized));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/319d557b/storage/BloomFilterIndexSubBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/BloomFilterIndexSubBlock.hpp b/storage/BloomFilterIndexSubBlock.hpp
index 4925673..8c81156 100644
--- a/storage/BloomFilterIndexSubBlock.hpp
+++ b/storage/BloomFilterIndexSubBlock.hpp
@@ -65,11 +65,6 @@ class BloomFilterIndexSubBlock : public IndexSubBlock {
     kSelectivityNone
   };
 
-  /**
-   * @brief A random seed to initialize the bloom filter hash functions.
-   **/
-  static const std::uint64_t kBloomFilterSeed = 0xA5A5A5A55A5A5A5AULL;
-
   BloomFilterIndexSubBlock(const TupleStorageSubBlock &tuple_store,
                            const IndexSubBlockDescription &description,
                            const bool new_block,
@@ -179,7 +174,6 @@ class BloomFilterIndexSubBlock : public IndexSubBlock {
  private:
   bool is_initialized_;
   bool is_consistent_;
-  const std::uint64_t random_seed_;
   const std::uint64_t bit_array_size_in_bytes_;
   std::vector<attribute_id> indexed_attribute_ids_;
   std::unique_ptr<unsigned char> bit_array_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/319d557b/storage/HashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTable.hpp b/storage/HashTable.hpp
index 2887854..13cd83c 100644
--- a/storage/HashTable.hpp
+++ b/storage/HashTable.hpp
@@ -41,12 +41,14 @@
 #include "types/TypedValue.hpp"
 #include "utility/BloomFilter.hpp"
 #include "utility/BloomFilterAdapter.hpp"
-#include "utility/EventProfiler.hpp"
 #include "utility/HashPair.hpp"
 #include "utility/Macros.hpp"
 
 namespace quickstep {
 
+DECLARE_int64(bloom_adapter_batch_size);
+DECLARE_bool(adapt_bloom_filters);
+
 /** \addtogroup Storage
  *  @{
  */
@@ -1482,8 +1484,7 @@ HashTablePutResult HashTable<ValueT, resizable, serializable, force_key_copy, al
     }
     std::unique_ptr<BloomFilter> thread_local_bloom_filter;
     if (has_build_side_bloom_filter_) {
-      thread_local_bloom_filter.reset(new BloomFilter(build_bloom_filter_->getRandomSeed(),
-                                                      build_bloom_filter_->getNumberOfHashes(),
+      thread_local_bloom_filter.reset(new BloomFilter(build_bloom_filter_->getNumberOfHashes(),
                                                       build_bloom_filter_->getBitArraySize()));
     }
     if (resizable) {
@@ -2232,6 +2233,7 @@ inline std::size_t HashTable<ValueT, resizable, serializable, force_key_copy, al
   }
 }
 
+
 template <typename ValueT,
           bool resizable,
           bool serializable,
@@ -2249,47 +2251,97 @@ void HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_
   InvokeOnAnyValueAccessor(
       accessor,
       [&](auto *accessor) -> void {  // NOLINT(build/c++11)
-    auto *container = simple_profiler.getContainer();
-    const int op_index = container->getContext();
-    auto *hash_probe_line = container->getEventLine(op_index + 100);
-    hash_probe_line->emplace_back();
     std::unique_ptr<BloomFilterAdapter> bloom_filter_adapter;
     if (has_probe_side_bloom_filter_) {
-      bloom_filter_adapter.reset(
-          new BloomFilterAdapter(probe_bloom_filters_, probe_attribute_ids_));
-    }
-    int hash_probe_cnt = 0;
-    int bloom_probe_cnt = 0;
-    while (accessor->next()) {
-      if (has_probe_side_bloom_filter_ && bloom_filter_adapter->miss(accessor, &bloom_probe_cnt)) {
-        continue;  // On a bloom filter miss, probing the hash table can be skipped.
-      }
 
-      ++hash_probe_cnt;
-      TypedValue key = accessor->getTypedValue(key_attr_id);
-      if (check_for_null_keys && key.isNull()) {
-        continue;
-      }
-      const std::size_t true_hash = use_scalar_literal_hash_template ? key.getHashScalarLiteral()
-                                                                     : key.getHash();
-      const std::size_t adjusted_hash = adjust_hashes_template ? this->AdjustHash(true_hash)
-                                                               : true_hash;
-      std::size_t entry_num = 0;
-      const ValueT *value;
-      while (this->getNextEntryForKey(key, adjusted_hash, &value, &entry_num)) {
-        (*functor)(*accessor, *value);
-        if (!allow_duplicate_keys) {
-          break;
+      // Find (and cache) the size of each attribute in the probe lists.
+      // NOTE(nav): This code uses the accessor to get the size,
+      // and hence only works if there's at least one tuple.
+      std::vector<std::vector<std::size_t>> attr_size_vectors;
+      attr_size_vectors.reserve(probe_attribute_ids_.size());
+      for (const auto &probe_attr_vector : probe_attribute_ids_) {
+        std::vector<std::size_t> attr_sizes;
+        attr_sizes.reserve(probe_attr_vector.size());
+        for (const auto &probe_attr : probe_attr_vector) {
+          // Using the function below is the easiest way to get attribute size
+          // here. The template parameter check_null is set to false to ensure
+          // that we get the size even if the first attr value is null.
+          auto val_and_size = accessor->template getUntypedValueAndByteLengthAtAbsolutePosition<false>(0, probe_attr);
+          attr_sizes.push_back(val_and_size.second);
         }
+        attr_size_vectors.push_back(attr_sizes);
       }
+
+      bloom_filter_adapter.reset(new BloomFilterAdapter(
+              probe_bloom_filters_, probe_attribute_ids_, attr_size_vectors));
+
+      // We want to have large batch sizes for cache efficiency while probeing,
+      // but small batch sizes to ensure that the adaptation logic kicks in
+      // (and does early). We use exponentially increasing batch sizes to
+      // achieve a balance between the two.
+      //
+      // We also keep track of num_tuples_left in the block, to ensure that
+      // we don't reserve an unnecessarily large vector.
+      std::uint32_t batch_size_try = FLAGS_bloom_adapter_batch_size;
+      std::uint32_t num_tuples_left = accessor->getNumTuples();
+      std::vector<tuple_id> batch(num_tuples_left);
+
+      do {
+        const std::uint32_t batch_size =
+            batch_size_try < num_tuples_left ? batch_size_try : num_tuples_left;
+        for (std::size_t i = 0; i < batch_size; ++i) {
+          accessor->next();
+          batch.push_back(accessor->getCurrentPosition());
+        }
+
+        std::size_t num_hits;
+        if (FLAGS_adapt_bloom_filters) {
+          num_hits = bloom_filter_adapter->bulkProbe<true>(accessor, batch);
+        } else {
+          num_hits = bloom_filter_adapter->bulkProbe<false>(accessor, batch);
+        }
+
+        for (std::size_t i = 0; i < num_hits; ++i){
+          const tuple_id probe_tid = batch[i];
+          TypedValue key = accessor->getTypedValueAtAbsolutePosition(key_attr_id, probe_tid);
+          if (check_for_null_keys && key.isNull()) {
+            continue;
+          }
+          const std::size_t true_hash = use_scalar_literal_hash_template ? key.getHashScalarLiteral()
+                                                                        : key.getHash();
+          const std::size_t adjusted_hash = adjust_hashes_template ? this->AdjustHash(true_hash)
+                                                                  : true_hash;
+          std::size_t entry_num = 0;
+          const ValueT *value;
+          while (this->getNextEntryForKey(key, adjusted_hash, &value, &entry_num)) {
+            (*functor)(probe_tid, *value);
+            if (!allow_duplicate_keys)
+              break;
+          }
+        }
+        num_tuples_left -= batch_size;
+        batch_size_try = batch_size * 2;
+      } while (!accessor->iterationFinished());
     }
-    hash_probe_line->back().endEvent();
-    hash_probe_line->back().setPayload(hash_probe_cnt);
-    if (bloom_probe_cnt > 0) {
-      auto *bloom_probe_line = container->getEventLine(op_index + 200);
-      bloom_probe_line->emplace_back();
-      bloom_probe_line->back().endEvent();
-      bloom_probe_line->back().setPayload(bloom_probe_cnt);
+
+    else { // no Bloom filters to probe
+      while(accessor->next()) {
+        TypedValue key = accessor->getTypedValue(key_attr_id);
+        if (check_for_null_keys && key.isNull()) {
+          continue;
+        }
+        const std::size_t true_hash = use_scalar_literal_hash_template ? key.getHashScalarLiteral()
+                                      : key.getHash();
+        const std::size_t adjusted_hash = adjust_hashes_template ? this->AdjustHash(true_hash)
+                                          : true_hash;
+        std::size_t entry_num = 0;
+        const ValueT *value;
+        while (this->getNextEntryForKey(key, adjusted_hash, &value, &entry_num)) {
+          (*functor)(*accessor, *value);
+          if (!allow_duplicate_keys)
+            break;
+        }
+      }
     }
   });
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/319d557b/utility/BloomFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/BloomFilter.hpp b/utility/BloomFilter.hpp
index b93df84..973ca14 100644
--- a/utility/BloomFilter.hpp
+++ b/utility/BloomFilter.hpp
@@ -26,6 +26,7 @@
 #include <algorithm>
 #include <cstddef>
 #include <cstdint>
+#include <cstring>
 #include <memory>
 #include <utility>
 #include <vector>
@@ -44,11 +45,358 @@ namespace quickstep {
  *  @{
  */
 
+class BloomFilterOriginal;
+class BloomFilterBlocked;
+typedef BloomFilterBlocked BloomFilter;
+
+/**
+ * @brief A "blocked" version of Bloom Filter based on this paper:
+ *        Putze, Felix, Peter Sanders, and Johannes Singler.
+ *        "Cache-, hash-and space-efficient bloom filters."
+ *        International Workshop on Experimental and Efficient Algorithms.
+ *        Springer Berlin Heidelberg, 2007.
+ **/
+class BloomFilterBlocked {
+ public:
+  static const std::uint8_t kNumBitsPerByte = 8;
+  static const std::uint8_t kMaxNumHashFns = 4;
+
+  // This union allows us to read/write position in convenient fashion,
+  // through nested structs and their bitfield members
+  //
+  // A position can simply be a 32-bit hash
+  // Or it can be a cache line (block of 512 bits) and position within it
+  // Or it can be a byte (block of 8 bits) and position within it
+  union Position {
+    std::uint32_t hash;
+    struct CacheLinePosition {
+      unsigned index_in_line : 9;
+      unsigned line_num : 23;
+    } cache_line_pos;
+    struct BytePosition {
+      unsigned index_in_byte : 3;
+      unsigned byte_num : 29;
+    } byte_pos;
+  };
+
+  // This Bloom filter implementation requires the bit array to be a
+  // multiple of the cache-line size. So we either have to round up to a 
+  // multiple (default behavior) or round down to a multiple.
+  // Rounding up is usually preferable but rounding down is necessary when
+  // we are given a bit array that we don't control the size of, in the
+  // constructor.
+  static std::uint64_t getNearestAllowedSize(
+      const std::uint64_t approx_size,
+      bool round_down = false) {
+    if (round_down)
+      return (approx_size / kCacheLineBytes) * kCacheLineBytes;
+    return ((approx_size + kCacheLineBytes - 1)/ kCacheLineBytes) * kCacheLineBytes;
+  }
+
+
+  /**
+   * @brief Constructor.
+   * @note When no bit_array is being passed to the constructor,
+   *       then the bit_array is owned and managed by this class.
+   *
+   * @param hash_fn_count The number of hash functions used by this bloom filter.
+   * @param bit_array_size_in_bytes Size of the bit array.
+   **/
+  BloomFilterBlocked(const std::uint8_t hash_fn_count,
+              const std::uint64_t bit_array_size_in_bytes)
+      : hash_fn_count_(hash_fn_count),
+        array_size_in_bytes_(getNearestAllowedSize(bit_array_size_in_bytes)),
+        is_bit_array_owner_(true),
+        bit_array_(new std::uint8_t[array_size_in_bytes_]) {
+    reset();
+  }
+
+  /**
+   * @brief Constructor.
+   * @note When a bit_array is passed as an argument to the constructor,
+   *       then the ownership of the bit array lies with the caller.
+   *
+   * @param hash_fn_count The number of hash functions used by this bloom filter.
+   * @param bit_array_size_in_bytes Size of the bit array.
+   * @param bit_array A pointer to the memory region that is used to store bit array.
+   * @param is_initialized A boolean that indicates whether to zero-out the region
+   *                       before use or not.
+   **/
+  BloomFilterBlocked(const std::uint8_t hash_fn_count,
+              const std::uint64_t bit_array_size_in_bytes,
+              std::uint8_t *bit_array,
+              const bool is_initialized)
+      : hash_fn_count_(hash_fn_count),
+        array_size_in_bytes_(getNearestAllowedSize(bit_array_size_in_bytes, true)),
+        is_bit_array_owner_(false),
+        bit_array_(bit_array) {  // Owned by the calling method.
+    if (!is_initialized) {
+      reset();
+    }
+  }
+
+  /**
+   * @brief Constructor.
+   * @note When a bloom filter proto is passed as an initializer,
+   *       then the bit_array is owned and managed by this class.
+   *
+   * @param bloom_filter_proto The protobuf representation of a
+   *        bloom filter configuration.
+   **/
+  explicit BloomFilterBlocked(const serialization::BloomFilter &bloom_filter_proto)
+      : hash_fn_count_(bloom_filter_proto.number_of_hashes()),
+        array_size_in_bytes_(bloom_filter_proto.bloom_filter_size()),
+        is_bit_array_owner_(true),
+        bit_array_(new std::uint8_t[array_size_in_bytes_]) {
+    reset();
+  }
+
+  /**
+   * @brief Destructor.
+   **/
+  ~BloomFilterBlocked() {
+    if (is_bit_array_owner_) {
+      bit_array_.reset();
+    } else {
+      bit_array_.release();
+    }
+  }
+
+  static bool ProtoIsValid(const serialization::BloomFilter &bloom_filter_proto) {
+    return bloom_filter_proto.IsInitialized();
+  }
+
+  /**
+   * @brief Zeros out the contents of the bit array.
+   **/
+  inline void reset() {
+    // Initialize the bit_array with all zeros.
+    std::fill_n(bit_array_.get(), array_size_in_bytes_, 0x00);
+    inserted_element_count_ = 0;
+  }
+
+  /**
+   * @brief Get the number of hash functions used in this bloom filter.
+   *
+   * @return Returns the number of hash functions.
+   **/
+  inline std::uint8_t getNumberOfHashes() const {
+    return hash_fn_count_;
+  }
+
+  /**
+   * @brief Get the size of the bit array in bytes for this bloom filter.
+   *
+   * @return Returns the bit array size (in bytes).
+   **/
+  inline std::uint64_t getBitArraySize() const {
+    return array_size_in_bytes_;
+  }
+
+  /**
+   * @brief Get the constant pointer to the bit array for this bloom filter
+   *
+   * @return Returns constant pointer to the bit array.
+   **/
+  inline const std::uint8_t* getBitArray() const {
+    return bit_array_.get();
+  }
+
+  template <typename T>
+  void insert(const T &value) {
+    insert(reinterpret_cast<const std::uint8_t *>(&value), sizeof(T));
+  }
+
+  /**
+   * @brief Inserts a given value into the bloom filter in a thread-safe manner.
+   *
+   * @param key_begin A pointer to the value being inserted.
+   * @param length Size of the value being inserted in bytes.
+   */
+  inline void insert(const std::uint8_t *key_begin, const std::size_t length) {
+      SpinSharedMutexExclusiveLock<false> exclusive_writer_lock(bloom_filter_insert_mutex_);
+      insertUnSafe(key_begin, length);
+  }
+
+  template <typename T>
+  void insertUnSafe(const T &value) {
+    insertUnSafe(reinterpret_cast<const std::uint8_t *>(&value), sizeof(T));
+  }
+
+  /**
+   * @brief Inserts a given value into the bloom filter.
+   * @Warning This is a faster thread-unsafe version of the insert() function.
+   *          The caller needs to ensure the thread safety.
+   *
+   * @param key_begin A pointer to the value being inserted.
+   * @param length Size of the value being inserted in bytes.
+   */
+  inline void insertUnSafe(const std::uint8_t *key_begin, const std::size_t length) {
+    Position first_pos = getFirstPosition(key_begin, length);
+    setBitAtPosition(first_pos);
+    Position other_pos;
+    for (std::uint8_t i = 1; i <hash_fn_count_; ++i) {
+      other_pos = getOtherPosition(key_begin, length, first_pos, i);
+      setBitAtPosition(other_pos);
+    }
+    ++inserted_element_count_;
+  }
+
+  template <typename T>
+  bool contains(const T &value) {
+    return contains(reinterpret_cast<const std::uint8_t *>(&value), sizeof(T));
+  }
+
+  /**
+   * @brief Test membership of a given value in the bloom filter.
+   *        If true is returned, then a value may or may not be present in the bloom filter.
+   *        If false is returned, a value is certainly not present in the bloom filter.
+   *
+   * @note The membersip test does not require any locks, because the assumption is that
+   *       the bloom filter will only be used after it has been built.
+   *
+   * @param key_begin A pointer to the value being tested for membership.
+   * @param length Size of the value being inserted in bytes.
+   */
+  inline bool contains(
+      const std::uint8_t *__restrict__ key_begin,
+      const std::size_t length) const {
+    Position first_pos = getFirstPosition(key_begin, length);
+    if (!getBitAtPosition(first_pos)) {
+      return false;
+    }
+    Position other_pos;
+    for (std::uint8_t i = 1; i < hash_fn_count_; ++i) {
+      other_pos = getOtherPosition(key_begin, length, first_pos, i);
+      if (!getBitAtPosition(other_pos)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * @brief Perform a bitwise-OR of the given Bloom filter with this bloom filter.
+   *        Essentially, it does a union of this bloom filter with the passed bloom filter.
+   *
+   * @param bloom_filter A const pointer to the bloom filter object to do bitwise-OR with.
+   */
+  inline void bitwiseOr(const BloomFilterBlocked *bloom_filter) {
+    SpinSharedMutexExclusiveLock<false> exclusive_writer_lock(bloom_filter_insert_mutex_);
+    for (std::size_t byte_index = 0; byte_index < bloom_filter->getBitArraySize(); ++byte_index) {
+      (bit_array_.get())[byte_index] |= bloom_filter->getBitArray()[byte_index];
+    }
+  }
+
+  /**
+   * @brief Return the number of elements currently inserted into bloom filter.
+   *
+   * @return The number of elements inserted into bloom filter.
+   **/
+  inline std::uint32_t element_count() const {
+    return inserted_element_count_;
+  }
+
+ protected:
+  Position getFirstPosition(const std::uint8_t *begin, std::size_t length) const {
+    Position pos;
+    pos.hash = hash_identity(begin, length);
+    return pos;
+  }
+
+  Position getOtherPosition(
+      const std::uint8_t *begin,
+      std::size_t length,
+      const Position first_pos,
+      const std::uint8_t index) const {
+    Position pos;
+    pos.hash = hash_multiplicative(begin, length, hash_fn_[index-1]);
+    pos.cache_line_pos.line_num = first_pos.cache_line_pos.line_num;
+    return pos;
+  }
+
+  void fillPosition(
+      const std::uint8_t *begin,
+      std::size_t length,
+      const std::uint8_t index,
+      Position positions[]) const {
+    if (index == 0)
+      positions[0].hash = hash_identity(begin, length);
+    else {
+      positions[index].hash = hash_multiplicative(begin, length, hash_fn_[index-1]);
+      positions[index].cache_line_pos.line_num = positions[0].cache_line_pos.line_num;
+    }
+  }
+
+  void setBitAtPosition(const Position &pos) {
+    (bit_array_.get())[pos.byte_pos.byte_num] |= (1 << pos.byte_pos.index_in_byte);
+  }
+
+  bool getBitAtPosition(const Position &pos) const {
+    return (bit_array_.get())[pos.byte_pos.byte_num] & (1 << pos.byte_pos.index_in_byte);
+  }
+
+  inline std::uint32_t hash_identity(
+      const std::uint8_t *__restrict__ begin,
+      std::size_t length) const {
+    std::uint32_t hash;
+    if (length >= 4)
+      hash = *reinterpret_cast<const std::uint32_t*> (begin);
+    else
+      std::memcpy(&hash, begin, length);
+    return hash % (array_size_in_bytes_ * kNumBitsPerByte);
+  }
+
+  inline std::uint32_t hash_multiplicative(
+      const std::uint8_t *__restrict__ begin,
+      std::size_t length,
+      const std::uint64_t multiplier) const {
+    std::uint32_t hash = 0;
+    std::size_t bytes_hashed = 0;
+    if (length >= 4) {
+      while (bytes_hashed < length) {
+        auto val = *reinterpret_cast<const std::uint32_t *>(begin + bytes_hashed);
+        hash += (multiplier * val) >> 24;
+        bytes_hashed += 4;
+      }
+    }
+    while (bytes_hashed < length) {
+      std::uint8_t val = *(begin + bytes_hashed);
+      hash += (multiplier * val) >> 24;
+      bytes_hashed++;
+    }
+    return hash;//  % (array_size_in_bytes_ * kNumBitsPerByte);
+  }
+
+ private:
+  const std::uint32_t hash_fn_count_;
+  const std::uint64_t array_size_in_bytes_;
+  std::uint32_t inserted_element_count_;
+  const bool is_bit_array_owner_;
+
+  static constexpr std::uint64_t kKnuthGoldenRatioNumber = 2654435761;
+  const std::uint64_t hash_fn_[kMaxNumHashFns] = { // hash_fn_[i] is 2**(i+1) - 1
+    0x00000001 * kKnuthGoldenRatioNumber, // 0x00000003, 0x00000007, 0x0000000f,
+    // 0x0000001f * kKnuthGoldenRatioNumber, // 0x0000003f, 0x0000007f, 0x000000ff,
+    0x000001ff * kKnuthGoldenRatioNumber, // 0x000003ff, 0x000007ff, 0x00000fff,
+    // 0x00001fff * kKnuthGoldenRatioNumber, // 0x00003fff, 0x00007fff, 0x0000ffff,
+    0x0001ffff * kKnuthGoldenRatioNumber, // 0x0003ffff, 0x0007ffff, 0x000fffff,
+    // 0x001fffff * kKnuthGoldenRatioNumber, // 0x003fffff, 0x007fffff, 0x00ffffff,
+    0x01ffffff * kKnuthGoldenRatioNumber, // 0x03ffffff, 0x07ffffff, 0x0fffffff,
+    // 0x1fffffff * kKnuthGoldenRatioNumber  // 0x3fffffff, 0x7fffffff, 0xffffffff
+    };
+
+  alignas(kCacheLineBytes) std::unique_ptr<std::uint8_t> bit_array_;
+  alignas(kCacheLineBytes) mutable SpinSharedMutex<false> bloom_filter_insert_mutex_;
+
+  DISALLOW_COPY_AND_ASSIGN(BloomFilterBlocked);
+};
+
 /**
  * @brief A simple Bloom Filter implementation with basic primitives
  *        based on Partow's Bloom Filter implementation.
  **/
-class BloomFilter {
+class BloomFilterOriginal {
  public:
   static const uint32_t kNumBitsPerByte = 8;
 
@@ -57,21 +405,17 @@ class BloomFilter {
    * @note When no bit_array is being passed to the constructor,
    *       then the bit_array is owned and managed by this class.
    *
-   * @param random_seed A random_seed that generates unique hash functions.
    * @param hash_fn_count The number of hash functions used by this bloom filter.
    * @param bit_array_size_in_bytes Size of the bit array.
    **/
-  BloomFilter(const std::uint64_t random_seed,
-              const std::size_t hash_fn_count,
+  BloomFilterOriginal(const std::size_t hash_fn_count,
               const std::uint64_t bit_array_size_in_bytes)
-      : random_seed_(random_seed),
-        hash_fn_count_(hash_fn_count),
+      : hash_fn_count_(hash_fn_count),
         array_size_in_bytes_(bit_array_size_in_bytes),
         array_size_(array_size_in_bytes_ * kNumBitsPerByte),
         bit_array_(new std::uint8_t[array_size_in_bytes_]),
         is_bit_array_owner_(true) {
     reset();
-    generate_unique_hash_fn();
   }
 
   /**
@@ -79,20 +423,17 @@ class BloomFilter {
    * @note When a bit_array is passed as an argument to the constructor,
    *       then the ownership of the bit array lies with the caller.
    *
-   * @param random_seed A random_seed that generates unique hash functions.
    * @param hash_fn_count The number of hash functions used by this bloom filter.
    * @param bit_array_size_in_bytes Size of the bit array.
    * @param bit_array A pointer to the memory region that is used to store bit array.
    * @param is_initialized A boolean that indicates whether to zero-out the region
    *                       before use or not.
    **/
-  BloomFilter(const std::uint64_t random_seed,
-              const std::size_t hash_fn_count,
+  BloomFilterOriginal(const std::size_t hash_fn_count,
               const std::uint64_t bit_array_size_in_bytes,
               std::uint8_t *bit_array,
               const bool is_initialized)
-      : random_seed_(random_seed),
-        hash_fn_count_(hash_fn_count),
+      : hash_fn_count_(hash_fn_count),
         array_size_in_bytes_(bit_array_size_in_bytes),
         array_size_(bit_array_size_in_bytes * kNumBitsPerByte),
         bit_array_(bit_array),  // Owned by the calling method.
@@ -100,7 +441,6 @@ class BloomFilter {
     if (!is_initialized) {
       reset();
     }
-    generate_unique_hash_fn();
   }
 
   /**
@@ -111,21 +451,19 @@ class BloomFilter {
    * @param bloom_filter_proto The protobuf representation of a
    *        bloom filter configuration.
    **/
-  explicit BloomFilter(const serialization::BloomFilter &bloom_filter_proto)
-      : random_seed_(bloom_filter_proto.bloom_filter_seed()),
-        hash_fn_count_(bloom_filter_proto.number_of_hashes()),
+  explicit BloomFilterOriginal(const serialization::BloomFilter &bloom_filter_proto)
+      : hash_fn_count_(bloom_filter_proto.number_of_hashes()),
         array_size_in_bytes_(bloom_filter_proto.bloom_filter_size()),
         array_size_(array_size_in_bytes_ * kNumBitsPerByte),
         bit_array_(new std::uint8_t[array_size_in_bytes_]),
         is_bit_array_owner_(true) {
     reset();
-    generate_unique_hash_fn();
   }
 
   /**
    * @brief Destructor.
    **/
-  ~BloomFilter() {
+  ~BloomFilterOriginal() {
     if (is_bit_array_owner_) {
       bit_array_.reset();
     } else {
@@ -147,15 +485,6 @@ class BloomFilter {
   }
 
   /**
-   * @brief Get the random seed that was used to initialize this bloom filter.
-   *
-   * @return Returns the random seed.
-   **/
-  inline std::uint64_t getRandomSeed() const {
-    return random_seed_;
-  }
-
-  /**
    * @brief Get the number of hash functions used in this bloom filter.
    *
    * @return Returns the number of hash functions.
@@ -198,7 +527,7 @@ class BloomFilter {
 
     // Determine all the bit positions that are required to be set.
     for (std::size_t i = 0; i < hash_fn_count_; ++i) {
-      compute_indices(hash_ap(key_begin, length, hash_fn_[i]), &bit_index, &bit);
+      compute_indices(hash_multiplicative(key_begin, length, hash_fn_[i]), &bit_index, &bit);
       modified_bit_positions.push_back(std::make_pair(bit_index, bit));
     }
 
@@ -243,7 +572,7 @@ class BloomFilter {
     std::size_t bit = 0;
 
     for (std::size_t i = 0; i < hash_fn_count_; ++i) {
-      compute_indices(hash_ap(key_begin, length, hash_fn_[i]), &bit_index, &bit);
+      compute_indices(hash_multiplicative(key_begin, length, hash_fn_[i]), &bit_index, &bit);
       (bit_array_.get())[bit_index / kNumBitsPerByte] |= (1 << bit);
     }
 
@@ -265,7 +594,7 @@ class BloomFilter {
     std::size_t bit_index = 0;
     std::size_t bit = 0;
     for (std::size_t i = 0; i < hash_fn_count_; ++i) {
-      compute_indices(hash_ap(key_begin, length, hash_fn_[i]), &bit_index, &bit);
+      compute_indices(hash_multiplicative(key_begin, length, hash_fn_[i]), &bit_index, &bit);
       if (((bit_array_.get())[bit_index / kNumBitsPerByte] & (1 << bit)) != (1 << bit)) {
         return false;
       }
@@ -279,7 +608,7 @@ class BloomFilter {
    *
    * @param bloom_filter A const pointer to the bloom filter object to do bitwise-OR with.
    */
-  inline void bitwiseOr(const BloomFilter *bloom_filter) {
+  inline void bitwiseOr(const BloomFilterOriginal *bloom_filter) {
     SpinSharedMutexExclusiveLock<false> exclusive_writer_lock(bloom_filter_insert_mutex_);
     for (std::size_t byte_index = 0; byte_index < bloom_filter->getBitArraySize(); ++byte_index) {
       (bit_array_.get())[byte_index] |= bloom_filter->getBitArray()[byte_index];
@@ -301,95 +630,28 @@ class BloomFilter {
     *bit = *bit_index % kNumBitsPerByte;
   }
 
-  void generate_unique_hash_fn() {
-    hash_fn_.reserve(hash_fn_count_);
-    const std::uint32_t predef_hash_fn_count = 128;
-    static const std::uint32_t predef_hash_fn[predef_hash_fn_count] = {
-       0xAAAAAAAA, 0x55555555, 0x33333333, 0xCCCCCCCC,
-       0x66666666, 0x99999999, 0xB5B5B5B5, 0x4B4B4B4B,
-       0xAA55AA55, 0x55335533, 0x33CC33CC, 0xCC66CC66,
-       0x66996699, 0x99B599B5, 0xB54BB54B, 0x4BAA4BAA,
-       0xAA33AA33, 0x55CC55CC, 0x33663366, 0xCC99CC99,
-       0x66B566B5, 0x994B994B, 0xB5AAB5AA, 0xAAAAAA33,
-       0x555555CC, 0x33333366, 0xCCCCCC99, 0x666666B5,
-       0x9999994B, 0xB5B5B5AA, 0xFFFFFFFF, 0xFFFF0000,
-       0xB823D5EB, 0xC1191CDF, 0xF623AEB3, 0xDB58499F,
-       0xC8D42E70, 0xB173F616, 0xA91A5967, 0xDA427D63,
-       0xB1E8A2EA, 0xF6C0D155, 0x4909FEA3, 0xA68CC6A7,
-       0xC395E782, 0xA26057EB, 0x0CD5DA28, 0x467C5492,
-       0xF15E6982, 0x61C6FAD3, 0x9615E352, 0x6E9E355A,
-       0x689B563E, 0x0C9831A8, 0x6753C18B, 0xA622689B,
-       0x8CA63C47, 0x42CC2884, 0x8E89919B, 0x6EDBD7D3,
-       0x15B6796C, 0x1D6FDFE4, 0x63FF9092, 0xE7401432,
-       0xEFFE9412, 0xAEAEDF79, 0x9F245A31, 0x83C136FC,
-       0xC3DA4A8C, 0xA5112C8C, 0x5271F491, 0x9A948DAB,
-       0xCEE59A8D, 0xB5F525AB, 0x59D13217, 0x24E7C331,
-       0x697C2103, 0x84B0A460, 0x86156DA9, 0xAEF2AC68,
-       0x23243DA5, 0x3F649643, 0x5FA495A8, 0x67710DF8,
-       0x9A6C499E, 0xDCFB0227, 0x46A43433, 0x1832B07A,
-       0xC46AFF3C, 0xB9C8FFF0, 0xC9500467, 0x34431BDF,
-       0xB652432B, 0xE367F12B, 0x427F4C1B, 0x224C006E,
-       0x2E7E5A89, 0x96F99AA5, 0x0BEB452A, 0x2FD87C39,
-       0x74B2E1FB, 0x222EFD24, 0xF357F60C, 0x440FCB1E,
-       0x8BBE030F, 0x6704DC29, 0x1144D12F, 0x948B1355,
-       0x6D8FD7E9, 0x1C11A014, 0xADD1592F, 0xFB3C712E,
-       0xFC77642F, 0xF9C4CE8C, 0x31312FB9, 0x08B0DD79,
-       0x318FA6E7, 0xC040D23D, 0xC0589AA7, 0x0CA5C075,
-       0xF874B172, 0x0CF914D5, 0x784D3280, 0x4E8CFEBC,
-       0xC569F575, 0xCDB2A091, 0x2CC016B4, 0x5C5F4421
-    };
-    if (hash_fn_count_ <= predef_hash_fn_count) {
-      std::copy(predef_hash_fn, predef_hash_fn + hash_fn_count_, hash_fn_.begin());
-      for (std::uint32_t i = 0; i < hash_fn_.size(); ++i) {
-        hash_fn_[i] = hash_fn_[i] * hash_fn_[(i + 3) % hash_fn_count_] + static_cast<std::uint32_t>(random_seed_);
+  inline std::uint32_t hash_multiplicative(
+      const std::uint8_t *begin,
+      std::size_t remaining_length,
+      const std::uint64_t multiplier) const {
+    std::uint32_t hash = 0;
+    std::size_t bytes_hashed = 0;
+    if (remaining_length >= 4) {
+      while (bytes_hashed < remaining_length) {
+        auto val = *reinterpret_cast<const std::uint32_t *>(begin + bytes_hashed);
+        hash += (multiplier * val) >> 32;
+        bytes_hashed += 4;
       }
-    } else {
-      LOG(FATAL) << "Requested number of hash functions is too large.";
     }
-  }
-
-  inline std::uint32_t hash_ap(const std::uint8_t *begin, std::size_t remaining_length, std::uint32_t hash) const {
-    const std::uint8_t *itr = begin;
-    std::uint32_t loop = 0;
-    while (remaining_length >= 8) {
-      const std::uint32_t &i1 = *(reinterpret_cast<const std::uint32_t*>(itr)); itr += sizeof(std::uint32_t);
-      const std::uint32_t &i2 = *(reinterpret_cast<const std::uint32_t*>(itr)); itr += sizeof(std::uint32_t);
-      hash ^= (hash <<  7) ^  i1 * (hash >> 3) ^ (~((hash << 11) + (i2 ^ (hash >> 5))));
-      remaining_length -= 8;
-    }
-    if (remaining_length) {
-      if (remaining_length >= 4) {
-        const std::uint32_t &i = *(reinterpret_cast<const std::uint32_t*>(itr));
-        if (loop & 0x01) {
-          hash ^= (hash <<  7) ^  i * (hash >> 3);
-        } else {
-          hash ^= (~((hash << 11) + (i ^ (hash >> 5))));
-        }
-        ++loop;
-        remaining_length -= 4;
-        itr += sizeof(std::uint32_t);
-      }
-      if (remaining_length >= 2) {
-        const std::uint16_t &i = *(reinterpret_cast<const std::uint16_t*>(itr));
-        if (loop & 0x01) {
-          hash ^= (hash <<  7) ^  i * (hash >> 3);
-        } else {
-          hash ^= (~((hash << 11) + (i ^ (hash >> 5))));
-        }
-        ++loop;
-        remaining_length -= 2;
-        itr += sizeof(std::uint16_t);
-      }
-      if (remaining_length) {
-        hash += ((*itr) ^ (hash * 0xA5A5A5A5)) + loop;
-      }
+    while (bytes_hashed < remaining_length) {
+      std::uint8_t val = *(begin + bytes_hashed);
+      hash += (multiplier * val) >> 32;
+      bytes_hashed++;
     }
     return hash;
   }
 
  private:
-  const std::uint64_t random_seed_;
-  std::vector<std::uint32_t> hash_fn_;
   const std::uint32_t hash_fn_count_;
   std::uint64_t array_size_in_bytes_;
   std::uint64_t array_size_;
@@ -397,9 +659,21 @@ class BloomFilter {
   std::uint32_t inserted_element_count_;
   const bool is_bit_array_owner_;
 
+  static constexpr std::uint64_t kKnuthGoldenRatioNumber = 2654435761;
+  static constexpr std::size_t kMaxNumHashFns = 8;
+  const std::uint64_t hash_fn_[kMaxNumHashFns] = { // hash_fn_[i] is 2**(i+1) - 1
+    0x00000001 * kKnuthGoldenRatioNumber, // 0x00000003, 0x00000007, 0x0000000f,
+    0x0000001f * kKnuthGoldenRatioNumber, // 0x0000003f, 0x0000007f, 0x000000ff,
+    0x000001ff * kKnuthGoldenRatioNumber, // 0x000003ff, 0x000007ff, 0x00000fff,
+    0x00001fff * kKnuthGoldenRatioNumber, // 0x00003fff, 0x00007fff, 0x0000ffff,
+    0x0001ffff * kKnuthGoldenRatioNumber, // 0x0003ffff, 0x0007ffff, 0x000fffff,
+    0x001fffff * kKnuthGoldenRatioNumber, // 0x003fffff, 0x007fffff, 0x00ffffff,
+    0x01ffffff * kKnuthGoldenRatioNumber, // 0x03ffffff, 0x07ffffff, 0x0fffffff,
+    0x1fffffff * kKnuthGoldenRatioNumber  // 0x3fffffff, 0x7fffffff, 0xffffffff
+    };
   alignas(kCacheLineBytes) mutable SpinSharedMutex<false> bloom_filter_insert_mutex_;
 
-  DISALLOW_COPY_AND_ASSIGN(BloomFilter);
+  DISALLOW_COPY_AND_ASSIGN(BloomFilterOriginal);
 };
 
 /** @} */

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/319d557b/utility/BloomFilter.proto
----------------------------------------------------------------------
diff --git a/utility/BloomFilter.proto b/utility/BloomFilter.proto
index 8dd9163..b5d14a9 100644
--- a/utility/BloomFilter.proto
+++ b/utility/BloomFilter.proto
@@ -21,10 +21,8 @@ message BloomFilter {
   // The default values were determined from empirical experiments.
   // These values control the amount of false positivity that
   // is expected from Bloom Filter.
-  // - Default seed for initializing family of hashes = 0xA5A5A5A55A5A5A5A.
   // - Default bloom filter size = 10 KB.
   // - Default number of hash functions used in bloom filter = 5.
-  optional fixed64 bloom_filter_seed = 1 [default = 0xA5A5A5A55A5A5A5A];
-  optional uint32 bloom_filter_size = 2 [default = 10000];
-  optional uint32 number_of_hashes = 3 [default = 5];
+  optional uint32 bloom_filter_size = 1 [default = 10000];
+  optional uint32 number_of_hashes = 2 [default = 5];
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/319d557b/utility/BloomFilterAdapter.hpp
----------------------------------------------------------------------
diff --git a/utility/BloomFilterAdapter.hpp b/utility/BloomFilterAdapter.hpp
index 43db6c3..fbd6957 100644
--- a/utility/BloomFilterAdapter.hpp
+++ b/utility/BloomFilterAdapter.hpp
@@ -40,84 +40,100 @@ namespace quickstep {
 class BloomFilterAdapter {
  public:
   BloomFilterAdapter(const std::vector<const BloomFilter*> &bloom_filters,
-                     const std::vector<std::vector<attribute_id>> &attribute_ids)
-      : num_bloom_filters_(bloom_filters.size()) {
+                     const std::vector<std::vector<attribute_id>> attribute_ids,
+                     const std::vector<std::vector<std::size_t>> attr_sizes) {
     DCHECK_EQ(bloom_filters.size(), attribute_ids.size());
+    DCHECK_EQ(bloom_filters.size(), attr_sizes.size());
 
-    bloom_filter_entries_.reserve(num_bloom_filters_);
-    bloom_filter_entry_indices_.reserve(num_bloom_filters_);
-
-    for (std::size_t i = 0; i < num_bloom_filters_; ++i) {
-      bloom_filter_entries_.emplace_back(bloom_filters[i], attribute_ids[i]);
-      bloom_filter_entry_indices_.emplace_back(i);
+    bloom_filter_entries_.reserve(bloom_filters.size());
+    for (std::size_t i = 0; i < bloom_filters.size(); ++i) {
+      bloom_filter_entries_.emplace_back(
+          new BloomFilterEntry(
+              bloom_filters[i], attribute_ids[i], attr_sizes[i]));
     }
   }
 
-  template <typename ValueAccessorT>
-  inline bool miss(const ValueAccessorT *accessor, int *probe_cnt) {
-    return missImpl<ValueAccessorT, true>(accessor, probe_cnt);
+  ~BloomFilterAdapter() {
+    for (auto &entry : bloom_filter_entries_) {
+      delete entry;
+    }
   }
 
-  template <typename ValueAccessorT, bool adapt_filters>
-  inline bool missImpl(const ValueAccessorT *accessor, int *probe_cnt) {
-    for (std::size_t i = 0; i < num_bloom_filters_; ++i) {
-      const std::size_t entry_idx = bloom_filter_entry_indices_[i];
-      BloomFilterEntry &entry = bloom_filter_entries_[entry_idx];
-      if (adapt_filters) {
-        ++entry.cnt;
-      }
-
-      const BloomFilter *bloom_filter = entry.bloom_filter;
-      for (const attribute_id &attr_id : entry.attribute_ids) {
-        ++(*probe_cnt);
-        const std::pair<const void*, std::size_t> value_and_byte_length =
-            accessor->getUntypedValueAndByteLength(attr_id);
-        if (!bloom_filter->contains(static_cast<const std::uint8_t*>(value_and_byte_length.first),
-                                    value_and_byte_length.second)) {
-          if (adapt_filters) {
-            // Record miss
-            ++entry.miss;
-
-            // Update entry order
-            if (i > 0) {
-              const std::size_t prev_entry_idx = bloom_filter_entry_indices_[i-1];
-              if (entry.isBetterThan(bloom_filter_entries_[prev_entry_idx])) {
-                bloom_filter_entry_indices_[i-1] = entry_idx;
-                bloom_filter_entry_indices_[i] = prev_entry_idx;
-              }
-            }
-          }
-          return true;
-        }
-      }
+  template <bool adapt_filters, typename ValueAccessorT>
+  inline std::size_t bulkProbe(const ValueAccessorT *accessor,
+                               std::vector<tuple_id> &batch) {
+    std::size_t out_size = batch.size();
+    for (auto &entry : bloom_filter_entries_) {
+      out_size = bulkProbeBloomFilterEntry(*entry, accessor, batch, out_size);
     }
-    return false;
+    adaptEntryOrder();
+    return out_size;
   }
 
  private:
   struct BloomFilterEntry {
     BloomFilterEntry(const BloomFilter *in_bloom_filter,
-                     const std::vector<attribute_id> &in_attribute_ids)
+                     const std::vector<attribute_id> &in_attribute_ids,
+                     const std::vector<std::size_t> &in_attribute_sizes)
         : bloom_filter(in_bloom_filter),
           attribute_ids(in_attribute_ids),
+          attribute_sizes(in_attribute_sizes),
           miss(0),
           cnt(0) {
     }
 
-    inline bool isBetterThan(const BloomFilterEntry& other) {
-      return static_cast<std::uint64_t>(miss) * other.cnt
-                 > static_cast<std::uint64_t>(cnt + 5) * (other.miss + 5);
+    static bool isBetterThan(const BloomFilterEntry *a,
+                             const BloomFilterEntry *b) {
+      return a->miss_rate > b->miss_rate;
     }
 
     const BloomFilter *bloom_filter;
-    const std::vector<attribute_id> &attribute_ids;
+    const std::vector<attribute_id> attribute_ids;
+    const std::vector<std::size_t> attribute_sizes;
     std::uint32_t miss;
     std::uint32_t cnt;
+    float miss_rate;
   };
 
-  const std::size_t num_bloom_filters_;
-  std::vector<BloomFilterEntry> bloom_filter_entries_;
-  std::vector<std::size_t> bloom_filter_entry_indices_;
+  template <bool adapt_filters, typename ValueAccessorT>
+  inline std::size_t bulkProbeBloomFilterEntry(
+      BloomFilterEntry &entry,
+      const ValueAccessorT *accessor,
+      std::vector<tuple_id> &batch,
+      const std::size_t in_size) {
+    std::size_t out_size = 0;
+    const BloomFilter *bloom_filter = entry.bloom_filter;
+
+    for (std::size_t t = 0; t < in_size; ++t) {
+      tuple_id tid = batch[t];
+      for (std::size_t a = 0; a < entry.attribute_ids.size(); ++a) {
+        const attribute_id &attr_id = entry.attribute_ids[a];
+        const std::size_t size = entry.attribute_sizes[a];
+        const auto value = static_cast<const std::uint8_t*>(
+            accessor->getUntypedValueAtAbsolutePosition(attr_id, tid));
+        if (bloom_filter->contains(value, size)) {
+          batch[out_size] = tid;
+          ++out_size;
+        }
+      }
+    }
+    if (adapt_filters) {
+      entry.cnt += in_size;
+      entry.miss += (in_size - out_size);
+    }
+    return out_size;
+  }
+
+  inline void adaptEntryOrder() {
+    for (auto &entry : bloom_filter_entries_) {
+      entry->miss_rate = static_cast<float>(entry->miss) / entry->cnt;
+    }
+    std::sort(bloom_filter_entries_.begin(),
+              bloom_filter_entries_.end(),
+              BloomFilterEntry::isBetterThan);
+  }
+
+  std::vector<BloomFilterEntry *> bloom_filter_entries_;
 
   DISALLOW_COPY_AND_ASSIGN(BloomFilterAdapter);
 };