You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/10/07 18:22:18 UTC
incubator-quickstep git commit: Backend updates
Repository: incubator-quickstep
Updated Branches:
refs/heads/lip-refactor 396f8576a -> 6a0f68f0f
Backend updates
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/6a0f68f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/6a0f68f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/6a0f68f0
Branch: refs/heads/lip-refactor
Commit: 6a0f68f0fb13b74542c3a0bb86eb8f6d568cb121
Parents: 396f857
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Fri Oct 7 13:22:10 2016 -0500
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Fri Oct 7 13:22:10 2016 -0500
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 4 +-
query_execution/QueryContext.cpp | 27 ++++
query_execution/QueryContext.hpp | 31 ++--
query_execution/QueryContext.proto | 2 +-
query_optimizer/CMakeLists.txt | 3 +
query_optimizer/ExecutionGenerator.cpp | 3 +-
query_optimizer/LIPFilterGenerator.cpp | 89 +++++++++--
query_optimizer/LIPFilterGenerator.hpp | 66 ++++----
.../physical/LIPFilterConfiguration.hpp | 6 +-
relational_operators/BuildHashOperator.cpp | 22 ++-
relational_operators/BuildHashOperator.hpp | 18 ++-
relational_operators/HashJoinOperator.cpp | 31 +++-
relational_operators/HashJoinOperator.hpp | 43 +++--
relational_operators/RelationalOperator.hpp | 9 ++
storage/CMakeLists.txt | 2 -
storage/FastHashTable.hpp | 152 +++---------------
storage/FastHashTableFactory.hpp | 35 +---
utility/lip_filter/CMakeLists.txt | 15 +-
utility/lip_filter/LIPFilter.hpp | 21 +++
utility/lip_filter/LIPFilter.proto | 15 +-
utility/lip_filter/LIPFilterAdaptiveProber.hpp | 115 +++++++++++++-
utility/lip_filter/LIPFilterBuilder.hpp | 28 +++-
utility/lip_filter/LIPFilterDeployment.cpp | 69 ++++++++
utility/lip_filter/LIPFilterDeployment.hpp | 71 +++++++++
utility/lip_filter/LIPFilterDeploymentInfo.hpp | 69 --------
utility/lip_filter/LIPFilterFactory.cpp | 57 +++++++
utility/lip_filter/LIPFilterFactory.hpp | 8 +-
utility/lip_filter/SingleIdentityHashFilter.hpp | 158 +++++++++++++++++++
28 files changed, 808 insertions(+), 361 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 58e5761..78ffb33 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -192,7 +192,9 @@ target_link_libraries(quickstep_queryexecution_QueryContext
quickstep_utility_Macros
quickstep_utility_SortConfiguration
quickstep_utility_lipfilter_LIPFilter
- quickstep_utility_lipfilter_LIPFilterDeploymentInfo)
+ quickstep_utility_lipfilter_LIPFilterDeployment
+ quickstep_utility_lipfilter_LIPFilterFactory
+ quickstep_utility_lipfilter_LIPFilter_proto)
target_link_libraries(quickstep_queryexecution_QueryContext_proto
quickstep_expressions_Expressions_proto
quickstep_expressions_tablegenerator_GeneratorFunction_proto
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/query_execution/QueryContext.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.cpp b/query_execution/QueryContext.cpp
index 47408d4..57d200a 100644
--- a/query_execution/QueryContext.cpp
+++ b/query_execution/QueryContext.cpp
@@ -40,6 +40,9 @@
#include "types/TypedValue.hpp"
#include "types/containers/Tuple.hpp"
#include "utility/lip_filter/LIPFilter.hpp"
+#include "utility/lip_filter/LIPFilter.pb.h"
+#include "utility/lip_filter/LIPFilterDeployment.hpp"
+#include "utility/lip_filter/LIPFilterFactory.hpp"
#include "utility/SortConfiguration.hpp"
#include "glog/logging.h"
@@ -93,6 +96,18 @@ QueryContext::QueryContext(const serialization::QueryContext &proto,
bus));
}
+ for (int i = 0; i < proto.lip_filters_size(); ++i) {
+ lip_filters_.emplace_back(
+ std::unique_ptr<LIPFilter>(
+ LIPFilterFactory::ReconstructFromProto(proto.lip_filters(i))));
+ }
+
+ for (int i = 0; i < proto.lip_filter_deployments_size(); ++i) {
+ lip_deployments_.emplace_back(
+ std::make_unique<LIPFilterDeployment>(
+ proto.lip_filter_deployments(i), lip_filters_));
+ }
+
for (int i = 0; i < proto.predicates_size(); ++i) {
predicates_.emplace_back(
PredicateFactory::ReconstructFromProto(proto.predicates(i), database));
@@ -180,6 +195,18 @@ bool QueryContext::ProtoIsValid(const serialization::QueryContext &proto,
}
}
+ for (int i = 0; i < proto.lip_filters_size(); ++i) {
+ if (!LIPFilterFactory::ProtoIsValid(proto.lip_filters(i))) {
+ return false;
+ }
+ }
+
+ for (int i = 0; i < proto.lip_filter_deployments_size(); ++i) {
+ if (!LIPFilterDeployment::ProtoIsValid(proto.lip_filter_deployments(i))) {
+ return false;
+ }
+ }
+
for (int i = 0; i < proto.predicates_size(); ++i) {
if (!PredicateFactory::ProtoIsValid(proto.predicates(i), database)) {
return false;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/query_execution/QueryContext.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp
index 3e287c0..66476f4 100644
--- a/query_execution/QueryContext.hpp
+++ b/query_execution/QueryContext.hpp
@@ -36,7 +36,7 @@
#include "storage/WindowAggregationOperationState.hpp"
#include "types/containers/Tuple.hpp"
#include "utility/lip_filter/LIPFilter.hpp"
-#include "utility/lip_filter/LIPFilterDeploymentInfo.hpp"
+#include "utility/lip_filter/LIPFilterDeployment.hpp"
#include "utility/Macros.hpp"
#include "utility/SortConfiguration.hpp"
@@ -93,7 +93,8 @@ class QueryContext {
/**
* @brief A unique identifier for a LIPFilterDeploymentInfo per query.
**/
- typedef std::uint32_t lip_filter_deployment_info_id;
+ typedef std::uint32_t lip_deployment_id;
+ static constexpr lip_deployment_id kInvalidILIPDeploymentId = static_cast<lip_deployment_id>(-1);
/**
* @brief A unique identifier for a Predicate per query.
@@ -345,22 +346,22 @@ class QueryContext {
*
* @return True if valid, otherwise false.
**/
- bool isValidLIPFilterDeploymentInfoId(const lip_filter_deployment_info_id id) const {
- return id < lip_filter_deployment_infos_.size();
+ bool isValidLIPDeploymentId(const lip_deployment_id id) const {
+ return id < lip_deployments_.size();
}
/**
- * @brief Get a constant pointer to the LIPFilterDeploymentInfo.
+ * @brief Get a constant pointer to the LIPFilterDeployment.
*
- * @param id The LIPFilterDeploymentInfo id.
+ * @param id The LIPFilterDeployment id.
*
- * @return The constant pointer to LIPFilterDeploymentInfo that is
+ * @return The constant pointer to LIPFilterDeployment that is
* already created in the constructor.
**/
- inline const LIPFilterDeploymentInfo* getLIPFilterDeploymentInfo(
- const lip_filter_deployment_info_id id) const {
- DCHECK_LT(id, lip_filter_deployment_infos_.size());
- return lip_filter_deployment_infos_[id].get();
+ inline const LIPFilterDeployment* getLIPDeployment(
+ const lip_deployment_id id) const {
+ DCHECK_LT(id, lip_deployments_.size());
+ return lip_deployments_[id].get();
}
/**
@@ -368,9 +369,9 @@ class QueryContext {
*
* @param id The id of the LIPFilterDeploymentInfo to destroy.
**/
- inline void destroyLIPFilterDeploymentInfo(const lip_filter_deployment_info_id id) {
- DCHECK_LT(id, lip_filter_deployment_infos_.size());
- lip_filter_deployment_infos_[id].reset();
+ inline void destroyLIPDeployment(const lip_deployment_id id) {
+ DCHECK_LT(id, lip_deployments_.size());
+ lip_deployments_[id].reset();
}
/**
@@ -552,7 +553,7 @@ class QueryContext {
std::vector<std::unique_ptr<InsertDestination>> insert_destinations_;
std::vector<std::unique_ptr<JoinHashTable>> join_hash_tables_;
std::vector<std::unique_ptr<LIPFilter>> lip_filters_;
- std::vector<std::unique_ptr<LIPFilterDeploymentInfo>> lip_filter_deployment_infos_;
+ std::vector<std::unique_ptr<LIPFilterDeployment>> lip_deployments_;
std::vector<std::unique_ptr<const Predicate>> predicates_;
std::vector<std::vector<std::unique_ptr<const Scalar>>> scalar_groups_;
std::vector<std::unique_ptr<const SortConfiguration>> sort_configs_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/query_execution/QueryContext.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.proto b/query_execution/QueryContext.proto
index d79b990..ab0f520 100644
--- a/query_execution/QueryContext.proto
+++ b/query_execution/QueryContext.proto
@@ -50,7 +50,7 @@ message QueryContext {
repeated HashTable join_hash_tables = 3;
repeated InsertDestination insert_destinations = 4;
repeated LIPFilter lip_filters = 5;
- repeated LIPFilterDeploymentInfo lip_filter_deployment_infos = 6;
+ repeated LIPFilterDeployment lip_filter_deployments = 6;
repeated Predicate predicates = 7;
repeated ScalarGroup scalar_groups = 8;
repeated SortConfiguration sort_configs = 9;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 4013e3e..16e3934 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -165,7 +165,10 @@ target_link_libraries(quickstep_queryoptimizer_LIPFilterGenerator
quickstep_queryoptimizer_physical_LIPFilterConfiguration
quickstep_queryoptimizer_physical_Physical
quickstep_queryoptimizer_physical_Selection
+ quickstep_relationaloperators_RelationalOperator
+ quickstep_types_Type
quickstep_utility_lipfilter_LIPFilter
+ quickstep_utility_lipfilter_LIPFilterDeployment
quickstep_utility_lipfilter_LIPFilter_proto)
target_link_libraries(quickstep_queryoptimizer_LogicalGenerator
glog
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 559de57..ce25841 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1460,8 +1460,7 @@ void ExecutionGenerator::convertAggregate(
true);
lip_filter_generator_->addAggregateInfo(physical_plan,
- aggregation_operator_index,
- aggr_state_index);
+ aggregation_operator_index);
}
void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/query_optimizer/LIPFilterGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/LIPFilterGenerator.cpp b/query_optimizer/LIPFilterGenerator.cpp
index 4e24740..f7394e0 100644
--- a/query_optimizer/LIPFilterGenerator.cpp
+++ b/query_optimizer/LIPFilterGenerator.cpp
@@ -25,6 +25,8 @@
#include "catalog/CatalogAttribute.hpp"
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/QueryContext.pb.h"
+#include "relational_operators/RelationalOperator.hpp"
+#include "types/Type.hpp"
#include "utility/lip_filter/LIPFilter.hpp"
#include "utility/lip_filter/LIPFilter.pb.h"
@@ -65,21 +67,31 @@ void LIPFilterGenerator::deployLIPFilters(QueryPlan *execution_plan,
// Deploy builders
const auto &build_info_map = lip_filter_configuration_->getBuildInfoMap();
- for (const auto &hash_join_info : hash_join_infos_) {
- const P::PhysicalPtr &builder_node = hash_join_info.hash_join;
- const auto build_it = build_info_map.find(builder_node);
+ for (const auto &info : builder_infos_) {
+ const auto build_it = build_info_map.find(info.builder_node);
if (build_it != build_info_map.end()) {
deployBuilderInternal(execution_plan,
query_context_proto,
- builder_node,
- hash_join_info.build_operator_index,
+ info.builder_node,
+ info.builder_operator_index,
build_it->second,
&lip_filter_builder_map);
}
}
// Deploy probers
- // const auto &probe_info_map = lip_filter_configuration_->getProbeInfoMap();
+ const auto &probe_info_map = lip_filter_configuration_->getProbeInfoMap();
+ for (const auto &info : prober_infos_) {
+ const auto probe_it = probe_info_map.find(info.prober_node);
+ if (probe_it != probe_info_map.end()) {
+ deployProberInteral(execution_plan,
+ query_context_proto,
+ info.prober_node,
+ info.prober_operator_index,
+ probe_it->second,
+ lip_filter_builder_map);
+ }
+ }
}
void LIPFilterGenerator::deployBuilderInternal(
@@ -89,18 +101,29 @@ void LIPFilterGenerator::deployBuilderInternal(
const QueryPlan::DAGNodeIndex builder_operator_index,
const std::vector<physical::LIPFilterBuildInfo> &build_info_vec,
LIPFilterBuilderMap *lip_filter_builder_map) const {
+ const auto lip_deployment_index = query_context_proto->lip_filter_deployments_size();
+ auto *lip_filter_deployment_info_proto =
+ query_context_proto->add_lip_filter_deployments();
+ lip_filter_deployment_info_proto->set_action_type(serialization::LIPFilterActionType::BUILD);
+
const auto &builder_attribute_map = attribute_map_.at(builder_node);
for (const auto &info : build_info_vec) {
const QueryContext::lip_filter_id lip_filter_id = query_context_proto->lip_filters_size();
serialization::LIPFilter *lip_filter_proto = query_context_proto->add_lip_filters();
+ const CatalogAttribute *target_attr = builder_attribute_map.at(info.build_attribute->id());
+ const Type &attr_type = target_attr->getType();
switch (info.filter_type) {
- case LIPFilterType::kSingleIdentityHashFilter:
+ case LIPFilterType::kSingleIdentityHashFilter: {
+ DCHECK(!attr_type.isVariableLength());
lip_filter_proto->set_lip_filter_type(
serialization::LIPFilterType::SINGLE_IDENTITY_HASH_FILTER);
lip_filter_proto->SetExtension(
- serialization::SingleIdentityHashFilter::num_bits, info.filter_size);
+ serialization::SingleIdentityHashFilter::filter_cardinality, info.filter_cardinality);
+ lip_filter_proto->SetExtension(
+ serialization::SingleIdentityHashFilter::attribute_size, attr_type.minimumByteLength());
break;
+ }
default:
LOG(FATAL) << "Unsupported LIPFilter type";
break;
@@ -110,21 +133,53 @@ void LIPFilterGenerator::deployBuilderInternal(
std::make_pair(info.build_attribute->id(), builder_node),
std::make_pair(lip_filter_id, builder_operator_index));
- auto *lip_filter_deployment_info_proto =
- query_context_proto->add_lip_filter_deployment_infos();
- lip_filter_deployment_info_proto->set_action_type(serialization::LIPFilterActionType::BUILD);
- lip_filter_deployment_info_proto->set_lip_filter_id(lip_filter_id);
-
- const CatalogAttribute *target_attr = builder_attribute_map.at(info.build_attribute->id());
- lip_filter_deployment_info_proto->set_attribute_id(target_attr->getID());
- lip_filter_deployment_info_proto->mutable_attribute_type()->CopyFrom(
- target_attr->getType().getProto());
+ auto *lip_filter_entry_proto = lip_filter_deployment_info_proto->add_entries();
+ lip_filter_entry_proto->set_lip_filter_id(lip_filter_id);
+ lip_filter_entry_proto->set_attribute_id(target_attr->getID());
+ lip_filter_entry_proto->mutable_attribute_type()->CopyFrom(attr_type.getProto());
std::cerr << "Build " << info.build_attribute->toString()
<< " @" << builder_node << "\n";
}
+
+ RelationalOperator *relop =
+ execution_plan->getQueryPlanDAGMutable()->getNodePayloadMutable(builder_operator_index);
+ relop->deployLIPFilter(lip_deployment_index);
}
+void LIPFilterGenerator::deployProberInteral(
+ QueryPlan *execution_plan,
+ serialization::QueryContext *query_context_proto,
+ const physical::PhysicalPtr &prober_node,
+ const QueryPlan::DAGNodeIndex prober_operator_index,
+ const std::vector<physical::LIPFilterProbeInfo> &probe_info_vec,
+ const LIPFilterBuilderMap &lip_filter_builder_map) const {
+ const auto lip_deployment_index = query_context_proto->lip_filter_deployments_size();
+ auto *lip_filter_deployment_info_proto =
+ query_context_proto->add_lip_filter_deployments();
+ lip_filter_deployment_info_proto->set_action_type(serialization::LIPFilterActionType::PROBE);
+
+ const auto &prober_attribute_map = attribute_map_.at(prober_node);
+ for (const auto &info : probe_info_vec) {
+ const auto &builder_info =
+ lip_filter_builder_map.at(
+ std::make_pair(info.build_attribute->id(), info.builder));
+ const CatalogAttribute *target_attr = prober_attribute_map.at(info.probe_attribute->id());
+
+ auto *lip_filter_entry_proto = lip_filter_deployment_info_proto->add_entries();
+ lip_filter_entry_proto->set_lip_filter_id(builder_info.first);
+ lip_filter_entry_proto->set_attribute_id(target_attr->getID());
+ lip_filter_entry_proto->mutable_attribute_type()->CopyFrom(
+ target_attr->getType().getProto());
+
+ std::cerr << "Probe " << info.probe_attribute->toString()
+ << " @" << prober_node << "\n";
+ }
+
+ RelationalOperator *relop =
+ execution_plan->getQueryPlanDAGMutable()->getNodePayloadMutable(prober_operator_index);
+ relop->deployLIPFilter(lip_deployment_index);
+}
} // namespace optimizer
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/query_optimizer/LIPFilterGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/LIPFilterGenerator.hpp b/query_optimizer/LIPFilterGenerator.hpp
index 05270c9..b5925d4 100644
--- a/query_optimizer/LIPFilterGenerator.hpp
+++ b/query_optimizer/LIPFilterGenerator.hpp
@@ -55,60 +55,44 @@ class LIPFilterGenerator {
const std::unordered_map<expressions::ExprId, const CatalogAttribute *> &attribute_substitution_map);
void addAggregateInfo(const physical::AggregatePtr &aggregate,
- const QueryPlan::DAGNodeIndex aggregate_operator_index,
- const QueryContext::aggregation_state_id aggregation_state_id) {
- aggregate_infos_.emplace_back(aggregate, aggregate_operator_index, aggregation_state_id);
+ const QueryPlan::DAGNodeIndex aggregate_operator_index) {
+ prober_infos_.emplace_back(aggregate, aggregate_operator_index);
}
void addHashJoinInfo(const physical::HashJoinPtr &hash_join,
const QueryPlan::DAGNodeIndex build_operator_index,
const QueryPlan::DAGNodeIndex join_operator_index) {
- hash_join_infos_.emplace_back(hash_join, build_operator_index, join_operator_index);
+ builder_infos_.emplace_back(hash_join, build_operator_index);
+ prober_infos_.emplace_back(hash_join, join_operator_index);
}
void addSelectionInfo(const physical::SelectionPtr &selection,
const QueryPlan::DAGNodeIndex select_operator_index) {
- selection_infos_.emplace_back(selection, select_operator_index);
+ prober_infos_.emplace_back(selection, select_operator_index);
}
void deployLIPFilters(QueryPlan *execution_plan,
serialization::QueryContext *query_context_proto) const;
private:
- struct AggregateInfo {
- AggregateInfo(const physical::AggregatePtr &aggregate_in,
- const QueryPlan::DAGNodeIndex aggregate_operator_index_in,
- const QueryContext::aggregation_state_id aggregation_state_id_in)
- : aggregate(aggregate_in),
- aggregate_operator_index(aggregate_operator_index_in),
- aggregation_state_id(aggregation_state_id_in) {
+ struct BuilderInfo {
+ BuilderInfo(const physical::PhysicalPtr &builder_node_in,
+ const QueryPlan::DAGNodeIndex builder_operator_index_in)
+ : builder_node(builder_node_in),
+ builder_operator_index(builder_operator_index_in) {
}
- const physical::AggregatePtr aggregate;
- const QueryPlan::DAGNodeIndex aggregate_operator_index;
- const QueryContext::aggregation_state_id aggregation_state_id;
+ const physical::PhysicalPtr builder_node;
+ const QueryPlan::DAGNodeIndex builder_operator_index;
};
- struct HashJoinInfo {
- HashJoinInfo(const physical::HashJoinPtr &hash_join_in,
- const QueryPlan::DAGNodeIndex build_operator_index_in,
- const QueryPlan::DAGNodeIndex join_operator_index_in)
- : hash_join(hash_join_in),
- build_operator_index(build_operator_index_in),
- join_operator_index(join_operator_index_in) {
+ struct ProberInfo {
+ ProberInfo(const physical::PhysicalPtr &prober_node_in,
+ const QueryPlan::DAGNodeIndex prober_operator_index_in)
+ : prober_node(prober_node_in),
+ prober_operator_index(prober_operator_index_in) {
}
- const physical::HashJoinPtr hash_join;
- const QueryPlan::DAGNodeIndex build_operator_index;
- const QueryPlan::DAGNodeIndex join_operator_index;
- };
-
- struct SelectionInfo {
- SelectionInfo(const physical::SelectionPtr &selection_in,
- const QueryPlan::DAGNodeIndex select_operator_index_in)
- : selection(selection_in),
- select_operator_index(select_operator_index_in) {
- }
- const physical::SelectionPtr selection;
- const QueryPlan::DAGNodeIndex select_operator_index;
+ const physical::PhysicalPtr prober_node;
+ const QueryPlan::DAGNodeIndex prober_operator_index;
};
typedef std::map<std::pair<expressions::ExprId, physical::PhysicalPtr>,
@@ -121,13 +105,17 @@ class LIPFilterGenerator {
const std::vector<physical::LIPFilterBuildInfo> &build_info_vec,
LIPFilterBuilderMap *lip_filter_builder_map) const;
- void deployProberInteral();
+ void deployProberInteral(QueryPlan *execution_plan,
+ serialization::QueryContext *query_context_proto,
+ const physical::PhysicalPtr &prober_node,
+ const QueryPlan::DAGNodeIndex prober_operator_index,
+ const std::vector<physical::LIPFilterProbeInfo> &probe_info_vec,
+ const LIPFilterBuilderMap &lip_filter_builder_map) const;
const physical::LIPFilterConfigurationPtr lip_filter_configuration_;
std::map<physical::PhysicalPtr, std::map<expressions::ExprId, const CatalogAttribute *>> attribute_map_;
- std::vector<AggregateInfo> aggregate_infos_;
- std::vector<HashJoinInfo> hash_join_infos_;
- std::vector<SelectionInfo> selection_infos_;
+ std::vector<BuilderInfo> builder_infos_;
+ std::vector<ProberInfo> prober_infos_;
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/query_optimizer/physical/LIPFilterConfiguration.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/LIPFilterConfiguration.hpp b/query_optimizer/physical/LIPFilterConfiguration.hpp
index 9b028ad..f9236e5 100644
--- a/query_optimizer/physical/LIPFilterConfiguration.hpp
+++ b/query_optimizer/physical/LIPFilterConfiguration.hpp
@@ -44,14 +44,14 @@ typedef std::shared_ptr<const Physical> PhysicalPtr;
struct LIPFilterBuildInfo {
LIPFilterBuildInfo(const expressions::AttributeReferencePtr &build_attribute_in,
- const std::size_t filter_size_in,
+ const std::size_t filter_cardinality_in,
const LIPFilterType &filter_type_in)
: build_attribute(build_attribute_in),
- filter_size(filter_size_in),
+ filter_cardinality(filter_cardinality_in),
filter_type(filter_type_in) {
}
expressions::AttributeReferencePtr build_attribute;
- std::size_t filter_size;
+ std::size_t filter_cardinality;
LIPFilterType filter_type;
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/relational_operators/BuildHashOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildHashOperator.cpp b/relational_operators/BuildHashOperator.cpp
index 465621c..eaf3259 100644
--- a/relational_operators/BuildHashOperator.cpp
+++ b/relational_operators/BuildHashOperator.cpp
@@ -34,6 +34,7 @@
#include "storage/TupleReference.hpp"
#include "storage/TupleStorageSubBlock.hpp"
#include "storage/ValueAccessor.hpp"
+#include "utility/lip_filter/LIPFilterBuilder.hpp"
#include "glog/logging.h"
@@ -68,6 +69,14 @@ bool BuildHashOperator::getAllWorkOrders(
tmb::MessageBus *bus) {
DCHECK(query_context != nullptr);
+ LIPFilterBuilderPtr lip_filter_builder = nullptr;
+ if (lip_deployment_index_ != QueryContext::kInvalidILIPDeploymentId) {
+ const LIPFilterDeployment *lip_filter_deployment =
+ query_context->getLIPDeployment(lip_deployment_index_);
+ lip_filter_builder = std::shared_ptr<LIPFilterBuilder>(
+ lip_filter_deployment->createLIPFilterBuilder());
+ }
+
JoinHashTable *hash_table = query_context->getJoinHashTable(hash_table_index_);
if (input_relation_is_stored_) {
if (!started_) {
@@ -79,7 +88,8 @@ bool BuildHashOperator::getAllWorkOrders(
any_join_key_attributes_nullable_,
input_block_id,
hash_table,
- storage_manager),
+ storage_manager,
+ lip_filter_builder),
op_index_);
}
started_ = true;
@@ -95,7 +105,8 @@ bool BuildHashOperator::getAllWorkOrders(
any_join_key_attributes_nullable_,
input_relation_block_ids_[num_workorders_generated_],
hash_table,
- storage_manager),
+ storage_manager,
+ lip_filter_builder),
op_index_);
++num_workorders_generated_;
}
@@ -136,17 +147,22 @@ serialization::WorkOrder* BuildHashOperator::createWorkOrderProto(const block_id
any_join_key_attributes_nullable_);
proto->SetExtension(serialization::BuildHashWorkOrder::join_hash_table_index, hash_table_index_);
proto->SetExtension(serialization::BuildHashWorkOrder::block_id, block);
+ // TODO(jianqiao): update lip_filter related stuff
return proto;
}
-
void BuildHashWorkOrder::execute() {
BlockReference block(
storage_manager_->getBlock(build_block_id_, input_relation_));
TupleReferenceGenerator generator(build_block_id_);
std::unique_ptr<ValueAccessor> accessor(block->getTupleStorageSubBlock().createValueAccessor());
+
+ if (lip_filter_builder_ != nullptr) {
+ lip_filter_builder_->insertValueAccessor(accessor.get());
+ }
+
HashTablePutResult result;
if (join_key_attributes_.size() == 1) {
result = hash_table_->putValueAccessor(accessor.get(),
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/relational_operators/BuildHashOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildHashOperator.hpp b/relational_operators/BuildHashOperator.hpp
index 4a80a8a..940298c 100644
--- a/relational_operators/BuildHashOperator.hpp
+++ b/relational_operators/BuildHashOperator.hpp
@@ -20,6 +20,7 @@
#ifndef QUICKSTEP_RELATIONAL_OPERATORS_BUILD_HASH_OPERATOR_HPP_
#define QUICKSTEP_RELATIONAL_OPERATORS_BUILD_HASH_OPERATOR_HPP_
+#include <memory>
#include <string>
#include <utility>
#include <vector>
@@ -31,6 +32,7 @@
#include "relational_operators/WorkOrder.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilterBuilder.hpp"
#include "glog/logging.h"
@@ -162,6 +164,7 @@ class BuildHashWorkOrder : public WorkOrder {
* @param build_block_id The block id.
* @param hash_table The JoinHashTable to use.
* @param storage_manager The StorageManager to use.
+ * @param lip_filter_builder The attached builder for building LIP filters.
**/
BuildHashWorkOrder(const std::size_t query_id,
const CatalogRelationSchema &input_relation,
@@ -169,14 +172,16 @@ class BuildHashWorkOrder : public WorkOrder {
const bool any_join_key_attributes_nullable,
const block_id build_block_id,
JoinHashTable *hash_table,
- StorageManager *storage_manager)
+ StorageManager *storage_manager,
+ LIPFilterBuilderPtr lip_filter_builder = nullptr)
: WorkOrder(query_id),
input_relation_(input_relation),
join_key_attributes_(join_key_attributes),
any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
build_block_id_(build_block_id),
hash_table_(DCHECK_NOTNULL(hash_table)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+ storage_manager_(DCHECK_NOTNULL(storage_manager)),
+ lip_filter_builder_(lip_filter_builder) {}
/**
* @brief Constructor for the distributed version.
@@ -189,6 +194,7 @@ class BuildHashWorkOrder : public WorkOrder {
* @param build_block_id The block id.
* @param hash_table The JoinHashTable to use.
* @param storage_manager The StorageManager to use.
+ * @param lip_filter_builder The attached builder for building LIP filters.
**/
BuildHashWorkOrder(const std::size_t query_id,
const CatalogRelationSchema &input_relation,
@@ -196,14 +202,16 @@ class BuildHashWorkOrder : public WorkOrder {
const bool any_join_key_attributes_nullable,
const block_id build_block_id,
JoinHashTable *hash_table,
- StorageManager *storage_manager)
+ StorageManager *storage_manager,
+ LIPFilterBuilderPtr lip_filter_builder = nullptr)
: WorkOrder(query_id),
input_relation_(input_relation),
join_key_attributes_(std::move(join_key_attributes)),
any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
build_block_id_(build_block_id),
hash_table_(DCHECK_NOTNULL(hash_table)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+ storage_manager_(DCHECK_NOTNULL(storage_manager)),
+ lip_filter_builder_(lip_filter_builder) {}
~BuildHashWorkOrder() override {}
@@ -222,6 +230,8 @@ class BuildHashWorkOrder : public WorkOrder {
JoinHashTable *hash_table_;
StorageManager *storage_manager_;
+ LIPFilterBuilderPtr lip_filter_builder_;
+
DISALLOW_COPY_AND_ASSIGN(BuildHashWorkOrder);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index 779c0fe..f8916df 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -48,6 +48,7 @@
#include "types/TypedValue.hpp"
#include "types/containers/ColumnVector.hpp"
#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
#include "gflags/gflags.h"
#include "glog/logging.h"
@@ -180,6 +181,11 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
if (blocking_dependencies_met_) {
DCHECK(query_context != nullptr);
+ const LIPFilterDeployment *lip_filter_deployment = nullptr;
+ if (lip_deployment_index_ != QueryContext::kInvalidILIPDeploymentId) {
+ lip_filter_deployment = query_context->getLIPDeployment(lip_deployment_index_);
+ }
+
const Predicate *residual_predicate =
query_context->getPredicate(residual_predicate_index_);
const vector<unique_ptr<const Scalar>> &selection =
@@ -192,6 +198,10 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
if (probe_relation_is_stored_) {
if (!started_) {
for (const block_id probe_block_id : probe_relation_block_ids_) {
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr;
+ if (lip_filter_deployment != nullptr) {
+ lip_filter_adaptive_prober = lip_filter_deployment->createLIPFilterAdaptiveProber();
+ }
container->addNormalWorkOrder(
new JoinWorkOrderClass(query_id_,
build_relation_,
@@ -203,7 +213,8 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
selection,
hash_table,
output_destination,
- storage_manager),
+ storage_manager,
+ lip_filter_adaptive_prober),
op_index_);
}
started_ = true;
@@ -211,6 +222,10 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
return started_;
} else {
while (num_workorders_generated_ < probe_relation_block_ids_.size()) {
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr;
+ if (lip_filter_deployment != nullptr) {
+ lip_filter_adaptive_prober = lip_filter_deployment->createLIPFilterAdaptiveProber();
+ }
container->addNormalWorkOrder(
new JoinWorkOrderClass(
query_id_,
@@ -223,7 +238,8 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
selection,
hash_table,
output_destination,
- storage_manager),
+ storage_manager,
+ lip_filter_adaptive_prober),
op_index_);
++num_workorders_generated_;
} // end while
@@ -423,6 +439,17 @@ void HashInnerJoinWorkOrder::execute() {
const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
+
+ std::unique_ptr<TupleIdSequence> lip_filter_existence_map;
+ std::unique_ptr<ValueAccessor> base_accessor;
+ if (lip_filter_adaptive_prober_ != nullptr) {
+ base_accessor.reset(probe_accessor.release());
+ lip_filter_existence_map.reset(
+ lip_filter_adaptive_prober_->filterValueAccessor(base_accessor.get()));
+ probe_accessor.reset(
+ base_accessor->createSharedTupleIdSequenceAdapterVirtual(*lip_filter_existence_map));
+ }
+
MapBasedJoinedTupleCollector collector;
if (join_key_attributes_.size() == 1) {
hash_table_.getAllFromValueAccessor(
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/relational_operators/HashJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp
index fa393b6..29d6eba 100644
--- a/relational_operators/HashJoinOperator.hpp
+++ b/relational_operators/HashJoinOperator.hpp
@@ -35,6 +35,7 @@
#include "storage/HashTable.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
#include "glog/logging.h"
@@ -307,7 +308,8 @@ class HashInnerJoinWorkOrder : public WorkOrder {
const std::vector<std::unique_ptr<const Scalar>> &selection,
const JoinHashTable &hash_table,
InsertDestination *output_destination,
- StorageManager *storage_manager)
+ StorageManager *storage_manager,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr)
: WorkOrder(query_id),
build_relation_(build_relation),
probe_relation_(probe_relation),
@@ -318,7 +320,8 @@ class HashInnerJoinWorkOrder : public WorkOrder {
selection_(selection),
hash_table_(hash_table),
output_destination_(DCHECK_NOTNULL(output_destination)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+ storage_manager_(DCHECK_NOTNULL(storage_manager)),
+ lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
/**
* @brief Constructor for the distributed version.
@@ -354,7 +357,8 @@ class HashInnerJoinWorkOrder : public WorkOrder {
const std::vector<std::unique_ptr<const Scalar>> &selection,
const JoinHashTable &hash_table,
InsertDestination *output_destination,
- StorageManager *storage_manager)
+ StorageManager *storage_manager,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr)
: WorkOrder(query_id),
build_relation_(build_relation),
probe_relation_(probe_relation),
@@ -365,7 +369,8 @@ class HashInnerJoinWorkOrder : public WorkOrder {
selection_(selection),
hash_table_(hash_table),
output_destination_(DCHECK_NOTNULL(output_destination)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+ storage_manager_(DCHECK_NOTNULL(storage_manager)),
+ lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
~HashInnerJoinWorkOrder() override {}
@@ -392,6 +397,8 @@ class HashInnerJoinWorkOrder : public WorkOrder {
InsertDestination *output_destination_;
StorageManager *storage_manager_;
+ std::unique_ptr<LIPFilterAdaptiveProber> lip_filter_adaptive_prober_;
+
DISALLOW_COPY_AND_ASSIGN(HashInnerJoinWorkOrder);
};
@@ -435,7 +442,8 @@ class HashSemiJoinWorkOrder : public WorkOrder {
const std::vector<std::unique_ptr<const Scalar>> &selection,
const JoinHashTable &hash_table,
InsertDestination *output_destination,
- StorageManager *storage_manager)
+ StorageManager *storage_manager,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr)
: WorkOrder(query_id),
build_relation_(build_relation),
probe_relation_(probe_relation),
@@ -446,7 +454,8 @@ class HashSemiJoinWorkOrder : public WorkOrder {
selection_(selection),
hash_table_(hash_table),
output_destination_(DCHECK_NOTNULL(output_destination)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+ storage_manager_(DCHECK_NOTNULL(storage_manager)),
+ lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
/**
* @brief Constructor for the distributed version.
@@ -482,7 +491,8 @@ class HashSemiJoinWorkOrder : public WorkOrder {
const std::vector<std::unique_ptr<const Scalar>> &selection,
const JoinHashTable &hash_table,
InsertDestination *output_destination,
- StorageManager *storage_manager)
+ StorageManager *storage_manager,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr)
: WorkOrder(query_id),
build_relation_(build_relation),
probe_relation_(probe_relation),
@@ -493,7 +503,8 @@ class HashSemiJoinWorkOrder : public WorkOrder {
selection_(selection),
hash_table_(hash_table),
output_destination_(DCHECK_NOTNULL(output_destination)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+ storage_manager_(DCHECK_NOTNULL(storage_manager)),
+ lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
~HashSemiJoinWorkOrder() override {}
@@ -516,6 +527,8 @@ class HashSemiJoinWorkOrder : public WorkOrder {
InsertDestination *output_destination_;
StorageManager *storage_manager_;
+ std::unique_ptr<LIPFilterAdaptiveProber> lip_filter_adaptive_prober_;
+
DISALLOW_COPY_AND_ASSIGN(HashSemiJoinWorkOrder);
};
@@ -559,7 +572,8 @@ class HashAntiJoinWorkOrder : public WorkOrder {
const std::vector<std::unique_ptr<const Scalar>> &selection,
const JoinHashTable &hash_table,
InsertDestination *output_destination,
- StorageManager *storage_manager)
+ StorageManager *storage_manager,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr)
: WorkOrder(query_id),
build_relation_(build_relation),
probe_relation_(probe_relation),
@@ -570,7 +584,8 @@ class HashAntiJoinWorkOrder : public WorkOrder {
selection_(selection),
hash_table_(hash_table),
output_destination_(DCHECK_NOTNULL(output_destination)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+ storage_manager_(DCHECK_NOTNULL(storage_manager)),
+ lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
/**
* @brief Constructor for the distributed version.
@@ -606,7 +621,8 @@ class HashAntiJoinWorkOrder : public WorkOrder {
const std::vector<std::unique_ptr<const Scalar>> &selection,
const JoinHashTable &hash_table,
InsertDestination *output_destination,
- StorageManager *storage_manager)
+ StorageManager *storage_manager,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr)
: WorkOrder(query_id),
build_relation_(build_relation),
probe_relation_(probe_relation),
@@ -617,7 +633,8 @@ class HashAntiJoinWorkOrder : public WorkOrder {
selection_(selection),
hash_table_(hash_table),
output_destination_(DCHECK_NOTNULL(output_destination)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+ storage_manager_(DCHECK_NOTNULL(storage_manager)),
+ lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
~HashAntiJoinWorkOrder() override {}
@@ -646,6 +663,8 @@ class HashAntiJoinWorkOrder : public WorkOrder {
InsertDestination *output_destination_;
StorageManager *storage_manager_;
+ std::unique_ptr<LIPFilterAdaptiveProber> lip_filter_adaptive_prober_;
+
DISALLOW_COPY_AND_ASSIGN(HashAntiJoinWorkOrder);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/relational_operators/RelationalOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/RelationalOperator.hpp b/relational_operators/RelationalOperator.hpp
index f0303e5..fb05e9e 100644
--- a/relational_operators/RelationalOperator.hpp
+++ b/relational_operators/RelationalOperator.hpp
@@ -245,6 +245,13 @@ class RelationalOperator {
return op_index_;
}
+ /**
+ * @brief Record which QueryContext LIP filter deployment this operator uses.
+ */
+ void deployLIPFilter(const QueryContext::lip_deployment_id lip_deployment_index) {
+ lip_deployment_index_ = lip_deployment_index;
+ }
+
protected:
/**
* @brief Constructor
@@ -265,6 +272,8 @@ class RelationalOperator {
bool done_feeding_input_relation_;
std::size_t op_index_;
+ QueryContext::lip_deployment_id lip_deployment_index_ = QueryContext::kInvalidILIPDeploymentId;
+
private:
DISALLOW_COPY_AND_ASSIGN(RelationalOperator);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index f05cc46..e85e005 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -643,7 +643,6 @@ target_link_libraries(quickstep_storage_FastHashTable
quickstep_threading_SpinSharedMutex
quickstep_types_Type
quickstep_types_TypedValue
- quickstep_utility_BloomFilter
quickstep_utility_HashPair
quickstep_utility_Macros)
target_link_libraries(quickstep_storage_FastHashTableFactory
@@ -659,7 +658,6 @@ target_link_libraries(quickstep_storage_FastHashTableFactory
quickstep_storage_SimpleScalarSeparateChainingHashTable
quickstep_storage_TupleReference
quickstep_types_TypeFactory
- quickstep_utility_BloomFilter
quickstep_utility_Macros)
target_link_libraries(quickstep_storage_FastSeparateChainingHashTable
quickstep_storage_FastHashTable
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/storage/FastHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/FastHashTable.hpp b/storage/FastHashTable.hpp
index 4a95cd9..74d9ee3 100644
--- a/storage/FastHashTable.hpp
+++ b/storage/FastHashTable.hpp
@@ -39,7 +39,6 @@
#include "threading/SpinSharedMutex.hpp"
#include "types/Type.hpp"
#include "types/TypedValue.hpp"
-#include "utility/BloomFilter.hpp"
#include "utility/HashPair.hpp"
#include "utility/Macros.hpp"
@@ -958,62 +957,6 @@ class FastHashTable : public HashTableBase<resizable,
template <typename FunctorT>
std::size_t forEachCompositeKeyFast(FunctorT *functor, int index) const;
- /**
- * @brief A call to this function will cause a bloom filter to be built
- * during the build phase of this hash table.
- **/
- inline void enableBuildSideBloomFilter() {
- has_build_side_bloom_filter_ = true;
- }
-
- /**
- * @brief A call to this function will cause a set of bloom filters to be
- * probed during the probe phase of this hash table.
- **/
- inline void enableProbeSideBloomFilter() {
- has_probe_side_bloom_filter_ = true;
- }
-
- /**
- * @brief This function sets the pointer to the bloom filter to be
- * used during the build phase of this hash table.
- * @warning Should call enable_build_side_bloom_filter() first to enable
- * bloom filter usage during build phase.
- * @note The ownership of the bloom filter lies with the caller.
- *
- * @param bloom_filter The pointer to the bloom filter.
- **/
- inline void setBuildSideBloomFilter(BloomFilter *bloom_filter) {
- build_bloom_filter_ = bloom_filter;
- }
-
- /**
- * @brief This function adds a pointer to the list of bloom filters to be
- * used during the probe phase of this hash table.
- * @warning Should call enable_probe_side_bloom_filter() first to enable
- * bloom filter usage during probe phase.
- * @note The ownership of the bloom filter lies with the caller.
- *
- * @param bloom_filter The pointer to the bloom filter.
- **/
- inline void addProbeSideBloomFilter(const BloomFilter *bloom_filter) {
- probe_bloom_filters_.emplace_back(bloom_filter);
- }
-
- /**
- * @brief This function adds a vector of attribute ids corresponding to a
- * bloom filter used during the probe phase of this hash table.
- * @warning Should call enable_probe_side_bloom_filter() first to enable
- * bloom filter usage during probe phase.
- *
- * @param probe_attribute_ids The vector of attribute ids to use for probing
- * the bloom filter.
- **/
- inline void addProbeSideAttributeIds(
- std::vector<attribute_id> &&probe_attribute_ids) {
- probe_attribute_ids_.push_back(probe_attribute_ids);
- }
-
protected:
/**
* @brief Constructor for new resizable hash table.
@@ -1318,12 +1261,6 @@ class FastHashTable : public HashTableBase<resizable,
const attribute_id key_attr_id,
FunctorT *functor) const;
- // Data structures used for bloom filter optimized semi-joins.
- bool has_build_side_bloom_filter_ = false;
- bool has_probe_side_bloom_filter_ = false;
- BloomFilter *build_bloom_filter_;
- std::vector<const BloomFilter *> probe_bloom_filters_;
- std::vector<std::vector<attribute_id>> probe_attribute_ids_;
DISALLOW_COPY_AND_ASSIGN(FastHashTable);
};
@@ -1449,13 +1386,6 @@ FastHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>::
total_entries, total_variable_key_size, &prealloc_state);
}
}
- std::unique_ptr<BloomFilter> thread_local_bloom_filter;
- if (has_build_side_bloom_filter_) {
- thread_local_bloom_filter.reset(
- new BloomFilter(build_bloom_filter_->getRandomSeed(),
- build_bloom_filter_->getNumberOfHashes(),
- build_bloom_filter_->getBitArraySize()));
- }
if (resizable) {
while (result == HashTablePutResult::kOutOfSpace) {
{
@@ -1474,12 +1404,6 @@ FastHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>::
variable_size,
(*functor)(*accessor),
using_prealloc ? &prealloc_state : nullptr);
- // Insert into bloom filter, if enabled.
- if (has_build_side_bloom_filter_) {
- thread_local_bloom_filter->insertUnSafe(
- static_cast<const std::uint8_t *>(key.getDataPtr()),
- key.getDataSize());
- }
if (result == HashTablePutResult::kDuplicateKey) {
DEBUG_ASSERT(!using_prealloc);
return result;
@@ -1507,22 +1431,11 @@ FastHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>::
variable_size,
(*functor)(*accessor),
using_prealloc ? &prealloc_state : nullptr);
- // Insert into bloom filter, if enabled.
- if (has_build_side_bloom_filter_) {
- thread_local_bloom_filter->insertUnSafe(
- static_cast<const std::uint8_t *>(key.getDataPtr()),
- key.getDataSize());
- }
if (result != HashTablePutResult::kOK) {
return result;
}
}
}
- // Update the build side bloom filter with thread local copy, if
- // available.
- if (has_build_side_bloom_filter_) {
- build_bloom_filter_->bitwiseOr(thread_local_bloom_filter.get());
- }
return HashTablePutResult::kOK;
});
@@ -2462,52 +2375,27 @@ void FastHashTable<resizable,
InvokeOnAnyValueAccessor(
accessor,
[&](auto *accessor) -> void { // NOLINT(build/c++11)
- while (accessor->next()) {
- // Probe any bloom filters, if enabled.
- if (has_probe_side_bloom_filter_) {
- DCHECK_EQ(probe_bloom_filters_.size(), probe_attribute_ids_.size());
- // Check if the key is contained in the BloomFilters or not.
- bool bloom_miss = false;
- for (std::size_t i = 0;
- i < probe_bloom_filters_.size() && !bloom_miss;
- ++i) {
- const BloomFilter *bloom_filter = probe_bloom_filters_[i];
- for (const attribute_id &attr_id : probe_attribute_ids_[i]) {
- TypedValue bloom_key = accessor->getTypedValue(attr_id);
- if (!bloom_filter->contains(static_cast<const std::uint8_t *>(
- bloom_key.getDataPtr()),
- bloom_key.getDataSize())) {
- bloom_miss = true;
- break;
- }
- }
- }
- if (bloom_miss) {
- continue; // On a bloom filter miss, probing the hash table can
- // be skipped.
- }
- }
-
- TypedValue key = accessor->getTypedValue(key_attr_id);
- if (check_for_null_keys && key.isNull()) {
- continue;
- }
- const std::size_t true_hash = use_scalar_literal_hash_template
- ? key.getHashScalarLiteral()
- : key.getHash();
- const std::size_t adjusted_hash =
- adjust_hashes_template ? this->AdjustHash(true_hash) : true_hash;
- std::size_t entry_num = 0;
- const std::uint8_t *value;
- while (this->getNextEntryForKey(
- key, adjusted_hash, &value, &entry_num)) {
- (*functor)(*accessor, *value);
- if (!allow_duplicate_keys) {
- break;
- }
- }
+ while (accessor->next()) {
+ TypedValue key = accessor->getTypedValue(key_attr_id);
+ if (check_for_null_keys && key.isNull()) {
+ continue;
+ }
+ const std::size_t true_hash = use_scalar_literal_hash_template
+ ? key.getHashScalarLiteral()
+ : key.getHash();
+ const std::size_t adjusted_hash =
+ adjust_hashes_template ? this->AdjustHash(true_hash) : true_hash;
+ std::size_t entry_num = 0;
+ const std::uint8_t *value;
+ while (this->getNextEntryForKey(
+ key, adjusted_hash, &value, &entry_num)) {
+ (*functor)(*accessor, *value);
+ if (!allow_duplicate_keys) {
+ break;
}
- });
+ }
+ }
+ });
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/storage/FastHashTableFactory.hpp
----------------------------------------------------------------------
diff --git a/storage/FastHashTableFactory.hpp b/storage/FastHashTableFactory.hpp
index 6d0b693..682cc2a 100644
--- a/storage/FastHashTableFactory.hpp
+++ b/storage/FastHashTableFactory.hpp
@@ -32,7 +32,6 @@
#include "storage/SimpleScalarSeparateChainingHashTable.hpp"
#include "storage/TupleReference.hpp"
#include "types/TypeFactory.hpp"
-#include "utility/BloomFilter.hpp"
#include "utility/Macros.hpp"
#include "glog/logging.h"
@@ -183,14 +182,11 @@ class FastHashTableFactory {
* @param proto A protobuf description of a resizable HashTable.
* @param storage_manager The StorageManager to use (a StorageBlob will be
* allocated to hold the HashTable's contents).
- * @param bloom_filters A vector of pointers to bloom filters that may be used
- * during hash table construction in build/probe phase.
* @return A new resizable HashTable with parameters specified by proto.
**/
static FastHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>*
CreateResizableFromProto(const serialization::HashTable &proto,
- StorageManager *storage_manager,
- const std::vector<std::unique_ptr<BloomFilter>> &bloom_filters) {
+ StorageManager *storage_manager) {
DCHECK(ProtoIsValid(proto))
<< "Attempted to create HashTable from invalid proto description:\n"
<< proto.DebugString();
@@ -204,35 +200,6 @@ class FastHashTableFactory {
key_types,
proto.estimated_num_entries(),
storage_manager);
-
- // TODO(ssaurabh): These lazy initializations can be moved from here and pushed to the
- // individual implementations of the hash table constructors.
-
- // Check if there are any build side bloom filter defined on the hash table.
- if (proto.build_side_bloom_filter_id_size() > 0) {
- hash_table->enableBuildSideBloomFilter();
- hash_table->setBuildSideBloomFilter(bloom_filters[proto.build_side_bloom_filter_id(0)].get());
- }
-
- // Check if there are any probe side bloom filters defined on the hash table.
- if (proto.probe_side_bloom_filters_size() > 0) {
- hash_table->enableProbeSideBloomFilter();
- // Add as many probe bloom filters as defined by the proto.
- for (int j = 0; j < proto.probe_side_bloom_filters_size(); ++j) {
- // Add the pointer to the probe bloom filter within the list of probe bloom filters to use.
- const auto probe_side_bloom_filter = proto.probe_side_bloom_filters(j);
- hash_table->addProbeSideBloomFilter(bloom_filters[probe_side_bloom_filter.probe_side_bloom_filter_id()].get());
-
- // Add the attribute ids corresponding to this probe bloom filter.
- std::vector<attribute_id> probe_attribute_ids;
- for (int k = 0; k < probe_side_bloom_filter.probe_side_attr_ids_size(); ++k) {
- const attribute_id probe_attribute_id = probe_side_bloom_filter.probe_side_attr_ids(k);
- probe_attribute_ids.push_back(probe_attribute_id);
- }
- hash_table->addProbeSideAttributeIds(std::move(probe_attribute_ids));
- }
- }
-
return hash_table;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/utility/lip_filter/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/lip_filter/CMakeLists.txt b/utility/lip_filter/CMakeLists.txt
index df6a5ec..d78f5cd 100644
--- a/utility/lip_filter/CMakeLists.txt
+++ b/utility/lip_filter/CMakeLists.txt
@@ -23,10 +23,11 @@ QS_PROTOBUF_GENERATE_CPP(utility_lipfilter_LIPFilter_proto_srcs
add_library(quickstep_utility_lipfilter_LIPFilter LIPFilter.cpp LIPFilter.hpp)
add_library(quickstep_utility_lipfilter_LIPFilterAdaptiveProber ../../empty_src.cpp LIPFilterAdaptiveProber.hpp)
add_library(quickstep_utility_lipfilter_LIPFilterBuilder ../../empty_src.cpp LIPFilterBuilder.hpp)
-add_library(quickstep_utility_lipfilter_LIPFilterDeploymentInfo ../../empty_src.cpp LIPFilterDeploymentInfo.hpp)
+add_library(quickstep_utility_lipfilter_LIPFilterDeployment LIPFilterDeployment.cpp LIPFilterDeployment.hpp)
add_library(quickstep_utility_lipfilter_LIPFilterFactory LIPFilterFactory.cpp LIPFilterFactory.hpp)
add_library(quickstep_utility_lipfilter_LIPFilter_proto
${utility_lipfilter_LIPFilter_proto_srcs})
+add_library(quickstep_utility_lipfilter_SingleIdentityHashFilter ../../empty_src.cpp SingleIdentityHashFilter.hpp)
# Link dependencies:
target_link_libraries(quickstep_utility_lipfilter_LIPFilter
@@ -37,13 +38,21 @@ target_link_libraries(quickstep_utility_lipfilter_LIPFilterAdaptiveProber
target_link_libraries(quickstep_utility_lipfilter_LIPFilterBuilder
quickstep_catalog_CatalogTypedefs
quickstep_utility_Macros)
-target_link_libraries(quickstep_utility_lipfilter_LIPFilterDeploymentInfo
+target_link_libraries(quickstep_utility_lipfilter_LIPFilterDeployment
quickstep_catalog_CatalogTypedefs
+ quickstep_types_TypeFactory
quickstep_utility_Macros
- quickstep_utility_lipfilter_LIPFilter)
+ quickstep_utility_lipfilter_LIPFilter
+ quickstep_utility_lipfilter_LIPFilter_proto)
target_link_libraries(quickstep_utility_lipfilter_LIPFilterFactory
+ quickstep_utility_lipfilter_LIPFilter
quickstep_utility_lipfilter_LIPFilter_proto
+ quickstep_utility_lipfilter_SingleIdentityHashFilter
quickstep_utility_Macros)
target_link_libraries(quickstep_utility_lipfilter_LIPFilter_proto
${PROTOBUF_LIBRARY}
quickstep_types_Type_proto)
+target_link_libraries(quickstep_utility_lipfilter_SingleIdentityHashFilter
+ quickstep_storage_StorageConstants
+ quickstep_utility_lipfilter_LIPFilter
+ quickstep_utility_Macros)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/utility/lip_filter/LIPFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilter.hpp b/utility/lip_filter/LIPFilter.hpp
index c14b526..dd72e48 100644
--- a/utility/lip_filter/LIPFilter.hpp
+++ b/utility/lip_filter/LIPFilter.hpp
@@ -20,14 +20,20 @@
#ifndef QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_HPP_
#define QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_HPP_
+#include <cstddef>
#include <vector>
+#include "catalog/CatalogTypedefs.hpp"
+#include "storage/StorageBlockInfo.hpp"
#include "utility/Macros.hpp"
#include "glog/logging.h"
namespace quickstep {
+class Type;
+class ValueAccessor;
+
/** \addtogroup Utility
* @{
*/
@@ -44,6 +50,21 @@ class LIPFilter {
return type_;
}
+ virtual void insertValueAccessor(ValueAccessor *accessor,
+ const attribute_id attr_id,
+ const Type *attr_type) = 0;
+
+ virtual std::size_t filterBatch(ValueAccessor *accessor,
+ const attribute_id attr_id,
+ const bool is_attr_nullable,
+ std::vector<tuple_id> *batch,
+ const std::size_t batch_size) const = 0;
+
+ protected:
+ LIPFilter(const LIPFilterType &type)
+ : type_(type) {
+ }
+
private:
LIPFilterType type_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/utility/lip_filter/LIPFilter.proto
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilter.proto b/utility/lip_filter/LIPFilter.proto
index 897a86e..def13dd 100644
--- a/utility/lip_filter/LIPFilter.proto
+++ b/utility/lip_filter/LIPFilter.proto
@@ -36,7 +36,8 @@ message LIPFilter {
message SingleIdentityHashFilter {
extend LIPFilter {
// All required
- optional uint64 num_bits = 16;
+ optional uint64 filter_cardinality = 16;
+ optional uint64 attribute_size = 17;
}
}
@@ -45,9 +46,13 @@ enum LIPFilterActionType {
PROBE = 2;
}
-message LIPFilterDeploymentInfo {
+message LIPFilterDeployment {
+ message Entry {
+ required uint32 lip_filter_id = 1;
+ required int32 attribute_id = 2;
+ required Type attribute_type = 3;
+ }
+
required LIPFilterActionType action_type = 1;
- required uint32 lip_filter_id = 2;
- required int32 attribute_id = 3;
- required Type attribute_type = 4;
+ repeated Entry entries = 2;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/utility/lip_filter/LIPFilterAdaptiveProber.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterAdaptiveProber.hpp b/utility/lip_filter/LIPFilterAdaptiveProber.hpp
index 6005690..af42446 100644
--- a/utility/lip_filter/LIPFilterAdaptiveProber.hpp
+++ b/utility/lip_filter/LIPFilterAdaptiveProber.hpp
@@ -20,10 +20,19 @@
#ifndef QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_ADAPTIVE_PROBER_HPP_
#define QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_ADAPTIVE_PROBER_HPP_
+#include <algorithm>
+#include <cstdint>
+#include <memory>
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/TupleIdSequence.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/Type.hpp"
#include "utility/Macros.hpp"
+#include "utility/lip_filter/SingleIdentityHashFilter.hpp"
namespace quickstep {
@@ -33,16 +42,16 @@ namespace quickstep {
class LIPFilterAdaptiveProber {
public:
- LIPFilterAdaptiveProber(const std::vector<const LIPFilter *> &lip_filters,
+ LIPFilterAdaptiveProber(const std::vector<LIPFilter *> &lip_filters,
const std::vector<attribute_id> &attr_ids,
- const std::vector<std::size_t> &attr_sizes) {
+ const std::vector<const Type *> &attr_types) {
DCHECK_EQ(lip_filters.size(), attr_ids.size());
- DCHECK_EQ(lip_filters.size(), attr_sizes.size());
+ DCHECK_EQ(lip_filters.size(), attr_types.size());
probe_entries_.reserve(lip_filters.size());
for (std::size_t i = 0; i < lip_filters.size(); ++i) {
probe_entries_.emplace_back(
- new ProbeEntry(lip_filters[i], attr_ids[i], attr_sizes[i]));
+ new ProbeEntry(lip_filters[i], attr_ids[i], attr_types[i]));
}
}
@@ -52,14 +61,23 @@ class LIPFilterAdaptiveProber {
}
}
+ TupleIdSequence* filterValueAccessor(ValueAccessor *accessor) {
+ // Returns a newly allocated (caller-owned) sequence of tuples passing every LIP
+ // filter; only tuples in the accessor's existence map (or all, if none) are probed.
+ const TupleIdSequence *existence_map = accessor->getTupleIdSequenceVirtual();
+ return existence_map == nullptr
+ ? filterValueAccessorNoExistenceMap(accessor)
+ : filterValueAccessorWithExistenceMap(accessor, existence_map);
+ }
+
private:
struct ProbeEntry {
ProbeEntry(const LIPFilter *lip_filter_in,
const attribute_id attr_id_in,
- const std::size_t attr_size_in)
+ const Type *attr_type_in)
: lip_filter(lip_filter_in),
attr_id(attr_id_in),
- attr_size(attr_size_in),
+ attr_type(attr_type_in),
miss(0),
cnt(0) {
}
@@ -69,12 +87,95 @@ class LIPFilterAdaptiveProber {
}
const LIPFilter *lip_filter;
const attribute_id attr_id;
- const std::size_t attr_size;
+ const Type *attr_type;
std::uint32_t miss;
std::uint32_t cnt;
float miss_rate;
};
+
+ inline TupleIdSequence* filterValueAccessorNoExistenceMap(ValueAccessor *accessor) {
+ const std::uint32_t num_tuples = accessor->getNumTuplesVirtual();
+ std::unique_ptr<TupleIdSequence> matches(new TupleIdSequence(num_tuples));
+ std::uint32_t next_batch_size = 64u;
+ std::vector<tuple_id> batch(num_tuples);
+ // Probe tuple ids [0, num_tuples) in geometrically growing batches.
+ std::uint32_t batch_start = 0;
+ while (batch_start < num_tuples) {  // An empty accessor probes nothing.
+ const std::uint32_t batch_size =
+ std::min(next_batch_size, num_tuples - batch_start);
+ for (std::uint32_t i = 0; i < batch_size; ++i) {
+ batch[i] = batch_start + i;
+ }
+ // Surviving tuple ids are left in batch[0, num_hits).
+ const std::uint32_t num_hits = filterBatch(accessor, &batch, batch_size);
+ for (std::uint32_t i = 0; i < num_hits; ++i) {
+ matches->set(batch[i], true);
+ }
+
+ batch_start += batch_size;
+ next_batch_size *= 2;
+ }
+
+ return matches.release();
+ }
+
+ inline TupleIdSequence* filterValueAccessorWithExistenceMap(ValueAccessor *accessor,
+ const TupleIdSequence *existence_map) {
+ std::unique_ptr<TupleIdSequence> matches(
+ new TupleIdSequence(existence_map->length()));
+ std::uint32_t next_batch_size = 64u;
+ std::uint32_t num_tuples_left = existence_map->numTuples();
+ std::vector<tuple_id> batch(num_tuples_left);
+ // Walk the tuple ids set in the existence map in geometrically growing batches.
+ TupleIdSequence::const_iterator tuple_it = existence_map->before_begin();
+ while (num_tuples_left > 0) {  // An empty existence map probes nothing.
+ const std::uint32_t batch_size =
+ std::min(next_batch_size, num_tuples_left);
+ for (std::uint32_t i = 0; i < batch_size; ++i) {
+ ++tuple_it;
+ batch[i] = *tuple_it;
+ }
+ // Surviving tuple ids are left in batch[0, num_hits).
+ const std::uint32_t num_hits = filterBatch(accessor, &batch, batch_size);
+ for (std::uint32_t i = 0; i < num_hits; ++i) {
+ matches->set(batch[i], true);
+ }
+
+ num_tuples_left -= batch_size;
+ next_batch_size *= 2;
+ }
+
+ return matches.release();
+ }
+
+ inline std::size_t filterBatch(ValueAccessor *accessor,
+ std::vector<tuple_id> *batch,
+ std::uint32_t batch_size) {
+ // Apply the filters in their current (adaptive) order; each call leaves the
+ // surviving ids in (*batch)[0, out_size) and that count feeds the next filter.
+ for (auto *entry : probe_entries_) {
+ if (batch_size == 0) break;  // Nothing left to probe; stats would not change.
+ const std::uint32_t out_size = entry->lip_filter->filterBatch(
+ accessor, entry->attr_id, entry->attr_type->isNullable(), batch, batch_size);
+ entry->cnt += batch_size;
+ entry->miss += batch_size - out_size;
+ batch_size = out_size;
+ }
+ // Re-rank the filters using the updated hit/miss statistics.
+ adaptEntryOrder();
+ return batch_size;
+ }
+
+ inline void adaptEntryOrder() {
+ // Entries that have not probed any tuple yet get rate 0: 0/0 would be NaN, which
+ // can break the strict weak ordering std::sort requires of its comparator.
+ for (auto *entry : probe_entries_) {
+ entry->miss_rate = entry->cnt == 0 ? 0.0f : static_cast<float>(entry->miss) / entry->cnt;
+ }
+ std::sort(probe_entries_.begin(), probe_entries_.end(), ProbeEntry::isBetterThan);
+ }
+
std::vector<ProbeEntry *> probe_entries_;
DISALLOW_COPY_AND_ASSIGN(LIPFilterAdaptiveProber);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/utility/lip_filter/LIPFilterBuilder.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterBuilder.hpp b/utility/lip_filter/LIPFilterBuilder.hpp
index 07b26da..0a2d465 100644
--- a/utility/lip_filter/LIPFilterBuilder.hpp
+++ b/utility/lip_filter/LIPFilterBuilder.hpp
@@ -20,29 +20,41 @@
#ifndef QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_BUILDER_HPP_
#define QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_BUILDER_HPP_
+#include <memory>
#include <vector>
-#include "utility/Macros.hpp"
-
#include "catalog/CatalogTypedefs.hpp"
+#include "types/Type.hpp"
+#include "utility/Macros.hpp"
namespace quickstep {
+class ValueAccessor;
+
/** \addtogroup Utility
* @{
*/
+class LIPFilterBuilder;
+typedef std::shared_ptr<LIPFilterBuilder> LIPFilterBuilderPtr;
+
class LIPFilterBuilder {
public:
LIPFilterBuilder(const std::vector<LIPFilter *> &lip_filters,
const std::vector<attribute_id> &attr_ids,
- const std::vector<std::size_t> &attr_sizes) {
+ const std::vector<const Type *> &attr_types) {
DCHECK_EQ(lip_filters.size(), attr_ids.size());
- DCHECK_EQ(lip_filters.size(), attr_sizes.size());
+ DCHECK_EQ(lip_filters.size(), attr_types.size());
build_entries_.reserve(lip_filters.size());
for (std::size_t i = 0; i < lip_filters.size(); ++i) {
- build_entries_.emplace_back(lip_filters[i], attr_ids[i], attr_sizes[i]);
+ build_entries_.emplace_back(lip_filters[i], attr_ids[i], attr_types[i]);
+ }
+ }
+
+ void insertValueAccessor(ValueAccessor *accessor) {
+ for (auto &entry : build_entries_) {
+ entry.lip_filter->insertValueAccessor(accessor, entry.attr_id, entry.attr_type);
}
}
@@ -50,14 +62,14 @@ class LIPFilterBuilder {
struct BuildEntry {
BuildEntry(LIPFilter *lip_filter_in,
const attribute_id attr_id_in,
- const std::size_t attr_size_in)
+ const Type *attr_type_in)
: lip_filter(lip_filter_in),
attr_id(attr_id_in),
- attr_size(attr_size_in) {
+ attr_type(attr_type_in) {
}
LIPFilter *lip_filter;
const attribute_id attr_id;
- const std::size_t attr_size;
+ const Type *attr_type;
};
std::vector<BuildEntry> build_entries_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/utility/lip_filter/LIPFilterDeployment.cpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterDeployment.cpp b/utility/lip_filter/LIPFilterDeployment.cpp
new file mode 100644
index 0000000..0ac396b
--- /dev/null
+++ b/utility/lip_filter/LIPFilterDeployment.cpp
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "utility/lip_filter/LIPFilterDeployment.hpp"
+
+#include "types/TypeFactory.hpp"
+#include "utility/lip_filter/LIPFilter.pb.h"
+#include "utility/lip_filter/LIPFilterBuilder.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+// Decodes the serialized action type, then resolves every entry into the three
+// parallel member vectors (filter pointer, attribute id, attribute type). The
+// LIPFilter pointers are borrowed from `lip_filters` (only .get() is stored),
+// so those filters must outlive this deployment.
+LIPFilterDeployment::LIPFilterDeployment(
+    const serialization::LIPFilterDeployment &proto,
+    const std::vector<std::unique_ptr<LIPFilter>> &lip_filters) {
+  switch (proto.action_type()) {
+    case serialization::LIPFilterActionType::BUILD:
+      action_type_ = LIPFilterActionType::kBuild;
+      break;
+    case serialization::LIPFilterActionType::PROBE:
+      action_type_ = LIPFilterActionType::kProbe;
+      break;
+    default:
+      LOG(FATAL) << "Unsupported LIPFilterActionType: "
+                 << serialization::LIPFilterActionType_Name(proto.action_type());
+  }
+
+  // Each entry contributes exactly one element to each vector; size them once.
+  const std::size_t num_entries = static_cast<std::size_t>(proto.entries_size());
+  lip_filters_.reserve(num_entries);
+  attr_ids_.reserve(num_entries);
+  attr_types_.reserve(num_entries);
+
+  for (int i = 0; i < proto.entries_size(); ++i) {
+    const auto &entry_proto = proto.entries(i);
+    // at() bounds-checks the filter id instead of reading past the vector.
+    lip_filters_.emplace_back(lip_filters.at(entry_proto.lip_filter_id()).get());
+    attr_ids_.emplace_back(entry_proto.attribute_id());
+    attr_types_.emplace_back(&TypeFactory::ReconstructFromProto(entry_proto.attribute_type()));
+  }
+}
+
+// Returns true if `proto` can be turned into a LIPFilterDeployment without
+// hitting a fatal error: the message is initialized, its action type is a
+// known enum value, and every entry carries a valid attribute Type proto.
+// Filter ids are not checked here because the filter list is not available.
+bool LIPFilterDeployment::ProtoIsValid(
+    const serialization::LIPFilterDeployment &proto) {
+  if (!proto.IsInitialized() ||
+      !serialization::LIPFilterActionType_IsValid(proto.action_type())) {
+    return false;
+  }
+  for (int i = 0; i < proto.entries_size(); ++i) {
+    if (!TypeFactory::ProtoIsValid(proto.entries(i).attribute_type())) {
+      return false;
+    }
+  }
+  return true;
+}
+
+// Creates a builder over this deployment's (filter, attribute id, type)
+// triples. Only meaningful for a kBuild deployment; the caller takes
+// ownership of the returned object.
+LIPFilterBuilder* LIPFilterDeployment::createLIPFilterBuilder() const {
+  DCHECK(action_type_ == LIPFilterActionType::kBuild);
+  LIPFilterBuilder *builder =
+      new LIPFilterBuilder(lip_filters_, attr_ids_, attr_types_);
+  return builder;
+}
+
+// Creates a prober over this deployment's (filter, attribute id, type)
+// triples. Only meaningful for a kProbe deployment; the caller takes
+// ownership of the returned object.
+LIPFilterAdaptiveProber* LIPFilterDeployment::createLIPFilterAdaptiveProber() const {
+  DCHECK(action_type_ == LIPFilterActionType::kProbe);
+  LIPFilterAdaptiveProber *prober =
+      new LIPFilterAdaptiveProber(lip_filters_, attr_ids_, attr_types_);
+  return prober;
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/utility/lip_filter/LIPFilterDeployment.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterDeployment.hpp b/utility/lip_filter/LIPFilterDeployment.hpp
new file mode 100644
index 0000000..60de14e
--- /dev/null
+++ b/utility/lip_filter/LIPFilterDeployment.hpp
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_DEPLOYMENT_HPP_
+#define QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_DEPLOYMENT_HPP_
+
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilter.hpp"
+#include "utility/lip_filter/LIPFilter.pb.h"
+
+namespace quickstep {
+
+class LIPFilterBuilder;
+class LIPFilterAdaptiveProber;
+class Type;
+
+/** \addtogroup Utility
+ * @{
+ */
+
+/**
+ * @brief What a LIPFilterDeployment does with its LIP filters.
+ **/
+enum class LIPFilterActionType {
+  kBuild = 0,  // Filters are populated (see createLIPFilterBuilder()).
+  kProbe = 1   // Filters are probed (see createLIPFilterAdaptiveProber()).
+};
+
+/**
+ * @brief Describes how one operator uses a group of LIP filters: either
+ *        building them or probing them.
+ *
+ * Entry i pairs a LIP filter with an attribute id and that attribute's type.
+ * The LIPFilter pointers are borrowed, not owned.
+ **/
+class LIPFilterDeployment {
+ public:
+  /**
+   * @brief Reconstruct a deployment from its serialized form.
+   *
+   * @param proto The serialized deployment.
+   * @param lip_filters The LIP filters that entries in \p proto refer to by
+   *        index. They must outlive this deployment.
+   **/
+  LIPFilterDeployment(const serialization::LIPFilterDeployment &proto,
+                      const std::vector<std::unique_ptr<LIPFilter>> &lip_filters);
+
+  /**
+   * @brief Check whether a serialized deployment is valid.
+   **/
+  static bool ProtoIsValid(const serialization::LIPFilterDeployment &proto);
+
+  /**
+   * @return Whether this deployment builds or probes its filters.
+   **/
+  LIPFilterActionType getActionType() const { return action_type_; }
+
+  /**
+   * @brief Create a builder for this deployment's filters. Requires
+   *        getActionType() == kBuild.
+   *
+   * @return A newly allocated builder owned by the caller.
+   **/
+  LIPFilterBuilder* createLIPFilterBuilder() const;
+
+  /**
+   * @brief Create an adaptive prober for this deployment's filters. Requires
+   *        getActionType() == kProbe.
+   *
+   * @return A newly allocated prober owned by the caller.
+   **/
+  LIPFilterAdaptiveProber* createLIPFilterAdaptiveProber() const;
+
+ private:
+  LIPFilterActionType action_type_;
+
+  // Parallel vectors; index i describes one (filter, attribute) pair.
+  std::vector<LIPFilter *> lip_filters_;  // Borrowed, not owned.
+  std::vector<attribute_id> attr_ids_;
+  std::vector<const Type *> attr_types_;
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_DEPLOYMENT_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/utility/lip_filter/LIPFilterDeploymentInfo.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterDeploymentInfo.hpp b/utility/lip_filter/LIPFilterDeploymentInfo.hpp
deleted file mode 100644
index db75021..0000000
--- a/utility/lip_filter/LIPFilterDeploymentInfo.hpp
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_DEPLOYMENT_INFO_HPP_
-#define QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_DEPLOYMENT_INFO_HPP_
-
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "utility/Macros.hpp"
-#include "utility/lip_filter/LIPFilter.hpp"
-
-namespace quickstep {
-
-/** \addtogroup Utility
- * @{
- */
-
-enum class LIPFilterActionType {
- kBuild = 0,
- kProbe
-};
-
-class LIPFilterDeploymentInfo {
- public:
- const LIPFilterActionType getActionType() const {
- return action_type_;
- }
-
- const std::vector<LIPFilter*>& lip_filters() const {
- return lip_filters_;
- }
-
- const std::vector<attribute_id>& attr_ids() const {
- return attr_ids_;
- }
-
- const std::vector<const Type*>& attr_types() const {
- return attr_types_;
- }
-
- private:
- LIPFilterActionType action_type_;
- std::vector<LIPFilter*> lip_filters_;
- std::vector<attribute_id> attr_ids_;
- std::vector<const Type*> attr_types_;
-};
-
-/** @} */
-
-} // namespace quickstep
-
-#endif // QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_DEPLOYMENT_INFO_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/utility/lip_filter/LIPFilterFactory.cpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterFactory.cpp b/utility/lip_filter/LIPFilterFactory.cpp
index e69de29..f0e7725 100644
--- a/utility/lip_filter/LIPFilterFactory.cpp
+++ b/utility/lip_filter/LIPFilterFactory.cpp
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "utility/lip_filter/LIPFilterFactory.hpp"
+
+#include "utility/lip_filter/LIPFilter.hpp"
+#include "utility/lip_filter/SingleIdentityHashFilter.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+// Creates the LIP filter described by `proto`; the caller owns the result.
+// For a single identity hash filter, the key width CppType is the largest of
+// 8/4/2/1 bytes that does not exceed the attribute size (1 byte when the
+// attribute size is below 2).
+LIPFilter* LIPFilterFactory::ReconstructFromProto(const serialization::LIPFilter &proto) {
+  switch (proto.lip_filter_type()) {
+    case serialization::LIPFilterType::SINGLE_IDENTITY_HASH_FILTER: {
+      const std::size_t attr_size =
+          proto.GetExtension(serialization::SingleIdentityHashFilter::attribute_size);
+      const std::size_t filter_cardinality =
+          proto.GetExtension(serialization::SingleIdentityHashFilter::filter_cardinality);
+
+      if (attr_size >= 8) {
+        return new SingleIdentityHashFilter<std::uint64_t>(filter_cardinality);
+      } else if (attr_size >= 4) {
+        return new SingleIdentityHashFilter<std::uint32_t>(filter_cardinality);
+      } else if (attr_size >= 2) {
+        return new SingleIdentityHashFilter<std::uint16_t>(filter_cardinality);
+      } else {
+        return new SingleIdentityHashFilter<std::uint8_t>(filter_cardinality);
+      }
+    }
+    default:
+      LOG(FATAL) << "Unsupported LIP filter type: "
+                 << serialization::LIPFilterType_Name(proto.lip_filter_type());
+  }
+  // Not reached after LOG(FATAL); keeps every path of this non-void function
+  // returning a value (avoids -Wreturn-type).
+  return nullptr;
+}
+
+// Returns true if ReconstructFromProto() can build a usable filter from
+// `proto`: the filter type is supported and its required extensions are set.
+bool LIPFilterFactory::ProtoIsValid(const serialization::LIPFilter &proto) {
+  switch (proto.lip_filter_type()) {
+    case serialization::LIPFilterType::SINGLE_IDENTITY_HASH_FILTER: {
+      if (!proto.HasExtension(serialization::SingleIdentityHashFilter::attribute_size) ||
+          !proto.HasExtension(serialization::SingleIdentityHashFilter::filter_cardinality)) {
+        return false;
+      }
+      // The filter reduces keys modulo its cardinality, so zero is unusable.
+      return proto.GetExtension(serialization::SingleIdentityHashFilter::filter_cardinality) > 0;
+    }
+    default:
+      // ReconstructFromProto() would LOG(FATAL) on any other type.
+      return false;
+  }
+}
+
+} // namespace quickstep
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/utility/lip_filter/LIPFilterFactory.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterFactory.hpp b/utility/lip_filter/LIPFilterFactory.hpp
index 0567093..6a94ae4 100644
--- a/utility/lip_filter/LIPFilterFactory.hpp
+++ b/utility/lip_filter/LIPFilterFactory.hpp
@@ -23,17 +23,21 @@
#include <vector>
#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
+#include "utility/lip_filter/LIPFilter.pb.h"
namespace quickstep {
+class LIPFilter;
+
/** \addtogroup Utility
* @{
*/
class LIPFilterFactory {
public:
+ static LIPFilter* ReconstructFromProto(const serialization::LIPFilter &proto);
+
+ static bool ProtoIsValid(const serialization::LIPFilter &proto);
private:
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a0f68f0/utility/lip_filter/SingleIdentityHashFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/SingleIdentityHashFilter.hpp b/utility/lip_filter/SingleIdentityHashFilter.hpp
new file mode 100644
index 0000000..40ef14b
--- /dev/null
+++ b/utility/lip_filter/SingleIdentityHashFilter.hpp
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_LIP_FILTER_SINGLE_IDENTITY_HASH_FILTER_HPP_
+#define QUICKSTEP_UTILITY_LIP_FILTER_SINGLE_IDENTITY_HASH_FILTER_HPP_
+
+#include <vector>
+
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
+#include <utility>
+#include <vector>
+
+#include "storage/StorageConstants.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/Type.hpp"
+#include "utility/lip_filter/LIPFilter.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/** \addtogroup Utility
+ * @{
+ */
+
+/**
+ * @brief A LIP filter backed by one bit vector whose hash function is the
+ *        identity: a key's bit index is its first sizeof(CppType) bytes, read
+ *        as a CppType, modulo the filter cardinality.
+ *
+ * Bits are set with relaxed atomic fetch_or, so concurrent insertions do not
+ * lose updates. NULL values are never inserted and never pass the filter.
+ *
+ * @tparam CppType Integer type used to read the leading bytes of each key.
+ **/
+template <typename CppType>
+class SingleIdentityHashFilter : public LIPFilter {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param filter_cardinality Number of bits in the filter. Must be positive,
+   *        because keys are reduced modulo this value.
+   **/
+  explicit SingleIdentityHashFilter(const std::size_t filter_cardinality)
+      : LIPFilter(LIPFilterType::kSingleIdentityHashFilter),
+        filter_cardinality_(filter_cardinality),
+        bit_array_(GetByteSize(filter_cardinality)) {
+    DCHECK_GT(filter_cardinality, 0u);
+    // Clear the bits through the atomic API; memset() on std::atomic objects
+    // bypasses their interface (and trips -Wclass-memaccess).
+    for (std::atomic<std::uint8_t> &byte : bit_array_) {
+      byte.store(0, std::memory_order_relaxed);
+    }
+  }
+
+  /**
+   * @brief Insert every non-NULL value of attribute \p attr_id from
+   *        \p accessor into the filter.
+   **/
+  void insertValueAccessor(ValueAccessor *accessor,
+                           const attribute_id attr_id,
+                           const Type *attr_type) override {
+    InvokeOnAnyValueAccessor(
+        accessor,
+        [&](auto *typed_accessor) -> void {  // NOLINT(build/c++11)
+      if (attr_type->isNullable()) {
+        insertValueAccessorInternal<true>(typed_accessor, attr_id);
+      } else {
+        insertValueAccessorInternal<false>(typed_accessor, attr_id);
+      }
+    });
+  }
+
+  /**
+   * @brief Compact the first \p batch_size tuple ids of \p batch in place,
+   *        keeping only those whose \p attr_id value may be in the filter.
+   *        NULL values are dropped.
+   *
+   * @return The number of tuple ids kept at the front of \p batch.
+   **/
+  std::size_t filterBatch(ValueAccessor *accessor,
+                          const attribute_id attr_id,
+                          const bool is_attr_nullable,
+                          std::vector<tuple_id> *batch,
+                          const std::size_t batch_size) const override {
+    return InvokeOnAnyValueAccessor(
+        accessor,
+        [&](auto *typed_accessor) -> std::size_t {  // NOLINT(build/c++11)
+      if (is_attr_nullable) {
+        return filterBatchInternal<true>(typed_accessor, attr_id, batch, batch_size);
+      } else {
+        return filterBatchInternal<false>(typed_accessor, attr_id, batch, batch_size);
+      }
+    });
+  }
+
+  /**
+   * @brief Inserts a given value into the hash filter.
+   *
+   * @param key_begin A pointer to the (non-NULL) value being inserted.
+   */
+  inline void insert(const void *key_begin) {
+    const std::size_t bit = getBitIndex(key_begin);
+    bit_array_[bit >> 3].fetch_or(static_cast<std::uint8_t>(1u << (bit & 0x7)),
+                                  std::memory_order_relaxed);
+  }
+
+  /**
+   * @brief Test membership of a given value in the hash filter.
+   *        If true is returned, the value may or may not be present.
+   *        If false is returned, the value is certainly not present.
+   *
+   * @param key_begin A pointer to the (non-NULL) value being tested.
+   */
+  inline bool contains(const void *key_begin) const {
+    const std::size_t bit = getBitIndex(key_begin);
+    return (bit_array_[bit >> 3].load(std::memory_order_relaxed) & (1u << (bit & 0x7))) != 0;
+  }
+
+ private:
+  inline static std::size_t GetByteSize(const std::size_t bit_size) {
+    return (bit_size + 7) / 8;
+  }
+
+  // Maps a key to its bit index. memcpy avoids the unaligned / type-punned
+  // load that dereferencing a reinterpret_cast pointer would perform.
+  inline std::size_t getBitIndex(const void *key_begin) const {
+    CppType key;
+    std::memcpy(&key, key_begin, sizeof(key));
+    return static_cast<std::size_t>(key % filter_cardinality_);
+  }
+
+  template <bool is_attr_nullable, typename ValueAccessorT>
+  inline void insertValueAccessorInternal(ValueAccessorT *accessor,
+                                          const attribute_id attr_id) {
+    accessor->beginIteration();
+    while (accessor->next()) {
+      const void *value = accessor->template getUntypedValue<is_attr_nullable>(attr_id);
+      if (!is_attr_nullable || value != nullptr) {
+        insert(value);
+      }
+    }
+  }
+
+  template <bool is_attr_nullable, typename ValueAccessorT>
+  inline std::size_t filterBatchInternal(const ValueAccessorT *accessor,
+                                         const attribute_id attr_id,
+                                         std::vector<tuple_id> *batch,
+                                         const std::size_t batch_size) const {
+    std::size_t out_size = 0;
+    for (std::size_t i = 0; i < batch_size; ++i) {
+      const tuple_id tid = batch->at(i);
+      const void *value = accessor->getUntypedValueAtAbsolutePosition(attr_id, tid);
+      // NULLs are never inserted and cannot match, so drop them rather than
+      // dereferencing a null pointer in contains().
+      if (is_attr_nullable && value == nullptr) {
+        continue;
+      }
+      if (contains(value)) {
+        batch->at(out_size) = tid;
+        ++out_size;
+      }
+    }
+    return out_size;
+  }
+
+  const std::size_t filter_cardinality_;
+  alignas(kCacheLineBytes) std::vector<std::atomic<std::uint8_t>> bit_array_;
+
+  DISALLOW_COPY_AND_ASSIGN(SingleIdentityHashFilter);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_UTILITY_LIP_FILTER_SINGLE_IDENTITY_HASH_FILTER_HPP_