You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/08/11 20:27:48 UTC

[08/16] incubator-quickstep git commit: Initial commit

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/storage/PackedRowStoreValueAccessor.hpp
----------------------------------------------------------------------
diff --git a/storage/PackedRowStoreValueAccessor.hpp b/storage/PackedRowStoreValueAccessor.hpp
index 80edecd..1520b3a 100644
--- a/storage/PackedRowStoreValueAccessor.hpp
+++ b/storage/PackedRowStoreValueAccessor.hpp
@@ -20,6 +20,8 @@
 #ifndef QUICKSTEP_STORAGE_PACKED_ROW_STORE_VALUE_ACCESSOR_HPP_
 #define QUICKSTEP_STORAGE_PACKED_ROW_STORE_VALUE_ACCESSOR_HPP_
 
+#include <utility>
+
 #include "catalog/CatalogRelationSchema.hpp"
 #include "catalog/CatalogTypedefs.hpp"
 #include "storage/StorageBlockInfo.hpp"
@@ -42,7 +44,8 @@ class PackedRowStoreValueAccessorHelper {
       : relation_(relation),
         num_tuples_(num_tuples),
         tuple_storage_(tuple_storage),
-        null_bitmap_(null_bitmap) {
+        null_bitmap_(null_bitmap),
+        attr_max_lengths_(relation.getMaximumAttributeByteLengths()) {
   }
 
   inline tuple_id numPackedTuples() const {
@@ -67,6 +70,25 @@ class PackedRowStoreValueAccessorHelper {
            + relation_.getFixedLengthAttributeOffset(attr);  // Attribute offset within tuple.
   }
 
+  template <bool check_null>
+  inline std::pair<const void*, std::size_t> getAttributeValueAndByteLength(const tuple_id tuple,
+                                                                        const attribute_id attr) const {
+    DEBUG_ASSERT(tuple < num_tuples_);
+    DEBUG_ASSERT(relation_.hasAttributeWithId(attr));
+    if (check_null) {
+      const int nullable_idx = relation_.getNullableAttributeIndex(attr);
+      if ((nullable_idx != -1)
+          && null_bitmap_->getBit(tuple * relation_.numNullableAttributes() + nullable_idx)) {
+        return std::make_pair(nullptr, 0);
+      }
+    }
+
+    return std::make_pair(static_cast<const char*>(tuple_storage_)
+                              + (tuple * relation_.getFixedByteLength())
+                              + relation_.getFixedLengthAttributeOffset(attr),
+                          attr_max_lengths_[attr]);
+  }
+
   inline TypedValue getAttributeValueTyped(const tuple_id tuple,
                                            const attribute_id attr) const {
     const Type &attr_type = relation_.getAttributeById(attr)->getType();
@@ -81,6 +103,7 @@ class PackedRowStoreValueAccessorHelper {
   const tuple_id num_tuples_;
   const void *tuple_storage_;
   const BitVector<false> *null_bitmap_;
+  const std::vector<std::size_t> &attr_max_lengths_;
 
   DISALLOW_COPY_AND_ASSIGN(PackedRowStoreValueAccessorHelper);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/storage/SplitRowStoreValueAccessor.hpp
----------------------------------------------------------------------
diff --git a/storage/SplitRowStoreValueAccessor.hpp b/storage/SplitRowStoreValueAccessor.hpp
index 61bb7bf..e2d2b47 100644
--- a/storage/SplitRowStoreValueAccessor.hpp
+++ b/storage/SplitRowStoreValueAccessor.hpp
@@ -102,6 +102,11 @@ class SplitRowStoreValueAccessor : public ValueAccessor {
     return getUntypedValueAtAbsolutePosition<check_null>(attr_id, current_position_);
   }
 
+  template <bool check_null = true>
+  inline std::pair<const void*, std::size_t> getUntypedValueAndByteLength(const attribute_id attr_id) const {
+    return getUntypedValueAndByteLengthAtAbsolutePosition<check_null>(attr_id, current_position_);
+  }
+
   inline TypedValue getTypedValue(const attribute_id attr_id) const {
     return getTypedValueAtAbsolutePosition(attr_id, current_position_);
   }
@@ -142,6 +147,44 @@ class SplitRowStoreValueAccessor : public ValueAccessor {
     }
   }
 
+  template <bool check_null = true>
+  inline std::pair<const void*, std::size_t> getUntypedValueAndByteLengthAtAbsolutePosition(const attribute_id attr_id,
+                                                                                            const tuple_id tid) const {
+    DEBUG_ASSERT(occupancy_bitmap_.getBit(tid));
+    DEBUG_ASSERT(relation_.hasAttributeWithId(attr_id));
+    const char *tuple_slot = static_cast<const char*>(tuple_storage_)
+                             + tuple_slot_bytes_ * tid;
+    if (check_null) {
+      const int nullable_idx = relation_.getNullableAttributeIndex(attr_id);
+      if (nullable_idx != -1) {
+        // const_cast is safe here. We will only be using read-only methods of
+        // BitVector.
+        BitVector<true> tuple_null_bitmap(const_cast<void*>(static_cast<const void*>(tuple_slot)),
+                                          relation_.numNullableAttributes());
+        if (tuple_null_bitmap.getBit(nullable_idx)) {
+          return std::make_pair(nullptr, 0);
+        }
+      }
+    }
+
+    const int variable_length_idx = relation_.getVariableLengthAttributeIndex(attr_id);
+    if (variable_length_idx == -1) {
+      // Fixed-length, stored in-line in slot.
+      return std::make_pair(tuple_slot + per_tuple_null_bitmap_bytes_
+                                       + relation_.getFixedLengthAttributeOffset(attr_id),
+                            attr_max_lengths_[attr_id]);
+
+    } else {
+      // Variable-length, stored at back of block.
+      const std::uint32_t *pos_ptr = reinterpret_cast<const std::uint32_t*>(
+          tuple_slot + per_tuple_null_bitmap_bytes_
+                     + relation_.getFixedByteLength()
+                     + variable_length_idx * 2 * sizeof(std::uint32_t));
+      return std::make_pair(static_cast<const char*>(tuple_storage_) + pos_ptr[0],
+                            pos_ptr[1]);
+    }
+  }
+
   inline TypedValue getTypedValueAtAbsolutePosition(const attribute_id attr_id,
                                                     const tuple_id tid) const {
     DEBUG_ASSERT(occupancy_bitmap_.getBit(tid));
@@ -319,6 +362,7 @@ class SplitRowStoreValueAccessor : public ValueAccessor {
         tuple_storage_(tuple_storage),
         tuple_slot_bytes_(tuple_slot_bytes),
         per_tuple_null_bitmap_bytes_(per_tuple_null_bitmap_bytes),
+        attr_max_lengths_(relation.getMaximumAttributeByteLengths()),
         current_position_(std::numeric_limits<std::size_t>::max()) {
   }
 
@@ -329,6 +373,7 @@ class SplitRowStoreValueAccessor : public ValueAccessor {
   const void *tuple_storage_;
   const std::size_t tuple_slot_bytes_;
   const std::size_t per_tuple_null_bitmap_bytes_;
+  const std::vector<std::size_t> &attr_max_lengths_;
 
   std::size_t current_position_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index 21aa12c..8370418 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -389,15 +389,7 @@ AggregationState* StorageBlock::aggregate(
     const AggregationHandle &handle,
     const std::vector<std::unique_ptr<const Scalar>> &arguments,
     const std::vector<attribute_id> *arguments_as_attributes,
-    const Predicate *predicate,
     std::unique_ptr<TupleIdSequence> *reuse_matches) const {
-  // If there is a filter predicate that hasn't already been evaluated,
-  // evaluate it now and save the results for other aggregates on this same
-  // block.
-  if (predicate && !*reuse_matches) {
-    reuse_matches->reset(getMatchesForPredicate(predicate));
-  }
-
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
   // If all the arguments to this aggregate are plain relation attributes,
   // aggregate directly on a ValueAccessor from this block to avoid a copy.
@@ -418,7 +410,6 @@ void StorageBlock::aggregateGroupBy(
     const AggregationHandle &handle,
     const std::vector<std::unique_ptr<const Scalar>> &arguments,
     const std::vector<std::unique_ptr<const Scalar>> &group_by,
-    const Predicate *predicate,
     AggregationStateHashTableBase *hash_table,
     std::unique_ptr<TupleIdSequence> *reuse_matches,
     std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
@@ -440,14 +431,7 @@ void StorageBlock::aggregateGroupBy(
   ColumnVectorsValueAccessor temp_result;
   {
     std::unique_ptr<ValueAccessor> accessor;
-    if (predicate) {
-      if (!*reuse_matches) {
-        // If there is a filter predicate that hasn't already been evaluated,
-        // evaluate it now and save the results for other aggregates on this
-        // same block.
-        reuse_matches->reset(getMatchesForPredicate(predicate));
-      }
-
+    if (reuse_matches) {
       // Create a filtered ValueAccessor that only iterates over predicate
       // matches.
       accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get()));
@@ -499,7 +483,6 @@ void StorageBlock::aggregateDistinct(
     const std::vector<std::unique_ptr<const Scalar>> &arguments,
     const std::vector<attribute_id> *arguments_as_attributes,
     const std::vector<std::unique_ptr<const Scalar>> &group_by,
-    const Predicate *predicate,
     AggregationStateHashTableBase *distinctify_hash_table,
     std::unique_ptr<TupleIdSequence> *reuse_matches,
     std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
@@ -514,14 +497,7 @@ void StorageBlock::aggregateDistinct(
   ColumnVectorsValueAccessor temp_result;
   {
     std::unique_ptr<ValueAccessor> accessor;
-    if (predicate) {
-      if (!*reuse_matches) {
-        // If there is a filter predicate that hasn't already been evaluated,
-        // evaluate it now and save the results for other aggregates on this
-        // same block.
-        reuse_matches->reset(getMatchesForPredicate(predicate));
-      }
-
+    if (reuse_matches) {
       // Create a filtered ValueAccessor that only iterates over predicate
       // matches.
       accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get()));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/storage/StorageBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp
index 97b4773..2a20cb5 100644
--- a/storage/StorageBlock.hpp
+++ b/storage/StorageBlock.hpp
@@ -412,7 +412,6 @@ class StorageBlock : public StorageBlockBase {
       const AggregationHandle &handle,
       const std::vector<std::unique_ptr<const Scalar>> &arguments,
       const std::vector<attribute_id> *arguments_as_attributes,
-      const Predicate *predicate,
       std::unique_ptr<TupleIdSequence> *reuse_matches) const;
 
   /**
@@ -462,7 +461,6 @@ class StorageBlock : public StorageBlockBase {
   void aggregateGroupBy(const AggregationHandle &handle,
                         const std::vector<std::unique_ptr<const Scalar>> &arguments,
                         const std::vector<std::unique_ptr<const Scalar>> &group_by,
-                        const Predicate *predicate,
                         AggregationStateHashTableBase *hash_table,
                         std::unique_ptr<TupleIdSequence> *reuse_matches,
                         std::vector<std::unique_ptr<ColumnVector>>
@@ -507,7 +505,6 @@ class StorageBlock : public StorageBlockBase {
                          const std::vector<std::unique_ptr<const Scalar>> &arguments,
                          const std::vector<attribute_id> *arguments_as_attributes,
                          const std::vector<std::unique_ptr<const Scalar>> &group_by,
-                         const Predicate *predicate,
                          AggregationStateHashTableBase *distinctify_hash_table,
                          std::unique_ptr<TupleIdSequence> *reuse_matches,
                          std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const;
@@ -590,6 +587,8 @@ class StorageBlock : public StorageBlockBase {
    **/
   const std::size_t getNumTuples() const;
 
+  TupleIdSequence* getMatchesForPredicate(const Predicate *predicate) const;
+
  private:
   static TupleStorageSubBlock* CreateTupleStorageSubBlock(
       const CatalogRelationSchema &relation,
@@ -629,8 +628,6 @@ class StorageBlock : public StorageBlockBase {
   // StorageBlock's header.
   bool rebuildIndexes(bool short_circuit);
 
-  TupleIdSequence* getMatchesForPredicate(const Predicate *predicate) const;
-
   std::unordered_map<attribute_id, TypedValue>* generateUpdatedValues(
       const ValueAccessor &accessor,
       const tuple_id tuple,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/storage/ValueAccessor.hpp
----------------------------------------------------------------------
diff --git a/storage/ValueAccessor.hpp b/storage/ValueAccessor.hpp
index 70d4405..b107390 100644
--- a/storage/ValueAccessor.hpp
+++ b/storage/ValueAccessor.hpp
@@ -377,6 +377,11 @@ class TupleIdSequenceAdapterValueAccessor : public ValueAccessor {
     return accessor_->template getUntypedValueAtAbsolutePosition<check_null>(attr_id, *current_position_);
   }
 
+  template <bool check_null = true>
+  inline std::pair<const void*, std::size_t> getUntypedValueAndByteLength(const attribute_id attr_id) const {
+    return accessor_->template getUntypedValueAndByteLengthAtAbsolutePosition<check_null>(attr_id, *current_position_);
+  }
+
   inline TypedValue getTypedValue(const attribute_id attr_id) const {
     return accessor_->getTypedValueAtAbsolutePosition(attr_id, *current_position_);
   }
@@ -389,6 +394,13 @@ class TupleIdSequenceAdapterValueAccessor : public ValueAccessor {
   }
 
   // Pass-through.
+  template <bool check_null = true>
+  inline std::pair<const void*, std::size_t> getUntypedValueAndByteLengthAtAbsolutePosition(const attribute_id attr_id,
+                                                                                            const tuple_id tid) const {
+    return accessor_->template getUntypedValueAndByteLengthAtAbsolutePosition<check_null>(attr_id, tid);
+  }
+
+  // Pass-through.
   inline TypedValue getTypedValueAtAbsolutePosition(const attribute_id attr_id,
                                                     const tuple_id tid) const {
     return accessor_->getTypedValueAtAbsolutePosition(attr_id, tid);
@@ -562,6 +574,12 @@ class OrderedTupleIdSequenceAdapterValueAccessor : public ValueAccessor {
                                                                              id_sequence_[current_position_]);
   }
 
+  template <bool check_null = true>
+  inline std::pair<const void*, std::size_t> getUntypedValueAndByteLength(const attribute_id attr_id) const {
+    return accessor_->template getUntypedValueAndByteLengthAtAbsolutePosition<check_null>(
+        attr_id, id_sequence_[current_position_]);
+  }
+
   inline TypedValue getTypedValue(const attribute_id attr_id) const {
     return accessor_->getTypedValueAtAbsolutePosition(attr_id, id_sequence_[current_position_]);
   }
@@ -573,6 +591,13 @@ class OrderedTupleIdSequenceAdapterValueAccessor : public ValueAccessor {
                 "OrderedTupleIdSequenceAdapterValueAccessor");
   }
 
+  template <bool check_null = true>
+  inline std::pair<const void*, std::size_t> getUntypedValueAndByteLengthAtAbsolutePosition(const attribute_id attr_id,
+                                                                                            const tuple_id tid) const {
+    FATAL_ERROR("getUntypedValueAndByteLengthAtAbsolutePosition() not implemented in "
+                "OrderedTupleIdSequenceAdapterValueAccessor");
+  }
+
   inline TypedValue getTypedValueAtAbsolutePosition(const attribute_id attr_id,
                                                     const tuple_id tid) const {
     FATAL_ERROR("getTypedValueAtAbsolutePosition() not implemented in "
@@ -739,6 +764,11 @@ class PackedTupleStorageSubBlockValueAccessor : public ValueAccessor {
     return getUntypedValueAtAbsolutePosition<check_null>(attr_id, current_tuple_);
   }
 
+  template <bool check_null = true>
+  inline std::pair<const void*, std::size_t> getUntypedValueAndByteLength(const attribute_id attr_id) const {
+    return getUntypedValueAndByteLengthAtAbsolutePosition<check_null>(attr_id, current_tuple_);
+  }
+
   inline TypedValue getTypedValue(const attribute_id attr_id) const {
     return getTypedValueAtAbsolutePosition(attr_id, current_tuple_);
   }
@@ -749,6 +779,12 @@ class PackedTupleStorageSubBlockValueAccessor : public ValueAccessor {
     return helper_.template getAttributeValue<check_null>(tid, attr_id);
   }
 
+  template <bool check_null = true>
+  inline std::pair<const void*, std::size_t> getUntypedValueAndByteLengthAtAbsolutePosition(const attribute_id attr_id,
+                                                                                            const tuple_id tid) const {
+    return helper_.template getAttributeValueAndByteLength<check_null>(tid, attr_id);
+  }
+
   inline TypedValue getTypedValueAtAbsolutePosition(const attribute_id attr_id,
                                                     const tuple_id tid) const {
     return helper_.getAttributeValueTyped(tid, attr_id);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/types/containers/ColumnVector.hpp
----------------------------------------------------------------------
diff --git a/types/containers/ColumnVector.hpp b/types/containers/ColumnVector.hpp
index fc65656..0817054 100644
--- a/types/containers/ColumnVector.hpp
+++ b/types/containers/ColumnVector.hpp
@@ -195,6 +195,22 @@ class NativeColumnVector : public ColumnVector {
   }
 
   /**
+   * @brief Get the untyped pointer to a value as well as the value's byte length
+   *        in this NativeColumnVector as a pair.
+   *
+   * @param position The position of the value to get.
+   * @return A pair containing the untyped pointer to the value at position and
+   *         the value's byte length.
+   **/
+  template <bool check_null = true>
+  inline std::pair<const void*, std::size_t> getUntypedValueAndByteLength(const std::size_t position) const {
+    DCHECK_LT(position, actual_length_);
+    return (check_null && null_bitmap_ && null_bitmap_->getBit(position))
+        ? std::make_pair(nullptr, 0)
+        : std::make_pair(static_cast<const char*>(values_) + (position * type_length_), type_length_);
+  }
+
+  /**
    * @brief Get a value in this NativeColumnVector as a TypedValue.
    *
    * @param position The position of the value to get.
@@ -455,6 +471,25 @@ class IndirectColumnVector : public ColumnVector {
   }
 
   /**
+   * @brief Get the untyped pointer to a value as well as the value's byte length
+   *        in this IndirectColumnVector as a pair.
+   *
+   * @param position The position of the value to get.
+   * @return A pair containing the untyped pointer to the value at position and
+   *         the value's byte length.
+   **/
+  template <bool check_null = true>
+  inline std::pair<const void*, std::size_t> getUntypedValueAndByteLength(const std::size_t position) const {
+    DCHECK_LT(position, values_.size());
+    if (check_null && type_is_nullable_ && values_[position].isNull()) {
+      return std::make_pair(nullptr, 0);
+    } else {
+      const TypedValue &value = values_[position];
+      return std::make_pair(value.getDataPtr(), value.getDataSize());
+    }
+  }
+
+  /**
    * @brief Get a value in this IndirectColumnVector as a TypedValue.
    *
    * @param position The position of the value to get.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/types/containers/ColumnVectorsValueAccessor.hpp
----------------------------------------------------------------------
diff --git a/types/containers/ColumnVectorsValueAccessor.hpp b/types/containers/ColumnVectorsValueAccessor.hpp
index 2300f3b..19e57a9 100644
--- a/types/containers/ColumnVectorsValueAccessor.hpp
+++ b/types/containers/ColumnVectorsValueAccessor.hpp
@@ -126,6 +126,11 @@ class ColumnVectorsValueAccessor : public ValueAccessor {
     return getUntypedValueAtAbsolutePosition<check_null>(attr_id, current_position_);
   }
 
+  template <bool check_null = true>
+  inline std::pair<const void*, std::size_t> getUntypedValueAndByteLength(const attribute_id attr_id) const {
+    return getUntypedValueAndByteLengthAtAbsolutePosition<check_null>(attr_id, current_position_);
+  }
+
   inline TypedValue getTypedValue(const attribute_id attr_id) const {
     return getTypedValueAtAbsolutePosition(attr_id, current_position_);
   }
@@ -142,6 +147,18 @@ class ColumnVectorsValueAccessor : public ValueAccessor {
     }
   }
 
+  template <bool check_null = true>
+  inline std::pair<const void*, std::size_t> getUntypedValueAndByteLengthAtAbsolutePosition(const attribute_id attr_id,
+                                                                                            const tuple_id tid) const {
+    DCHECK(attributeIdInRange(attr_id));
+    DCHECK(tupleIdInRange(tid));
+    if (column_native_[attr_id]) {
+      return static_cast<const NativeColumnVector&>(*columns_[attr_id]).getUntypedValueAndByteLength<check_null>(tid);
+    } else {
+      return static_cast<const IndirectColumnVector&>(*columns_[attr_id]).getUntypedValueAndByteLength<check_null>(tid);
+    }
+  }
+
   inline TypedValue getTypedValueAtAbsolutePosition(const attribute_id attr_id,
                                                     const tuple_id tid) const {
     DCHECK(attributeIdInRange(attr_id));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/utility/BloomFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/BloomFilter.hpp b/utility/BloomFilter.hpp
index 8d62da9..749d33a 100644
--- a/utility/BloomFilter.hpp
+++ b/utility/BloomFilter.hpp
@@ -23,6 +23,7 @@
 #include <algorithm>
 #include <cstddef>
 #include <cstdint>
+#include <cstring>
 #include <memory>
 #include <utility>
 #include <vector>
@@ -41,11 +42,358 @@ namespace quickstep {
  *  @{
  */
 
+class BloomFilterOriginal;
+class BloomFilterBlocked;
+typedef BloomFilterBlocked BloomFilter;
+
+/**
+ * @brief A "blocked" version of Bloom Filter based on this paper:
+ *        Putze, Felix, Peter Sanders, and Johannes Singler.
+ *        "Cache-, hash-and space-efficient bloom filters."
+ *        International Workshop on Experimental and Efficient Algorithms.
+ *        Springer Berlin Heidelberg, 2007.
+ **/
+class BloomFilterBlocked {
+ public:
+  static const std::uint8_t kNumBitsPerByte = 8;
+  static const std::uint8_t kMaxNumHashFns = 4;
+
+  // This union allows us to read/write position in convenient fashion,
+  // through nested structs and their bitfield members
+  //
+  // A position can simply be a 32-bit hash
+  // Or it can be a cache line (block of 512 bits) and position within it
+  // Or it can be a byte (block of 8 bits) and position within it
+  union Position {
+    std::uint32_t hash;
+    struct CacheLinePosition {
+      unsigned index_in_line : 9;
+      unsigned line_num : 23;
+    } cache_line_pos;
+    struct BytePosition {
+      unsigned index_in_byte : 3;
+      unsigned byte_num : 29;
+    } byte_pos;
+  };
+
+  // This Bloom filter implementation requires the bit array to be a
+  // multiple of the cache-line size. So we either have to round up to a 
+  // multiple (default behavior) or round down to a multiple.
+  // Rounding up is usually preferable but rounding down is necessary when
+  // we are given a bit array that we don't control the size of, in the
+  // constructor.
+  static std::uint64_t getNearestAllowedSize(
+      const std::uint64_t approx_size,
+      bool round_down = false) {
+    if (round_down)
+      return (approx_size / kCacheLineBytes) * kCacheLineBytes;
+    return ((approx_size + kCacheLineBytes - 1)/ kCacheLineBytes) * kCacheLineBytes;
+  }
+
+
+  /**
+   * @brief Constructor.
+   * @note When no bit_array is being passed to the constructor,
+   *       then the bit_array is owned and managed by this class.
+   *
+   * @param hash_fn_count The number of hash functions used by this bloom filter.
+   * @param bit_array_size_in_bytes Size of the bit array.
+   **/
+  BloomFilterBlocked(const std::uint8_t hash_fn_count,
+              const std::uint64_t bit_array_size_in_bytes)
+      : hash_fn_count_(hash_fn_count),
+        array_size_in_bytes_(getNearestAllowedSize(bit_array_size_in_bytes)),
+        is_bit_array_owner_(true),
+        bit_array_(new std::uint8_t[array_size_in_bytes_]) {
+    reset();
+  }
+
+  /**
+   * @brief Constructor.
+   * @note When a bit_array is passed as an argument to the constructor,
+   *       then the ownership of the bit array lies with the caller.
+   *
+   * @param hash_fn_count The number of hash functions used by this bloom filter.
+   * @param bit_array_size_in_bytes Size of the bit array.
+   * @param bit_array A pointer to the memory region that is used to store bit array.
+   * @param is_initialized A boolean that indicates whether to zero-out the region
+   *                       before use or not.
+   **/
+  BloomFilterBlocked(const std::uint8_t hash_fn_count,
+              const std::uint64_t bit_array_size_in_bytes,
+              std::uint8_t *bit_array,
+              const bool is_initialized)
+      : hash_fn_count_(hash_fn_count),
+        array_size_in_bytes_(getNearestAllowedSize(bit_array_size_in_bytes, true)),
+        is_bit_array_owner_(false),
+        bit_array_(bit_array) {  // Owned by the calling method.
+    if (!is_initialized) {
+      reset();
+    }
+  }
+
+  /**
+   * @brief Constructor.
+   * @note The bit_array is owned and managed by this class. Its size is the
+   *       proto's bloom_filter_size rounded up to a cache-line multiple.
+   *
+   * @param bloom_filter_proto The protobuf representation of a
+   *        bloom filter configuration.
+   **/
+  explicit BloomFilterBlocked(const serialization::BloomFilter &bloom_filter_proto)
+      : hash_fn_count_(bloom_filter_proto.number_of_hashes()),
+        array_size_in_bytes_(getNearestAllowedSize(bloom_filter_proto.bloom_filter_size())),
+        is_bit_array_owner_(true),
+        bit_array_(new std::uint8_t[array_size_in_bytes_]) {
+    reset();
+  }
+
+  /**
+   * @brief Destructor.
+   **/
+  ~BloomFilterBlocked() {
+    if (is_bit_array_owner_) {
+      bit_array_.reset();
+    } else {
+      bit_array_.release();
+    }
+  }
+
+  static bool ProtoIsValid(const serialization::BloomFilter &bloom_filter_proto) {
+    return bloom_filter_proto.IsInitialized();
+  }
+
+  /**
+   * @brief Zeros out the contents of the bit array.
+   **/
+  inline void reset() {
+    // Initialize the bit_array with all zeros.
+    std::fill_n(bit_array_.get(), array_size_in_bytes_, 0x00);
+    inserted_element_count_ = 0;
+  }
+
+  /**
+   * @brief Get the number of hash functions used in this bloom filter.
+   *
+   * @return Returns the number of hash functions.
+   **/
+  inline std::uint8_t getNumberOfHashes() const {
+    return hash_fn_count_;
+  }
+
+  /**
+   * @brief Get the size of the bit array in bytes for this bloom filter.
+   *
+   * @return Returns the bit array size (in bytes).
+   **/
+  inline std::uint64_t getBitArraySize() const {
+    return array_size_in_bytes_;
+  }
+
+  /**
+   * @brief Get the constant pointer to the bit array for this bloom filter
+   *
+   * @return Returns constant pointer to the bit array.
+   **/
+  inline const std::uint8_t* getBitArray() const {
+    return bit_array_.get();
+  }
+
+  template <typename T>
+  void insert(const T &value) {
+    insert(reinterpret_cast<const std::uint8_t *>(&value), sizeof(T));
+  }
+
+  /**
+   * @brief Inserts a given value into the bloom filter in a thread-safe manner.
+   *
+   * @param key_begin A pointer to the value being inserted.
+   * @param length Size of the value being inserted in bytes.
+   */
+  inline void insert(const std::uint8_t *key_begin, const std::size_t length) {
+      SpinSharedMutexExclusiveLock<false> exclusive_writer_lock(bloom_filter_insert_mutex_);
+      insertUnSafe(key_begin, length);
+  }
+
+  template <typename T>
+  void insertUnSafe(const T &value) {
+    insertUnSafe(reinterpret_cast<const std::uint8_t *>(&value), sizeof(T));
+  }
+
+  /**
+   * @brief Inserts a given value into the bloom filter.
+   * @warning This is a faster thread-unsafe version of the insert() function.
+   *          The caller needs to ensure the thread safety.
+   *
+   * @param key_begin A pointer to the value being inserted.
+   * @param length Size of the value being inserted in bytes.
+   */
+  inline void insertUnSafe(const std::uint8_t *key_begin, const std::size_t length) {
+    Position first_pos = getFirstPosition(key_begin, length);
+    setBitAtPosition(first_pos);
+    Position other_pos;
+    for (std::uint8_t i = 1; i <hash_fn_count_; ++i) {
+      other_pos = getOtherPosition(key_begin, length, first_pos, i);
+      setBitAtPosition(other_pos);
+    }
+    ++inserted_element_count_;
+  }
+
+  template <typename T>
+  bool contains(const T &value) const {  // Tests the raw bytes of value; read-only.
+    return contains(reinterpret_cast<const std::uint8_t *>(&value), sizeof(T));
+  }
+
+  /**
+   * @brief Test membership of a given value in the bloom filter.
+   *        If true is returned, then a value may or may not be present in the bloom filter.
+   *        If false is returned, a value is certainly not present in the bloom filter.
+   *
+   * @note The membership test does not require any locks, because the assumption is that
+   *       the bloom filter will only be used after it has been built.
+   *
+   * @param key_begin A pointer to the value being tested for membership.
+   * @param length Size of the value being inserted in bytes.
+   */
+  inline bool contains(
+      const std::uint8_t *__restrict__ key_begin,
+      const std::size_t length) const {
+    Position first_pos = getFirstPosition(key_begin, length);
+    if (!getBitAtPosition(first_pos)) {
+      return false;
+    }
+    Position other_pos;
+    for (std::uint8_t i = 1; i < hash_fn_count_; ++i) {
+      other_pos = getOtherPosition(key_begin, length, first_pos, i);
+      if (!getBitAtPosition(other_pos)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * @brief Perform a bitwise-OR of the given Bloom filter with this bloom filter.
+   *        Essentially, it does a union of this bloom filter with the passed bloom filter.
+   *
+   * @param bloom_filter A const pointer to the bloom filter object to do bitwise-OR with.
+   */
+  inline void bitwiseOr(const BloomFilterBlocked *bloom_filter) {
+    SpinSharedMutexExclusiveLock<false> exclusive_writer_lock(bloom_filter_insert_mutex_);
+    for (std::size_t byte_index = 0; byte_index < bloom_filter->getBitArraySize(); ++byte_index) {
+      (bit_array_.get())[byte_index] |= bloom_filter->getBitArray()[byte_index];
+    }
+  }
+
+  /**
+   * @brief Return the number of elements currently inserted into bloom filter.
+   *
+   * @return The number of elements inserted into bloom filter.
+   **/
+  inline std::uint32_t element_count() const {
+    return inserted_element_count_;
+  }
+
+ protected:
+  Position getFirstPosition(const std::uint8_t *begin, std::size_t length) const {
+    Position pos;
+    pos.hash = hash_identity(begin, length);
+    return pos;
+  }
+
+  Position getOtherPosition(
+      const std::uint8_t *begin,
+      std::size_t length,
+      const Position first_pos,
+      const std::uint8_t index) const {
+    Position pos;
+    pos.hash = hash_multiplicative(begin, length, hash_fn_[index-1]);
+    pos.cache_line_pos.line_num = first_pos.cache_line_pos.line_num;
+    return pos;
+  }
+
+  void fillPosition(
+      const std::uint8_t *begin,
+      std::size_t length,
+      const std::uint8_t index,
+      Position positions[]) const {
+    if (index == 0)
+      positions[0].hash = hash_identity(begin, length);
+    else {
+      positions[index].hash = hash_multiplicative(begin, length, hash_fn_[index-1]);
+      positions[index].cache_line_pos.line_num = positions[0].cache_line_pos.line_num;
+    }
+  }
+
+  void setBitAtPosition(const Position &pos) {
+    (bit_array_.get())[pos.byte_pos.byte_num] |= (1 << pos.byte_pos.index_in_byte);
+  }
+
+  bool getBitAtPosition(const Position &pos) const {
+    return (bit_array_.get())[pos.byte_pos.byte_num] & (1 << pos.byte_pos.index_in_byte);
+  }
+
+  // Uses (up to) the first 4 bytes of the key as its hash, reduced modulo the
+  // number of bits in the array. Keys shorter than 4 bytes are zero-extended.
+  inline std::uint32_t hash_identity(
+      const std::uint8_t *__restrict__ begin,
+      std::size_t length) const {
+    std::uint32_t hash = 0;  // Zero-init so short keys hash deterministically.
+    // memcpy (rather than a pointer cast) is safe for unaligned keys.
+    std::memcpy(&hash, begin, std::min(length, sizeof(hash)));
+    return hash % (array_size_in_bytes_ * kNumBitsPerByte);
+  }
+
+  // Sums (multiplier * chunk) >> 24 over 4-byte words of the key, then over its tail bytes.
+  inline std::uint32_t hash_multiplicative(
+      const std::uint8_t *__restrict__ begin,
+      std::size_t length,
+      const std::uint64_t multiplier) const {
+    std::uint32_t hash = 0;
+    std::size_t bytes_hashed = 0;
+    // Consume whole words only, so we never read past the end of the key.
+    while (bytes_hashed + 4 <= length) {
+      auto val = *reinterpret_cast<const std::uint32_t *>(begin + bytes_hashed);
+      hash += (multiplier * val) >> 24;
+      bytes_hashed += 4;
+    }
+    while (bytes_hashed < length) {
+      std::uint8_t val = *(begin + bytes_hashed);
+      hash += (multiplier * val) >> 24;
+      bytes_hashed++;
+    }
+    return hash;  // Not reduced here: callers overwrite the cache-line bits.
+  }
+
+ private:
+  const std::uint32_t hash_fn_count_;
+  const std::uint64_t array_size_in_bytes_;
+  std::uint32_t inserted_element_count_;
+  const bool is_bit_array_owner_;
+
+  static constexpr std::uint64_t kKnuthGoldenRatioNumber = 2654435761;
+  const std::uint64_t hash_fn_[kMaxNumHashFns] = { // hash_fn_[i] is (2**(8*i+1) - 1) * kKnuthGoldenRatioNumber
+    0x00000001 * kKnuthGoldenRatioNumber, // 0x00000003, 0x00000007, 0x0000000f,
+    // 0x0000001f * kKnuthGoldenRatioNumber, // 0x0000003f, 0x0000007f, 0x000000ff,
+    0x000001ff * kKnuthGoldenRatioNumber, // 0x000003ff, 0x000007ff, 0x00000fff,
+    // 0x00001fff * kKnuthGoldenRatioNumber, // 0x00003fff, 0x00007fff, 0x0000ffff,
+    0x0001ffff * kKnuthGoldenRatioNumber, // 0x0003ffff, 0x0007ffff, 0x000fffff,
+    // 0x001fffff * kKnuthGoldenRatioNumber, // 0x003fffff, 0x007fffff, 0x00ffffff,
+    0x01ffffff * kKnuthGoldenRatioNumber, // 0x03ffffff, 0x07ffffff, 0x0fffffff,
+    // 0x1fffffff * kKnuthGoldenRatioNumber  // 0x3fffffff, 0x7fffffff, 0xffffffff
+    };
+
+  alignas(kCacheLineBytes) std::unique_ptr<std::uint8_t[]> bit_array_;  // Array form: new[] needs delete[].
+  alignas(kCacheLineBytes) mutable SpinSharedMutex<false> bloom_filter_insert_mutex_;
+
+  DISALLOW_COPY_AND_ASSIGN(BloomFilterBlocked);
+};
+
 /**
  * @brief A simple Bloom Filter implementation with basic primitives
  *        based on Partow's Bloom Filter implementation.
  **/
-class BloomFilter {
+class BloomFilterOriginal {
  public:
   static const uint32_t kNumBitsPerByte = 8;
 
@@ -54,21 +402,17 @@ class BloomFilter {
    * @note When no bit_array is being passed to the constructor,
    *       then the bit_array is owned and managed by this class.
    *
-   * @param random_seed A random_seed that generates unique hash functions.
    * @param hash_fn_count The number of hash functions used by this bloom filter.
    * @param bit_array_size_in_bytes Size of the bit array.
    **/
-  BloomFilter(const std::uint64_t random_seed,
-              const std::size_t hash_fn_count,
+  BloomFilterOriginal(const std::size_t hash_fn_count,
               const std::uint64_t bit_array_size_in_bytes)
-      : random_seed_(random_seed),
-        hash_fn_count_(hash_fn_count),
+      : hash_fn_count_(hash_fn_count),
         array_size_in_bytes_(bit_array_size_in_bytes),
         array_size_(array_size_in_bytes_ * kNumBitsPerByte),
         bit_array_(new std::uint8_t[array_size_in_bytes_]),
         is_bit_array_owner_(true) {
     reset();
-    generate_unique_hash_fn();
   }
 
   /**
@@ -76,20 +420,17 @@ class BloomFilter {
    * @note When a bit_array is passed as an argument to the constructor,
    *       then the ownership of the bit array lies with the caller.
    *
-   * @param random_seed A random_seed that generates unique hash functions.
    * @param hash_fn_count The number of hash functions used by this bloom filter.
    * @param bit_array_size_in_bytes Size of the bit array.
    * @param bit_array A pointer to the memory region that is used to store bit array.
    * @param is_initialized A boolean that indicates whether to zero-out the region
    *                       before use or not.
    **/
-  BloomFilter(const std::uint64_t random_seed,
-              const std::size_t hash_fn_count,
+  BloomFilterOriginal(const std::size_t hash_fn_count,
               const std::uint64_t bit_array_size_in_bytes,
               std::uint8_t *bit_array,
               const bool is_initialized)
-      : random_seed_(random_seed),
-        hash_fn_count_(hash_fn_count),
+      : hash_fn_count_(hash_fn_count),
         array_size_in_bytes_(bit_array_size_in_bytes),
         array_size_(bit_array_size_in_bytes * kNumBitsPerByte),
         bit_array_(bit_array),  // Owned by the calling method.
@@ -97,7 +438,6 @@ class BloomFilter {
     if (!is_initialized) {
       reset();
     }
-    generate_unique_hash_fn();
   }
 
   /**
@@ -108,21 +448,19 @@ class BloomFilter {
    * @param bloom_filter_proto The protobuf representation of a
    *        bloom filter configuration.
    **/
-  explicit BloomFilter(const serialization::BloomFilter &bloom_filter_proto)
-      : random_seed_(bloom_filter_proto.bloom_filter_seed()),
-        hash_fn_count_(bloom_filter_proto.number_of_hashes()),
+  explicit BloomFilterOriginal(const serialization::BloomFilter &bloom_filter_proto)
+      : hash_fn_count_(bloom_filter_proto.number_of_hashes()),
         array_size_in_bytes_(bloom_filter_proto.bloom_filter_size()),
         array_size_(array_size_in_bytes_ * kNumBitsPerByte),
         bit_array_(new std::uint8_t[array_size_in_bytes_]),
         is_bit_array_owner_(true) {
     reset();
-    generate_unique_hash_fn();
   }
 
   /**
    * @brief Destructor.
    **/
-  ~BloomFilter() {
+  ~BloomFilterOriginal() {
     if (is_bit_array_owner_) {
       bit_array_.reset();
     } else {
@@ -144,15 +482,6 @@ class BloomFilter {
   }
 
   /**
-   * @brief Get the random seed that was used to initialize this bloom filter.
-   *
-   * @return Returns the random seed.
-   **/
-  inline std::uint64_t getRandomSeed() const {
-    return random_seed_;
-  }
-
-  /**
    * @brief Get the number of hash functions used in this bloom filter.
    *
    * @return Returns the number of hash functions.
@@ -195,7 +524,7 @@ class BloomFilter {
 
     // Determine all the bit positions that are required to be set.
     for (std::size_t i = 0; i < hash_fn_count_; ++i) {
-      compute_indices(hash_ap(key_begin, length, hash_fn_[i]), &bit_index, &bit);
+      compute_indices(hash_multiplicative(key_begin, length, hash_fn_[i]), &bit_index, &bit);
       modified_bit_positions.push_back(std::make_pair(bit_index, bit));
     }
 
@@ -240,7 +569,7 @@ class BloomFilter {
     std::size_t bit = 0;
 
     for (std::size_t i = 0; i < hash_fn_count_; ++i) {
-      compute_indices(hash_ap(key_begin, length, hash_fn_[i]), &bit_index, &bit);
+      compute_indices(hash_multiplicative(key_begin, length, hash_fn_[i]), &bit_index, &bit);
       (bit_array_.get())[bit_index / kNumBitsPerByte] |= (1 << bit);
     }
 
@@ -262,7 +591,7 @@ class BloomFilter {
     std::size_t bit_index = 0;
     std::size_t bit = 0;
     for (std::size_t i = 0; i < hash_fn_count_; ++i) {
-      compute_indices(hash_ap(key_begin, length, hash_fn_[i]), &bit_index, &bit);
+      compute_indices(hash_multiplicative(key_begin, length, hash_fn_[i]), &bit_index, &bit);
       if (((bit_array_.get())[bit_index / kNumBitsPerByte] & (1 << bit)) != (1 << bit)) {
         return false;
       }
@@ -276,7 +605,7 @@ class BloomFilter {
    *
    * @param bloom_filter A const pointer to the bloom filter object to do bitwise-OR with.
    */
-  inline void bitwiseOr(const BloomFilter *bloom_filter) {
+  inline void bitwiseOr(const BloomFilterOriginal *bloom_filter) {
     SpinSharedMutexExclusiveLock<false> exclusive_writer_lock(bloom_filter_insert_mutex_);
     for (std::size_t byte_index = 0; byte_index < bloom_filter->getBitArraySize(); ++byte_index) {
       (bit_array_.get())[byte_index] |= bloom_filter->getBitArray()[byte_index];
@@ -298,95 +627,28 @@ class BloomFilter {
     *bit = *bit_index % kNumBitsPerByte;
   }
 
-  void generate_unique_hash_fn() {
-    hash_fn_.reserve(hash_fn_count_);
-    const std::uint32_t predef_hash_fn_count = 128;
-    static const std::uint32_t predef_hash_fn[predef_hash_fn_count] = {
-       0xAAAAAAAA, 0x55555555, 0x33333333, 0xCCCCCCCC,
-       0x66666666, 0x99999999, 0xB5B5B5B5, 0x4B4B4B4B,
-       0xAA55AA55, 0x55335533, 0x33CC33CC, 0xCC66CC66,
-       0x66996699, 0x99B599B5, 0xB54BB54B, 0x4BAA4BAA,
-       0xAA33AA33, 0x55CC55CC, 0x33663366, 0xCC99CC99,
-       0x66B566B5, 0x994B994B, 0xB5AAB5AA, 0xAAAAAA33,
-       0x555555CC, 0x33333366, 0xCCCCCC99, 0x666666B5,
-       0x9999994B, 0xB5B5B5AA, 0xFFFFFFFF, 0xFFFF0000,
-       0xB823D5EB, 0xC1191CDF, 0xF623AEB3, 0xDB58499F,
-       0xC8D42E70, 0xB173F616, 0xA91A5967, 0xDA427D63,
-       0xB1E8A2EA, 0xF6C0D155, 0x4909FEA3, 0xA68CC6A7,
-       0xC395E782, 0xA26057EB, 0x0CD5DA28, 0x467C5492,
-       0xF15E6982, 0x61C6FAD3, 0x9615E352, 0x6E9E355A,
-       0x689B563E, 0x0C9831A8, 0x6753C18B, 0xA622689B,
-       0x8CA63C47, 0x42CC2884, 0x8E89919B, 0x6EDBD7D3,
-       0x15B6796C, 0x1D6FDFE4, 0x63FF9092, 0xE7401432,
-       0xEFFE9412, 0xAEAEDF79, 0x9F245A31, 0x83C136FC,
-       0xC3DA4A8C, 0xA5112C8C, 0x5271F491, 0x9A948DAB,
-       0xCEE59A8D, 0xB5F525AB, 0x59D13217, 0x24E7C331,
-       0x697C2103, 0x84B0A460, 0x86156DA9, 0xAEF2AC68,
-       0x23243DA5, 0x3F649643, 0x5FA495A8, 0x67710DF8,
-       0x9A6C499E, 0xDCFB0227, 0x46A43433, 0x1832B07A,
-       0xC46AFF3C, 0xB9C8FFF0, 0xC9500467, 0x34431BDF,
-       0xB652432B, 0xE367F12B, 0x427F4C1B, 0x224C006E,
-       0x2E7E5A89, 0x96F99AA5, 0x0BEB452A, 0x2FD87C39,
-       0x74B2E1FB, 0x222EFD24, 0xF357F60C, 0x440FCB1E,
-       0x8BBE030F, 0x6704DC29, 0x1144D12F, 0x948B1355,
-       0x6D8FD7E9, 0x1C11A014, 0xADD1592F, 0xFB3C712E,
-       0xFC77642F, 0xF9C4CE8C, 0x31312FB9, 0x08B0DD79,
-       0x318FA6E7, 0xC040D23D, 0xC0589AA7, 0x0CA5C075,
-       0xF874B172, 0x0CF914D5, 0x784D3280, 0x4E8CFEBC,
-       0xC569F575, 0xCDB2A091, 0x2CC016B4, 0x5C5F4421
-    };
-    if (hash_fn_count_ <= predef_hash_fn_count) {
-      std::copy(predef_hash_fn, predef_hash_fn + hash_fn_count_, hash_fn_.begin());
-      for (std::uint32_t i = 0; i < hash_fn_.size(); ++i) {
-        hash_fn_[i] = hash_fn_[i] * hash_fn_[(i + 3) % hash_fn_count_] + static_cast<std::uint32_t>(random_seed_);
+  // Multiplicative hash: sums (multiplier * chunk) >> 32 over the key's chunks.
+  inline std::uint32_t hash_multiplicative(
+      const std::uint8_t *begin,
+      std::size_t remaining_length,
+      const std::uint64_t multiplier) const {
+    std::uint32_t hash = 0;
+    std::size_t bytes_hashed = 0;
+    // Whole 4-byte words only; the trailing 0-3 bytes are hashed one by one below.
+    while (bytes_hashed + 4 <= remaining_length) {
+      auto val = *reinterpret_cast<const std::uint32_t *>(begin + bytes_hashed);
+      hash += (multiplier * val) >> 32;
+      bytes_hashed += 4;
-      }
-    } else {
-      LOG(FATAL) << "Requested number of hash functions is too large.";
     }
-  }
-
-  inline std::uint32_t hash_ap(const std::uint8_t *begin, std::size_t remaining_length, std::uint32_t hash) const {
-    const std::uint8_t *itr = begin;
-    std::uint32_t loop = 0;
-    while (remaining_length >= 8) {
-      const std::uint32_t &i1 = *(reinterpret_cast<const std::uint32_t*>(itr)); itr += sizeof(std::uint32_t);
-      const std::uint32_t &i2 = *(reinterpret_cast<const std::uint32_t*>(itr)); itr += sizeof(std::uint32_t);
-      hash ^= (hash <<  7) ^  i1 * (hash >> 3) ^ (~((hash << 11) + (i2 ^ (hash >> 5))));
-      remaining_length -= 8;
-    }
-    if (remaining_length) {
-      if (remaining_length >= 4) {
-        const std::uint32_t &i = *(reinterpret_cast<const std::uint32_t*>(itr));
-        if (loop & 0x01) {
-          hash ^= (hash <<  7) ^  i * (hash >> 3);
-        } else {
-          hash ^= (~((hash << 11) + (i ^ (hash >> 5))));
-        }
-        ++loop;
-        remaining_length -= 4;
-        itr += sizeof(std::uint32_t);
-      }
-      if (remaining_length >= 2) {
-        const std::uint16_t &i = *(reinterpret_cast<const std::uint16_t*>(itr));
-        if (loop & 0x01) {
-          hash ^= (hash <<  7) ^  i * (hash >> 3);
-        } else {
-          hash ^= (~((hash << 11) + (i ^ (hash >> 5))));
-        }
-        ++loop;
-        remaining_length -= 2;
-        itr += sizeof(std::uint16_t);
-      }
-      if (remaining_length) {
-        hash += ((*itr) ^ (hash * 0xA5A5A5A5)) + loop;
-      }
+    while (bytes_hashed < remaining_length) {
+      std::uint8_t val = *(begin + bytes_hashed);
+      hash += (multiplier * val) >> 32;
+      bytes_hashed++;
     }
     return hash;
   }
 
  private:
-  const std::uint64_t random_seed_;
-  std::vector<std::uint32_t> hash_fn_;
   const std::uint32_t hash_fn_count_;
   std::uint64_t array_size_in_bytes_;
   std::uint64_t array_size_;
@@ -394,9 +656,21 @@ class BloomFilter {
   std::uint32_t inserted_element_count_;
   const bool is_bit_array_owner_;
 
+  static constexpr std::uint64_t kKnuthGoldenRatioNumber = 2654435761;
+  static constexpr std::size_t kMaxNumHashFns = 8;
+  const std::uint64_t hash_fn_[kMaxNumHashFns] = { // hash_fn_[i] is (2**(4*i+1) - 1) * kKnuthGoldenRatioNumber
+    0x00000001 * kKnuthGoldenRatioNumber, // 0x00000003, 0x00000007, 0x0000000f,
+    0x0000001f * kKnuthGoldenRatioNumber, // 0x0000003f, 0x0000007f, 0x000000ff,
+    0x000001ff * kKnuthGoldenRatioNumber, // 0x000003ff, 0x000007ff, 0x00000fff,
+    0x00001fff * kKnuthGoldenRatioNumber, // 0x00003fff, 0x00007fff, 0x0000ffff,
+    0x0001ffff * kKnuthGoldenRatioNumber, // 0x0003ffff, 0x0007ffff, 0x000fffff,
+    0x001fffff * kKnuthGoldenRatioNumber, // 0x003fffff, 0x007fffff, 0x00ffffff,
+    0x01ffffff * kKnuthGoldenRatioNumber, // 0x03ffffff, 0x07ffffff, 0x0fffffff,
+    0x1fffffff * kKnuthGoldenRatioNumber  // 0x3fffffff, 0x7fffffff, 0xffffffff
+    };
   alignas(kCacheLineBytes) mutable SpinSharedMutex<false> bloom_filter_insert_mutex_;
 
-  DISALLOW_COPY_AND_ASSIGN(BloomFilter);
+  DISALLOW_COPY_AND_ASSIGN(BloomFilterOriginal);
 };
 
 /** @} */

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/utility/BloomFilter.proto
----------------------------------------------------------------------
diff --git a/utility/BloomFilter.proto b/utility/BloomFilter.proto
index 0f67878..1a8dbf2 100644
--- a/utility/BloomFilter.proto
+++ b/utility/BloomFilter.proto
@@ -23,10 +23,9 @@ message BloomFilter {
   // The default values were determined from empirical experiments.
   // These values control the amount of false positivity that
   // is expected from Bloom Filter.
-  // - Default seed for initializing family of hashes = 0xA5A5A5A55A5A5A5A.
   // - Default bloom filter size = 10 KB.
   // - Default number of hash functions used in bloom filter = 5.
-  optional fixed64 bloom_filter_seed = 1 [default = 0xA5A5A5A55A5A5A5A];
+  // Tag 1 belonged to the removed bloom_filter_seed; do not reuse it.
   optional uint32 bloom_filter_size = 2 [default = 10000];
   optional uint32 number_of_hashes = 3 [default = 5];
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/utility/BloomFilterAdapter.hpp
----------------------------------------------------------------------
diff --git a/utility/BloomFilterAdapter.hpp b/utility/BloomFilterAdapter.hpp
new file mode 100644
index 0000000..f094307
--- /dev/null
+++ b/utility/BloomFilterAdapter.hpp
@@ -0,0 +1,163 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin\u2014Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_BLOOM_FILTER_ADAPTER_HPP
+#define QUICKSTEP_UTILITY_BLOOM_FILTER_ADAPTER_HPP
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "utility/BloomFilter.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/** \addtogroup Utility
+ *  @{
+ */
+
+/**
+ * @brief Probes batches of tuples against a set of bloom filters, applying the
+ *        filters with the highest observed miss rate first.
+ **/
+class BloomFilterAdapter {
+ public:
+  /**
+   * @brief Constructor. The three vectors are parallel and must be equally sized.
+   *
+   * @param bloom_filters The bloom filters to probe (not owned).
+   * @param attribute_ids The attribute to probe against each bloom filter.
+   * @param attr_sizes The number of bytes to hash for each attribute.
+   **/
+  BloomFilterAdapter(const std::vector<const BloomFilter*> &bloom_filters,
+                     const std::vector<attribute_id> &attribute_ids,
+                     const std::vector<std::size_t> &attr_sizes) {
+    DCHECK_EQ(bloom_filters.size(), attribute_ids.size());
+    DCHECK_EQ(bloom_filters.size(), attr_sizes.size());
+
+    bloom_filter_entries_.reserve(bloom_filters.size());
+    for (std::size_t i = 0; i < bloom_filters.size(); ++i) {
+      bloom_filter_entries_.emplace_back(
+          new BloomFilterEntry(bloom_filters[i], attribute_ids[i], attr_sizes[i]));
+    }
+  }
+
+  /**
+   * @brief Filter the first batch_size tuple ids of batch in place.
+   *
+   * @param accessor The value accessor the attribute values are read from.
+   * @param batch The tuple ids to probe; survivors are compacted to the front.
+   * @param batch_size The number of leading tuple ids in batch to probe.
+   * @return The number of tuple ids that no bloom filter rejected.
+   **/
+  template <bool adapt_filters, typename ValueAccessorT>
+  inline std::size_t bulkProbe(const ValueAccessorT *accessor,
+                               std::vector<tuple_id> &batch,
+                               const std::size_t batch_size) {
+    std::size_t out_size = batch_size;
+    for (auto &entry : bloom_filter_entries_) {
+      out_size = bulkProbeBloomFilterEntry<adapt_filters>(*entry, accessor, batch, out_size);
+    }
+    if (adapt_filters) {
+      // The statistics only change when adapting, so only then re-sort.
+      adaptEntryOrder();
+    }
+    return out_size;
+  }
+
+ private:
+  struct BloomFilterEntry {
+    BloomFilterEntry(const BloomFilter *in_bloom_filter,
+                     const attribute_id in_attr_id,
+                     const std::size_t in_attr_size)
+        : bloom_filter(in_bloom_filter),
+          attr_id(in_attr_id),
+          attr_size(in_attr_size),
+          miss(0),
+          cnt(0),
+          miss_rate(0.0f) {
+    }
+
+    const BloomFilter *bloom_filter;
+    // Not called attribute_id: a member of that name would hide the type.
+    const attribute_id attr_id;
+    const std::size_t attr_size;
+    std::uint32_t miss;  // Tuples rejected by bloom_filter so far.
+    std::uint32_t cnt;   // Tuples probed against bloom_filter so far.
+    float miss_rate;     // miss / cnt; 0 until something has been probed.
+  };
+
+  // Probe batch[0, in_size) against one filter and compact the survivors to the
+  // front of batch. Returns the number of survivors.
+  template <bool adapt_filters, typename ValueAccessorT>
+  inline std::size_t bulkProbeBloomFilterEntry(
+      BloomFilterEntry &entry,
+      const ValueAccessorT *accessor,
+      std::vector<tuple_id> &batch,
+      const std::size_t in_size) {
+    std::size_t out_size = 0;
+    const BloomFilter *bloom_filter = entry.bloom_filter;
+
+    for (std::size_t t = 0; t < in_size; ++t) {
+      const tuple_id tid = batch[t];
+      const auto value = static_cast<const std::uint8_t*>(
+          accessor->getUntypedValueAtAbsolutePosition(entry.attr_id, tid));
+      if (bloom_filter->contains(value, entry.attr_size)) {
+        batch[out_size] = tid;
+        ++out_size;
+      }
+    }
+    if (adapt_filters) {
+      entry.cnt += in_size;
+      entry.miss += (in_size - out_size);
+    }
+    return out_size;
+  }
+
+  // Recompute the miss rates and move the filters that reject most to the front.
+  inline void adaptEntryOrder() {
+    for (auto &entry : bloom_filter_entries_) {
+      // 0 / 0 would be NaN, which is not a valid sort key for std::sort.
+      entry->miss_rate =
+          (entry->cnt == 0) ? 0.0f : static_cast<float>(entry->miss) / entry->cnt;
+    }
+    std::sort(bloom_filter_entries_.begin(),
+              bloom_filter_entries_.end(),
+              [](const std::unique_ptr<BloomFilterEntry> &a,
+                 const std::unique_ptr<BloomFilterEntry> &b) {
+                return a->miss_rate > b->miss_rate;
+              });
+  }
+
+  // Owned entries; re-sorted by descending miss rate after each adaptive probe.
+  std::vector<std::unique_ptr<BloomFilterEntry>> bloom_filter_entries_;
+
+  DISALLOW_COPY_AND_ASSIGN(BloomFilterAdapter);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_BLOOM_FILTER_ADAPTER_HPP

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/utility/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/CMakeLists.txt b/utility/CMakeLists.txt
index ae1179d..46389f0 100644
--- a/utility/CMakeLists.txt
+++ b/utility/CMakeLists.txt
@@ -161,6 +161,7 @@ add_library(quickstep_utility_Alignment ../empty_src.cpp Alignment.hpp)
 add_library(quickstep_utility_BitManipulation ../empty_src.cpp BitManipulation.hpp)
 add_library(quickstep_utility_BitVector ../empty_src.cpp BitVector.hpp)
 add_library(quickstep_utility_BloomFilter ../empty_src.cpp BloomFilter.hpp)
+add_library(quickstep_utility_BloomFilterAdapter ../empty_src.cpp BloomFilterAdapter.hpp)
 add_library(quickstep_utility_BloomFilter_proto
             ${quickstep_utility_BloomFilter_proto_srcs}
             ${quickstep_utility_BloomFilter_proto_hdrs})
@@ -168,6 +169,8 @@ add_library(quickstep_utility_CalculateInstalledMemory CalculateInstalledMemory.
 add_library(quickstep_utility_Cast ../empty_src.cpp Cast.hpp)
 add_library(quickstep_utility_CheckSnprintf ../empty_src.cpp CheckSnprintf.hpp)
 add_library(quickstep_utility_DAG ../empty_src.cpp DAG.hpp)
+add_library(quickstep_utility_DisjointTreeForest ../empty_src.cpp DisjointTreeForest.hpp)
+add_library(quickstep_utility_EventProfiler EventProfiler.cpp EventProfiler.hpp)
 add_library(quickstep_utility_EqualsAnyConstant ../empty_src.cpp EqualsAnyConstant.hpp)
 add_library(quickstep_utility_ExecutionDAGVisualizer
             ExecutionDAGVisualizer.cpp
@@ -221,6 +224,10 @@ target_link_libraries(quickstep_utility_BloomFilter
                       quickstep_threading_SpinSharedMutex
                       quickstep_utility_BloomFilter_proto
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_utility_BloomFilterAdapter
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_utility_BloomFilter
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_utility_BloomFilter_proto
                       ${PROTOBUF_LIBRARY})
 target_link_libraries(quickstep_utility_CalculateInstalledMemory
@@ -230,6 +237,9 @@ target_link_libraries(quickstep_utility_CheckSnprintf
 target_link_libraries(quickstep_utility_DAG
                       glog
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_utility_DisjointTreeForest)
+target_link_libraries(quickstep_utility_EventProfiler
+                      quickstep_threading_Mutex)
 target_link_libraries(quickstep_utility_ExecutionDAGVisualizer
                       quickstep_catalog_CatalogRelationSchema
                       quickstep_queryexecution_QueryExecutionTypedefs
@@ -312,11 +322,14 @@ target_link_libraries(quickstep_utility
                       quickstep_utility_BitManipulation
                       quickstep_utility_BitVector
                       quickstep_utility_BloomFilter
+                      quickstep_utility_BloomFilterAdapter
                       quickstep_utility_BloomFilter_proto
                       quickstep_utility_CalculateInstalledMemory
                       quickstep_utility_Cast
                       quickstep_utility_CheckSnprintf
                       quickstep_utility_DAG
+                      quickstep_utility_DisjointTreeForest
+                      quickstep_utility_EventProfiler
                       quickstep_utility_EqualsAnyConstant
                       quickstep_utility_ExecutionDAGVisualizer
                       quickstep_utility_Glob

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/utility/DisjointTreeForest.hpp
----------------------------------------------------------------------
diff --git a/utility/DisjointTreeForest.hpp b/utility/DisjointTreeForest.hpp
new file mode 100644
index 0000000..f5722ba
--- /dev/null
+++ b/utility/DisjointTreeForest.hpp
@@ -0,0 +1,143 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin\u2014Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_DISJOINT_TREE_FOREST_HPP_
+#define QUICKSTEP_UTILITY_DISJOINT_TREE_FOREST_HPP_
+
+#include <algorithm>
+#include <cstddef>
+#include <limits>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+namespace quickstep {
+
+/** \addtogroup Utility
+ *  @{
+ */
+
+/**
+ * @brief A disjoint-set (a.k.a. union-find) forest with path compression.
+ *
+ * @note MapperT maps an element to the index of its node and must provide
+ *       find(), end(), emplace() and at() like std::unordered_map does.
+ */
+template <typename ElementT,
+          class MapperT = std::unordered_map<ElementT, std::size_t>>
+class DisjointTreeForest {
+ public:
+  /**
+   * @brief Check whether makeSet() has been called for an element.
+   */
+  inline bool hasElement(const ElementT &element) const {
+    return elements_map_.find(element) != elements_map_.end();
+  }
+
+  /**
+   * @brief Add an element as a singleton set. No-op if it is already present.
+   */
+  inline void makeSet(const ElementT &element) {
+    if (hasElement(element)) {
+      return;
+    }
+    const std::size_t loc = nodes_.size();
+    nodes_.emplace_back(0, loc);  // Rank 0 and its own parent, i.e. a root.
+    elements_map_.emplace(element, loc);
+  }
+
+  /**
+   * @brief Find the set an element belongs to, compressing the path to its root.
+   *
+   * @param element An element added with makeSet(); looked up with MapperT::at().
+   * @return The node id of the root of the element's set.
+   */
+  inline std::size_t find(const ElementT &element) {
+    const std::size_t node_id = elements_map_.at(element);
+    std::size_t root_id = node_id;
+    std::size_t parent_id;
+    while ((parent_id = nodes_[root_id].parent) != root_id) {
+      root_id = parent_id;
+    }
+    compress_path(node_id, root_id);
+    return root_id;
+  }
+
+  /**
+   * @brief Union the sets of two elements. The root with the lower rank is
+   *        attached to the other root; on a tie the first goes under the second.
+   */
+  inline void merge(const ElementT &element1, const ElementT &element2) {
+    std::size_t root_id1 = find(element1);
+    std::size_t root_id2 = find(element2);
+    if (root_id1 != root_id2) {
+      Node &n1 = nodes_[root_id1];
+      Node &n2 = nodes_[root_id2];
+      if (n1.rank > n2.rank) {
+        n2.parent = root_id1;
+      } else if (n1.rank < n2.rank) {
+        n1.parent = root_id2;
+      } else {
+        n1.parent = root_id2;
+        n2.rank += 1;
+      }
+    }
+  }
+
+  /**
+   * @brief Check whether two elements currently belong to the same set.
+   */
+  inline bool isConnected(const ElementT &element1, const ElementT &element2) {
+    return find(element1) == find(element2);
+  }
+
+ private:
+  struct Node {
+    Node(const std::size_t rank_in, const std::size_t parent_in)
+        : rank(rank_in), parent(parent_in) {
+    }
+    std::size_t rank;
+    std::size_t parent;  // Index into nodes_; a root is its own parent.
+  };
+
+  // Point every node on the path from leaf_node_id up to the root directly at
+  // the root, then set the root's rank to one more than the highest rank seen.
+  inline void compress_path(const std::size_t leaf_node_id, const std::size_t root_node_id) {
+    std::size_t node_id = leaf_node_id;
+    std::size_t max_rank = 0;
+    while (node_id != root_node_id) {
+      const Node &node = nodes_[node_id];
+      max_rank = std::max(max_rank, node.rank);
+
+      const std::size_t parent_id = node.parent;
+      nodes_[node_id].parent = root_node_id;
+      node_id = parent_id;
+    }
+    nodes_[root_node_id].rank = max_rank + 1;
+  }
+
+  std::vector<Node> nodes_;
+  MapperT elements_map_;
+
+  static constexpr std::size_t kInvalid = std::numeric_limits<std::size_t>::max();
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_DISJOINT_TREE_FOREST_HPP_
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/utility/EventProfiler.cpp
----------------------------------------------------------------------
diff --git a/utility/EventProfiler.cpp b/utility/EventProfiler.cpp
new file mode 100644
index 0000000..728ebff
--- /dev/null
+++ b/utility/EventProfiler.cpp
@@ -0,0 +1,29 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin\u2014Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "utility/EventProfiler.hpp"
+
+#include <cstddef>
+#include <string>
+#include <vector>
+
+namespace quickstep {
+
+EventProfiler<int, std::size_t> simple_profiler;  // Definition of the instance declared extern in EventProfiler.hpp.
+EventProfiler<std::size_t> relop_profiler;  // Definition of the instance declared extern in EventProfiler.hpp.
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/utility/EventProfiler.hpp
----------------------------------------------------------------------
diff --git a/utility/EventProfiler.hpp b/utility/EventProfiler.hpp
new file mode 100644
index 0000000..70024e6
--- /dev/null
+++ b/utility/EventProfiler.hpp
@@ -0,0 +1,225 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin\u2014Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_EVENT_PROFILER_HPP_
+#define QUICKSTEP_UTILITY_EVENT_PROFILER_HPP_
+
+#include <chrono>
+#include <cstddef>
+#include <cstring>
+#include <ctime>
+#include <iomanip>
+#include <map>
+#include <ostream>
+#include <string>
+#include <thread>
+#include <tuple>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include "threading/Mutex.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/** \addtogroup Utility
+ *  @{
+ */
+
+using clock = std::chrono::steady_clock;
+
+/**
+ * @brief Records timed events per thread, grouped by a tag of type TagT and
+ *        optionally annotated with a payload of types PayloadT.
+ **/
+template <typename TagT, typename ...PayloadT>
+class EventProfiler {
+ public:
+  EventProfiler()
+      : zero_time_(clock::now()) {
+  }
+
+  /**
+   * @brief A single event: its start and end time points and its payload.
+   **/
+  struct EventInfo {
+    clock::time_point start_time;
+    clock::time_point end_time;
+    bool is_finished;
+    std::tuple<PayloadT...> payload;
+
+    explicit EventInfo(const clock::time_point &start_time_in)
+        : start_time(start_time_in),
+          is_finished(false) {
+    }
+
+    EventInfo()
+        : start_time(clock::now()),
+          is_finished(false) {
+    }
+
+    inline void setPayload(PayloadT &&...in_payload) {
+      payload = std::make_tuple(in_payload...);
+    }
+
+    // Stamp the end time and mark the event as finished.
+    inline void endEvent() {
+      end_time = clock::now();
+      is_finished = true;
+    }
+  };
+
+  /**
+   * @brief The events recorded by one thread, keyed by tag.
+   **/
+  struct EventContainer {
+    EventContainer()
+        : context(0) {}
+
+    // Append a new event for tag that starts now.
+    inline void startEvent(const TagT &tag) {
+      events[tag].emplace_back(clock::now());
+    }
+
+    // Finish the most recently started event for tag. Requires a preceding
+    // startEvent(tag).
+    inline void endEvent(const TagT &tag) {
+      auto &event_info = events.at(tag).back();
+      event_info.is_finished = true;
+      event_info.end_time = clock::now();
+    }
+
+    // The events recorded for tag; an empty line is created if there is none.
+    inline std::vector<EventInfo> *getEventLine(const TagT &tag) {
+      return &events[tag];
+    }
+
+    inline void setContext(int context_in) {
+      context = context_in;
+    }
+
+    inline int getContext() const {
+      return context;
+    }
+
+    std::map<TagT, std::vector<EventInfo>> events;
+    int context;
+  };
+
+  /**
+   * @brief Get the container of the calling thread, creating it on first use.
+   *
+   * @return A pointer into thread_map_; clear() invalidates it.
+   **/
+  EventContainer *getContainer() {
+    MutexLock lock(mutex_);
+    return &thread_map_[std::this_thread::get_id()];
+  }
+
+  /**
+   * @brief Write every recorded event as one comma-separated line: the time of
+   *        this call, a per-thread index, the tag, the payload fields, and the
+   *        start and end times in seconds since zero_time().
+   *
+   * @note Every event must be finished. mutex_ is not taken here.
+   **/
+  void writeToStream(std::ostream &os) const {
+    time_t rawtime;
+    time(&rawtime);
+    char event_id[32];
+    strftime(event_id, sizeof event_id, "%Y-%m-%d %H:%M:%S", localtime(&rawtime));
+
+    int thread_id = 0;
+    for (const auto &thread_ctx : thread_map_) {
+      for (const auto &event_group : thread_ctx.second.events) {
+        for (const auto &event_info : event_group.second) {
+          CHECK(event_info.is_finished) << "Unfinished profiling event";
+
+          os << std::setprecision(12)
+             << event_id << ","
+             << thread_id << "," << event_group.first << ",";
+
+          PrintTuple(os, event_info.payload, ",");
+
+          os << std::chrono::duration<double>(event_info.start_time - zero_time_).count()
+             << ","
+             << std::chrono::duration<double>(event_info.end_time - zero_time_).count()
+             << "\n";
+        }
+      }
+      ++thread_id;
+    }
+  }
+
+  // Drop every recorded event and restart the clock that events are relative to.
+  void clear() {
+    zero_time_ = clock::now();
+    thread_map_.clear();
+  }
+
+  const std::map<std::thread::id, EventContainer> &containers() {
+    return thread_map_;
+  }
+
+  const clock::time_point &zero_time() {
+    return zero_time_;
+  }
+
+ private:
+  // Prints the first N fields of a tuple, each one followed by sep.
+  template<class Tuple, std::size_t N>
+  struct TuplePrinter {
+    static void Print(std::ostream &os, const Tuple &t, const std::string &sep) {
+      TuplePrinter<Tuple, N-1>::Print(os, t, sep);
+      os << std::get<N-1>(t) << sep;
+    }
+  };
+
+  template<class Tuple>
+  struct TuplePrinter<Tuple, 1> {
+    static void Print(std::ostream &os, const Tuple &t, const std::string &sep) {
+      os << std::get<0>(t) << sep;
+    }
+  };
+
+  // An empty payload prints nothing; without this case N == 0 would recurse to N-1.
+  template<class Tuple>
+  struct TuplePrinter<Tuple, 0> {
+    static void Print(std::ostream &, const Tuple &, const std::string &) {
+    }
+  };
+
+  template<class... Args>
+  static void PrintTuple(std::ostream &os, const std::tuple<Args...>& t, const std::string &sep) {
+    TuplePrinter<decltype(t), sizeof...(Args)>::Print(os, t, sep);
+  }
+
+  clock::time_point zero_time_;
+  std::map<std::thread::id, EventContainer> thread_map_;
+  Mutex mutex_;
+};
+
+extern EventProfiler<int, std::size_t> simple_profiler;
+extern EventProfiler<std::size_t> relop_profiler;
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_EVENT_PROFILER_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/utility/PlanVisualizer.cpp
----------------------------------------------------------------------
diff --git a/utility/PlanVisualizer.cpp b/utility/PlanVisualizer.cpp
index 50cf7f0..b90a8dc 100644
--- a/utility/PlanVisualizer.cpp
+++ b/utility/PlanVisualizer.cpp
@@ -21,6 +21,7 @@
 
 #include <cstddef>
 #include <memory>
+#include <set>
 #include <sstream>
 #include <string>
 #include <unordered_map>
@@ -30,6 +31,7 @@
 
 #include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
 #include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/physical/Aggregate.hpp"
 #include "query_optimizer/physical/HashJoin.hpp"
 #include "query_optimizer/physical/Physical.hpp"
 #include "query_optimizer/physical/PhysicalType.hpp"
@@ -103,6 +105,10 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) {
   int node_id = ++id_counter_;
   node_id_map_.emplace(input, node_id);
 
+  std::set<E::ExprId> referenced_ids;
+  for (const auto &attr : input->getReferencedAttributes()) {
+    referenced_ids.emplace(attr->id());
+  }
   for (const auto &child : input->children()) {
     visit(child);
 
@@ -113,10 +119,8 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) {
     edge_info.src_node_id = child_id;
     edge_info.dst_node_id = node_id;
 
-    // Print output attributes except for TableReference -- there are just too many
-    // attributes out of TableReference.
-    if (child->getPhysicalType() != P::PhysicalType::kTableReference) {
-      for (const auto &attr : child->getOutputAttributes()) {
+    for (const auto &attr : child->getOutputAttributes()) {
+      if (referenced_ids.find(attr->id()) != referenced_ids.end()) {
         edge_info.labels.emplace_back(attr->attribute_alias());
       }
     }
@@ -147,6 +151,36 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) {
         node_info.labels.emplace_back(
             left_attributes[i]->attribute_alias() + " = " + right_attributes[i]->attribute_alias());
       }
+      if (hash_join->left()->impliesUniqueAttributes(left_attributes)) {
+        node_info.labels.emplace_back("LEFT join attrs unique");
+      }
+      if (hash_join->right()->impliesUniqueAttributes(right_attributes)) {
+        node_info.labels.emplace_back("RIGHT join attrs unique");
+      }
+
+      const auto &bf_config = hash_join->bloom_filter_config();
+      for (const auto &bf : bf_config.build_side_bloom_filters) {
+        node_info.labels.emplace_back(
+            std::string("[BF build] ") + bf.attribute->attribute_alias());
+      }
+      for (const auto &bf : bf_config.probe_side_bloom_filters) {
+        node_info.labels.emplace_back(
+            std::string("[BF probe] ") + bf.attribute->attribute_alias());
+      }
+
+      break;
+    }
+    case P::PhysicalType::kAggregate: {
+      const P::AggregatePtr aggregate =
+        std::static_pointer_cast<const P::Aggregate>(input);
+      node_info.labels.emplace_back(input->getName());
+
+      const auto &bf_config = aggregate->bloom_filter_config();
+      for (const auto &bf : bf_config.probe_side_bloom_filters) {
+        node_info.labels.emplace_back(
+            std::string("[BF probe] ") + bf.attribute->attribute_alias());
+      }
+
       break;
     }
     default: {