You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/10/18 17:21:19 UTC
[11/12] incubator-quickstep git commit: Add LIPFilter feature.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index cbbfc22..a80bcb0 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -41,6 +41,7 @@ class AggregateFunction;
class CatalogDatabaseLite;
class CatalogRelationSchema;
class InsertDestination;
+class LIPFilterAdaptiveProber;
class StorageManager;
/** \addtogroup Storage
@@ -156,7 +157,8 @@ class AggregationOperationState {
* @param input_block The block ID of the storage block where the aggreates
* are going to be computed.
**/
- void aggregateBlock(const block_id input_block);
+ void aggregateBlock(const block_id input_block,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober);
/**
* @brief Generate the final results for the aggregates managed by this
@@ -179,8 +181,10 @@ class AggregationOperationState {
const std::vector<std::unique_ptr<AggregationState>> &local_state);
// Aggregate on input block.
- void aggregateBlockSingleState(const block_id input_block);
- void aggregateBlockHashTable(const block_id input_block);
+ void aggregateBlockSingleState(const block_id input_block,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober);
+ void aggregateBlockHashTable(const block_id input_block,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober);
void finalizeSingleState(InsertDestination *output_destination);
void finalizeHashTable(InsertDestination *output_destination);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index f05cc46..e85e005 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -643,7 +643,6 @@ target_link_libraries(quickstep_storage_FastHashTable
quickstep_threading_SpinSharedMutex
quickstep_types_Type
quickstep_types_TypedValue
- quickstep_utility_BloomFilter
quickstep_utility_HashPair
quickstep_utility_Macros)
target_link_libraries(quickstep_storage_FastHashTableFactory
@@ -659,7 +658,6 @@ target_link_libraries(quickstep_storage_FastHashTableFactory
quickstep_storage_SimpleScalarSeparateChainingHashTable
quickstep_storage_TupleReference
quickstep_types_TypeFactory
- quickstep_utility_BloomFilter
quickstep_utility_Macros)
target_link_libraries(quickstep_storage_FastSeparateChainingHashTable
quickstep_storage_FastHashTable
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/storage/FastHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/FastHashTable.hpp b/storage/FastHashTable.hpp
index 4a95cd9..74d9ee3 100644
--- a/storage/FastHashTable.hpp
+++ b/storage/FastHashTable.hpp
@@ -39,7 +39,6 @@
#include "threading/SpinSharedMutex.hpp"
#include "types/Type.hpp"
#include "types/TypedValue.hpp"
-#include "utility/BloomFilter.hpp"
#include "utility/HashPair.hpp"
#include "utility/Macros.hpp"
@@ -958,62 +957,6 @@ class FastHashTable : public HashTableBase<resizable,
template <typename FunctorT>
std::size_t forEachCompositeKeyFast(FunctorT *functor, int index) const;
- /**
- * @brief A call to this function will cause a bloom filter to be built
- * during the build phase of this hash table.
- **/
- inline void enableBuildSideBloomFilter() {
- has_build_side_bloom_filter_ = true;
- }
-
- /**
- * @brief A call to this function will cause a set of bloom filters to be
- * probed during the probe phase of this hash table.
- **/
- inline void enableProbeSideBloomFilter() {
- has_probe_side_bloom_filter_ = true;
- }
-
- /**
- * @brief This function sets the pointer to the bloom filter to be
- * used during the build phase of this hash table.
- * @warning Should call enable_build_side_bloom_filter() first to enable
- * bloom filter usage during build phase.
- * @note The ownership of the bloom filter lies with the caller.
- *
- * @param bloom_filter The pointer to the bloom filter.
- **/
- inline void setBuildSideBloomFilter(BloomFilter *bloom_filter) {
- build_bloom_filter_ = bloom_filter;
- }
-
- /**
- * @brief This function adds a pointer to the list of bloom filters to be
- * used during the probe phase of this hash table.
- * @warning Should call enable_probe_side_bloom_filter() first to enable
- * bloom filter usage during probe phase.
- * @note The ownership of the bloom filter lies with the caller.
- *
- * @param bloom_filter The pointer to the bloom filter.
- **/
- inline void addProbeSideBloomFilter(const BloomFilter *bloom_filter) {
- probe_bloom_filters_.emplace_back(bloom_filter);
- }
-
- /**
- * @brief This function adds a vector of attribute ids corresponding to a
- * bloom filter used during the probe phase of this hash table.
- * @warning Should call enable_probe_side_bloom_filter() first to enable
- * bloom filter usage during probe phase.
- *
- * @param probe_attribute_ids The vector of attribute ids to use for probing
- * the bloom filter.
- **/
- inline void addProbeSideAttributeIds(
- std::vector<attribute_id> &&probe_attribute_ids) {
- probe_attribute_ids_.push_back(probe_attribute_ids);
- }
-
protected:
/**
* @brief Constructor for new resizable hash table.
@@ -1318,12 +1261,6 @@ class FastHashTable : public HashTableBase<resizable,
const attribute_id key_attr_id,
FunctorT *functor) const;
- // Data structures used for bloom filter optimized semi-joins.
- bool has_build_side_bloom_filter_ = false;
- bool has_probe_side_bloom_filter_ = false;
- BloomFilter *build_bloom_filter_;
- std::vector<const BloomFilter *> probe_bloom_filters_;
- std::vector<std::vector<attribute_id>> probe_attribute_ids_;
DISALLOW_COPY_AND_ASSIGN(FastHashTable);
};
@@ -1449,13 +1386,6 @@ FastHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>::
total_entries, total_variable_key_size, &prealloc_state);
}
}
- std::unique_ptr<BloomFilter> thread_local_bloom_filter;
- if (has_build_side_bloom_filter_) {
- thread_local_bloom_filter.reset(
- new BloomFilter(build_bloom_filter_->getRandomSeed(),
- build_bloom_filter_->getNumberOfHashes(),
- build_bloom_filter_->getBitArraySize()));
- }
if (resizable) {
while (result == HashTablePutResult::kOutOfSpace) {
{
@@ -1474,12 +1404,6 @@ FastHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>::
variable_size,
(*functor)(*accessor),
using_prealloc ? &prealloc_state : nullptr);
- // Insert into bloom filter, if enabled.
- if (has_build_side_bloom_filter_) {
- thread_local_bloom_filter->insertUnSafe(
- static_cast<const std::uint8_t *>(key.getDataPtr()),
- key.getDataSize());
- }
if (result == HashTablePutResult::kDuplicateKey) {
DEBUG_ASSERT(!using_prealloc);
return result;
@@ -1507,22 +1431,11 @@ FastHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>::
variable_size,
(*functor)(*accessor),
using_prealloc ? &prealloc_state : nullptr);
- // Insert into bloom filter, if enabled.
- if (has_build_side_bloom_filter_) {
- thread_local_bloom_filter->insertUnSafe(
- static_cast<const std::uint8_t *>(key.getDataPtr()),
- key.getDataSize());
- }
if (result != HashTablePutResult::kOK) {
return result;
}
}
}
- // Update the build side bloom filter with thread local copy, if
- // available.
- if (has_build_side_bloom_filter_) {
- build_bloom_filter_->bitwiseOr(thread_local_bloom_filter.get());
- }
return HashTablePutResult::kOK;
});
@@ -2462,52 +2375,27 @@ void FastHashTable<resizable,
InvokeOnAnyValueAccessor(
accessor,
[&](auto *accessor) -> void { // NOLINT(build/c++11)
- while (accessor->next()) {
- // Probe any bloom filters, if enabled.
- if (has_probe_side_bloom_filter_) {
- DCHECK_EQ(probe_bloom_filters_.size(), probe_attribute_ids_.size());
- // Check if the key is contained in the BloomFilters or not.
- bool bloom_miss = false;
- for (std::size_t i = 0;
- i < probe_bloom_filters_.size() && !bloom_miss;
- ++i) {
- const BloomFilter *bloom_filter = probe_bloom_filters_[i];
- for (const attribute_id &attr_id : probe_attribute_ids_[i]) {
- TypedValue bloom_key = accessor->getTypedValue(attr_id);
- if (!bloom_filter->contains(static_cast<const std::uint8_t *>(
- bloom_key.getDataPtr()),
- bloom_key.getDataSize())) {
- bloom_miss = true;
- break;
- }
- }
- }
- if (bloom_miss) {
- continue; // On a bloom filter miss, probing the hash table can
- // be skipped.
- }
- }
-
- TypedValue key = accessor->getTypedValue(key_attr_id);
- if (check_for_null_keys && key.isNull()) {
- continue;
- }
- const std::size_t true_hash = use_scalar_literal_hash_template
- ? key.getHashScalarLiteral()
- : key.getHash();
- const std::size_t adjusted_hash =
- adjust_hashes_template ? this->AdjustHash(true_hash) : true_hash;
- std::size_t entry_num = 0;
- const std::uint8_t *value;
- while (this->getNextEntryForKey(
- key, adjusted_hash, &value, &entry_num)) {
- (*functor)(*accessor, *value);
- if (!allow_duplicate_keys) {
- break;
- }
- }
+ while (accessor->next()) {
+ TypedValue key = accessor->getTypedValue(key_attr_id);
+ if (check_for_null_keys && key.isNull()) {
+ continue;
+ }
+ const std::size_t true_hash = use_scalar_literal_hash_template
+ ? key.getHashScalarLiteral()
+ : key.getHash();
+ const std::size_t adjusted_hash =
+ adjust_hashes_template ? this->AdjustHash(true_hash) : true_hash;
+ std::size_t entry_num = 0;
+ const std::uint8_t *value;
+ while (this->getNextEntryForKey(
+ key, adjusted_hash, &value, &entry_num)) {
+ (*functor)(*accessor, *value);
+ if (!allow_duplicate_keys) {
+ break;
}
- });
+ }
+ }
+ });
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/storage/FastHashTableFactory.hpp
----------------------------------------------------------------------
diff --git a/storage/FastHashTableFactory.hpp b/storage/FastHashTableFactory.hpp
index 6d0b693..682cc2a 100644
--- a/storage/FastHashTableFactory.hpp
+++ b/storage/FastHashTableFactory.hpp
@@ -32,7 +32,6 @@
#include "storage/SimpleScalarSeparateChainingHashTable.hpp"
#include "storage/TupleReference.hpp"
#include "types/TypeFactory.hpp"
-#include "utility/BloomFilter.hpp"
#include "utility/Macros.hpp"
#include "glog/logging.h"
@@ -183,14 +182,11 @@ class FastHashTableFactory {
* @param proto A protobuf description of a resizable HashTable.
* @param storage_manager The StorageManager to use (a StorageBlob will be
* allocated to hold the HashTable's contents).
- * @param bloom_filters A vector of pointers to bloom filters that may be used
- * during hash table construction in build/probe phase.
* @return A new resizable HashTable with parameters specified by proto.
**/
static FastHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>*
CreateResizableFromProto(const serialization::HashTable &proto,
- StorageManager *storage_manager,
- const std::vector<std::unique_ptr<BloomFilter>> &bloom_filters) {
+ StorageManager *storage_manager) {
DCHECK(ProtoIsValid(proto))
<< "Attempted to create HashTable from invalid proto description:\n"
<< proto.DebugString();
@@ -204,35 +200,6 @@ class FastHashTableFactory {
key_types,
proto.estimated_num_entries(),
storage_manager);
-
- // TODO(ssaurabh): These lazy initializations can be moved from here and pushed to the
- // individual implementations of the hash table constructors.
-
- // Check if there are any build side bloom filter defined on the hash table.
- if (proto.build_side_bloom_filter_id_size() > 0) {
- hash_table->enableBuildSideBloomFilter();
- hash_table->setBuildSideBloomFilter(bloom_filters[proto.build_side_bloom_filter_id(0)].get());
- }
-
- // Check if there are any probe side bloom filters defined on the hash table.
- if (proto.probe_side_bloom_filters_size() > 0) {
- hash_table->enableProbeSideBloomFilter();
- // Add as many probe bloom filters as defined by the proto.
- for (int j = 0; j < proto.probe_side_bloom_filters_size(); ++j) {
- // Add the pointer to the probe bloom filter within the list of probe bloom filters to use.
- const auto probe_side_bloom_filter = proto.probe_side_bloom_filters(j);
- hash_table->addProbeSideBloomFilter(bloom_filters[probe_side_bloom_filter.probe_side_bloom_filter_id()].get());
-
- // Add the attribute ids corresponding to this probe bloom filter.
- std::vector<attribute_id> probe_attribute_ids;
- for (int k = 0; k < probe_side_bloom_filter.probe_side_attr_ids_size(); ++k) {
- const attribute_id probe_attribute_id = probe_side_bloom_filter.probe_side_attr_ids(k);
- probe_attribute_ids.push_back(probe_attribute_id);
- }
- hash_table->addProbeSideAttributeIds(std::move(probe_attribute_ids));
- }
- }
-
return hash_table;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/storage/HashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTable.hpp b/storage/HashTable.hpp
index f2dcb03..786a9bb 100644
--- a/storage/HashTable.hpp
+++ b/storage/HashTable.hpp
@@ -981,61 +981,6 @@ class HashTable : public HashTableBase<resizable,
template <typename FunctorT>
std::size_t forEachCompositeKey(FunctorT *functor) const;
- /**
- * @brief A call to this function will cause a bloom filter to be built
- * during the build phase of this hash table.
- **/
- inline void enableBuildSideBloomFilter() {
- has_build_side_bloom_filter_ = true;
- }
-
- /**
- * @brief A call to this function will cause a set of bloom filters to be
- * probed during the probe phase of this hash table.
- **/
- inline void enableProbeSideBloomFilter() {
- has_probe_side_bloom_filter_ = true;
- }
-
- /**
- * @brief This function sets the pointer to the bloom filter to be
- * used during the build phase of this hash table.
- * @warning Should call enable_build_side_bloom_filter() first to enable
- * bloom filter usage during build phase.
- * @note The ownership of the bloom filter lies with the caller.
- *
- * @param bloom_filter The pointer to the bloom filter.
- **/
- inline void setBuildSideBloomFilter(BloomFilter *bloom_filter) {
- build_bloom_filter_ = bloom_filter;
- }
-
- /**
- * @brief This function adds a pointer to the list of bloom filters to be
- * used during the probe phase of this hash table.
- * @warning Should call enable_probe_side_bloom_filter() first to enable
- * bloom filter usage during probe phase.
- * @note The ownership of the bloom filter lies with the caller.
- *
- * @param bloom_filter The pointer to the bloom filter.
- **/
- inline void addProbeSideBloomFilter(const BloomFilter *bloom_filter) {
- probe_bloom_filters_.emplace_back(bloom_filter);
- }
-
- /**
- * @brief This function adds a vector of attribute ids corresponding to a
- * bloom filter used during the probe phase of this hash table.
- * @warning Should call enable_probe_side_bloom_filter() first to enable
- * bloom filter usage during probe phase.
- *
- * @param probe_attribute_ids The vector of attribute ids to use for probing
- * the bloom filter.
- **/
- inline void addProbeSideAttributeIds(std::vector<attribute_id> &&probe_attribute_ids) {
- probe_attribute_ids_.push_back(probe_attribute_ids);
- }
-
protected:
/**
* @brief Constructor for new resizable hash table.
@@ -1316,13 +1261,6 @@ class HashTable : public HashTableBase<resizable,
const attribute_id key_attr_id,
FunctorT *functor) const;
- // Data structures used for bloom filter optimized semi-joins.
- bool has_build_side_bloom_filter_ = false;
- bool has_probe_side_bloom_filter_ = false;
- BloomFilter *build_bloom_filter_;
- std::vector<const BloomFilter*> probe_bloom_filters_;
- std::vector<std::vector<attribute_id>> probe_attribute_ids_;
-
DISALLOW_COPY_AND_ASSIGN(HashTable);
};
@@ -1467,12 +1405,6 @@ HashTablePutResult HashTable<ValueT, resizable, serializable, force_key_copy, al
&prealloc_state);
}
}
- std::unique_ptr<BloomFilter> thread_local_bloom_filter;
- if (has_build_side_bloom_filter_) {
- thread_local_bloom_filter.reset(new BloomFilter(build_bloom_filter_->getRandomSeed(),
- build_bloom_filter_->getNumberOfHashes(),
- build_bloom_filter_->getBitArraySize()));
- }
if (resizable) {
while (result == HashTablePutResult::kOutOfSpace) {
{
@@ -1488,11 +1420,6 @@ HashTablePutResult HashTable<ValueT, resizable, serializable, force_key_copy, al
variable_size,
(*functor)(*accessor),
using_prealloc ? &prealloc_state : nullptr);
- // Insert into bloom filter, if enabled.
- if (has_build_side_bloom_filter_) {
- thread_local_bloom_filter->insertUnSafe(static_cast<const std::uint8_t *>(key.getDataPtr()),
- key.getDataSize());
- }
if (result == HashTablePutResult::kDuplicateKey) {
DEBUG_ASSERT(!using_prealloc);
return result;
@@ -1518,20 +1445,11 @@ HashTablePutResult HashTable<ValueT, resizable, serializable, force_key_copy, al
variable_size,
(*functor)(*accessor),
using_prealloc ? &prealloc_state : nullptr);
- // Insert into bloom filter, if enabled.
- if (has_build_side_bloom_filter_) {
- thread_local_bloom_filter->insertUnSafe(static_cast<const std::uint8_t *>(key.getDataPtr()),
- key.getDataSize());
- }
if (result != HashTablePutResult::kOK) {
return result;
}
}
}
- // Update the build side bloom filter with thread local copy, if available.
- if (has_build_side_bloom_filter_) {
- build_bloom_filter_->bitwiseOr(thread_local_bloom_filter.get());
- }
return HashTablePutResult::kOK;
});
@@ -2237,27 +2155,6 @@ void HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_
accessor,
[&](auto *accessor) -> void { // NOLINT(build/c++11)
while (accessor->next()) {
- // Probe any bloom filters, if enabled.
- if (has_probe_side_bloom_filter_) {
- DCHECK_EQ(probe_bloom_filters_.size(), probe_attribute_ids_.size());
- // Check if the key is contained in the BloomFilters or not.
- bool bloom_miss = false;
- for (std::size_t i = 0; i < probe_bloom_filters_.size() && !bloom_miss; ++i) {
- const BloomFilter *bloom_filter = probe_bloom_filters_[i];
- for (const attribute_id &attr_id : probe_attribute_ids_[i]) {
- TypedValue bloom_key = accessor->getTypedValue(attr_id);
- if (!bloom_filter->contains(static_cast<const std::uint8_t*>(bloom_key.getDataPtr()),
- bloom_key.getDataSize())) {
- bloom_miss = true;
- break;
- }
- }
- }
- if (bloom_miss) {
- continue; // On a bloom filter miss, probing the hash table can be skipped.
- }
- }
-
TypedValue key = accessor->getTypedValue(key_attr_id);
if (check_for_null_keys && key.isNull()) {
continue;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/storage/HashTable.proto
----------------------------------------------------------------------
diff --git a/storage/HashTable.proto b/storage/HashTable.proto
index ade30d8..1d4ccb0 100644
--- a/storage/HashTable.proto
+++ b/storage/HashTable.proto
@@ -34,10 +34,4 @@ message HashTable {
required HashTableImplType hash_table_impl_type = 1;
repeated Type key_types = 2;
required uint64 estimated_num_entries = 3;
- repeated uint32 build_side_bloom_filter_id = 4;
- message ProbeSideBloomFilter {
- required uint32 probe_side_bloom_filter_id = 1;
- repeated uint32 probe_side_attr_ids = 2;
- }
- repeated ProbeSideBloomFilter probe_side_bloom_filters = 6;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/storage/HashTableFactory.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableFactory.hpp b/storage/HashTableFactory.hpp
index 40b39de..d690557 100644
--- a/storage/HashTableFactory.hpp
+++ b/storage/HashTableFactory.hpp
@@ -295,14 +295,11 @@ class HashTableFactory {
* @param proto A protobuf description of a resizable HashTable.
* @param storage_manager The StorageManager to use (a StorageBlob will be
* allocated to hold the HashTable's contents).
- * @param bloom_filters A vector of pointers to bloom filters that may be used
- * during hash table construction in build/probe phase.
* @return A new resizable HashTable with parameters specified by proto.
**/
static HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_keys>*
CreateResizableFromProto(const serialization::HashTable &proto,
- StorageManager *storage_manager,
- const std::vector<std::unique_ptr<BloomFilter>> &bloom_filters) {
+ StorageManager *storage_manager) {
DCHECK(ProtoIsValid(proto))
<< "Attempted to create HashTable from invalid proto description:\n"
<< proto.DebugString();
@@ -316,35 +313,6 @@ class HashTableFactory {
key_types,
proto.estimated_num_entries(),
storage_manager);
-
- // TODO(ssaurabh): These lazy initializations can be moved from here and pushed to the
- // individual implementations of the hash table constructors.
-
- // Check if there are any build side bloom filter defined on the hash table.
- if (proto.build_side_bloom_filter_id_size() > 0) {
- hash_table->enableBuildSideBloomFilter();
- hash_table->setBuildSideBloomFilter(bloom_filters[proto.build_side_bloom_filter_id(0)].get());
- }
-
- // Check if there are any probe side bloom filters defined on the hash table.
- if (proto.probe_side_bloom_filters_size() > 0) {
- hash_table->enableProbeSideBloomFilter();
- // Add as many probe bloom filters as defined by the proto.
- for (int j = 0; j < proto.probe_side_bloom_filters_size(); ++j) {
- // Add the pointer to the probe bloom filter within the list of probe bloom filters to use.
- const auto probe_side_bloom_filter = proto.probe_side_bloom_filters(j);
- hash_table->addProbeSideBloomFilter(bloom_filters[probe_side_bloom_filter.probe_side_bloom_filter_id()].get());
-
- // Add the attribute ids corresponding to this probe bloom filter.
- std::vector<attribute_id> probe_attribute_ids;
- for (int k = 0; k < probe_side_bloom_filter.probe_side_attr_ids_size(); ++k) {
- const attribute_id probe_attribute_id = probe_side_bloom_filter.probe_side_attr_ids(k);
- probe_attribute_ids.push_back(probe_attribute_id);
- }
- hash_table->addProbeSideAttributeIds(std::move(probe_attribute_ids));
- }
- }
-
return hash_table;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index ec5990f..7c16c34 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -60,6 +60,7 @@
#include "types/containers/Tuple.hpp"
#include "types/operations/comparisons/ComparisonUtil.hpp"
#include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
#include "glog/logging.h"
@@ -341,20 +342,30 @@ void StorageBlock::sample(const bool is_block_sample,
void StorageBlock::select(const vector<unique_ptr<const Scalar>> &selection,
const Predicate *predicate,
- InsertDestinationInterface *destination) const {
+ InsertDestinationInterface *destination,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober) const {
ColumnVectorsValueAccessor temp_result;
{
SubBlocksReference sub_blocks_ref(*tuple_store_,
indices_,
indices_consistent_);
+ std::unique_ptr<ValueAccessor> base_accessor(tuple_store_->createValueAccessor());
std::unique_ptr<TupleIdSequence> matches;
+
+ if (lip_filter_adaptive_prober != nullptr) {
+ matches.reset(lip_filter_adaptive_prober->filterValueAccessor(base_accessor.get()));
+ }
+
+ if (predicate != nullptr) {
+ matches.reset(getMatchesForPredicate(predicate, matches.get()));  // Restrict to tuples that passed the LIP filters (if any).
+ }
+
std::unique_ptr<ValueAccessor> accessor;
- if (predicate == nullptr) {
- accessor.reset(tuple_store_->createValueAccessor());
+ if (matches == nullptr) {
+ accessor.reset(base_accessor.release());
} else {
- matches.reset(getMatchesForPredicate(predicate));
- accessor.reset(tuple_store_->createValueAccessor(matches.get()));
+ accessor.reset(base_accessor->createSharedTupleIdSequenceAdapterVirtual(*matches));
}
for (vector<unique_ptr<const Scalar>>::const_iterator selection_cit = selection.begin();
@@ -371,14 +382,24 @@ void StorageBlock::select(const vector<unique_ptr<const Scalar>> &selection,
void StorageBlock::selectSimple(const std::vector<attribute_id> &selection,
const Predicate *predicate,
- InsertDestinationInterface *destination) const {
- std::unique_ptr<ValueAccessor> accessor;
+ InsertDestinationInterface *destination,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober) const {
+ std::unique_ptr<ValueAccessor> base_accessor(tuple_store_->createValueAccessor());
std::unique_ptr<TupleIdSequence> matches;
- if (predicate == nullptr) {
- accessor.reset(tuple_store_->createValueAccessor());
- } else {
+
+ if (lip_filter_adaptive_prober != nullptr) {
+ matches.reset(lip_filter_adaptive_prober->filterValueAccessor(base_accessor.get()));
+ }
+
+ if (predicate != nullptr) {
- matches.reset(getMatchesForPredicate(predicate));
+ matches.reset(getMatchesForPredicate(predicate, matches.get()));  // Restrict to tuples that passed the LIP filters (if any).
- accessor.reset(tuple_store_->createValueAccessor(matches.get()));
+ }
+
+ std::unique_ptr<ValueAccessor> accessor;
+ if (matches == nullptr) {
+ accessor.reset(base_accessor.release());
+ } else {
+ accessor.reset(base_accessor->createSharedTupleIdSequenceAdapterVirtual(*matches));
}
destination->bulkInsertTuplesWithRemappedAttributes(selection,
@@ -389,37 +410,28 @@ AggregationState* StorageBlock::aggregate(
const AggregationHandle &handle,
const std::vector<std::unique_ptr<const Scalar>> &arguments,
const std::vector<attribute_id> *arguments_as_attributes,
- const Predicate *predicate,
- std::unique_ptr<TupleIdSequence> *reuse_matches) const {
- // If there is a filter predicate that hasn't already been evaluated,
- // evaluate it now and save the results for other aggregates on this same
- // block.
- if (predicate && !*reuse_matches) {
- reuse_matches->reset(getMatchesForPredicate(predicate));
- }
-
+ const TupleIdSequence *filter) const {
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
// If all the arguments to this aggregate are plain relation attributes,
// aggregate directly on a ValueAccessor from this block to avoid a copy.
if ((arguments_as_attributes != nullptr) && (!arguments_as_attributes->empty())) {
DCHECK_EQ(arguments.size(), arguments_as_attributes->size())
<< "Mismatch between number of arguments and number of attribute_ids";
- return aggregateHelperValueAccessor(handle, *arguments_as_attributes, reuse_matches->get());
+ return aggregateHelperValueAccessor(handle, *arguments_as_attributes, filter);
}
// TODO(shoban): We may want to optimize for ScalarLiteral here.
#endif
// Call aggregateHelperColumnVector() to materialize each argument as a
// ColumnVector, then aggregate over those.
- return aggregateHelperColumnVector(handle, arguments, reuse_matches->get());
+ return aggregateHelperColumnVector(handle, arguments, filter);
}
void StorageBlock::aggregateGroupBy(
const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
const std::vector<std::unique_ptr<const Scalar>> &group_by,
- const Predicate *predicate,
+ const TupleIdSequence *filter,
AggregationStateHashTableBase *hash_table,
- std::unique_ptr<TupleIdSequence> *reuse_matches,
std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
DCHECK_GT(group_by.size(), 0u)
<< "Called aggregateGroupBy() with zero GROUP BY expressions";
@@ -438,23 +450,7 @@ void StorageBlock::aggregateGroupBy(
// this aggregate, as well as the GROUP BY expression values.
ColumnVectorsValueAccessor temp_result;
{
- std::unique_ptr<ValueAccessor> accessor;
- if (predicate) {
- if (!*reuse_matches) {
- // If there is a filter predicate that hasn't already been evaluated,
- // evaluate it now and save the results for other aggregates on this
- // same block.
- reuse_matches->reset(getMatchesForPredicate(predicate));
- }
-
- // Create a filtered ValueAccessor that only iterates over predicate
- // matches.
- accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get()));
- } else {
- // Create a ValueAccessor that iterates over all tuples in this block
- accessor.reset(tuple_store_->createValueAccessor());
- }
-
+ std::unique_ptr<ValueAccessor> accessor(tuple_store_->createValueAccessor(filter));
attribute_id attr_id = 0;
// First, put GROUP BY keys into 'temp_result'.
@@ -503,9 +499,8 @@ void StorageBlock::aggregateDistinct(
const std::vector<std::unique_ptr<const Scalar>> &arguments,
const std::vector<attribute_id> *arguments_as_attributes,
const std::vector<std::unique_ptr<const Scalar>> &group_by,
- const Predicate *predicate,
+ const TupleIdSequence *filter,
AggregationStateHashTableBase *distinctify_hash_table,
- std::unique_ptr<TupleIdSequence> *reuse_matches,
std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
DCHECK_GT(arguments.size(), 0u)
<< "Called aggregateDistinct() with zero argument expressions";
@@ -517,22 +512,7 @@ void StorageBlock::aggregateDistinct(
// this aggregate, as well as the GROUP BY expression values.
ColumnVectorsValueAccessor temp_result;
{
- std::unique_ptr<ValueAccessor> accessor;
- if (predicate) {
- if (!*reuse_matches) {
- // If there is a filter predicate that hasn't already been evaluated,
- // evaluate it now and save the results for other aggregates on this
- // same block.
- reuse_matches->reset(getMatchesForPredicate(predicate));
- }
-
- // Create a filtered ValueAccessor that only iterates over predicate
- // matches.
- accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get()));
- } else {
- // Create a ValueAccessor that iterates over all tuples in this block
- accessor.reset(tuple_store_->createValueAccessor());
- }
+ std::unique_ptr<ValueAccessor> accessor(tuple_store_->createValueAccessor(filter));
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
// If all the arguments to this aggregate are plain relation attributes,
@@ -1246,23 +1226,36 @@ bool StorageBlock::rebuildIndexes(bool short_circuit) {
return all_indices_consistent_;
}
-TupleIdSequence* StorageBlock::getMatchesForPredicate(const Predicate *predicate) const {
+TupleIdSequence* StorageBlock::getMatchesForPredicate(const Predicate *predicate,
+ const TupleIdSequence *filter) const {
if (predicate == nullptr) {
- return tuple_store_->getExistenceMap();
+ TupleIdSequence *matched = tuple_store_->getExistenceMap();
+ if (filter != nullptr) {
+ matched->intersectWith(*filter);
+ }
+ return matched;
}
std::unique_ptr<ValueAccessor> value_accessor(tuple_store_->createValueAccessor());
- std::unique_ptr<TupleIdSequence> existence_map;
- if (!tuple_store_->isPacked()) {
- existence_map.reset(tuple_store_->getExistenceMap());
- }
SubBlocksReference sub_blocks_ref(*tuple_store_,
indices_,
indices_consistent_);
- return predicate->getAllMatches(value_accessor.get(),
- &sub_blocks_ref,
- nullptr,
- existence_map.get());
+
+ if (!tuple_store_->isPacked()) {
+ std::unique_ptr<TupleIdSequence> existence_map(tuple_store_->getExistenceMap());
+ if (filter != nullptr) {
+ existence_map->intersectWith(*filter);
+ }
+ return predicate->getAllMatches(value_accessor.get(),
+ &sub_blocks_ref,
+ nullptr,
+ existence_map.get());
+ } else {
+ return predicate->getAllMatches(value_accessor.get(),
+ &sub_blocks_ref,
+ nullptr,
+ filter);
+ }
}
std::unordered_map<attribute_id, TypedValue>* StorageBlock::generateUpdatedValues(
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/storage/StorageBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp
index bab5bab..77fb137 100644
--- a/storage/StorageBlock.hpp
+++ b/storage/StorageBlock.hpp
@@ -44,6 +44,7 @@ class AggregationState;
class CatalogRelationSchema;
class ColumnVector;
class InsertDestinationInterface;
+class LIPFilterAdaptiveProber;
class Predicate;
class Scalar;
class StorageBlockLayout;
@@ -312,6 +313,9 @@ class StorageBlock : public StorageBlockBase {
const std::vector<attribute_id> &attribute_map,
ValueAccessor *accessor);
+ TupleIdSequence* getMatchesForPredicate(const Predicate *predicate,
+ const TupleIdSequence *filter = nullptr) const;
+
/**
* @brief Perform a random sampling of data on the StorageBlock. The number
* of records sampled is determined by the sample percentage in case of
@@ -349,7 +353,8 @@ class StorageBlock : public StorageBlockBase {
**/
void select(const std::vector<std::unique_ptr<const Scalar>> &selection,
const Predicate *predicate,
- InsertDestinationInterface *destination) const;
+ InsertDestinationInterface *destination,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober) const;
/**
* @brief Perform a simple SELECT query on this StorageBlock which only
@@ -372,7 +377,8 @@ class StorageBlock : public StorageBlockBase {
**/
void selectSimple(const std::vector<attribute_id> &selection,
const Predicate *predicate,
- InsertDestinationInterface *destination) const;
+ InsertDestinationInterface *destination,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober) const;
/**
* @brief Perform non GROUP BY aggregation on the tuples in the this storage
@@ -412,8 +418,7 @@ class StorageBlock : public StorageBlockBase {
const AggregationHandle &handle,
const std::vector<std::unique_ptr<const Scalar>> &arguments,
const std::vector<attribute_id> *arguments_as_attributes,
- const Predicate *predicate,
- std::unique_ptr<TupleIdSequence> *reuse_matches) const;
+ const TupleIdSequence *filter) const;
/**
* @brief Perform GROUP BY aggregation on the tuples in the this storage
@@ -461,9 +466,8 @@ class StorageBlock : public StorageBlockBase {
void aggregateGroupBy(
const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
const std::vector<std::unique_ptr<const Scalar>> &group_by,
- const Predicate *predicate,
+ const TupleIdSequence *filter,
AggregationStateHashTableBase *hash_table,
- std::unique_ptr<TupleIdSequence> *reuse_matches,
std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const;
/**
@@ -505,9 +509,8 @@ class StorageBlock : public StorageBlockBase {
const std::vector<std::unique_ptr<const Scalar>> &arguments,
const std::vector<attribute_id> *arguments_as_attributes,
const std::vector<std::unique_ptr<const Scalar>> &group_by,
- const Predicate *predicate,
+ const TupleIdSequence *filter,
AggregationStateHashTableBase *distinctify_hash_table,
- std::unique_ptr<TupleIdSequence> *reuse_matches,
std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const;
/**
@@ -627,8 +630,6 @@ class StorageBlock : public StorageBlockBase {
// StorageBlock's header.
bool rebuildIndexes(bool short_circuit);
- TupleIdSequence* getMatchesForPredicate(const Predicate *predicate) const;
-
std::unordered_map<attribute_id, TypedValue>* generateUpdatedValues(
const ValueAccessor &accessor,
const tuple_id tuple,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/utility/DAG.hpp
----------------------------------------------------------------------
diff --git a/utility/DAG.hpp b/utility/DAG.hpp
index a1f2619..b35f2b5 100644
--- a/utility/DAG.hpp
+++ b/utility/DAG.hpp
@@ -293,8 +293,10 @@ class DAG {
* node at node_index.
**/
inline void addDependent(const size_type_nodes node_index, const LinkMetadataT &link_metadata) {
  // Upsert: overwrite the link metadata if node_index is already a dependent,
  // otherwise insert a new entry. Uses find + emplace/assign instead of
  // operator[] so that LinkMetadataT does not have to be default-constructible.
  auto it = dependents_with_metadata_.find(node_index);
  if (it == dependents_with_metadata_.end()) {
    dependents_with_metadata_.emplace(node_index, link_metadata);
  } else {
    it->second = link_metadata;
  }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/utility/lip_filter/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/lip_filter/CMakeLists.txt b/utility/lip_filter/CMakeLists.txt
index 2232abe..d78f5cd 100644
--- a/utility/lip_filter/CMakeLists.txt
+++ b/utility/lip_filter/CMakeLists.txt
@@ -15,5 +15,44 @@
# specific language governing permissions and limitations
# under the License.
QS_PROTOBUF_GENERATE_CPP(utility_lipfilter_LIPFilter_proto_srcs
                         utility_lipfilter_LIPFilter_proto_hdrs
                         LIPFilter.proto)

# Declare micro-libs:
add_library(quickstep_utility_lipfilter_LIPFilter LIPFilter.cpp LIPFilter.hpp)
add_library(quickstep_utility_lipfilter_LIPFilterAdaptiveProber ../../empty_src.cpp LIPFilterAdaptiveProber.hpp)
add_library(quickstep_utility_lipfilter_LIPFilterBuilder ../../empty_src.cpp LIPFilterBuilder.hpp)
add_library(quickstep_utility_lipfilter_LIPFilterDeployment LIPFilterDeployment.cpp LIPFilterDeployment.hpp)
add_library(quickstep_utility_lipfilter_LIPFilterFactory LIPFilterFactory.cpp LIPFilterFactory.hpp)
add_library(quickstep_utility_lipfilter_LIPFilter_proto
            ${utility_lipfilter_LIPFilter_proto_srcs})
add_library(quickstep_utility_lipfilter_SingleIdentityHashFilter ../../empty_src.cpp SingleIdentityHashFilter.hpp)

# Link dependencies:
# Each target lists the libraries whose headers its sources include
# (sorted: glog first, then quickstep_* in ASCII order).
target_link_libraries(quickstep_utility_lipfilter_LIPFilter
                      glog
                      quickstep_catalog_CatalogTypedefs
                      quickstep_storage_StorageBlockInfo
                      quickstep_utility_Macros)
target_link_libraries(quickstep_utility_lipfilter_LIPFilterAdaptiveProber
                      glog
                      quickstep_catalog_CatalogTypedefs
                      quickstep_storage_StorageBlockInfo
                      quickstep_storage_TupleIdSequence
                      quickstep_storage_ValueAccessor
                      quickstep_storage_ValueAccessorUtil
                      quickstep_types_Type
                      quickstep_utility_Macros
                      quickstep_utility_lipfilter_LIPFilter
                      quickstep_utility_lipfilter_SingleIdentityHashFilter)
target_link_libraries(quickstep_utility_lipfilter_LIPFilterBuilder
                      glog
                      quickstep_catalog_CatalogTypedefs
                      quickstep_types_Type
                      quickstep_utility_Macros
                      quickstep_utility_lipfilter_LIPFilter)
target_link_libraries(quickstep_utility_lipfilter_LIPFilterDeployment
                      glog
                      quickstep_catalog_CatalogTypedefs
                      quickstep_types_TypeFactory
                      quickstep_utility_Macros
                      quickstep_utility_lipfilter_LIPFilter
                      quickstep_utility_lipfilter_LIPFilterAdaptiveProber
                      quickstep_utility_lipfilter_LIPFilterBuilder
                      quickstep_utility_lipfilter_LIPFilter_proto)
target_link_libraries(quickstep_utility_lipfilter_LIPFilterFactory
                      glog
                      quickstep_utility_Macros
                      quickstep_utility_lipfilter_LIPFilter
                      quickstep_utility_lipfilter_LIPFilter_proto
                      quickstep_utility_lipfilter_SingleIdentityHashFilter)
target_link_libraries(quickstep_utility_lipfilter_LIPFilter_proto
                      ${PROTOBUF_LIBRARY}
                      quickstep_types_Type_proto)
target_link_libraries(quickstep_utility_lipfilter_SingleIdentityHashFilter
                      glog
                      quickstep_catalog_CatalogTypedefs
                      quickstep_storage_StorageConstants
                      quickstep_storage_ValueAccessor
                      quickstep_storage_ValueAccessorUtil
                      quickstep_types_Type
                      quickstep_utility_BitManipulation
                      quickstep_utility_Macros
                      quickstep_utility_lipfilter_LIPFilter)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/utility/lip_filter/LIPFilter.cpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilter.cpp b/utility/lip_filter/LIPFilter.cpp
new file mode 100644
index 0000000..92bfab1
--- /dev/null
+++ b/utility/lip_filter/LIPFilter.cpp
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "utility/lip_filter/LIPFilter.hpp"
+
+namespace quickstep {
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/utility/lip_filter/LIPFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilter.hpp b/utility/lip_filter/LIPFilter.hpp
index 33165ed..0df3c18 100644
--- a/utility/lip_filter/LIPFilter.hpp
+++ b/utility/lip_filter/LIPFilter.hpp
@@ -20,8 +20,20 @@
#ifndef QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_HPP_
#define QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_HPP_
+#include <cstddef>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
namespace quickstep {
+class Type;
+class ValueAccessor;
+
/** \addtogroup Utility
* @{
*/
@@ -32,6 +44,35 @@ enum class LIPFilterType {
kSingleIdentityHashFilter
};
+class LIPFilter {
+ public:
+ LIPFilterType getType() const {
+ return type_;
+ }
+
+ virtual void insertValueAccessor(ValueAccessor *accessor,
+ const attribute_id attr_id,
+ const Type *attr_type) = 0;
+
+ virtual std::size_t filterBatch(ValueAccessor *accessor,
+ const attribute_id attr_id,
+ const bool is_attr_nullable,
+ std::vector<tuple_id> *batch,
+ const std::size_t batch_size) const = 0;
+
+ virtual std::size_t onesCount() const = 0;
+
+ protected:
+ LIPFilter(const LIPFilterType &type)
+ : type_(type) {
+ }
+
+ private:
+ LIPFilterType type_;
+
+ DISALLOW_COPY_AND_ASSIGN(LIPFilter);
+};
+
/** @} */
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/utility/lip_filter/LIPFilter.proto
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilter.proto b/utility/lip_filter/LIPFilter.proto
new file mode 100644
index 0000000..def13dd
--- /dev/null
+++ b/utility/lip_filter/LIPFilter.proto
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto2";
+
+package quickstep.serialization;
+
+import "types/Type.proto";
+
// The kind of a serialized LIP filter. Currently only
// SINGLE_IDENTITY_HASH_FILTER is handled by LIPFilterFactory::ReconstructFromProto.
enum LIPFilterType {
  BLOOM_FILTER = 1;
  EXACT_FILTER = 2;
  SINGLE_IDENTITY_HASH_FILTER = 3;
}
+
// A serialized LIP filter. Parameters specific to a filter type are attached
// as extensions of this message.
message LIPFilter {
  required LIPFilterType lip_filter_type = 1;

  // Field numbers from 16 upward are reserved for per-filter-type extensions.
  extensions 16 to max;
}
+
// Parameters of a single identity hash filter, attached to LIPFilter as extensions.
message SingleIdentityHashFilter {
  extend LIPFilter {
    // Both fields must be set. They are declared optional only because proto2
    // extensions cannot be required; LIPFilterFactory::ReconstructFromProto
    // reads both of them.
    // Passed to the filter constructor to size its bit array
    // (presumably a number of bits/slots -- TODO confirm).
    optional uint64 filter_cardinality = 16;
    // Size of the filtered attribute (presumably in bytes). Used to choose the
    // integer width of the reconstructed filter.
    optional uint64 attribute_size = 17;
  }
}
+
// What an operator does with the filters of a deployment: populate them
// (BUILD, via LIPFilterBuilder) or probe them (PROBE, via LIPFilterAdaptiveProber).
enum LIPFilterActionType {
  BUILD = 1;
  PROBE = 2;
}
+
// Describes how one operator uses a set of LIP filters.
message LIPFilterDeployment {
  // Pairs one LIP filter with one attribute.
  message Entry {
    // Index into the vector of LIP filters passed to the LIPFilterDeployment
    // constructor.
    required uint32 lip_filter_id = 1;
    // Attribute whose values are inserted into (BUILD) or probed against
    // (PROBE) the filter.
    required int32 attribute_id = 2;
    // Type of that attribute.
    required Type attribute_type = 3;
  }

  required LIPFilterActionType action_type = 1;
  repeated Entry entries = 2;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/utility/lip_filter/LIPFilterAdaptiveProber.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterAdaptiveProber.hpp b/utility/lip_filter/LIPFilterAdaptiveProber.hpp
new file mode 100644
index 0000000..af42446
--- /dev/null
+++ b/utility/lip_filter/LIPFilterAdaptiveProber.hpp
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_ADAPTIVE_PROBER_HPP_
+#define QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_ADAPTIVE_PROBER_HPP_
+
+#include <algorithm>
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/TupleIdSequence.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/Type.hpp"
+#include "utility/Macros.hpp"
+#include "utility/lip_filter/SingleIdentityHashFilter.hpp"
+
+namespace quickstep {
+
+/** \addtogroup Utility
+ * @{
+ */
+
+class LIPFilterAdaptiveProber {
+ public:
+ LIPFilterAdaptiveProber(const std::vector<LIPFilter *> &lip_filters,
+ const std::vector<attribute_id> &attr_ids,
+ const std::vector<const Type *> &attr_types) {
+ DCHECK_EQ(lip_filters.size(), attr_ids.size());
+ DCHECK_EQ(lip_filters.size(), attr_types.size());
+
+ probe_entries_.reserve(lip_filters.size());
+ for (std::size_t i = 0; i < lip_filters.size(); ++i) {
+ probe_entries_.emplace_back(
+ new ProbeEntry(lip_filters[i], attr_ids[i], attr_types[i]));
+ }
+ }
+
+ ~LIPFilterAdaptiveProber() {
+ for (ProbeEntry *entry : probe_entries_) {
+ delete entry;
+ }
+ }
+
+ TupleIdSequence* filterValueAccessor(ValueAccessor *accessor) {
+ const TupleIdSequence *existence_map = accessor->getTupleIdSequenceVirtual();
+ if (existence_map == nullptr) {
+ return filterValueAccessorNoExistenceMap(accessor);
+ } else {
+ return filterValueAccessorWithExistenceMap(accessor, existence_map);
+ }
+ }
+
+ private:
+ struct ProbeEntry {
+ ProbeEntry(const LIPFilter *lip_filter_in,
+ const attribute_id attr_id_in,
+ const Type *attr_type_in)
+ : lip_filter(lip_filter_in),
+ attr_id(attr_id_in),
+ attr_type(attr_type_in),
+ miss(0),
+ cnt(0) {
+ }
+ static bool isBetterThan(const ProbeEntry *a,
+ const ProbeEntry *b) {
+ return a->miss_rate > b->miss_rate;
+ }
+ const LIPFilter *lip_filter;
+ const attribute_id attr_id;
+ const Type *attr_type;
+ std::uint32_t miss;
+ std::uint32_t cnt;
+ float miss_rate;
+ };
+
+
+ inline TupleIdSequence* filterValueAccessorNoExistenceMap(ValueAccessor *accessor) {
+ const std::uint32_t num_tuples = accessor->getNumTuplesVirtual();
+ std::unique_ptr<TupleIdSequence> matches(new TupleIdSequence(num_tuples));
+ std::uint32_t next_batch_size = 64u;
+ std::vector<tuple_id> batch(num_tuples);
+
+ std::uint32_t batch_start = 0;
+ do {
+ const std::uint32_t batch_size =
+ std::min(next_batch_size, num_tuples - batch_start);
+ for (std::uint32_t i = 0; i < batch_size; ++i) {
+ batch[i] = batch_start + i;
+ }
+
+ const std::uint32_t num_hits = filterBatch(accessor, &batch, batch_size);
+ for (std::uint32_t i = 0; i < num_hits; ++i) {
+ matches->set(batch[i], true);
+ }
+
+ batch_start += batch_size;
+ next_batch_size *= 2;
+ } while (batch_start < num_tuples);
+
+ return matches.release();
+ }
+
+ inline TupleIdSequence* filterValueAccessorWithExistenceMap(ValueAccessor *accessor,
+ const TupleIdSequence *existence_map) {
+ std::unique_ptr<TupleIdSequence> matches(
+ new TupleIdSequence(existence_map->length()));
+ std::uint32_t next_batch_size = 64u;
+ std::uint32_t num_tuples_left = existence_map->numTuples();
+ std::vector<tuple_id> batch(num_tuples_left);
+
+ TupleIdSequence::const_iterator tuple_it = existence_map->before_begin();
+ do {
+ const std::uint32_t batch_size =
+ next_batch_size < num_tuples_left ? next_batch_size : num_tuples_left;
+ for (std::uint32_t i = 0; i < batch_size; ++i) {
+ ++tuple_it;
+ batch[i] = *tuple_it;
+ }
+
+ const std::uint32_t num_hits = filterBatch(accessor, &batch, batch_size);
+ for (std::uint32_t i = 0; i < num_hits; ++i) {
+ matches->set(batch[i], true);
+ }
+
+ num_tuples_left -= batch_size;
+ next_batch_size *= 2;
+ } while (num_tuples_left > 0);
+
+ return matches.release();
+ }
+
+ inline std::size_t filterBatch(ValueAccessor *accessor,
+ std::vector<tuple_id> *batch,
+ std::uint32_t batch_size) {
+ for (auto *entry : probe_entries_) {
+ const std::uint32_t out_size =
+ entry->lip_filter->filterBatch(accessor,
+ entry->attr_id,
+ entry->attr_type->isNullable(),
+ batch,
+ batch_size);
+ entry->cnt += batch_size;
+ entry->miss += batch_size - out_size;
+ batch_size = out_size;
+ }
+ adaptEntryOrder();
+ return batch_size;
+ }
+
+ inline void adaptEntryOrder() {
+ for (auto &entry : probe_entries_) {
+ entry->miss_rate = static_cast<float>(entry->miss) / entry->cnt;
+ }
+ std::sort(probe_entries_.begin(),
+ probe_entries_.end(),
+ ProbeEntry::isBetterThan);
+ }
+
+ std::vector<ProbeEntry *> probe_entries_;
+
+ DISALLOW_COPY_AND_ASSIGN(LIPFilterAdaptiveProber);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_ADAPTIVE_PROBER_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/utility/lip_filter/LIPFilterBuilder.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterBuilder.hpp b/utility/lip_filter/LIPFilterBuilder.hpp
new file mode 100644
index 0000000..0a2d465
--- /dev/null
+++ b/utility/lip_filter/LIPFilterBuilder.hpp
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_BUILDER_HPP_
+#define QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_BUILDER_HPP_
+
#include <cstddef>
#include <memory>
#include <vector>

#include "catalog/CatalogTypedefs.hpp"
#include "types/Type.hpp"
#include "utility/Macros.hpp"
#include "utility/lip_filter/LIPFilter.hpp"

#include "glog/logging.h"
+
+namespace quickstep {
+
+class ValueAccessor;
+
+/** \addtogroup Utility
+ * @{
+ */
+
+class LIPFilterBuilder;
+typedef std::shared_ptr<LIPFilterBuilder> LIPFilterBuilderPtr;
+
+class LIPFilterBuilder {
+ public:
+ LIPFilterBuilder(const std::vector<LIPFilter *> &lip_filters,
+ const std::vector<attribute_id> &attr_ids,
+ const std::vector<const Type *> &attr_types) {
+ DCHECK_EQ(lip_filters.size(), attr_ids.size());
+ DCHECK_EQ(lip_filters.size(), attr_types.size());
+
+ build_entries_.reserve(lip_filters.size());
+ for (std::size_t i = 0; i < lip_filters.size(); ++i) {
+ build_entries_.emplace_back(lip_filters[i], attr_ids[i], attr_types[i]);
+ }
+ }
+
+ void insertValueAccessor(ValueAccessor *accessor) {
+ for (auto &entry : build_entries_) {
+ entry.lip_filter->insertValueAccessor(accessor, entry.attr_id, entry.attr_type);
+ }
+ }
+
+ private:
+ struct BuildEntry {
+ BuildEntry(LIPFilter *lip_filter_in,
+ const attribute_id attr_id_in,
+ const Type *attr_type_in)
+ : lip_filter(lip_filter_in),
+ attr_id(attr_id_in),
+ attr_type(attr_type_in) {
+ }
+ LIPFilter *lip_filter;
+ const attribute_id attr_id;
+ const Type *attr_type;
+ };
+
+ std::vector<BuildEntry> build_entries_;
+
+ DISALLOW_COPY_AND_ASSIGN(LIPFilterBuilder);
+};
+
+/** @} */
+
+} // namespace quickstep
+
#endif  // QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_BUILDER_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/utility/lip_filter/LIPFilterDeployment.cpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterDeployment.cpp b/utility/lip_filter/LIPFilterDeployment.cpp
new file mode 100644
index 0000000..0ac396b
--- /dev/null
+++ b/utility/lip_filter/LIPFilterDeployment.cpp
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "utility/lip_filter/LIPFilterDeployment.hpp"
+
+#include "types/TypeFactory.hpp"
+#include "utility/lip_filter/LIPFilter.pb.h"
+#include "utility/lip_filter/LIPFilterBuilder.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+LIPFilterDeployment::LIPFilterDeployment(
+ const serialization::LIPFilterDeployment &proto,
+ const std::vector<std::unique_ptr<LIPFilter>> &lip_filters) {
+ switch (proto.action_type()) {
+ case serialization::LIPFilterActionType::BUILD:
+ action_type_ = LIPFilterActionType::kBuild;
+ break;
+ case serialization::LIPFilterActionType::PROBE:
+ action_type_ = LIPFilterActionType::kProbe;
+ break;
+ default:
+ LOG(FATAL) << "Unsupported LIPFilterActionType: "
+ << serialization::LIPFilterActionType_Name(proto.action_type());
+ }
+
+ for (int i = 0; i < proto.entries_size(); ++i) {
+ const auto &entry_proto = proto.entries(i);
+ lip_filters_.emplace_back(lip_filters.at(entry_proto.lip_filter_id()).get());
+ attr_ids_.emplace_back(entry_proto.attribute_id());
+ attr_types_.emplace_back(&TypeFactory::ReconstructFromProto(entry_proto.attribute_type()));
+ }
+}
+
+bool LIPFilterDeployment::ProtoIsValid(
+ const serialization::LIPFilterDeployment &proto) {
+ return true;
+}
+
+LIPFilterBuilder* LIPFilterDeployment::createLIPFilterBuilder() const {
+ DCHECK(action_type_ == LIPFilterActionType::kBuild);
+ return new LIPFilterBuilder(lip_filters_, attr_ids_, attr_types_);
+}
+
+LIPFilterAdaptiveProber* LIPFilterDeployment::createLIPFilterAdaptiveProber() const {
+ DCHECK(action_type_ == LIPFilterActionType::kProbe);
+ return new LIPFilterAdaptiveProber(lip_filters_, attr_ids_, attr_types_);
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/utility/lip_filter/LIPFilterDeployment.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterDeployment.hpp b/utility/lip_filter/LIPFilterDeployment.hpp
new file mode 100644
index 0000000..d939e85
--- /dev/null
+++ b/utility/lip_filter/LIPFilterDeployment.hpp
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_DEPLOYMENT_HPP_
+#define QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_DEPLOYMENT_HPP_
+
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilter.hpp"
+#include "utility/lip_filter/LIPFilter.pb.h"
+
+namespace quickstep {
+
+class LIPFilterBuilder;
+class LIPFilterAdaptiveProber;
+class Type;
+
+/** \addtogroup Utility
+ * @{
+ */
+
+enum class LIPFilterActionType {
+ kBuild = 0,
+ kProbe
+};
+
+class LIPFilterDeployment {
+ public:
+ LIPFilterDeployment(const serialization::LIPFilterDeployment &proto,
+ const std::vector<std::unique_ptr<LIPFilter>> &lip_filters);
+
+ static bool ProtoIsValid(const serialization::LIPFilterDeployment &proto);
+
+ LIPFilterActionType getActionType() const {
+ return action_type_;
+ }
+
+ LIPFilterBuilder* createLIPFilterBuilder() const;
+
+ LIPFilterAdaptiveProber* createLIPFilterAdaptiveProber() const;
+
+ private:
+ LIPFilterActionType action_type_;
+ std::vector<LIPFilter *> lip_filters_;
+ std::vector<attribute_id> attr_ids_;
+ std::vector<const Type *> attr_types_;
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_DEPLOYMENT_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/utility/lip_filter/LIPFilterFactory.cpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterFactory.cpp b/utility/lip_filter/LIPFilterFactory.cpp
new file mode 100644
index 0000000..f0e7725
--- /dev/null
+++ b/utility/lip_filter/LIPFilterFactory.cpp
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "utility/lip_filter/LIPFilterFactory.hpp"
+
+#include "utility/lip_filter/LIPFilter.hpp"
+#include "utility/lip_filter/SingleIdentityHashFilter.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+LIPFilter* LIPFilterFactory::ReconstructFromProto(const serialization::LIPFilter &proto) {
+ switch (proto.lip_filter_type()) {
+ case serialization::LIPFilterType::SINGLE_IDENTITY_HASH_FILTER: {
+ const std::size_t attr_size =
+ proto.GetExtension(serialization::SingleIdentityHashFilter::attribute_size);
+ const std::size_t filter_cardinality =
+ proto.GetExtension(serialization::SingleIdentityHashFilter::filter_cardinality);
+
+ if (attr_size >= 8) {
+ return new SingleIdentityHashFilter<std::uint64_t>(filter_cardinality);
+ } else if (attr_size >= 4) {
+ return new SingleIdentityHashFilter<std::uint32_t>(filter_cardinality);
+ } else if (attr_size >= 2) {
+ return new SingleIdentityHashFilter<std::uint16_t>(filter_cardinality);
+ } else {
+ return new SingleIdentityHashFilter<std::uint8_t>(filter_cardinality);
+ }
+ }
+ default:
+ LOG(FATAL) << "Unsupported LIP filter type: "
+ << serialization::LIPFilterType_Name(proto.lip_filter_type());
+ }
+}
+
+bool LIPFilterFactory::ProtoIsValid(const serialization::LIPFilter &proto) {
+ return true;
+}
+
+} // namespace quickstep
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/utility/lip_filter/LIPFilterFactory.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterFactory.hpp b/utility/lip_filter/LIPFilterFactory.hpp
new file mode 100644
index 0000000..6a94ae4
--- /dev/null
+++ b/utility/lip_filter/LIPFilterFactory.hpp
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_FACTORY_HPP_
+#define QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_FACTORY_HPP_
+
+#include <vector>
+
+#include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilter.pb.h"
+
+namespace quickstep {
+
+class LIPFilter;
+
+/** \addtogroup Utility
+ * @{
+ */
+
+class LIPFilterFactory {
+ public:
+ static LIPFilter* ReconstructFromProto(const serialization::LIPFilter &proto);
+
+ static bool ProtoIsValid(const serialization::LIPFilter &proto);
+
+ private:
+
+ DISALLOW_COPY_AND_ASSIGN(LIPFilterFactory);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_FACTORY_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca9c1790/utility/lip_filter/SingleIdentityHashFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/SingleIdentityHashFilter.hpp b/utility/lip_filter/SingleIdentityHashFilter.hpp
new file mode 100644
index 0000000..0258c24
--- /dev/null
+++ b/utility/lip_filter/SingleIdentityHashFilter.hpp
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_LIP_FILTER_SINGLE_IDENTITY_HASH_FILTER_HPP_
+#define QUICKSTEP_UTILITY_LIP_FILTER_SINGLE_IDENTITY_HASH_FILTER_HPP_
+
+#include <vector>
+
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
+#include <utility>
+#include <vector>
+
+#include "storage/StorageConstants.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/Type.hpp"
+#include "utility/BitManipulation.hpp"
+#include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilter.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/** \addtogroup Utility
+ * @{
+ */
+
+/**
+ * @brief A LIPFilter backed by a bit array with one bit per slot, where a
+ *        key's slot is the key value itself reduced modulo the filter
+ *        cardinality (an "identity" hash).
+ *
+ * Membership tests can report false positives (distinct keys that are equal
+ * modulo the cardinality share a bit) but never false negatives for values
+ * that were inserted.
+ *
+ * @tparam CppType The type whose bytes are read as the key. Must support
+ *         operator% with std::size_t (i.e. an integral type).
+ *         NOTE(review): presumably instantiated by LIPFilterFactory with
+ *         fixed-width integers matching the attribute's byte size — confirm.
+ **/
+template <typename CppType>
+class SingleIdentityHashFilter : public LIPFilter {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param filter_cardinality The number of bits (slots) in this filter.
+   *        Must be positive because keys are reduced modulo this value.
+   **/
+  explicit SingleIdentityHashFilter(const std::size_t filter_cardinality)
+      : LIPFilter(LIPFilterType::kSingleIdentityHashFilter),
+        filter_cardinality_(filter_cardinality),
+        bit_array_(GetByteSize(filter_cardinality)) {
+    DCHECK_GT(filter_cardinality, 0u);
+    // Clear the bits through the atomic interface instead of memset-ing over
+    // std::atomic objects, which are not trivially copyable.
+    for (std::atomic<std::uint8_t> &byte : bit_array_) {
+      byte.store(0, std::memory_order_relaxed);
+    }
+  }
+
+  /**
+   * @brief Insert the value of attribute attr_id for every tuple of accessor.
+   *        Iteration restarts from the beginning of accessor. When attr_type
+   *        is nullable, NULL values are skipped.
+   *
+   * @param accessor The ValueAccessor supplying the tuples to insert.
+   * @param attr_id The attribute whose values are inserted.
+   * @param attr_type The type of attr_id; only its nullability is consulted.
+   **/
+  void insertValueAccessor(ValueAccessor *accessor,
+                           const attribute_id attr_id,
+                           const Type *attr_type) override {
+    InvokeOnAnyValueAccessor(
+        accessor,
+        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+          if (attr_type->isNullable()) {
+            insertValueAccessorInternal<true>(accessor, attr_id);
+          } else {
+            insertValueAccessorInternal<false>(accessor, attr_id);
+          }
+        });
+  }
+
+  /**
+   * @brief Filter a batch of tuple ids in place.
+   *
+   * Of the first batch_size entries of *batch, those whose attr_id value may
+   * be present in the filter are compacted, in their original order, into the
+   * front of *batch. When is_attr_nullable is true, tuples whose value is
+   * NULL are dropped.
+   *
+   * @param accessor The ValueAccessor used to read attribute values.
+   * @param attr_id The attribute to probe with.
+   * @param is_attr_nullable Whether attr_id may contain NULL values.
+   * @param batch The tuple ids to filter; overwritten in place.
+   * @param batch_size The number of leading entries of *batch to consider.
+   * @return The number of tuple ids kept at the front of *batch.
+   **/
+  std::size_t filterBatch(ValueAccessor *accessor,
+                          const attribute_id attr_id,
+                          const bool is_attr_nullable,
+                          std::vector<tuple_id> *batch,
+                          const std::size_t batch_size) const override {
+    return InvokeOnAnyValueAccessor(
+        accessor,
+        [&](auto *accessor) -> std::size_t {  // NOLINT(build/c++11)
+          if (is_attr_nullable) {
+            return filterBatchInternal<true>(accessor, attr_id, batch, batch_size);
+          } else {
+            return filterBatchInternal<false>(accessor, attr_id, batch, batch_size);
+          }
+        });
+  }
+
+  /**
+   * @brief Count the bits that are currently set in the filter.
+   *
+   * @return The number of set bits.
+   **/
+  std::size_t onesCount() const override {
+    std::size_t count = 0;
+    for (const std::atomic<std::uint8_t> &byte : bit_array_) {
+      count += population_count<std::uint8_t>(byte.load(std::memory_order_relaxed));
+    }
+    return count;
+  }
+
+  /**
+   * @brief Inserts a given value into the hash filter.
+   *
+   * @param key_begin A pointer to the value being inserted; sizeof(CppType)
+   *        bytes are read from it. No alignment is required.
+   **/
+  inline void insert(const void *key_begin) {
+    const std::size_t slot = computeSlot(key_begin);
+    bit_array_[slot >> 3].fetch_or(static_cast<std::uint8_t>(1u << (slot & 0x7)),
+                                   std::memory_order_relaxed);
+  }
+
+  /**
+   * @brief Test membership of a given value in the hash filter.
+   *        If true is returned, the value may or may not be present.
+   *        If false is returned, the value is certainly not present.
+   *
+   * @param key_begin A pointer to the value being tested; sizeof(CppType)
+   *        bytes are read from it. No alignment is required.
+   * @return Whether the value's slot bit is set.
+   **/
+  inline bool contains(const void *key_begin) const {
+    const std::size_t slot = computeSlot(key_begin);
+    return (bit_array_[slot >> 3].load(std::memory_order_relaxed) &
+            (1u << (slot & 0x7))) != 0;
+  }
+
+ private:
+  // Number of bytes needed to hold bit_size bits (rounded up).
+  inline static std::size_t GetByteSize(const std::size_t bit_size) {
+    return (bit_size + 7) / 8;
+  }
+
+  // Map a key to its bit index: the key value modulo filter_cardinality_.
+  // The index is kept as std::size_t (not CppType) so it cannot overflow a
+  // narrow or signed CppType when filter_cardinality_ is large.
+  inline std::size_t computeSlot(const void *key_begin) const {
+    // memcpy (rather than dereferencing a reinterpret_cast pointer) reads the
+    // key safely even when it is unaligned; it typically compiles to a load.
+    CppType key;
+    std::memcpy(&key, key_begin, sizeof(CppType));
+    return static_cast<std::size_t>(key) % filter_cardinality_;
+  }
+
+  // Inserts every non-NULL value of attr_id produced by iterating accessor.
+  template <bool is_attr_nullable, typename ValueAccessorT>
+  inline void insertValueAccessorInternal(ValueAccessorT *accessor,
+                                          const attribute_id attr_id) {
+    accessor->beginIteration();
+    while (accessor->next()) {
+      const void *value = accessor->template getUntypedValue<is_attr_nullable>(attr_id);
+      if (!is_attr_nullable || value != nullptr) {
+        insert(value);
+      }
+    }
+  }
+
+  // Compacts the surviving tuple ids into the front of *batch and returns how
+  // many survived. out_size never exceeds i, so unread entries are never
+  // overwritten.
+  template <bool is_attr_nullable, typename ValueAccessorT>
+  inline std::size_t filterBatchInternal(const ValueAccessorT *accessor,
+                                         const attribute_id attr_id,
+                                         std::vector<tuple_id> *batch,
+                                         const std::size_t batch_size) const {
+    DCHECK_LE(batch_size, batch->size());
+    std::size_t out_size = 0;
+    for (std::size_t i = 0; i < batch_size; ++i) {
+      const tuple_id tid = (*batch)[i];
+      // Only pay for the NULL check when the attribute is nullable, matching
+      // getUntypedValue<is_attr_nullable> in insertValueAccessorInternal.
+      const void *value =
+          accessor->template getUntypedValueAtAbsolutePosition<is_attr_nullable>(attr_id, tid);
+      if (is_attr_nullable && value == nullptr) {
+        continue;
+      }
+      if (contains(value)) {
+        (*batch)[out_size] = tid;
+        ++out_size;
+      }
+    }
+    return out_size;
+  }
+
+  const std::size_t filter_cardinality_;
+
+  // One bit per slot, eight slots per byte. Bits are only ever set (fetch_or)
+  // or read, never cleared after construction.
+  // NOTE(review): relaxed ordering assumes callers synchronize inserts with
+  // later probes (e.g. build completes before probing) — confirm.
+  // No alignas here: aligning the std::vector member only aligns its control
+  // block, not the bit storage, and would make this class over-aligned, which
+  // operator new does not honor before C++17.
+  std::vector<std::atomic<std::uint8_t>> bit_array_;
+
+  DISALLOW_COPY_AND_ASSIGN(SingleIdentityHashFilter);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_UTILITY_LIP_FILTER_SINGLE_IDENTITY_HASH_FILTER_HPP_