You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/09/20 20:30:32 UTC
[4/8] incubator-quickstep git commit: Initial commit for QUICKSTEP-28 and QUICKSTEP-29.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 3f6e23a..073b813 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -59,7 +59,7 @@ namespace quickstep {
AggregationOperationState::AggregationOperationState(
const CatalogRelationSchema &input_relation,
- const std::vector<const AggregateFunction*> &aggregate_functions,
+ const std::vector<const AggregateFunction *> &aggregate_functions,
std::vector<std::vector<std::unique_ptr<const Scalar>>> &&arguments,
std::vector<bool> &&is_distinct,
std::vector<std::unique_ptr<const Scalar>> &&group_by,
@@ -78,11 +78,14 @@ AggregationOperationState::AggregationOperationState(
DCHECK(aggregate_functions.size() == arguments_.size());
// Get the types of GROUP BY expressions for creating HashTables below.
- std::vector<const Type*> group_by_types;
+ std::vector<const Type *> group_by_types;
for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
group_by_types.emplace_back(&group_by_element->getType());
}
+ std::vector<AggregationHandle *> group_by_handles;
+ group_by_handles.clear();
+
if (aggregate_functions.size() == 0) {
// If there is no aggregation function, then it is a distinctify operation
// on the group-by expressions.
@@ -91,26 +94,28 @@ AggregationOperationState::AggregationOperationState(
handles_.emplace_back(new AggregationHandleDistinct());
arguments_.push_back({});
is_distinct_.emplace_back(false);
-
- group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>(
- new HashTablePool(estimated_num_entries,
- hash_table_impl_type,
- group_by_types,
- handles_.back().get(),
- storage_manager)));
+ group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
+ hash_table_impl_type,
+ group_by_types,
+ {1},
+ handles_,
+ storage_manager));
} else {
// Set up each individual aggregate in this operation.
- std::vector<const AggregateFunction*>::const_iterator agg_func_it
- = aggregate_functions.begin();
- std::vector<std::vector<std::unique_ptr<const Scalar>>>::const_iterator args_it
- = arguments_.begin();
+ std::vector<const AggregateFunction *>::const_iterator agg_func_it =
+ aggregate_functions.begin();
+ std::vector<std::vector<std::unique_ptr<const Scalar>>>::const_iterator
+ args_it = arguments_.begin();
std::vector<bool>::const_iterator is_distinct_it = is_distinct_.begin();
- std::vector<HashTableImplType>::const_iterator distinctify_hash_table_impl_types_it
- = distinctify_hash_table_impl_types.begin();
- for (; agg_func_it != aggregate_functions.end(); ++agg_func_it, ++args_it, ++is_distinct_it) {
+ std::vector<HashTableImplType>::const_iterator
+ distinctify_hash_table_impl_types_it =
+ distinctify_hash_table_impl_types.begin();
+ std::vector<std::size_t> payload_sizes;
+ for (; agg_func_it != aggregate_functions.end();
+ ++agg_func_it, ++args_it, ++is_distinct_it) {
// Get the Types of this aggregate's arguments so that we can create an
// AggregationHandle.
- std::vector<const Type*> argument_types;
+ std::vector<const Type *> argument_types;
for (const std::unique_ptr<const Scalar> &argument : *args_it) {
argument_types.emplace_back(&argument->getType());
}
@@ -125,13 +130,13 @@ AggregationOperationState::AggregationOperationState(
handles_.emplace_back((*agg_func_it)->createHandle(argument_types));
if (!group_by_list_.empty()) {
- // Aggregation with GROUP BY: create a HashTable pool for per-group states.
- group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>(
- new HashTablePool(estimated_num_entries,
- hash_table_impl_type,
- group_by_types,
- handles_.back().get(),
- storage_manager)));
+ // Aggregation with GROUP BY: combined payload is partially updated in
+ // the presence of DISTINCT.
+ if (*is_distinct_it) {
+ handles_.back()->blockUpdate();
+ }
+ group_by_handles.emplace_back(handles_.back());
+ payload_sizes.emplace_back(group_by_handles.back()->getPayloadSize());
} else {
// Aggregation without GROUP BY: create a single global state.
single_states_.emplace_back(handles_.back()->createInitialState());
@@ -143,40 +148,60 @@ AggregationOperationState::AggregationOperationState(
std::vector<attribute_id> local_arguments_as_attributes;
local_arguments_as_attributes.reserve(args_it->size());
for (const std::unique_ptr<const Scalar> &argument : *args_it) {
- const attribute_id argument_id = argument->getAttributeIdForValueAccessor();
+ const attribute_id argument_id =
+ argument->getAttributeIdForValueAccessor();
if (argument_id == -1) {
local_arguments_as_attributes.clear();
break;
} else {
- DCHECK_EQ(input_relation_.getID(), argument->getRelationIdForValueAccessor());
+ DCHECK_EQ(input_relation_.getID(),
+ argument->getRelationIdForValueAccessor());
local_arguments_as_attributes.push_back(argument_id);
}
}
- arguments_as_attributes_.emplace_back(std::move(local_arguments_as_attributes));
+ arguments_as_attributes_.emplace_back(
+ std::move(local_arguments_as_attributes));
#endif
}
- // Initialize the corresponding distinctify hash table if this is a DISTINCT
+ // Initialize the corresponding distinctify hash table if this
+ // is a DISTINCT
// aggregation.
if (*is_distinct_it) {
- std::vector<const Type*> key_types(group_by_types);
- key_types.insert(key_types.end(), argument_types.begin(), argument_types.end());
- // TODO(jianqiao): estimated_num_entries is quite inaccurate for estimating
+ std::vector<const Type *> key_types(group_by_types);
+ key_types.insert(
+ key_types.end(), argument_types.begin(), argument_types.end());
+ // TODO(jianqiao): estimated_num_entries is quite inaccurate for
+ // estimating
// the number of entries in the distinctify hash table. We may estimate
- // for each distinct aggregation an estimated_num_distinct_keys value during
+ // for each distinct aggregation an estimated_num_distinct_keys value
+ // during
// query optimization, if it worths.
distinctify_hashtables_.emplace_back(
- handles_.back()->createDistinctifyHashTable(
+ AggregationStateFastHashTableFactory::CreateResizable(
*distinctify_hash_table_impl_types_it,
key_types,
estimated_num_entries,
+ {0},
+ {},
storage_manager));
++distinctify_hash_table_impl_types_it;
} else {
distinctify_hashtables_.emplace_back(nullptr);
}
}
+
+ if (!group_by_handles.empty()) {
+ // Aggregation with GROUP BY: create a HashTable pool for per-group
+ // states.
+ group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
+ hash_table_impl_type,
+ group_by_types,
+ payload_sizes,
+ group_by_handles,
+ storage_manager));
+ }
}
}
@@ -187,7 +212,7 @@ AggregationOperationState* AggregationOperationState::ReconstructFromProto(
DCHECK(ProtoIsValid(proto, database));
// Rebuild contructor arguments from their representation in 'proto'.
- std::vector<const AggregateFunction*> aggregate_functions;
+ std::vector<const AggregateFunction *> aggregate_functions;
std::vector<std::vector<std::unique_ptr<const Scalar>>> arguments;
std::vector<bool> is_distinct;
std::vector<HashTableImplType> distinctify_hash_table_impl_types;
@@ -200,62 +225,63 @@ AggregationOperationState* AggregationOperationState::ReconstructFromProto(
arguments.emplace_back();
arguments.back().reserve(agg_proto.argument_size());
- for (int argument_idx = 0; argument_idx < agg_proto.argument_size(); ++argument_idx) {
+ for (int argument_idx = 0; argument_idx < agg_proto.argument_size();
+ ++argument_idx) {
arguments.back().emplace_back(ScalarFactory::ReconstructFromProto(
- agg_proto.argument(argument_idx),
- database));
+ agg_proto.argument(argument_idx), database));
}
is_distinct.emplace_back(agg_proto.is_distinct());
if (agg_proto.is_distinct()) {
distinctify_hash_table_impl_types.emplace_back(
- HashTableImplTypeFromProto(
- proto.distinctify_hash_table_impl_types(distinctify_hash_table_impl_type_index)));
+ HashTableImplTypeFromProto(proto.distinctify_hash_table_impl_types(
+ distinctify_hash_table_impl_type_index)));
++distinctify_hash_table_impl_type_index;
}
}
std::vector<std::unique_ptr<const Scalar>> group_by_expressions;
- for (int group_by_idx = 0;
- group_by_idx < proto.group_by_expressions_size();
+ for (int group_by_idx = 0; group_by_idx < proto.group_by_expressions_size();
++group_by_idx) {
group_by_expressions.emplace_back(ScalarFactory::ReconstructFromProto(
- proto.group_by_expressions(group_by_idx),
- database));
+ proto.group_by_expressions(group_by_idx), database));
}
unique_ptr<Predicate> predicate;
if (proto.has_predicate()) {
predicate.reset(
- PredicateFactory::ReconstructFromProto(proto.predicate(),
- database));
+ PredicateFactory::ReconstructFromProto(proto.predicate(), database));
}
- return new AggregationOperationState(database.getRelationSchemaById(proto.relation_id()),
- aggregate_functions,
- std::move(arguments),
- std::move(is_distinct),
- std::move(group_by_expressions),
- predicate.release(),
- proto.estimated_num_entries(),
- HashTableImplTypeFromProto(proto.hash_table_impl_type()),
- distinctify_hash_table_impl_types,
- storage_manager);
+ return new AggregationOperationState(
+ database.getRelationSchemaById(proto.relation_id()),
+ aggregate_functions,
+ std::move(arguments),
+ std::move(is_distinct),
+ std::move(group_by_expressions),
+ predicate.release(),
+ proto.estimated_num_entries(),
+ HashTableImplTypeFromProto(proto.hash_table_impl_type()),
+ distinctify_hash_table_impl_types,
+ storage_manager);
}
-bool AggregationOperationState::ProtoIsValid(const serialization::AggregationOperationState &proto,
- const CatalogDatabaseLite &database) {
+bool AggregationOperationState::ProtoIsValid(
+ const serialization::AggregationOperationState &proto,
+ const CatalogDatabaseLite &database) {
if (!proto.IsInitialized() ||
!database.hasRelationWithId(proto.relation_id()) ||
(proto.aggregates_size() < 0)) {
return false;
}
- std::size_t num_distinctify_hash_tables = proto.distinctify_hash_table_impl_types_size();
+ std::size_t num_distinctify_hash_tables =
+ proto.distinctify_hash_table_impl_types_size();
std::size_t distinctify_hash_table_impl_type_index = 0;
for (int i = 0; i < proto.aggregates_size(); ++i) {
- if (!AggregateFunctionFactory::ProtoIsValid(proto.aggregates(i).function())) {
+ if (!AggregateFunctionFactory::ProtoIsValid(
+ proto.aggregates(i).function())) {
return false;
}
@@ -266,16 +292,18 @@ bool AggregationOperationState::ProtoIsValid(const serialization::AggregationOpe
for (int argument_idx = 0;
argument_idx < proto.aggregates(i).argument_size();
++argument_idx) {
- if (!ScalarFactory::ProtoIsValid(proto.aggregates(i).argument(argument_idx),
- database)) {
+ if (!ScalarFactory::ProtoIsValid(
+ proto.aggregates(i).argument(argument_idx), database)) {
return false;
}
}
if (proto.aggregates(i).is_distinct()) {
- if (distinctify_hash_table_impl_type_index >= num_distinctify_hash_tables ||
+ if (distinctify_hash_table_impl_type_index >=
+ num_distinctify_hash_tables ||
!serialization::HashTableImplType_IsValid(
- proto.distinctify_hash_table_impl_types(distinctify_hash_table_impl_type_index))) {
+ proto.distinctify_hash_table_impl_types(
+ distinctify_hash_table_impl_type_index))) {
return false;
}
}
@@ -288,8 +316,9 @@ bool AggregationOperationState::ProtoIsValid(const serialization::AggregationOpe
}
if (proto.group_by_expressions_size() > 0) {
- if (!proto.has_hash_table_impl_type()
- || !serialization::HashTableImplType_IsValid(proto.hash_table_impl_type())) {
+ if (!proto.has_hash_table_impl_type() ||
+ !serialization::HashTableImplType_IsValid(
+ proto.hash_table_impl_type())) {
return false;
}
}
@@ -311,7 +340,8 @@ void AggregationOperationState::aggregateBlock(const block_id input_block) {
}
}
-void AggregationOperationState::finalizeAggregate(InsertDestination *output_destination) {
+void AggregationOperationState::finalizeAggregate(
+ InsertDestination *output_destination) {
if (group_by_list_.empty()) {
finalizeSingleState(output_destination);
} else {
@@ -330,19 +360,19 @@ void AggregationOperationState::mergeSingleState(
}
}
-void AggregationOperationState::aggregateBlockSingleState(const block_id input_block) {
+void AggregationOperationState::aggregateBlockSingleState(
+ const block_id input_block) {
// Aggregate per-block state for each aggregate.
std::vector<std::unique_ptr<AggregationState>> local_state;
- BlockReference block(storage_manager_->getBlock(input_block, input_relation_));
+ BlockReference block(
+ storage_manager_->getBlock(input_block, input_relation_));
// If there is a filter predicate, 'reuse_matches' holds the set of matching
// tuples so that it can be reused across multiple aggregates (i.e. we only
// pay the cost of evaluating the predicate once).
std::unique_ptr<TupleIdSequence> reuse_matches;
- for (std::size_t agg_idx = 0;
- agg_idx < handles_.size();
- ++agg_idx) {
+ for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
const std::vector<attribute_id> *local_arguments_as_attributes = nullptr;
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
// If all arguments are attributes of the input relation, elide a copy.
@@ -365,12 +395,11 @@ void AggregationOperationState::aggregateBlockSingleState(const block_id input_b
local_state.emplace_back(nullptr);
} else {
// Call StorageBlock::aggregate() to actually do the aggregation.
- local_state.emplace_back(
- block->aggregate(*handles_[agg_idx],
- arguments_[agg_idx],
- local_arguments_as_attributes,
- predicate_.get(),
- &reuse_matches));
+ local_state.emplace_back(block->aggregate(*handles_[agg_idx],
+ arguments_[agg_idx],
+ local_arguments_as_attributes,
+ predicate_.get(),
+ &reuse_matches));
}
}
@@ -378,8 +407,10 @@ void AggregationOperationState::aggregateBlockSingleState(const block_id input_b
mergeSingleState(local_state);
}
-void AggregationOperationState::aggregateBlockHashTable(const block_id input_block) {
- BlockReference block(storage_manager_->getBlock(input_block, input_relation_));
+void AggregationOperationState::aggregateBlockHashTable(
+ const block_id input_block) {
+ BlockReference block(
+ storage_manager_->getBlock(input_block, input_relation_));
// If there is a filter predicate, 'reuse_matches' holds the set of matching
// tuples so that it can be reused across multiple aggregates (i.e. we only
@@ -391,11 +422,10 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
// GROUP BY expressions once).
std::vector<std::unique_ptr<ColumnVector>> reuse_group_by_vectors;
- for (std::size_t agg_idx = 0;
- agg_idx < handles_.size();
- ++agg_idx) {
+ for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
if (is_distinct_[agg_idx]) {
- // Call StorageBlock::aggregateDistinct() to insert the GROUP BY expression
+ // Call StorageBlock::aggregateDistinct() to insert the GROUP BY
+ // expression
// values and the aggregation arguments together as keys directly into the
// (threadsafe) shared global distinctify HashTable for this aggregate.
block->aggregateDistinct(*handles_[agg_idx],
@@ -406,45 +436,54 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
distinctify_hashtables_[agg_idx].get(),
&reuse_matches,
&reuse_group_by_vectors);
- } else {
- // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
- // directly into the (threadsafe) shared global HashTable for this
- // aggregate.
- DCHECK(group_by_hashtable_pools_[agg_idx] != nullptr);
- AggregationStateHashTableBase *agg_hash_table = group_by_hashtable_pools_[agg_idx]->getHashTable();
- DCHECK(agg_hash_table != nullptr);
- block->aggregateGroupBy(*handles_[agg_idx],
- arguments_[agg_idx],
- group_by_list_,
- predicate_.get(),
- agg_hash_table,
- &reuse_matches,
- &reuse_group_by_vectors);
- group_by_hashtable_pools_[agg_idx]->returnHashTable(agg_hash_table);
}
}
+
+ // Call StorageBlock::aggregateGroupBy() once to aggregate this block's
+ // values for all aggregates directly into a (threadsafe) HashTable
+ // taken from the shared group-by pool.
+ DCHECK(group_by_hashtable_pool_ != nullptr);
+ AggregationStateHashTableBase *agg_hash_table =
+ group_by_hashtable_pool_->getHashTableFast();
+ DCHECK(agg_hash_table != nullptr);
+ block->aggregateGroupBy(arguments_,
+ group_by_list_,
+ predicate_.get(),
+ agg_hash_table,
+ &reuse_matches,
+ &reuse_group_by_vectors);
+ group_by_hashtable_pool_->returnHashTable(agg_hash_table);
}
-void AggregationOperationState::finalizeSingleState(InsertDestination *output_destination) {
+void AggregationOperationState::finalizeSingleState(
+ InsertDestination *output_destination) {
// Simply build up a Tuple from the finalized values for each aggregate and
// insert it in '*output_destination'.
std::vector<TypedValue> attribute_values;
- for (std::size_t agg_idx = 0;
- agg_idx < handles_.size();
- ++agg_idx) {
+ for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
if (is_distinct_[agg_idx]) {
single_states_[agg_idx].reset(
- handles_[agg_idx]->aggregateOnDistinctifyHashTableForSingle(*distinctify_hashtables_[agg_idx]));
+ handles_[agg_idx]->aggregateOnDistinctifyHashTableForSingle(
+ *distinctify_hashtables_[agg_idx]));
}
- attribute_values.emplace_back(handles_[agg_idx]->finalize(*single_states_[agg_idx]));
+ attribute_values.emplace_back(
+ handles_[agg_idx]->finalize(*single_states_[agg_idx]));
}
output_destination->insertTuple(Tuple(std::move(attribute_values)));
}
-void AggregationOperationState::finalizeHashTable(InsertDestination *output_destination) {
+void AggregationOperationState::mergeGroupByHashTables(
+ AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst) {
+ HashTableMergerFast merger(dst);
+ (static_cast<FastHashTable<true, false, true, false> *>(src))
+ ->forEachCompositeKeyFast(&merger);
+}
+
+void AggregationOperationState::finalizeHashTable(
+ InsertDestination *output_destination) {
// Each element of 'group_by_keys' is a vector of values for a particular
// group (which is also the prefix of the finalized Tuple for that group).
std::vector<std::vector<TypedValue>> group_by_keys;
@@ -455,60 +494,57 @@ void AggregationOperationState::finalizeHashTable(InsertDestination *output_dest
// TODO(harshad) - Find heuristics for faster merge, even in a single thread.
// e.g. Keep merging entries from smaller hash tables to larger.
- for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
- auto *hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables();
- if (hash_tables->size() > 1) {
- for (int hash_table_index = 0;
- hash_table_index < static_cast<int>(hash_tables->size() - 1);
- ++hash_table_index) {
- // Merge each hash table to the last hash table.
- handles_[agg_idx]->mergeGroupByHashTables(
- (*(*hash_tables)[hash_table_index]),
- hash_tables->back().get());
- }
+
+ auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
+ if (hash_tables->size() > 1) {
+ for (int hash_table_index = 0;
+ hash_table_index < static_cast<int>(hash_tables->size() - 1);
+ ++hash_table_index) {
+ // Merge each hash table to the last hash table.
+ mergeGroupByHashTables((*hash_tables)[hash_table_index].get(),
+ hash_tables->back().get());
}
}
// Collect per-aggregate finalized values.
std::vector<std::unique_ptr<ColumnVector>> final_values;
- for (std::size_t agg_idx = 0;
- agg_idx < handles_.size();
- ++agg_idx) {
+ for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
if (is_distinct_[agg_idx]) {
- DCHECK(group_by_hashtable_pools_[agg_idx] != nullptr);
- auto *hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables();
+ DCHECK(group_by_hashtable_pool_ != nullptr);
+ auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
DCHECK(hash_tables != nullptr);
if (hash_tables->empty()) {
// We may have a case where hash_tables is empty, e.g. no input blocks.
// However for aggregateOnDistinctifyHashTableForGroupBy to work
// correctly, we should create an empty group by hash table.
- AggregationStateHashTableBase *new_hash_table = group_by_hashtable_pools_[agg_idx]->getHashTable();
- group_by_hashtable_pools_[agg_idx]->returnHashTable(new_hash_table);
- hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables();
+ AggregationStateHashTableBase *new_hash_table =
+ group_by_hashtable_pool_->getHashTableFast();
+ group_by_hashtable_pool_->returnHashTable(new_hash_table);
+ hash_tables = group_by_hashtable_pool_->getAllHashTables();
}
DCHECK(hash_tables->back() != nullptr);
AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
DCHECK(agg_hash_table != nullptr);
+ handles_[agg_idx]->allowUpdate();
handles_[agg_idx]->aggregateOnDistinctifyHashTableForGroupBy(
- *distinctify_hashtables_[agg_idx],
- agg_hash_table);
+ *distinctify_hashtables_[agg_idx], agg_hash_table, agg_idx);
}
- auto *hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables();
+ auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
DCHECK(hash_tables != nullptr);
if (hash_tables->empty()) {
// We may have a case where hash_tables is empty, e.g. no input blocks.
// However for aggregateOnDistinctifyHashTableForGroupBy to work
// correctly, we should create an empty group by hash table.
- AggregationStateHashTableBase *new_hash_table = group_by_hashtable_pools_[agg_idx]->getHashTable();
- group_by_hashtable_pools_[agg_idx]->returnHashTable(new_hash_table);
- hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables();
+ AggregationStateHashTableBase *new_hash_table =
+ group_by_hashtable_pool_->getHashTable();
+ group_by_hashtable_pool_->returnHashTable(new_hash_table);
+ hash_tables = group_by_hashtable_pool_->getAllHashTables();
}
AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
DCHECK(agg_hash_table != nullptr);
- ColumnVector* agg_result_col =
- handles_[agg_idx]->finalizeHashTable(*agg_hash_table,
- &group_by_keys);
+ ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
+ *agg_hash_table, &group_by_keys, agg_idx);
if (agg_result_col != nullptr) {
final_values.emplace_back(agg_result_col);
}
@@ -526,16 +562,20 @@ void AggregationOperationState::finalizeHashTable(InsertDestination *output_dest
for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
const Type &group_by_type = group_by_element->getType();
if (NativeColumnVector::UsableForType(group_by_type)) {
- NativeColumnVector *element_cv = new NativeColumnVector(group_by_type, group_by_keys.size());
+ NativeColumnVector *element_cv =
+ new NativeColumnVector(group_by_type, group_by_keys.size());
group_by_cvs.emplace_back(element_cv);
for (std::vector<TypedValue> &group_key : group_by_keys) {
- element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
+ element_cv->appendTypedValue(
+ std::move(group_key[group_by_element_idx]));
}
} else {
- IndirectColumnVector *element_cv = new IndirectColumnVector(group_by_type, group_by_keys.size());
+ IndirectColumnVector *element_cv =
+ new IndirectColumnVector(group_by_type, group_by_keys.size());
group_by_cvs.emplace_back(element_cv);
for (std::vector<TypedValue> &group_key : group_by_keys) {
- element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
+ element_cv->appendTypedValue(
+ std::move(group_key[group_by_element_idx]));
}
}
++group_by_element_idx;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index ecd116b..cbbfc22 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -102,16 +102,17 @@ class AggregationOperationState {
* tables. Single aggregation state (when GROUP BY list is not
* specified) is not allocated using memory from storage manager.
*/
- AggregationOperationState(const CatalogRelationSchema &input_relation,
- const std::vector<const AggregateFunction*> &aggregate_functions,
- std::vector<std::vector<std::unique_ptr<const Scalar>>> &&arguments,
- std::vector<bool> &&is_distinct,
- std::vector<std::unique_ptr<const Scalar>> &&group_by,
- const Predicate *predicate,
- const std::size_t estimated_num_entries,
- const HashTableImplType hash_table_impl_type,
- const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
- StorageManager *storage_manager);
+ AggregationOperationState(
+ const CatalogRelationSchema &input_relation,
+ const std::vector<const AggregateFunction *> &aggregate_functions,
+ std::vector<std::vector<std::unique_ptr<const Scalar>>> &&arguments,
+ std::vector<bool> &&is_distinct,
+ std::vector<std::unique_ptr<const Scalar>> &&group_by,
+ const Predicate *predicate,
+ const std::size_t estimated_num_entries,
+ const HashTableImplType hash_table_impl_type,
+ const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
+ StorageManager *storage_manager);
~AggregationOperationState() {}
@@ -143,8 +144,9 @@ class AggregationOperationState {
* in.
* @return Whether proto is fully-formed and valid.
**/
- static bool ProtoIsValid(const serialization::AggregationOperationState &proto,
- const CatalogDatabaseLite &database);
+ static bool ProtoIsValid(
+ const serialization::AggregationOperationState &proto,
+ const CatalogDatabaseLite &database);
/**
* @brief Compute aggregates on the tuples of the given storage block,
@@ -165,10 +167,16 @@ class AggregationOperationState {
**/
void finalizeAggregate(InsertDestination *output_destination);
+ static void mergeGroupByHashTables(AggregationStateHashTableBase *src,
+ AggregationStateHashTableBase *dst);
+
+ int dflag;
+
private:
// Merge locally (per storage block) aggregated states with global aggregation
// states.
- void mergeSingleState(const std::vector<std::unique_ptr<AggregationState>> &local_state);
+ void mergeSingleState(
+ const std::vector<std::unique_ptr<AggregationState>> &local_state);
// Aggregate on input block.
void aggregateBlockSingleState(const block_id input_block);
@@ -185,7 +193,8 @@ class AggregationOperationState {
// Each individual aggregate in this operation has an AggregationHandle and
// some number of Scalar arguments.
- std::vector<std::unique_ptr<AggregationHandle>> handles_;
+ // NOTE(review): raw owning pointers; the empty destructor never deletes them.
+ std::vector<AggregationHandle *> handles_;
std::vector<std::vector<std::unique_ptr<const Scalar>>> arguments_;
// For each aggregate, whether DISTINCT should be applied to the aggregate's
@@ -193,7 +202,8 @@ class AggregationOperationState {
std::vector<bool> is_distinct_;
// Hash table for obtaining distinct (i.e. unique) arguments.
- std::vector<std::unique_ptr<AggregationStateHashTableBase>> distinctify_hashtables_;
+ std::vector<std::unique_ptr<AggregationStateHashTableBase>>
+ distinctify_hashtables_;
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
// If all an aggregate's argument expressions are simply attributes in
@@ -208,10 +218,11 @@ class AggregationOperationState {
//
// TODO(shoban): We should ideally store the aggregation state together in one
// hash table to prevent multiple lookups.
- std::vector<std::unique_ptr<AggregationStateHashTableBase>> group_by_hashtables_;
+ std::vector<std::unique_ptr<AggregationStateHashTableBase>>
+ group_by_hashtables_;
- // A vector of group by hash table pools, one for each group by clause.
- std::vector<std::unique_ptr<HashTablePool>> group_by_hashtable_pools_;
+ // Single hash table pool shared by all GROUP BY aggregates.
+ std::unique_ptr<HashTablePool> group_by_hashtable_pool_;
StorageManager *storage_manager_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 65a7975..f05cc46 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -198,6 +198,9 @@ if (ENABLE_DISTRIBUTED)
endif()
add_library(quickstep_storage_EvictionPolicy EvictionPolicy.cpp EvictionPolicy.hpp)
+add_library(quickstep_storage_FastHashTable ../empty_src.cpp FastHashTable.hpp)
+add_library(quickstep_storage_FastHashTableFactory ../empty_src.cpp FastHashTableFactory.hpp)
+add_library(quickstep_storage_FastSeparateChainingHashTable ../empty_src.cpp FastSeparateChainingHashTable.hpp)
add_library(quickstep_storage_FileManager ../empty_src.cpp FileManager.hpp)
if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
add_library(quickstep_storage_FileManagerHdfs FileManagerHdfs.cpp FileManagerHdfs.hpp)
@@ -626,6 +629,53 @@ target_link_libraries(quickstep_storage_EvictionPolicy
quickstep_threading_SpinMutex
quickstep_threading_SpinSharedMutex
quickstep_utility_Macros)
+target_link_libraries(quickstep_storage_FastHashTable
+ quickstep_catalog_CatalogTypedefs
+ quickstep_storage_HashTableBase
+ quickstep_storage_StorageBlob
+ quickstep_storage_StorageBlockInfo
+ quickstep_storage_StorageConstants
+ quickstep_storage_StorageManager
+ quickstep_storage_TupleReference
+ quickstep_storage_ValueAccessor
+ quickstep_storage_ValueAccessorUtil
+ quickstep_threading_SpinMutex
+ quickstep_threading_SpinSharedMutex
+ quickstep_types_Type
+ quickstep_types_TypedValue
+ quickstep_utility_BloomFilter
+ quickstep_utility_HashPair
+ quickstep_utility_Macros)
+target_link_libraries(quickstep_storage_FastHashTableFactory
+ glog
+ quickstep_storage_FastHashTable
+ quickstep_storage_FastSeparateChainingHashTable
+ quickstep_storage_HashTable
+ quickstep_storage_HashTable_proto
+ quickstep_storage_HashTableBase
+ quickstep_storage_HashTableFactory
+ quickstep_storage_LinearOpenAddressingHashTable
+ quickstep_storage_SeparateChainingHashTable
+ quickstep_storage_SimpleScalarSeparateChainingHashTable
+ quickstep_storage_TupleReference
+ quickstep_types_TypeFactory
+ quickstep_utility_BloomFilter
+ quickstep_utility_Macros)
+target_link_libraries(quickstep_storage_FastSeparateChainingHashTable
+ quickstep_storage_FastHashTable
+ quickstep_storage_HashTable
+ quickstep_storage_HashTableBase
+ quickstep_storage_HashTableKeyManager
+ quickstep_storage_StorageBlob
+ quickstep_storage_StorageBlockInfo
+ quickstep_storage_StorageConstants
+ quickstep_storage_StorageManager
+ quickstep_threading_SpinSharedMutex
+ quickstep_types_Type
+ quickstep_types_TypedValue
+ quickstep_utility_Alignment
+ quickstep_utility_Macros
+ quickstep_utility_PrimeNumber)
target_link_libraries(quickstep_storage_FileManager
quickstep_storage_StorageBlockInfo
quickstep_utility_Macros
@@ -711,6 +761,8 @@ target_link_libraries(quickstep_storage_HashTableKeyManager
target_link_libraries(quickstep_storage_HashTablePool
glog
quickstep_expressions_aggregation_AggregationHandle
+ quickstep_storage_FastHashTable
+ quickstep_storage_FastHashTableFactory
quickstep_storage_HashTableBase
quickstep_threading_SpinMutex
quickstep_utility_Macros
@@ -1098,6 +1150,9 @@ target_link_libraries(quickstep_storage
quickstep_storage_EvictionPolicy
quickstep_storage_FileManager
quickstep_storage_FileManagerLocal
+ quickstep_storage_FastHashTable
+ quickstep_storage_FastHashTableFactory
+ quickstep_storage_FastSeparateChainingHashTable
quickstep_storage_HashTable
quickstep_storage_HashTable_proto
quickstep_storage_HashTableBase