You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/10/22 05:15:38 UTC
incubator-quickstep git commit: Add backend support for LIPFilters.
[Forced Update!]
Repository: incubator-quickstep
Updated Branches:
refs/heads/lip-refactor-backend 96ef35071 -> 47fe8d6d2 (forced update)
Add backend support for LIPFilters.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/47fe8d6d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/47fe8d6d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/47fe8d6d
Branch: refs/heads/lip-refactor-backend
Commit: 47fe8d6d2ac1fe695cdaa7fbc804b1f8c0d7ea8a
Parents: 9c32ea4
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Wed Sep 7 13:20:43 2016 -0500
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Sat Oct 22 00:15:33 2016 -0500
----------------------------------------------------------------------
expressions/scalar/ScalarAttribute.cpp | 2 +-
query_execution/QueryContext.hpp | 2 +-
relational_operators/AggregationOperator.cpp | 12 ++-
relational_operators/AggregationOperator.hpp | 10 +-
relational_operators/BuildHashOperator.cpp | 17 +++-
relational_operators/BuildHashOperator.hpp | 18 +++-
relational_operators/CMakeLists.txt | 11 ++
relational_operators/HashJoinOperator.cpp | 78 +++++++++++---
relational_operators/HashJoinOperator.hpp | 49 ++++++---
relational_operators/SelectOperator.cpp | 77 ++++++++++----
relational_operators/SelectOperator.hpp | 16 ++-
relational_operators/WorkOrder.proto | 8 +-
relational_operators/WorkOrderFactory.cpp | 26 +++--
storage/AggregationOperationState.cpp | 51 ++++++----
storage/AggregationOperationState.hpp | 9 +-
storage/CMakeLists.txt | 5 +-
storage/StorageBlock.cpp | 118 +++++++++-------------
storage/StorageBlock.hpp | 82 ++++++---------
utility/lip_filter/CMakeLists.txt | 4 +
utility/lip_filter/LIPFilterBuilder.hpp | 3 -
utility/lip_filter/LIPFilterUtil.hpp | 79 +++++++++++++++
21 files changed, 463 insertions(+), 214 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/47fe8d6d/expressions/scalar/ScalarAttribute.cpp
----------------------------------------------------------------------
diff --git a/expressions/scalar/ScalarAttribute.cpp b/expressions/scalar/ScalarAttribute.cpp
index b29286b..cc42084 100644
--- a/expressions/scalar/ScalarAttribute.cpp
+++ b/expressions/scalar/ScalarAttribute.cpp
@@ -168,7 +168,7 @@ ColumnVector* ScalarAttribute::getAllValuesForJoin(
ValueAccessor *accessor = using_left_relation ? left_accessor
: right_accessor;
- return InvokeOnValueAccessorNotAdapter(
+ return InvokeOnAnyValueAccessor(
accessor,
[&joined_tuple_ids,
&attr_id,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/47fe8d6d/query_execution/QueryContext.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp
index 4ebb042..5b186b5 100644
--- a/query_execution/QueryContext.hpp
+++ b/query_execution/QueryContext.hpp
@@ -88,7 +88,7 @@ class QueryContext {
/**
* @brief A unique identifier for a LIPFilterDeployment per query.
**/
- typedef std::uint32_t lip_deployment_id;
+ typedef std::int32_t lip_deployment_id;
static constexpr lip_deployment_id kInvalidLIPDeploymentId = static_cast<lip_deployment_id>(-1);
/**
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/47fe8d6d/relational_operators/AggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/AggregationOperator.cpp b/relational_operators/AggregationOperator.cpp
index 056e76d..e111f5b 100644
--- a/relational_operators/AggregationOperator.cpp
+++ b/relational_operators/AggregationOperator.cpp
@@ -27,6 +27,8 @@
#include "relational_operators/WorkOrder.pb.h"
#include "storage/AggregationOperationState.hpp"
#include "storage/StorageBlockInfo.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
+#include "utility/lip_filter/LIPFilterUtil.hpp"
#include "tmb/id_typedefs.h"
@@ -45,7 +47,8 @@ bool AggregationOperator::getAllWorkOrders(
new AggregationWorkOrder(
query_id_,
input_block_id,
- query_context->getAggregationState(aggr_state_index_)),
+ query_context->getAggregationState(aggr_state_index_),
+ CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
op_index_);
}
started_ = true;
@@ -57,7 +60,8 @@ bool AggregationOperator::getAllWorkOrders(
new AggregationWorkOrder(
query_id_,
input_relation_block_ids_[num_workorders_generated_],
- query_context->getAggregationState(aggr_state_index_)),
+ query_context->getAggregationState(aggr_state_index_),
+ CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
op_index_);
++num_workorders_generated_;
}
@@ -92,13 +96,13 @@ serialization::WorkOrder* AggregationOperator::createWorkOrderProto(const block_
proto->SetExtension(serialization::AggregationWorkOrder::block_id, block);
proto->SetExtension(serialization::AggregationWorkOrder::aggr_state_index, aggr_state_index_);
+ proto->SetExtension(serialization::AggregationWorkOrder::lip_deployment_index, lip_deployment_index_);
return proto;
}
-
void AggregationWorkOrder::execute() {
- state_->aggregateBlock(input_block_id_);
+ state_->aggregateBlock(input_block_id_, lip_filter_adaptive_prober_.get());
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/47fe8d6d/relational_operators/AggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/AggregationOperator.hpp b/relational_operators/AggregationOperator.hpp
index 31c1da4..b5ed977 100644
--- a/relational_operators/AggregationOperator.hpp
+++ b/relational_operators/AggregationOperator.hpp
@@ -30,6 +30,7 @@
#include "relational_operators/WorkOrder.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
#include "glog/logging.h"
@@ -137,13 +138,16 @@ class AggregationWorkOrder : public WorkOrder {
* @param query_id The ID of this query.
* @param input_block_id The block id.
* @param state The AggregationState to use.
+ * @param lip_filter_adaptive_prober The attached LIP filter prober.
**/
AggregationWorkOrder(const std::size_t query_id,
const block_id input_block_id,
- AggregationOperationState *state)
+ AggregationOperationState *state,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr)
: WorkOrder(query_id),
input_block_id_(input_block_id),
- state_(DCHECK_NOTNULL(state)) {}
+ state_(DCHECK_NOTNULL(state)),
+ lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
~AggregationWorkOrder() override {}
@@ -153,6 +157,8 @@ class AggregationWorkOrder : public WorkOrder {
const block_id input_block_id_;
AggregationOperationState *state_;
+ std::unique_ptr<LIPFilterAdaptiveProber> lip_filter_adaptive_prober_;
+
DISALLOW_COPY_AND_ASSIGN(AggregationWorkOrder);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/47fe8d6d/relational_operators/BuildHashOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildHashOperator.cpp b/relational_operators/BuildHashOperator.cpp
index 465621c..60e091f 100644
--- a/relational_operators/BuildHashOperator.cpp
+++ b/relational_operators/BuildHashOperator.cpp
@@ -34,6 +34,8 @@
#include "storage/TupleReference.hpp"
#include "storage/TupleStorageSubBlock.hpp"
#include "storage/ValueAccessor.hpp"
+#include "utility/lip_filter/LIPFilterBuilder.hpp"
+#include "utility/lip_filter/LIPFilterUtil.hpp"
#include "glog/logging.h"
@@ -79,7 +81,8 @@ bool BuildHashOperator::getAllWorkOrders(
any_join_key_attributes_nullable_,
input_block_id,
hash_table,
- storage_manager),
+ storage_manager,
+ CreateLIPFilterBuilderHelper(lip_deployment_index_, query_context)),
op_index_);
}
started_ = true;
@@ -95,7 +98,8 @@ bool BuildHashOperator::getAllWorkOrders(
any_join_key_attributes_nullable_,
input_relation_block_ids_[num_workorders_generated_],
hash_table,
- storage_manager),
+ storage_manager,
+ CreateLIPFilterBuilderHelper(lip_deployment_index_, query_context)),
op_index_);
++num_workorders_generated_;
}
@@ -136,17 +140,24 @@ serialization::WorkOrder* BuildHashOperator::createWorkOrderProto(const block_id
any_join_key_attributes_nullable_);
proto->SetExtension(serialization::BuildHashWorkOrder::join_hash_table_index, hash_table_index_);
proto->SetExtension(serialization::BuildHashWorkOrder::block_id, block);
+ proto->SetExtension(serialization::BuildHashWorkOrder::lip_deployment_index, lip_deployment_index_);
return proto;
}
-
void BuildHashWorkOrder::execute() {
BlockReference block(
storage_manager_->getBlock(build_block_id_, input_relation_));
TupleReferenceGenerator generator(build_block_id_);
std::unique_ptr<ValueAccessor> accessor(block->getTupleStorageSubBlock().createValueAccessor());
+
+ // Build LIPFilters if enabled.
+ if (lip_filter_builder_ != nullptr) {
+ lip_filter_builder_->insertValueAccessor(accessor.get());
+ accessor->beginIterationVirtual();
+ }
+
HashTablePutResult result;
if (join_key_attributes_.size() == 1) {
result = hash_table_->putValueAccessor(accessor.get(),
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/47fe8d6d/relational_operators/BuildHashOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildHashOperator.hpp b/relational_operators/BuildHashOperator.hpp
index 4a80a8a..0f96ef2 100644
--- a/relational_operators/BuildHashOperator.hpp
+++ b/relational_operators/BuildHashOperator.hpp
@@ -20,6 +20,7 @@
#ifndef QUICKSTEP_RELATIONAL_OPERATORS_BUILD_HASH_OPERATOR_HPP_
#define QUICKSTEP_RELATIONAL_OPERATORS_BUILD_HASH_OPERATOR_HPP_
+#include <memory>
#include <string>
#include <utility>
#include <vector>
@@ -31,6 +32,7 @@
#include "relational_operators/WorkOrder.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilterBuilder.hpp"
#include "glog/logging.h"
@@ -162,6 +164,7 @@ class BuildHashWorkOrder : public WorkOrder {
* @param build_block_id The block id.
* @param hash_table The JoinHashTable to use.
* @param storage_manager The StorageManager to use.
+ * @param lip_filter_builder The attached LIP filter builder.
**/
BuildHashWorkOrder(const std::size_t query_id,
const CatalogRelationSchema &input_relation,
@@ -169,14 +172,16 @@ class BuildHashWorkOrder : public WorkOrder {
const bool any_join_key_attributes_nullable,
const block_id build_block_id,
JoinHashTable *hash_table,
- StorageManager *storage_manager)
+ StorageManager *storage_manager,
+ LIPFilterBuilder *lip_filter_builder = nullptr)
: WorkOrder(query_id),
input_relation_(input_relation),
join_key_attributes_(join_key_attributes),
any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
build_block_id_(build_block_id),
hash_table_(DCHECK_NOTNULL(hash_table)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+ storage_manager_(DCHECK_NOTNULL(storage_manager)),
+ lip_filter_builder_(lip_filter_builder) {}
/**
* @brief Constructor for the distributed version.
@@ -189,6 +194,7 @@ class BuildHashWorkOrder : public WorkOrder {
* @param build_block_id The block id.
* @param hash_table The JoinHashTable to use.
* @param storage_manager The StorageManager to use.
+ * @param lip_filter_builder The attached LIP filter builder.
**/
BuildHashWorkOrder(const std::size_t query_id,
const CatalogRelationSchema &input_relation,
@@ -196,14 +202,16 @@ class BuildHashWorkOrder : public WorkOrder {
const bool any_join_key_attributes_nullable,
const block_id build_block_id,
JoinHashTable *hash_table,
- StorageManager *storage_manager)
+ StorageManager *storage_manager,
+ LIPFilterBuilder *lip_filter_builder = nullptr)
: WorkOrder(query_id),
input_relation_(input_relation),
join_key_attributes_(std::move(join_key_attributes)),
any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
build_block_id_(build_block_id),
hash_table_(DCHECK_NOTNULL(hash_table)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+ storage_manager_(DCHECK_NOTNULL(storage_manager)),
+ lip_filter_builder_(lip_filter_builder) {}
~BuildHashWorkOrder() override {}
@@ -222,6 +230,8 @@ class BuildHashWorkOrder : public WorkOrder {
JoinHashTable *hash_table_;
StorageManager *storage_manager_;
+ std::unique_ptr<LIPFilterBuilder> lip_filter_builder_;
+
DISALLOW_COPY_AND_ASSIGN(BuildHashWorkOrder);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/47fe8d6d/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index a9645b4..8dd65d0 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -92,6 +92,8 @@ target_link_libraries(quickstep_relationaloperators_AggregationOperator
quickstep_storage_AggregationOperationState
quickstep_storage_StorageBlockInfo
quickstep_utility_Macros
+ quickstep_utility_lipfilter_LIPFilterAdaptiveProber
+ quickstep_utility_lipfilter_LIPFilterUtil
tmb)
target_link_libraries(quickstep_relationaloperators_BuildHashOperator
glog
@@ -111,6 +113,8 @@ target_link_libraries(quickstep_relationaloperators_BuildHashOperator
quickstep_storage_TupleStorageSubBlock
quickstep_storage_ValueAccessor
quickstep_utility_Macros
+ quickstep_utility_lipfilter_LIPFilterBuilder
+ quickstep_utility_lipfilter_LIPFilterUtil
tmb)
target_link_libraries(quickstep_relationaloperators_CreateIndexOperator
glog
@@ -223,6 +227,8 @@ target_link_libraries(quickstep_relationaloperators_HashJoinOperator
quickstep_types_containers_ColumnVector
quickstep_types_containers_ColumnVectorsValueAccessor
quickstep_utility_Macros
+ quickstep_utility_lipfilter_LIPFilterAdaptiveProber
+ quickstep_utility_lipfilter_LIPFilterUtil
tmb)
target_link_libraries(quickstep_relationaloperators_InsertOperator
glog
@@ -322,7 +328,11 @@ target_link_libraries(quickstep_relationaloperators_SelectOperator
quickstep_storage_StorageBlock
quickstep_storage_StorageBlockInfo
quickstep_storage_StorageManager
+ quickstep_storage_TupleIdSequence
+ quickstep_storage_ValueAccessor
quickstep_utility_Macros
+ quickstep_utility_lipfilter_LIPFilterAdaptiveProber
+ quickstep_utility_lipfilter_LIPFilterUtil
tmb)
if(QUICKSTEP_HAVE_LIBNUMA)
target_link_libraries(quickstep_relationaloperators_SelectOperator
@@ -492,6 +502,7 @@ target_link_libraries(quickstep_relationaloperators_WorkOrderFactory
quickstep_relationaloperators_WorkOrder_proto
quickstep_storage_StorageBlockInfo
quickstep_utility_Macros
+ quickstep_utility_lipfilter_LIPFilterUtil
tmb)
target_link_libraries(quickstep_relationaloperators_WorkOrder_proto
quickstep_relationaloperators_SortMergeRunOperator_proto
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/47fe8d6d/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index 779c0fe..eeb2096 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -48,6 +48,8 @@
#include "types/TypedValue.hpp"
#include "types/containers/ColumnVector.hpp"
#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
+#include "utility/lip_filter/LIPFilterUtil.hpp"
#include "gflags/gflags.h"
#include "glog/logging.h"
@@ -95,8 +97,8 @@ class MapBasedJoinedTupleCollector {
class SemiAntiJoinTupleCollector {
public:
- explicit SemiAntiJoinTupleCollector(const TupleStorageSubBlock &tuple_store) {
- filter_.reset(tuple_store.getExistenceMap());
+ explicit SemiAntiJoinTupleCollector(TupleIdSequence *existence_map) {
+ filter_ = existence_map;
}
template <typename ValueAccessorT>
@@ -104,12 +106,8 @@ class SemiAntiJoinTupleCollector {
filter_->set(accessor.getCurrentPosition(), false);
}
- const TupleIdSequence* filter() const {
- return filter_.get();
- }
-
private:
- std::unique_ptr<TupleIdSequence> filter_;
+ TupleIdSequence *filter_;
};
class OuterJoinTupleCollector {
@@ -203,7 +201,8 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
selection,
hash_table,
output_destination,
- storage_manager),
+ storage_manager,
+ CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
op_index_);
}
started_ = true;
@@ -223,7 +222,8 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
selection,
hash_table,
output_destination,
- storage_manager),
+ storage_manager,
+ CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
op_index_);
++num_workorders_generated_;
} // end while
@@ -360,6 +360,7 @@ serialization::WorkOrder* HashJoinOperator::createNonOuterJoinWorkOrderProto(
proto->SetExtension(serialization::HashJoinWorkOrder::selection_index, selection_index_);
proto->SetExtension(serialization::HashJoinWorkOrder::block_id, block);
proto->SetExtension(serialization::HashJoinWorkOrder::residual_predicate_index, residual_predicate_index_);
+ proto->SetExtension(serialization::HashJoinWorkOrder::lip_deployment_index, lip_deployment_index_);
return proto;
}
@@ -423,6 +424,18 @@ void HashInnerJoinWorkOrder::execute() {
const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
+
+ // Probe the LIPFilters to generate an existence bitmap for probe_accessor, if enabled.
+ std::unique_ptr<TupleIdSequence> existence_map;
+ std::unique_ptr<ValueAccessor> base_accessor;
+ if (lip_filter_adaptive_prober_ != nullptr) {
+ base_accessor.reset(probe_accessor.release());
+ existence_map.reset(
+ lip_filter_adaptive_prober_->filterValueAccessor(base_accessor.get()));
+ probe_accessor.reset(
+ base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+ }
+
MapBasedJoinedTupleCollector collector;
if (join_key_attributes_.size() == 1) {
hash_table_.getAllFromValueAccessor(
@@ -529,6 +542,17 @@ void HashSemiJoinWorkOrder::executeWithResidualPredicate() {
std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
+ // Probe the LIPFilters to generate an existence bitmap for probe_accessor, if enabled.
+ std::unique_ptr<TupleIdSequence> existence_map;
+ std::unique_ptr<ValueAccessor> base_accessor;
+ if (lip_filter_adaptive_prober_ != nullptr) {
+ base_accessor.reset(probe_accessor.release());
+ existence_map.reset(
+ lip_filter_adaptive_prober_->filterValueAccessor(base_accessor.get()));
+ probe_accessor.reset(
+ base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+ }
+
// We collect all the matching probe relation tuples, as there's a residual
// preidcate that needs to be applied after collecting these matches.
MapBasedJoinedTupleCollector collector;
@@ -548,7 +572,6 @@ void HashSemiJoinWorkOrder::executeWithResidualPredicate() {
// Get a filter for tuples in the given probe block.
TupleIdSequence filter(probe_store.getMaxTupleID() + 1);
- filter.setRange(0, filter.length(), false);
for (const std::pair<const block_id,
std::vector<std::pair<tuple_id, tuple_id>>>
&build_block_entry : *collector.getJoinedTuples()) {
@@ -609,7 +632,23 @@ void HashSemiJoinWorkOrder::executeWithoutResidualPredicate() {
const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
- SemiAntiJoinTupleCollector collector(probe_store);
+
+ // Probe the LIPFilters to generate an existence bitmap for probe_accessor, if enabled.
+ std::unique_ptr<TupleIdSequence> existence_map;
+ std::unique_ptr<ValueAccessor> base_accessor;
+ if (lip_filter_adaptive_prober_ != nullptr) {
+ base_accessor.reset(probe_accessor.release());
+ existence_map.reset(
+ lip_filter_adaptive_prober_->filterValueAccessor(base_accessor.get()));
+ probe_accessor.reset(
+ base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+ }
+
+ if (existence_map == nullptr) {
+ existence_map.reset(probe_store.getExistenceMap());
+ }
+
+ SemiAntiJoinTupleCollector collector(existence_map.get());
// We collect all the probe relation tuples which have at least one matching
// tuple in the build relation. As a performance optimization, the hash table
// just looks for the existence of the probing key in the hash table and sets
@@ -636,8 +675,15 @@ void HashSemiJoinWorkOrder::executeWithoutResidualPredicate() {
probe_block->getIndices(),
probe_block->getIndicesConsistent());
- std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
- probe_store.createValueAccessor(collector.filter()));
+ std::unique_ptr<ValueAccessor> probe_accessor_with_filter;
+ if (base_accessor != nullptr) {
+ probe_accessor_with_filter.reset(
+ base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+ } else {
+ probe_accessor_with_filter.reset(
+ probe_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+ }
+
ColumnVectorsValueAccessor temp_result;
for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
selection_it != selection_.end(); ++selection_it) {
@@ -656,7 +702,9 @@ void HashAntiJoinWorkOrder::executeWithoutResidualPredicate() {
const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
- SemiAntiJoinTupleCollector collector(probe_store);
+ std::unique_ptr<TupleIdSequence> existence_map(probe_store.getExistenceMap());
+
+ SemiAntiJoinTupleCollector collector(existence_map.get());
// We probe the hash table to find the keys which have an entry in the
// hash table.
if (join_key_attributes_.size() == 1) {
@@ -680,7 +728,7 @@ void HashAntiJoinWorkOrder::executeWithoutResidualPredicate() {
probe_block->getIndicesConsistent());
std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
- probe_store.createValueAccessor(collector.filter()));
+ probe_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
ColumnVectorsValueAccessor temp_result;
for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
selection_it != selection_.end(); ++selection_it) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/47fe8d6d/relational_operators/HashJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp
index fa393b6..566a367 100644
--- a/relational_operators/HashJoinOperator.hpp
+++ b/relational_operators/HashJoinOperator.hpp
@@ -35,6 +35,7 @@
#include "storage/HashTable.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
#include "glog/logging.h"
@@ -295,6 +296,7 @@ class HashInnerJoinWorkOrder : public WorkOrder {
* @param hash_table The JoinHashTable to use.
* @param output_destination The InsertDestination to insert the join results.
* @param storage_manager The StorageManager to use.
+ * @param lip_filter_adaptive_prober The attached LIP filter prober.
**/
HashInnerJoinWorkOrder(
const std::size_t query_id,
@@ -307,7 +309,8 @@ class HashInnerJoinWorkOrder : public WorkOrder {
const std::vector<std::unique_ptr<const Scalar>> &selection,
const JoinHashTable &hash_table,
InsertDestination *output_destination,
- StorageManager *storage_manager)
+ StorageManager *storage_manager,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr)
: WorkOrder(query_id),
build_relation_(build_relation),
probe_relation_(probe_relation),
@@ -318,7 +321,8 @@ class HashInnerJoinWorkOrder : public WorkOrder {
selection_(selection),
hash_table_(hash_table),
output_destination_(DCHECK_NOTNULL(output_destination)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+ storage_manager_(DCHECK_NOTNULL(storage_manager)),
+ lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
/**
* @brief Constructor for the distributed version.
@@ -342,6 +346,7 @@ class HashInnerJoinWorkOrder : public WorkOrder {
* @param hash_table The JoinHashTable to use.
* @param output_destination The InsertDestination to insert the join results.
* @param storage_manager The StorageManager to use.
+ * @param lip_filter_adaptive_prober The attached LIP filter prober.
**/
HashInnerJoinWorkOrder(
const std::size_t query_id,
@@ -354,7 +359,8 @@ class HashInnerJoinWorkOrder : public WorkOrder {
const std::vector<std::unique_ptr<const Scalar>> &selection,
const JoinHashTable &hash_table,
InsertDestination *output_destination,
- StorageManager *storage_manager)
+ StorageManager *storage_manager,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr)
: WorkOrder(query_id),
build_relation_(build_relation),
probe_relation_(probe_relation),
@@ -365,7 +371,8 @@ class HashInnerJoinWorkOrder : public WorkOrder {
selection_(selection),
hash_table_(hash_table),
output_destination_(DCHECK_NOTNULL(output_destination)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+ storage_manager_(DCHECK_NOTNULL(storage_manager)),
+ lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
~HashInnerJoinWorkOrder() override {}
@@ -392,6 +399,8 @@ class HashInnerJoinWorkOrder : public WorkOrder {
InsertDestination *output_destination_;
StorageManager *storage_manager_;
+ std::unique_ptr<LIPFilterAdaptiveProber> lip_filter_adaptive_prober_;
+
DISALLOW_COPY_AND_ASSIGN(HashInnerJoinWorkOrder);
};
@@ -423,6 +432,7 @@ class HashSemiJoinWorkOrder : public WorkOrder {
* @param hash_table The JoinHashTable to use.
* @param output_destination The InsertDestination to insert the join results.
* @param storage_manager The StorageManager to use.
+ * @param lip_filter_adaptive_prober The attached LIP filter prober.
**/
HashSemiJoinWorkOrder(
const std::size_t query_id,
@@ -435,7 +445,8 @@ class HashSemiJoinWorkOrder : public WorkOrder {
const std::vector<std::unique_ptr<const Scalar>> &selection,
const JoinHashTable &hash_table,
InsertDestination *output_destination,
- StorageManager *storage_manager)
+ StorageManager *storage_manager,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr)
: WorkOrder(query_id),
build_relation_(build_relation),
probe_relation_(probe_relation),
@@ -446,7 +457,8 @@ class HashSemiJoinWorkOrder : public WorkOrder {
selection_(selection),
hash_table_(hash_table),
output_destination_(DCHECK_NOTNULL(output_destination)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+ storage_manager_(DCHECK_NOTNULL(storage_manager)),
+ lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
/**
* @brief Constructor for the distributed version.
@@ -470,6 +482,7 @@ class HashSemiJoinWorkOrder : public WorkOrder {
* @param hash_table The JoinHashTable to use.
* @param output_destination The InsertDestination to insert the join results.
* @param storage_manager The StorageManager to use.
+ * @param lip_filter_adaptive_prober The attached LIP filter prober.
**/
HashSemiJoinWorkOrder(
const std::size_t query_id,
@@ -482,7 +495,8 @@ class HashSemiJoinWorkOrder : public WorkOrder {
const std::vector<std::unique_ptr<const Scalar>> &selection,
const JoinHashTable &hash_table,
InsertDestination *output_destination,
- StorageManager *storage_manager)
+ StorageManager *storage_manager,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr)
: WorkOrder(query_id),
build_relation_(build_relation),
probe_relation_(probe_relation),
@@ -493,7 +507,8 @@ class HashSemiJoinWorkOrder : public WorkOrder {
selection_(selection),
hash_table_(hash_table),
output_destination_(DCHECK_NOTNULL(output_destination)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+ storage_manager_(DCHECK_NOTNULL(storage_manager)),
+ lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
~HashSemiJoinWorkOrder() override {}
@@ -516,6 +531,8 @@ class HashSemiJoinWorkOrder : public WorkOrder {
InsertDestination *output_destination_;
StorageManager *storage_manager_;
+ std::unique_ptr<LIPFilterAdaptiveProber> lip_filter_adaptive_prober_;
+
DISALLOW_COPY_AND_ASSIGN(HashSemiJoinWorkOrder);
};
@@ -547,6 +564,7 @@ class HashAntiJoinWorkOrder : public WorkOrder {
* @param hash_table The JoinHashTable to use.
* @param output_destination The InsertDestination to insert the join results.
* @param storage_manager The StorageManager to use.
+ * @param lip_filter_adaptive_prober The attached LIP filter prober.
**/
HashAntiJoinWorkOrder(
const std::size_t query_id,
@@ -559,7 +577,8 @@ class HashAntiJoinWorkOrder : public WorkOrder {
const std::vector<std::unique_ptr<const Scalar>> &selection,
const JoinHashTable &hash_table,
InsertDestination *output_destination,
- StorageManager *storage_manager)
+ StorageManager *storage_manager,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr)
: WorkOrder(query_id),
build_relation_(build_relation),
probe_relation_(probe_relation),
@@ -570,7 +589,8 @@ class HashAntiJoinWorkOrder : public WorkOrder {
selection_(selection),
hash_table_(hash_table),
output_destination_(DCHECK_NOTNULL(output_destination)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+ storage_manager_(DCHECK_NOTNULL(storage_manager)),
+ lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
/**
* @brief Constructor for the distributed version.
@@ -594,6 +614,7 @@ class HashAntiJoinWorkOrder : public WorkOrder {
* @param hash_table The JoinHashTable to use.
* @param output_destination The InsertDestination to insert the join results.
* @param storage_manager The StorageManager to use.
+ * @param lip_filter_adaptive_prober The attached LIP filter prober.
**/
HashAntiJoinWorkOrder(
const std::size_t query_id,
@@ -606,7 +627,8 @@ class HashAntiJoinWorkOrder : public WorkOrder {
const std::vector<std::unique_ptr<const Scalar>> &selection,
const JoinHashTable &hash_table,
InsertDestination *output_destination,
- StorageManager *storage_manager)
+ StorageManager *storage_manager,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr)
: WorkOrder(query_id),
build_relation_(build_relation),
probe_relation_(probe_relation),
@@ -617,7 +639,8 @@ class HashAntiJoinWorkOrder : public WorkOrder {
selection_(selection),
hash_table_(hash_table),
output_destination_(DCHECK_NOTNULL(output_destination)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+ storage_manager_(DCHECK_NOTNULL(storage_manager)),
+ lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
~HashAntiJoinWorkOrder() override {}
@@ -646,6 +669,8 @@ class HashAntiJoinWorkOrder : public WorkOrder {
InsertDestination *output_destination_;
StorageManager *storage_manager_;
+ std::unique_ptr<LIPFilterAdaptiveProber> lip_filter_adaptive_prober_;
+
DISALLOW_COPY_AND_ASSIGN(HashAntiJoinWorkOrder);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/47fe8d6d/relational_operators/SelectOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/SelectOperator.cpp b/relational_operators/SelectOperator.cpp
index d56326e..36a3fbe 100644
--- a/relational_operators/SelectOperator.cpp
+++ b/relational_operators/SelectOperator.cpp
@@ -30,6 +30,10 @@
#include "storage/StorageBlock.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "storage/StorageManager.hpp"
+#include "storage/TupleIdSequence.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
+#include "utility/lip_filter/LIPFilterUtil.hpp"
#include "glog/logging.h"
@@ -40,22 +44,26 @@ namespace quickstep {
class Predicate;
void SelectOperator::addWorkOrders(WorkOrdersContainer *container,
+ QueryContext *query_context,
StorageManager *storage_manager,
const Predicate *predicate,
const std::vector<std::unique_ptr<const Scalar>> *selection,
InsertDestination *output_destination) {
if (input_relation_is_stored_) {
for (const block_id input_block_id : input_relation_block_ids_) {
- container->addNormalWorkOrder(new SelectWorkOrder(query_id_,
- input_relation_,
- input_block_id,
- predicate,
- simple_projection_,
- simple_selection_,
- selection,
- output_destination,
- storage_manager),
- op_index_);
+ container->addNormalWorkOrder(
+ new SelectWorkOrder(
+ query_id_,
+ input_relation_,
+ input_block_id,
+ predicate,
+ simple_projection_,
+ simple_selection_,
+ selection,
+ output_destination,
+ storage_manager,
+ CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
+ op_index_);
}
} else {
while (num_workorders_generated_ < input_relation_block_ids_.size()) {
@@ -69,7 +77,8 @@ void SelectOperator::addWorkOrders(WorkOrdersContainer *container,
simple_selection_,
selection,
output_destination,
- storage_manager),
+ storage_manager,
+ CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
op_index_);
++num_workorders_generated_;
}
@@ -78,6 +87,7 @@ void SelectOperator::addWorkOrders(WorkOrdersContainer *container,
#ifdef QUICKSTEP_HAVE_LIBNUMA
void SelectOperator::addPartitionAwareWorkOrders(WorkOrdersContainer *container,
+ QueryContext *query_context,
StorageManager *storage_manager,
const Predicate *predicate,
const std::vector<std::unique_ptr<const Scalar>> *selection,
@@ -99,6 +109,7 @@ void SelectOperator::addPartitionAwareWorkOrders(WorkOrdersContainer *container,
selection,
output_destination,
storage_manager,
+ CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context),
placement_scheme_->getNUMANodeForBlock(input_block_id)),
op_index_);
}
@@ -120,6 +131,7 @@ void SelectOperator::addPartitionAwareWorkOrders(WorkOrdersContainer *container,
selection,
output_destination,
storage_manager,
+ CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context),
placement_scheme_->getNUMANodeForBlock(block_in_partition)),
op_index_);
++num_workorders_generated_in_partition_[part_id];
@@ -151,11 +163,21 @@ bool SelectOperator::getAllWorkOrders(
if (input_relation_.hasPartitionScheme()) {
#ifdef QUICKSTEP_HAVE_LIBNUMA
if (input_relation_.hasNUMAPlacementScheme()) {
- addPartitionAwareWorkOrders(container, storage_manager, predicate, selection, output_destination);
+ addPartitionAwareWorkOrders(container,
+ query_context,
+ storage_manager,
+ predicate,
+ selection,
+ output_destination);
}
#endif
} else {
- addWorkOrders(container, storage_manager, predicate, selection, output_destination);
+ addWorkOrders(container,
+ query_context,
+ storage_manager,
+ predicate,
+ selection,
+ output_destination);
}
started_ = true;
}
@@ -164,11 +186,21 @@ bool SelectOperator::getAllWorkOrders(
if (input_relation_.hasPartitionScheme()) {
#ifdef QUICKSTEP_HAVE_LIBNUMA
if (input_relation_.hasNUMAPlacementScheme()) {
- addPartitionAwareWorkOrders(container, storage_manager, predicate, selection, output_destination);
+ addPartitionAwareWorkOrders(container,
+ query_context,
+ storage_manager,
+ predicate,
+ selection,
+ output_destination);
}
#endif
} else {
- addWorkOrders(container, storage_manager, predicate, selection, output_destination);
+ addWorkOrders(container,
+ query_context,
+ storage_manager,
+ predicate,
+ selection,
+ output_destination);
}
return done_feeding_input_relation_;
}
@@ -210,6 +242,7 @@ serialization::WorkOrder* SelectOperator::createWorkOrderProto(const block_id bl
}
}
proto->SetExtension(serialization::SelectWorkOrder::selection_index, selection_index_);
+ proto->SetExtension(serialization::SelectWorkOrder::lip_deployment_index, lip_deployment_index_);
return proto;
}
@@ -219,14 +252,24 @@ void SelectWorkOrder::execute() {
BlockReference block(
storage_manager_->getBlock(input_block_id_, input_relation_, getPreferredNUMANodes()[0]));
+ std::unique_ptr<TupleIdSequence> lip_filter_matches;  // Null means no LIP filtering.
+ if (lip_filter_adaptive_prober_ != nullptr) {  // Probe the attached LIP filters first.
+ std::unique_ptr<ValueAccessor> accessor(
+ block->getTupleStorageSubBlock().createValueAccessor());
+ lip_filter_matches.reset(
+ lip_filter_adaptive_prober_->filterValueAccessor(accessor.get()));
+ }
+
if (simple_projection_) {
block->selectSimple(simple_selection_,
predicate_,
- output_destination_);
+ output_destination_,
+ lip_filter_matches.get());
} else {
block->select(*DCHECK_NOTNULL(selection_),
predicate_,
- output_destination_);
+ output_destination_,
+ lip_filter_matches.get());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/47fe8d6d/relational_operators/SelectOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SelectOperator.hpp b/relational_operators/SelectOperator.hpp
index 0f5c712..2ace458 100644
--- a/relational_operators/SelectOperator.hpp
+++ b/relational_operators/SelectOperator.hpp
@@ -38,6 +38,7 @@
#include "relational_operators/WorkOrder.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
#include "glog/logging.h"
@@ -49,6 +50,7 @@ namespace quickstep {
class CatalogRelationSchema;
class InsertDestination;
+class LIPFilterDeployment;
class Predicate;
class Scalar;
class StorageManager;
@@ -247,12 +249,14 @@ class SelectOperator : public RelationalOperator {
}
void addWorkOrders(WorkOrdersContainer *container,
+ QueryContext *query_context,
StorageManager *storage_manager,
const Predicate *predicate,
const std::vector<std::unique_ptr<const Scalar>> *selection,
InsertDestination *output_destination);
void addPartitionAwareWorkOrders(WorkOrdersContainer *container,
+ QueryContext *query_context,
StorageManager *storage_manager,
const Predicate *predicate,
const std::vector<std::unique_ptr<const Scalar>> *selection,
@@ -318,6 +322,7 @@ class SelectWorkOrder : public WorkOrder {
* @param output_destination The InsertDestination to insert the selection
* results.
* @param storage_manager The StorageManager to use.
+ * @param lip_filter_adaptive_prober The attached LIP filter prober, or nullptr; ownership is taken.
**/
SelectWorkOrder(const std::size_t query_id,
const CatalogRelationSchema &input_relation,
@@ -328,6 +333,7 @@ class SelectWorkOrder : public WorkOrder {
const std::vector<std::unique_ptr<const Scalar>> *selection,
InsertDestination *output_destination,
StorageManager *storage_manager,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr,
const numa_node_id numa_node = 0)
: WorkOrder(query_id),
input_relation_(input_relation),
@@ -337,7 +343,8 @@ class SelectWorkOrder : public WorkOrder {
simple_selection_(simple_selection),
selection_(selection),
output_destination_(DCHECK_NOTNULL(output_destination)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)) {
+ storage_manager_(DCHECK_NOTNULL(storage_manager)),
+ lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {
preferred_numa_nodes_.push_back(numa_node);
}
@@ -360,6 +367,7 @@ class SelectWorkOrder : public WorkOrder {
* @param output_destination The InsertDestination to insert the selection
* results.
* @param storage_manager The StorageManager to use.
+ * @param lip_filter_adaptive_prober The attached LIP filter prober, or nullptr; ownership is taken.
**/
SelectWorkOrder(const std::size_t query_id,
const CatalogRelationSchema &input_relation,
@@ -370,6 +378,7 @@ class SelectWorkOrder : public WorkOrder {
const std::vector<std::unique_ptr<const Scalar>> *selection,
InsertDestination *output_destination,
StorageManager *storage_manager,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr,
const numa_node_id numa_node = 0)
: WorkOrder(query_id),
input_relation_(input_relation),
@@ -379,7 +388,8 @@ class SelectWorkOrder : public WorkOrder {
simple_selection_(std::move(simple_selection)),
selection_(selection),
output_destination_(DCHECK_NOTNULL(output_destination)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)) {
+ storage_manager_(DCHECK_NOTNULL(storage_manager)),
+ lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {
preferred_numa_nodes_.push_back(numa_node);
}
@@ -407,6 +417,8 @@ class SelectWorkOrder : public WorkOrder {
InsertDestination *output_destination_;
StorageManager *storage_manager_;
+ std::unique_ptr<LIPFilterAdaptiveProber> lip_filter_adaptive_prober_;  // Null when no LIP filters.
+
DISALLOW_COPY_AND_ASSIGN(SelectWorkOrder);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/47fe8d6d/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index 3eed379..a4200b0 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -59,6 +59,7 @@ message AggregationWorkOrder {
// All required.
optional uint32 aggr_state_index = 16;
optional fixed64 block_id = 17;
+ optional int32 lip_deployment_index = 18;
}
}
@@ -70,6 +71,7 @@ message BuildHashWorkOrder {
optional bool any_join_key_attributes_nullable = 34;
optional uint32 join_hash_table_index = 35;
optional fixed64 block_id = 36;
+ optional int32 lip_deployment_index = 37;
}
}
@@ -129,8 +131,9 @@ message HashJoinWorkOrder {
// Used by all but HashOuterJoinWorkOrder.
optional int32 residual_predicate_index = 169;
+ optional int32 lip_deployment_index = 171;  // New tag; 170 stays with is_selection_on_build.
// Used by HashOuterJoinWorkOrder only.
repeated bool is_selection_on_build = 170;
}
}
@@ -185,9 +188,10 @@ message SelectWorkOrder {
// When 'simple_projection' is true.
repeated int32 simple_selection = 245;
-
// Otherwise.
optional int32 selection_index = 246;
+
+ optional int32 lip_deployment_index = 247;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/47fe8d6d/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 2356bab..462fa04 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -50,6 +50,7 @@
#include "relational_operators/WindowAggregationOperator.hpp"
#include "relational_operators/WorkOrder.pb.h"
#include "storage/StorageBlockInfo.hpp"
+#include "utility/lip_filter/LIPFilterUtil.hpp"
#include "glog/logging.h"
@@ -61,6 +62,7 @@ using std::vector;
namespace quickstep {
class InsertDestination;
+class LIPFilterAdaptiveProber;
class Predicate;
class Scalar;
@@ -82,7 +84,9 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
proto.query_id(),
proto.GetExtension(serialization::AggregationWorkOrder::block_id),
query_context->getAggregationState(
- proto.GetExtension(serialization::AggregationWorkOrder::aggr_state_index)));
+ proto.GetExtension(serialization::AggregationWorkOrder::aggr_state_index)),
+ CreateLIPFilterAdaptiveProberHelper(
+ proto.GetExtension(serialization::AggregationWorkOrder::lip_deployment_index), query_context));
}
case serialization::BUILD_HASH: {
LOG(INFO) << "Creating BuildHashWorkOrder";
@@ -101,7 +105,9 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
proto.GetExtension(serialization::BuildHashWorkOrder::block_id),
query_context->getJoinHashTable(
proto.GetExtension(serialization::BuildHashWorkOrder::join_hash_table_index)),
- storage_manager);
+ storage_manager,
+ CreateLIPFilterBuilderHelper(
+ proto.GetExtension(serialization::BuildHashWorkOrder::lip_deployment_index), query_context));
}
case serialization::DELETE: {
LOG(INFO) << "Creating DeleteWorkOrder";
@@ -200,6 +206,9 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
InsertDestination *output_destination =
query_context->getInsertDestination(
proto.GetExtension(serialization::HashJoinWorkOrder::insert_destination_index));
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober =  // Handed to the one work order built below.
+ CreateLIPFilterAdaptiveProberHelper(
+ proto.GetExtension(serialization::HashJoinWorkOrder::lip_deployment_index), query_context);
switch (hash_join_work_order_type) {
case serialization::HashJoinWorkOrder::HASH_ANTI_JOIN: {
@@ -215,7 +224,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
selection,
hash_table,
output_destination,
- storage_manager);
+ storage_manager,
+ lip_filter_adaptive_prober);
}
case serialization::HashJoinWorkOrder::HASH_INNER_JOIN: {
LOG(INFO) << "Creating HashInnerJoinWorkOrder";
@@ -230,7 +240,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
selection,
hash_table,
output_destination,
- storage_manager);
+ storage_manager,
+ lip_filter_adaptive_prober);
}
case serialization::HashJoinWorkOrder::HASH_OUTER_JOIN: {
vector<bool> is_selection_on_build;
@@ -268,7 +279,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
selection,
hash_table,
output_destination,
- storage_manager);
+ storage_manager,
+ lip_filter_adaptive_prober);
}
default:
LOG(FATAL) << "Unknown HashJoinWorkOrder Type in WorkOrderFactory::ReconstructFromProto";
@@ -346,7 +358,9 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
proto.GetExtension(serialization::SelectWorkOrder::selection_index)),
query_context->getInsertDestination(
proto.GetExtension(serialization::SelectWorkOrder::insert_destination_index)),
- storage_manager);
+ storage_manager,
+ CreateLIPFilterAdaptiveProberHelper(
+ proto.GetExtension(serialization::SelectWorkOrder::lip_deployment_index), query_context));
}
case serialization::SORT_MERGE_RUN: {
LOG(INFO) << "Creating SortMergeRunWorkOrder";
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/47fe8d6d/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 249026d..f89fd7a 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -46,10 +46,13 @@
#include "storage/StorageBlock.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "storage/StorageManager.hpp"
+#include "storage/TupleIdSequence.hpp"
+#include "storage/ValueAccessor.hpp"
#include "types/TypedValue.hpp"
#include "types/containers/ColumnVector.hpp"
#include "types/containers/ColumnVectorsValueAccessor.hpp"
#include "types/containers/Tuple.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
#include "glog/logging.h"
@@ -331,11 +334,12 @@ bool AggregationOperationState::ProtoIsValid(
return true;
}
-void AggregationOperationState::aggregateBlock(const block_id input_block) {
+void AggregationOperationState::aggregateBlock(const block_id input_block,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
if (group_by_list_.empty()) {
aggregateBlockSingleState(input_block);
} else {
- aggregateBlockHashTable(input_block);
+ aggregateBlockHashTable(input_block, lip_filter_adaptive_prober);
}
}
@@ -367,10 +371,13 @@ void AggregationOperationState::aggregateBlockSingleState(
BlockReference block(
storage_manager_->getBlock(input_block, input_relation_));
- // If there is a filter predicate, 'reuse_matches' holds the set of matching
- // tuples so that it can be reused across multiple aggregates (i.e. we only
- // pay the cost of evaluating the predicate once).
- std::unique_ptr<TupleIdSequence> reuse_matches;
+ // If there is a filter predicate, evaluate it once here so that the
+ // resulting matches can be shared by all the aggregates below.
+ std::unique_ptr<TupleIdSequence> matches;
+ if (predicate_ != nullptr) {
+ matches.reset(block->getMatchesForPredicate(predicate_.get()));
+ }
+
for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
const std::vector<attribute_id> *local_arguments_as_attributes = nullptr;
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
@@ -387,9 +394,8 @@ void AggregationOperationState::aggregateBlockSingleState(
arguments_[agg_idx],
local_arguments_as_attributes,
{}, /* group_by */
- predicate_.get(),
+ matches.get(),
distinctify_hashtables_[agg_idx].get(),
- &reuse_matches,
nullptr /* reuse_group_by_vectors */);
local_state.emplace_back(nullptr);
} else {
@@ -397,8 +403,7 @@ void AggregationOperationState::aggregateBlockSingleState(
local_state.emplace_back(block->aggregate(*handles_[agg_idx],
arguments_[agg_idx],
local_arguments_as_attributes,
- predicate_.get(),
- &reuse_matches));
+ matches.get()));
}
}
@@ -407,14 +412,24 @@ void AggregationOperationState::aggregateBlockSingleState(
}
void AggregationOperationState::aggregateBlockHashTable(
- const block_id input_block) {
+ const block_id input_block,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
BlockReference block(
storage_manager_->getBlock(input_block, input_relation_));
- // If there is a filter predicate, 'reuse_matches' holds the set of matching
- // tuples so that it can be reused across multiple aggregates (i.e. we only
- // pay the cost of evaluating the predicate once).
- std::unique_ptr<TupleIdSequence> reuse_matches;
+ // Apply LIPFilters first, and then the predicate, to generate a TupleIdSequence
+ // as the existence map for the tuples. A ValueAccessor is only needed to probe
+ // the LIPFilters (getMatchesForPredicate() creates its own), so one is only
+ // built when a prober is attached.
+ std::unique_ptr<TupleIdSequence> matches;
+ if (lip_filter_adaptive_prober != nullptr) {
+ std::unique_ptr<ValueAccessor> accessor(
+ block->getTupleStorageSubBlock().createValueAccessor());
+ matches.reset(
+ lip_filter_adaptive_prober->filterValueAccessor(accessor.get()));
+ }
+ if (predicate_ != nullptr) {
+ matches.reset(block->getMatchesForPredicate(predicate_.get(), matches.get()));
+ }
// This holds values of all the GROUP BY attributes so that the can be reused
// across multiple aggregates (i.e. we only pay the cost of evaluatin the
@@ -431,9 +446,8 @@ void AggregationOperationState::aggregateBlockHashTable(
arguments_[agg_idx],
nullptr, /* arguments_as_attributes */
group_by_list_,
- predicate_.get(),
+ matches.get(),
distinctify_hashtables_[agg_idx].get(),
- &reuse_matches,
&reuse_group_by_vectors);
}
}
@@ -447,9 +461,8 @@ void AggregationOperationState::aggregateBlockHashTable(
DCHECK(agg_hash_table != nullptr);
block->aggregateGroupBy(arguments_,
group_by_list_,
- predicate_.get(),
+ matches.get(),
agg_hash_table,
- &reuse_matches,
&reuse_group_by_vectors);
group_by_hashtable_pool_->returnHashTable(agg_hash_table);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/47fe8d6d/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 3b0f286..f3332a7 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -41,6 +41,7 @@ class AggregateFunction;
class CatalogDatabaseLite;
class CatalogRelationSchema;
class InsertDestination;
+class LIPFilterAdaptiveProber;
class StorageManager;
/** \addtogroup Storage
@@ -155,8 +156,11 @@ class AggregationOperationState {
*
* @param input_block The block ID of the storage block where the aggreates
* are going to be computed.
+ * @param lip_filter_adaptive_prober The LIPFilter prober for pre-filtering
+ * the block (only used with GROUP BY for now), or nullptr if none.
**/
- void aggregateBlock(const block_id input_block);
+ void aggregateBlock(const block_id input_block,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr);
/**
* @brief Generate the final results for the aggregates managed by this
@@ -185,7 +189,8 @@ class AggregationOperationState {
// Aggregate on input block.
void aggregateBlockSingleState(const block_id input_block);
- void aggregateBlockHashTable(const block_id input_block);
+ void aggregateBlockHashTable(const block_id input_block,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober);
void finalizeSingleState(InsertDestination *output_destination);
void finalizeHashTable(InsertDestination *output_destination);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/47fe8d6d/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 325a7cb..0e32cc1 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -292,11 +292,14 @@ target_link_libraries(quickstep_storage_AggregationOperationState
quickstep_storage_StorageBlock
quickstep_storage_StorageBlockInfo
quickstep_storage_StorageManager
+ quickstep_storage_TupleIdSequence
+ quickstep_storage_ValueAccessor
quickstep_types_TypedValue
quickstep_types_containers_ColumnVector
quickstep_types_containers_ColumnVectorsValueAccessor
quickstep_types_containers_Tuple
- quickstep_utility_Macros)
+ quickstep_utility_Macros
+ quickstep_utility_lipfilter_LIPFilterAdaptiveProber)  # New includes in AggregationOperationState.cpp.
target_link_libraries(quickstep_storage_AggregationOperationState_proto
quickstep_expressions_Expressions_proto
quickstep_expressions_aggregation_AggregateFunction_proto
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/47fe8d6d/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index ec5990f..63acf7d 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -341,20 +341,24 @@ void StorageBlock::sample(const bool is_block_sample,
void StorageBlock::select(const vector<unique_ptr<const Scalar>> &selection,
const Predicate *predicate,
- InsertDestinationInterface *destination) const {
+ InsertDestinationInterface *destination,
+ const TupleIdSequence *filter) const {
ColumnVectorsValueAccessor temp_result;
{
SubBlocksReference sub_blocks_ref(*tuple_store_,
indices_,
indices_consistent_);
+ // Restrict both the predicate evaluation and the resulting accessor to the
+ // tuples allowed by 'filter' (if any), so that only tuples that pass the
+ // filter AND the predicate are selected.
std::unique_ptr<TupleIdSequence> matches;
std::unique_ptr<ValueAccessor> accessor;
if (predicate == nullptr) {
- accessor.reset(tuple_store_->createValueAccessor());
+ accessor.reset(tuple_store_->createValueAccessor(filter));
} else {
- matches.reset(getMatchesForPredicate(predicate));
+ matches.reset(getMatchesForPredicate(predicate, filter));
accessor.reset(tuple_store_->createValueAccessor(matches.get()));
}
for (vector<unique_ptr<const Scalar>>::const_iterator selection_cit = selection.begin();
@@ -371,14 +375,18 @@ void StorageBlock::select(const vector<unique_ptr<const Scalar>> &selection,
void StorageBlock::selectSimple(const std::vector<attribute_id> &selection,
const Predicate *predicate,
- InsertDestinationInterface *destination) const {
+ InsertDestinationInterface *destination,
+ const TupleIdSequence *filter) const {
+ // Restrict both the predicate evaluation and the resulting accessor to the
+ // tuples allowed by 'filter' (if any), so that only tuples that pass the
+ // filter AND the predicate are inserted into 'destination'.
std::unique_ptr<ValueAccessor> accessor;
std::unique_ptr<TupleIdSequence> matches;
if (predicate == nullptr) {
- accessor.reset(tuple_store_->createValueAccessor());
+ accessor.reset(tuple_store_->createValueAccessor(filter));
} else {
- matches.reset(getMatchesForPredicate(predicate));
+ matches.reset(getMatchesForPredicate(predicate, filter));
accessor.reset(tuple_store_->createValueAccessor(matches.get()));
}
destination->bulkInsertTuplesWithRemappedAttributes(selection,
@@ -389,37 +397,28 @@ AggregationState* StorageBlock::aggregate(
const AggregationHandle &handle,
const std::vector<std::unique_ptr<const Scalar>> &arguments,
const std::vector<attribute_id> *arguments_as_attributes,
- const Predicate *predicate,
- std::unique_ptr<TupleIdSequence> *reuse_matches) const {
- // If there is a filter predicate that hasn't already been evaluated,
- // evaluate it now and save the results for other aggregates on this same
- // block.
- if (predicate && !*reuse_matches) {
- reuse_matches->reset(getMatchesForPredicate(predicate));
- }
-
+ const TupleIdSequence *filter) const {  // May be null: aggregate over all tuples.
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
// If all the arguments to this aggregate are plain relation attributes,
// aggregate directly on a ValueAccessor from this block to avoid a copy.
if ((arguments_as_attributes != nullptr) && (!arguments_as_attributes->empty())) {
DCHECK_EQ(arguments.size(), arguments_as_attributes->size())
<< "Mismatch between number of arguments and number of attribute_ids";
- return aggregateHelperValueAccessor(handle, *arguments_as_attributes, reuse_matches->get());
+ return aggregateHelperValueAccessor(handle, *arguments_as_attributes, filter);
}
// TODO(shoban): We may want to optimize for ScalarLiteral here.
#endif
// Call aggregateHelperColumnVector() to materialize each argument as a
// ColumnVector, then aggregate over those.
- return aggregateHelperColumnVector(handle, arguments, reuse_matches->get());
+ return aggregateHelperColumnVector(handle, arguments, filter);
}
void StorageBlock::aggregateGroupBy(
const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
const std::vector<std::unique_ptr<const Scalar>> &group_by,
- const Predicate *predicate,
+ const TupleIdSequence *filter,  // May be null: aggregate over all tuples.
AggregationStateHashTableBase *hash_table,
- std::unique_ptr<TupleIdSequence> *reuse_matches,
std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
DCHECK_GT(group_by.size(), 0u)
<< "Called aggregateGroupBy() with zero GROUP BY expressions";
@@ -438,23 +437,7 @@ void StorageBlock::aggregateGroupBy(
// this aggregate, as well as the GROUP BY expression values.
ColumnVectorsValueAccessor temp_result;
{
- std::unique_ptr<ValueAccessor> accessor;
- if (predicate) {
- if (!*reuse_matches) {
- // If there is a filter predicate that hasn't already been evaluated,
- // evaluate it now and save the results for other aggregates on this
- // same block.
- reuse_matches->reset(getMatchesForPredicate(predicate));
- }
-
- // Create a filtered ValueAccessor that only iterates over predicate
- // matches.
- accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get()));
- } else {
- // Create a ValueAccessor that iterates over all tuples in this block
- accessor.reset(tuple_store_->createValueAccessor());
- }
-
+ std::unique_ptr<ValueAccessor> accessor(tuple_store_->createValueAccessor(filter));
attribute_id attr_id = 0;
// First, put GROUP BY keys into 'temp_result'.
@@ -503,9 +486,8 @@ void StorageBlock::aggregateDistinct(
const std::vector<std::unique_ptr<const Scalar>> &arguments,
const std::vector<attribute_id> *arguments_as_attributes,
const std::vector<std::unique_ptr<const Scalar>> &group_by,
- const Predicate *predicate,
+ const TupleIdSequence *filter,  // May be null: aggregate over all tuples.
AggregationStateHashTableBase *distinctify_hash_table,
- std::unique_ptr<TupleIdSequence> *reuse_matches,
std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
DCHECK_GT(arguments.size(), 0u)
<< "Called aggregateDistinct() with zero argument expressions";
@@ -517,22 +499,7 @@ void StorageBlock::aggregateDistinct(
// this aggregate, as well as the GROUP BY expression values.
ColumnVectorsValueAccessor temp_result;
{
- std::unique_ptr<ValueAccessor> accessor;
- if (predicate) {
- if (!*reuse_matches) {
- // If there is a filter predicate that hasn't already been evaluated,
- // evaluate it now and save the results for other aggregates on this
- // same block.
- reuse_matches->reset(getMatchesForPredicate(predicate));
- }
-
- // Create a filtered ValueAccessor that only iterates over predicate
- // matches.
- accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get()));
- } else {
- // Create a ValueAccessor that iterates over all tuples in this block
- accessor.reset(tuple_store_->createValueAccessor());
- }
+ std::unique_ptr<ValueAccessor> accessor(tuple_store_->createValueAccessor(filter));
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
// If all the arguments to this aggregate are plain relation attributes,
@@ -1246,23 +1213,36 @@ bool StorageBlock::rebuildIndexes(bool short_circuit) {
return all_indices_consistent_;
}
-TupleIdSequence* StorageBlock::getMatchesForPredicate(const Predicate *predicate) const {
+TupleIdSequence* StorageBlock::getMatchesForPredicate(const Predicate *predicate,
+ const TupleIdSequence *filter) const {
if (predicate == nullptr) {
- return tuple_store_->getExistenceMap();
+ TupleIdSequence *matches = tuple_store_->getExistenceMap();
+ if (filter != nullptr) {
+ matches->intersectWith(*filter);
+ }
+ return matches;
}
std::unique_ptr<ValueAccessor> value_accessor(tuple_store_->createValueAccessor());
- std::unique_ptr<TupleIdSequence> existence_map;
- if (!tuple_store_->isPacked()) {
- existence_map.reset(tuple_store_->getExistenceMap());
- }
SubBlocksReference sub_blocks_ref(*tuple_store_,
indices_,
indices_consistent_);
- return predicate->getAllMatches(value_accessor.get(),
- &sub_blocks_ref,
- nullptr,
- existence_map.get());
+
+ if (!tuple_store_->isPacked()) {
+ std::unique_ptr<TupleIdSequence> existence_map(tuple_store_->getExistenceMap());
+ if (filter != nullptr) {
+ existence_map->intersectWith(*filter);
+ }
+ return predicate->getAllMatches(value_accessor.get(),
+ &sub_blocks_ref,
+ nullptr,
+ existence_map.get());
+ } else {  // Packed store: 'filter' (possibly null) is passed as the existence map.
+ return predicate->getAllMatches(value_accessor.get(),
+ &sub_blocks_ref,
+ nullptr,
+ filter);
+ }
}
std::unordered_map<attribute_id, TypedValue>* StorageBlock::generateUpdatedValues(
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/47fe8d6d/storage/StorageBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp
index bab5bab..61a35fe 100644
--- a/storage/StorageBlock.hpp
+++ b/storage/StorageBlock.hpp
@@ -313,6 +313,17 @@ class StorageBlock : public StorageBlockBase {
ValueAccessor *accessor);
/**
+ * @brief Get the IDs of tuples in this StorageBlock which match a given Predicate.
+ *
+ * @param predicate The predicate to match.
+ * @param filter If non-NULL, then only tuple IDs which are set in the
+ * filter will be checked (all others will be assumed to be false).
+ * @return A TupleIdSequence which contains matching tuple IDs for predicate.
+ **/
+ TupleIdSequence* getMatchesForPredicate(const Predicate *predicate,
+ const TupleIdSequence *filter = nullptr) const;
+
+ /**
* @brief Perform a random sampling of data on the StorageBlock. The number
* of records sampled is determined by the sample percentage in case of
* tuple sample. For block sample all the records in a block are taken.
@@ -340,6 +351,8 @@ class StorageBlock : public StorageBlockBase {
* should be matched.
* @param destination Where to insert the tuples resulting from the SELECT
* query.
+ * @param filter If non-NULL, then only tuple IDs which are set in the
+ * filter will be checked (all others will be assumed to be false).
* @exception TupleTooLargeForBlock A tuple produced by this selection was
* too large to insert into an empty block provided by
* destination. Selection may be partially complete (with some
@@ -349,17 +362,20 @@ class StorageBlock : public StorageBlockBase {
**/
void select(const std::vector<std::unique_ptr<const Scalar>> &selection,
const Predicate *predicate,
- InsertDestinationInterface *destination) const;
+ InsertDestinationInterface *destination,
+ const TupleIdSequence *filter = nullptr) const;
/**
* @brief Perform a simple SELECT query on this StorageBlock which only
* projects attributes and does not evaluate expressions.
*
- * @param destination Where to insert the tuples resulting from the SELECT
- * query.
* @param selection The attributes to project.
* @param predicate A predicate for selection. NULL indicates that all tuples
* should be matched.
+ * @param destination Where to insert the tuples resulting from the SELECT
+ * query.
+ * @param filter If non-NULL, then only tuple IDs which are set in the
+ * filter will be checked (all others will be assumed to be false).
* @exception TupleTooLargeForBlock A tuple produced by this selection was
* too large to insert into an empty block provided by
* destination. Selection may be partially complete (with some
@@ -372,7 +388,8 @@ class StorageBlock : public StorageBlockBase {
**/
void selectSimple(const std::vector<attribute_id> &selection,
const Predicate *predicate,
- InsertDestinationInterface *destination) const;
+ InsertDestinationInterface *destination,
+ const TupleIdSequence *filter = nullptr) const;
/**
* @brief Perform non GROUP BY aggregation on the tuples in the this storage
@@ -384,23 +401,8 @@ class StorageBlock : public StorageBlockBase {
* @param arguments_as_attributes If non-NULL, indicates a valid attribute_id
* for each of the elements in arguments, and is used to elide a copy.
* Has no effect if NULL, or if VECTOR_COPY_ELISION_LEVEL is NONE.
- * @param predicate A predicate for selection. nullptr indicates that all
- * tuples should be aggregated on.
- * @param reuse_matches This parameter is used to store and reuse tuple-id
- * sequence of matches pre-computed in an earlier invocations to
- * aggregate(). \c reuse_matches is never \c nullptr for ease of use.
- * Current invocation of aggregate() will reuse TupleIdSequence if
- * passed, otherwise compute a TupleIdSequence based on \c predicate
- * and store in \c reuse_matches. We use std::unique_ptr for each of
- * use, since the caller will not have to selective free.
- *
- * For example, see this relevant pseudo-C++ code:
- * \code
- * std::unique_ptr<TupleIdSequence> matches;
- * for each aggregate {
- * block.aggregate(..., &matches);
- * }
- * \endcode
+ * @param filter If non-NULL, then only tuples whose IDs are set in the
+ * filter are aggregated on (all other tuples are skipped).
*
* @return Aggregated state for this block in the form of an
* AggregationState. AggregationHandle::mergeStates() can be called
@@ -412,8 +414,7 @@ class StorageBlock : public StorageBlockBase {
const AggregationHandle &handle,
const std::vector<std::unique_ptr<const Scalar>> &arguments,
const std::vector<attribute_id> *arguments_as_attributes,
- const Predicate *predicate,
- std::unique_ptr<TupleIdSequence> *reuse_matches) const;
+ const TupleIdSequence *filter) const;
/**
* @brief Perform GROUP BY aggregation on the tuples in the this storage
@@ -423,18 +424,10 @@ class StorageBlock : public StorageBlockBase {
* @param group_by The list of GROUP BY attributes/expressions. The tuples in
* this storage block are grouped by these attributes before
* aggregation.
- * @param predicate A predicate for selection. nullptr indicates that all
- * tuples should be aggregated on.
+ * @param filter If non-NULL, then only tuples whose IDs are set in the
+ * filter are grouped and aggregated (all other tuples are skipped).
* @param hash_table Hash table to store aggregation state mapped based on
* GROUP BY value list (defined by \c group_by).
- * @param reuse_matches This parameter is used to store and reuse tuple-id
- * sequence of matches pre-computed in an earlier invocations of
- * aggregateGroupBy(). \c reuse_matches is never \c nullptr for ease of
- * use. Current invocation of aggregateGroupBy() will reuse
- * TupleIdSequence if passed, otherwise computes a TupleIdSequence based
- * on \c predicate and stores in \c reuse_matches. We use
- * std::unique_ptr for each of use, since the caller will not have to
- * selective free.
* @param reuse_group_by_vectors This parameter is used to store and reuse
* GROUP BY attribute vectors pre-computed in an earlier invocation of
* aggregateGroupBy(). \c reuse_group_by_vectors is never \c nullptr
@@ -444,10 +437,9 @@ class StorageBlock : public StorageBlockBase {
*
* For sample usage of aggregateGroupBy, see this relevant pseudo-C++ code:
* \code
- * std::unique_ptr<TupleIdSequence> matches;
* std::vector<std::unique_ptr<ColumnVector>> group_by_vectors;
* for each aggregate {
- * block.aggregateGroupBy(..., &matches, &group_by_vectors);
+ * block.aggregateGroupBy(..., &group_by_vectors);
* }
* \endcode
**/
@@ -461,9 +453,8 @@ class StorageBlock : public StorageBlockBase {
void aggregateGroupBy(
const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
const std::vector<std::unique_ptr<const Scalar>> &group_by,
- const Predicate *predicate,
+ const TupleIdSequence *filter,
AggregationStateHashTableBase *hash_table,
- std::unique_ptr<TupleIdSequence> *reuse_matches,
std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const;
/**
@@ -481,19 +472,11 @@ class StorageBlock : public StorageBlockBase {
* for each of the elements in arguments, and is used to elide a copy.
* Has no effect if NULL, or if VECTOR_COPY_ELISION_LEVEL is NONE.
* @param group_by The list of GROUP BY attributes/expressions.
- * @param predicate A predicate for selection. \c nullptr indicates that all
- * tuples should be aggregated on.
+ * @param filter If non-NULL, then only tuples whose IDs are set in the
+ * filter are processed (all other tuples are skipped).
* @param distinctify_hash_table Hash table to store the arguments and GROUP
* BY expressions together as hash table key and a bool constant \c true
* as hash table value. (So the hash table actually serves as a hash set.)
- * @param reuse_matches This parameter is used to store and reuse tuple-id
- * sequence of matches pre-computed in an earlier invocations of
- * aggregateGroupBy(). \c reuse_matches is never \c nullptr for ease of
- * use. Current invocation of aggregateGroupBy() will reuse
- * TupleIdSequence if passed, otherwise computes a TupleIdSequence based
- * on \c predicate and stores in \c reuse_matches. We use
- * std::unique_ptr for each of use, since the caller will not have to
- * selective free.
* @param reuse_group_by_vectors This parameter is used to store and reuse
* GROUP BY attribute vectors pre-computed in an earlier invocation of
* aggregateGroupBy(). \c reuse_group_by_vectors is never \c nullptr
@@ -505,9 +488,8 @@ class StorageBlock : public StorageBlockBase {
const std::vector<std::unique_ptr<const Scalar>> &arguments,
const std::vector<attribute_id> *arguments_as_attributes,
const std::vector<std::unique_ptr<const Scalar>> &group_by,
- const Predicate *predicate,
+ const TupleIdSequence *filter,
AggregationStateHashTableBase *distinctify_hash_table,
- std::unique_ptr<TupleIdSequence> *reuse_matches,
std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const;
/**
@@ -627,8 +609,6 @@ class StorageBlock : public StorageBlockBase {
// StorageBlock's header.
bool rebuildIndexes(bool short_circuit);
- TupleIdSequence* getMatchesForPredicate(const Predicate *predicate) const;
-
std::unordered_map<attribute_id, TypedValue>* generateUpdatedValues(
const ValueAccessor &accessor,
const tuple_id tuple,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/47fe8d6d/utility/lip_filter/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/lip_filter/CMakeLists.txt b/utility/lip_filter/CMakeLists.txt
index b7224d2..23b3763 100644
--- a/utility/lip_filter/CMakeLists.txt
+++ b/utility/lip_filter/CMakeLists.txt
@@ -25,6 +25,7 @@ add_library(quickstep_utility_lipfilter_LIPFilterAdaptiveProber ../../empty_src.
add_library(quickstep_utility_lipfilter_LIPFilterBuilder ../../empty_src.cpp LIPFilterBuilder.hpp)
add_library(quickstep_utility_lipfilter_LIPFilterDeployment LIPFilterDeployment.cpp LIPFilterDeployment.hpp)
add_library(quickstep_utility_lipfilter_LIPFilterFactory LIPFilterFactory.cpp LIPFilterFactory.hpp)
+add_library(quickstep_utility_lipfilter_LIPFilterUtil ../../empty_src.cpp LIPFilterUtil.hpp)
add_library(quickstep_utility_lipfilter_LIPFilter_proto
${utility_lipfilter_LIPFilter_proto_srcs})
add_library(quickstep_utility_lipfilter_SingleIdentityHashFilter ../../empty_src.cpp SingleIdentityHashFilter.hpp)
@@ -58,6 +59,9 @@ target_link_libraries(quickstep_utility_lipfilter_LIPFilterFactory
quickstep_utility_lipfilter_LIPFilter_proto
quickstep_utility_lipfilter_SingleIdentityHashFilter
quickstep_utility_Macros)
+target_link_libraries(quickstep_utility_lipfilter_LIPFilterUtil
+ quickstep_queryexecution_QueryContext
+ quickstep_utility_lipfilter_LIPFilterDeployment)
target_link_libraries(quickstep_utility_lipfilter_LIPFilter_proto
${PROTOBUF_LIBRARY}
quickstep_types_Type_proto)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/47fe8d6d/utility/lip_filter/LIPFilterBuilder.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterBuilder.hpp b/utility/lip_filter/LIPFilterBuilder.hpp
index deb8f66..aa84a06 100644
--- a/utility/lip_filter/LIPFilterBuilder.hpp
+++ b/utility/lip_filter/LIPFilterBuilder.hpp
@@ -39,9 +39,6 @@ class ValueAccessor;
* @{
*/
-class LIPFilterBuilder;
-typedef std::shared_ptr<LIPFilterBuilder> LIPFilterBuilderPtr;
-
/**
* @brief Helper class for building LIPFilters from a relation (i.e. ValueAccessor).
*/
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/47fe8d6d/utility/lip_filter/LIPFilterUtil.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterUtil.hpp b/utility/lip_filter/LIPFilterUtil.hpp
new file mode 100644
index 0000000..5d43c49
--- /dev/null
+++ b/utility/lip_filter/LIPFilterUtil.hpp
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_UTIL_HPP_
+#define QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_UTIL_HPP_
+
+#include "query_execution/QueryContext.hpp"
+#include "utility/lip_filter/LIPFilterDeployment.hpp"
+
+namespace quickstep {
+
+class LIPFilterBuilder;
+class LIPFilterAdaptiveProber;
+
+/** \addtogroup Utility
+ * @{
+ */
+
+/**
+ * @brief Create a LIPFilterBuilder for the given LIPFilterDeployment in QueryContext.
+ *
+ * @param lip_deployment_index The id of the LIPFilterDeployment in QueryContext.
+ * @param query_context The QueryContext; not accessed when \p lip_deployment_index is QueryContext::kInvalidLIPDeploymentId.
+ * @return The result of LIPFilterDeployment::createLIPFilterBuilder(), or nullptr if \p lip_deployment_index equals QueryContext::kInvalidLIPDeploymentId.
+ */
+inline LIPFilterBuilder* CreateLIPFilterBuilderHelper(
+ const QueryContext::lip_deployment_id lip_deployment_index,
+ const QueryContext *query_context) {
+ if (lip_deployment_index == QueryContext::kInvalidLIPDeploymentId) {
+ return nullptr;  // Sentinel id: no QueryContext lookup is performed.
+ } else {
+ const LIPFilterDeployment *lip_filter_deployment =
+ query_context->getLIPDeployment(lip_deployment_index);
+ return lip_filter_deployment->createLIPFilterBuilder();  // NOTE(review): presumably a new object owned by the caller; confirm.
+ }
+}
+
+/**
+ * @brief Create a LIPFilterAdaptiveProber for the given LIPFilterDeployment
+ * in QueryContext.
+ *
+ * @param lip_deployment_index The id of the LIPFilterDeployment in QueryContext.
+ * @param query_context The QueryContext; not accessed when \p lip_deployment_index is QueryContext::kInvalidLIPDeploymentId.
+ * @return The result of LIPFilterDeployment::createLIPFilterAdaptiveProber(), or nullptr if
+ * \p lip_deployment_index equals QueryContext::kInvalidLIPDeploymentId.
+ */
+inline LIPFilterAdaptiveProber* CreateLIPFilterAdaptiveProberHelper(
+ const QueryContext::lip_deployment_id lip_deployment_index,
+ const QueryContext *query_context) {
+ if (lip_deployment_index == QueryContext::kInvalidLIPDeploymentId) {
+ return nullptr;  // Sentinel id: no QueryContext lookup is performed.
+ } else {
+ const LIPFilterDeployment *lip_filter_deployment =
+ query_context->getLIPDeployment(lip_deployment_index);
+ return lip_filter_deployment->createLIPFilterAdaptiveProber();  // NOTE(review): presumably a new object owned by the caller; confirm.
+ }
+}
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_UTIL_HPP_