You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by na...@apache.org on 2016/07/13 04:49:39 UTC

[5/5] incubator-quickstep git commit: Probe Bloom filter adapter in batches

Probe Bloom filter adapter in batches


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/0161a4de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/0161a4de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/0161a4de

Branch: refs/heads/expt_bloom_filter_hash_fn
Commit: 0161a4dee295ea4efe8c8d9ebdf56759e2c56da2
Parents: fa815ff
Author: Navneet Potti <na...@gmail.com>
Authored: Tue Jul 12 23:49:05 2016 -0500
Committer: Navneet Potti <na...@gmail.com>
Committed: Tue Jul 12 23:49:05 2016 -0500

----------------------------------------------------------------------
 storage/HashTable.hpp          |  87 ++++++++++++++++-----------
 utility/BloomFilterAdapter.hpp | 117 ++++++++++++++++++++----------------
 2 files changed, 115 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0161a4de/storage/HashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTable.hpp b/storage/HashTable.hpp
index a4b09fb..b666e0c 100644
--- a/storage/HashTable.hpp
+++ b/storage/HashTable.hpp
@@ -2254,15 +2254,16 @@ void HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_
       // Find (and cache) the size of each attribute in the probe lists.
       // NOTE(nav): This code uses the accessor to get the size,
       // and hence only works if there's at least one tuple.
-      // NOTE(nav): This code is meant to be called only for PKs, and so
-      // we assume non-null values.
       std::vector<std::vector<std::size_t>> attr_size_vectors;
       attr_size_vectors.reserve(probe_attribute_ids_.size());
       for (const auto &probe_attr_vector : probe_attribute_ids_) {
         std::vector<std::size_t> attr_sizes;
         attr_sizes.reserve(probe_attr_vector.size());
         for (const auto &probe_attr : probe_attr_vector) {
-          auto val_and_size = accessor->getUntypedValueAndByteLengthAtAbsolutePosition(0, probe_attr);
+          // Using the function below is the easiest way to get attribute size
+          // here. The template parameter check_null is set to false to ensure
+          // that we get the size even if the first attr value is null.
+          auto val_and_size = accessor->template getUntypedValueAndByteLengthAtAbsolutePosition<false>(0, probe_attr);
           attr_sizes.push_back(val_and_size.second);
         }
         attr_size_vectors.push_back(attr_sizes);
@@ -2270,44 +2271,58 @@ void HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_
 
       bloom_filter_adapter.reset(new BloomFilterAdapter(
               probe_bloom_filters_, probe_attribute_ids_, attr_size_vectors));
+
+      static const uint32_t kMaxBatchSize = 4000;
+      std::vector<tuple_id> batch;
+      batch.reserve(kMaxBatchSize);
+
+      do {
+        while (batch.size() < kMaxBatchSize && accessor->next())
+          batch.push_back(accessor->getCurrentPosition());
+
+        std::size_t num_hits = bloom_filter_adapter->bulkProbe(accessor, batch);
+
+        for (std::size_t t = 0; t < num_hits; ++t){
+          tuple_id probe_tid = batch[t];
+          TypedValue key = accessor->getTypedValueAtAbsolutePosition(key_attr_id, probe_tid);
+          if (check_for_null_keys && key.isNull()) {
+            continue;
+          }
+          const std::size_t true_hash = use_scalar_literal_hash_template ? key.getHashScalarLiteral()
+                                                                        : key.getHash();
+          const std::size_t adjusted_hash = adjust_hashes_template ? this->AdjustHash(true_hash)
+                                                                  : true_hash;
+          std::size_t entry_num = 0;
+          const ValueT *value;
+          while (this->getNextEntryForKey(key, adjusted_hash, &value, &entry_num)) {
+            (*functor)(probe_tid, *value);
+            if (!allow_duplicate_keys)
+              break;
+          }
+        }
+        batch.clear();
+      } while (!accessor->iterationFinished());
     }
