You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/02/07 22:35:28 UTC
[09/14] incubator-quickstep git commit: - Adds
CollisionFreeVectorTable to support specialized fast path aggregation for
range-bounded single integer group-by key. - Supports copy elision for
aggregation.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index b942c1b..0b34908 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -20,7 +20,7 @@
#include "storage/AggregationOperationState.hpp"
#include <cstddef>
-#include <cstdio>
+#include <cstdint>
#include <memory>
#include <string>
#include <utility>
@@ -34,29 +34,32 @@
#include "expressions/aggregation/AggregateFunction.hpp"
#include "expressions/aggregation/AggregateFunctionFactory.hpp"
#include "expressions/aggregation/AggregationHandle.hpp"
-#include "expressions/aggregation/AggregationHandleDistinct.hpp"
-#include "expressions/aggregation/AggregationID.hpp"
#include "expressions/predicate/Predicate.hpp"
#include "expressions/scalar/Scalar.hpp"
#include "storage/AggregationOperationState.pb.h"
-#include "storage/HashTable.hpp"
-#include "storage/HashTableBase.hpp"
+#include "storage/CollisionFreeVectorTable.hpp"
#include "storage/HashTableFactory.hpp"
+#include "storage/HashTableBase.hpp"
#include "storage/InsertDestination.hpp"
+#include "storage/PackedPayloadHashTable.hpp"
#include "storage/StorageBlock.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "storage/StorageManager.hpp"
+#include "storage/SubBlocksReference.hpp"
#include "storage/TupleIdSequence.hpp"
+#include "storage/TupleStorageSubBlock.hpp"
#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
+#include "storage/ValueAccessorUtil.hpp"
#include "types/TypedValue.hpp"
#include "types/containers/ColumnVector.hpp"
#include "types/containers/ColumnVectorsValueAccessor.hpp"
#include "types/containers/Tuple.hpp"
#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
-#include "glog/logging.h"
+#include "gflags/gflags.h"
-using std::unique_ptr;
+#include "glog/logging.h"
namespace quickstep {
@@ -80,148 +83,145 @@ AggregationOperationState::AggregationOperationState(
const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
StorageManager *storage_manager)
: input_relation_(input_relation),
- is_aggregate_partitioned_(checkAggregatePartitioned(
- estimated_num_entries, is_distinct, group_by, aggregate_functions)),
+ is_aggregate_collision_free_(false),
+ is_aggregate_partitioned_(false),
predicate_(predicate),
- group_by_list_(std::move(group_by)),
- arguments_(std::move(arguments)),
is_distinct_(std::move(is_distinct)),
storage_manager_(storage_manager) {
+ if (!group_by.empty()) {
+ if (hash_table_impl_type == HashTableImplType::kCollisionFreeVector) {
+ is_aggregate_collision_free_ = true;
+ } else {
+ is_aggregate_partitioned_ = checkAggregatePartitioned(
+ estimated_num_entries, is_distinct_, group_by, aggregate_functions);
+ }
+ }
+
// Sanity checks: each aggregate has a corresponding list of arguments.
- DCHECK(aggregate_functions.size() == arguments_.size());
+ DCHECK(aggregate_functions.size() == arguments.size());
// Get the types of GROUP BY expressions for creating HashTables below.
- std::vector<const Type *> group_by_types;
- for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
- group_by_types.emplace_back(&group_by_element->getType());
+ for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
+ group_by_types_.emplace_back(&group_by_element->getType());
+ }
+
+ // Prepare group-by key ids and non-trivial expressions.
+ for (std::unique_ptr<const Scalar> &group_by_element : group_by) {
+ const attribute_id attr_id =
+ group_by_element->getAttributeIdForValueAccessor();
+ if (attr_id != kInvalidAttributeID) {
+ group_by_key_ids_.emplace_back(ValueAccessorSource::kBase, attr_id);
+ } else {
+ group_by_key_ids_.emplace_back(ValueAccessorSource::kDerived,
+ non_trivial_expressions_.size());
+ non_trivial_expressions_.emplace_back(group_by_element.release());
+ }
}
std::vector<AggregationHandle *> group_by_handles;
- group_by_handles.clear();
-
- if (aggregate_functions.size() == 0) {
- // If there is no aggregation function, then it is a distinctify operation
- // on the group-by expressions.
- DCHECK_GT(group_by_list_.size(), 0u);
-
- handles_.emplace_back(new AggregationHandleDistinct());
- arguments_.push_back({});
- is_distinct_.emplace_back(false);
- group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
- hash_table_impl_type,
- group_by_types,
- {1},
- handles_,
- storage_manager));
- } else {
- // Set up each individual aggregate in this operation.
- std::vector<const AggregateFunction *>::const_iterator agg_func_it =
- aggregate_functions.begin();
- std::vector<std::vector<std::unique_ptr<const Scalar>>>::const_iterator
- args_it = arguments_.begin();
- std::vector<bool>::const_iterator is_distinct_it = is_distinct_.begin();
- std::vector<HashTableImplType>::const_iterator
- distinctify_hash_table_impl_types_it =
- distinctify_hash_table_impl_types.begin();
- std::vector<std::size_t> payload_sizes;
- for (; agg_func_it != aggregate_functions.end();
- ++agg_func_it, ++args_it, ++is_distinct_it) {
- // Get the Types of this aggregate's arguments so that we can create an
- // AggregationHandle.
- std::vector<const Type *> argument_types;
- for (const std::unique_ptr<const Scalar> &argument : *args_it) {
- argument_types.emplace_back(&argument->getType());
- }
- // Sanity checks: aggregate function exists and can apply to the specified
- // arguments.
- DCHECK(*agg_func_it != nullptr);
- DCHECK((*agg_func_it)->canApplyToTypes(argument_types));
-
- // Have the AggregateFunction create an AggregationHandle that we can use
- // to do actual aggregate computation.
- handles_.emplace_back((*agg_func_it)->createHandle(argument_types));
-
- if (!group_by_list_.empty()) {
- // Aggregation with GROUP BY: combined payload is partially updated in
- // the presence of DISTINCT.
- if (*is_distinct_it) {
- handles_.back()->blockUpdate();
- }
- group_by_handles.emplace_back(handles_.back());
- payload_sizes.emplace_back(group_by_handles.back()->getPayloadSize());
+ // Set up each individual aggregate in this operation.
+ std::vector<const AggregateFunction *>::const_iterator agg_func_it =
+ aggregate_functions.begin();
+ std::vector<std::vector<std::unique_ptr<const Scalar>>>::iterator
+ args_it = arguments.begin();
+ std::vector<bool>::const_iterator is_distinct_it = is_distinct_.begin();
+ std::vector<HashTableImplType>::const_iterator
+ distinctify_hash_table_impl_types_it =
+ distinctify_hash_table_impl_types.begin();
+ for (; agg_func_it != aggregate_functions.end();
+ ++agg_func_it, ++args_it, ++is_distinct_it) {
+ // Get the Types of this aggregate's arguments so that we can create an
+ // AggregationHandle.
+ std::vector<const Type *> argument_types;
+ for (const std::unique_ptr<const Scalar> &argument : *args_it) {
+ argument_types.emplace_back(&argument->getType());
+ }
+
+ // Prepare argument attribute ids and non-trivial expressions.
+ std::vector<MultiSourceAttributeId> argument_ids;
+ for (std::unique_ptr<const Scalar> &argument : *args_it) {
+ const attribute_id attr_id =
+ argument->getAttributeIdForValueAccessor();
+ if (attr_id != kInvalidAttributeID) {
+ argument_ids.emplace_back(ValueAccessorSource::kBase, attr_id);
} else {
- // Aggregation without GROUP BY: create a single global state.
- single_states_.emplace_back(handles_.back()->createInitialState());
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- // See if all of this aggregate's arguments are attributes in the input
- // relation. If so, remember the attribute IDs so that we can do copy
- // elision when actually performing the aggregation.
- std::vector<attribute_id> local_arguments_as_attributes;
- local_arguments_as_attributes.reserve(args_it->size());
- for (const std::unique_ptr<const Scalar> &argument : *args_it) {
- const attribute_id argument_id =
- argument->getAttributeIdForValueAccessor();
- if (argument_id == -1) {
- local_arguments_as_attributes.clear();
- break;
- } else {
- DCHECK_EQ(input_relation_.getID(),
- argument->getRelationIdForValueAccessor());
- local_arguments_as_attributes.push_back(argument_id);
- }
- }
-
- arguments_as_attributes_.emplace_back(
- std::move(local_arguments_as_attributes));
-#endif
+ argument_ids.emplace_back(ValueAccessorSource::kDerived,
+ non_trivial_expressions_.size());
+ non_trivial_expressions_.emplace_back(argument.release());
}
+ }
+ argument_ids_.emplace_back(std::move(argument_ids));
+
+ // Sanity checks: aggregate function exists and can apply to the specified
+ // arguments.
+ DCHECK(*agg_func_it != nullptr);
+ DCHECK((*agg_func_it)->canApplyToTypes(argument_types));
- // Initialize the corresponding distinctify hash table if this is a
- // DISTINCT aggregation.
+ // Have the AggregateFunction create an AggregationHandle that we can use
+ // to do actual aggregate computation.
+ handles_.emplace_back((*agg_func_it)->createHandle(argument_types));
+
+ if (!group_by_key_ids_.empty()) {
+ // Aggregation with GROUP BY: combined payload is partially updated in
+ // the presence of DISTINCT.
if (*is_distinct_it) {
- std::vector<const Type *> key_types(group_by_types);
- key_types.insert(
- key_types.end(), argument_types.begin(), argument_types.end());
- // TODO(jianqiao): estimated_num_entries is quite inaccurate for
- // estimating the number of entries in the distinctify hash table.
- // We may estimate for each distinct aggregation an
- // estimated_num_distinct_keys value during query optimization, if it's
- // worth.
- distinctify_hashtables_.emplace_back(
- AggregationStateFastHashTableFactory::CreateResizable(
- *distinctify_hash_table_impl_types_it,
- key_types,
- estimated_num_entries,
- {0},
- {},
- storage_manager));
- ++distinctify_hash_table_impl_types_it;
- } else {
- distinctify_hashtables_.emplace_back(nullptr);
+ handles_.back()->blockUpdate();
}
+ group_by_handles.emplace_back(handles_.back().get());
+ } else {
+ // Aggregation without GROUP BY: create a single global state.
+ single_states_.emplace_back(handles_.back()->createInitialState());
}
- if (!group_by_handles.empty()) {
- // Aggregation with GROUP BY: create a HashTable pool.
- if (!is_aggregate_partitioned_) {
- group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
- hash_table_impl_type,
- group_by_types,
- payload_sizes,
- group_by_handles,
- storage_manager));
- } else {
- partitioned_group_by_hashtable_pool_.reset(
- new PartitionedHashTablePool(estimated_num_entries,
- FLAGS_num_aggregation_partitions,
- hash_table_impl_type,
- group_by_types,
- payload_sizes,
- group_by_handles,
- storage_manager));
- }
+ // Initialize the corresponding distinctify hash table if this is a
+ // DISTINCT aggregation.
+ if (*is_distinct_it) {
+ std::vector<const Type *> key_types(group_by_types_);
+ key_types.insert(
+ key_types.end(), argument_types.begin(), argument_types.end());
+ // TODO(jianqiao): estimated_num_entries is quite inaccurate for
+ // estimating the number of entries in the distinctify hash table.
+ // We need to estimate for each distinct aggregation an
+ // estimated_num_distinct_keys value during query optimization.
+ distinctify_hashtables_.emplace_back(
+ AggregationStateHashTableFactory::CreateResizable(
+ *distinctify_hash_table_impl_types_it,
+ key_types,
+ estimated_num_entries,
+ {} /* handles */,
+ storage_manager));
+ ++distinctify_hash_table_impl_types_it;
+ } else {
+ distinctify_hashtables_.emplace_back(nullptr);
+ }
+ }
+
+ if (!group_by_key_ids_.empty()) {
+ // Aggregation with GROUP BY: create the hash table (pool).
+ if (is_aggregate_collision_free_) {
+ collision_free_hashtable_.reset(
+ AggregationStateHashTableFactory::CreateResizable(
+ hash_table_impl_type,
+ group_by_types_,
+ estimated_num_entries,
+ group_by_handles,
+ storage_manager));
+ } else if (is_aggregate_partitioned_) {
+ partitioned_group_by_hashtable_pool_.reset(
+ new PartitionedHashTablePool(estimated_num_entries,
+ FLAGS_num_aggregation_partitions,
+ hash_table_impl_type,
+ group_by_types_,
+ group_by_handles,
+ storage_manager));
+ } else {
+ group_by_hashtable_pool_.reset(
+ new HashTablePool(estimated_num_entries,
+ hash_table_impl_type,
+ group_by_types_,
+ group_by_handles,
+ storage_manager));
}
}
}
@@ -269,7 +269,7 @@ AggregationOperationState* AggregationOperationState::ReconstructFromProto(
proto.group_by_expressions(group_by_idx), database));
}
- unique_ptr<Predicate> predicate;
+ std::unique_ptr<Predicate> predicate;
if (proto.has_predicate()) {
predicate.reset(
PredicateFactory::ReconstructFromProto(proto.predicate(), database));
@@ -353,153 +353,210 @@ bool AggregationOperationState::ProtoIsValid(
return true;
}
-void AggregationOperationState::aggregateBlock(const block_id input_block,
- LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
- if (group_by_list_.empty()) {
- aggregateBlockSingleState(input_block);
- } else {
- aggregateBlockHashTable(input_block, lip_filter_adaptive_prober);
+bool AggregationOperationState::checkAggregatePartitioned(
+ const std::size_t estimated_num_groups,
+ const std::vector<bool> &is_distinct,
+ const std::vector<std::unique_ptr<const Scalar>> &group_by,
+ const std::vector<const AggregateFunction *> &aggregate_functions) const {
+ // If there's no aggregation, return false.
+ if (aggregate_functions.empty()) {
+ return false;
+ }
+ // Check if there's a distinct operation involved in any aggregate, if so
+ // the aggregate can't be partitioned.
+ for (auto distinct : is_distinct) {
+ if (distinct) {
+ return false;
+ }
+ }
+ // There's no distinct aggregation involved. Check if there's at least one
+ // GROUP BY operation.
+ if (group_by.empty()) {
+ return false;
+ }
+
+ // Currently we require that all the group-by keys are ScalarAttributes for
+ // the convenience of implementing copy elision.
+ // TODO(jianqiao): relax this requirement.
+ for (const auto &group_by_element : group_by) {
+ if (group_by_element->getAttributeIdForValueAccessor() == kInvalidAttributeID) {
+ return false;
+ }
}
+
+ // There are GROUP BYs without DISTINCT. Check if the estimated number of
+ // groups is large enough to warrant a partitioned aggregation.
+ return estimated_num_groups >
+ static_cast<std::size_t>(
+ FLAGS_partition_aggregation_num_groups_threshold);
+ return false;
}
-void AggregationOperationState::finalizeAggregate(
- InsertDestination *output_destination) {
- if (group_by_list_.empty()) {
- finalizeSingleState(output_destination);
+std::size_t AggregationOperationState::getNumInitializationPartitions() const {
+ if (is_aggregate_collision_free_) {
+ return static_cast<CollisionFreeVectorTable *>(
+ collision_free_hashtable_.get())->getNumInitializationPartitions();
} else {
- finalizeHashTable(output_destination);
+ return 0u;
}
}
-void AggregationOperationState::mergeSingleState(
- const std::vector<std::unique_ptr<AggregationState>> &local_state) {
- DEBUG_ASSERT(local_state.size() == single_states_.size());
- for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
- if (!is_distinct_[agg_idx]) {
- handles_[agg_idx]->mergeStates(*local_state[agg_idx],
- single_states_[agg_idx].get());
- }
+std::size_t AggregationOperationState::getNumFinalizationPartitions() const {
+ if (is_aggregate_collision_free_) {
+ return static_cast<CollisionFreeVectorTable *>(
+ collision_free_hashtable_.get())->getNumFinalizationPartitions();
+ } else if (is_aggregate_partitioned_) {
+ return partitioned_group_by_hashtable_pool_->getNumPartitions();
+ } else {
+ return 1u;
}
}
-void AggregationOperationState::aggregateBlockSingleState(
- const block_id input_block) {
- // Aggregate per-block state for each aggregate.
- std::vector<std::unique_ptr<AggregationState>> local_state;
+void AggregationOperationState::initialize(const std::size_t partition_id) {
+ if (is_aggregate_collision_free_) {
+ static_cast<CollisionFreeVectorTable *>(
+ collision_free_hashtable_.get())->initialize(partition_id);
+ } else {
+ LOG(FATAL) << "AggregationOperationState::initializeState() "
+ << "is not supported by this aggregation";
+ }
+}
+void AggregationOperationState::aggregateBlock(const block_id input_block,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
BlockReference block(
storage_manager_->getBlock(input_block, input_relation_));
+ const auto &tuple_store = block->getTupleStorageSubBlock();
+ std::unique_ptr<ValueAccessor> base_accessor(tuple_store.createValueAccessor());
+ std::unique_ptr<ValueAccessor> shared_accessor;
+ ValueAccessor *accessor = base_accessor.get();
+ // Apply the predicate first, then the LIPFilters, to generate a TupleIdSequence
+ // as the existence map for the tuples.
std::unique_ptr<TupleIdSequence> matches;
if (predicate_ != nullptr) {
- std::unique_ptr<ValueAccessor> accessor(
- block->getTupleStorageSubBlock().createValueAccessor());
- matches.reset(block->getMatchesForPredicate(predicate_.get(), matches.get()));
+ matches.reset(block->getMatchesForPredicate(predicate_.get()));
+ shared_accessor.reset(
+ base_accessor->createSharedTupleIdSequenceAdapterVirtual(*matches));
+ accessor = shared_accessor.get();
+ }
+ if (lip_filter_adaptive_prober != nullptr) {
+ matches.reset(lip_filter_adaptive_prober->filterValueAccessor(accessor));
+ shared_accessor.reset(
+ base_accessor->createSharedTupleIdSequenceAdapterVirtual(*matches));
+ accessor = shared_accessor.get();
}
- for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
- const std::vector<attribute_id> *local_arguments_as_attributes = nullptr;
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- // If all arguments are attributes of the input relation, elide a copy.
- if (!arguments_as_attributes_[agg_idx].empty()) {
- local_arguments_as_attributes = &(arguments_as_attributes_[agg_idx]);
+ std::unique_ptr<ColumnVectorsValueAccessor> non_trivial_results;
+ if (!non_trivial_expressions_.empty()) {
+ non_trivial_results.reset(new ColumnVectorsValueAccessor());
+ SubBlocksReference sub_blocks_ref(tuple_store,
+ block->getIndices(),
+ block->getIndicesConsistent());
+ for (const auto &expression : non_trivial_expressions_) {
+ non_trivial_results->addColumn(
+ expression->getAllValues(accessor, &sub_blocks_ref));
}
-#endif
+ }
+
+ accessor->beginIterationVirtual();
+
+ ValueAccessorMultiplexer accessor_mux(accessor, non_trivial_results.get());
+ if (group_by_key_ids_.empty()) {
+ aggregateBlockSingleState(accessor_mux);
+ } else {
+ aggregateBlockHashTable(accessor_mux);
+ }
+}
+
+void AggregationOperationState::aggregateBlockSingleState(
+ const ValueAccessorMultiplexer &accessor_mux) {
+ // Aggregate per-block state for each aggregate.
+ std::vector<std::unique_ptr<AggregationState>> local_state;
+
+ for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
+ const auto &argument_ids = argument_ids_[agg_idx];
+ const auto &handle = handles_[agg_idx];
+
+ AggregationState *state = nullptr;
if (is_distinct_[agg_idx]) {
- // Call StorageBlock::aggregateDistinct() to put the arguments as keys
- // directly into the (threadsafe) shared global distinctify HashTable
- // for this aggregate.
- block->aggregateDistinct(*handles_[agg_idx],
- arguments_[agg_idx],
- local_arguments_as_attributes,
- {}, /* group_by */
- matches.get(),
- distinctify_hashtables_[agg_idx].get(),
- nullptr /* reuse_group_by_vectors */);
- local_state.emplace_back(nullptr);
+ handle->insertValueAccessorIntoDistinctifyHashTable(
+ argument_ids,
+ {},
+ accessor_mux,
+ distinctify_hashtables_[agg_idx].get());
} else {
- // Call StorageBlock::aggregate() to actually do the aggregation.
- local_state.emplace_back(block->aggregate(*handles_[agg_idx],
- arguments_[agg_idx],
- local_arguments_as_attributes,
- matches.get()));
+ if (argument_ids.empty()) {
+ // Special case. This is a nullary aggregate (i.e. COUNT(*)).
+ ValueAccessor *base_accessor = accessor_mux.getBaseAccessor();
+ DCHECK(base_accessor != nullptr);
+ state = handle->accumulateNullary(base_accessor->getNumTuplesVirtual());
+ } else {
+ // Have the AggregationHandle actually do the aggregation.
+ state = handle->accumulateValueAccessor(argument_ids, accessor_mux);
+ }
}
+ local_state.emplace_back(state);
}
// Merge per-block aggregation states back with global state.
mergeSingleState(local_state);
}
-void AggregationOperationState::aggregateBlockHashTable(
- const block_id input_block,
- LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
- BlockReference block(
- storage_manager_->getBlock(input_block, input_relation_));
-
- // Apply the predicate first, then the LIPFilters, to generate a TupleIdSequence
- // as the existence map for the tuples.
- std::unique_ptr<TupleIdSequence> matches;
- if (predicate_ != nullptr) {
- matches.reset(block->getMatchesForPredicate(predicate_.get()));
- }
- if (lip_filter_adaptive_prober != nullptr) {
- std::unique_ptr<ValueAccessor> accessor(
- block->getTupleStorageSubBlock().createValueAccessor(matches.get()));
- matches.reset(lip_filter_adaptive_prober->filterValueAccessor(accessor.get()));
- }
-
- // This holds values of all the GROUP BY attributes so that the can be reused
- // across multiple aggregates (i.e. we only pay the cost of evaluatin the
- // GROUP BY expressions once).
- std::vector<std::unique_ptr<ColumnVector>> reuse_group_by_vectors;
-
+void AggregationOperationState::mergeSingleState(
+ const std::vector<std::unique_ptr<AggregationState>> &local_state) {
+ DCHECK_EQ(local_state.size(), single_states_.size());
for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
- if (is_distinct_[agg_idx]) {
- // Call StorageBlock::aggregateDistinct() to insert the GROUP BY
- // expression
- // values and the aggregation arguments together as keys directly into the
- // (threadsafe) shared global distinctify HashTable for this aggregate.
- block->aggregateDistinct(*handles_[agg_idx],
- arguments_[agg_idx],
- nullptr, /* arguments_as_attributes */
- group_by_list_,
- matches.get(),
- distinctify_hashtables_[agg_idx].get(),
- &reuse_group_by_vectors);
+ if (!is_distinct_[agg_idx]) {
+ handles_[agg_idx]->mergeStates(*local_state[agg_idx],
+ single_states_[agg_idx].get());
}
}
+}
+
+void AggregationOperationState::mergeGroupByHashTables(
+ AggregationStateHashTableBase *src,
+ AggregationStateHashTableBase *dst) const {
+ HashTableMerger merger(static_cast<PackedPayloadHashTable *>(dst));
+ static_cast<PackedPayloadHashTable *>(src)->forEachCompositeKey(&merger);
+}
- if (!is_aggregate_partitioned_) {
- // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
- // directly into the (threadsafe) shared global HashTable for this
- // aggregate.
- DCHECK(group_by_hashtable_pool_ != nullptr);
- AggregationStateHashTableBase *agg_hash_table =
- group_by_hashtable_pool_->getHashTableFast();
- DCHECK(agg_hash_table != nullptr);
- block->aggregateGroupBy(arguments_,
- group_by_list_,
- matches.get(),
- agg_hash_table,
- &reuse_group_by_vectors);
- group_by_hashtable_pool_->returnHashTable(agg_hash_table);
+void AggregationOperationState::aggregateBlockHashTable(
+ const ValueAccessorMultiplexer &accessor_mux) {
+ if (is_aggregate_collision_free_) {
+ aggregateBlockHashTableImplCollisionFree(accessor_mux);
+ } else if (is_aggregate_partitioned_) {
+ aggregateBlockHashTableImplPartitioned(accessor_mux);
} else {
- ColumnVectorsValueAccessor temp_result;
- // IDs of 'arguments' as attributes in the ValueAccessor we create below.
- std::vector<attribute_id> argument_ids;
+ aggregateBlockHashTableImplThreadPrivate(accessor_mux);
+ }
+}
+
+void AggregationOperationState::aggregateBlockHashTableImplCollisionFree(
+ const ValueAccessorMultiplexer &accessor_mux) {
+ DCHECK(collision_free_hashtable_ != nullptr);
+
+ collision_free_hashtable_->upsertValueAccessorCompositeKey(argument_ids_,
+ group_by_key_ids_,
+ accessor_mux);
+}
+
+void AggregationOperationState::aggregateBlockHashTableImplPartitioned(
+ const ValueAccessorMultiplexer &accessor_mux) {
+ DCHECK(partitioned_group_by_hashtable_pool_ != nullptr);
+
+ std::vector<attribute_id> group_by_key_ids;
+ for (const MultiSourceAttributeId &key_id : group_by_key_ids_) {
+ DCHECK(key_id.source == ValueAccessorSource::kBase);
+ group_by_key_ids.emplace_back(key_id.attr_id);
+ }
- // IDs of GROUP BY key element(s) in the ValueAccessor we create below.
- std::vector<attribute_id> key_ids;
+ InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
+ accessor_mux.getBaseAccessor(),
+ [&](auto *accessor) -> void { // NOLINT(build/c++11)
+ // TODO(jianqiao): handle the situation when keys are in non_trivial_results.
const std::size_t num_partitions = partitioned_group_by_hashtable_pool_->getNumPartitions();
- block->aggregateGroupByPartitioned(
- arguments_,
- group_by_list_,
- matches.get(),
- num_partitions,
- &temp_result,
- &argument_ids,
- &key_ids,
- &reuse_group_by_vectors);
+
// Compute the partitions for the tuple formed by group by values.
std::vector<std::unique_ptr<TupleIdSequence>> partition_membership;
partition_membership.resize(num_partitions);
@@ -507,32 +564,74 @@ void AggregationOperationState::aggregateBlockHashTable(
// Create a tuple-id sequence for each partition.
for (std::size_t partition = 0; partition < num_partitions; ++partition) {
partition_membership[partition].reset(
- new TupleIdSequence(temp_result.getEndPosition()));
+ new TupleIdSequence(accessor->getEndPosition()));
}
// Iterate over ValueAccessor for each tuple,
// set a bit in the appropriate TupleIdSequence.
- temp_result.beginIteration();
- while (temp_result.next()) {
+ while (accessor->next()) {
// We need a unique_ptr because getTupleWithAttributes() uses "new".
- std::unique_ptr<Tuple> curr_tuple(temp_result.getTupleWithAttributes(key_ids));
+ std::unique_ptr<Tuple> curr_tuple(
+ accessor->getTupleWithAttributes(group_by_key_ids));
const std::size_t curr_tuple_partition_id =
curr_tuple->getTupleHash() % num_partitions;
partition_membership[curr_tuple_partition_id]->set(
- temp_result.getCurrentPosition(), true);
+ accessor->getCurrentPosition(), true);
}
- // For each partition, create an adapter around Value Accessor and
- // TupleIdSequence.
- std::vector<std::unique_ptr<
- TupleIdSequenceAdapterValueAccessor<ColumnVectorsValueAccessor>>> adapter;
- adapter.resize(num_partitions);
+
+ // Aggregate each partition.
for (std::size_t partition = 0; partition < num_partitions; ++partition) {
- adapter[partition].reset(temp_result.createSharedTupleIdSequenceAdapter(
- *(partition_membership)[partition]));
+ std::unique_ptr<ValueAccessor> base_adapter(
+ accessor->createSharedTupleIdSequenceAdapter(
+ *partition_membership[partition]));
+
+ std::unique_ptr<ValueAccessor> derived_adapter;
+ if (accessor_mux.getDerivedAccessor() != nullptr) {
+ derived_adapter.reset(
+ accessor_mux.getDerivedAccessor()->createSharedTupleIdSequenceAdapterVirtual(
+ *partition_membership[partition]));
+ }
+
+ ValueAccessorMultiplexer local_mux(base_adapter.get(), derived_adapter.get());
partitioned_group_by_hashtable_pool_->getHashTable(partition)
- ->upsertValueAccessorCompositeKeyFast(
- argument_ids, adapter[partition].get(), key_ids, true);
+ ->upsertValueAccessorCompositeKey(argument_ids_,
+ group_by_key_ids_,
+ local_mux);
}
+ });
+}
+
+void AggregationOperationState::aggregateBlockHashTableImplThreadPrivate(
+ const ValueAccessorMultiplexer &accessor_mux) {
+ DCHECK(group_by_hashtable_pool_ != nullptr);
+
+ for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
+ if (is_distinct_[agg_idx]) {
+ handles_[agg_idx]->insertValueAccessorIntoDistinctifyHashTable(
+ argument_ids_[agg_idx],
+ group_by_key_ids_,
+ accessor_mux,
+ distinctify_hashtables_[agg_idx].get());
+ }
+ }
+
+ AggregationStateHashTableBase *agg_hash_table =
+ group_by_hashtable_pool_->getHashTable();
+
+ agg_hash_table->upsertValueAccessorCompositeKey(argument_ids_,
+ group_by_key_ids_,
+ accessor_mux);
+ group_by_hashtable_pool_->returnHashTable(agg_hash_table);
+}
+
+void AggregationOperationState::finalizeAggregate(
+ const std::size_t partition_id,
+ InsertDestination *output_destination) {
+ if (group_by_key_ids_.empty()) {
+ DCHECK_EQ(0u, partition_id);
+ finalizeSingleState(output_destination);
+ } else {
+ finalizeHashTable(partition_id, output_destination);
}
}
@@ -556,80 +655,83 @@ void AggregationOperationState::finalizeSingleState(
output_destination->insertTuple(Tuple(std::move(attribute_values)));
}
-void AggregationOperationState::mergeGroupByHashTables(
- AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst) {
- HashTableMergerFast merger(dst);
- (static_cast<FastHashTable<true, false, true, false> *>(src))
- ->forEachCompositeKeyFast(&merger);
+void AggregationOperationState::finalizeHashTable(
+ const std::size_t partition_id,
+ InsertDestination *output_destination) {
+ if (is_aggregate_collision_free_) {
+ finalizeHashTableImplCollisionFree(partition_id, output_destination);
+ } else if (is_aggregate_partitioned_) {
+ finalizeHashTableImplPartitioned(partition_id, output_destination);
+ } else {
+ DCHECK_EQ(0u, partition_id);
+ finalizeHashTableImplThreadPrivate(output_destination);
+ }
}
-void AggregationOperationState::finalizeHashTable(
+void AggregationOperationState::finalizeHashTableImplCollisionFree(
+ const std::size_t partition_id,
+ InsertDestination *output_destination) {
+ std::vector<std::unique_ptr<ColumnVector>> final_values;
+ CollisionFreeVectorTable *hash_table =
+ static_cast<CollisionFreeVectorTable *>(collision_free_hashtable_.get());
+
+ const std::size_t max_length =
+ hash_table->getNumTuplesInFinalizationPartition(partition_id);
+ ColumnVectorsValueAccessor complete_result;
+
+ DCHECK_EQ(1u, group_by_types_.size());
+ const Type *key_type = group_by_types_.front();
+ DCHECK(NativeColumnVector::UsableForType(*key_type));
+
+ std::unique_ptr<NativeColumnVector> key_cv(
+ std::make_unique<NativeColumnVector>(*key_type, max_length));
+ hash_table->finalizeKey(partition_id, key_cv.get());
+ complete_result.addColumn(key_cv.release());
+
+ for (std::size_t i = 0; i < handles_.size(); ++i) {
+ const Type *result_type = handles_[i]->getResultType();
+ DCHECK(NativeColumnVector::UsableForType(*result_type));
+
+ std::unique_ptr<NativeColumnVector> result_cv(
+ std::make_unique<NativeColumnVector>(*result_type, max_length));
+ hash_table->finalizeState(partition_id, i, result_cv.get());
+ complete_result.addColumn(result_cv.release());
+ }
+
+ // Bulk-insert the complete result.
+ output_destination->bulkInsertTuples(&complete_result);
+}
+
+void AggregationOperationState::finalizeHashTableImplPartitioned(
+ const std::size_t partition_id,
InsertDestination *output_destination) {
+ PackedPayloadHashTable *hash_table =
+ static_cast<PackedPayloadHashTable *>(
+ partitioned_group_by_hashtable_pool_->getHashTable(partition_id));
+
// Each element of 'group_by_keys' is a vector of values for a particular
// group (which is also the prefix of the finalized Tuple for that group).
std::vector<std::vector<TypedValue>> group_by_keys;
- // TODO(harshad) - The merge phase may be slower when each hash table contains
- // large number of entries. We should find ways in which we can perform a
- // parallel merge.
+ if (handles_.empty()) {
+ const auto keys_retriever = [&group_by_keys](std::vector<TypedValue> &group_by_key,
+ const std::uint8_t *dumb_placeholder) -> void {
+ group_by_keys.emplace_back(std::move(group_by_key));
+ };
- // TODO(harshad) - Find heuristics for faster merge, even in a single thread.
- // e.g. Keep merging entries from smaller hash tables to larger.
-
- auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
- if (hash_tables->size() > 1) {
- for (int hash_table_index = 0;
- hash_table_index < static_cast<int>(hash_tables->size() - 1);
- ++hash_table_index) {
- // Merge each hash table to the last hash table.
- mergeGroupByHashTables((*hash_tables)[hash_table_index].get(),
- hash_tables->back().get());
- }
+ hash_table->forEachCompositeKey(&keys_retriever);
}
// Collect per-aggregate finalized values.
std::vector<std::unique_ptr<ColumnVector>> final_values;
for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
- if (is_distinct_[agg_idx]) {
- DCHECK(group_by_hashtable_pool_ != nullptr);
- auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
- DCHECK(hash_tables != nullptr);
- if (hash_tables->empty()) {
- // We may have a case where hash_tables is empty, e.g. no input blocks.
- // However for aggregateOnDistinctifyHashTableForGroupBy to work
- // correctly, we should create an empty group by hash table.
- AggregationStateHashTableBase *new_hash_table =
- group_by_hashtable_pool_->getHashTableFast();
- group_by_hashtable_pool_->returnHashTable(new_hash_table);
- hash_tables = group_by_hashtable_pool_->getAllHashTables();
- }
- DCHECK(hash_tables->back() != nullptr);
- AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
- DCHECK(agg_hash_table != nullptr);
- handles_[agg_idx]->allowUpdate();
- handles_[agg_idx]->aggregateOnDistinctifyHashTableForGroupBy(
- *distinctify_hashtables_[agg_idx], agg_hash_table, agg_idx);
- }
-
- auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
- DCHECK(hash_tables != nullptr);
- if (hash_tables->empty()) {
- // We may have a case where hash_tables is empty, e.g. no input blocks.
- // However for aggregateOnDistinctifyHashTableForGroupBy to work
- // correctly, we should create an empty group by hash table.
- AggregationStateHashTableBase *new_hash_table =
- group_by_hashtable_pool_->getHashTableFast();
- group_by_hashtable_pool_->returnHashTable(new_hash_table);
- hash_tables = group_by_hashtable_pool_->getAllHashTables();
- }
- AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
- DCHECK(agg_hash_table != nullptr);
ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
- *agg_hash_table, &group_by_keys, agg_idx);
+ *hash_table, agg_idx, &group_by_keys);
if (agg_result_col != nullptr) {
final_values.emplace_back(agg_result_col);
}
}
+ hash_table->destroyPayload();
// Reorganize 'group_by_keys' in column-major order so that we can make a
// ColumnVectorsValueAccessor to bulk-insert results.
@@ -640,23 +742,20 @@ void AggregationOperationState::finalizeHashTable(
// in a single HashTable.
std::vector<std::unique_ptr<ColumnVector>> group_by_cvs;
std::size_t group_by_element_idx = 0;
- for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
- const Type &group_by_type = group_by_element->getType();
- if (NativeColumnVector::UsableForType(group_by_type)) {
+ for (const Type *group_by_type : group_by_types_) {
+ if (NativeColumnVector::UsableForType(*group_by_type)) {
NativeColumnVector *element_cv =
- new NativeColumnVector(group_by_type, group_by_keys.size());
+ new NativeColumnVector(*group_by_type, group_by_keys.size());
group_by_cvs.emplace_back(element_cv);
for (std::vector<TypedValue> &group_key : group_by_keys) {
- element_cv->appendTypedValue(
- std::move(group_key[group_by_element_idx]));
+ element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
}
} else {
IndirectColumnVector *element_cv =
- new IndirectColumnVector(group_by_type, group_by_keys.size());
+ new IndirectColumnVector(*group_by_type, group_by_keys.size());
group_by_cvs.emplace_back(element_cv);
for (std::vector<TypedValue> &group_key : group_by_keys) {
- element_cv->appendTypedValue(
- std::move(group_key[group_by_element_idx]));
+ element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
}
}
++group_by_element_idx;
@@ -676,42 +775,64 @@ void AggregationOperationState::finalizeHashTable(
output_destination->bulkInsertTuples(&complete_result);
}
-void AggregationOperationState::destroyAggregationHashTablePayload() {
- std::vector<std::unique_ptr<AggregationStateHashTableBase>> *all_hash_tables =
- nullptr;
- if (!is_aggregate_partitioned_) {
- if (group_by_hashtable_pool_ != nullptr) {
- all_hash_tables = group_by_hashtable_pool_->getAllHashTables();
- }
- } else {
- if (partitioned_group_by_hashtable_pool_ != nullptr) {
- all_hash_tables = partitioned_group_by_hashtable_pool_->getAllHashTables();
- }
+void AggregationOperationState::finalizeHashTableImplThreadPrivate(
+ InsertDestination *output_destination) {
+ // TODO(harshad) - The merge phase may be slower when each hash table contains
+ // large number of entries. We should find ways in which we can perform a
+ // parallel merge.
+
+ // TODO(harshad) - Find heuristics for faster merge, even in a single thread.
+ // e.g. Keep merging entries from smaller hash tables to larger.
+
+ auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
+ DCHECK(hash_tables != nullptr);
+ if (hash_tables->empty()) {
+ return;
}
- if (all_hash_tables != nullptr) {
- for (std::size_t ht_index = 0; ht_index < all_hash_tables->size(); ++ht_index) {
- (*all_hash_tables)[ht_index]->destroyPayload();
- }
+
+ std::unique_ptr<AggregationStateHashTableBase> final_hash_table_ptr(
+ hash_tables->back().release());
+ for (std::size_t i = 0; i < hash_tables->size() - 1; ++i) {
+ std::unique_ptr<AggregationStateHashTableBase> hash_table(
+ hash_tables->at(i).release());
+ mergeGroupByHashTables(hash_table.get(), final_hash_table_ptr.get());
+ hash_table->destroyPayload();
}
-}
-void AggregationOperationState::finalizeAggregatePartitioned(
- const std::size_t partition_id, InsertDestination *output_destination) {
+ PackedPayloadHashTable *final_hash_table =
+ static_cast<PackedPayloadHashTable *>(final_hash_table_ptr.get());
+
// Each element of 'group_by_keys' is a vector of values for a particular
// group (which is also the prefix of the finalized Tuple for that group).
std::vector<std::vector<TypedValue>> group_by_keys;
+ if (handles_.empty()) {
+ const auto keys_retriever = [&group_by_keys](std::vector<TypedValue> &group_by_key,
+ const std::uint8_t *dumb_placeholder) -> void {
+ group_by_keys.emplace_back(std::move(group_by_key));
+ };
+
+ final_hash_table->forEachCompositeKey(&keys_retriever);
+ }
+
+
// Collect per-aggregate finalized values.
std::vector<std::unique_ptr<ColumnVector>> final_values;
for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
- AggregationStateHashTableBase *hash_table =
- partitioned_group_by_hashtable_pool_->getHashTable(partition_id);
- ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
- *hash_table, &group_by_keys, agg_idx);
- if (agg_result_col != nullptr) {
- final_values.emplace_back(agg_result_col);
+ if (is_distinct_[agg_idx]) {
+ handles_[agg_idx]->allowUpdate();
+ handles_[agg_idx]->aggregateOnDistinctifyHashTableForGroupBy(
+ *distinctify_hashtables_[agg_idx], agg_idx, final_hash_table);
}
+
+ ColumnVector *agg_result_col =
+ handles_[agg_idx]->finalizeHashTable(
+ *final_hash_table, agg_idx, &group_by_keys);
+ DCHECK(agg_result_col != nullptr);
+
+ final_values.emplace_back(agg_result_col);
}
+ final_hash_table->destroyPayload();
// Reorganize 'group_by_keys' in column-major order so that we can make a
// ColumnVectorsValueAccessor to bulk-insert results.
@@ -722,19 +843,22 @@ void AggregationOperationState::finalizeAggregatePartitioned(
// in a single HashTable.
std::vector<std::unique_ptr<ColumnVector>> group_by_cvs;
std::size_t group_by_element_idx = 0;
- for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
- const Type &group_by_type = group_by_element->getType();
- if (NativeColumnVector::UsableForType(group_by_type)) {
- NativeColumnVector *element_cv = new NativeColumnVector(group_by_type, group_by_keys.size());
+ for (const Type *group_by_type : group_by_types_) {
+ if (NativeColumnVector::UsableForType(*group_by_type)) {
+ NativeColumnVector *element_cv =
+ new NativeColumnVector(*group_by_type, group_by_keys.size());
group_by_cvs.emplace_back(element_cv);
for (std::vector<TypedValue> &group_key : group_by_keys) {
- element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
+ element_cv->appendTypedValue(
+ std::move(group_key[group_by_element_idx]));
}
} else {
- IndirectColumnVector *element_cv = new IndirectColumnVector(group_by_type, group_by_keys.size());
+ IndirectColumnVector *element_cv =
+ new IndirectColumnVector(*group_by_type, group_by_keys.size());
group_by_cvs.emplace_back(element_cv);
for (std::vector<TypedValue> &group_key : group_by_keys) {
- element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
+ element_cv->appendTypedValue(
+ std::move(group_key[group_by_element_idx]));
}
}
++group_by_element_idx;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 591e3a1..13ee377 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -24,31 +24,27 @@
#include <memory>
#include <vector>
-#include "catalog/CatalogTypedefs.hpp"
#include "expressions/aggregation/AggregationHandle.hpp"
-#include "expressions/aggregation/AggregationID.hpp"
#include "expressions/predicate/Predicate.hpp"
#include "expressions/scalar/Scalar.hpp"
-#include "storage/AggregationOperationState.pb.h"
#include "storage/HashTableBase.hpp"
#include "storage/HashTablePool.hpp"
#include "storage/PartitionedHashTablePool.hpp"
#include "storage/StorageBlockInfo.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
#include "utility/Macros.hpp"
-#include "gflags/gflags.h"
-
namespace quickstep {
+namespace serialization { class AggregationOperationState; }
+
class AggregateFunction;
class CatalogDatabaseLite;
class CatalogRelationSchema;
class InsertDestination;
class LIPFilterAdaptiveProber;
class StorageManager;
-
-DECLARE_int32(num_aggregation_partitions);
-DECLARE_int32(partition_aggregation_num_groups_threshold);
+class Type;
/** \addtogroup Storage
* @{
@@ -156,6 +152,29 @@ class AggregationOperationState {
const CatalogDatabaseLite &database);
/**
+ * @brief Get the number of partitions to be used for initializing the
+ * aggregation.
+ *
+ * @return The number of partitions to be used for initializing the aggregation.
+ **/
+ std::size_t getNumInitializationPartitions() const;
+
+ /**
+ * @brief Get the number of partitions to be used for finalizing the
+ * aggregation.
+ *
+ * @return The number of partitions to be used for finalizing the aggregation.
+ **/
+ std::size_t getNumFinalizationPartitions() const;
+
+ /**
+ * @brief Initialize the specified partition of this aggregation.
+ *
+ * @param partition_id ID of the partition to be initialized.
+ */
+ void initialize(const std::size_t partition_id);
+
+ /**
* @brief Compute aggregates on the tuples of the given storage block,
* updating the running state maintained by this
* AggregationOperationState.
@@ -166,127 +185,95 @@ class AggregationOperationState {
* the block.
**/
void aggregateBlock(const block_id input_block,
- LIPFilterAdaptiveProber *lip_filter_adaptive_prober);
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr);
/**
* @brief Generate the final results for the aggregates managed by this
* AggregationOperationState and write them out to StorageBlock(s).
*
+ * @param partition_id The partition id of this finalize operation.
* @param output_destination An InsertDestination where the finalized output
* tuple(s) from this aggregate are to be written.
**/
- void finalizeAggregate(InsertDestination *output_destination);
-
- /**
- * @brief Destroy the payloads in the aggregation hash tables.
- **/
- void destroyAggregationHashTablePayload();
-
- /**
- * @brief Generate the final results for the aggregates managed by this
- * AggregationOperationState and write them out to StorageBlock(s).
- * In this implementation, each thread picks a hash table belonging to
- * a partition and writes its values to StorageBlock(s). There is no
- * need to merge multiple hash tables in one, because there is no
- * overlap in the keys across two hash tables.
- *
- * @param partition_id The ID of the partition for which finalize is being
- * performed.
- * @param output_destination An InsertDestination where the finalized output
- * tuple(s) from this aggregate are to be written.
- **/
- void finalizeAggregatePartitioned(
- const std::size_t partition_id, InsertDestination *output_destination);
-
- static void mergeGroupByHashTables(AggregationStateHashTableBase *src,
- AggregationStateHashTableBase *dst);
+ void finalizeAggregate(const std::size_t partition_id,
+ InsertDestination *output_destination);
- bool isAggregatePartitioned() const {
- return is_aggregate_partitioned_;
- }
+ private:
+ // Check whether partitioned aggregation can be applied.
+ bool checkAggregatePartitioned(
+ const std::size_t estimated_num_groups,
+ const std::vector<bool> &is_distinct,
+ const std::vector<std::unique_ptr<const Scalar>> &group_by,
+ const std::vector<const AggregateFunction *> &aggregate_functions) const;
- /**
- * @brief Get the number of partitions to be used for the aggregation.
- * For non-partitioned aggregations, we return 1.
- **/
- std::size_t getNumPartitions() const {
- return is_aggregate_partitioned_
- ? partitioned_group_by_hashtable_pool_->getNumPartitions()
- : 1;
- }
+ // Aggregate on input block.
+ void aggregateBlockSingleState(const ValueAccessorMultiplexer &accessor_mux);
- int dflag;
+ void aggregateBlockHashTable(const ValueAccessorMultiplexer &accessor_mux);
- private:
// Merge locally (per storage block) aggregated states with global aggregation
// states.
void mergeSingleState(
const std::vector<std::unique_ptr<AggregationState>> &local_state);
- // Aggregate on input block.
- void aggregateBlockSingleState(const block_id input_block);
- void aggregateBlockHashTable(const block_id input_block,
- LIPFilterAdaptiveProber *lip_filter_adaptive_prober);
+ void mergeGroupByHashTables(AggregationStateHashTableBase *src,
+ AggregationStateHashTableBase *dst) const;
+ // Finalize the aggregation results into output_destination.
void finalizeSingleState(InsertDestination *output_destination);
- void finalizeHashTable(InsertDestination *output_destination);
- bool checkAggregatePartitioned(
- const std::size_t estimated_num_groups,
- const std::vector<bool> &is_distinct,
- const std::vector<std::unique_ptr<const Scalar>> &group_by,
- const std::vector<const AggregateFunction *> &aggregate_functions) const {
- // If there's no aggregation, return false.
- if (aggregate_functions.empty()) {
- return false;
- }
- // Check if there's a distinct operation involved in any aggregate, if so
- // the aggregate can't be partitioned.
- for (auto distinct : is_distinct) {
- if (distinct) {
- return false;
- }
- }
- // There's no distinct aggregation involved, Check if there's at least one
- // GROUP BY operation.
- if (group_by.empty()) {
- return false;
- }
- // There are GROUP BYs without DISTINCT. Check if the estimated number of
- // groups is large enough to warrant a partitioned aggregation.
- return estimated_num_groups >
- static_cast<std::size_t>(
- FLAGS_partition_aggregation_num_groups_threshold);
- }
+ void finalizeHashTable(const std::size_t partition_id,
+ InsertDestination *output_destination);
+
+ // Specialized implementations for aggregateBlockHashTable.
+ void aggregateBlockHashTableImplCollisionFree(
+ const ValueAccessorMultiplexer &accessor_mux);
+
+ void aggregateBlockHashTableImplPartitioned(
+ const ValueAccessorMultiplexer &accessor_mux);
+
+ void aggregateBlockHashTableImplThreadPrivate(
+ const ValueAccessorMultiplexer &accessor_mux);
+
+ // Specialized implementations for finalizeHashTable.
+ void finalizeHashTableImplCollisionFree(const std::size_t partition_id,
+ InsertDestination *output_destination);
+
+ void finalizeHashTableImplPartitioned(const std::size_t partition_id,
+ InsertDestination *output_destination);
+
+ void finalizeHashTableImplThreadPrivate(InsertDestination *output_destination);
// Common state for all aggregates in this operation: the input relation, the
// filter predicate (if any), and the list of GROUP BY expressions (if any).
const CatalogRelationSchema &input_relation_;
+ // Whether the aggregation is collision free or not.
+ bool is_aggregate_collision_free_;
+
// Whether the aggregation is partitioned or not.
- const bool is_aggregate_partitioned_;
+ bool is_aggregate_partitioned_;
std::unique_ptr<const Predicate> predicate_;
- std::vector<std::unique_ptr<const Scalar>> group_by_list_;
// Each individual aggregate in this operation has an AggregationHandle and
- // some number of Scalar arguments.
- std::vector<AggregationHandle *> handles_;
- std::vector<std::vector<std::unique_ptr<const Scalar>>> arguments_;
+ // zero (indicated by -1) or one argument.
+ std::vector<std::unique_ptr<AggregationHandle>> handles_;
// For each aggregate, whether DISTINCT should be applied to the aggregate's
// arguments.
std::vector<bool> is_distinct_;
- // Hash table for obtaining distinct (i.e. unique) arguments.
- std::vector<std::unique_ptr<AggregationStateHashTableBase>>
- distinctify_hashtables_;
+ // Non-trivial group-by/argument expressions that need to be evaluated.
+ std::vector<std::unique_ptr<const Scalar>> non_trivial_expressions_;
+
+ std::vector<MultiSourceAttributeId> group_by_key_ids_;
+ std::vector<std::vector<MultiSourceAttributeId>> argument_ids_;
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- // If all an aggregate's argument expressions are simply attributes in
- // 'input_relation_', then this caches the attribute IDs of those arguments.
- std::vector<std::vector<attribute_id>> arguments_as_attributes_;
-#endif
+ std::vector<const Type *> group_by_types_;
+
+ // Hash table for obtaining distinct (i.e. unique) arguments.
+ std::vector<std::unique_ptr<AggregationStateHashTableBase>> distinctify_hashtables_;
// Per-aggregate global states for aggregation without GROUP BY.
std::vector<std::unique_ptr<AggregationState>> single_states_;
@@ -295,14 +282,15 @@ class AggregationOperationState {
//
// TODO(shoban): We should ideally store the aggregation state together in one
// hash table to prevent multiple lookups.
- std::vector<std::unique_ptr<AggregationStateHashTableBase>>
- group_by_hashtables_;
+ std::vector<std::unique_ptr<AggregationStateHashTableBase>> group_by_hashtables_;
// A vector of group by hash table pools.
std::unique_ptr<HashTablePool> group_by_hashtable_pool_;
std::unique_ptr<PartitionedHashTablePool> partitioned_group_by_hashtable_pool_;
+ std::unique_ptr<AggregationStateHashTableBase> collision_free_hashtable_;
+
StorageManager *storage_manager_;
DISALLOW_COPY_AND_ASSIGN(AggregationOperationState);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index a44c3a7..293be17 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -165,6 +165,9 @@ if(QUICKSTEP_HAVE_BITWEAVING)
bitweaving/BitWeavingVIndexSubBlock.hpp)
endif()
# CMAKE_VALIDATE_IGNORE_END
+add_library(quickstep_storage_CollisionFreeVectorTable
+ CollisionFreeVectorTable.cpp
+ CollisionFreeVectorTable.hpp)
add_library(quickstep_storage_ColumnStoreUtil ColumnStoreUtil.cpp ColumnStoreUtil.hpp)
add_library(quickstep_storage_CompressedBlockBuilder CompressedBlockBuilder.cpp CompressedBlockBuilder.hpp)
add_library(quickstep_storage_CompressedColumnStoreTupleStorageSubBlock
@@ -194,9 +197,6 @@ if (ENABLE_DISTRIBUTED)
endif()
add_library(quickstep_storage_EvictionPolicy EvictionPolicy.cpp EvictionPolicy.hpp)
-add_library(quickstep_storage_FastHashTable ../empty_src.cpp FastHashTable.hpp)
-add_library(quickstep_storage_FastHashTableFactory ../empty_src.cpp FastHashTableFactory.hpp)
-add_library(quickstep_storage_FastSeparateChainingHashTable ../empty_src.cpp FastSeparateChainingHashTable.hpp)
add_library(quickstep_storage_FileManager ../empty_src.cpp FileManager.hpp)
if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
add_library(quickstep_storage_FileManagerHdfs FileManagerHdfs.cpp FileManagerHdfs.hpp)
@@ -226,6 +226,7 @@ add_library(quickstep_storage_InsertDestination_proto
add_library(quickstep_storage_LinearOpenAddressingHashTable
../empty_src.cpp
LinearOpenAddressingHashTable.hpp)
+add_library(quickstep_storage_PackedPayloadHashTable PackedPayloadHashTable.cpp PackedPayloadHashTable.hpp)
add_library(quickstep_storage_PartitionedHashTablePool ../empty_src.cpp PartitionedHashTablePool.hpp)
add_library(quickstep_storage_PreloaderThread PreloaderThread.cpp PreloaderThread.hpp)
add_library(quickstep_storage_SMAIndexSubBlock SMAIndexSubBlock.cpp SMAIndexSubBlock.hpp)
@@ -253,6 +254,7 @@ add_library(quickstep_storage_TupleIdSequence ../empty_src.cpp TupleIdSequence.h
add_library(quickstep_storage_TupleReference ../empty_src.cpp TupleReference.hpp)
add_library(quickstep_storage_TupleStorageSubBlock TupleStorageSubBlock.cpp TupleStorageSubBlock.hpp)
add_library(quickstep_storage_ValueAccessor ../empty_src.cpp ValueAccessor.hpp)
+add_library(quickstep_storage_ValueAccessorMultiplexer ../empty_src.cpp ValueAccessorMultiplexer.hpp)
add_library(quickstep_storage_ValueAccessorUtil ../empty_src.cpp ValueAccessorUtil.hpp)
add_library(quickstep_storage_WindowAggregationOperationState
WindowAggregationOperationState.hpp
@@ -272,22 +274,25 @@ target_link_libraries(quickstep_storage_AggregationOperationState
quickstep_expressions_aggregation_AggregateFunction
quickstep_expressions_aggregation_AggregateFunctionFactory
quickstep_expressions_aggregation_AggregationHandle
- quickstep_expressions_aggregation_AggregationHandleDistinct
- quickstep_expressions_aggregation_AggregationID
quickstep_expressions_predicate_Predicate
quickstep_expressions_scalar_Scalar
quickstep_storage_AggregationOperationState_proto
- quickstep_storage_HashTable
+ quickstep_storage_CollisionFreeVectorTable
quickstep_storage_HashTableBase
quickstep_storage_HashTableFactory
quickstep_storage_HashTablePool
quickstep_storage_InsertDestination
quickstep_storage_PartitionedHashTablePool
+ quickstep_storage_PackedPayloadHashTable
quickstep_storage_StorageBlock
quickstep_storage_StorageBlockInfo
quickstep_storage_StorageManager
+ quickstep_storage_SubBlocksReference
quickstep_storage_TupleIdSequence
+ quickstep_storage_TupleStorageSubBlock
quickstep_storage_ValueAccessor
+ quickstep_storage_ValueAccessorMultiplexer
+ quickstep_storage_ValueAccessorUtil
quickstep_types_TypedValue
quickstep_types_containers_ColumnVector
quickstep_types_containers_ColumnVectorsValueAccessor
@@ -430,6 +435,24 @@ if(QUICKSTEP_HAVE_BITWEAVING)
quickstep_utility_Macros)
endif()
# CMAKE_VALIDATE_IGNORE_END
+target_link_libraries(quickstep_storage_CollisionFreeVectorTable
+ quickstep_catalog_CatalogTypedefs
+ quickstep_expressions_aggregation_AggregationHandle
+ quickstep_expressions_aggregation_AggregationID
+ quickstep_storage_HashTableBase
+ quickstep_storage_StorageBlob
+ quickstep_storage_StorageBlockInfo
+ quickstep_storage_StorageConstants
+ quickstep_storage_StorageManager
+ quickstep_storage_ValueAccessor
+ quickstep_storage_ValueAccessorMultiplexer
+ quickstep_storage_ValueAccessorUtil
+ quickstep_types_Type
+ quickstep_types_TypeID
+ quickstep_types_containers_ColumnVector
+ quickstep_types_containers_ColumnVectorsValueAccessor
+ quickstep_utility_BarrieredReadWriteConcurrentBitVector
+ quickstep_utility_Macros)
target_link_libraries(quickstep_storage_ColumnStoreUtil
quickstep_catalog_CatalogAttribute
quickstep_catalog_CatalogRelationSchema
@@ -627,52 +650,6 @@ target_link_libraries(quickstep_storage_EvictionPolicy
quickstep_threading_SpinMutex
quickstep_threading_SpinSharedMutex
quickstep_utility_Macros)
-target_link_libraries(quickstep_storage_FastHashTable
- quickstep_catalog_CatalogTypedefs
- quickstep_storage_HashTableBase
- quickstep_storage_StorageBlob
- quickstep_storage_StorageBlockInfo
- quickstep_storage_StorageConstants
- quickstep_storage_StorageManager
- quickstep_storage_TupleReference
- quickstep_storage_ValueAccessor
- quickstep_storage_ValueAccessorUtil
- quickstep_threading_SpinMutex
- quickstep_threading_SpinSharedMutex
- quickstep_types_Type
- quickstep_types_TypedValue
- quickstep_utility_HashPair
- quickstep_utility_Macros)
-target_link_libraries(quickstep_storage_FastHashTableFactory
- glog
- quickstep_storage_FastHashTable
- quickstep_storage_FastSeparateChainingHashTable
- quickstep_storage_HashTable
- quickstep_storage_HashTable_proto
- quickstep_storage_HashTableBase
- quickstep_storage_HashTableFactory
- quickstep_storage_LinearOpenAddressingHashTable
- quickstep_storage_SeparateChainingHashTable
- quickstep_storage_SimpleScalarSeparateChainingHashTable
- quickstep_storage_TupleReference
- quickstep_types_TypeFactory
- quickstep_utility_Macros)
-target_link_libraries(quickstep_storage_FastSeparateChainingHashTable
- quickstep_expressions_aggregation_AggregationHandle
- quickstep_storage_FastHashTable
- quickstep_storage_HashTable
- quickstep_storage_HashTableBase
- quickstep_storage_HashTableKeyManager
- quickstep_storage_StorageBlob
- quickstep_storage_StorageBlockInfo
- quickstep_storage_StorageConstants
- quickstep_storage_StorageManager
- quickstep_threading_SpinSharedMutex
- quickstep_types_Type
- quickstep_types_TypedValue
- quickstep_utility_Alignment
- quickstep_utility_Macros
- quickstep_utility_PrimeNumber)
target_link_libraries(quickstep_storage_FileManager
quickstep_storage_StorageBlockInfo
quickstep_utility_Macros
@@ -731,16 +708,19 @@ target_link_libraries(quickstep_storage_HashTable
quickstep_utility_HashPair
quickstep_utility_Macros)
target_link_libraries(quickstep_storage_HashTableBase
+ quickstep_storage_ValueAccessorMultiplexer
quickstep_utility_Macros)
target_link_libraries(quickstep_storage_HashTable_proto
quickstep_types_Type_proto
${PROTOBUF_LIBRARY})
target_link_libraries(quickstep_storage_HashTableFactory
glog
+ quickstep_storage_CollisionFreeVectorTable
quickstep_storage_HashTable
quickstep_storage_HashTable_proto
quickstep_storage_HashTableBase
quickstep_storage_LinearOpenAddressingHashTable
+ quickstep_storage_PackedPayloadHashTable
quickstep_storage_SeparateChainingHashTable
quickstep_storage_SimpleScalarSeparateChainingHashTable
quickstep_storage_TupleReference
@@ -759,13 +739,10 @@ target_link_libraries(quickstep_storage_HashTableKeyManager
quickstep_utility_Macros)
target_link_libraries(quickstep_storage_HashTablePool
glog
- quickstep_expressions_aggregation_AggregationHandle
- quickstep_storage_FastHashTable
- quickstep_storage_FastHashTableFactory
quickstep_storage_HashTableBase
+ quickstep_storage_HashTableFactory
quickstep_threading_SpinMutex
- quickstep_utility_Macros
- quickstep_utility_StringUtil)
+ quickstep_utility_Macros)
target_link_libraries(quickstep_storage_IndexSubBlock
quickstep_catalog_CatalogTypedefs
quickstep_expressions_predicate_PredicateCost
@@ -820,14 +797,32 @@ target_link_libraries(quickstep_storage_LinearOpenAddressingHashTable
quickstep_utility_Alignment
quickstep_utility_Macros
quickstep_utility_PrimeNumber)
-target_link_libraries(quickstep_storage_PartitionedHashTablePool
- glog
+target_link_libraries(quickstep_storage_PackedPayloadHashTable
+ quickstep_catalog_CatalogTypedefs
quickstep_expressions_aggregation_AggregationHandle
- quickstep_storage_FastHashTable
- quickstep_storage_FastHashTableFactory
quickstep_storage_HashTableBase
+ quickstep_storage_HashTableKeyManager
+ quickstep_storage_StorageBlob
+ quickstep_storage_StorageBlockInfo
+ quickstep_storage_StorageConstants
+ quickstep_storage_StorageManager
+ quickstep_storage_ValueAccessor
+ quickstep_storage_ValueAccessorMultiplexer
+ quickstep_storage_ValueAccessorUtil
+ quickstep_threading_SpinMutex
+ quickstep_threading_SpinSharedMutex
+ quickstep_types_Type
+ quickstep_types_TypedValue
+ quickstep_types_containers_ColumnVectorsValueAccessor
+ quickstep_utility_Alignment
+ quickstep_utility_HashPair
quickstep_utility_Macros
- quickstep_utility_StringUtil)
+ quickstep_utility_PrimeNumber)
+target_link_libraries(quickstep_storage_PartitionedHashTablePool
+ glog
+ quickstep_storage_HashTableBase
+ quickstep_storage_HashTableFactory
+ quickstep_utility_Macros)
target_link_libraries(quickstep_storage_PreloaderThread
glog
quickstep_catalog_CatalogDatabase
@@ -936,7 +931,6 @@ target_link_libraries(quickstep_storage_StorageBlock
glog
quickstep_catalog_CatalogRelationSchema
quickstep_catalog_CatalogTypedefs
- quickstep_expressions_aggregation_AggregationHandle
quickstep_expressions_predicate_Predicate
quickstep_expressions_scalar_Scalar
quickstep_storage_BasicColumnStoreTupleStorageSubBlock
@@ -945,7 +939,6 @@ target_link_libraries(quickstep_storage_StorageBlock
quickstep_storage_CompressedColumnStoreTupleStorageSubBlock
quickstep_storage_CompressedPackedRowStoreTupleStorageSubBlock
quickstep_storage_CountedReference
- quickstep_storage_HashTableBase
quickstep_storage_IndexSubBlock
quickstep_storage_InsertDestinationInterface
quickstep_storage_SMAIndexSubBlock
@@ -1068,6 +1061,10 @@ target_link_libraries(quickstep_storage_ValueAccessor
quickstep_types_TypedValue
quickstep_types_containers_Tuple
quickstep_utility_Macros)
+target_link_libraries(quickstep_storage_ValueAccessorMultiplexer
+ glog
+ quickstep_catalog_CatalogTypedefs
+ quickstep_utility_Macros)
target_link_libraries(quickstep_storage_ValueAccessorUtil
glog
quickstep_storage_BasicColumnStoreValueAccessor
@@ -1115,6 +1112,7 @@ target_link_libraries(quickstep_storage
quickstep_storage_BasicColumnStoreValueAccessor
quickstep_storage_BloomFilterIndexSubBlock
quickstep_storage_CSBTreeIndexSubBlock
+ quickstep_storage_CollisionFreeVectorTable
quickstep_storage_ColumnStoreUtil
quickstep_storage_CompressedBlockBuilder
quickstep_storage_CompressedColumnStoreTupleStorageSubBlock
@@ -1125,9 +1123,6 @@ target_link_libraries(quickstep_storage
quickstep_storage_CompressedTupleStorageSubBlock
quickstep_storage_CountedReference
quickstep_storage_EvictionPolicy
- quickstep_storage_FastHashTable
- quickstep_storage_FastHashTableFactory
- quickstep_storage_FastSeparateChainingHashTable
quickstep_storage_FileManager
quickstep_storage_FileManagerLocal
quickstep_storage_Flags
@@ -1144,6 +1139,7 @@ target_link_libraries(quickstep_storage
quickstep_storage_InsertDestination_proto
quickstep_storage_LinearOpenAddressingHashTable
quickstep_storage_PartitionedHashTablePool
+ quickstep_storage_PackedPayloadHashTable
quickstep_storage_PreloaderThread
quickstep_storage_SMAIndexSubBlock
quickstep_storage_SeparateChainingHashTable
@@ -1166,6 +1162,7 @@ target_link_libraries(quickstep_storage
quickstep_storage_TupleReference
quickstep_storage_TupleStorageSubBlock
quickstep_storage_ValueAccessor
+ quickstep_storage_ValueAccessorMultiplexer
quickstep_storage_ValueAccessorUtil
quickstep_storage_WindowAggregationOperationState
quickstep_storage_WindowAggregationOperationState_proto)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/storage/CollisionFreeVectorTable.cpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeVectorTable.cpp b/storage/CollisionFreeVectorTable.cpp
new file mode 100644
index 0000000..d836014
--- /dev/null
+++ b/storage/CollisionFreeVectorTable.cpp
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "storage/CollisionFreeVectorTable.hpp"
+
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <cstdlib>
+#include <memory>
+#include <vector>
+
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+// Computes the memory layout of the table and allocates one blob to hold it.
+//
+// The layout is: an existence bit vector for num_entries entries at offset 0,
+// followed by one column of per-entry aggregation states for each handle
+// (columnwise layout). Every region's size is rounded with
+// CacheLineAlignedBytes(). The existence map is constructed with its
+// "initialize" flag set to false, so no bits are cleared here.
+//
+// Only COUNT and SUM (with an int/long/float/double argument) are handled;
+// any other aggregate or argument type hits LOG(FATAL).
+CollisionFreeVectorTable::CollisionFreeVectorTable(
+    const Type *key_type,
+    const std::size_t num_entries,
+    const std::vector<AggregationHandle *> &handles,
+    StorageManager *storage_manager)
+    : key_type_(key_type),
+      num_entries_(num_entries),
+      num_handles_(handles.size()),
+      handles_(handles),
+      num_finalize_partitions_(CalculateNumFinalizationPartitions(num_entries_)),
+      storage_manager_(storage_manager) {
+  DCHECK_GT(num_entries, 0u);
+
+  // The existence map occupies the start of the blob.
+  const std::size_t existence_map_offset = 0;
+  std::size_t required_memory = CacheLineAlignedBytes(
+      BarrieredReadWriteConcurrentBitVector::BytesNeeded(num_entries));
+
+  // One offset per handle; the final size is known, so allocate once.
+  std::vector<std::size_t> state_offsets;
+  state_offsets.reserve(num_handles_);
+
+  for (std::size_t i = 0; i < num_handles_; ++i) {
+    const AggregationHandle *handle = handles_[i];
+    // Bind by const reference so that no vector copy is made per handle.
+    const std::vector<const Type *> &argument_types = handle->getArgumentTypes();
+
+    // Size in bytes of a single entry's state for this aggregate.
+    std::size_t state_size = 0;
+    switch (handle->getAggregationID()) {
+      case AggregationID::kCount: {
+        state_size = sizeof(std::atomic<std::size_t>);
+        break;
+      }
+      case AggregationID::kSum: {
+        DCHECK_EQ(1u, argument_types.size());
+        switch (argument_types.front()->getTypeID()) {
+          case TypeID::kInt:  // Fall through
+          case TypeID::kLong:
+            state_size = sizeof(std::atomic<std::int64_t>);
+            break;
+          case TypeID::kFloat:  // Fall through
+          case TypeID::kDouble:
+            state_size = sizeof(std::atomic<double>);
+            break;
+          default:
+            LOG(FATAL) << "Not implemented";
+        }
+        break;
+      }
+      default:
+        LOG(FATAL) << "Not implemented";
+    }
+
+    // This handle's column starts where the previous region ended.
+    state_offsets.emplace_back(required_memory);
+    required_memory += CacheLineAlignedBytes(state_size * num_entries);
+  }
+
+  const std::size_t num_storage_slots =
+      storage_manager_->SlotsNeededForBytes(required_memory);
+
+  const block_id blob_id = storage_manager_->createBlob(num_storage_slots);
+  blob_ = storage_manager_->getBlobMutable(blob_id);
+
+  // void* -> char* only needs static_cast; char* allows byte-offset arithmetic.
+  char *memory_start = static_cast<char *>(blob_->getMemoryMutable());
+  existence_map_.reset(new BarrieredReadWriteConcurrentBitVector(
+      memory_start + existence_map_offset,
+      num_entries,
+      false /* initialize */));
+
+  for (std::size_t i = 0; i < num_handles_; ++i) {
+    // Columnwise layout.
+    vec_tables_.emplace_back(memory_start + state_offsets.at(i));
+  }
+
+  memory_size_ = required_memory;
+  num_init_partitions_ = CalculateNumInitializationPartitions(memory_size_);
+}
+
+// Releases the table's reference to its backing blob, then asks the storage
+// manager to delete that blob.
+CollisionFreeVectorTable::~CollisionFreeVectorTable() {
+  // Read the id while the reference is still held; it is needed for the
+  // deletion call that follows the release.
+  const block_id owned_blob_id = blob_->getID();
+  blob_.release();
+  storage_manager_->deleteBlockOrBlobFile(owned_blob_id);
+}
+
+// No-op: this implementation performs no per-entry payload destruction.
+void CollisionFreeVectorTable::destroyPayload() {}
+
+// Upserts the tuples exposed by accessor_mux into this table.
+//
+// Exactly one group-by key is expected (DCHECKed). With no aggregation
+// handles the work is delegated to upsertValueAccessorKeyOnlyHelper();
+// otherwise upsertValueAccessorDispatchHelper<>() is called once per handle,
+// with the key accessor and argument accessor chosen according to whether
+// each attribute comes from the base or the derived accessor.
+//
+// Always returns true.
+bool CollisionFreeVectorTable::upsertValueAccessorCompositeKey(
+    const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
+    const std::vector<MultiSourceAttributeId> &key_ids,
+    const ValueAccessorMultiplexer &accessor_mux) {
+  DCHECK_EQ(1u, key_ids.size());
+
+  // Key-only path: there are no aggregate states to update.
+  if (handles_.empty()) {
+    InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
+        accessor_mux.getValueAccessorBySource(key_ids.front().source),
+        [&key_ids, this](auto *accessor) -> void {  // NOLINT(build/c++11)
+          this->upsertValueAccessorKeyOnlyHelper(
+              key_type_->isNullable(),
+              key_type_,
+              key_ids.front().attr_id,
+              accessor);
+        });
+    return true;
+  }
+
+  // A derived accessor, when present, must be a ColumnVectorsValueAccessor;
+  // the static_cast below relies on that.
+  DCHECK(accessor_mux.getDerivedAccessor() == nullptr ||
+         accessor_mux.getDerivedAccessor()->getImplementationType()
+             == ValueAccessor::Implementation::kColumnVectors);
+
+  ColumnVectorsValueAccessor *derived_accessor =
+      static_cast<ColumnVectorsValueAccessor *>(accessor_mux.getDerivedAccessor());
+  ValueAccessor *base_accessor = accessor_mux.getBaseAccessor();
+
+  // Dispatch to specialized implementations to achieve maximum performance.
+  InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
+      base_accessor,
+      [&argument_ids, &key_ids, &derived_accessor, this](auto *accessor) -> void {  // NOLINT(build/c++11)
+        const attribute_id key_id = key_ids.front().attr_id;
+        const bool is_key_nullable = key_type_->isNullable();
+        const bool key_from_base =
+            (key_ids.front().source == ValueAccessorSource::kBase);
+
+        for (std::size_t i = 0; i < num_handles_; ++i) {
+          const auto &argument_ids_i = argument_ids[i];
+          DCHECK_LE(argument_ids_i.size(), 1u);
+
+          const AggregationHandle *handle = handles_[i];
+          const auto &argument_types = handle->getArgumentTypes();
+
+          // Defaults describe an aggregate with no argument; they are
+          // overwritten below when an argument is present.
+          ValueAccessorSource argument_source = ValueAccessorSource::kInvalid;
+          attribute_id argument_id = kInvalidAttributeID;
+          const Type *argument_type = nullptr;
+          bool is_argument_nullable = false;
+
+          if (argument_ids_i.empty()) {
+            DCHECK(argument_types.empty());
+          } else {
+            DCHECK_EQ(1u, argument_ids_i.size());
+            argument_source = argument_ids_i.front().source;
+            argument_id = argument_ids_i.front().attr_id;
+
+            DCHECK_EQ(1u, argument_types.size());
+            argument_type = argument_types.front();
+            is_argument_nullable = argument_type->isNullable();
+          }
+
+          // Any source other than kBase (including kInvalid) is routed to
+          // the derived accessor.
+          const bool argument_from_base =
+              (argument_source == ValueAccessorSource::kBase);
+
+          // The template flag is true exactly when the key accessor and the
+          // argument accessor passed to the helper are different objects.
+          if (key_from_base && argument_from_base) {
+            this->upsertValueAccessorDispatchHelper<false>(
+                is_key_nullable,
+                is_argument_nullable,
+                key_type_,
+                argument_type,
+                handle->getAggregationID(),
+                key_id,
+                argument_id,
+                vec_tables_[i],
+                accessor,
+                accessor);
+          } else if (key_from_base) {
+            this->upsertValueAccessorDispatchHelper<true>(
+                is_key_nullable,
+                is_argument_nullable,
+                key_type_,
+                argument_type,
+                handle->getAggregationID(),
+                key_id,
+                argument_id,
+                vec_tables_[i],
+                accessor,
+                derived_accessor);
+          } else if (argument_from_base) {
+            this->upsertValueAccessorDispatchHelper<true>(
+                is_key_nullable,
+                is_argument_nullable,
+                key_type_,
+                argument_type,
+                handle->getAggregationID(),
+                key_id,
+                argument_id,
+                vec_tables_[i],
+                derived_accessor,
+                accessor);
+          } else {
+            this->upsertValueAccessorDispatchHelper<false>(
+                is_key_nullable,
+                is_argument_nullable,
+                key_type_,
+                argument_type,
+                handle->getAggregationID(),
+                key_id,
+                argument_id,
+                vec_tables_[i],
+                derived_accessor,
+                derived_accessor);
+          }
+        }
+      });
+  return true;
+}
+
+// Finalizes the keys of one partition into output_cv.
+//
+// The partition's position range comes from calculatePartitionStartPosition()
+// and calculatePartitionEndPosition(); the work is done by
+// finalizeKeyInternal<>, instantiated with the C++ type that matches the key's
+// TypeID. Only kInt and kLong keys are supported; anything else hits
+// LOG(FATAL).
+void CollisionFreeVectorTable::finalizeKey(const std::size_t partition_id,
+                                           NativeColumnVector *output_cv) const {
+  const std::size_t range_begin = calculatePartitionStartPosition(partition_id);
+  const std::size_t range_end = calculatePartitionEndPosition(partition_id);
+
+  const auto key_type_id = key_type_->getTypeID();
+  if (key_type_id == TypeID::kInt) {
+    finalizeKeyInternal<int>(range_begin, range_end, output_cv);
+  } else if (key_type_id == TypeID::kLong) {
+    finalizeKeyInternal<std::int64_t>(range_begin, range_end, output_cv);
+  } else {
+    LOG(FATAL) << "Not supported";
+  }
+}
+
+// Finalizes the states of one aggregation handle for one partition into
+// output_cv.
+//
+// handle_id indexes both handles_ and vec_tables_. The handle's first argument
+// type (or nullptr when it takes no arguments) is forwarded to
+// finalizeStateDispatchHelper() together with the partition's position range.
+void CollisionFreeVectorTable::finalizeState(const std::size_t partition_id,
+                                             const std::size_t handle_id,
+                                             NativeColumnVector *output_cv) const {
+  const std::size_t range_begin = calculatePartitionStartPosition(partition_id);
+  const std::size_t range_end = calculatePartitionEndPosition(partition_id);
+
+  const AggregationHandle *handle = handles_[handle_id];
+  const auto &argument_types = handle->getArgumentTypes();
+
+  // Aggregates without arguments have no argument type to forward.
+  const Type *argument_type = nullptr;
+  if (!argument_types.empty()) {
+    argument_type = argument_types.front();
+  }
+
+  finalizeStateDispatchHelper(handle->getAggregationID(),
+                              argument_type,
+                              vec_tables_[handle_id],
+                              range_begin,
+                              range_end,
+                              output_cv);
+}
+
+} // namespace quickstep