You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/10/20 05:17:47 UTC
[5/5] incubator-quickstep git commit: Add LIPFilter feature.
Add LIPFilter feature.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/d14509f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/d14509f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/d14509f0
Branch: refs/heads/lip-refactor
Commit: d14509f00b9752f42485ba2139379c29fce49efc
Parents: fb6b284
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Wed Sep 7 13:20:43 2016 -0500
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Thu Oct 20 00:09:20 2016 -0500
----------------------------------------------------------------------
expressions/scalar/ScalarAttribute.cpp | 2 +-
relational_operators/AggregationOperator.cpp | 21 +++-
relational_operators/AggregationOperator.hpp | 9 +-
relational_operators/BuildHashOperator.cpp | 23 +++-
relational_operators/BuildHashOperator.hpp | 18 ++-
relational_operators/HashJoinOperator.cpp | 86 +++++++++++---
relational_operators/HashJoinOperator.hpp | 43 +++++--
relational_operators/SelectOperator.cpp | 67 +++++++++--
relational_operators/SelectOperator.hpp | 18 ++-
storage/AggregationOperationState.cpp | 60 ++++++----
storage/AggregationOperationState.hpp | 10 +-
storage/StorageBlock.cpp | 131 ++++++++++------------
storage/StorageBlock.hpp | 21 ++--
utility/lip_filter/LIPFilter.cpp | 24 ++++
14 files changed, 377 insertions(+), 156 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d14509f0/expressions/scalar/ScalarAttribute.cpp
----------------------------------------------------------------------
diff --git a/expressions/scalar/ScalarAttribute.cpp b/expressions/scalar/ScalarAttribute.cpp
index b29286b..cc42084 100644
--- a/expressions/scalar/ScalarAttribute.cpp
+++ b/expressions/scalar/ScalarAttribute.cpp
@@ -168,7 +168,7 @@ ColumnVector* ScalarAttribute::getAllValuesForJoin(
ValueAccessor *accessor = using_left_relation ? left_accessor
: right_accessor;
- return InvokeOnValueAccessorNotAdapter(
+ return InvokeOnAnyValueAccessor(
accessor,
[&joined_tuple_ids,
&attr_id,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d14509f0/relational_operators/AggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/AggregationOperator.cpp b/relational_operators/AggregationOperator.cpp
index 056e76d..71baa53 100644
--- a/relational_operators/AggregationOperator.cpp
+++ b/relational_operators/AggregationOperator.cpp
@@ -38,14 +38,24 @@ bool AggregationOperator::getAllWorkOrders(
StorageManager *storage_manager,
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus) {
+ const LIPFilterDeployment *lip_filter_deployment = nullptr;
+ if (lip_deployment_index_ != QueryContext::kInvalidILIPDeploymentId) {
+ lip_filter_deployment = query_context->getLIPDeployment(lip_deployment_index_);
+ }
+
if (input_relation_is_stored_) {
if (!started_) {
for (const block_id input_block_id : input_relation_block_ids_) {
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr;
+ if (lip_filter_deployment != nullptr) {
+ lip_filter_adaptive_prober = lip_filter_deployment->createLIPFilterAdaptiveProber();
+ }
container->addNormalWorkOrder(
new AggregationWorkOrder(
query_id_,
input_block_id,
- query_context->getAggregationState(aggr_state_index_)),
+ query_context->getAggregationState(aggr_state_index_),
+ lip_filter_adaptive_prober),
op_index_);
}
started_ = true;
@@ -53,11 +63,16 @@ bool AggregationOperator::getAllWorkOrders(
return started_;
} else {
while (num_workorders_generated_ < input_relation_block_ids_.size()) {
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr;
+ if (lip_filter_deployment != nullptr) {
+ lip_filter_adaptive_prober = lip_filter_deployment->createLIPFilterAdaptiveProber();
+ }
container->addNormalWorkOrder(
new AggregationWorkOrder(
query_id_,
input_relation_block_ids_[num_workorders_generated_],
- query_context->getAggregationState(aggr_state_index_)),
+ query_context->getAggregationState(aggr_state_index_),
+ lip_filter_adaptive_prober),
op_index_);
++num_workorders_generated_;
}
@@ -98,7 +113,7 @@ serialization::WorkOrder* AggregationOperator::createWorkOrderProto(const block_
void AggregationWorkOrder::execute() {
- state_->aggregateBlock(input_block_id_);
+ state_->aggregateBlock(input_block_id_, lip_filter_adaptive_prober_.get());
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d14509f0/relational_operators/AggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/AggregationOperator.hpp b/relational_operators/AggregationOperator.hpp
index 31c1da4..da36d57 100644
--- a/relational_operators/AggregationOperator.hpp
+++ b/relational_operators/AggregationOperator.hpp
@@ -30,6 +30,7 @@
#include "relational_operators/WorkOrder.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
#include "glog/logging.h"
@@ -140,10 +141,12 @@ class AggregationWorkOrder : public WorkOrder {
**/
AggregationWorkOrder(const std::size_t query_id,
const block_id input_block_id,
- AggregationOperationState *state)
+ AggregationOperationState *state,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober)
: WorkOrder(query_id),
input_block_id_(input_block_id),
- state_(DCHECK_NOTNULL(state)) {}
+ state_(DCHECK_NOTNULL(state)),
+ lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
~AggregationWorkOrder() override {}
@@ -153,6 +156,8 @@ class AggregationWorkOrder : public WorkOrder {
const block_id input_block_id_;
AggregationOperationState *state_;
+ std::unique_ptr<LIPFilterAdaptiveProber> lip_filter_adaptive_prober_;
+
DISALLOW_COPY_AND_ASSIGN(AggregationWorkOrder);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d14509f0/relational_operators/BuildHashOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildHashOperator.cpp b/relational_operators/BuildHashOperator.cpp
index 465621c..35bb5cf 100644
--- a/relational_operators/BuildHashOperator.cpp
+++ b/relational_operators/BuildHashOperator.cpp
@@ -34,6 +34,7 @@
#include "storage/TupleReference.hpp"
#include "storage/TupleStorageSubBlock.hpp"
#include "storage/ValueAccessor.hpp"
+#include "utility/lip_filter/LIPFilterBuilder.hpp"
#include "glog/logging.h"
@@ -68,6 +69,14 @@ bool BuildHashOperator::getAllWorkOrders(
tmb::MessageBus *bus) {
DCHECK(query_context != nullptr);
+ LIPFilterBuilderPtr lip_filter_builder = nullptr;
+ if (lip_deployment_index_ != QueryContext::kInvalidILIPDeploymentId) {
+ const LIPFilterDeployment *lip_filter_deployment =
+ query_context->getLIPDeployment(lip_deployment_index_);
+ lip_filter_builder = std::shared_ptr<LIPFilterBuilder>(
+ lip_filter_deployment->createLIPFilterBuilder());
+ }
+
JoinHashTable *hash_table = query_context->getJoinHashTable(hash_table_index_);
if (input_relation_is_stored_) {
if (!started_) {
@@ -79,7 +88,8 @@ bool BuildHashOperator::getAllWorkOrders(
any_join_key_attributes_nullable_,
input_block_id,
hash_table,
- storage_manager),
+ storage_manager,
+ lip_filter_builder),
op_index_);
}
started_ = true;
@@ -95,7 +105,8 @@ bool BuildHashOperator::getAllWorkOrders(
any_join_key_attributes_nullable_,
input_relation_block_ids_[num_workorders_generated_],
hash_table,
- storage_manager),
+ storage_manager,
+ lip_filter_builder),
op_index_);
++num_workorders_generated_;
}
@@ -136,17 +147,23 @@ serialization::WorkOrder* BuildHashOperator::createWorkOrderProto(const block_id
any_join_key_attributes_nullable_);
proto->SetExtension(serialization::BuildHashWorkOrder::join_hash_table_index, hash_table_index_);
proto->SetExtension(serialization::BuildHashWorkOrder::block_id, block);
+ // TODO(jianqiao): update lip_filter related stuff
return proto;
}
-
void BuildHashWorkOrder::execute() {
BlockReference block(
storage_manager_->getBlock(build_block_id_, input_relation_));
TupleReferenceGenerator generator(build_block_id_);
std::unique_ptr<ValueAccessor> accessor(block->getTupleStorageSubBlock().createValueAccessor());
+
+ if (lip_filter_builder_ != nullptr) {
+ lip_filter_builder_->insertValueAccessor(accessor.get());
+ accessor->beginIterationVirtual();
+ }
+
HashTablePutResult result;
if (join_key_attributes_.size() == 1) {
result = hash_table_->putValueAccessor(accessor.get(),
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d14509f0/relational_operators/BuildHashOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildHashOperator.hpp b/relational_operators/BuildHashOperator.hpp
index 4a80a8a..940298c 100644
--- a/relational_operators/BuildHashOperator.hpp
+++ b/relational_operators/BuildHashOperator.hpp
@@ -20,6 +20,7 @@
#ifndef QUICKSTEP_RELATIONAL_OPERATORS_BUILD_HASH_OPERATOR_HPP_
#define QUICKSTEP_RELATIONAL_OPERATORS_BUILD_HASH_OPERATOR_HPP_
+#include <memory>
#include <string>
#include <utility>
#include <vector>
@@ -31,6 +32,7 @@
#include "relational_operators/WorkOrder.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilterBuilder.hpp"
#include "glog/logging.h"
@@ -162,6 +164,7 @@ class BuildHashWorkOrder : public WorkOrder {
* @param build_block_id The block id.
* @param hash_table The JoinHashTable to use.
* @param storage_manager The StorageManager to use.
+ * @param lip_filter_builder The attached builder for building LIP filters.
**/
BuildHashWorkOrder(const std::size_t query_id,
const CatalogRelationSchema &input_relation,
@@ -169,14 +172,16 @@ class BuildHashWorkOrder : public WorkOrder {
const bool any_join_key_attributes_nullable,
const block_id build_block_id,
JoinHashTable *hash_table,
- StorageManager *storage_manager)
+ StorageManager *storage_manager,
+ LIPFilterBuilderPtr lip_filter_builder = nullptr)
: WorkOrder(query_id),
input_relation_(input_relation),
join_key_attributes_(join_key_attributes),
any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
build_block_id_(build_block_id),
hash_table_(DCHECK_NOTNULL(hash_table)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+ storage_manager_(DCHECK_NOTNULL(storage_manager)),
+ lip_filter_builder_(lip_filter_builder) {}
/**
* @brief Constructor for the distributed version.
@@ -189,6 +194,7 @@ class BuildHashWorkOrder : public WorkOrder {
* @param build_block_id The block id.
* @param hash_table The JoinHashTable to use.
* @param storage_manager The StorageManager to use.
+ * @param lip_filter_builder The attached builder for building LIP filters.
**/
BuildHashWorkOrder(const std::size_t query_id,
const CatalogRelationSchema &input_relation,
@@ -196,14 +202,16 @@ class BuildHashWorkOrder : public WorkOrder {
const bool any_join_key_attributes_nullable,
const block_id build_block_id,
JoinHashTable *hash_table,
- StorageManager *storage_manager)
+ StorageManager *storage_manager,
+ LIPFilterBuilderPtr lip_filter_builder = nullptr)
: WorkOrder(query_id),
input_relation_(input_relation),
join_key_attributes_(std::move(join_key_attributes)),
any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
build_block_id_(build_block_id),
hash_table_(DCHECK_NOTNULL(hash_table)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+ storage_manager_(DCHECK_NOTNULL(storage_manager)),
+ lip_filter_builder_(lip_filter_builder) {}
~BuildHashWorkOrder() override {}
@@ -222,6 +230,8 @@ class BuildHashWorkOrder : public WorkOrder {
JoinHashTable *hash_table_;
StorageManager *storage_manager_;
+ LIPFilterBuilderPtr lip_filter_builder_;
+
DISALLOW_COPY_AND_ASSIGN(BuildHashWorkOrder);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d14509f0/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index 779c0fe..ddc2a40 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -48,6 +48,7 @@
#include "types/TypedValue.hpp"
#include "types/containers/ColumnVector.hpp"
#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
#include "gflags/gflags.h"
#include "glog/logging.h"
@@ -95,8 +96,8 @@ class MapBasedJoinedTupleCollector {
class SemiAntiJoinTupleCollector {
public:
- explicit SemiAntiJoinTupleCollector(const TupleStorageSubBlock &tuple_store) {
- filter_.reset(tuple_store.getExistenceMap());
+ explicit SemiAntiJoinTupleCollector(TupleIdSequence *existence_map) {
+ filter_ = existence_map;
}
template <typename ValueAccessorT>
@@ -104,12 +105,8 @@ class SemiAntiJoinTupleCollector {
filter_->set(accessor.getCurrentPosition(), false);
}
- const TupleIdSequence* filter() const {
- return filter_.get();
- }
-
private:
- std::unique_ptr<TupleIdSequence> filter_;
+ TupleIdSequence *filter_;
};
class OuterJoinTupleCollector {
@@ -180,6 +177,11 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
if (blocking_dependencies_met_) {
DCHECK(query_context != nullptr);
+ const LIPFilterDeployment *lip_filter_deployment = nullptr;
+ if (lip_deployment_index_ != QueryContext::kInvalidILIPDeploymentId) {
+ lip_filter_deployment = query_context->getLIPDeployment(lip_deployment_index_);
+ }
+
const Predicate *residual_predicate =
query_context->getPredicate(residual_predicate_index_);
const vector<unique_ptr<const Scalar>> &selection =
@@ -192,6 +194,10 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
if (probe_relation_is_stored_) {
if (!started_) {
for (const block_id probe_block_id : probe_relation_block_ids_) {
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr;
+ if (lip_filter_deployment != nullptr) {
+ lip_filter_adaptive_prober = lip_filter_deployment->createLIPFilterAdaptiveProber();
+ }
container->addNormalWorkOrder(
new JoinWorkOrderClass(query_id_,
build_relation_,
@@ -203,7 +209,8 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
selection,
hash_table,
output_destination,
- storage_manager),
+ storage_manager,
+ lip_filter_adaptive_prober),
op_index_);
}
started_ = true;
@@ -211,6 +218,10 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
return started_;
} else {
while (num_workorders_generated_ < probe_relation_block_ids_.size()) {
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr;
+ if (lip_filter_deployment != nullptr) {
+ lip_filter_adaptive_prober = lip_filter_deployment->createLIPFilterAdaptiveProber();
+ }
container->addNormalWorkOrder(
new JoinWorkOrderClass(
query_id_,
@@ -223,7 +234,8 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
selection,
hash_table,
output_destination,
- storage_manager),
+ storage_manager,
+ lip_filter_adaptive_prober),
op_index_);
++num_workorders_generated_;
} // end while
@@ -423,6 +435,17 @@ void HashInnerJoinWorkOrder::execute() {
const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
+
+ std::unique_ptr<TupleIdSequence> lip_filter_existence_map;
+ std::unique_ptr<ValueAccessor> base_accessor;
+ if (lip_filter_adaptive_prober_ != nullptr) {
+ base_accessor.reset(probe_accessor.release());
+ lip_filter_existence_map.reset(
+ lip_filter_adaptive_prober_->filterValueAccessor(base_accessor.get()));
+ probe_accessor.reset(
+ base_accessor->createSharedTupleIdSequenceAdapterVirtual(*lip_filter_existence_map));
+ }
+
MapBasedJoinedTupleCollector collector;
if (join_key_attributes_.size() == 1) {
hash_table_.getAllFromValueAccessor(
@@ -529,6 +552,16 @@ void HashSemiJoinWorkOrder::executeWithResidualPredicate() {
std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
+ std::unique_ptr<TupleIdSequence> lip_filter_existence_map;
+ std::unique_ptr<ValueAccessor> base_accessor;
+ if (lip_filter_adaptive_prober_ != nullptr) {
+ base_accessor.reset(probe_accessor.release());
+ lip_filter_existence_map.reset(
+ lip_filter_adaptive_prober_->filterValueAccessor(base_accessor.get()));
+ probe_accessor.reset(
+ base_accessor->createSharedTupleIdSequenceAdapterVirtual(*lip_filter_existence_map));
+ }
+
// We collect all the matching probe relation tuples, as there's a residual
// preidcate that needs to be applied after collecting these matches.
MapBasedJoinedTupleCollector collector;
@@ -548,7 +581,6 @@ void HashSemiJoinWorkOrder::executeWithResidualPredicate() {
// Get a filter for tuples in the given probe block.
TupleIdSequence filter(probe_store.getMaxTupleID() + 1);
- filter.setRange(0, filter.length(), false);
for (const std::pair<const block_id,
std::vector<std::pair<tuple_id, tuple_id>>>
&build_block_entry : *collector.getJoinedTuples()) {
@@ -609,7 +641,22 @@ void HashSemiJoinWorkOrder::executeWithoutResidualPredicate() {
const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
- SemiAntiJoinTupleCollector collector(probe_store);
+ std::unique_ptr<TupleIdSequence> existence_map;
+
+ std::unique_ptr<ValueAccessor> base_accessor;
+ if (lip_filter_adaptive_prober_ != nullptr) {
+ base_accessor.reset(probe_accessor.release());
+ existence_map.reset(
+ lip_filter_adaptive_prober_->filterValueAccessor(base_accessor.get()));
+ probe_accessor.reset(
+ base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+ }
+
+ if (existence_map == nullptr) {
+ existence_map.reset(probe_store.getExistenceMap());
+ }
+
+ SemiAntiJoinTupleCollector collector(existence_map.get());
// We collect all the probe relation tuples which have at least one matching
// tuple in the build relation. As a performance optimization, the hash table
// just looks for the existence of the probing key in the hash table and sets
@@ -636,8 +683,15 @@ void HashSemiJoinWorkOrder::executeWithoutResidualPredicate() {
probe_block->getIndices(),
probe_block->getIndicesConsistent());
- std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
- probe_store.createValueAccessor(collector.filter()));
+ std::unique_ptr<ValueAccessor> probe_accessor_with_filter;
+ if (base_accessor != nullptr) {
+ probe_accessor_with_filter.reset(
+ base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+ } else {
+ probe_accessor_with_filter.reset(
+ probe_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+ }
+
ColumnVectorsValueAccessor temp_result;
for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
selection_it != selection_.end(); ++selection_it) {
@@ -656,7 +710,9 @@ void HashAntiJoinWorkOrder::executeWithoutResidualPredicate() {
const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
- SemiAntiJoinTupleCollector collector(probe_store);
+ std::unique_ptr<TupleIdSequence> existence_map(probe_store.getExistenceMap());
+
+ SemiAntiJoinTupleCollector collector(existence_map.get());
// We probe the hash table to find the keys which have an entry in the
// hash table.
if (join_key_attributes_.size() == 1) {
@@ -680,7 +736,7 @@ void HashAntiJoinWorkOrder::executeWithoutResidualPredicate() {
probe_block->getIndicesConsistent());
std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
- probe_store.createValueAccessor(collector.filter()));
+ probe_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
ColumnVectorsValueAccessor temp_result;
for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
selection_it != selection_.end(); ++selection_it) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d14509f0/relational_operators/HashJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp
index fa393b6..29d6eba 100644
--- a/relational_operators/HashJoinOperator.hpp
+++ b/relational_operators/HashJoinOperator.hpp
@@ -35,6 +35,7 @@
#include "storage/HashTable.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
#include "glog/logging.h"
@@ -307,7 +308,8 @@ class HashInnerJoinWorkOrder : public WorkOrder {
const std::vector<std::unique_ptr<const Scalar>> &selection,
const JoinHashTable &hash_table,
InsertDestination *output_destination,
- StorageManager *storage_manager)
+ StorageManager *storage_manager,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr)
: WorkOrder(query_id),
build_relation_(build_relation),
probe_relation_(probe_relation),
@@ -318,7 +320,8 @@ class HashInnerJoinWorkOrder : public WorkOrder {
selection_(selection),
hash_table_(hash_table),
output_destination_(DCHECK_NOTNULL(output_destination)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+ storage_manager_(DCHECK_NOTNULL(storage_manager)),
+ lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
/**
* @brief Constructor for the distributed version.
@@ -354,7 +357,8 @@ class HashInnerJoinWorkOrder : public WorkOrder {
const std::vector<std::unique_ptr<const Scalar>> &selection,
const JoinHashTable &hash_table,
InsertDestination *output_destination,
- StorageManager *storage_manager)
+ StorageManager *storage_manager,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr)
: WorkOrder(query_id),
build_relation_(build_relation),
probe_relation_(probe_relation),
@@ -365,7 +369,8 @@ class HashInnerJoinWorkOrder : public WorkOrder {
selection_(selection),
hash_table_(hash_table),
output_destination_(DCHECK_NOTNULL(output_destination)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+ storage_manager_(DCHECK_NOTNULL(storage_manager)),
+ lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
~HashInnerJoinWorkOrder() override {}
@@ -392,6 +397,8 @@ class HashInnerJoinWorkOrder : public WorkOrder {
InsertDestination *output_destination_;
StorageManager *storage_manager_;
+ std::unique_ptr<LIPFilterAdaptiveProber> lip_filter_adaptive_prober_;
+
DISALLOW_COPY_AND_ASSIGN(HashInnerJoinWorkOrder);
};
@@ -435,7 +442,8 @@ class HashSemiJoinWorkOrder : public WorkOrder {
const std::vector<std::unique_ptr<const Scalar>> &selection,
const JoinHashTable &hash_table,
InsertDestination *output_destination,
- StorageManager *storage_manager)
+ StorageManager *storage_manager,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr)
: WorkOrder(query_id),
build_relation_(build_relation),
probe_relation_(probe_relation),
@@ -446,7 +454,8 @@ class HashSemiJoinWorkOrder : public WorkOrder {
selection_(selection),
hash_table_(hash_table),
output_destination_(DCHECK_NOTNULL(output_destination)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+ storage_manager_(DCHECK_NOTNULL(storage_manager)),
+ lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
/**
* @brief Constructor for the distributed version.
@@ -482,7 +491,8 @@ class HashSemiJoinWorkOrder : public WorkOrder {
const std::vector<std::unique_ptr<const Scalar>> &selection,
const JoinHashTable &hash_table,
InsertDestination *output_destination,
- StorageManager *storage_manager)
+ StorageManager *storage_manager,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr)
: WorkOrder(query_id),
build_relation_(build_relation),
probe_relation_(probe_relation),
@@ -493,7 +503,8 @@ class HashSemiJoinWorkOrder : public WorkOrder {
selection_(selection),
hash_table_(hash_table),
output_destination_(DCHECK_NOTNULL(output_destination)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+ storage_manager_(DCHECK_NOTNULL(storage_manager)),
+ lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
~HashSemiJoinWorkOrder() override {}
@@ -516,6 +527,8 @@ class HashSemiJoinWorkOrder : public WorkOrder {
InsertDestination *output_destination_;
StorageManager *storage_manager_;
+ std::unique_ptr<LIPFilterAdaptiveProber> lip_filter_adaptive_prober_;
+
DISALLOW_COPY_AND_ASSIGN(HashSemiJoinWorkOrder);
};
@@ -559,7 +572,8 @@ class HashAntiJoinWorkOrder : public WorkOrder {
const std::vector<std::unique_ptr<const Scalar>> &selection,
const JoinHashTable &hash_table,
InsertDestination *output_destination,
- StorageManager *storage_manager)
+ StorageManager *storage_manager,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr)
: WorkOrder(query_id),
build_relation_(build_relation),
probe_relation_(probe_relation),
@@ -570,7 +584,8 @@ class HashAntiJoinWorkOrder : public WorkOrder {
selection_(selection),
hash_table_(hash_table),
output_destination_(DCHECK_NOTNULL(output_destination)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+ storage_manager_(DCHECK_NOTNULL(storage_manager)),
+ lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
/**
* @brief Constructor for the distributed version.
@@ -606,7 +621,8 @@ class HashAntiJoinWorkOrder : public WorkOrder {
const std::vector<std::unique_ptr<const Scalar>> &selection,
const JoinHashTable &hash_table,
InsertDestination *output_destination,
- StorageManager *storage_manager)
+ StorageManager *storage_manager,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr)
: WorkOrder(query_id),
build_relation_(build_relation),
probe_relation_(probe_relation),
@@ -617,7 +633,8 @@ class HashAntiJoinWorkOrder : public WorkOrder {
selection_(selection),
hash_table_(hash_table),
output_destination_(DCHECK_NOTNULL(output_destination)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+ storage_manager_(DCHECK_NOTNULL(storage_manager)),
+ lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
~HashAntiJoinWorkOrder() override {}
@@ -646,6 +663,8 @@ class HashAntiJoinWorkOrder : public WorkOrder {
InsertDestination *output_destination_;
StorageManager *storage_manager_;
+ std::unique_ptr<LIPFilterAdaptiveProber> lip_filter_adaptive_prober_;
+
DISALLOW_COPY_AND_ASSIGN(HashAntiJoinWorkOrder);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d14509f0/relational_operators/SelectOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/SelectOperator.cpp b/relational_operators/SelectOperator.cpp
index d56326e..d7855cf 100644
--- a/relational_operators/SelectOperator.cpp
+++ b/relational_operators/SelectOperator.cpp
@@ -30,6 +30,7 @@
#include "storage/StorageBlock.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "storage/StorageManager.hpp"
+#include "utility/lip_filter/LIPFilterDeployment.hpp"
#include "glog/logging.h"
@@ -43,9 +44,14 @@ void SelectOperator::addWorkOrders(WorkOrdersContainer *container,
StorageManager *storage_manager,
const Predicate *predicate,
const std::vector<std::unique_ptr<const Scalar>> *selection,
- InsertDestination *output_destination) {
+ InsertDestination *output_destination,
+ const LIPFilterDeployment *lip_filter_deployment) {
if (input_relation_is_stored_) {
for (const block_id input_block_id : input_relation_block_ids_) {
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr;
+ if (lip_filter_deployment != nullptr) {
+ lip_filter_adaptive_prober = lip_filter_deployment->createLIPFilterAdaptiveProber();
+ }
container->addNormalWorkOrder(new SelectWorkOrder(query_id_,
input_relation_,
input_block_id,
@@ -54,11 +60,16 @@ void SelectOperator::addWorkOrders(WorkOrdersContainer *container,
simple_selection_,
selection,
output_destination,
- storage_manager),
+ storage_manager,
+ lip_filter_adaptive_prober),
op_index_);
}
} else {
while (num_workorders_generated_ < input_relation_block_ids_.size()) {
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr;
+ if (lip_filter_deployment != nullptr) {
+ lip_filter_adaptive_prober = lip_filter_deployment->createLIPFilterAdaptiveProber();
+ }
container->addNormalWorkOrder(
new SelectWorkOrder(
query_id_,
@@ -81,13 +92,18 @@ void SelectOperator::addPartitionAwareWorkOrders(WorkOrdersContainer *container,
StorageManager *storage_manager,
const Predicate *predicate,
const std::vector<std::unique_ptr<const Scalar>> *selection,
- InsertDestination *output_destination) {
+ InsertDestination *output_destination,
+ const LIPFilterDeployment *lip_filter_deployment) {
DCHECK(placement_scheme_ != nullptr);
const std::size_t num_partitions = input_relation_.getPartitionScheme().getPartitionSchemeHeader().getNumPartitions();
if (input_relation_is_stored_) {
for (std::size_t part_id = 0; part_id < num_partitions; ++part_id) {
for (const block_id input_block_id :
input_relation_block_ids_in_partition_[part_id]) {
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr;
+ if (lip_filter_deployment != nullptr) {
+ lip_filter_adaptive_prober = lip_filter_deployment->createLIPFilterAdaptiveProber();
+ }
container->addNormalWorkOrder(
new SelectWorkOrder(
query_id_,
@@ -99,6 +115,7 @@ void SelectOperator::addPartitionAwareWorkOrders(WorkOrdersContainer *container,
selection,
output_destination,
storage_manager,
+ lip_filter_adaptive_prober,
placement_scheme_->getNUMANodeForBlock(input_block_id)),
op_index_);
}
@@ -109,6 +126,10 @@ void SelectOperator::addPartitionAwareWorkOrders(WorkOrdersContainer *container,
input_relation_block_ids_in_partition_[part_id].size()) {
block_id block_in_partition
= input_relation_block_ids_in_partition_[part_id][num_workorders_generated_in_partition_[part_id]];
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr;
+ if (lip_filter_deployment != nullptr) {
+ lip_filter_adaptive_prober = lip_filter_deployment->createLIPFilterAdaptiveProber();
+ }
container->addNormalWorkOrder(
new SelectWorkOrder(
query_id_,
@@ -120,6 +141,7 @@ void SelectOperator::addPartitionAwareWorkOrders(WorkOrdersContainer *container,
selection,
output_destination,
storage_manager,
+ lip_filter_adaptive_prober,
placement_scheme_->getNUMANodeForBlock(block_in_partition)),
op_index_);
++num_workorders_generated_in_partition_[part_id];
@@ -146,16 +168,31 @@ bool SelectOperator::getAllWorkOrders(
InsertDestination *output_destination =
query_context->getInsertDestination(output_destination_index_);
+ const LIPFilterDeployment *lip_filter_deployment = nullptr;
+ if (lip_deployment_index_ != QueryContext::kInvalidILIPDeploymentId) {
+ lip_filter_deployment = query_context->getLIPDeployment(lip_deployment_index_);
+ }
+
if (input_relation_is_stored_) {
if (!started_) {
if (input_relation_.hasPartitionScheme()) {
#ifdef QUICKSTEP_HAVE_LIBNUMA
if (input_relation_.hasNUMAPlacementScheme()) {
- addPartitionAwareWorkOrders(container, storage_manager, predicate, selection, output_destination);
+ addPartitionAwareWorkOrders(container,
+ storage_manager,
+ predicate,
+ selection,
+ output_destination,
+ lip_filter_deployment);
}
#endif
} else {
- addWorkOrders(container, storage_manager, predicate, selection, output_destination);
+ addWorkOrders(container,
+ storage_manager,
+ predicate,
+ selection,
+ output_destination,
+ lip_filter_deployment);
}
started_ = true;
}
@@ -164,11 +201,21 @@ bool SelectOperator::getAllWorkOrders(
if (input_relation_.hasPartitionScheme()) {
#ifdef QUICKSTEP_HAVE_LIBNUMA
if (input_relation_.hasNUMAPlacementScheme()) {
- addPartitionAwareWorkOrders(container, storage_manager, predicate, selection, output_destination);
+ addPartitionAwareWorkOrders(container,
+ storage_manager,
+ predicate,
+ selection,
+ output_destination,
+ lip_filter_deployment);
}
#endif
} else {
- addWorkOrders(container, storage_manager, predicate, selection, output_destination);
+ addWorkOrders(container,
+ storage_manager,
+ predicate,
+ selection,
+ output_destination,
+ lip_filter_deployment);
}
return done_feeding_input_relation_;
}
@@ -222,11 +269,13 @@ void SelectWorkOrder::execute() {
if (simple_projection_) {
block->selectSimple(simple_selection_,
predicate_,
- output_destination_);
+ output_destination_,
+ lip_filter_adaptive_prober_.get());
} else {
block->select(*DCHECK_NOTNULL(selection_),
predicate_,
- output_destination_);
+ output_destination_,
+ lip_filter_adaptive_prober_.get());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d14509f0/relational_operators/SelectOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SelectOperator.hpp b/relational_operators/SelectOperator.hpp
index 0f5c712..0d2ae16 100644
--- a/relational_operators/SelectOperator.hpp
+++ b/relational_operators/SelectOperator.hpp
@@ -38,6 +38,7 @@
#include "relational_operators/WorkOrder.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
#include "glog/logging.h"
@@ -49,6 +50,7 @@ namespace quickstep {
class CatalogRelationSchema;
class InsertDestination;
+class LIPFilterDeployment;
class Predicate;
class Scalar;
class StorageManager;
@@ -250,13 +252,15 @@ class SelectOperator : public RelationalOperator {
StorageManager *storage_manager,
const Predicate *predicate,
const std::vector<std::unique_ptr<const Scalar>> *selection,
- InsertDestination *output_destination);
+ InsertDestination *output_destination,
+ const LIPFilterDeployment *lip_filter_deployment);
void addPartitionAwareWorkOrders(WorkOrdersContainer *container,
StorageManager *storage_manager,
const Predicate *predicate,
const std::vector<std::unique_ptr<const Scalar>> *selection,
- InsertDestination *output_destination);
+ InsertDestination *output_destination,
+ const LIPFilterDeployment *lip_filter_deployment);
private:
/**
@@ -328,6 +332,7 @@ class SelectWorkOrder : public WorkOrder {
const std::vector<std::unique_ptr<const Scalar>> *selection,
InsertDestination *output_destination,
StorageManager *storage_manager,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr,
const numa_node_id numa_node = 0)
: WorkOrder(query_id),
input_relation_(input_relation),
@@ -337,7 +342,8 @@ class SelectWorkOrder : public WorkOrder {
simple_selection_(simple_selection),
selection_(selection),
output_destination_(DCHECK_NOTNULL(output_destination)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)) {
+ storage_manager_(DCHECK_NOTNULL(storage_manager)),
+ lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {
preferred_numa_nodes_.push_back(numa_node);
}
@@ -370,6 +376,7 @@ class SelectWorkOrder : public WorkOrder {
const std::vector<std::unique_ptr<const Scalar>> *selection,
InsertDestination *output_destination,
StorageManager *storage_manager,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr,
const numa_node_id numa_node = 0)
: WorkOrder(query_id),
input_relation_(input_relation),
@@ -379,7 +386,8 @@ class SelectWorkOrder : public WorkOrder {
simple_selection_(std::move(simple_selection)),
selection_(selection),
output_destination_(DCHECK_NOTNULL(output_destination)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)) {
+ storage_manager_(DCHECK_NOTNULL(storage_manager)),
+ lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {
preferred_numa_nodes_.push_back(numa_node);
}
@@ -407,6 +415,8 @@ class SelectWorkOrder : public WorkOrder {
InsertDestination *output_destination_;
StorageManager *storage_manager_;
+ std::unique_ptr<LIPFilterAdaptiveProber> lip_filter_adaptive_prober_;
+
DISALLOW_COPY_AND_ASSIGN(SelectWorkOrder);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d14509f0/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 7908db1..707f0fe 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -46,10 +46,12 @@
#include "storage/StorageBlock.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "storage/StorageManager.hpp"
+#include "storage/ValueAccessor.hpp"
#include "types/TypedValue.hpp"
#include "types/containers/ColumnVector.hpp"
#include "types/containers/ColumnVectorsValueAccessor.hpp"
#include "types/containers/Tuple.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
#include "glog/logging.h"
@@ -332,11 +334,12 @@ bool AggregationOperationState::ProtoIsValid(
return true;
}
-void AggregationOperationState::aggregateBlock(const block_id input_block) {
+void AggregationOperationState::aggregateBlock(const block_id input_block,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
if (group_by_list_.empty()) {
- aggregateBlockSingleState(input_block);
+ aggregateBlockSingleState(input_block, lip_filter_adaptive_prober);
} else {
- aggregateBlockHashTable(input_block);
+ aggregateBlockHashTable(input_block, lip_filter_adaptive_prober);
}
}
@@ -361,17 +364,27 @@ void AggregationOperationState::mergeSingleState(
}
void AggregationOperationState::aggregateBlockSingleState(
- const block_id input_block) {
+ const block_id input_block,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
// Aggregate per-block state for each aggregate.
std::vector<std::unique_ptr<AggregationState>> local_state;
BlockReference block(
storage_manager_->getBlock(input_block, input_relation_));
- // If there is a filter predicate, 'reuse_matches' holds the set of matching
- // tuples so that it can be reused across multiple aggregates (i.e. we only
- // pay the cost of evaluating the predicate once).
- std::unique_ptr<TupleIdSequence> reuse_matches;
+ // 'filter' holds the tuples that pass the LIP filters (if any) and then the
+ // predicate (if any); it is computed once and reused by every aggregate.
+ // A null 'filter' means every tuple in the block qualifies.
+ std::unique_ptr<TupleIdSequence> filter;
+ if (lip_filter_adaptive_prober != nullptr) {
+ std::unique_ptr<ValueAccessor> accessor(
+ block->getTupleStorageSubBlock().createValueAccessor());
+ filter.reset(lip_filter_adaptive_prober->filterValueAccessor(accessor.get()));
+ }
+ if (predicate_ != nullptr) {
+ filter.reset(block->getMatchesForPredicate(predicate_.get(), filter.get()));
+ }
+
for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
const std::vector<attribute_id> *local_arguments_as_attributes = nullptr;
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
@@ -388,9 +401,8 @@ void AggregationOperationState::aggregateBlockSingleState(
arguments_[agg_idx],
local_arguments_as_attributes,
{}, /* group_by */
- predicate_.get(),
+ filter.get(),
distinctify_hashtables_[agg_idx].get(),
- &reuse_matches,
nullptr /* reuse_group_by_vectors */);
local_state.emplace_back(nullptr);
} else {
@@ -398,8 +410,7 @@ void AggregationOperationState::aggregateBlockSingleState(
local_state.emplace_back(block->aggregate(*handles_[agg_idx],
arguments_[agg_idx],
local_arguments_as_attributes,
- predicate_.get(),
- &reuse_matches));
+ filter.get()));
}
}
@@ -408,14 +419,23 @@ void AggregationOperationState::aggregateBlockSingleState(
}
void AggregationOperationState::aggregateBlockHashTable(
- const block_id input_block) {
+ const block_id input_block,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
BlockReference block(
storage_manager_->getBlock(input_block, input_relation_));
- // If there is a filter predicate, 'reuse_matches' holds the set of matching
- // tuples so that it can be reused across multiple aggregates (i.e. we only
- // pay the cost of evaluating the predicate once).
- std::unique_ptr<TupleIdSequence> reuse_matches;
+ // 'filter' holds the tuples that pass the LIP filters (if any) and then the
+ // predicate (if any); it is computed once and reused by every aggregate.
+ // A null 'filter' means every tuple in the block qualifies.
+ std::unique_ptr<TupleIdSequence> filter;
+ if (lip_filter_adaptive_prober != nullptr) {
+ std::unique_ptr<ValueAccessor> accessor(
+ block->getTupleStorageSubBlock().createValueAccessor());
+ filter.reset(lip_filter_adaptive_prober->filterValueAccessor(accessor.get()));
+ }
+ if (predicate_ != nullptr) {
+ filter.reset(block->getMatchesForPredicate(predicate_.get(), filter.get()));
+ }
// This holds values of all the GROUP BY attributes so that the can be reused
// across multiple aggregates (i.e. we only pay the cost of evaluatin the
@@ -432,9 +452,8 @@ void AggregationOperationState::aggregateBlockHashTable(
arguments_[agg_idx],
nullptr, /* arguments_as_attributes */
group_by_list_,
- predicate_.get(),
+ filter.get(),
distinctify_hashtables_[agg_idx].get(),
- &reuse_matches,
&reuse_group_by_vectors);
}
}
@@ -448,9 +467,8 @@ void AggregationOperationState::aggregateBlockHashTable(
DCHECK(agg_hash_table != nullptr);
block->aggregateGroupBy(arguments_,
group_by_list_,
- predicate_.get(),
+ filter.get(),
agg_hash_table,
- &reuse_matches,
&reuse_group_by_vectors);
group_by_hashtable_pool_->returnHashTable(agg_hash_table);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d14509f0/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index cbbfc22..a80bcb0 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -41,6 +41,7 @@ class AggregateFunction;
class CatalogDatabaseLite;
class CatalogRelationSchema;
class InsertDestination;
+class LIPFilterAdaptiveProber;
class StorageManager;
/** \addtogroup Storage
@@ -156,7 +157,8 @@ class AggregationOperationState {
* @param input_block The block ID of the storage block where the aggreates
* are going to be computed.
**/
- void aggregateBlock(const block_id input_block);
+ void aggregateBlock(const block_id input_block,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober);
/**
* @brief Generate the final results for the aggregates managed by this
@@ -179,8 +181,10 @@ class AggregationOperationState {
const std::vector<std::unique_ptr<AggregationState>> &local_state);
// Aggregate on input block.
- void aggregateBlockSingleState(const block_id input_block);
- void aggregateBlockHashTable(const block_id input_block);
+ void aggregateBlockSingleState(const block_id input_block,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober);
+ void aggregateBlockHashTable(const block_id input_block,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober);
void finalizeSingleState(InsertDestination *output_destination);
void finalizeHashTable(InsertDestination *output_destination);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d14509f0/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index ec5990f..7c16c34 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -60,6 +60,7 @@
#include "types/containers/Tuple.hpp"
#include "types/operations/comparisons/ComparisonUtil.hpp"
#include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
#include "glog/logging.h"
@@ -341,20 +342,30 @@ void StorageBlock::sample(const bool is_block_sample,
void StorageBlock::select(const vector<unique_ptr<const Scalar>> &selection,
const Predicate *predicate,
- InsertDestinationInterface *destination) const {
+ InsertDestinationInterface *destination,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober) const {
ColumnVectorsValueAccessor temp_result;
{
SubBlocksReference sub_blocks_ref(*tuple_store_,
indices_,
indices_consistent_);
+ std::unique_ptr<ValueAccessor> base_accessor(tuple_store_->createValueAccessor());
std::unique_ptr<TupleIdSequence> matches;
+
+ if (lip_filter_adaptive_prober != nullptr) {
+ matches.reset(lip_filter_adaptive_prober->filterValueAccessor(base_accessor.get()));
+ }
+ // Evaluate the predicate only over tuples that survived the LIP filters.
+ if (predicate != nullptr) {
+ matches.reset(getMatchesForPredicate(predicate, matches.get()));
+ }
+
std::unique_ptr<ValueAccessor> accessor;
- if (predicate == nullptr) {
- accessor.reset(tuple_store_->createValueAccessor());
+ if (matches == nullptr) {
+ accessor.reset(base_accessor.release());
} else {
- matches.reset(getMatchesForPredicate(predicate));
- accessor.reset(tuple_store_->createValueAccessor(matches.get()));
+ accessor.reset(base_accessor->createSharedTupleIdSequenceAdapterVirtual(*matches));
}
for (vector<unique_ptr<const Scalar>>::const_iterator selection_cit = selection.begin();
@@ -371,14 +382,24 @@ void StorageBlock::select(const vector<unique_ptr<const Scalar>> &selection,
void StorageBlock::selectSimple(const std::vector<attribute_id> &selection,
const Predicate *predicate,
- InsertDestinationInterface *destination) const {
- std::unique_ptr<ValueAccessor> accessor;
+ InsertDestinationInterface *destination,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober) const {
+ std::unique_ptr<ValueAccessor> base_accessor(tuple_store_->createValueAccessor());
std::unique_ptr<TupleIdSequence> matches;
- if (predicate == nullptr) {
- accessor.reset(tuple_store_->createValueAccessor());
- } else {
+
+ if (lip_filter_adaptive_prober != nullptr) {
+ matches.reset(lip_filter_adaptive_prober->filterValueAccessor(base_accessor.get()));
+ }
+ // Evaluate the predicate only over tuples that survived the LIP filters.
+ if (predicate != nullptr) {
- matches.reset(getMatchesForPredicate(predicate));
+ matches.reset(getMatchesForPredicate(predicate, matches.get()));
- accessor.reset(tuple_store_->createValueAccessor(matches.get()));
+ }
+
+ std::unique_ptr<ValueAccessor> accessor;
+ if (matches == nullptr) {
+ accessor.reset(base_accessor.release());
+ } else {
+ accessor.reset(base_accessor->createSharedTupleIdSequenceAdapterVirtual(*matches));
}
destination->bulkInsertTuplesWithRemappedAttributes(selection,
@@ -389,37 +410,28 @@ AggregationState* StorageBlock::aggregate(
const AggregationHandle &handle,
const std::vector<std::unique_ptr<const Scalar>> &arguments,
const std::vector<attribute_id> *arguments_as_attributes,
- const Predicate *predicate,
- std::unique_ptr<TupleIdSequence> *reuse_matches) const {
- // If there is a filter predicate that hasn't already been evaluated,
- // evaluate it now and save the results for other aggregates on this same
- // block.
- if (predicate && !*reuse_matches) {
- reuse_matches->reset(getMatchesForPredicate(predicate));
- }
-
+ const TupleIdSequence *filter) const {
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
// If all the arguments to this aggregate are plain relation attributes,
// aggregate directly on a ValueAccessor from this block to avoid a copy.
if ((arguments_as_attributes != nullptr) && (!arguments_as_attributes->empty())) {
DCHECK_EQ(arguments.size(), arguments_as_attributes->size())
<< "Mismatch between number of arguments and number of attribute_ids";
- return aggregateHelperValueAccessor(handle, *arguments_as_attributes, reuse_matches->get());
+ return aggregateHelperValueAccessor(handle, *arguments_as_attributes, filter);
}
// TODO(shoban): We may want to optimize for ScalarLiteral here.
#endif
// Call aggregateHelperColumnVector() to materialize each argument as a
// ColumnVector, then aggregate over those.
- return aggregateHelperColumnVector(handle, arguments, reuse_matches->get());
+ return aggregateHelperColumnVector(handle, arguments, filter);
}
void StorageBlock::aggregateGroupBy(
const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
const std::vector<std::unique_ptr<const Scalar>> &group_by,
- const Predicate *predicate,
+ const TupleIdSequence *filter,
AggregationStateHashTableBase *hash_table,
- std::unique_ptr<TupleIdSequence> *reuse_matches,
std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
DCHECK_GT(group_by.size(), 0u)
<< "Called aggregateGroupBy() with zero GROUP BY expressions";
@@ -438,23 +450,7 @@ void StorageBlock::aggregateGroupBy(
// this aggregate, as well as the GROUP BY expression values.
ColumnVectorsValueAccessor temp_result;
{
- std::unique_ptr<ValueAccessor> accessor;
- if (predicate) {
- if (!*reuse_matches) {
- // If there is a filter predicate that hasn't already been evaluated,
- // evaluate it now and save the results for other aggregates on this
- // same block.
- reuse_matches->reset(getMatchesForPredicate(predicate));
- }
-
- // Create a filtered ValueAccessor that only iterates over predicate
- // matches.
- accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get()));
- } else {
- // Create a ValueAccessor that iterates over all tuples in this block
- accessor.reset(tuple_store_->createValueAccessor());
- }
-
+ std::unique_ptr<ValueAccessor> accessor(tuple_store_->createValueAccessor(filter));
attribute_id attr_id = 0;
// First, put GROUP BY keys into 'temp_result'.
@@ -503,9 +499,8 @@ void StorageBlock::aggregateDistinct(
const std::vector<std::unique_ptr<const Scalar>> &arguments,
const std::vector<attribute_id> *arguments_as_attributes,
const std::vector<std::unique_ptr<const Scalar>> &group_by,
- const Predicate *predicate,
+ const TupleIdSequence *filter,
AggregationStateHashTableBase *distinctify_hash_table,
- std::unique_ptr<TupleIdSequence> *reuse_matches,
std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
DCHECK_GT(arguments.size(), 0u)
<< "Called aggregateDistinct() with zero argument expressions";
@@ -517,22 +512,7 @@ void StorageBlock::aggregateDistinct(
// this aggregate, as well as the GROUP BY expression values.
ColumnVectorsValueAccessor temp_result;
{
- std::unique_ptr<ValueAccessor> accessor;
- if (predicate) {
- if (!*reuse_matches) {
- // If there is a filter predicate that hasn't already been evaluated,
- // evaluate it now and save the results for other aggregates on this
- // same block.
- reuse_matches->reset(getMatchesForPredicate(predicate));
- }
-
- // Create a filtered ValueAccessor that only iterates over predicate
- // matches.
- accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get()));
- } else {
- // Create a ValueAccessor that iterates over all tuples in this block
- accessor.reset(tuple_store_->createValueAccessor());
- }
+ std::unique_ptr<ValueAccessor> accessor(tuple_store_->createValueAccessor(filter));
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
// If all the arguments to this aggregate are plain relation attributes,
@@ -1246,23 +1226,36 @@ bool StorageBlock::rebuildIndexes(bool short_circuit) {
return all_indices_consistent_;
}
-TupleIdSequence* StorageBlock::getMatchesForPredicate(const Predicate *predicate) const {
+TupleIdSequence* StorageBlock::getMatchesForPredicate(const Predicate *predicate,
+ const TupleIdSequence *filter) const {
if (predicate == nullptr) {
- return tuple_store_->getExistenceMap();
+ TupleIdSequence *matched = tuple_store_->getExistenceMap();
+ if (filter != nullptr) {
+ matched->intersectWith(*filter);
+ }
+ return matched;
}
std::unique_ptr<ValueAccessor> value_accessor(tuple_store_->createValueAccessor());
- std::unique_ptr<TupleIdSequence> existence_map;
- if (!tuple_store_->isPacked()) {
- existence_map.reset(tuple_store_->getExistenceMap());
- }
SubBlocksReference sub_blocks_ref(*tuple_store_,
indices_,
indices_consistent_);
- return predicate->getAllMatches(value_accessor.get(),
- &sub_blocks_ref,
- nullptr,
- existence_map.get());
+
+ if (!tuple_store_->isPacked()) {
+ std::unique_ptr<TupleIdSequence> existence_map(tuple_store_->getExistenceMap());
+ if (filter != nullptr) {
+ existence_map->intersectWith(*filter);
+ }
+ return predicate->getAllMatches(value_accessor.get(),
+ &sub_blocks_ref,
+ nullptr,
+ existence_map.get());
+ } else {
+ return predicate->getAllMatches(value_accessor.get(),
+ &sub_blocks_ref,
+ nullptr,
+ filter);
+ }
}
std::unordered_map<attribute_id, TypedValue>* StorageBlock::generateUpdatedValues(
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d14509f0/storage/StorageBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp
index bab5bab..77fb137 100644
--- a/storage/StorageBlock.hpp
+++ b/storage/StorageBlock.hpp
@@ -44,6 +44,7 @@ class AggregationState;
class CatalogRelationSchema;
class ColumnVector;
class InsertDestinationInterface;
+class LIPFilterAdaptiveProber;
class Predicate;
class Scalar;
class StorageBlockLayout;
@@ -312,6 +313,18 @@ class StorageBlock : public StorageBlockBase {
const std::vector<attribute_id> &attribute_map,
ValueAccessor *accessor);
+ /**
+ * @brief Get the tuples in this block that satisfy a predicate, optionally
+ * restricted to a given subset of tuples.
+ *
+ * @param predicate The predicate to evaluate. If nullptr, every tuple in
+ * the block's existence map matches (intersected with filter).
+ * @param filter If non-null, only tuples in this sequence are considered.
+ * @return A new TupleIdSequence of matching tuples, owned by the caller.
+ **/
+ TupleIdSequence* getMatchesForPredicate(const Predicate *predicate,
+ const TupleIdSequence *filter = nullptr) const;
+
/**
* @brief Perform a random sampling of data on the StorageBlock. The number
* of records sampled is determined by the sample percentage in case of
@@ -349,7 +353,8 @@ class StorageBlock : public StorageBlockBase {
**/
void select(const std::vector<std::unique_ptr<const Scalar>> &selection,
const Predicate *predicate,
- InsertDestinationInterface *destination) const;
+ InsertDestinationInterface *destination,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober) const;
/**
* @brief Perform a simple SELECT query on this StorageBlock which only
@@ -372,7 +377,8 @@ class StorageBlock : public StorageBlockBase {
**/
void selectSimple(const std::vector<attribute_id> &selection,
const Predicate *predicate,
- InsertDestinationInterface *destination) const;
+ InsertDestinationInterface *destination,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober) const;
/**
* @brief Perform non GROUP BY aggregation on the tuples in the this storage
@@ -412,8 +418,7 @@ class StorageBlock : public StorageBlockBase {
const AggregationHandle &handle,
const std::vector<std::unique_ptr<const Scalar>> &arguments,
const std::vector<attribute_id> *arguments_as_attributes,
- const Predicate *predicate,
- std::unique_ptr<TupleIdSequence> *reuse_matches) const;
+ const TupleIdSequence *filter) const;
/**
* @brief Perform GROUP BY aggregation on the tuples in the this storage
@@ -461,9 +466,8 @@ class StorageBlock : public StorageBlockBase {
void aggregateGroupBy(
const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
const std::vector<std::unique_ptr<const Scalar>> &group_by,
- const Predicate *predicate,
+ const TupleIdSequence *filter,
AggregationStateHashTableBase *hash_table,
- std::unique_ptr<TupleIdSequence> *reuse_matches,
std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const;
/**
@@ -505,9 +509,8 @@ class StorageBlock : public StorageBlockBase {
const std::vector<std::unique_ptr<const Scalar>> &arguments,
const std::vector<attribute_id> *arguments_as_attributes,
const std::vector<std::unique_ptr<const Scalar>> &group_by,
- const Predicate *predicate,
+ const TupleIdSequence *filter,
AggregationStateHashTableBase *distinctify_hash_table,
- std::unique_ptr<TupleIdSequence> *reuse_matches,
std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const;
/**
@@ -627,8 +630,6 @@ class StorageBlock : public StorageBlockBase {
// StorageBlock's header.
bool rebuildIndexes(bool short_circuit);
- TupleIdSequence* getMatchesForPredicate(const Predicate *predicate) const;
-
std::unordered_map<attribute_id, TypedValue>* generateUpdatedValues(
const ValueAccessor &accessor,
const tuple_id tuple,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d14509f0/utility/lip_filter/LIPFilter.cpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilter.cpp b/utility/lip_filter/LIPFilter.cpp
new file mode 100644
index 0000000..92bfab1
--- /dev/null
+++ b/utility/lip_filter/LIPFilter.cpp
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "utility/lip_filter/LIPFilter.hpp"
+
+namespace quickstep {
+
+} // namespace quickstep