-    // std::size_t numFalsePositives = 0;
-    // std::size_t numMisses = 0;
-    // std::size_t numHits = 0;
-    while (accessor->next()) {
-      if (has_probe_side_bloom_filter_ && bloom_filter_adapter->miss(accessor)) {
-        // numMisses++;
-        continue;  // On a bloom filter miss, probing the hash table can be skipped.
-      }
 
-      TypedValue key = accessor->getTypedValue(key_attr_id);
-      if (check_for_null_keys && key.isNull()) {
-        continue;
-      }
-      const std::size_t true_hash = use_scalar_literal_hash_template ? key.getHashScalarLiteral()
-                                                                     : key.getHash();
-      const std::size_t adjusted_hash = adjust_hashes_template ? this->AdjustHash(true_hash)
-                                                               : true_hash;
-      std::size_t entry_num = 0;
-      const ValueT *value;
-      bool wasHit = false;
-      while (this->getNextEntryForKey(key, adjusted_hash, &value, &entry_num)) {
-        wasHit = true;
-        (*functor)(*accessor, *value);
-        if (!allow_duplicate_keys) {
-          break;
+    else { // no Bloom filters to probe
+      while(accessor->next()) {
+        TypedValue key = accessor->getTypedValue(key_attr_id);
+        if (check_for_null_keys && key.isNull()) {
+          continue;
+        }
+        const std::size_t true_hash = use_scalar_literal_hash_template ? key.getHashScalarLiteral()
+                                      : key.getHash();
+        const std::size_t adjusted_hash = adjust_hashes_template ? this->AdjustHash(true_hash)
+                                          : true_hash;
+        std::size_t entry_num = 0;
+        const ValueT *value;
+        while (this->getNextEntryForKey(key, adjusted_hash, &value, &entry_num)) {
+          (*functor)(*accessor, *value);
+          if (!allow_duplicate_keys)
+            break;
         }
       }
-
-      // if (!wasHit)
-      //   numFalsePositives++;
-      // else
-      //   numHits++;
-
     }
-    // std::cerr << "numFalsePositives: " << numFalsePositives
-    //           << " numMisses: " << numMisses
-    //           << " numHits: " << numHits << "\n";
   });
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0161a4de/utility/BloomFilterAdapter.hpp
----------------------------------------------------------------------
diff --git a/utility/BloomFilterAdapter.hpp b/utility/BloomFilterAdapter.hpp
index 10d1450..f1916dc 100644
--- a/utility/BloomFilterAdapter.hpp
+++ b/utility/BloomFilterAdapter.hpp
@@ -40,59 +40,33 @@ namespace quickstep {
 class BloomFilterAdapter {
  public:
   BloomFilterAdapter(const std::vector<const BloomFilter*> &bloom_filters,
-                     const std::vector<std::vector<attribute_id>> &attribute_ids,
-                     const std::vector<std::vector<std::size_t>> &attr_sizes)
-      : num_bloom_filters_(bloom_filters.size()) {
+                     const std::vector<std::vector<attribute_id>> attribute_ids,
+                     const std::vector<std::vector<std::size_t>> attr_sizes) {
     DCHECK_EQ(bloom_filters.size(), attribute_ids.size());
     DCHECK_EQ(bloom_filters.size(), attr_sizes.size());
 
-    bloom_filter_entries_.reserve(num_bloom_filters_);
-    bloom_filter_entry_indices_.reserve(num_bloom_filters_);
-
-    for (std::size_t i = 0; i < num_bloom_filters_; ++i) {
-      bloom_filter_entries_.emplace_back(bloom_filters[i], attribute_ids[i], attr_sizes[i]);
-      bloom_filter_entry_indices_.emplace_back(i);
+    bloom_filter_entries_.reserve(bloom_filters.size());
+    for (std::size_t i = 0; i < bloom_filters.size(); ++i) {
+      auto entry = new BloomFilterEntry(
+          bloom_filters[i], attribute_ids[i], attr_sizes[i]);
+      bloom_filter_entries_.push_back(entry);
     }
   }
 
-  template <typename ValueAccessorT>
-  inline bool miss(const ValueAccessorT *accessor) {
-    return missImpl<ValueAccessorT, true>(accessor);
+  ~BloomFilterAdapter() {
+    for (auto &entry : bloom_filter_entries_)
+      delete entry;
   }
 
-  template <typename ValueAccessorT, bool adapt_filters>
-  inline bool missImpl(const ValueAccessorT *accessor) {
-    for (std::size_t i = 0; i < num_bloom_filters_; ++i) {
-      const std::size_t entry_idx = bloom_filter_entry_indices_[i];
-      BloomFilterEntry &entry = bloom_filter_entries_[entry_idx];
-      if (adapt_filters) {
-        ++entry.cnt;
-      }
-
-      const BloomFilter *bloom_filter = entry.bloom_filter;
-      for (std::size_t i = 0; i < entry.attribute_ids.size(); ++i) {
-        const attribute_id &attr_id = entry.attribute_ids[i];
-        const std::size_t size = entry.attribute_sizes[i];
-        auto value = static_cast<const std::uint8_t*>(accessor->template getUntypedValue<false>(attr_id));
-        if (!bloom_filter->contains(value, size)) {
-          if (adapt_filters) {
-            // Record miss
-            ++entry.miss;
-
-            // Update entry order
-            if (!(entry.miss % kNumMissesBeforeAdapt) && i > 0) {
-              const std::size_t prev_entry_idx = bloom_filter_entry_indices_[i-1];
-              if (entry.isBetterThan(bloom_filter_entries_[prev_entry_idx])) {
-                bloom_filter_entry_indices_[i-1] = entry_idx;
-                bloom_filter_entry_indices_[i] = prev_entry_idx;
-              }
-            }
-          }
-          return true;
-        }
-      }
-    }
-    return false;
+  template <typename ValueAccessorT, bool adapt_filters = true>
+  std::size_t bulkProbe(
+      const ValueAccessorT *accessor,
+      std::vector<tuple_id> &batch) {
+    std::size_t out_size = batch.size();
+    for (auto &entry : bloom_filter_entries_)
+      out_size = bulkProbeBloomFilterEntry(*entry, accessor, batch, out_size);
+    adaptEntryOrder();
+    return out_size;
   }
 
  private:
@@ -107,22 +81,59 @@ class BloomFilterAdapter {
           cnt(0) {
     }
 
-    inline bool isBetterThan(const BloomFilterEntry& other) {
-      return static_cast<std::uint64_t>(miss) * other.cnt
-                 > static_cast<std::uint64_t>(cnt + 5) * (other.miss + 5);
+    static bool isBetterThan(const BloomFilterEntry *a,
+                             const BloomFilterEntry *b) {
+      return a->miss_rate > b->miss_rate;
+      // return static_cast<std::uint64_t>(a.miss) * b.cnt
+      //            > static_cast<std::uint64_t>(a.cnt + 5) * (b.miss + 5);
     }
 
     const BloomFilter *bloom_filter;
-    const std::vector<attribute_id> &attribute_ids;
+    std::vector<attribute_id> attribute_ids;
     std::vector<std::size_t> attribute_sizes;
     std::uint32_t miss;
     std::uint32_t cnt;
+    float miss_rate;
   };
 
-  const std::size_t kNumMissesBeforeAdapt = 64;
-  const std::size_t num_bloom_filters_;
-  std::vector<BloomFilterEntry> bloom_filter_entries_;
-  std::vector<std::size_t> bloom_filter_entry_indices_;
+  template <typename ValueAccessorT, bool adapt_filters = true>
+  std::size_t bulkProbeBloomFilterEntry(
+      BloomFilterEntry &entry,
+      const ValueAccessorT *accessor,
+      std::vector<tuple_id> &batch,
+      const std::size_t in_size) {
+    std::size_t out_size = 0;
+    const BloomFilter *bloom_filter = entry.bloom_filter;
+
+    for (std::size_t t = 0; t < in_size; ++t) {
+      tuple_id tid = batch[t];
+      for (std::size_t a = 0; a < entry.attribute_ids.size(); ++a) {
+        const attribute_id &attr_id = entry.attribute_ids[a];
+        const std::size_t size = entry.attribute_sizes[a];
+        auto value = static_cast<const std::uint8_t*>(
+            accessor->getUntypedValueAtAbsolutePosition(attr_id, tid));
+        if (bloom_filter->contains(value, size)) {
+          batch[out_size] = tid;
+          ++out_size;
+        }
+      }
+    }
+    if (adapt_filters) {
+      entry.cnt += in_size;
+      entry.miss += (in_size - out_size);
+    }
+    return out_size;
+  }
+
+  void adaptEntryOrder() {
+    for (auto &entry : bloom_filter_entries_)
+      entry->miss_rate = static_cast<float>(entry->miss) / entry->cnt;
+    std::sort(bloom_filter_entries_.begin(),
+              bloom_filter_entries_.end(),
+              BloomFilterEntry::isBetterThan);
+  }
+
+  std::vector<BloomFilterEntry *> bloom_filter_entries_;
 
   DISALLOW_COPY_AND_ASSIGN(BloomFilterAdapter);
 };