You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by na...@apache.org on 2016/07/13 04:49:35 UTC

[1/5] incubator-quickstep git commit: Add BloomFilterBlocked and test

Repository: incubator-quickstep
Updated Branches:
  refs/heads/expt_bloom_filter_hash_fn e7c279205 -> 0161a4dee


Add BloomFilterBlocked and test


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/dd7d7b0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/dd7d7b0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/dd7d7b0e

Branch: refs/heads/expt_bloom_filter_hash_fn
Commit: dd7d7b0e119146a3ff236862a6065d5fa346bce3
Parents: e7c2792
Author: Navneet Potti <na...@gmail.com>
Authored: Tue Jul 12 12:40:40 2016 -0500
Committer: Navneet Potti <na...@gmail.com>
Committed: Tue Jul 12 12:40:40 2016 -0500

----------------------------------------------------------------------
 utility/BloomFilter.hpp                | 341 ++++++++++++++++++++++++++--
 utility/tests/BloomFilter_unittest.cpp | 139 +++++++-----
 2 files changed, 408 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dd7d7b0e/utility/BloomFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/BloomFilter.hpp b/utility/BloomFilter.hpp
index ec6b2b7..2acd2a9 100644
--- a/utility/BloomFilter.hpp
+++ b/utility/BloomFilter.hpp
@@ -44,11 +44,310 @@ namespace quickstep {
  *  @{
  */
 
+class BloomFilterOriginal;
+class BloomFilterBlocked;
+typedef BloomFilterBlocked BloomFilter;
+
+#define PREV_CACHE_LINE_SIZE_MULTIPLE(n)  ((n)/kCacheLineBytes) * kCacheLineBytes
+
+/**
+ * @brief A "blocked" version of Bloom Filter based on this paper:
+ *        Putze, Felix, Peter Sanders, and Johannes Singler.
+ *        "Cache-, hash-and space-efficient bloom filters."
+ *        International Workshop on Experimental and Efficient Algorithms.
+ *        Springer Berlin Heidelberg, 2007.
+ **/
+class BloomFilterBlocked {
+ public:
+  static const std::uint8_t kNumBitsPerByte = 8;
+  static const std::uint8_t kMaxNumHashFns = 8;
+
+  // This union allows us to read/write position in convenient fashion,
+  // through nested structs and their bitfield members
+  //
+  // A position can simply be a 32-bit hash
+  // Or it can be a cache line (block of 512 bits) and position within it
+  // Or it can be a byte (block of 8 bits) and position within it
+  union Position {
+    std::uint32_t hash;
+    struct CacheLinePosition {
+      unsigned index_in_line : 9;
+      unsigned line_num : 23;
+    } cache_line_pos;
+    struct BytePosition {
+      unsigned index_in_byte : 3;
+      unsigned byte_num : 29;
+    } byte_pos;
+  };
+
+  /**
+   * @brief Constructor.
+   * @note When no bit_array is being passed to the constructor,
+   *       then the bit_array is owned and managed by this class.
+   *
+   * @param hash_fn_count The number of hash functions used by this bloom filter.
+   * @param bit_array_size_in_bytes Size of the bit array.
+   **/
+  BloomFilterBlocked(const std::uint8_t hash_fn_count,
+              const std::uint64_t bit_array_size_in_bytes)
+      : hash_fn_count_(hash_fn_count),
+        array_size_in_bytes_(PREV_CACHE_LINE_SIZE_MULTIPLE(bit_array_size_in_bytes)),
+        is_bit_array_owner_(true),
+        bit_array_(new std::uint8_t[array_size_in_bytes_]) {
+    reset();
+  }
+
+  /**
+   * @brief Constructor.
+   * @note When a bit_array is passed as an argument to the constructor,
+   *       then the ownership of the bit array lies with the caller.
+   *
+   * @param hash_fn_count The number of hash functions used by this bloom filter.
+   * @param bit_array_size_in_bytes Size of the bit array.
+   * @param bit_array A pointer to the memory region that is used to store bit array.
+   * @param is_initialized Whether the region is already initialized. If false,
+   *                       the region is zeroed out before use.
+   **/
+  BloomFilterBlocked(const std::uint8_t hash_fn_count,
+              const std::uint64_t bit_array_size_in_bytes,
+              std::uint8_t *bit_array,
+              const bool is_initialized)
+      : hash_fn_count_(hash_fn_count),
+        array_size_in_bytes_(PREV_CACHE_LINE_SIZE_MULTIPLE(bit_array_size_in_bytes)),
+        is_bit_array_owner_(false),
+        bit_array_(bit_array) {  // Owned by the calling method.
+    if (!is_initialized) {
+      reset();
+    }
+  }
+
+  /**
+   * @brief Constructor.
+   * @note When a bloom filter proto is passed as an initializer,
+   *       then the bit_array is owned and managed by this class.
+   *
+   * @param bloom_filter_proto The protobuf representation of a
+   *        bloom filter configuration.
+   **/
+  explicit BloomFilterBlocked(const serialization::BloomFilter &bloom_filter_proto)
+      : hash_fn_count_(bloom_filter_proto.number_of_hashes()),
+        array_size_in_bytes_(bloom_filter_proto.bloom_filter_size()),
+        is_bit_array_owner_(true),
+        bit_array_(new std::uint8_t[array_size_in_bytes_]) {
+    reset();
+  }
+
+  /**
+   * @brief Destructor.
+   **/
+  ~BloomFilterBlocked() {
+    if (is_bit_array_owner_) {
+      bit_array_.reset();
+    } else {
+      bit_array_.release();
+    }
+  }
+
+  static bool ProtoIsValid(const serialization::BloomFilter &bloom_filter_proto) {
+    return bloom_filter_proto.IsInitialized();
+  }
+
+  /**
+   * @brief Zeros out the contents of the bit array.
+   **/
+  inline void reset() {
+    // Initialize the bit_array with all zeros.
+    std::fill_n(bit_array_.get(), array_size_in_bytes_, 0x00);
+    inserted_element_count_ = 0;
+  }
+
+  /**
+   * @brief Get the number of hash functions used in this bloom filter.
+   *
+   * @return Returns the number of hash functions.
+   **/
+  inline std::uint8_t getNumberOfHashes() const {
+    return hash_fn_count_;
+  }
+
+  /**
+   * @brief Get the size of the bit array in bytes for this bloom filter.
+   *
+   * @return Returns the bit array size (in bytes).
+   **/
+  inline std::uint64_t getBitArraySize() const {
+    return array_size_in_bytes_;
+  }
+
+  /**
+   * @brief Get the constant pointer to the bit array for this bloom filter
+   *
+   * @return Returns constant pointer to the bit array.
+   **/
+  inline const std::uint8_t* getBitArray() const {
+    return bit_array_.get();
+  }
+
+  template <typename T>
+  void insert(const T value) {
+    insert(reinterpret_cast<const std::uint8_t *>(&value), sizeof(T));
+  }
+
+  /**
+   * @brief Inserts a given value into the bloom filter in a thread-safe manner.
+   *
+   * @param key_begin A pointer to the value being inserted.
+   * @param length Size of the value being inserted in bytes.
+   */
+  inline void insert(const std::uint8_t *key_begin, const std::size_t length) {
+    Position positions[kMaxNumHashFns];
+    getPositions(key_begin, length, positions);
+    {
+      SpinSharedMutexExclusiveLock<false> exclusive_writer_lock(bloom_filter_insert_mutex_);
+      for (std::uint8_t i = 0; i < hash_fn_count_; ++i)
+        setBitAtPosition(positions[i]);
+    }
+    ++inserted_element_count_;
+  }
+
+  template <typename T>
+  void insertUnSafe(const T value) {
+    insertUnSafe(reinterpret_cast<const std::uint8_t *>(&value), sizeof(T));
+  }
+
+  /**
+   * @brief Inserts a given value into the bloom filter.
+   * @warning This is a faster thread-unsafe version of the insert() function.
+   *          The caller needs to ensure the thread safety.
+   *
+   * @param key_begin A pointer to the value being inserted.
+   * @param length Size of the value being inserted in bytes.
+   */
+  inline void insertUnSafe(const std::uint8_t *key_begin, const std::size_t length) {
+    Position positions[kMaxNumHashFns];
+    getPositions(key_begin, length, positions);
+    for (std::uint8_t i = 0; i < hash_fn_count_; ++i)
+      setBitAtPosition(positions[i]);
+    ++inserted_element_count_;
+  }
+
+  template <typename T>
+  bool contains(const T value) {
+    return contains(reinterpret_cast<const std::uint8_t *>(&value), sizeof(T));
+  }
+
+  /**
+   * @brief Test membership of a given value in the bloom filter.
+   *        If true is returned, then a value may or may not be present in the bloom filter.
+   *        If false is returned, a value is certainly not present in the bloom filter.
+   *
+   * @note The membership test does not require any locks, because the assumption is that
+   *       the bloom filter will only be used after it has been built.
+   *
+   * @param key_begin A pointer to the value being tested for membership.
+   * @param length Size of the value being tested in bytes.
+   */
+  inline bool contains(const std::uint8_t *key_begin, const std::size_t length) const {
+    Position positions[kMaxNumHashFns];
+    getPositions(key_begin, length, positions);
+    for (std::uint8_t i = 0; i < hash_fn_count_; ++i)
+      if (!getBitAtPosition(positions[i]))
+        return false;
+    return true;
+  }
+
+  /**
+   * @brief Perform a bitwise-OR of the given Bloom filter with this bloom filter.
+   *        Essentially, it does a union of this bloom filter with the passed bloom filter.
+   *
+   * @param bloom_filter A const pointer to the bloom filter object to do bitwise-OR with.
+   */
+  inline void bitwiseOr(const BloomFilterBlocked *bloom_filter) {
+    SpinSharedMutexExclusiveLock<false> exclusive_writer_lock(bloom_filter_insert_mutex_);
+    for (std::size_t byte_index = 0; byte_index < bloom_filter->getBitArraySize(); ++byte_index) {
+      (bit_array_.get())[byte_index] |= bloom_filter->getBitArray()[byte_index];
+    }
+  }
+
+  /**
+   * @brief Return the number of elements currently inserted into bloom filter.
+   *
+   * @return The number of elements inserted into bloom filter.
+   **/
+  inline std::uint32_t element_count() const {
+    return inserted_element_count_;
+  }
+
+ protected:
+  void getPositions(
+      const std::uint8_t *begin,
+      std::size_t length,
+      Position positions[]) const {
+    positions[0].hash = hash_multiplicative(begin, length, hash_fn_[0]);
+    for (std::size_t i = 1; i < hash_fn_count_; ++i) {
+      positions[i].hash = hash_multiplicative(begin, length, hash_fn_[i]);
+      positions[i].cache_line_pos.line_num = positions[0].cache_line_pos.line_num;
+    }
+  }
+
+  void setBitAtPosition(const Position &pos) {
+    (bit_array_.get())[pos.byte_pos.byte_num] |= (1 << pos.byte_pos.index_in_byte);
+  }
+
+  bool getBitAtPosition(const Position &pos) const {
+    return (bit_array_.get())[pos.byte_pos.byte_num] & (1 << pos.byte_pos.index_in_byte);
+  }
+
+  inline std::uint32_t hash_multiplicative(
+      const std::uint8_t *begin,
+      std::size_t length,
+      const std::uint64_t multiplier) const {
+    std::uint32_t hash = 0;
+    std::size_t bytes_hashed = 0;
+    if (length >= 4) {
+      while (bytes_hashed < length) {
+        auto val = *reinterpret_cast<const std::uint32_t *>(begin + bytes_hashed);
+        hash += (multiplier * val) >> 24;
+        bytes_hashed += 4;
+      }
+    }
+    while (bytes_hashed < length) {
+      std::uint8_t val = *(begin + bytes_hashed);
+      hash += (multiplier * val) >> 32;
+      bytes_hashed++;
+    }
+    return hash % (array_size_in_bytes_ * kNumBitsPerByte);
+  }
+
+ private:
+  const std::uint32_t hash_fn_count_;
+  const std::uint64_t array_size_in_bytes_;
+  std::uint32_t inserted_element_count_;
+  const bool is_bit_array_owner_;
+
+  static constexpr std::uint64_t kKnuthGoldenRatioNumber = 2654435761;
+  const std::uint64_t hash_fn_[kMaxNumHashFns] = { // hash_fn_[i] is (2**(4*i+1) - 1) * kKnuthGoldenRatioNumber
+    0x00000001 * kKnuthGoldenRatioNumber, // 0x00000003, 0x00000007, 0x0000000f,
+    0x0000001f * kKnuthGoldenRatioNumber, // 0x0000003f, 0x0000007f, 0x000000ff,
+    0x000001ff * kKnuthGoldenRatioNumber, // 0x000003ff, 0x000007ff, 0x00000fff,
+    0x00001fff * kKnuthGoldenRatioNumber, // 0x00003fff, 0x00007fff, 0x0000ffff,
+    0x0001ffff * kKnuthGoldenRatioNumber, // 0x0003ffff, 0x0007ffff, 0x000fffff,
+    0x001fffff * kKnuthGoldenRatioNumber, // 0x003fffff, 0x007fffff, 0x00ffffff,
+    0x01ffffff * kKnuthGoldenRatioNumber, // 0x03ffffff, 0x07ffffff, 0x0fffffff,
+    0x1fffffff * kKnuthGoldenRatioNumber  // 0x3fffffff, 0x7fffffff, 0xffffffff
+    };
+
+  alignas(kCacheLineBytes) std::unique_ptr<std::uint8_t> bit_array_;
+  alignas(kCacheLineBytes) mutable SpinSharedMutex<false> bloom_filter_insert_mutex_;
+
+  DISALLOW_COPY_AND_ASSIGN(BloomFilterBlocked);
+};
+
 /**
  * @brief A simple Bloom Filter implementation with basic primitives
  *        based on Partow's Bloom Filter implementation.
  **/
-class BloomFilter {
+class BloomFilterOriginal {
  public:
   static const uint32_t kNumBitsPerByte = 8;
 
@@ -60,7 +359,7 @@ class BloomFilter {
    * @param hash_fn_count The number of hash functions used by this bloom filter.
    * @param bit_array_size_in_bytes Size of the bit array.
    **/
-  BloomFilter(const std::size_t hash_fn_count,
+  BloomFilterOriginal(const std::size_t hash_fn_count,
               const std::uint64_t bit_array_size_in_bytes)
       : hash_fn_count_(hash_fn_count),
         array_size_in_bytes_(bit_array_size_in_bytes),
@@ -81,7 +380,7 @@ class BloomFilter {
    * @param is_initialized A boolean that indicates whether to zero-out the region
    *                       before use or not.
    **/
-  BloomFilter(const std::size_t hash_fn_count,
+  BloomFilterOriginal(const std::size_t hash_fn_count,
               const std::uint64_t bit_array_size_in_bytes,
               std::uint8_t *bit_array,
               const bool is_initialized)
@@ -103,7 +402,7 @@ class BloomFilter {
    * @param bloom_filter_proto The protobuf representation of a
    *        bloom filter configuration.
    **/
-  explicit BloomFilter(const serialization::BloomFilter &bloom_filter_proto)
+  explicit BloomFilterOriginal(const serialization::BloomFilter &bloom_filter_proto)
       : hash_fn_count_(bloom_filter_proto.number_of_hashes()),
         array_size_in_bytes_(bloom_filter_proto.bloom_filter_size()),
         array_size_(array_size_in_bytes_ * kNumBitsPerByte),
@@ -115,7 +414,7 @@ class BloomFilter {
   /**
    * @brief Destructor.
    **/
-  ~BloomFilter() {
+  ~BloomFilterOriginal() {
     if (is_bit_array_owner_) {
       bit_array_.reset();
     } else {
@@ -260,7 +559,7 @@ class BloomFilter {
    *
    * @param bloom_filter A const pointer to the bloom filter object to do bitwise-OR with.
    */
-  inline void bitwiseOr(const BloomFilter *bloom_filter) {
+  inline void bitwiseOr(const BloomFilterOriginal *bloom_filter) {
     SpinSharedMutexExclusiveLock<false> exclusive_writer_lock(bloom_filter_insert_mutex_);
     for (std::size_t byte_index = 0; byte_index < bloom_filter->getBitArraySize(); ++byte_index) {
       (bit_array_.get())[byte_index] |= bloom_filter->getBitArray()[byte_index];
@@ -285,19 +584,19 @@ class BloomFilter {
   inline std::uint32_t hash_multiplicative(
       const std::uint8_t *begin,
       std::size_t remaining_length,
-      const std::size_t multiplier) const {
+      const std::uint64_t multiplier) const {
     std::uint32_t hash = 0;
     std::size_t bytes_hashed = 0;
     if (remaining_length >= 4) {
       while (bytes_hashed < remaining_length) {
         auto val = *reinterpret_cast<const std::uint32_t *>(begin + bytes_hashed);
-        hash = multiplier * hash + val;
+        hash += (multiplier * val) >> 32;
         bytes_hashed += 4;
       }
     }
     while (bytes_hashed < remaining_length) {
       std::uint8_t val = *(begin + bytes_hashed);
-      hash = multiplier * hash + val;
+      hash += (multiplier * val) >> 32;
       bytes_hashed++;
     }
     return hash;
@@ -310,20 +609,22 @@ class BloomFilter {
   std::unique_ptr<std::uint8_t> bit_array_;
   std::uint32_t inserted_element_count_;
   const bool is_bit_array_owner_;
-  static constexpr std::size_t kMaxNumHashFns = 32;
-  const std::uint32_t hash_fn_[kMaxNumHashFns] = { // hash_fn_[i] is 2**(i+1) - 1
-      0x00000001, 0x00000003, 0x00000007, 0x0000000f,
-      0x0000001f, 0x0000003f, 0x0000007f, 0x000000ff,
-      0x000001ff, 0x000003ff, 0x000007ff, 0x00000fff,
-      0x00001fff, 0x00003fff, 0x00007fff, 0x0000ffff,
-      0x0001ffff, 0x0003ffff, 0x0007ffff, 0x000fffff,
-      0x001fffff, 0x003fffff, 0x007fffff, 0x00ffffff,
-      0x01ffffff, 0x03ffffff, 0x07ffffff, 0x0fffffff,
-      0x1fffffff, 0x3fffffff, 0x7fffffff, 0xffffffff
+
+  static constexpr std::uint64_t kKnuthGoldenRatioNumber = 2654435761;
+  static constexpr std::size_t kMaxNumHashFns = 8;
+  const std::uint64_t hash_fn_[kMaxNumHashFns] = { // hash_fn_[i] is (2**(4*i+1) - 1) * kKnuthGoldenRatioNumber
+    0x00000001 * kKnuthGoldenRatioNumber, // 0x00000003, 0x00000007, 0x0000000f,
+    0x0000001f * kKnuthGoldenRatioNumber, // 0x0000003f, 0x0000007f, 0x000000ff,
+    0x000001ff * kKnuthGoldenRatioNumber, // 0x000003ff, 0x000007ff, 0x00000fff,
+    0x00001fff * kKnuthGoldenRatioNumber, // 0x00003fff, 0x00007fff, 0x0000ffff,
+    0x0001ffff * kKnuthGoldenRatioNumber, // 0x0003ffff, 0x0007ffff, 0x000fffff,
+    0x001fffff * kKnuthGoldenRatioNumber, // 0x003fffff, 0x007fffff, 0x00ffffff,
+    0x01ffffff * kKnuthGoldenRatioNumber, // 0x03ffffff, 0x07ffffff, 0x0fffffff,
+    0x1fffffff * kKnuthGoldenRatioNumber  // 0x3fffffff, 0x7fffffff, 0xffffffff
     };
   alignas(kCacheLineBytes) mutable SpinSharedMutex<false> bloom_filter_insert_mutex_;
 
-  DISALLOW_COPY_AND_ASSIGN(BloomFilter);
+  DISALLOW_COPY_AND_ASSIGN(BloomFilterOriginal);
 };
 
 /** @} */

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dd7d7b0e/utility/tests/BloomFilter_unittest.cpp
----------------------------------------------------------------------
diff --git a/utility/tests/BloomFilter_unittest.cpp b/utility/tests/BloomFilter_unittest.cpp
index 03ac9f1..3bb2218 100644
--- a/utility/tests/BloomFilter_unittest.cpp
+++ b/utility/tests/BloomFilter_unittest.cpp
@@ -26,62 +26,97 @@
 
 namespace quickstep {
 
-class BloomFilterTest : public ::testing::Test {
+class BloomFilterBlockedTest : public ::testing::Test {
  public:
-  static const std::uint32_t kNumberOfHashFunctions = 20;
-  static const std::uint32_t kBloomFilterSize = 100000;  // in bytes.
+  const std::uint8_t kNumberOfHashFunctions = 3;
+  const std::uint32_t kNumElements = 1000;
+  const std::uint64_t kBloomFilterSize = kNumElements;  // 8 bits per element
 };
 
-TEST_F(BloomFilterTest, BloomFilterInsertTest) {
-  // This test inserts an element into a known bloom filter size
-  // with given number of hash functions,
-  // and tests whether the expected bits are set or not.
-  std::unique_ptr<std::uint8_t> bit_array;
-  std::unique_ptr<BloomFilter> bloom_filter;
-
-  const std::uint32_t num_hashes = 2;
-  const std::uint32_t bloom_filter_size = 1;
-
-  bit_array.reset(new std::uint8_t[bloom_filter_size]);
-  bloom_filter.reset(new BloomFilter(num_hashes,
-                                     bloom_filter_size,
-                                     bit_array.get(),
-                                     false));
-
-  const std::uint8_t data = 7u;
-  const std::uint8_t expected_bit_array = 136u;
-  bloom_filter->insert(&data, 1);
-  EXPECT_EQ(*bit_array, expected_bit_array);
+TEST_F(BloomFilterBlockedTest, BloomFilterBlockedContainsTest) {
+  // Set up data to be inserted into Bloom filter
+  std::vector<std::uint32_t> data;
+  data.reserve(kNumElements);
+  for (std::uint32_t i = 0; i < kNumElements; ++i)
+    data.push_back(i);
+
+  // Create a Bloom filter and insert elements into it
+  BloomFilterBlocked bloom_filter(kNumberOfHashFunctions, kBloomFilterSize);
+  for (auto &i : data)
+    bloom_filter.insert(i);
+
+  // Make sure that all elements were inserted correctly
+  EXPECT_EQ(bloom_filter.element_count(), kNumElements);
+  for (auto &i : data)
+    EXPECT_TRUE(bloom_filter.contains(i));
+
+  // Make sure that the false positive rate is not too high
+  // False positive rate should be < 10% for 8 bits per element
+  std::vector<std::uint32_t> false_positives;
+  false_positives.reserve(kNumElements);
+  for (std::uint32_t i = kNumElements; i < 2 * kNumElements; ++i)
+    if (bloom_filter.contains(i))
+      false_positives.push_back(i);
+  EXPECT_LT(false_positives.size(), kNumElements * 0.1);
 }
 
-TEST_F(BloomFilterTest, BloomFilterContainsTest) {
-  std::unique_ptr<std::uint8_t> bit_array;
-  std::unique_ptr<BloomFilter> bloom_filter;
-
-  bit_array.reset(new std::uint8_t[kBloomFilterSize]);
-  bloom_filter.reset(new BloomFilter(kNumberOfHashFunctions,
-                                     kBloomFilterSize,
-                                     bit_array.get(),
-                                     false));
-
-  // Insert a set of values in the bloom filter.
-  const std::uint32_t data[] = { 4, 60, 100, 9999 };
-  const std::vector<std::uint32_t> data_vector(data, data + sizeof(data) / sizeof(data[0]));
-  for (const std::uint32_t &value : data_vector) {
-    bloom_filter->insert(reinterpret_cast<const uint8_t*>(&value), sizeof(std::uint32_t));
-  }
-
-  // Test the values, which were inserted, are present in the bloom filter.
-  for (const std::uint32_t &value : data_vector) {
-    EXPECT_TRUE(bloom_filter->contains(reinterpret_cast<const uint8_t*>(&value), sizeof(std::uint32_t)));
-  }
-
-  // Test the values, which were not inserted in the bloom filter, are not present.
-  const std::uint32_t missing[] = { 5, 99, 999, 11111 };
-  const std::vector<std::uint32_t> missing_vector(missing, missing + sizeof(missing) / sizeof(missing[0]));
-  for (const std::uint32_t &value : missing_vector) {
-    EXPECT_FALSE(bloom_filter->contains(reinterpret_cast<const uint8_t*>(&value), sizeof(std::uint32_t)));
-  }
-}
+
+// class BloomFilterTest : public ::testing::Test {
+//  public:
+//   static const std::uint32_t kNumberOfHashFunctions = 20;
+//   static const std::uint32_t kBloomFilterSize = 100000;  // in bytes.
+// };
+
+// TEST_F(BloomFilterTest, BloomFilterInsertTest) {
+//   // This test inserts an element into a known bloom filter size
+//   // with given number of hash functions,
+//   // and tests whether the expected bits are set or not.
+//   std::unique_ptr<std::uint8_t> bit_array;
+//   std::unique_ptr<BloomFilter> bloom_filter;
+
+//   const std::uint32_t num_hashes = 2;
+//   const std::uint32_t bloom_filter_size = 1;
+
+//   bit_array.reset(new std::uint8_t[bloom_filter_size]);
+//   bloom_filter.reset(new BloomFilter(num_hashes,
+//                                      bloom_filter_size,
+//                                      bit_array.get(),
+//                                      false));
+
+//   const std::uint8_t data = 7u;
+//   const std::uint8_t expected_bit_array = 136u;
+//   bloom_filter->insert(&data, 1);
+//   EXPECT_EQ(*bit_array, expected_bit_array);
+// }
+
+// TEST_F(BloomFilterTest, BloomFilterContainsTest) {
+//   std::unique_ptr<std::uint8_t> bit_array;
+//   std::unique_ptr<BloomFilter> bloom_filter;
+
+//   bit_array.reset(new std::uint8_t[kBloomFilterSize]);
+//   bloom_filter.reset(new BloomFilter(kNumberOfHashFunctions,
+//                                      kBloomFilterSize,
+//                                      bit_array.get(),
+//                                      false));
+
+//   // Insert a set of values in the bloom filter.
+//   const std::uint32_t data[] = { 4, 60, 100, 9999 };
+//   const std::vector<std::uint32_t> data_vector(data, data + sizeof(data) / sizeof(data[0]));
+//   for (const std::uint32_t &value : data_vector) {
+//     bloom_filter->insert(reinterpret_cast<const uint8_t*>(&value), sizeof(std::uint32_t));
+//   }
+
+//   // Test the values, which were inserted, are present in the bloom filter.
+//   for (const std::uint32_t &value : data_vector) {
+//     EXPECT_TRUE(bloom_filter->contains(reinterpret_cast<const uint8_t*>(&value), sizeof(std::uint32_t)));
+//   }
+
+//   // Test the values, which were not inserted in the bloom filter, are not present.
+//   const std::uint32_t missing[] = { 5, 99, 999, 11111 };
+//   const std::vector<std::uint32_t> missing_vector(missing, missing + sizeof(missing) / sizeof(missing[0]));
+//   for (const std::uint32_t &value : missing_vector) {
+//     EXPECT_FALSE(bloom_filter->contains(reinterpret_cast<const uint8_t*>(&value), sizeof(std::uint32_t)));
+//   }
+// }
 
 }  // namespace quickstep


[2/5] incubator-quickstep git commit: Try to speed up Blocked Bloom Filter

Posted by na...@apache.org.
Try to speed up Blocked Bloom Filter


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/36d0f4c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/36d0f4c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/36d0f4c9

Branch: refs/heads/expt_bloom_filter_hash_fn
Commit: 36d0f4c9ec17cb5eaf1361f5a7975626ab0c9901
Parents: dd7d7b0
Author: Navneet Potti <na...@gmail.com>
Authored: Tue Jul 12 19:28:15 2016 -0500
Committer: Navneet Potti <na...@gmail.com>
Committed: Tue Jul 12 19:28:15 2016 -0500

----------------------------------------------------------------------
 query_optimizer/ExecutionHeuristics.cpp |   2 +-
 utility/BloomFilter.hpp                 | 106 +++++++++++++++++++--------
 2 files changed, 75 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/36d0f4c9/query_optimizer/ExecutionHeuristics.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionHeuristics.cpp b/query_optimizer/ExecutionHeuristics.cpp
index 38b7c50..6b30d83 100644
--- a/query_optimizer/ExecutionHeuristics.cpp
+++ b/query_optimizer/ExecutionHeuristics.cpp
@@ -116,7 +116,7 @@ void ExecutionHeuristics::setBloomFilterProperties(serialization::BloomFilter *b
                                                    const std::size_t hash_join_index) {
   auto cardinality = hash_joins_[hash_join_index].estimated_build_relation_cardinality_;
   bloom_filter_proto->set_bloom_filter_size(
-      (FLAGS_bloom_num_bits_per_tuple * cardinality)/kNumBitsPerByte);
+      BloomFilter::getNearestAllowedSize((FLAGS_bloom_num_bits_per_tuple * cardinality)/kNumBitsPerByte));
   bloom_filter_proto->set_number_of_hashes(FLAGS_bloom_num_hash_fns);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/36d0f4c9/utility/BloomFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/BloomFilter.hpp b/utility/BloomFilter.hpp
index 2acd2a9..49d00e7 100644
--- a/utility/BloomFilter.hpp
+++ b/utility/BloomFilter.hpp
@@ -26,6 +26,7 @@
 #include <algorithm>
 #include <cstddef>
 #include <cstdint>
+#include <cstring>
 #include <memory>
 #include <utility>
 #include <vector>
@@ -60,7 +61,7 @@ typedef BloomFilterBlocked BloomFilter;
 class BloomFilterBlocked {
  public:
   static const std::uint8_t kNumBitsPerByte = 8;
-  static const std::uint8_t kMaxNumHashFns = 8;
+  static const std::uint8_t kMaxNumHashFns = 4;
 
   // This union allows us to read/write position in convenient fashion,
   // through nested structs and their bitfield members
@@ -80,6 +81,11 @@ class BloomFilterBlocked {
     } byte_pos;
   };
 
+  static std::uint64_t getNearestAllowedSize(const std::uint64_t approx_size) {
+    return (approx_size / kCacheLineBytes) * kCacheLineBytes;
+  }
+
+
   /**
    * @brief Constructor.
    * @note When no bit_array is being passed to the constructor,
@@ -189,7 +195,7 @@ class BloomFilterBlocked {
   }
 
   template <typename T>
-  void insert(const T value) {
+  void insert(const T &value) {
     insert(reinterpret_cast<const std::uint8_t *>(&value), sizeof(T));
   }
 
@@ -200,18 +206,12 @@ class BloomFilterBlocked {
    * @param length Size of the value being inserted in bytes.
    */
   inline void insert(const std::uint8_t *key_begin, const std::size_t length) {
-    Position positions[kMaxNumHashFns];
-    getPositions(key_begin, length, positions);
-    {
       SpinSharedMutexExclusiveLock<false> exclusive_writer_lock(bloom_filter_insert_mutex_);
-      for (std::uint8_t i = 0; i < hash_fn_count_; ++i)
-        setBitAtPosition(positions[i]);
-    }
-    ++inserted_element_count_;
+      insertUnSafe(key_begin, length);
   }
 
   template <typename T>
-  void insertUnSafe(const T value) {
+  void insertUnSafe(const T &value) {
     insertUnSafe(reinterpret_cast<const std::uint8_t *>(&value), sizeof(T));
   }
 
@@ -224,15 +224,19 @@ class BloomFilterBlocked {
    * @param length Size of the value being inserted in bytes.
    */
   inline void insertUnSafe(const std::uint8_t *key_begin, const std::size_t length) {
-    Position positions[kMaxNumHashFns];
-    getPositions(key_begin, length, positions);
-    for (std::uint8_t i = 0; i < hash_fn_count_; ++i)
-      setBitAtPosition(positions[i]);
+    Position first_pos = getFirstPosition(key_begin, length);
+    if (!getBitAtPosition(first_pos))
+      setBitAtPosition(first_pos);
+    Position other_pos;
+    for (std::uint8_t i = 1; i <hash_fn_count_; ++i) {
+      other_pos = getOtherPosition(key_begin, length, first_pos, i);
+      setBitAtPosition(other_pos);
+    }
     ++inserted_element_count_;
   }
 
   template <typename T>
-  bool contains(const T value) {
+  bool contains(const T &value) {
     return contains(reinterpret_cast<const std::uint8_t *>(&value), sizeof(T));
   }
 
@@ -247,12 +251,20 @@ class BloomFilterBlocked {
    * @param key_begin A pointer to the value being tested for membership.
    * @param length Size of the value being inserted in bytes.
    */
-  inline bool contains(const std::uint8_t *key_begin, const std::size_t length) const {
-    Position positions[kMaxNumHashFns];
-    getPositions(key_begin, length, positions);
-    for (std::uint8_t i = 0; i < hash_fn_count_; ++i)
-      if (!getBitAtPosition(positions[i]))
+  inline bool contains(
+      const std::uint8_t *__restrict__ key_begin,
+      const std::size_t length) const {
+    Position first_pos = getFirstPosition(key_begin, length);
+    if (!getBitAtPosition(first_pos)) {
+      return false;
+    }
+    Position other_pos;
+    for (std::uint8_t i = 1; i < hash_fn_count_; ++i) {
+      other_pos = getOtherPosition(key_begin, length, first_pos, i);
+      if (!getBitAtPosition(other_pos)) {
         return false;
+      }
+    }
     return true;
   }
 
@@ -279,14 +291,33 @@ class BloomFilterBlocked {
   }
 
  protected:
-  void getPositions(
+  Position getFirstPosition(const std::uint8_t *begin, std::size_t length) const {
+    Position pos;
+    pos.hash = hash_identity(begin, length);
+    return pos;
+  }
+
+  Position getOtherPosition(
+      const std::uint8_t *begin,
+      std::size_t length,
+      const Position first_pos,
+      const std::uint8_t index) const {
+    Position pos;
+    pos.hash = hash_multiplicative(begin, length, hash_fn_[index-1]);
+    pos.cache_line_pos.line_num = first_pos.cache_line_pos.line_num;
+    return pos;
+  }
+
+  void fillPosition(
       const std::uint8_t *begin,
       std::size_t length,
+      const std::uint8_t index,
       Position positions[]) const {
-    positions[0].hash = hash_multiplicative(begin, length, hash_fn_[0]);
-    for (std::size_t i = 1; i < hash_fn_count_; ++i) {
-      positions[i].hash = hash_multiplicative(begin, length, hash_fn_[i]);
-      positions[i].cache_line_pos.line_num = positions[0].cache_line_pos.line_num;
+    if (index == 0)
+      positions[0].hash = hash_identity(begin, length);
+    else {
+      positions[index].hash = hash_multiplicative(begin, length, hash_fn_[index-1]);
+      positions[index].cache_line_pos.line_num = positions[0].cache_line_pos.line_num;
     }
   }
 
@@ -298,8 +329,19 @@ class BloomFilterBlocked {
     return (bit_array_.get())[pos.byte_pos.byte_num] & (1 << pos.byte_pos.index_in_byte);
   }
 
+  inline std::uint32_t hash_identity(  // Hashes a key by its first (up to) 4 bytes.
+      const std::uint8_t *__restrict__ begin,
+      std::size_t length) const {
+    // Zero-init so keys shorter than 4 bytes never mix in stale stack bytes;
+    // otherwise an insert and a later lookup of the same key could disagree.
+    std::uint32_t hash = 0;
+    // memcpy instead of a pointer cast: safe for unaligned key storage.
+    std::memcpy(&hash, begin, length < sizeof(hash) ? length : sizeof(hash));
+    return hash % (array_size_in_bytes_ * kNumBitsPerByte);  // fold into the bit range
+  }
+
   inline std::uint32_t hash_multiplicative(
-      const std::uint8_t *begin,
+      const std::uint8_t *__restrict__ begin,
       std::size_t length,
       const std::uint64_t multiplier) const {
     std::uint32_t hash = 0;
@@ -313,10 +355,10 @@ class BloomFilterBlocked {
     }
     while (bytes_hashed < length) {
       std::uint8_t val = *(begin + bytes_hashed);
-      hash += (multiplier * val) >> 32;
+      hash += (multiplier * val) >> 24;
       bytes_hashed++;
     }
-    return hash % (array_size_in_bytes_ * kNumBitsPerByte);
+    return hash;//  % (array_size_in_bytes_ * kNumBitsPerByte);
   }
 
  private:
@@ -328,13 +370,13 @@ class BloomFilterBlocked {
   static constexpr std::uint64_t kKnuthGoldenRatioNumber = 2654435761;
   const std::uint64_t hash_fn_[kMaxNumHashFns] = { // hash_fn_[i] is 2**(i+1) - 1
     0x00000001 * kKnuthGoldenRatioNumber, // 0x00000003, 0x00000007, 0x0000000f,
-    0x0000001f * kKnuthGoldenRatioNumber, // 0x0000003f, 0x0000007f, 0x000000ff,
+    // 0x0000001f * kKnuthGoldenRatioNumber, // 0x0000003f, 0x0000007f, 0x000000ff,
     0x000001ff * kKnuthGoldenRatioNumber, // 0x000003ff, 0x000007ff, 0x00000fff,
-    0x00001fff * kKnuthGoldenRatioNumber, // 0x00003fff, 0x00007fff, 0x0000ffff,
+    // 0x00001fff * kKnuthGoldenRatioNumber, // 0x00003fff, 0x00007fff, 0x0000ffff,
     0x0001ffff * kKnuthGoldenRatioNumber, // 0x0003ffff, 0x0007ffff, 0x000fffff,
-    0x001fffff * kKnuthGoldenRatioNumber, // 0x003fffff, 0x007fffff, 0x00ffffff,
+    // 0x001fffff * kKnuthGoldenRatioNumber, // 0x003fffff, 0x007fffff, 0x00ffffff,
     0x01ffffff * kKnuthGoldenRatioNumber, // 0x03ffffff, 0x07ffffff, 0x0fffffff,
-    0x1fffffff * kKnuthGoldenRatioNumber  // 0x3fffffff, 0x7fffffff, 0xffffffff
+    // 0x1fffffff * kKnuthGoldenRatioNumber  // 0x3fffffff, 0x7fffffff, 0xffffffff
     };
 
   alignas(kCacheLineBytes) std::unique_ptr<std::uint8_t> bit_array_;


[4/5] incubator-quickstep git commit: Allow collector to take an explicit probe tuple ID

Posted by na...@apache.org.
Allow collector to take an explicit probe tuple ID


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/fa815ff7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/fa815ff7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/fa815ff7

Branch: refs/heads/expt_bloom_filter_hash_fn
Commit: fa815ff7589bcc1d40f38a6fec8ad93ffe43e086
Parents: dd4bfd6
Author: Navneet Potti <na...@gmail.com>
Authored: Tue Jul 12 23:48:13 2016 -0500
Committer: Navneet Potti <na...@gmail.com>
Committed: Tue Jul 12 23:48:13 2016 -0500

----------------------------------------------------------------------
 relational_operators/HashJoinOperator.cpp | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fa815ff7/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index 667df1e..6248a67 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -75,6 +75,11 @@ class MapBasedJoinedTupleCollector {
     joined_tuples_[tref.block].emplace_back(tref.tuple, accessor.getCurrentPosition());
   }
 
+  inline void operator()(const tuple_id probe_tid,  // Overload that takes the probe tuple ID directly rather than an accessor.
+                         const TupleReference &build_tref) {
+    joined_tuples_[build_tref.block].emplace_back(build_tref.tuple, probe_tid);  // keyed by build block; pair is (build tuple, probe tuple)
+  }
+
   // Get a mutable pointer to the collected map of joined tuple ID pairs. The
   // key is inner block_id, values are vectors of joined tuple ID pairs with
   // tuple ID from the inner block on the left and the outer block on the


[5/5] incubator-quickstep git commit: Probe Bloom filter adapter in batches

Posted by na...@apache.org.
Probe Bloom filter adapter in batches


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/0161a4de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/0161a4de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/0161a4de

Branch: refs/heads/expt_bloom_filter_hash_fn
Commit: 0161a4dee295ea4efe8c8d9ebdf56759e2c56da2
Parents: fa815ff
Author: Navneet Potti <na...@gmail.com>
Authored: Tue Jul 12 23:49:05 2016 -0500
Committer: Navneet Potti <na...@gmail.com>
Committed: Tue Jul 12 23:49:05 2016 -0500

----------------------------------------------------------------------
 storage/HashTable.hpp          |  87 ++++++++++++++++-----------
 utility/BloomFilterAdapter.hpp | 117 ++++++++++++++++++++----------------
 2 files changed, 115 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0161a4de/storage/HashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTable.hpp b/storage/HashTable.hpp
index a4b09fb..b666e0c 100644
--- a/storage/HashTable.hpp
+++ b/storage/HashTable.hpp
@@ -2254,15 +2254,16 @@ void HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_
       // Find (and cache) the size of each attribute in the probe lists.
       // NOTE(nav): This code uses the accessor to get the size,
       // and hence only works if there's at least one tuple.
-      // NOTE(nav): This code is meant to be called only for PKs, and so
-      // we assume non-null values.
       std::vector<std::vector<std::size_t>> attr_size_vectors;
       attr_size_vectors.reserve(probe_attribute_ids_.size());
       for (const auto &probe_attr_vector : probe_attribute_ids_) {
         std::vector<std::size_t> attr_sizes;
         attr_sizes.reserve(probe_attr_vector.size());
         for (const auto &probe_attr : probe_attr_vector) {
-          auto val_and_size = accessor->getUntypedValueAndByteLengthAtAbsolutePosition(0, probe_attr);
+          // Using the function below is the easiest way to get attribute size
+          // here. The template parameter check_null is set to false to ensure
+          // that we get the size even if the first attr value is null.
+          auto val_and_size = accessor->template getUntypedValueAndByteLengthAtAbsolutePosition<false>(0, probe_attr);
           attr_sizes.push_back(val_and_size.second);
         }
         attr_size_vectors.push_back(attr_sizes);
@@ -2270,44 +2271,58 @@ void HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_
 
       bloom_filter_adapter.reset(new BloomFilterAdapter(
               probe_bloom_filters_, probe_attribute_ids_, attr_size_vectors));
+
+      static const uint32_t kMaxBatchSize = 4000;
+      std::vector<tuple_id> batch;
+      batch.reserve(kMaxBatchSize);
+
+      do {
+        while (batch.size() < kMaxBatchSize && accessor->next())
+          batch.push_back(accessor->getCurrentPosition());
+
+        std::size_t num_hits = bloom_filter_adapter->bulkProbe(accessor, batch);
+
+        for (std::size_t t = 0; t < num_hits; ++t){
+          tuple_id probe_tid = batch[t];
+          TypedValue key = accessor->getTypedValueAtAbsolutePosition(key_attr_id, probe_tid);
+          if (check_for_null_keys && key.isNull()) {
+            continue;
+          }
+          const std::size_t true_hash = use_scalar_literal_hash_template ? key.getHashScalarLiteral()
+                                                                        : key.getHash();
+          const std::size_t adjusted_hash = adjust_hashes_template ? this->AdjustHash(true_hash)
+                                                                  : true_hash;
+          std::size_t entry_num = 0;
+          const ValueT *value;
+          while (this->getNextEntryForKey(key, adjusted_hash, &value, &entry_num)) {
+            (*functor)(probe_tid, *value);
+            if (!allow_duplicate_keys)
+              break;
+          }
+        }
+        batch.clear();
+      } while (!accessor->iterationFinished());
     }
-    // std::size_t numFalsePositives = 0;
-    // std::size_t numMisses = 0;
-    // std::size_t numHits = 0;
-    while (accessor->next()) {
-      if (has_probe_side_bloom_filter_ && bloom_filter_adapter->miss(accessor)) {
-        // numMisses++;
-        continue;  // On a bloom filter miss, probing the hash table can be skipped.
-      }
 
-      TypedValue key = accessor->getTypedValue(key_attr_id);
-      if (check_for_null_keys && key.isNull()) {
-        continue;
-      }
-      const std::size_t true_hash = use_scalar_literal_hash_template ? key.getHashScalarLiteral()
-                                                                     : key.getHash();
-      const std::size_t adjusted_hash = adjust_hashes_template ? this->AdjustHash(true_hash)
-                                                               : true_hash;
-      std::size_t entry_num = 0;
-      const ValueT *value;
-      bool wasHit = false;
-      while (this->getNextEntryForKey(key, adjusted_hash, &value, &entry_num)) {
-        wasHit = true;
-        (*functor)(*accessor, *value);
-        if (!allow_duplicate_keys) {
-          break;
+    else { // no Bloom filters to probe
+      while(accessor->next()) {
+        TypedValue key = accessor->getTypedValue(key_attr_id);
+        if (check_for_null_keys && key.isNull()) {
+          continue;
+        }
+        const std::size_t true_hash = use_scalar_literal_hash_template ? key.getHashScalarLiteral()
+                                      : key.getHash();
+        const std::size_t adjusted_hash = adjust_hashes_template ? this->AdjustHash(true_hash)
+                                          : true_hash;
+        std::size_t entry_num = 0;
+        const ValueT *value;
+        while (this->getNextEntryForKey(key, adjusted_hash, &value, &entry_num)) {
+          (*functor)(*accessor, *value);
+          if (!allow_duplicate_keys)
+            break;
         }
       }
-
-      // if (!wasHit)
-      //   numFalsePositives++;
-      // else
-      //   numHits++;
-
     }
-    // std::cerr << "numFalsePositives: " << numFalsePositives
-    //           << " numMisses: " << numMisses
-    //           << " numHits: " << numHits << "\n";
   });
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0161a4de/utility/BloomFilterAdapter.hpp
----------------------------------------------------------------------
diff --git a/utility/BloomFilterAdapter.hpp b/utility/BloomFilterAdapter.hpp
index 10d1450..f1916dc 100644
--- a/utility/BloomFilterAdapter.hpp
+++ b/utility/BloomFilterAdapter.hpp
@@ -40,59 +40,33 @@ namespace quickstep {
 class BloomFilterAdapter {
  public:
   BloomFilterAdapter(const std::vector<const BloomFilter*> &bloom_filters,
-                     const std::vector<std::vector<attribute_id>> &attribute_ids,
-                     const std::vector<std::vector<std::size_t>> &attr_sizes)
-      : num_bloom_filters_(bloom_filters.size()) {
+                     const std::vector<std::vector<attribute_id>> attribute_ids,
+                     const std::vector<std::vector<std::size_t>> attr_sizes) {
     DCHECK_EQ(bloom_filters.size(), attribute_ids.size());
     DCHECK_EQ(bloom_filters.size(), attr_sizes.size());
 
-    bloom_filter_entries_.reserve(num_bloom_filters_);
-    bloom_filter_entry_indices_.reserve(num_bloom_filters_);
-
-    for (std::size_t i = 0; i < num_bloom_filters_; ++i) {
-      bloom_filter_entries_.emplace_back(bloom_filters[i], attribute_ids[i], attr_sizes[i]);
-      bloom_filter_entry_indices_.emplace_back(i);
+    bloom_filter_entries_.reserve(bloom_filters.size());
+    for (std::size_t i = 0; i < bloom_filters.size(); ++i) {
+      auto entry = new BloomFilterEntry(
+          bloom_filters[i], attribute_ids[i], attr_sizes[i]);
+      bloom_filter_entries_.push_back(entry);
     }
   }
 
-  template <typename ValueAccessorT>
-  inline bool miss(const ValueAccessorT *accessor) {
-    return missImpl<ValueAccessorT, true>(accessor);
+  ~BloomFilterAdapter() {  // Frees the entries the constructor allocated with new.
+    for (auto &entry : bloom_filter_entries_)
+      delete entry;  // bloom_filter_entries_ holds raw owning pointers
   }
 
-  template <typename ValueAccessorT, bool adapt_filters>
-  inline bool missImpl(const ValueAccessorT *accessor) {
-    for (std::size_t i = 0; i < num_bloom_filters_; ++i) {
-      const std::size_t entry_idx = bloom_filter_entry_indices_[i];
-      BloomFilterEntry &entry = bloom_filter_entries_[entry_idx];
-      if (adapt_filters) {
-        ++entry.cnt;
-      }
-
-      const BloomFilter *bloom_filter = entry.bloom_filter;
-      for (std::size_t i = 0; i < entry.attribute_ids.size(); ++i) {
-        const attribute_id &attr_id = entry.attribute_ids[i];
-        const std::size_t size = entry.attribute_sizes[i];
-        auto value = static_cast<const std::uint8_t*>(accessor->template getUntypedValue<false>(attr_id));
-        if (!bloom_filter->contains(value, size)) {
-          if (adapt_filters) {
-            // Record miss
-            ++entry.miss;
-
-            // Update entry order
-            if (!(entry.miss % kNumMissesBeforeAdapt) && i > 0) {
-              const std::size_t prev_entry_idx = bloom_filter_entry_indices_[i-1];
-              if (entry.isBetterThan(bloom_filter_entries_[prev_entry_idx])) {
-                bloom_filter_entry_indices_[i-1] = entry_idx;
-                bloom_filter_entry_indices_[i] = prev_entry_idx;
-              }
-            }
-          }
-          return true;
-        }
-      }
-    }
-    return false;
+  // Probes batch against every filter entry in order; survivors end up at the front.
+  template <typename ValueAccessorT, bool adapt_filters = true>
+  std::size_t bulkProbe(const ValueAccessorT *accessor, std::vector<tuple_id> &batch) {
+    std::size_t out_size = batch.size();  // each entry only sees the previous entry's survivors
+    for (auto &entry : bloom_filter_entries_)  // adapt_filters must be forwarded explicitly
+      out_size = bulkProbeBloomFilterEntry<ValueAccessorT, adapt_filters>(
+          *entry, accessor, batch, out_size);
+    if (adapt_filters) adaptEntryOrder();  // no re-ranking when statistics are not collected
+    return out_size;  // number of tuple IDs at the front of batch that passed every filter
   }
 
  private:
@@ -107,22 +81,59 @@ class BloomFilterAdapter {
           cnt(0) {
     }
 
-    inline bool isBetterThan(const BloomFilterEntry& other) {
-      return static_cast<std::uint64_t>(miss) * other.cnt
-                 > static_cast<std::uint64_t>(cnt + 5) * (other.miss + 5);
+    static bool isBetterThan(const BloomFilterEntry *a,  // Comparator passed to std::sort in adaptEntryOrder.
+                             const BloomFilterEntry *b) {
+      return a->miss_rate > b->miss_rate;  // higher miss rate sorts first
+      // return static_cast<std::uint64_t>(a.miss) * b.cnt
+      //            > static_cast<std::uint64_t>(a.cnt + 5) * (b.miss + 5);
     }
 
     const BloomFilter *bloom_filter;
-    const std::vector<attribute_id> &attribute_ids;
+    std::vector<attribute_id> attribute_ids;
     std::vector<std::size_t> attribute_sizes;
     std::uint32_t miss;
     std::uint32_t cnt;
+    float miss_rate;
   };
 
-  const std::size_t kNumMissesBeforeAdapt = 64;
-  const std::size_t num_bloom_filters_;
-  std::vector<BloomFilterEntry> bloom_filter_entries_;
-  std::vector<std::size_t> bloom_filter_entry_indices_;
+  // Filters batch[0, in_size) through one entry's Bloom filter, compacting
+  // the surviving tuple IDs to the front; returns the number of survivors.
+  template <typename ValueAccessorT, bool adapt_filters = true>
+  std::size_t bulkProbeBloomFilterEntry(
+      BloomFilterEntry &entry,
+      const ValueAccessorT *accessor,
+      std::vector<tuple_id> &batch,
+      const std::size_t in_size) {
+    std::size_t out_size = 0;
+    const BloomFilter *bloom_filter = entry.bloom_filter;
+    for (std::size_t t = 0; t < in_size; ++t) {
+      const tuple_id tid = batch[t];
+      // Keep a tuple once, and only if every attribute hits; appending per
+      // matching attribute would let out_size overtake t and clobber batch.
+      bool hit = true;
+      for (std::size_t a = 0; hit && a < entry.attribute_ids.size(); ++a) {
+        auto value = static_cast<const std::uint8_t*>(
+            accessor->getUntypedValueAtAbsolutePosition(entry.attribute_ids[a], tid));
+        hit = bloom_filter->contains(value, entry.attribute_sizes[a]);
+      }
+      if (hit) batch[out_size++] = tid;  // out_size <= t, so no unread slot is overwritten
+    }
+    if (adapt_filters) {
+      entry.cnt += in_size;
+      entry.miss += (in_size - out_size);  // cannot underflow: out_size <= in_size
+    }
+    return out_size;
+  }
+
+  void adaptEntryOrder() {  // Re-sorts entries so the highest miss rate is probed first.
+    // An unprobed entry (cnt == 0) would yield 0/0 = NaN, which breaks the
+    // strict weak ordering std::sort requires; rank it with rate 0 instead.
+    for (auto &entry : bloom_filter_entries_)
+      entry->miss_rate = entry->cnt ? static_cast<float>(entry->miss) / entry->cnt : 0.0f;
+    std::sort(bloom_filter_entries_.begin(), bloom_filter_entries_.end(), BloomFilterEntry::isBetterThan);
+  }
+
+  std::vector<BloomFilterEntry *> bloom_filter_entries_;
 
   DISALLOW_COPY_AND_ASSIGN(BloomFilterAdapter);
 };


[3/5] incubator-quickstep git commit: Speed up Bloom Filter Adapter: caching attr sizes

Posted by na...@apache.org.
Speed up Bloom Filter Adapter: caching attr sizes


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/dd4bfd65
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/dd4bfd65
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/dd4bfd65

Branch: refs/heads/expt_bloom_filter_hash_fn
Commit: dd4bfd65dc4330ae189422f677dd57f9b5f30ce5
Parents: 36d0f4c
Author: Navneet Potti <na...@gmail.com>
Authored: Tue Jul 12 19:29:08 2016 -0500
Committer: Navneet Potti <na...@gmail.com>
Committed: Tue Jul 12 19:29:08 2016 -0500

----------------------------------------------------------------------
 storage/HashTable.hpp          | 45 ++++++++++++++++++++++++++-----------
 utility/BloomFilterAdapter.hpp | 24 ++++++++++++--------
 2 files changed, 47 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dd4bfd65/storage/HashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTable.hpp b/storage/HashTable.hpp
index 1e153f2..a4b09fb 100644
--- a/storage/HashTable.hpp
+++ b/storage/HashTable.hpp
@@ -2250,15 +2250,33 @@ void HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_
       [&](auto *accessor) -> void {  // NOLINT(build/c++11)
     std::unique_ptr<BloomFilterAdapter> bloom_filter_adapter;
     if (has_probe_side_bloom_filter_) {
-      bloom_filter_adapter.reset(
-          new BloomFilterAdapter(probe_bloom_filters_, probe_attribute_ids_));
+
+      // Find (and cache) the size of each attribute in the probe lists.
+      // NOTE(nav): This code uses the accessor to get the size,
+      // and hence only works if there's at least one tuple.
+      // NOTE(nav): This code is meant to be called only for PKs, and so
+      // we assume non-null values.
+      std::vector<std::vector<std::size_t>> attr_size_vectors;
+      attr_size_vectors.reserve(probe_attribute_ids_.size());
+      for (const auto &probe_attr_vector : probe_attribute_ids_) {
+        std::vector<std::size_t> attr_sizes;
+        attr_sizes.reserve(probe_attr_vector.size());
+        for (const auto &probe_attr : probe_attr_vector) {
+          auto val_and_size = accessor->getUntypedValueAndByteLengthAtAbsolutePosition(0, probe_attr);
+          attr_sizes.push_back(val_and_size.second);
+        }
+        attr_size_vectors.push_back(attr_sizes);
+      }
+
+      bloom_filter_adapter.reset(new BloomFilterAdapter(
+              probe_bloom_filters_, probe_attribute_ids_, attr_size_vectors));
     }
-    std::size_t numFalsePositives = 0;
-    std::size_t numMisses = 0;
-    std::size_t numHits = 0;
+    // std::size_t numFalsePositives = 0;
+    // std::size_t numMisses = 0;
+    // std::size_t numHits = 0;
     while (accessor->next()) {
       if (has_probe_side_bloom_filter_ && bloom_filter_adapter->miss(accessor)) {
-        numMisses++;
+        // numMisses++;
         continue;  // On a bloom filter miss, probing the hash table can be skipped.
       }
 
@@ -2280,15 +2298,16 @@ void HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_
           break;
         }
       }
-      if (!wasHit)
-        numFalsePositives++;
-      else
-        numHits++;
+
+      // if (!wasHit)
+      //   numFalsePositives++;
+      // else
+      //   numHits++;
 
     }
-    std::cerr << "numFalsePositives: " << numFalsePositives
-              << " numMisses: " << numMisses
-              << " numHits: " << numHits << "\n";
+    // std::cerr << "numFalsePositives: " << numFalsePositives
+    //           << " numMisses: " << numMisses
+    //           << " numHits: " << numHits << "\n";
   });
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dd4bfd65/utility/BloomFilterAdapter.hpp
----------------------------------------------------------------------
diff --git a/utility/BloomFilterAdapter.hpp b/utility/BloomFilterAdapter.hpp
index 5deb275..10d1450 100644
--- a/utility/BloomFilterAdapter.hpp
+++ b/utility/BloomFilterAdapter.hpp
@@ -40,15 +40,17 @@ namespace quickstep {
 class BloomFilterAdapter {
  public:
   BloomFilterAdapter(const std::vector<const BloomFilter*> &bloom_filters,
-                     const std::vector<std::vector<attribute_id>> &attribute_ids)
+                     const std::vector<std::vector<attribute_id>> &attribute_ids,
+                     const std::vector<std::vector<std::size_t>> &attr_sizes)
       : num_bloom_filters_(bloom_filters.size()) {
     DCHECK_EQ(bloom_filters.size(), attribute_ids.size());
+    DCHECK_EQ(bloom_filters.size(), attr_sizes.size());
 
     bloom_filter_entries_.reserve(num_bloom_filters_);
     bloom_filter_entry_indices_.reserve(num_bloom_filters_);
 
     for (std::size_t i = 0; i < num_bloom_filters_; ++i) {
-      bloom_filter_entries_.emplace_back(bloom_filters[i], attribute_ids[i]);
+      bloom_filter_entries_.emplace_back(bloom_filters[i], attribute_ids[i], attr_sizes[i]);
       bloom_filter_entry_indices_.emplace_back(i);
     }
   }
@@ -68,17 +70,17 @@ class BloomFilterAdapter {
       }
 
       const BloomFilter *bloom_filter = entry.bloom_filter;
-      for (const attribute_id &attr_id : entry.attribute_ids) {
-        const std::pair<const void*, std::size_t> value_and_byte_length =
-            accessor->getUntypedValueAndByteLength(attr_id);
-        if (!bloom_filter->contains(static_cast<const std::uint8_t*>(value_and_byte_length.first),
-                                    value_and_byte_length.second)) {
+      for (std::size_t i = 0; i < entry.attribute_ids.size(); ++i) {
+        const attribute_id &attr_id = entry.attribute_ids[i];
+        const std::size_t size = entry.attribute_sizes[i];
+        auto value = static_cast<const std::uint8_t*>(accessor->template getUntypedValue<false>(attr_id));
+        if (!bloom_filter->contains(value, size)) {
           if (adapt_filters) {
             // Record miss
             ++entry.miss;
 
             // Update entry order
-            if (i > 0) {
+            if (!(entry.miss % kNumMissesBeforeAdapt) && i > 0) {
               const std::size_t prev_entry_idx = bloom_filter_entry_indices_[i-1];
               if (entry.isBetterThan(bloom_filter_entries_[prev_entry_idx])) {
                 bloom_filter_entry_indices_[i-1] = entry_idx;
@@ -96,9 +98,11 @@ class BloomFilterAdapter {
  private:
   struct BloomFilterEntry {
     BloomFilterEntry(const BloomFilter *in_bloom_filter,
-                     const std::vector<attribute_id> &in_attribute_ids)
+                     const std::vector<attribute_id> &in_attribute_ids,
+                     const std::vector<std::size_t> &in_attribute_sizes)
         : bloom_filter(in_bloom_filter),
           attribute_ids(in_attribute_ids),
+          attribute_sizes(in_attribute_sizes),
           miss(0),
           cnt(0) {
     }
@@ -110,10 +114,12 @@ class BloomFilterAdapter {
 
     const BloomFilter *bloom_filter;
     const std::vector<attribute_id> &attribute_ids;
+    std::vector<std::size_t> attribute_sizes;
     std::uint32_t miss;
     std::uint32_t cnt;
   };
 
+  const std::size_t kNumMissesBeforeAdapt = 64;
   const std::size_t num_bloom_filters_;
   std::vector<BloomFilterEntry> bloom_filter_entries_;
   std::vector<std::size_t> bloom_filter_entry_indices_;