You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/01/31 23:20:05 UTC
[11/13] incubator-quickstep git commit: Initial commit.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b46bc73c/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index b942c1b..5de2653 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -39,15 +39,17 @@
#include "expressions/predicate/Predicate.hpp"
#include "expressions/scalar/Scalar.hpp"
#include "storage/AggregationOperationState.pb.h"
-#include "storage/HashTable.hpp"
-#include "storage/HashTableBase.hpp"
#include "storage/HashTableFactory.hpp"
+#include "storage/HashTableBase.hpp"
#include "storage/InsertDestination.hpp"
+#include "storage/PackedPayloadAggregationStateHashTable.hpp"
#include "storage/StorageBlock.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "storage/StorageManager.hpp"
+#include "storage/SubBlocksReference.hpp"
#include "storage/TupleIdSequence.hpp"
#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
#include "types/TypedValue.hpp"
#include "types/containers/ColumnVector.hpp"
#include "types/containers/ColumnVectorsValueAccessor.hpp"
@@ -80,50 +82,63 @@ AggregationOperationState::AggregationOperationState(
const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
StorageManager *storage_manager)
: input_relation_(input_relation),
- is_aggregate_partitioned_(checkAggregatePartitioned(
- estimated_num_entries, is_distinct, group_by, aggregate_functions)),
+ is_aggregate_collision_free_(false),
+ is_aggregate_partitioned_(false),
predicate_(predicate),
- group_by_list_(std::move(group_by)),
- arguments_(std::move(arguments)),
is_distinct_(std::move(is_distinct)),
storage_manager_(storage_manager) {
+ if (!group_by.empty()) {
+ if (hash_table_impl_type == HashTableImplType::kCollisionFreeVector) {
+ is_aggregate_collision_free_ = true;
+ } else {
+ is_aggregate_partitioned_ = checkAggregatePartitioned(
+ estimated_num_entries, is_distinct_, group_by, aggregate_functions);
+ }
+ }
+
// Sanity checks: each aggregate has a corresponding list of arguments.
- DCHECK(aggregate_functions.size() == arguments_.size());
+ DCHECK(aggregate_functions.size() == arguments.size());
// Get the types of GROUP BY expressions for creating HashTables below.
- std::vector<const Type *> group_by_types;
- for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
- group_by_types.emplace_back(&group_by_element->getType());
+ for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
+ group_by_types_.emplace_back(&group_by_element->getType());
}
- std::vector<AggregationHandle *> group_by_handles;
- group_by_handles.clear();
+ // Prepare group-by element attribute ids and non-trivial expressions.
+ for (std::unique_ptr<const Scalar> &group_by_element : group_by) {
+ const attribute_id attr_id =
+ group_by_element->getAttributeIdForValueAccessor();
+ if (attr_id == kInvalidAttributeID) {
+ const attribute_id non_trivial_attr_id =
+ -(static_cast<attribute_id>(non_trivial_expressions_.size()) + 2);
+ non_trivial_expressions_.emplace_back(group_by_element.release());
+ group_by_key_ids_.emplace_back(non_trivial_attr_id);
+ } else {
+ group_by_key_ids_.emplace_back(attr_id);
+ }
+ }
if (aggregate_functions.size() == 0) {
// If there is no aggregation function, then it is a distinctify operation
// on the group-by expressions.
- DCHECK_GT(group_by_list_.size(), 0u);
+ DCHECK_GT(group_by_key_ids_.size(), 0u);
handles_.emplace_back(new AggregationHandleDistinct());
- arguments_.push_back({});
is_distinct_.emplace_back(false);
group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
hash_table_impl_type,
- group_by_types,
- {1},
- handles_,
+ group_by_types_,
+ {handles_.front().get()},
storage_manager));
} else {
+ std::vector<AggregationHandle *> group_by_handles;
+
// Set up each individual aggregate in this operation.
std::vector<const AggregateFunction *>::const_iterator agg_func_it =
aggregate_functions.begin();
- std::vector<std::vector<std::unique_ptr<const Scalar>>>::const_iterator
- args_it = arguments_.begin();
+ std::vector<std::vector<std::unique_ptr<const Scalar>>>::iterator
+ args_it = arguments.begin();
std::vector<bool>::const_iterator is_distinct_it = is_distinct_.begin();
- std::vector<HashTableImplType>::const_iterator
- distinctify_hash_table_impl_types_it =
- distinctify_hash_table_impl_types.begin();
- std::vector<std::size_t> payload_sizes;
for (; agg_func_it != aggregate_functions.end();
++agg_func_it, ++args_it, ++is_distinct_it) {
// Get the Types of this aggregate's arguments so that we can create an
@@ -133,6 +148,22 @@ AggregationOperationState::AggregationOperationState(
argument_types.emplace_back(&argument->getType());
}
+ // Prepare argument attribute ids and non-trivial expressions.
+ std::vector<attribute_id> argument_ids;
+ for (std::unique_ptr<const Scalar> &argument : *args_it) {
+ const attribute_id attr_id =
+ argument->getAttributeIdForValueAccessor();
+ if (attr_id == kInvalidAttributeID) {
+ const attribute_id non_trivial_attr_id =
+ -(static_cast<attribute_id>(non_trivial_expressions_.size()) + 2);
+ non_trivial_expressions_.emplace_back(argument.release());
+ argument_ids.emplace_back(non_trivial_attr_id);
+ } else {
+ argument_ids.emplace_back(attr_id);
+ }
+ }
+ argument_ids_.emplace_back(std::move(argument_ids));
+
// Sanity checks: aggregate function exists and can apply to the specified
// arguments.
DCHECK(*agg_func_it != nullptr);
@@ -142,85 +173,43 @@ AggregationOperationState::AggregationOperationState(
// to do actual aggregate computation.
handles_.emplace_back((*agg_func_it)->createHandle(argument_types));
- if (!group_by_list_.empty()) {
+ if (!group_by_key_ids_.empty()) {
// Aggregation with GROUP BY: combined payload is partially updated in
// the presence of DISTINCT.
if (*is_distinct_it) {
- handles_.back()->blockUpdate();
+ LOG(FATAL) << "Distinct aggregation not supported";
}
- group_by_handles.emplace_back(handles_.back());
- payload_sizes.emplace_back(group_by_handles.back()->getPayloadSize());
+ group_by_handles.emplace_back(handles_.back().get());
} else {
// Aggregation without GROUP BY: create a single global state.
single_states_.emplace_back(handles_.back()->createInitialState());
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- // See if all of this aggregate's arguments are attributes in the input
- // relation. If so, remember the attribute IDs so that we can do copy
- // elision when actually performing the aggregation.
- std::vector<attribute_id> local_arguments_as_attributes;
- local_arguments_as_attributes.reserve(args_it->size());
- for (const std::unique_ptr<const Scalar> &argument : *args_it) {
- const attribute_id argument_id =
- argument->getAttributeIdForValueAccessor();
- if (argument_id == -1) {
- local_arguments_as_attributes.clear();
- break;
- } else {
- DCHECK_EQ(input_relation_.getID(),
- argument->getRelationIdForValueAccessor());
- local_arguments_as_attributes.push_back(argument_id);
- }
- }
-
- arguments_as_attributes_.emplace_back(
- std::move(local_arguments_as_attributes));
-#endif
}
+ }
- // Initialize the corresponding distinctify hash table if this is a
- // DISTINCT aggregation.
- if (*is_distinct_it) {
- std::vector<const Type *> key_types(group_by_types);
- key_types.insert(
- key_types.end(), argument_types.begin(), argument_types.end());
- // TODO(jianqiao): estimated_num_entries is quite inaccurate for
- // estimating the number of entries in the distinctify hash table.
- // We may estimate for each distinct aggregation an
- // estimated_num_distinct_keys value during query optimization, if it's
- // worth.
- distinctify_hashtables_.emplace_back(
- AggregationStateFastHashTableFactory::CreateResizable(
- *distinctify_hash_table_impl_types_it,
- key_types,
+ // Aggregation with GROUP BY: create a HashTable pool.
+ if (!group_by_key_ids_.empty()) {
+ if (is_aggregate_collision_free_) {
+ collision_free_hashtable_.reset(
+ AggregationStateHashTableFactory::CreateResizable(
+ hash_table_impl_type,
+ group_by_types_,
estimated_num_entries,
- {0},
- {},
+ group_by_handles,
storage_manager));
- ++distinctify_hash_table_impl_types_it;
- } else {
- distinctify_hashtables_.emplace_back(nullptr);
- }
- }
-
- if (!group_by_handles.empty()) {
- // Aggregation with GROUP BY: create a HashTable pool.
- if (!is_aggregate_partitioned_) {
- group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
- hash_table_impl_type,
- group_by_types,
- payload_sizes,
- group_by_handles,
- storage_manager));
- } else {
+ } else if (is_aggregate_partitioned_) {
partitioned_group_by_hashtable_pool_.reset(
new PartitionedHashTablePool(estimated_num_entries,
FLAGS_num_aggregation_partitions,
hash_table_impl_type,
- group_by_types,
- payload_sizes,
+ group_by_types_,
group_by_handles,
storage_manager));
+ } else {
+ group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
+ hash_table_impl_type,
+ group_by_types_,
+ group_by_handles,
+ storage_manager));
}
}
}
@@ -269,7 +258,7 @@ AggregationOperationState* AggregationOperationState::ReconstructFromProto(
proto.group_by_expressions(group_by_idx), database));
}
- unique_ptr<Predicate> predicate;
+ std::unique_ptr<Predicate> predicate;
if (proto.has_predicate()) {
predicate.reset(
PredicateFactory::ReconstructFromProto(proto.predicate(), database));
@@ -353,33 +342,72 @@ bool AggregationOperationState::ProtoIsValid(
return true;
}
-void AggregationOperationState::aggregateBlock(const block_id input_block,
- LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
- if (group_by_list_.empty()) {
- aggregateBlockSingleState(input_block);
+std::size_t AggregationOperationState::getNumPartitions() const {
+ if (is_aggregate_collision_free_) {
+ return static_cast<CollisionFreeAggregationStateHashTable *>(
+ collision_free_hashtable_.get())->getNumFinalizationPartitions();
+ } else if (is_aggregate_partitioned_) {
+ return partitioned_group_by_hashtable_pool_->getNumPartitions();
+ } else {
+ return 1u;
+ }
+}
+
+std::size_t AggregationOperationState::getNumInitializationPartitions() const {
+ if (is_aggregate_collision_free_) {
+ return static_cast<CollisionFreeAggregationStateHashTable *>(
+ collision_free_hashtable_.get())->getNumInitializationPartitions();
} else {
- aggregateBlockHashTable(input_block, lip_filter_adaptive_prober);
+ return 0u;
}
}
-void AggregationOperationState::finalizeAggregate(
- InsertDestination *output_destination) {
- if (group_by_list_.empty()) {
- finalizeSingleState(output_destination);
+void AggregationOperationState::initializeState(const std::size_t partition_id) {
+ if (is_aggregate_collision_free_) {
+ static_cast<CollisionFreeAggregationStateHashTable *>(
+ collision_free_hashtable_.get())->initialize(partition_id);
} else {
- finalizeHashTable(output_destination);
+ LOG(FATAL) << "AggregationOperationState::initializeState() "
+ << "is not supported by this aggregation";
}
}
-void AggregationOperationState::mergeSingleState(
- const std::vector<std::unique_ptr<AggregationState>> &local_state) {
- DEBUG_ASSERT(local_state.size() == single_states_.size());
- for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
- if (!is_distinct_[agg_idx]) {
- handles_[agg_idx]->mergeStates(*local_state[agg_idx],
- single_states_[agg_idx].get());
+bool AggregationOperationState::checkAggregatePartitioned(
+ const std::size_t estimated_num_groups,
+ const std::vector<bool> &is_distinct,
+ const std::vector<std::unique_ptr<const Scalar>> &group_by,
+ const std::vector<const AggregateFunction *> &aggregate_functions) const {
+ // If there's no aggregation, return false.
+ if (aggregate_functions.empty()) {
+ return false;
+ }
+ // Check if there's a distinct operation involved in any aggregate; if so,
+ // the aggregate can't be partitioned.
+ for (auto distinct : is_distinct) {
+ if (distinct) {
+ return false;
}
}
+ // There's no distinct aggregation involved. Check if there's at least one
+ // GROUP BY operation.
+ if (group_by.empty()) {
+ return false;
+ }
+ // There are GROUP BYs without DISTINCT. Check if the estimated number of
+ // groups is large enough to warrant a partitioned aggregation.
+ return estimated_num_groups >
+ static_cast<std::size_t>(
+ FLAGS_partition_aggregation_num_groups_threshold);
+ return false;
+}
+
+void AggregationOperationState::aggregateBlock(const block_id input_block,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
+ if (group_by_key_ids_.empty()) {
+ aggregateBlockSingleState(input_block);
+ } else {
+ aggregateBlockHashTable(input_block, lip_filter_adaptive_prober);
+ }
}
void AggregationOperationState::aggregateBlockSingleState(
@@ -392,114 +420,137 @@ void AggregationOperationState::aggregateBlockSingleState(
std::unique_ptr<TupleIdSequence> matches;
if (predicate_ != nullptr) {
- std::unique_ptr<ValueAccessor> accessor(
- block->getTupleStorageSubBlock().createValueAccessor());
matches.reset(block->getMatchesForPredicate(predicate_.get(), matches.get()));
}
- for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
- const std::vector<attribute_id> *local_arguments_as_attributes = nullptr;
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- // If all arguments are attributes of the input relation, elide a copy.
- if (!arguments_as_attributes_[agg_idx].empty()) {
- local_arguments_as_attributes = &(arguments_as_attributes_[agg_idx]);
+ const auto &tuple_store = block->getTupleStorageSubBlock();
+ std::unique_ptr<ValueAccessor> accessor(
+ tuple_store.createValueAccessor(matches.get()));
+
+ ColumnVectorsValueAccessor non_trivial_results;
+ if (!non_trivial_expressions_.empty()) {
+ SubBlocksReference sub_blocks_ref(tuple_store,
+ block->getIndices(),
+ block->getIndicesConsistent());
+ for (const auto &expression : non_trivial_expressions_) {
+ non_trivial_results.addColumn(
+ expression->getAllValues(accessor.get(), &sub_blocks_ref));
}
-#endif
- if (is_distinct_[agg_idx]) {
- // Call StorageBlock::aggregateDistinct() to put the arguments as keys
- // directly into the (threadsafe) shared global distinctify HashTable
- // for this aggregate.
- block->aggregateDistinct(*handles_[agg_idx],
- arguments_[agg_idx],
- local_arguments_as_attributes,
- {}, /* group_by */
- matches.get(),
- distinctify_hashtables_[agg_idx].get(),
- nullptr /* reuse_group_by_vectors */);
- local_state.emplace_back(nullptr);
+ }
+
+ for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
+ const auto &argument_ids = argument_ids_[agg_idx];
+ const auto &handle = handles_[agg_idx];
+
+ AggregationState *state;
+ if (argument_ids.empty()) {
+ // Special case. This is a nullary aggregate (i.e. COUNT(*)).
+ state = handle->accumulateNullary(matches == nullptr ? tuple_store.numTuples()
+ : matches->size());
} else {
- // Call StorageBlock::aggregate() to actually do the aggregation.
- local_state.emplace_back(block->aggregate(*handles_[agg_idx],
- arguments_[agg_idx],
- local_arguments_as_attributes,
- matches.get()));
+ // Have the AggregationHandle actually do the aggregation.
+ state = handle->accumulate(accessor.get(), &non_trivial_results, argument_ids);
}
+ local_state.emplace_back(state);
}
// Merge per-block aggregation states back with global state.
mergeSingleState(local_state);
}
+void AggregationOperationState::mergeSingleState(
+ const std::vector<std::unique_ptr<AggregationState>> &local_state) {
+ DEBUG_ASSERT(local_state.size() == single_states_.size());
+ for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
+ if (!is_distinct_[agg_idx]) {
+ handles_[agg_idx]->mergeStates(*local_state[agg_idx],
+ single_states_[agg_idx].get());
+ }
+ }
+}
+
+void AggregationOperationState::mergeGroupByHashTables(
+ AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst) const {
+ HashTableMergerFast merger(dst);
+ static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(src)
+ ->forEach(&merger);
+}
+
void AggregationOperationState::aggregateBlockHashTable(
const block_id input_block,
LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
BlockReference block(
storage_manager_->getBlock(input_block, input_relation_));
+ const auto &tuple_store = block->getTupleStorageSubBlock();
+ std::unique_ptr<ValueAccessor> base_accessor(tuple_store.createValueAccessor());
+ std::unique_ptr<ValueAccessor> shared_accessor;
+ ValueAccessor *accessor = base_accessor.get();
// Apply the predicate first, then the LIPFilters, to generate a TupleIdSequence
// as the existence map for the tuples.
std::unique_ptr<TupleIdSequence> matches;
if (predicate_ != nullptr) {
matches.reset(block->getMatchesForPredicate(predicate_.get()));
+ shared_accessor.reset(
+ base_accessor->createSharedTupleIdSequenceAdapterVirtual(*matches));
+ accessor = shared_accessor.get();
}
if (lip_filter_adaptive_prober != nullptr) {
- std::unique_ptr<ValueAccessor> accessor(
- block->getTupleStorageSubBlock().createValueAccessor(matches.get()));
- matches.reset(lip_filter_adaptive_prober->filterValueAccessor(accessor.get()));
+ matches.reset(lip_filter_adaptive_prober->filterValueAccessor(accessor));
+ shared_accessor.reset(
+ base_accessor->createSharedTupleIdSequenceAdapterVirtual(*matches));
+ accessor = shared_accessor.get();
}
- // This holds values of all the GROUP BY attributes so that the can be reused
- // across multiple aggregates (i.e. we only pay the cost of evaluatin the
- // GROUP BY expressions once).
- std::vector<std::unique_ptr<ColumnVector>> reuse_group_by_vectors;
-
- for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
- if (is_distinct_[agg_idx]) {
- // Call StorageBlock::aggregateDistinct() to insert the GROUP BY
- // expression
- // values and the aggregation arguments together as keys directly into the
- // (threadsafe) shared global distinctify HashTable for this aggregate.
- block->aggregateDistinct(*handles_[agg_idx],
- arguments_[agg_idx],
- nullptr, /* arguments_as_attributes */
- group_by_list_,
- matches.get(),
- distinctify_hashtables_[agg_idx].get(),
- &reuse_group_by_vectors);
+ std::unique_ptr<ColumnVectorsValueAccessor> non_trivial_results;
+ if (!non_trivial_expressions_.empty()) {
+ non_trivial_results.reset(new ColumnVectorsValueAccessor());
+ SubBlocksReference sub_blocks_ref(tuple_store,
+ block->getIndices(),
+ block->getIndicesConsistent());
+ for (const auto &expression : non_trivial_expressions_) {
+ non_trivial_results->addColumn(
+ expression->getAllValues(accessor, &sub_blocks_ref));
}
}
- if (!is_aggregate_partitioned_) {
- // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
- // directly into the (threadsafe) shared global HashTable for this
- // aggregate.
- DCHECK(group_by_hashtable_pool_ != nullptr);
- AggregationStateHashTableBase *agg_hash_table =
- group_by_hashtable_pool_->getHashTableFast();
- DCHECK(agg_hash_table != nullptr);
- block->aggregateGroupBy(arguments_,
- group_by_list_,
- matches.get(),
- agg_hash_table,
- &reuse_group_by_vectors);
- group_by_hashtable_pool_->returnHashTable(agg_hash_table);
+ accessor->beginIterationVirtual();
+
+ // TODO
+ if (is_aggregate_collision_free_) {
+ aggregateBlockHashTableImplCollisionFree(
+ accessor, non_trivial_results.get());
+ } else if (is_aggregate_partitioned_) {
+ aggregateBlockHashTableImplPartitioned(
+ accessor, non_trivial_results.get());
} else {
- ColumnVectorsValueAccessor temp_result;
- // IDs of 'arguments' as attributes in the ValueAccessor we create below.
- std::vector<attribute_id> argument_ids;
+ aggregateBlockHashTableImplThreadPrivate(
+ accessor, non_trivial_results.get());
+ }
+}
- // IDs of GROUP BY key element(s) in the ValueAccessor we create below.
- std::vector<attribute_id> key_ids;
+void AggregationOperationState::aggregateBlockHashTableImplCollisionFree(
+ ValueAccessor *accessor,
+ ColumnVectorsValueAccessor *aux_accessor) {
+ DCHECK(collision_free_hashtable_ != nullptr);
+
+ collision_free_hashtable_->upsertValueAccessor(argument_ids_,
+ group_by_key_ids_,
+ accessor,
+ aux_accessor);
+}
+
+void AggregationOperationState::aggregateBlockHashTableImplPartitioned(
+ ValueAccessor *accessor,
+ ColumnVectorsValueAccessor *aux_accessor) {
+ DCHECK(partitioned_group_by_hashtable_pool_ != nullptr);
+
+ InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
+ accessor,
+ [&](auto *accessor) -> void { // NOLINT(build/c++11)
+ // TODO(jianqiao): handle the situation when keys are in non_trivial_results.
const std::size_t num_partitions = partitioned_group_by_hashtable_pool_->getNumPartitions();
- block->aggregateGroupByPartitioned(
- arguments_,
- group_by_list_,
- matches.get(),
- num_partitions,
- &temp_result,
- &argument_ids,
- &key_ids,
- &reuse_group_by_vectors);
+
// Compute the partitions for the tuple formed by group by values.
std::vector<std::unique_ptr<TupleIdSequence>> partition_membership;
partition_membership.resize(num_partitions);
@@ -507,32 +558,57 @@ void AggregationOperationState::aggregateBlockHashTable(
// Create a tuple-id sequence for each partition.
for (std::size_t partition = 0; partition < num_partitions; ++partition) {
partition_membership[partition].reset(
- new TupleIdSequence(temp_result.getEndPosition()));
+ new TupleIdSequence(accessor->getEndPosition()));
}
// Iterate over ValueAccessor for each tuple,
// set a bit in the appropriate TupleIdSequence.
- temp_result.beginIteration();
- while (temp_result.next()) {
+ while (accessor->next()) {
// We need a unique_ptr because getTupleWithAttributes() uses "new".
- std::unique_ptr<Tuple> curr_tuple(temp_result.getTupleWithAttributes(key_ids));
+ std::unique_ptr<Tuple> curr_tuple(
+ accessor->getTupleWithAttributes(group_by_key_ids_));
const std::size_t curr_tuple_partition_id =
curr_tuple->getTupleHash() % num_partitions;
partition_membership[curr_tuple_partition_id]->set(
- temp_result.getCurrentPosition(), true);
+ accessor->getCurrentPosition(), true);
}
- // For each partition, create an adapter around Value Accessor and
- // TupleIdSequence.
- std::vector<std::unique_ptr<
- TupleIdSequenceAdapterValueAccessor<ColumnVectorsValueAccessor>>> adapter;
- adapter.resize(num_partitions);
+ // Aggregate each partition.
for (std::size_t partition = 0; partition < num_partitions; ++partition) {
- adapter[partition].reset(temp_result.createSharedTupleIdSequenceAdapter(
- *(partition_membership)[partition]));
+ std::unique_ptr<ValueAccessor> adapter(
+ accessor->createSharedTupleIdSequenceAdapter(
+ *(partition_membership)[partition]));
partitioned_group_by_hashtable_pool_->getHashTable(partition)
- ->upsertValueAccessorCompositeKeyFast(
- argument_ids, adapter[partition].get(), key_ids, true);
+ ->upsertValueAccessor(argument_ids_,
+ group_by_key_ids_,
+ adapter.get(),
+ aux_accessor);
}
+ });
+}
+
+void AggregationOperationState::aggregateBlockHashTableImplThreadPrivate(
+ ValueAccessor *accessor,
+ ColumnVectorsValueAccessor *aux_accessor) {
+ DCHECK(group_by_hashtable_pool_ != nullptr);
+
+ AggregationStateHashTableBase *agg_hash_table =
+ group_by_hashtable_pool_->getHashTable();
+
+ agg_hash_table->upsertValueAccessor(argument_ids_,
+ group_by_key_ids_,
+ accessor,
+ aux_accessor);
+ group_by_hashtable_pool_->returnHashTable(agg_hash_table);
+}
+
+void AggregationOperationState::finalizeAggregate(
+ const std::size_t partition_id,
+ InsertDestination *output_destination) {
+ if (group_by_key_ids_.empty()) {
+ DCHECK_EQ(0u, partition_id);
+ finalizeSingleState(output_destination);
+ } else {
+ finalizeHashTable(partition_id, output_destination);
}
}
@@ -543,12 +619,6 @@ void AggregationOperationState::finalizeSingleState(
std::vector<TypedValue> attribute_values;
for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
- if (is_distinct_[agg_idx]) {
- single_states_[agg_idx].reset(
- handles_[agg_idx]->aggregateOnDistinctifyHashTableForSingle(
- *distinctify_hashtables_[agg_idx]));
- }
-
attribute_values.emplace_back(
handles_[agg_idx]->finalize(*single_states_[agg_idx]));
}
@@ -556,80 +626,79 @@ void AggregationOperationState::finalizeSingleState(
output_destination->insertTuple(Tuple(std::move(attribute_values)));
}
-void AggregationOperationState::mergeGroupByHashTables(
- AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst) {
- HashTableMergerFast merger(dst);
- (static_cast<FastHashTable<true, false, true, false> *>(src))
- ->forEachCompositeKeyFast(&merger);
+void AggregationOperationState::finalizeHashTable(
+ const std::size_t partition_id,
+ InsertDestination *output_destination) {
+ if (is_aggregate_collision_free_) {
+ finalizeHashTableImplCollisionFree(partition_id, output_destination);
+ } else if (is_aggregate_partitioned_) {
+ finalizeHashTableImplPartitioned(partition_id, output_destination);
+ } else {
+ DCHECK_EQ(0u, partition_id);
+ finalizeHashTableImplThreadPrivate(output_destination);
+ }
}
-void AggregationOperationState::finalizeHashTable(
+void AggregationOperationState::finalizeHashTableImplCollisionFree(
+ const std::size_t partition_id,
InsertDestination *output_destination) {
- // Each element of 'group_by_keys' is a vector of values for a particular
- // group (which is also the prefix of the finalized Tuple for that group).
- std::vector<std::vector<TypedValue>> group_by_keys;
+ std::vector<std::unique_ptr<ColumnVector>> final_values;
+ CollisionFreeAggregationStateHashTable *hash_table =
+ static_cast<CollisionFreeAggregationStateHashTable *>(
+ collision_free_hashtable_.get());
- // TODO(harshad) - The merge phase may be slower when each hash table contains
- // large number of entries. We should find ways in which we can perform a
- // parallel merge.
+ // TODO
+ const std::size_t max_length =
+ hash_table->getNumTuplesInPartition(partition_id);
+ ColumnVectorsValueAccessor complete_result;
- // TODO(harshad) - Find heuristics for faster merge, even in a single thread.
- // e.g. Keep merging entries from smaller hash tables to larger.
+ DCHECK_EQ(1u, group_by_types_.size());
+ const Type *key_type = group_by_types_.front();
+ DCHECK(NativeColumnVector::UsableForType(*key_type));
- auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
- if (hash_tables->size() > 1) {
- for (int hash_table_index = 0;
- hash_table_index < static_cast<int>(hash_tables->size() - 1);
- ++hash_table_index) {
- // Merge each hash table to the last hash table.
- mergeGroupByHashTables((*hash_tables)[hash_table_index].get(),
- hash_tables->back().get());
+ std::unique_ptr<NativeColumnVector> key_cv(
+ new NativeColumnVector(*key_type, max_length));
+ hash_table->finalizeKey(partition_id, key_cv.get());
+ complete_result.addColumn(key_cv.release());
+
+ for (std::size_t i = 0; i < handles_.size(); ++i) {
+ if (handles_[i]->getAggregationID() == AggregationID::kDistinct) {
+ DCHECK_EQ(1u, handles_.size());
+ break;
}
+
+ const Type *result_type = handles_[i]->getResultType();
+ DCHECK(NativeColumnVector::UsableForType(*result_type));
+
+ std::unique_ptr<NativeColumnVector> result_cv(
+ new NativeColumnVector(*result_type, max_length));
+ hash_table->finalizeState(partition_id, i, result_cv.get());
+ complete_result.addColumn(result_cv.release());
}
+ // Bulk-insert the complete result.
+ output_destination->bulkInsertTuples(&complete_result);
+}
+
+void AggregationOperationState::finalizeHashTableImplPartitioned(
+ const std::size_t partition_id,
+ InsertDestination *output_destination) {
+ // Each element of 'group_by_keys' is a vector of values for a particular
+ // group (which is also the prefix of the finalized Tuple for that group).
+ std::vector<std::vector<TypedValue>> group_by_keys;
+
// Collect per-aggregate finalized values.
std::vector<std::unique_ptr<ColumnVector>> final_values;
+ AggregationStateHashTableBase *hash_table =
+ partitioned_group_by_hashtable_pool_->getHashTable(partition_id);
for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
- if (is_distinct_[agg_idx]) {
- DCHECK(group_by_hashtable_pool_ != nullptr);
- auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
- DCHECK(hash_tables != nullptr);
- if (hash_tables->empty()) {
- // We may have a case where hash_tables is empty, e.g. no input blocks.
- // However for aggregateOnDistinctifyHashTableForGroupBy to work
- // correctly, we should create an empty group by hash table.
- AggregationStateHashTableBase *new_hash_table =
- group_by_hashtable_pool_->getHashTableFast();
- group_by_hashtable_pool_->returnHashTable(new_hash_table);
- hash_tables = group_by_hashtable_pool_->getAllHashTables();
- }
- DCHECK(hash_tables->back() != nullptr);
- AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
- DCHECK(agg_hash_table != nullptr);
- handles_[agg_idx]->allowUpdate();
- handles_[agg_idx]->aggregateOnDistinctifyHashTableForGroupBy(
- *distinctify_hashtables_[agg_idx], agg_hash_table, agg_idx);
- }
-
- auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
- DCHECK(hash_tables != nullptr);
- if (hash_tables->empty()) {
- // We may have a case where hash_tables is empty, e.g. no input blocks.
- // However for aggregateOnDistinctifyHashTableForGroupBy to work
- // correctly, we should create an empty group by hash table.
- AggregationStateHashTableBase *new_hash_table =
- group_by_hashtable_pool_->getHashTableFast();
- group_by_hashtable_pool_->returnHashTable(new_hash_table);
- hash_tables = group_by_hashtable_pool_->getAllHashTables();
- }
- AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
- DCHECK(agg_hash_table != nullptr);
ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
- *agg_hash_table, &group_by_keys, agg_idx);
+ *hash_table, &group_by_keys, agg_idx);
if (agg_result_col != nullptr) {
final_values.emplace_back(agg_result_col);
}
}
+ hash_table->destroyPayload();
// Reorganize 'group_by_keys' in column-major order so that we can make a
// ColumnVectorsValueAccessor to bulk-insert results.
@@ -640,23 +709,20 @@ void AggregationOperationState::finalizeHashTable(
// in a single HashTable.
std::vector<std::unique_ptr<ColumnVector>> group_by_cvs;
std::size_t group_by_element_idx = 0;
- for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
- const Type &group_by_type = group_by_element->getType();
- if (NativeColumnVector::UsableForType(group_by_type)) {
+ for (const Type *group_by_type : group_by_types_) {
+ if (NativeColumnVector::UsableForType(*group_by_type)) {
NativeColumnVector *element_cv =
- new NativeColumnVector(group_by_type, group_by_keys.size());
+ new NativeColumnVector(*group_by_type, group_by_keys.size());
group_by_cvs.emplace_back(element_cv);
for (std::vector<TypedValue> &group_key : group_by_keys) {
- element_cv->appendTypedValue(
- std::move(group_key[group_by_element_idx]));
+ element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
}
} else {
IndirectColumnVector *element_cv =
- new IndirectColumnVector(group_by_type, group_by_keys.size());
+ new IndirectColumnVector(*group_by_type, group_by_keys.size());
group_by_cvs.emplace_back(element_cv);
for (std::vector<TypedValue> &group_key : group_by_keys) {
- element_cv->appendTypedValue(
- std::move(group_key[group_by_element_idx]));
+ element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
}
}
++group_by_element_idx;
@@ -676,42 +742,44 @@ void AggregationOperationState::finalizeHashTable(
output_destination->bulkInsertTuples(&complete_result);
}
-void AggregationOperationState::destroyAggregationHashTablePayload() {
- std::vector<std::unique_ptr<AggregationStateHashTableBase>> *all_hash_tables =
- nullptr;
- if (!is_aggregate_partitioned_) {
- if (group_by_hashtable_pool_ != nullptr) {
- all_hash_tables = group_by_hashtable_pool_->getAllHashTables();
- }
- } else {
- if (partitioned_group_by_hashtable_pool_ != nullptr) {
- all_hash_tables = partitioned_group_by_hashtable_pool_->getAllHashTables();
- }
- }
- if (all_hash_tables != nullptr) {
- for (std::size_t ht_index = 0; ht_index < all_hash_tables->size(); ++ht_index) {
- (*all_hash_tables)[ht_index]->destroyPayload();
- }
- }
-}
-
-void AggregationOperationState::finalizeAggregatePartitioned(
- const std::size_t partition_id, InsertDestination *output_destination) {
+void AggregationOperationState::finalizeHashTableImplThreadPrivate(
+ InsertDestination *output_destination) {
// Each element of 'group_by_keys' is a vector of values for a particular
// group (which is also the prefix of the finalized Tuple for that group).
std::vector<std::vector<TypedValue>> group_by_keys;
+ // TODO(harshad) - The merge phase may be slower when each hash table contains
+ // a large number of entries. We should find ways in which we can perform a
+ // parallel merge.
+
+ // TODO(harshad) - Find heuristics for faster merge, even in a single thread.
+ // e.g. Keep merging entries from smaller hash tables to larger.
+
+ auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
+ DCHECK(hash_tables != nullptr);
+ if (hash_tables->empty()) {
+ return;
+ }
+
+ std::unique_ptr<AggregationStateHashTableBase> final_hash_table(
+ hash_tables->back().release());
+ for (std::size_t i = 0; i < hash_tables->size() - 1; ++i) {
+ std::unique_ptr<AggregationStateHashTableBase> hash_table(
+ hash_tables->at(i).release());
+ mergeGroupByHashTables(hash_table.get(), final_hash_table.get());
+ hash_table->destroyPayload();
+ }
+
// Collect per-aggregate finalized values.
std::vector<std::unique_ptr<ColumnVector>> final_values;
for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
- AggregationStateHashTableBase *hash_table =
- partitioned_group_by_hashtable_pool_->getHashTable(partition_id);
ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
- *hash_table, &group_by_keys, agg_idx);
+ *final_hash_table, &group_by_keys, agg_idx);
if (agg_result_col != nullptr) {
final_values.emplace_back(agg_result_col);
}
}
+ final_hash_table->destroyPayload();
// Reorganize 'group_by_keys' in column-major order so that we can make a
// ColumnVectorsValueAccessor to bulk-insert results.
@@ -722,19 +790,22 @@ void AggregationOperationState::finalizeAggregatePartitioned(
// in a single HashTable.
std::vector<std::unique_ptr<ColumnVector>> group_by_cvs;
std::size_t group_by_element_idx = 0;
- for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
- const Type &group_by_type = group_by_element->getType();
- if (NativeColumnVector::UsableForType(group_by_type)) {
- NativeColumnVector *element_cv = new NativeColumnVector(group_by_type, group_by_keys.size());
+ for (const Type *group_by_type : group_by_types_) {
+ if (NativeColumnVector::UsableForType(*group_by_type)) {
+ NativeColumnVector *element_cv =
+ new NativeColumnVector(*group_by_type, group_by_keys.size());
group_by_cvs.emplace_back(element_cv);
for (std::vector<TypedValue> &group_key : group_by_keys) {
- element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
+ element_cv->appendTypedValue(
+ std::move(group_key[group_by_element_idx]));
}
} else {
- IndirectColumnVector *element_cv = new IndirectColumnVector(group_by_type, group_by_keys.size());
+ IndirectColumnVector *element_cv =
+ new IndirectColumnVector(*group_by_type, group_by_keys.size());
group_by_cvs.emplace_back(element_cv);
for (std::vector<TypedValue> &group_key : group_by_keys) {
- element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
+ element_cv->appendTypedValue(
+ std::move(group_key[group_by_element_idx]));
}
}
++group_by_element_idx;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b46bc73c/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 591e3a1..44803fc 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -33,7 +33,9 @@
#include "storage/HashTableBase.hpp"
#include "storage/HashTablePool.hpp"
#include "storage/PartitionedHashTablePool.hpp"
+#include "storage/StorageBlock.hpp"
#include "storage/StorageBlockInfo.hpp"
+#include "utility/ConcurrentBitVector.hpp"
#include "utility/Macros.hpp"
#include "gflags/gflags.h"
@@ -43,9 +45,11 @@ namespace quickstep {
class AggregateFunction;
class CatalogDatabaseLite;
class CatalogRelationSchema;
+class ColumnVectorsValueAccessor;
class InsertDestination;
class LIPFilterAdaptiveProber;
class StorageManager;
+class TupleIdSequence;
DECLARE_int32(num_aggregation_partitions);
DECLARE_int32(partition_aggregation_num_groups_threshold);
@@ -166,127 +170,99 @@ class AggregationOperationState {
* the block.
**/
void aggregateBlock(const block_id input_block,
- LIPFilterAdaptiveProber *lip_filter_adaptive_prober);
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr);
/**
* @brief Generate the final results for the aggregates managed by this
* AggregationOperationState and write them out to StorageBlock(s).
*
+ * @param partition_id The partition id of this finalize operation.
* @param output_destination An InsertDestination where the finalized output
* tuple(s) from this aggregate are to be written.
**/
- void finalizeAggregate(InsertDestination *output_destination);
-
- /**
- * @brief Destroy the payloads in the aggregation hash tables.
- **/
- void destroyAggregationHashTablePayload();
-
- /**
- * @brief Generate the final results for the aggregates managed by this
- * AggregationOperationState and write them out to StorageBlock(s).
- * In this implementation, each thread picks a hash table belonging to
- * a partition and writes its values to StorageBlock(s). There is no
- * need to merge multiple hash tables in one, because there is no
- * overlap in the keys across two hash tables.
- *
- * @param partition_id The ID of the partition for which finalize is being
- * performed.
- * @param output_destination An InsertDestination where the finalized output
- * tuple(s) from this aggregate are to be written.
- **/
- void finalizeAggregatePartitioned(
- const std::size_t partition_id, InsertDestination *output_destination);
-
- static void mergeGroupByHashTables(AggregationStateHashTableBase *src,
- AggregationStateHashTableBase *dst);
-
- bool isAggregatePartitioned() const {
- return is_aggregate_partitioned_;
- }
+ void finalizeAggregate(const std::size_t partition_id,
+ InsertDestination *output_destination);
/**
* @brief Get the number of partitions to be used for the aggregation.
* For non-partitioned aggregations, we return 1.
**/
- std::size_t getNumPartitions() const {
- return is_aggregate_partitioned_
- ? partitioned_group_by_hashtable_pool_->getNumPartitions()
- : 1;
- }
+ std::size_t getNumPartitions() const;
- int dflag;
+ std::size_t getNumInitializationPartitions() const;
+
+ void initializeState(const std::size_t partition_id);
private:
- // Merge locally (per storage block) aggregated states with global aggregation
- // states.
- void mergeSingleState(
- const std::vector<std::unique_ptr<AggregationState>> &local_state);
+ bool checkAggregatePartitioned(
+ const std::size_t estimated_num_groups,
+ const std::vector<bool> &is_distinct,
+ const std::vector<std::unique_ptr<const Scalar>> &group_by,
+ const std::vector<const AggregateFunction *> &aggregate_functions) const;
// Aggregate on input block.
void aggregateBlockSingleState(const block_id input_block);
void aggregateBlockHashTable(const block_id input_block,
LIPFilterAdaptiveProber *lip_filter_adaptive_prober);
- void finalizeSingleState(InsertDestination *output_destination);
- void finalizeHashTable(InsertDestination *output_destination);
+ // Merge locally (per storage block) aggregated states with global aggregation
+ // states.
+ void mergeSingleState(
+ const std::vector<std::unique_ptr<AggregationState>> &local_state);
+ void mergeGroupByHashTables(AggregationStateHashTableBase *src,
+ AggregationStateHashTableBase *dst) const;
- bool checkAggregatePartitioned(
- const std::size_t estimated_num_groups,
- const std::vector<bool> &is_distinct,
- const std::vector<std::unique_ptr<const Scalar>> &group_by,
- const std::vector<const AggregateFunction *> &aggregate_functions) const {
- // If there's no aggregation, return false.
- if (aggregate_functions.empty()) {
- return false;
- }
- // Check if there's a distinct operation involved in any aggregate, if so
- // the aggregate can't be partitioned.
- for (auto distinct : is_distinct) {
- if (distinct) {
- return false;
- }
- }
- // There's no distinct aggregation involved, Check if there's at least one
- // GROUP BY operation.
- if (group_by.empty()) {
- return false;
- }
- // There are GROUP BYs without DISTINCT. Check if the estimated number of
- // groups is large enough to warrant a partitioned aggregation.
- return estimated_num_groups >
- static_cast<std::size_t>(
- FLAGS_partition_aggregation_num_groups_threshold);
- }
+ // Finalize the aggregation results into output_destination.
+ void finalizeSingleState(InsertDestination *output_destination);
+ void finalizeHashTable(const std::size_t partition_id,
+ InsertDestination *output_destination);
+
+ // Specialized implementations for aggregateBlockHashTable.
+ void aggregateBlockHashTableImplCollisionFree(ValueAccessor *accessor,
+ ColumnVectorsValueAccessor *aux_accessor);
+ void aggregateBlockHashTableImplPartitioned(ValueAccessor *accessor,
+ ColumnVectorsValueAccessor *aux_accessor);
+ void aggregateBlockHashTableImplThreadPrivate(ValueAccessor *accessor,
+ ColumnVectorsValueAccessor *aux_accessor);
+
+ // Specialized implementations for finalizeHashTable.
+ void finalizeHashTableImplCollisionFree(const std::size_t partition_id,
+ InsertDestination *output_destination);
+ void finalizeHashTableImplPartitioned(const std::size_t partition_id,
+ InsertDestination *output_destination);
+ void finalizeHashTableImplThreadPrivate(InsertDestination *output_destination);
// Common state for all aggregates in this operation: the input relation, the
// filter predicate (if any), and the list of GROUP BY expressions (if any).
const CatalogRelationSchema &input_relation_;
+ // Whether the aggregation is collision free or not.
+ bool is_aggregate_collision_free_;
+
// Whether the aggregation is partitioned or not.
- const bool is_aggregate_partitioned_;
+ bool is_aggregate_partitioned_;
std::unique_ptr<const Predicate> predicate_;
- std::vector<std::unique_ptr<const Scalar>> group_by_list_;
// Each individual aggregate in this operation has an AggregationHandle and
- // some number of Scalar arguments.
- std::vector<AggregationHandle *> handles_;
- std::vector<std::vector<std::unique_ptr<const Scalar>>> arguments_;
+ // zero (indicated by -1) or one argument.
+ std::vector<std::unique_ptr<AggregationHandle>> handles_;
// For each aggregate, whether DISTINCT should be applied to the aggregate's
// arguments.
std::vector<bool> is_distinct_;
- // Hash table for obtaining distinct (i.e. unique) arguments.
- std::vector<std::unique_ptr<AggregationStateHashTableBase>>
- distinctify_hashtables_;
+ // Non-trivial group-by/argument expressions that need to be evaluated.
+ std::vector<std::unique_ptr<const Scalar>> non_trivial_expressions_;
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- // If all an aggregate's argument expressions are simply attributes in
- // 'input_relation_', then this caches the attribute IDs of those arguments.
- std::vector<std::vector<attribute_id>> arguments_as_attributes_;
-#endif
+ std::vector<attribute_id> group_by_key_ids_;
+ std::vector<std::vector<attribute_id>> argument_ids_;
+
+ std::vector<const Type *> group_by_types_;
+
+ // Hash table for obtaining distinct (i.e. unique) arguments.
+// std::vector<std::unique_ptr<AggregationStateHashTableBase>>
+// distinctify_hashtables_;
// Per-aggregate global states for aggregation without GROUP BY.
std::vector<std::unique_ptr<AggregationState>> single_states_;
@@ -303,6 +279,8 @@ class AggregationOperationState {
std::unique_ptr<PartitionedHashTablePool> partitioned_group_by_hashtable_pool_;
+ std::unique_ptr<AggregationStateHashTableBase> collision_free_hashtable_;
+
StorageManager *storage_manager_;
DISALLOW_COPY_AND_ASSIGN(AggregationOperationState);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b46bc73c/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index fddea1f..c7bc28f 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -165,6 +165,9 @@ if(QUICKSTEP_HAVE_BITWEAVING)
bitweaving/BitWeavingVIndexSubBlock.hpp)
endif()
# CMAKE_VALIDATE_IGNORE_END
+add_library(quickstep_storage_CollisionFreeAggregationStateHashTable
+ CollisionFreeAggregationStateHashTable.cpp
+ CollisionFreeAggregationStateHashTable.hpp)
add_library(quickstep_storage_ColumnStoreUtil ColumnStoreUtil.cpp ColumnStoreUtil.hpp)
add_library(quickstep_storage_CompressedBlockBuilder CompressedBlockBuilder.cpp CompressedBlockBuilder.hpp)
add_library(quickstep_storage_CompressedColumnStoreTupleStorageSubBlock
@@ -194,9 +197,6 @@ if (ENABLE_DISTRIBUTED)
endif()
add_library(quickstep_storage_EvictionPolicy EvictionPolicy.cpp EvictionPolicy.hpp)
-add_library(quickstep_storage_FastHashTable ../empty_src.cpp FastHashTable.hpp)
-add_library(quickstep_storage_FastHashTableFactory ../empty_src.cpp FastHashTableFactory.hpp)
-add_library(quickstep_storage_FastSeparateChainingHashTable ../empty_src.cpp FastSeparateChainingHashTable.hpp)
add_library(quickstep_storage_FileManager ../empty_src.cpp FileManager.hpp)
if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
add_library(quickstep_storage_FileManagerHdfs FileManagerHdfs.cpp FileManagerHdfs.hpp)
@@ -225,6 +225,9 @@ add_library(quickstep_storage_InsertDestination_proto
add_library(quickstep_storage_LinearOpenAddressingHashTable
../empty_src.cpp
LinearOpenAddressingHashTable.hpp)
+add_library(quickstep_storage_PackedPayloadAggregationStateHashTable
+ PackedPayloadAggregationStateHashTable.cpp
+ PackedPayloadAggregationStateHashTable.hpp)
add_library(quickstep_storage_PartitionedHashTablePool ../empty_src.cpp PartitionedHashTablePool.hpp)
add_library(quickstep_storage_PreloaderThread PreloaderThread.cpp PreloaderThread.hpp)
add_library(quickstep_storage_SMAIndexSubBlock SMAIndexSubBlock.cpp SMAIndexSubBlock.hpp)
@@ -276,22 +279,25 @@ target_link_libraries(quickstep_storage_AggregationOperationState
quickstep_expressions_predicate_Predicate
quickstep_expressions_scalar_Scalar
quickstep_storage_AggregationOperationState_proto
- quickstep_storage_HashTable
quickstep_storage_HashTableBase
quickstep_storage_HashTableFactory
quickstep_storage_HashTablePool
quickstep_storage_InsertDestination
quickstep_storage_PartitionedHashTablePool
+ quickstep_storage_PackedPayloadAggregationStateHashTable
quickstep_storage_StorageBlock
quickstep_storage_StorageBlockInfo
quickstep_storage_StorageManager
+ quickstep_storage_SubBlocksReference
quickstep_storage_TupleIdSequence
quickstep_storage_ValueAccessor
+ quickstep_storage_ValueAccessorUtil
quickstep_types_TypedValue
quickstep_types_containers_ColumnVector
quickstep_types_containers_ColumnVectorsValueAccessor
quickstep_types_containers_Tuple
quickstep_utility_Macros
+ quickstep_utility_ConcurrentBitVector
quickstep_utility_lipfilter_LIPFilterAdaptiveProber)
target_link_libraries(quickstep_storage_AggregationOperationState_proto
quickstep_expressions_Expressions_proto
@@ -429,6 +435,24 @@ if(QUICKSTEP_HAVE_BITWEAVING)
quickstep_utility_Macros)
endif()
# CMAKE_VALIDATE_IGNORE_END
+target_link_libraries(quickstep_storage_CollisionFreeAggregationStateHashTable
+ quickstep_catalog_CatalogTypedefs
+ quickstep_expressions_aggregation_AggregationHandle
+ quickstep_expressions_aggregation_AggregationID
+ quickstep_storage_HashTableBase
+ quickstep_storage_StorageBlob
+ quickstep_storage_StorageBlockInfo
+ quickstep_storage_StorageConstants
+ quickstep_storage_StorageManager
+ quickstep_storage_ValueAccessor
+ quickstep_storage_ValueAccessorUtil
+ quickstep_types_Type
+ quickstep_types_TypeID
+ quickstep_types_TypedValue
+ quickstep_types_containers_ColumnVector
+ quickstep_types_containers_ColumnVectorsValueAccessor
+ quickstep_utility_ConcurrentBitVector
+ quickstep_utility_Macros)
target_link_libraries(quickstep_storage_ColumnStoreUtil
quickstep_catalog_CatalogAttribute
quickstep_catalog_CatalogRelationSchema
@@ -626,52 +650,6 @@ target_link_libraries(quickstep_storage_EvictionPolicy
quickstep_threading_SpinMutex
quickstep_threading_SpinSharedMutex
quickstep_utility_Macros)
-target_link_libraries(quickstep_storage_FastHashTable
- quickstep_catalog_CatalogTypedefs
- quickstep_storage_HashTableBase
- quickstep_storage_StorageBlob
- quickstep_storage_StorageBlockInfo
- quickstep_storage_StorageConstants
- quickstep_storage_StorageManager
- quickstep_storage_TupleReference
- quickstep_storage_ValueAccessor
- quickstep_storage_ValueAccessorUtil
- quickstep_threading_SpinMutex
- quickstep_threading_SpinSharedMutex
- quickstep_types_Type
- quickstep_types_TypedValue
- quickstep_utility_HashPair
- quickstep_utility_Macros)
-target_link_libraries(quickstep_storage_FastHashTableFactory
- glog
- quickstep_storage_FastHashTable
- quickstep_storage_FastSeparateChainingHashTable
- quickstep_storage_HashTable
- quickstep_storage_HashTable_proto
- quickstep_storage_HashTableBase
- quickstep_storage_HashTableFactory
- quickstep_storage_LinearOpenAddressingHashTable
- quickstep_storage_SeparateChainingHashTable
- quickstep_storage_SimpleScalarSeparateChainingHashTable
- quickstep_storage_TupleReference
- quickstep_types_TypeFactory
- quickstep_utility_Macros)
-target_link_libraries(quickstep_storage_FastSeparateChainingHashTable
- quickstep_expressions_aggregation_AggregationHandle
- quickstep_storage_FastHashTable
- quickstep_storage_HashTable
- quickstep_storage_HashTableBase
- quickstep_storage_HashTableKeyManager
- quickstep_storage_StorageBlob
- quickstep_storage_StorageBlockInfo
- quickstep_storage_StorageConstants
- quickstep_storage_StorageManager
- quickstep_threading_SpinSharedMutex
- quickstep_types_Type
- quickstep_types_TypedValue
- quickstep_utility_Alignment
- quickstep_utility_Macros
- quickstep_utility_PrimeNumber)
target_link_libraries(quickstep_storage_FileManager
quickstep_storage_StorageBlockInfo
quickstep_utility_Macros
@@ -734,10 +712,12 @@ target_link_libraries(quickstep_storage_HashTable_proto
${PROTOBUF_LIBRARY})
target_link_libraries(quickstep_storage_HashTableFactory
glog
+ quickstep_storage_CollisionFreeAggregationStateHashTable
quickstep_storage_HashTable
quickstep_storage_HashTable_proto
quickstep_storage_HashTableBase
quickstep_storage_LinearOpenAddressingHashTable
+ quickstep_storage_PackedPayloadAggregationStateHashTable
quickstep_storage_SeparateChainingHashTable
quickstep_storage_SimpleScalarSeparateChainingHashTable
quickstep_storage_TupleReference
@@ -757,9 +737,8 @@ target_link_libraries(quickstep_storage_HashTableKeyManager
target_link_libraries(quickstep_storage_HashTablePool
glog
quickstep_expressions_aggregation_AggregationHandle
- quickstep_storage_FastHashTable
- quickstep_storage_FastHashTableFactory
quickstep_storage_HashTableBase
+ quickstep_storage_HashTableFactory
quickstep_threading_SpinMutex
quickstep_utility_Macros
quickstep_utility_StringUtil)
@@ -817,12 +796,32 @@ target_link_libraries(quickstep_storage_LinearOpenAddressingHashTable
quickstep_utility_Alignment
quickstep_utility_Macros
quickstep_utility_PrimeNumber)
+target_link_libraries(quickstep_storage_PackedPayloadAggregationStateHashTable
+ quickstep_catalog_CatalogTypedefs
+ quickstep_expressions_aggregation_AggregationHandle
+ quickstep_storage_HashTableBase
+ quickstep_storage_HashTableKeyManager
+ quickstep_storage_StorageBlob
+ quickstep_storage_StorageBlockInfo
+ quickstep_storage_StorageConstants
+ quickstep_storage_StorageManager
+ quickstep_storage_TupleReference
+ quickstep_storage_ValueAccessor
+ quickstep_storage_ValueAccessorUtil
+ quickstep_threading_SpinMutex
+ quickstep_threading_SpinSharedMutex
+ quickstep_types_Type
+ quickstep_types_TypedValue
+ quickstep_types_containers_ColumnVectorsValueAccessor
+ quickstep_utility_Alignment
+ quickstep_utility_HashPair
+ quickstep_utility_Macros
+ quickstep_utility_PrimeNumber)
target_link_libraries(quickstep_storage_PartitionedHashTablePool
glog
quickstep_expressions_aggregation_AggregationHandle
- quickstep_storage_FastHashTable
- quickstep_storage_FastHashTableFactory
quickstep_storage_HashTableBase
+ quickstep_storage_HashTableFactory
quickstep_utility_Macros
quickstep_utility_StringUtil)
target_link_libraries(quickstep_storage_PreloaderThread
@@ -933,7 +932,6 @@ target_link_libraries(quickstep_storage_StorageBlock
glog
quickstep_catalog_CatalogRelationSchema
quickstep_catalog_CatalogTypedefs
- quickstep_expressions_aggregation_AggregationHandle
quickstep_expressions_predicate_Predicate
quickstep_expressions_scalar_Scalar
quickstep_storage_BasicColumnStoreTupleStorageSubBlock
@@ -942,7 +940,6 @@ target_link_libraries(quickstep_storage_StorageBlock
quickstep_storage_CompressedColumnStoreTupleStorageSubBlock
quickstep_storage_CompressedPackedRowStoreTupleStorageSubBlock
quickstep_storage_CountedReference
- quickstep_storage_HashTableBase
quickstep_storage_IndexSubBlock
quickstep_storage_InsertDestinationInterface
quickstep_storage_SMAIndexSubBlock
@@ -1111,6 +1108,7 @@ target_link_libraries(quickstep_storage
quickstep_storage_BasicColumnStoreValueAccessor
quickstep_storage_BloomFilterIndexSubBlock
quickstep_storage_CSBTreeIndexSubBlock
+ quickstep_storage_CollisionFreeAggregationStateHashTable
quickstep_storage_ColumnStoreUtil
quickstep_storage_CompressedBlockBuilder
quickstep_storage_CompressedColumnStoreTupleStorageSubBlock
@@ -1123,9 +1121,6 @@ target_link_libraries(quickstep_storage
quickstep_storage_EvictionPolicy
quickstep_storage_FileManager
quickstep_storage_FileManagerLocal
- quickstep_storage_FastHashTable
- quickstep_storage_FastHashTableFactory
- quickstep_storage_FastSeparateChainingHashTable
quickstep_storage_HashTable
quickstep_storage_HashTable_proto
quickstep_storage_HashTableBase
@@ -1139,6 +1134,7 @@ target_link_libraries(quickstep_storage
quickstep_storage_InsertDestination_proto
quickstep_storage_LinearOpenAddressingHashTable
quickstep_storage_PartitionedHashTablePool
+ quickstep_storage_PackedPayloadAggregationStateHashTable
quickstep_storage_PreloaderThread
quickstep_storage_SMAIndexSubBlock
quickstep_storage_SeparateChainingHashTable
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b46bc73c/storage/CollisionFreeAggregationStateHashTable.cpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeAggregationStateHashTable.cpp b/storage/CollisionFreeAggregationStateHashTable.cpp
new file mode 100644
index 0000000..15d4dfe
--- /dev/null
+++ b/storage/CollisionFreeAggregationStateHashTable.cpp
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "storage/CollisionFreeAggregationStateHashTable.hpp"
+
+#include <algorithm>
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <cstdlib>
+#include <map>
+#include <memory>
+#include <vector>
+
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+
+namespace quickstep {
+
+// Builds a single-key aggregation table whose storage is one blob obtained
+// from 'storage_manager'. The blob is laid out as
+//   [existence bitmap][state array for handle 0][state array for handle 1]...
+// with every region sized via CacheLineAlignedBytes().
+//
+// @param key_types Types of the group-by key; exactly one is accepted
+//        (enforced by CHECK_EQ below).
+// @param num_entries Number of slots in the bitmap and in every state array.
+// @param handles One aggregation handle per state array. Only kCount, and
+//        kSum over Int/Long/Float/Double, are sized here; anything else
+//        aborts with LOG(FATAL).
+// @param storage_manager Used to create the backing blob.
+//
+// NOTE(review): 'key_types.front()' in the initializer list is evaluated
+// before the CHECK_EQ in the body, so callers must never pass an empty vector.
+CollisionFreeAggregationStateHashTable::CollisionFreeAggregationStateHashTable(
+    const std::vector<const Type *> &key_types,
+    const std::size_t num_entries,
+    const std::vector<AggregationHandle *> &handles,
+    StorageManager *storage_manager)
+    : key_type_(key_types.front()),
+      num_entries_(num_entries),
+      num_handles_(handles.size()),
+      handles_(handles),
+      // One finalize partition per 4096 entries (plus one), capped at 80.
+      // Reads the parameter rather than the member so the result does not
+      // depend on member declaration order, and names the std::min type
+      // explicitly so it compiles where std::size_t is not unsigned long.
+      num_finalize_partitions_(
+          std::min<std::size_t>((num_entries >> 12u) + 1u, 80u)),
+      storage_manager_(storage_manager) {
+  CHECK_EQ(1u, key_types.size());
+  DCHECK_GT(num_entries, 0u);
+
+  // Running byte count of the blob; each region's offset is the count at the
+  // moment the region is appended.
+  std::size_t required_memory = 0;
+
+  const std::size_t existence_map_offset = required_memory;
+  required_memory +=
+      CacheLineAlignedBytes(ConcurrentBitVector::BytesNeeded(num_entries));
+
+  std::vector<std::size_t> state_offsets;
+  state_offsets.reserve(num_handles_);
+  for (std::size_t i = 0; i < num_handles_; ++i) {
+    const AggregationHandle *handle = handles_[i];
+    // Bind by reference: avoids copying the type vector for every handle.
+    const auto &argument_types = handle->getArgumentTypes();
+
+    std::size_t state_size = 0;
+    switch (handle->getAggregationID()) {
+      case AggregationID::kCount: {
+        state_size = sizeof(std::atomic<std::size_t>);
+        break;
+      }
+      case AggregationID::kSum: {
+        CHECK_EQ(1u, argument_types.size());
+        switch (argument_types.front()->getTypeID()) {
+          case TypeID::kInt:  // Fall through
+          case TypeID::kLong:
+            state_size = sizeof(std::atomic<std::int64_t>);
+            break;
+          case TypeID::kFloat:  // Fall through
+          case TypeID::kDouble:
+            state_size = sizeof(std::atomic<double>);
+            break;
+          default:
+            LOG(FATAL) << "Not implemented";
+        }
+        break;
+      }
+      default:
+        LOG(FATAL) << "Not implemented";
+    }
+
+    state_offsets.push_back(required_memory);
+    required_memory += CacheLineAlignedBytes(state_size * num_entries);
+  }
+
+  const std::size_t num_storage_slots =
+      storage_manager_->SlotsNeededForBytes(required_memory);
+
+  const block_id blob_id = storage_manager_->createBlob(num_storage_slots);
+  blob_ = storage_manager_->getBlobMutable(blob_id);
+
+  char *memory_start = static_cast<char *>(blob_->getMemoryMutable());
+  // The bitmap is attached without being initialized here.
+  // NOTE(review): presumably it is cleared later through the initialization
+  // partitions counted below -- confirm against the caller.
+  existence_map_.reset(new ConcurrentBitVector(
+      memory_start + existence_map_offset,
+      num_entries,
+      false /* initialize */));
+
+  for (const std::size_t state_offset : state_offsets) {
+    vec_tables_.emplace_back(memory_start + state_offset);
+  }
+
+  memory_size_ = required_memory;
+
+  // One initialization partition per 4MB of table memory, capped at 80 and
+  // never fewer than one: a table smaller than 4MB previously ended up with
+  // zero partitions.
+  constexpr std::size_t kInitBlockSize = 4uL * 1024u * 1024u;
+  constexpr std::size_t kMaxInitPartitions = 80u;
+  num_init_partitions_ = std::max<std::size_t>(
+      1u,
+      std::min<std::size_t>(memory_size_ / kInitBlockSize, kMaxInitPartitions));
+}
+
+CollisionFreeAggregationStateHashTable::~CollisionFreeAggregationStateHashTable() {  // Removes the blob that backs this table.
+ const block_id blob_id = blob_->getID();  // Read the id first: blob_ is released on the next line.
+ blob_.release();  // NOTE(review): presumably drops this table's reference before deletion -- confirm.
+ storage_manager_->deleteBlockOrBlobFile(blob_id);  // Delete the blob through the storage manager, by id.
+}
+
+void CollisionFreeAggregationStateHashTable::destroyPayload() {  // Intentionally empty: this table does no per-entry payload teardown here.
+}
+
+// Feeds every tuple of the input into each handle's state array.
+//
+// A non-negative attribute id is read from 'base_accessor'. A negative id is
+// decoded as -(id + 2) and read from 'aux_accessor'. The dispatch helper's
+// boolean template argument is true exactly when key and argument come from
+// different accessors.
+//
+// @param argument_ids Per handle, zero or one argument attribute id.
+// @param key_attr_ids Exactly one key attribute id.
+// @param base_accessor Accessor over the input block.
+// @param aux_accessor Accessor addressed by the decoded negative ids.
+// @return Always true.
+bool CollisionFreeAggregationStateHashTable::upsertValueAccessor(
+    const std::vector<std::vector<attribute_id>> &argument_ids,
+    const std::vector<attribute_id> &key_attr_ids,
+    ValueAccessor *base_accessor,
+    ColumnVectorsValueAccessor *aux_accessor) {
+  DCHECK_EQ(1u, key_attr_ids.size());
+
+  const attribute_id key_attr_id = key_attr_ids.front();
+  const bool is_key_nullable = key_type_->isNullable();
+
+  // Decode the key once; it is the same for every handle.
+  const bool key_in_base = key_attr_id >= 0;
+  const attribute_id key_id = key_in_base ? key_attr_id : -(key_attr_id + 2);
+
+  for (std::size_t handle_idx = 0; handle_idx < num_handles_; ++handle_idx) {
+    const std::vector<attribute_id> &handle_argument_ids =
+        argument_ids[handle_idx];
+    DCHECK_LE(handle_argument_ids.size(), 1u);
+
+    const attribute_id argument_id = handle_argument_ids.empty()
+                                         ? kInvalidAttributeID
+                                         : handle_argument_ids.front();
+    const bool argument_in_base = argument_id >= 0;
+    const attribute_id arg_id =
+        argument_in_base ? argument_id : -(argument_id + 2);
+
+    const AggregationHandle *handle = handles_[handle_idx];
+    const auto &argument_types = handle->getArgumentTypes();
+
+    // A handle without arguments has no argument type and is never nullable.
+    const bool has_argument_type = !argument_types.empty();
+    const Type *argument_type =
+        has_argument_type ? argument_types.front() : nullptr;
+    const bool is_argument_nullable =
+        has_argument_type ? argument_type->isNullable() : false;
+
+    InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
+        base_accessor,
+        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+      // Key and argument both come from the base accessor.
+      if (key_in_base && argument_in_base) {
+        upsertValueAccessorDispatchHelper<false>(is_key_nullable,
+                                                 is_argument_nullable,
+                                                 key_type_,
+                                                 argument_type,
+                                                 handle->getAggregationID(),
+                                                 key_id,
+                                                 arg_id,
+                                                 vec_tables_[handle_idx],
+                                                 accessor,
+                                                 accessor);
+        return;
+      }
+      // Key from the base accessor, argument from the aux accessor.
+      if (key_in_base) {
+        upsertValueAccessorDispatchHelper<true>(is_key_nullable,
+                                                is_argument_nullable,
+                                                key_type_,
+                                                argument_type,
+                                                handle->getAggregationID(),
+                                                key_id,
+                                                arg_id,
+                                                vec_tables_[handle_idx],
+                                                accessor,
+                                                aux_accessor);
+        return;
+      }
+      // Key from the aux accessor, argument from the base accessor.
+      if (argument_in_base) {
+        upsertValueAccessorDispatchHelper<true>(is_key_nullable,
+                                                is_argument_nullable,
+                                                key_type_,
+                                                argument_type,
+                                                handle->getAggregationID(),
+                                                key_id,
+                                                arg_id,
+                                                vec_tables_[handle_idx],
+                                                aux_accessor,
+                                                accessor);
+        return;
+      }
+      // Key and argument both come from the aux accessor.
+      upsertValueAccessorDispatchHelper<false>(is_key_nullable,
+                                               is_argument_nullable,
+                                               key_type_,
+                                               argument_type,
+                                               handle->getAggregationID(),
+                                               key_id,
+                                               arg_id,
+                                               vec_tables_[handle_idx],
+                                               aux_accessor,
+                                               aux_accessor);
+    });
+  }
+  return true;
+}
+
+// Writes the keys of one finalize partition into 'output_cv'.
+//
+// The partition is the position range returned by
+// calculatePartitionStartPosition() / calculatePartitionEndPosition().
+// Only Int and Long key types are handled; any other type aborts.
+void CollisionFreeAggregationStateHashTable::finalizeKey(
+    const std::size_t partition_id,
+    NativeColumnVector *output_cv) const {
+  const std::size_t range_begin =
+      calculatePartitionStartPosition(partition_id);
+  const std::size_t range_end =
+      calculatePartitionEndPosition(partition_id);
+
+  const auto key_type_id = key_type_->getTypeID();
+  if (key_type_id == TypeID::kInt) {
+    finalizeKeyInternal<int>(range_begin, range_end, output_cv);
+  } else if (key_type_id == TypeID::kLong) {
+    finalizeKeyInternal<std::int64_t>(range_begin, range_end, output_cv);
+  } else {
+    LOG(FATAL) << "Not supported";
+  }
+}
+
+// Writes the finalized values of handle 'handle_id' for one finalize
+// partition into 'output_cv'.
+//
+// The handle's first argument type (or nullptr when it has no arguments) and
+// its aggregation id are forwarded to finalizeStateDispatchHelper() together
+// with the handle's state array and the partition's position range.
+void CollisionFreeAggregationStateHashTable::finalizeState(
+    const std::size_t partition_id,
+    std::size_t handle_id,
+    NativeColumnVector *output_cv) const {
+  const std::size_t range_begin =
+      calculatePartitionStartPosition(partition_id);
+  const std::size_t range_end =
+      calculatePartitionEndPosition(partition_id);
+
+  const AggregationHandle *agg_handle = handles_[handle_id];
+  const auto &arg_types = agg_handle->getArgumentTypes();
+
+  const Type *first_arg_type = nullptr;
+  if (!arg_types.empty()) {
+    first_arg_type = arg_types.front();
+  }
+
+  finalizeStateDispatchHelper(agg_handle->getAggregationID(),
+                              first_arg_type,
+                              vec_tables_[handle_id],
+                              range_begin,
+                              range_end,
+                              output_cv);
+}
+
+} // namespace quickstep