You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/01/31 23:38:13 UTC
[7/8] incubator-quickstep git commit: Initial commit.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/expressions/aggregation/AggregationHandleSum.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.cpp b/expressions/aggregation/AggregationHandleSum.cpp
index 642d88d..00b229e 100644
--- a/expressions/aggregation/AggregationHandleSum.cpp
+++ b/expressions/aggregation/AggregationHandleSum.cpp
@@ -25,8 +25,8 @@
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
-#include "storage/HashTable.hpp"
-#include "storage/HashTableFactory.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
+#include "storage/PackedPayloadAggregationStateHashTable.hpp"
#include "threading/SpinMutex.hpp"
#include "types/Type.hpp"
#include "types/TypeFactory.hpp"
@@ -43,7 +43,8 @@ namespace quickstep {
class StorageManager;
AggregationHandleSum::AggregationHandleSum(const Type &type)
- : argument_type_(type), block_update_(false) {
+ : AggregationConcreteHandle(AggregationID::kSum),
+ argument_type_(type) {
// We sum Int as Long and Float as Double so that we have more headroom when
// adding many values.
TypeID type_precision_id;
@@ -79,47 +80,26 @@ AggregationHandleSum::AggregationHandleSum(const Type &type)
result_type_ = &sum_type.getNullableVersion();
}
-AggregationStateHashTableBase* AggregationHandleSum::createGroupByHashTable(
- const HashTableImplType hash_table_impl,
- const std::vector<const Type *> &group_by_types,
- const std::size_t estimated_num_groups,
- StorageManager *storage_manager) const {
- return AggregationStateHashTableFactory<AggregationStateSum>::CreateResizable(
- hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
-}
+AggregationState* AggregationHandleSum::accumulate(
+ ValueAccessor *accessor,
+ ColumnVectorsValueAccessor *aux_accessor,
+ const std::vector<attribute_id> &argument_ids) const {
+ DCHECK_EQ(1u, argument_ids.size())
+ << "Got wrong number of attributes for SUM: " << argument_ids.size();
-AggregationState* AggregationHandleSum::accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
- DCHECK_EQ(1u, column_vectors.size())
- << "Got wrong number of ColumnVectors for SUM: " << column_vectors.size();
- std::size_t num_tuples = 0;
- TypedValue cv_sum = fast_operator_->accumulateColumnVector(
- blank_state_.sum_, *column_vectors.front(), &num_tuples);
- return new AggregationStateSum(std::move(cv_sum), num_tuples == 0);
-}
+ const attribute_id argument_id = argument_ids.front();
+ DCHECK_NE(argument_id, kInvalidAttributeID);
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-AggregationState* AggregationHandleSum::accumulateValueAccessor(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &accessor_ids) const {
- DCHECK_EQ(1u, accessor_ids.size())
- << "Got wrong number of attributes for SUM: " << accessor_ids.size();
+ ValueAccessor *target_accessor =
+ argument_id >= 0 ? accessor : aux_accessor;
+ const attribute_id target_argument_id =
+ argument_id >= 0 ? argument_id : -(argument_id+2);
std::size_t num_tuples = 0;
TypedValue va_sum = fast_operator_->accumulateValueAccessor(
- blank_state_.sum_, accessor, accessor_ids.front(), &num_tuples);
+ blank_state_.sum_, target_accessor, target_argument_id, &num_tuples);
return new AggregationStateSum(std::move(va_sum), num_tuples == 0);
}
-#endif
-
-void AggregationHandleSum::aggregateValueAccessorIntoHashTable(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &argument_ids,
- const std::vector<attribute_id> &group_by_key_ids,
- AggregationStateHashTableBase *hash_table) const {
- DCHECK_EQ(1u, argument_ids.size())
- << "Got wrong number of arguments for SUM: " << argument_ids.size();
-}
void AggregationHandleSum::mergeStates(const AggregationState &source,
AggregationState *destination) const {
@@ -134,8 +114,8 @@ void AggregationHandleSum::mergeStates(const AggregationState &source,
sum_destination->null_ = sum_destination->null_ && sum_source.null_;
}
-void AggregationHandleSum::mergeStatesFast(const std::uint8_t *source,
- std::uint8_t *destination) const {
+void AggregationHandleSum::mergeStates(const std::uint8_t *source,
+ std::uint8_t *destination) const {
const TypedValue *src_sum_ptr =
reinterpret_cast<const TypedValue *>(source + blank_state_.sum_offset_);
const bool *src_null_ptr =
@@ -164,27 +144,10 @@ ColumnVector* AggregationHandleSum::finalizeHashTable(
const AggregationStateHashTableBase &hash_table,
std::vector<std::vector<TypedValue>> *group_by_keys,
int index) const {
- return finalizeHashTableHelperFast<AggregationHandleSum,
- AggregationStateFastHashTable>(
- *result_type_, hash_table, group_by_keys, index);
-}
-
-AggregationState*
-AggregationHandleSum::aggregateOnDistinctifyHashTableForSingle(
- const AggregationStateHashTableBase &distinctify_hash_table) const {
- return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
- AggregationHandleSum,
- AggregationStateSum>(distinctify_hash_table);
-}
-
-void AggregationHandleSum::aggregateOnDistinctifyHashTableForGroupBy(
- const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table,
- std::size_t index) const {
- aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
+ return finalizeHashTableHelper<
AggregationHandleSum,
- AggregationStateFastHashTable>(
- distinctify_hash_table, aggregation_hash_table, index);
+ PackedPayloadSeparateChainingAggregationStateHashTable>(
+ *result_type_, hash_table, group_by_keys, index);
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/expressions/aggregation/AggregationHandleSum.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.hpp b/expressions/aggregation/AggregationHandleSum.hpp
index f0d23e1..9fb7706 100644
--- a/expressions/aggregation/AggregationHandleSum.hpp
+++ b/expressions/aggregation/AggregationHandleSum.hpp
@@ -28,7 +28,6 @@
#include "catalog/CatalogTypedefs.hpp"
#include "expressions/aggregation/AggregationConcreteHandle.hpp"
#include "expressions/aggregation/AggregationHandle.hpp"
-#include "storage/FastHashTable.hpp"
#include "storage/HashTableBase.hpp"
#include "threading/SpinMutex.hpp"
#include "types/Type.hpp"
@@ -41,6 +40,7 @@
namespace quickstep {
class ColumnVector;
+class ColumnVectorsValueAccessor;
class StorageManager;
class ValueAccessor;
@@ -101,16 +101,18 @@ class AggregationHandleSum : public AggregationConcreteHandle {
public:
~AggregationHandleSum() override {}
+ std::vector<const Type *> getArgumentTypes() const override {
+ return {&argument_type_};
+ }
+
+ const Type* getResultType() const override {
+ return result_type_;
+ }
+
AggregationState* createInitialState() const override {
return new AggregationStateSum(blank_state_);
}
- AggregationStateHashTableBase* createGroupByHashTable(
- const HashTableImplType hash_table_impl,
- const std::vector<const Type *> &group_by_types,
- const std::size_t estimated_num_groups,
- StorageManager *storage_manager) const override;
-
inline void iterateUnaryInl(AggregationStateSum *state,
const TypedValue &value) const {
DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
@@ -121,28 +123,19 @@ class AggregationHandleSum : public AggregationConcreteHandle {
state->null_ = false;
}
- inline void iterateUnaryInlFast(const TypedValue &value,
- std::uint8_t *byte_ptr) const {
- DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
- if (value.isNull()) return;
- TypedValue *sum_ptr =
- reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset_);
- bool *null_ptr =
- reinterpret_cast<bool *>(byte_ptr + blank_state_.null_offset_);
- *sum_ptr = fast_operator_->applyToTypedValues(*sum_ptr, value);
- *null_ptr = false;
- }
+ AggregationState* accumulate(
+ ValueAccessor *accessor,
+ ColumnVectorsValueAccessor *aux_accessor,
+ const std::vector<attribute_id> &argument_ids) const override;
- inline void updateStateUnary(const TypedValue &argument,
- std::uint8_t *byte_ptr) const override {
- if (!block_update_) {
- iterateUnaryInlFast(argument, byte_ptr);
- }
- }
+ void mergeStates(const AggregationState &source,
+ AggregationState *destination) const override;
- void blockUpdate() override { block_update_ = true; }
+ TypedValue finalize(const AggregationState &state) const override;
- void allowUpdate() override { block_update_ = false; }
+ std::size_t getPayloadSize() const override {
+ return blank_state_.getPayloadSize();
+ }
void initPayload(std::uint8_t *byte_ptr) const override {
TypedValue *sum_ptr =
@@ -161,41 +154,23 @@ class AggregationHandleSum : public AggregationConcreteHandle {
}
}
- AggregationState* accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
- const override;
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- AggregationState* accumulateValueAccessor(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &accessor_id) const override;
-#endif
-
- void aggregateValueAccessorIntoHashTable(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &argument_ids,
- const std::vector<attribute_id> &group_by_key_ids,
- AggregationStateHashTableBase *hash_table) const override;
-
- void mergeStates(const AggregationState &source,
- AggregationState *destination) const override;
-
- void mergeStatesFast(const std::uint8_t *source,
- std::uint8_t *destination) const override;
-
- TypedValue finalize(const AggregationState &state) const override;
-
- inline TypedValue finalizeHashTableEntry(
- const AggregationState &state) const {
- return static_cast<const AggregationStateSum &>(state).sum_;
+ inline void updateStateUnary(const TypedValue &argument,
+ std::uint8_t *byte_ptr) const override {
+ DCHECK(argument.isPlausibleInstanceOf(argument_type_.getSignature()));
+ if (argument.isNull()) return;
+ TypedValue *sum_ptr =
+ reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset_);
+ bool *null_ptr =
+ reinterpret_cast<bool *>(byte_ptr + blank_state_.null_offset_);
+ *sum_ptr = fast_operator_->applyToTypedValues(*sum_ptr, argument);
+ *null_ptr = false;
}
- inline TypedValue finalizeHashTableEntryFast(
- const std::uint8_t *byte_ptr) const {
- std::uint8_t *value_ptr = const_cast<std::uint8_t *>(byte_ptr);
- TypedValue *sum_ptr =
- reinterpret_cast<TypedValue *>(value_ptr + blank_state_.sum_offset_);
- return *sum_ptr;
+ void mergeStates(const std::uint8_t *source,
+ std::uint8_t *destination) const override;
+
+ inline TypedValue finalizeHashTableEntry(const std::uint8_t *byte_ptr) const {
+ return *reinterpret_cast<const TypedValue *>(byte_ptr + blank_state_.sum_offset_);
}
ColumnVector* finalizeHashTable(
@@ -203,29 +178,6 @@ class AggregationHandleSum : public AggregationConcreteHandle {
std::vector<std::vector<TypedValue>> *group_by_keys,
int index) const override;
- /**
- * @brief Implementation of
- * AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
- * for SUM aggregation.
- */
- AggregationState* aggregateOnDistinctifyHashTableForSingle(
- const AggregationStateHashTableBase &distinctify_hash_table)
- const override;
-
- /**
- * @brief Implementation of
- * AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
- * for SUM aggregation.
- */
- void aggregateOnDistinctifyHashTableForGroupBy(
- const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table,
- std::size_t index) const override;
-
- std::size_t getPayloadSize() const override {
- return blank_state_.getPayloadSize();
- }
-
private:
friend class AggregateFunctionSum;
@@ -242,8 +194,6 @@ class AggregationHandleSum : public AggregationConcreteHandle {
std::unique_ptr<UncheckedBinaryOperator> fast_operator_;
std::unique_ptr<UncheckedBinaryOperator> merge_operator_;
- bool block_update_;
-
DISALLOW_COPY_AND_ASSIGN(AggregationHandleSum);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/expressions/aggregation/AggregationID.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationID.hpp b/expressions/aggregation/AggregationID.hpp
index 1efb35c..cd18d47 100644
--- a/expressions/aggregation/AggregationID.hpp
+++ b/expressions/aggregation/AggregationID.hpp
@@ -32,9 +32,11 @@ namespace quickstep {
enum class AggregationID {
kAvg = 0,
kCount,
+ kDistinct,
kMax,
kMin,
- kSum
+ kSum,
+ kUnknown
};
/** @} */
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/expressions/aggregation/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/aggregation/CMakeLists.txt b/expressions/aggregation/CMakeLists.txt
index e9503f7..bd239d4 100644
--- a/expressions/aggregation/CMakeLists.txt
+++ b/expressions/aggregation/CMakeLists.txt
@@ -146,10 +146,8 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationConcreteHandl
glog
quickstep_catalog_CatalogTypedefs
quickstep_expressions_aggregation_AggregationHandle
- quickstep_storage_FastHashTable
- quickstep_storage_HashTable
+ quickstep_expressions_aggregation_AggregationID
quickstep_storage_HashTableBase
- quickstep_storage_HashTableFactory
quickstep_threading_SpinMutex
quickstep_types_TypedValue
quickstep_types_containers_ColumnVector
@@ -157,6 +155,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationConcreteHandl
target_link_libraries(quickstep_expressions_aggregation_AggregationHandle
glog
quickstep_catalog_CatalogTypedefs
+ quickstep_expressions_aggregation_AggregationID
quickstep_storage_HashTableBase
quickstep_types_TypedValue
quickstep_utility_Macros)
@@ -165,10 +164,9 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleAvg
quickstep_catalog_CatalogTypedefs
quickstep_expressions_aggregation_AggregationConcreteHandle
quickstep_expressions_aggregation_AggregationHandle
- quickstep_storage_FastHashTable
- quickstep_storage_HashTable
+ quickstep_expressions_aggregation_AggregationID
quickstep_storage_HashTableBase
- quickstep_storage_HashTableFactory
+ quickstep_storage_PackedPayloadAggregationStateHashTable
quickstep_threading_SpinMutex
quickstep_types_Type
quickstep_types_TypeFactory
@@ -183,12 +181,12 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleCount
quickstep_catalog_CatalogTypedefs
quickstep_expressions_aggregation_AggregationConcreteHandle
quickstep_expressions_aggregation_AggregationHandle
- quickstep_storage_FastHashTable
- quickstep_storage_HashTable
+ quickstep_expressions_aggregation_AggregationID
quickstep_storage_HashTableBase
- quickstep_storage_HashTableFactory
+ quickstep_storage_PackedPayloadAggregationStateHashTable
quickstep_storage_ValueAccessor
quickstep_storage_ValueAccessorUtil
+ quickstep_types_LongType
quickstep_types_TypeFactory
quickstep_types_TypeID
quickstep_types_TypedValue
@@ -199,8 +197,9 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleDistinc
glog
quickstep_catalog_CatalogTypedefs
quickstep_expressions_aggregation_AggregationConcreteHandle
- quickstep_storage_HashTable
+ quickstep_expressions_aggregation_AggregationID
quickstep_storage_HashTableBase
+ quickstep_storage_PackedPayloadAggregationStateHashTable
quickstep_types_TypedValue
quickstep_utility_Macros)
target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMax
@@ -208,10 +207,9 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMax
quickstep_catalog_CatalogTypedefs
quickstep_expressions_aggregation_AggregationConcreteHandle
quickstep_expressions_aggregation_AggregationHandle
- quickstep_storage_FastHashTable
- quickstep_storage_HashTable
+ quickstep_expressions_aggregation_AggregationID
quickstep_storage_HashTableBase
- quickstep_storage_HashTableFactory
+ quickstep_storage_PackedPayloadAggregationStateHashTable
quickstep_threading_SpinMutex
quickstep_types_Type
quickstep_types_TypedValue
@@ -225,10 +223,9 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMin
quickstep_catalog_CatalogTypedefs
quickstep_expressions_aggregation_AggregationConcreteHandle
quickstep_expressions_aggregation_AggregationHandle
- quickstep_storage_FastHashTable
- quickstep_storage_HashTable
+ quickstep_expressions_aggregation_AggregationID
quickstep_storage_HashTableBase
- quickstep_storage_HashTableFactory
+ quickstep_storage_PackedPayloadAggregationStateHashTable
quickstep_threading_SpinMutex
quickstep_types_Type
quickstep_types_TypedValue
@@ -242,10 +239,9 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleSum
quickstep_catalog_CatalogTypedefs
quickstep_expressions_aggregation_AggregationConcreteHandle
quickstep_expressions_aggregation_AggregationHandle
- quickstep_storage_FastHashTable
- quickstep_storage_HashTable
+ quickstep_expressions_aggregation_AggregationID
quickstep_storage_HashTableBase
- quickstep_storage_HashTableFactory
+ quickstep_storage_PackedPayloadAggregationStateHashTable
quickstep_threading_SpinMutex
quickstep_types_Type
quickstep_types_TypeFactory
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/query_execution/QueryContext.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp
index 895c2ea..ed0f99c 100644
--- a/query_execution/QueryContext.hpp
+++ b/query_execution/QueryContext.hpp
@@ -200,20 +200,6 @@ class QueryContext {
}
/**
- * @brief Destroy the payloads from the aggregation hash tables.
- *
- * @warning After calling these methods, the hash table will be in an invalid
- * state. No other operation should be performed on them.
- *
- * @param id The ID of the AggregationOperationState.
- **/
- inline void destroyAggregationHashTablePayload(const aggregation_state_id id) {
- DCHECK_LT(id, aggregation_states_.size());
- DCHECK(aggregation_states_[id]);
- aggregation_states_[id]->destroyAggregationHashTablePayload();
- }
-
- /**
* @brief Whether the given GeneratorFunctionHandle id is valid.
*
* @param id The GeneratorFunctionHandle id.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 7f90e11..7f75264 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -64,6 +64,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
quickstep_expressions_Expressions_proto
quickstep_expressions_aggregation_AggregateFunction
quickstep_expressions_aggregation_AggregateFunction_proto
+ quickstep_expressions_aggregation_AggregationID
quickstep_expressions_predicate_Predicate
quickstep_expressions_scalar_Scalar
quickstep_expressions_scalar_ScalarAttribute
@@ -125,6 +126,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
quickstep_relationaloperators_DropTableOperator
quickstep_relationaloperators_FinalizeAggregationOperator
quickstep_relationaloperators_HashJoinOperator
+ quickstep_relationaloperators_InitializeAggregationStateOperator
quickstep_relationaloperators_InsertOperator
quickstep_relationaloperators_NestedLoopsJoinOperator
quickstep_relationaloperators_RelationalOperator
@@ -145,6 +147,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
quickstep_storage_StorageBlockLayout_proto
quickstep_storage_SubBlockTypeRegistry
quickstep_types_Type
+ quickstep_types_TypeID
quickstep_types_Type_proto
quickstep_types_TypedValue
quickstep_types_TypedValue_proto
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index ce1452e..6694001 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -49,6 +49,7 @@
#include "expressions/Expressions.pb.h"
#include "expressions/aggregation/AggregateFunction.hpp"
#include "expressions/aggregation/AggregateFunction.pb.h"
+#include "expressions/aggregation/AggregationID.hpp"
#include "expressions/predicate/Predicate.hpp"
#include "expressions/scalar/Scalar.hpp"
#include "expressions/scalar/ScalarAttribute.hpp"
@@ -105,6 +106,7 @@
#include "relational_operators/DropTableOperator.hpp"
#include "relational_operators/FinalizeAggregationOperator.hpp"
#include "relational_operators/HashJoinOperator.hpp"
+#include "relational_operators/InitializeAggregationStateOperator.hpp"
#include "relational_operators/InsertOperator.hpp"
#include "relational_operators/NestedLoopsJoinOperator.hpp"
#include "relational_operators/RelationalOperator.hpp"
@@ -126,6 +128,7 @@
#include "storage/SubBlockTypeRegistry.hpp"
#include "types/Type.hpp"
#include "types/Type.pb.h"
+#include "types/TypeID.hpp"
#include "types/TypedValue.hpp"
#include "types/TypedValue.pb.h"
#include "types/containers/Tuple.pb.h"
@@ -371,6 +374,91 @@ void ExecutionGenerator::dropAllTemporaryRelations() {
}
}
+bool ExecutionGenerator::canUseCollisionFreeAggregation(
+ const P::AggregatePtr &aggregate,
+ const std::size_t estimated_num_groups,
+ std::size_t *exact_num_groups) const {
+ if (aggregate->grouping_expressions().size() != 1) {
+ return false;
+ }
+
+ E::AttributeReferencePtr group_by_key_attr;
+ const E::ExpressionPtr agg_expr = aggregate->grouping_expressions().front();
+ if (!E::SomeAttributeReference::MatchesWithConditionalCast(agg_expr, &group_by_key_attr)) {
+ return false;
+ }
+
+ bool min_value_stat_is_exact;
+ bool max_value_stat_is_exact;
+ const TypedValue min_value =
+ cost_model_for_aggregation_->findMinValueStat(
+ aggregate, group_by_key_attr, &min_value_stat_is_exact);
+ const TypedValue max_value =
+ cost_model_for_aggregation_->findMaxValueStat(
+ aggregate, group_by_key_attr, &max_value_stat_is_exact);
+ if (min_value.isNull() || max_value.isNull() ||
+ (!min_value_stat_is_exact) || (!max_value_stat_is_exact)) {
+ return false;
+ }
+
+ std::int64_t min_cpp_value;
+ std::int64_t max_cpp_value;
+ switch (group_by_key_attr->getValueType().getTypeID()) {
+ case TypeID::kInt: {
+ min_cpp_value = min_value.getLiteral<int>();
+ max_cpp_value = max_value.getLiteral<int>();
+ break;
+ }
+ case TypeID::kLong: {
+ min_cpp_value = min_value.getLiteral<std::int64_t>();
+ max_cpp_value = max_value.getLiteral<std::int64_t>();
+ break;
+ }
+ default:
+ return false;
+ }
+
+ // TODO
+ if (min_cpp_value < 0 ||
+ max_cpp_value > 1000000000 ||
+ max_cpp_value / static_cast<double>(estimated_num_groups) > 256.0) {
+ return false;
+ }
+
+
+ for (const auto &agg_expr : aggregate->aggregate_expressions()) {
+ const E::AggregateFunctionPtr agg_func =
+ std::static_pointer_cast<const E::AggregateFunction>(agg_expr->expression());
+ switch (agg_func->getAggregate().getAggregationID()) {
+ case AggregationID::kCount: // Fall through
+ case AggregationID::kSum:
+ break;
+ default:
+ return false;
+ }
+
+ const auto &arguments = agg_func->getArguments();
+ if (arguments.size() > 1) {
+ return false;
+ }
+
+ if (arguments.size() == 1) {
+ switch (arguments.front()->getValueType().getTypeID()) {
+ case TypeID::kInt: // Fall through
+ case TypeID::kLong:
+ case TypeID::kFloat:
+ case TypeID::kDouble:
+ break;
+ default:
+ return false;
+ }
+ }
+ }
+
+ *exact_num_groups = static_cast<std::size_t>(max_cpp_value) + 1;
+ return true;
+}
+
void ExecutionGenerator::convertNamedExpressions(
const std::vector<E::NamedExpressionPtr> &named_expressions,
S::QueryContext::ScalarGroup *scalar_group_proto) {
@@ -1454,6 +1542,8 @@ void ExecutionGenerator::convertAggregate(
findRelationInfoOutputByPhysical(physical_plan->input());
aggr_state_proto->set_relation_id(input_relation_info->relation->getID());
+ bool use_parallel_initialization = false;
+
std::vector<const Type*> group_by_types;
for (const E::NamedExpressionPtr &grouping_expression : physical_plan->grouping_expressions()) {
unique_ptr<const Scalar> execution_group_by_expression;
@@ -1474,9 +1564,34 @@ void ExecutionGenerator::convertAggregate(
}
if (!group_by_types.empty()) {
- // Right now, only SeparateChaining is supported.
- aggr_state_proto->set_hash_table_impl_type(
- serialization::HashTableImplType::SEPARATE_CHAINING);
+ const std::size_t estimated_num_groups =
+ cost_model_for_aggregation_->estimateNumGroupsForAggregate(physical_plan);
+
+ std::size_t exact_num_groups;
+ const bool can_use_collision_free_aggregation =
+ canUseCollisionFreeAggregation(physical_plan,
+ estimated_num_groups,
+ &exact_num_groups);
+
+ if (can_use_collision_free_aggregation) {
+ aggr_state_proto->set_hash_table_impl_type(
+ serialization::HashTableImplType::COLLISION_FREE_VECTOR);
+ std::cout << "Use collision free aggregation!\n"
+ << "Size = " << exact_num_groups << "\n";
+
+ aggr_state_proto->set_estimated_num_entries(exact_num_groups);
+ use_parallel_initialization = true;
+ } else {
+ // Otherwise, use SeparateChaining.
+ aggr_state_proto->set_hash_table_impl_type(
+ serialization::HashTableImplType::SEPARATE_CHAINING);
+ std::cout << "Use normal aggregation\n"
+ << "Size = " << estimated_num_groups << "\n";
+
+ aggr_state_proto->set_estimated_num_entries(std::max(16uL, estimated_num_groups));
+ }
+ } else {
+ aggr_state_proto->set_estimated_num_entries(1uL);
}
for (const E::AliasPtr &named_aggregate_expression : physical_plan->aggregate_expressions()) {
@@ -1514,10 +1629,6 @@ void ExecutionGenerator::convertAggregate(
aggr_state_proto->mutable_predicate()->CopyFrom(predicate->getProto());
}
- const std::size_t estimated_num_groups =
- cost_model_for_aggregation_->estimateNumGroupsForAggregate(physical_plan);
- aggr_state_proto->set_estimated_num_entries(std::max(16uL, estimated_num_groups));
-
const QueryPlan::DAGNodeIndex aggregation_operator_index =
execution_plan_->addRelationalOperator(
new AggregationOperator(
@@ -1532,6 +1643,18 @@ void ExecutionGenerator::convertAggregate(
false /* is_pipeline_breaker */);
}
+ if (use_parallel_initialization) {
+ const QueryPlan::DAGNodeIndex initialize_aggregation_state_operator_index =
+ execution_plan_->addRelationalOperator(
+ new InitializeAggregationStateOperator(
+ query_handle_->query_id(),
+ aggr_state_index));
+
+ execution_plan_->addDirectDependency(aggregation_operator_index,
+ initialize_aggregation_state_operator_index,
+ true);
+ }
+
// Create InsertDestination proto.
const CatalogRelation *output_relation = nullptr;
const QueryContext::insert_destination_id insert_destination_index =
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/query_optimizer/ExecutionGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp
index eba6eee..b52fe97 100644
--- a/query_optimizer/ExecutionGenerator.hpp
+++ b/query_optimizer/ExecutionGenerator.hpp
@@ -20,6 +20,7 @@
#ifndef QUICKSTEP_QUERY_OPTIMIZER_EXECUTION_GENERATOR_HPP_
#define QUICKSTEP_QUERY_OPTIMIZER_EXECUTION_GENERATOR_HPP_
+#include <cstddef>
#include <memory>
#include <string>
#include <unordered_map>
@@ -37,6 +38,7 @@
#include "query_optimizer/QueryHandle.hpp"
#include "query_optimizer/QueryPlan.hpp"
#include "query_optimizer/cost_model/CostModel.hpp"
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
#include "query_optimizer/expressions/ExprId.hpp"
#include "query_optimizer/expressions/NamedExpression.hpp"
#include "query_optimizer/expressions/Predicate.hpp"
@@ -203,6 +205,10 @@ class ExecutionGenerator {
*/
std::string getNewRelationName();
+ bool canUseCollisionFreeAggregation(const physical::AggregatePtr &aggregate,
+ const std::size_t estimated_num_groups,
+ std::size_t *exact_num_groups) const;
+
/**
* @brief Sets up the info of the CatalogRelation represented by TableReference.
* TableReference is not converted to any operator.
@@ -427,7 +433,7 @@ class ExecutionGenerator {
/**
* @brief The cost model to use for estimating aggregation hash table size.
*/
- std::unique_ptr<cost::CostModel> cost_model_for_aggregation_;
+ std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_for_aggregation_;
/**
* @brief The cost model to use for estimating join hash table size.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index c18dc77..bd20059 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -47,6 +47,9 @@ add_library(quickstep_relationaloperators_FinalizeAggregationOperator
FinalizeAggregationOperator.cpp
FinalizeAggregationOperator.hpp)
add_library(quickstep_relationaloperators_HashJoinOperator HashJoinOperator.cpp HashJoinOperator.hpp)
+add_library(quickstep_relationaloperators_InitializeAggregationStateOperator
+ InitializeAggregationStateOperator.cpp
+ InitializeAggregationStateOperator.hpp)
add_library(quickstep_relationaloperators_InsertOperator InsertOperator.cpp InsertOperator.hpp)
add_library(quickstep_relationaloperators_NestedLoopsJoinOperator
NestedLoopsJoinOperator.cpp
@@ -254,6 +257,17 @@ target_link_libraries(quickstep_relationaloperators_HashJoinOperator
quickstep_utility_lipfilter_LIPFilterAdaptiveProber
quickstep_utility_lipfilter_LIPFilterUtil
tmb)
+target_link_libraries(quickstep_relationaloperators_InitializeAggregationStateOperator
+ glog
+ quickstep_queryexecution_QueryContext
+ quickstep_queryexecution_WorkOrderProtosContainer
+ quickstep_queryexecution_WorkOrdersContainer
+ quickstep_relationaloperators_RelationalOperator
+ quickstep_relationaloperators_WorkOrder
+ quickstep_relationaloperators_WorkOrder_proto
+ quickstep_storage_AggregationOperationState
+ quickstep_utility_Macros
+ tmb)
target_link_libraries(quickstep_relationaloperators_InsertOperator
glog
quickstep_catalog_CatalogRelation
@@ -548,6 +562,7 @@ target_link_libraries(quickstep_relationaloperators
quickstep_relationaloperators_DropTableOperator
quickstep_relationaloperators_FinalizeAggregationOperator
quickstep_relationaloperators_HashJoinOperator
+ quickstep_relationaloperators_InitializeAggregationStateOperator
quickstep_relationaloperators_InsertOperator
quickstep_relationaloperators_NestedLoopsJoinOperator
quickstep_relationaloperators_RebuildWorkOrder
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/relational_operators/DestroyAggregationStateOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/DestroyAggregationStateOperator.cpp b/relational_operators/DestroyAggregationStateOperator.cpp
index 49be43d..62ca9e7 100644
--- a/relational_operators/DestroyAggregationStateOperator.cpp
+++ b/relational_operators/DestroyAggregationStateOperator.cpp
@@ -58,13 +58,6 @@ bool DestroyAggregationStateOperator::getAllWorkOrderProtos(WorkOrderProtosConta
}
void DestroyAggregationStateWorkOrder::execute() {
- // NOTE(harshad) : The destroyAggregationHashTablePayload call is separate
- // from the destroyAggregationState call. The reason is that the aggregation
- // hash tables don't own the AggregationHandle objects. However the hash table
- // class requires the handles for destroying the payload (see the
- // destroyPayload methods in AggregationHandle classes). Therefore, we first
- // destroy the payloads in the hash table and then destroy the hash table.
- query_context_->destroyAggregationHashTablePayload(aggr_state_index_);
query_context_->destroyAggregationState(aggr_state_index_);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/relational_operators/FinalizeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp
index 0cbf635..b66030b 100644
--- a/relational_operators/FinalizeAggregationOperator.cpp
+++ b/relational_operators/FinalizeAggregationOperator.cpp
@@ -44,15 +44,15 @@ bool FinalizeAggregationOperator::getAllWorkOrders(
AggregationOperationState *agg_state =
query_context->getAggregationState(aggr_state_index_);
DCHECK(agg_state != nullptr);
- for (int part_id = 0;
- part_id < static_cast<int>(agg_state->getNumPartitions());
- ++part_id) {
+ for (std::size_t partition_id = 0;
+ partition_id < agg_state->getNumPartitions();
+ ++partition_id) {
container->addNormalWorkOrder(
new FinalizeAggregationWorkOrder(
query_id_,
+ partition_id,
agg_state,
- query_context->getInsertDestination(output_destination_index_),
- part_id),
+ query_context->getInsertDestination(output_destination_index_)),
op_index_);
}
}
@@ -80,11 +80,7 @@ bool FinalizeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer
}
void FinalizeAggregationWorkOrder::execute() {
- if (state_->isAggregatePartitioned()) {
- state_->finalizeAggregatePartitioned(part_id_, output_destination_);
- } else {
- state_->finalizeAggregate(output_destination_);
- }
+ state_->finalizeAggregate(partition_id_, output_destination_);
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/relational_operators/FinalizeAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.hpp b/relational_operators/FinalizeAggregationOperator.hpp
index ae7127a..3c209b1 100644
--- a/relational_operators/FinalizeAggregationOperator.hpp
+++ b/relational_operators/FinalizeAggregationOperator.hpp
@@ -116,29 +116,29 @@ class FinalizeAggregationWorkOrder : public WorkOrder {
* @note InsertWorkOrder takes ownership of \c state.
*
* @param query_id The ID of the query to which this operator belongs.
+ * @param partition_id The partition ID for which the Finalize aggregation
+ * work order is issued.
* @param state The AggregationState to use.
* @param output_destination The InsertDestination to insert aggregation
* results.
- * @param part_id The partition ID for which the Finalize aggregation work
- * order is issued. Ignore if aggregation is not partitioned.
*/
FinalizeAggregationWorkOrder(const std::size_t query_id,
+ const std::size_t partition_id,
AggregationOperationState *state,
- InsertDestination *output_destination,
- const int part_id = -1)
+ InsertDestination *output_destination)
: WorkOrder(query_id),
+ partition_id_(partition_id),
state_(DCHECK_NOTNULL(state)),
- output_destination_(DCHECK_NOTNULL(output_destination)),
- part_id_(part_id) {}
+ output_destination_(DCHECK_NOTNULL(output_destination)) {}
~FinalizeAggregationWorkOrder() override {}
void execute() override;
private:
+ const std::size_t partition_id_;
AggregationOperationState *state_;
InsertDestination *output_destination_;
- const int part_id_;
DISALLOW_COPY_AND_ASSIGN(FinalizeAggregationWorkOrder);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/relational_operators/InitializeAggregationStateOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/InitializeAggregationStateOperator.cpp b/relational_operators/InitializeAggregationStateOperator.cpp
new file mode 100644
index 0000000..dfee459
--- /dev/null
+++ b/relational_operators/InitializeAggregationStateOperator.cpp
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "relational_operators/InitializeAggregationStateOperator.hpp"
+
+#include <vector>
+
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
+#include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
+#include "storage/AggregationOperationState.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+bool InitializeAggregationStateOperator::getAllWorkOrders(
+ WorkOrdersContainer *container,
+ QueryContext *query_context,
+ StorageManager *storage_manager,
+ const tmb::client_id scheduler_client_id,
+ tmb::MessageBus *bus) {
+ if (!started_) {
+ AggregationOperationState *agg_state =
+ query_context->getAggregationState(aggr_state_index_);
+ DCHECK(agg_state != nullptr);
+
+ for (std::size_t part_id = 0;
+ part_id < agg_state->getNumInitializationPartitions();
+ ++part_id) {
+ container->addNormalWorkOrder(
+ new InitializeAggregationStateWorkOrder(query_id_,
+ part_id,
+ agg_state),
+ op_index_);
+ }
+ started_ = true;
+ }
+ return started_;
+}
+
+bool InitializeAggregationStateOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
+ // TODO(quickstep-team): Implement the proto-based (distributed) work order generation.
+ LOG(FATAL) << "Not implemented";
+}
+
+void InitializeAggregationStateWorkOrder::execute() {
+ state_->initializeState(partition_id_);
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/relational_operators/InitializeAggregationStateOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/InitializeAggregationStateOperator.hpp b/relational_operators/InitializeAggregationStateOperator.hpp
new file mode 100644
index 0000000..10403b3
--- /dev/null
+++ b/relational_operators/InitializeAggregationStateOperator.hpp
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_RELATIONAL_OPERATORS_INITIALIZE_AGGREGATION_STATE_OPERATOR_HPP_
+#define QUICKSTEP_RELATIONAL_OPERATORS_INITIALIZE_AGGREGATION_STATE_OPERATOR_HPP_
+
+#include <string>
+
+#include "query_execution/QueryContext.hpp"
+#include "relational_operators/RelationalOperator.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class AggregationOperationState;
+class StorageManager;
+class WorkOrderProtosContainer;
+class WorkOrdersContainer;
+
+namespace serialization { class WorkOrder; }
+
+/** \addtogroup RelationalOperators
+ * @{
+ */
+
+class InitializeAggregationStateOperator : public RelationalOperator {
+ public:
+ InitializeAggregationStateOperator(const std::size_t query_id,
+ const QueryContext::aggregation_state_id aggr_state_index)
+ : RelationalOperator(query_id),
+ aggr_state_index_(aggr_state_index),
+ started_(false) {}
+
+ ~InitializeAggregationStateOperator() override {}
+
+ std::string getName() const override {
+ return "InitializeAggregationStateOperator";
+ }
+
+ bool getAllWorkOrders(WorkOrdersContainer *container,
+ QueryContext *query_context,
+ StorageManager *storage_manager,
+ const tmb::client_id scheduler_client_id,
+ tmb::MessageBus *bus) override;
+
+ bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
+ private:
+ const QueryContext::aggregation_state_id aggr_state_index_;
+ bool started_;
+
+ DISALLOW_COPY_AND_ASSIGN(InitializeAggregationStateOperator);
+};
+
+class InitializeAggregationStateWorkOrder : public WorkOrder {
+ public:
+ InitializeAggregationStateWorkOrder(const std::size_t query_id,
+ const std::size_t partition_id,
+ AggregationOperationState *state)
+ : WorkOrder(query_id),
+ partition_id_(partition_id),
+ state_(DCHECK_NOTNULL(state)) {}
+
+ ~InitializeAggregationStateWorkOrder() override {}
+
+ void execute() override;
+
+ private:
+ const std::size_t partition_id_;
+
+ AggregationOperationState *state_;
+
+ DISALLOW_COPY_AND_ASSIGN(InitializeAggregationStateWorkOrder);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_RELATIONAL_OPERATORS_INITIALIZE_AGGREGATION_STATE_OPERATOR_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index b942c1b..5de2653 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -39,15 +39,17 @@
#include "expressions/predicate/Predicate.hpp"
#include "expressions/scalar/Scalar.hpp"
#include "storage/AggregationOperationState.pb.h"
-#include "storage/HashTable.hpp"
-#include "storage/HashTableBase.hpp"
#include "storage/HashTableFactory.hpp"
+#include "storage/HashTableBase.hpp"
#include "storage/InsertDestination.hpp"
+#include "storage/PackedPayloadAggregationStateHashTable.hpp"
#include "storage/StorageBlock.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "storage/StorageManager.hpp"
+#include "storage/SubBlocksReference.hpp"
#include "storage/TupleIdSequence.hpp"
#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
#include "types/TypedValue.hpp"
#include "types/containers/ColumnVector.hpp"
#include "types/containers/ColumnVectorsValueAccessor.hpp"
@@ -80,50 +82,63 @@ AggregationOperationState::AggregationOperationState(
const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
StorageManager *storage_manager)
: input_relation_(input_relation),
- is_aggregate_partitioned_(checkAggregatePartitioned(
- estimated_num_entries, is_distinct, group_by, aggregate_functions)),
+ is_aggregate_collision_free_(false),
+ is_aggregate_partitioned_(false),
predicate_(predicate),
- group_by_list_(std::move(group_by)),
- arguments_(std::move(arguments)),
is_distinct_(std::move(is_distinct)),
storage_manager_(storage_manager) {
+ if (!group_by.empty()) {
+ if (hash_table_impl_type == HashTableImplType::kCollisionFreeVector) {
+ is_aggregate_collision_free_ = true;
+ } else {
+ is_aggregate_partitioned_ = checkAggregatePartitioned(
+ estimated_num_entries, is_distinct_, group_by, aggregate_functions);
+ }
+ }
+
// Sanity checks: each aggregate has a corresponding list of arguments.
- DCHECK(aggregate_functions.size() == arguments_.size());
+ DCHECK(aggregate_functions.size() == arguments.size());
// Get the types of GROUP BY expressions for creating HashTables below.
- std::vector<const Type *> group_by_types;
- for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
- group_by_types.emplace_back(&group_by_element->getType());
+ for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
+ group_by_types_.emplace_back(&group_by_element->getType());
}
- std::vector<AggregationHandle *> group_by_handles;
- group_by_handles.clear();
+ // Prepare group-by element attribute ids and non-trivial expressions.
+ for (std::unique_ptr<const Scalar> &group_by_element : group_by) {
+ const attribute_id attr_id =
+ group_by_element->getAttributeIdForValueAccessor();
+ if (attr_id == kInvalidAttributeID) {
+ const attribute_id non_trivial_attr_id =
+ -(static_cast<attribute_id>(non_trivial_expressions_.size()) + 2);
+ non_trivial_expressions_.emplace_back(group_by_element.release());
+ group_by_key_ids_.emplace_back(non_trivial_attr_id);
+ } else {
+ group_by_key_ids_.emplace_back(attr_id);
+ }
+ }
if (aggregate_functions.size() == 0) {
// If there is no aggregation function, then it is a distinctify operation
// on the group-by expressions.
- DCHECK_GT(group_by_list_.size(), 0u);
+ DCHECK_GT(group_by_key_ids_.size(), 0u);
handles_.emplace_back(new AggregationHandleDistinct());
- arguments_.push_back({});
is_distinct_.emplace_back(false);
group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
hash_table_impl_type,
- group_by_types,
- {1},
- handles_,
+ group_by_types_,
+ {handles_.front().get()},
storage_manager));
} else {
+ std::vector<AggregationHandle *> group_by_handles;
+
// Set up each individual aggregate in this operation.
std::vector<const AggregateFunction *>::const_iterator agg_func_it =
aggregate_functions.begin();
- std::vector<std::vector<std::unique_ptr<const Scalar>>>::const_iterator
- args_it = arguments_.begin();
+ std::vector<std::vector<std::unique_ptr<const Scalar>>>::iterator
+ args_it = arguments.begin();
std::vector<bool>::const_iterator is_distinct_it = is_distinct_.begin();
- std::vector<HashTableImplType>::const_iterator
- distinctify_hash_table_impl_types_it =
- distinctify_hash_table_impl_types.begin();
- std::vector<std::size_t> payload_sizes;
for (; agg_func_it != aggregate_functions.end();
++agg_func_it, ++args_it, ++is_distinct_it) {
// Get the Types of this aggregate's arguments so that we can create an
@@ -133,6 +148,22 @@ AggregationOperationState::AggregationOperationState(
argument_types.emplace_back(&argument->getType());
}
+ // Prepare argument attribute ids and non-trivial expressions.
+ std::vector<attribute_id> argument_ids;
+ for (std::unique_ptr<const Scalar> &argument : *args_it) {
+ const attribute_id attr_id =
+ argument->getAttributeIdForValueAccessor();
+ if (attr_id == kInvalidAttributeID) {
+ const attribute_id non_trivial_attr_id =
+ -(static_cast<attribute_id>(non_trivial_expressions_.size()) + 2);
+ non_trivial_expressions_.emplace_back(argument.release());
+ argument_ids.emplace_back(non_trivial_attr_id);
+ } else {
+ argument_ids.emplace_back(attr_id);
+ }
+ }
+ argument_ids_.emplace_back(std::move(argument_ids));
+
// Sanity checks: aggregate function exists and can apply to the specified
// arguments.
DCHECK(*agg_func_it != nullptr);
@@ -142,85 +173,43 @@ AggregationOperationState::AggregationOperationState(
// to do actual aggregate computation.
handles_.emplace_back((*agg_func_it)->createHandle(argument_types));
- if (!group_by_list_.empty()) {
+ if (!group_by_key_ids_.empty()) {
// Aggregation with GROUP BY: combined payload is partially updated in
// the presence of DISTINCT.
if (*is_distinct_it) {
- handles_.back()->blockUpdate();
+ LOG(FATAL) << "Distinct aggregation not supported";
}
- group_by_handles.emplace_back(handles_.back());
- payload_sizes.emplace_back(group_by_handles.back()->getPayloadSize());
+ group_by_handles.emplace_back(handles_.back().get());
} else {
// Aggregation without GROUP BY: create a single global state.
single_states_.emplace_back(handles_.back()->createInitialState());
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- // See if all of this aggregate's arguments are attributes in the input
- // relation. If so, remember the attribute IDs so that we can do copy
- // elision when actually performing the aggregation.
- std::vector<attribute_id> local_arguments_as_attributes;
- local_arguments_as_attributes.reserve(args_it->size());
- for (const std::unique_ptr<const Scalar> &argument : *args_it) {
- const attribute_id argument_id =
- argument->getAttributeIdForValueAccessor();
- if (argument_id == -1) {
- local_arguments_as_attributes.clear();
- break;
- } else {
- DCHECK_EQ(input_relation_.getID(),
- argument->getRelationIdForValueAccessor());
- local_arguments_as_attributes.push_back(argument_id);
- }
- }
-
- arguments_as_attributes_.emplace_back(
- std::move(local_arguments_as_attributes));
-#endif
}
+ }
- // Initialize the corresponding distinctify hash table if this is a
- // DISTINCT aggregation.
- if (*is_distinct_it) {
- std::vector<const Type *> key_types(group_by_types);
- key_types.insert(
- key_types.end(), argument_types.begin(), argument_types.end());
- // TODO(jianqiao): estimated_num_entries is quite inaccurate for
- // estimating the number of entries in the distinctify hash table.
- // We may estimate for each distinct aggregation an
- // estimated_num_distinct_keys value during query optimization, if it's
- // worth.
- distinctify_hashtables_.emplace_back(
- AggregationStateFastHashTableFactory::CreateResizable(
- *distinctify_hash_table_impl_types_it,
- key_types,
+ // Aggregation with GROUP BY: create a HashTable pool.
+ if (!group_by_key_ids_.empty()) {
+ if (is_aggregate_collision_free_) {
+ collision_free_hashtable_.reset(
+ AggregationStateHashTableFactory::CreateResizable(
+ hash_table_impl_type,
+ group_by_types_,
estimated_num_entries,
- {0},
- {},
+ group_by_handles,
storage_manager));
- ++distinctify_hash_table_impl_types_it;
- } else {
- distinctify_hashtables_.emplace_back(nullptr);
- }
- }
-
- if (!group_by_handles.empty()) {
- // Aggregation with GROUP BY: create a HashTable pool.
- if (!is_aggregate_partitioned_) {
- group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
- hash_table_impl_type,
- group_by_types,
- payload_sizes,
- group_by_handles,
- storage_manager));
- } else {
+ } else if (is_aggregate_partitioned_) {
partitioned_group_by_hashtable_pool_.reset(
new PartitionedHashTablePool(estimated_num_entries,
FLAGS_num_aggregation_partitions,
hash_table_impl_type,
- group_by_types,
- payload_sizes,
+ group_by_types_,
group_by_handles,
storage_manager));
+ } else {
+ group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
+ hash_table_impl_type,
+ group_by_types_,
+ group_by_handles,
+ storage_manager));
}
}
}
@@ -269,7 +258,7 @@ AggregationOperationState* AggregationOperationState::ReconstructFromProto(
proto.group_by_expressions(group_by_idx), database));
}
- unique_ptr<Predicate> predicate;
+ std::unique_ptr<Predicate> predicate;
if (proto.has_predicate()) {
predicate.reset(
PredicateFactory::ReconstructFromProto(proto.predicate(), database));
@@ -353,33 +342,72 @@ bool AggregationOperationState::ProtoIsValid(
return true;
}
-void AggregationOperationState::aggregateBlock(const block_id input_block,
- LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
- if (group_by_list_.empty()) {
- aggregateBlockSingleState(input_block);
+std::size_t AggregationOperationState::getNumPartitions() const {
+ if (is_aggregate_collision_free_) {
+ return static_cast<CollisionFreeAggregationStateHashTable *>(
+ collision_free_hashtable_.get())->getNumFinalizationPartitions();
+ } else if (is_aggregate_partitioned_) {
+ return partitioned_group_by_hashtable_pool_->getNumPartitions();
+ } else {
+ return 1u;
+ }
+}
+
+std::size_t AggregationOperationState::getNumInitializationPartitions() const {
+ if (is_aggregate_collision_free_) {
+ return static_cast<CollisionFreeAggregationStateHashTable *>(
+ collision_free_hashtable_.get())->getNumInitializationPartitions();
} else {
- aggregateBlockHashTable(input_block, lip_filter_adaptive_prober);
+ return 0u;
}
}
-void AggregationOperationState::finalizeAggregate(
- InsertDestination *output_destination) {
- if (group_by_list_.empty()) {
- finalizeSingleState(output_destination);
+void AggregationOperationState::initializeState(const std::size_t partition_id) {
+ if (is_aggregate_collision_free_) {
+ static_cast<CollisionFreeAggregationStateHashTable *>(
+ collision_free_hashtable_.get())->initialize(partition_id);
} else {
- finalizeHashTable(output_destination);
+ LOG(FATAL) << "AggregationOperationState::initializeState() "
+ << "is not supported by this aggregation";
}
}
-void AggregationOperationState::mergeSingleState(
- const std::vector<std::unique_ptr<AggregationState>> &local_state) {
- DEBUG_ASSERT(local_state.size() == single_states_.size());
- for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
- if (!is_distinct_[agg_idx]) {
- handles_[agg_idx]->mergeStates(*local_state[agg_idx],
- single_states_[agg_idx].get());
+bool AggregationOperationState::checkAggregatePartitioned(
+ const std::size_t estimated_num_groups,
+ const std::vector<bool> &is_distinct,
+ const std::vector<std::unique_ptr<const Scalar>> &group_by,
+ const std::vector<const AggregateFunction *> &aggregate_functions) const {
+ // If there's no aggregation, return false.
+ if (aggregate_functions.empty()) {
+ return false;
+ }
+ // Check if there's a distinct operation involved in any aggregate, if so
+ // the aggregate can't be partitioned.
+ for (auto distinct : is_distinct) {
+ if (distinct) {
+ return false;
}
}
+ // There's no distinct aggregation involved. Check if there's at least one
+ // GROUP BY operation.
+ if (group_by.empty()) {
+ return false;
+ }
+ // There are GROUP BYs without DISTINCT. Check if the estimated number of
+ // groups is large enough to warrant a partitioned aggregation.
+ return estimated_num_groups >
+ static_cast<std::size_t>(
+ FLAGS_partition_aggregation_num_groups_threshold);
+ return false;
+}
+
+void AggregationOperationState::aggregateBlock(const block_id input_block,
+ LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
+ if (group_by_key_ids_.empty()) {
+ aggregateBlockSingleState(input_block);
+ } else {
+ aggregateBlockHashTable(input_block, lip_filter_adaptive_prober);
+ }
}
void AggregationOperationState::aggregateBlockSingleState(
@@ -392,114 +420,137 @@ void AggregationOperationState::aggregateBlockSingleState(
std::unique_ptr<TupleIdSequence> matches;
if (predicate_ != nullptr) {
- std::unique_ptr<ValueAccessor> accessor(
- block->getTupleStorageSubBlock().createValueAccessor());
matches.reset(block->getMatchesForPredicate(predicate_.get(), matches.get()));
}
- for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
- const std::vector<attribute_id> *local_arguments_as_attributes = nullptr;
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- // If all arguments are attributes of the input relation, elide a copy.
- if (!arguments_as_attributes_[agg_idx].empty()) {
- local_arguments_as_attributes = &(arguments_as_attributes_[agg_idx]);
+ const auto &tuple_store = block->getTupleStorageSubBlock();
+ std::unique_ptr<ValueAccessor> accessor(
+ tuple_store.createValueAccessor(matches.get()));
+
+ ColumnVectorsValueAccessor non_trivial_results;
+ if (!non_trivial_expressions_.empty()) {
+ SubBlocksReference sub_blocks_ref(tuple_store,
+ block->getIndices(),
+ block->getIndicesConsistent());
+ for (const auto &expression : non_trivial_expressions_) {
+ non_trivial_results.addColumn(
+ expression->getAllValues(accessor.get(), &sub_blocks_ref));
}
-#endif
- if (is_distinct_[agg_idx]) {
- // Call StorageBlock::aggregateDistinct() to put the arguments as keys
- // directly into the (threadsafe) shared global distinctify HashTable
- // for this aggregate.
- block->aggregateDistinct(*handles_[agg_idx],
- arguments_[agg_idx],
- local_arguments_as_attributes,
- {}, /* group_by */
- matches.get(),
- distinctify_hashtables_[agg_idx].get(),
- nullptr /* reuse_group_by_vectors */);
- local_state.emplace_back(nullptr);
+ }
+
+ for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
+ const auto &argument_ids = argument_ids_[agg_idx];
+ const auto &handle = handles_[agg_idx];
+
+ AggregationState *state;
+ if (argument_ids.empty()) {
+ // Special case. This is a nullary aggregate (i.e. COUNT(*)).
+ state = handle->accumulateNullary(matches == nullptr ? tuple_store.numTuples()
+ : matches->size());
} else {
- // Call StorageBlock::aggregate() to actually do the aggregation.
- local_state.emplace_back(block->aggregate(*handles_[agg_idx],
- arguments_[agg_idx],
- local_arguments_as_attributes,
- matches.get()));
+ // Have the AggregationHandle actually do the aggregation.
+ state = handle->accumulate(accessor.get(), &non_trivial_results, argument_ids);
}
+ local_state.emplace_back(state);
}
// Merge per-block aggregation states back with global state.
mergeSingleState(local_state);
}
+void AggregationOperationState::mergeSingleState(
+ const std::vector<std::unique_ptr<AggregationState>> &local_state) {
+ DEBUG_ASSERT(local_state.size() == single_states_.size());
+ for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
+ if (!is_distinct_[agg_idx]) {
+ handles_[agg_idx]->mergeStates(*local_state[agg_idx],
+ single_states_[agg_idx].get());
+ }
+ }
+}
+
+void AggregationOperationState::mergeGroupByHashTables(
+ AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst) const {
+ HashTableMergerFast merger(dst);
+ static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(src)
+ ->forEach(&merger);
+}
+
void AggregationOperationState::aggregateBlockHashTable(
const block_id input_block,
LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
BlockReference block(
storage_manager_->getBlock(input_block, input_relation_));
+ const auto &tuple_store = block->getTupleStorageSubBlock();
+ std::unique_ptr<ValueAccessor> base_accessor(tuple_store.createValueAccessor());
+ std::unique_ptr<ValueAccessor> shared_accessor;
+ ValueAccessor *accessor = base_accessor.get();
// Apply the predicate first, then the LIPFilters, to generate a TupleIdSequence
// as the existence map for the tuples.
std::unique_ptr<TupleIdSequence> matches;
if (predicate_ != nullptr) {
matches.reset(block->getMatchesForPredicate(predicate_.get()));
+ shared_accessor.reset(
+ base_accessor->createSharedTupleIdSequenceAdapterVirtual(*matches));
+ accessor = shared_accessor.get();
}
if (lip_filter_adaptive_prober != nullptr) {
- std::unique_ptr<ValueAccessor> accessor(
- block->getTupleStorageSubBlock().createValueAccessor(matches.get()));
- matches.reset(lip_filter_adaptive_prober->filterValueAccessor(accessor.get()));
+ matches.reset(lip_filter_adaptive_prober->filterValueAccessor(accessor));
+ shared_accessor.reset(
+ base_accessor->createSharedTupleIdSequenceAdapterVirtual(*matches));
+ accessor = shared_accessor.get();
}
- // This holds values of all the GROUP BY attributes so that the can be reused
- // across multiple aggregates (i.e. we only pay the cost of evaluatin the
- // GROUP BY expressions once).
- std::vector<std::unique_ptr<ColumnVector>> reuse_group_by_vectors;
-
- for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
- if (is_distinct_[agg_idx]) {
- // Call StorageBlock::aggregateDistinct() to insert the GROUP BY
- // expression
- // values and the aggregation arguments together as keys directly into the
- // (threadsafe) shared global distinctify HashTable for this aggregate.
- block->aggregateDistinct(*handles_[agg_idx],
- arguments_[agg_idx],
- nullptr, /* arguments_as_attributes */
- group_by_list_,
- matches.get(),
- distinctify_hashtables_[agg_idx].get(),
- &reuse_group_by_vectors);
+ std::unique_ptr<ColumnVectorsValueAccessor> non_trivial_results;
+ if (!non_trivial_expressions_.empty()) {
+ non_trivial_results.reset(new ColumnVectorsValueAccessor());
+ SubBlocksReference sub_blocks_ref(tuple_store,
+ block->getIndices(),
+ block->getIndicesConsistent());
+ for (const auto &expression : non_trivial_expressions_) {
+ non_trivial_results->addColumn(
+ expression->getAllValues(accessor, &sub_blocks_ref));
}
}
- if (!is_aggregate_partitioned_) {
- // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
- // directly into the (threadsafe) shared global HashTable for this
- // aggregate.
- DCHECK(group_by_hashtable_pool_ != nullptr);
- AggregationStateHashTableBase *agg_hash_table =
- group_by_hashtable_pool_->getHashTableFast();
- DCHECK(agg_hash_table != nullptr);
- block->aggregateGroupBy(arguments_,
- group_by_list_,
- matches.get(),
- agg_hash_table,
- &reuse_group_by_vectors);
- group_by_hashtable_pool_->returnHashTable(agg_hash_table);
+ accessor->beginIterationVirtual();
+
+ // TODO(review): bare TODO — specify what remains for this three-way dispatch below.
+ if (is_aggregate_collision_free_) {
+ aggregateBlockHashTableImplCollisionFree(
+ accessor, non_trivial_results.get());
+ } else if (is_aggregate_partitioned_) {
+ aggregateBlockHashTableImplPartitioned(
+ accessor, non_trivial_results.get());
} else {
- ColumnVectorsValueAccessor temp_result;
- // IDs of 'arguments' as attributes in the ValueAccessor we create below.
- std::vector<attribute_id> argument_ids;
+ aggregateBlockHashTableImplThreadPrivate(
+ accessor, non_trivial_results.get());
+ }
+}
- // IDs of GROUP BY key element(s) in the ValueAccessor we create below.
- std::vector<attribute_id> key_ids;
+void AggregationOperationState::aggregateBlockHashTableImplCollisionFree(
+ ValueAccessor *accessor,
+ ColumnVectorsValueAccessor *aux_accessor) {
+ DCHECK(collision_free_hashtable_ != nullptr);
+
+ collision_free_hashtable_->upsertValueAccessor(argument_ids_,
+ group_by_key_ids_,
+ accessor,
+ aux_accessor);
+}
+
+void AggregationOperationState::aggregateBlockHashTableImplPartitioned(
+ ValueAccessor *accessor,
+ ColumnVectorsValueAccessor *aux_accessor) {
+ DCHECK(partitioned_group_by_hashtable_pool_ != nullptr);
+
+ InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
+ accessor,
+ [&](auto *accessor) -> void { // NOLINT(build/c++11)
+ // TODO(jianqiao): handle the situation when keys in non_trivial_results
const std::size_t num_partitions = partitioned_group_by_hashtable_pool_->getNumPartitions();
- block->aggregateGroupByPartitioned(
- arguments_,
- group_by_list_,
- matches.get(),
- num_partitions,
- &temp_result,
- &argument_ids,
- &key_ids,
- &reuse_group_by_vectors);
+
// Compute the partitions for the tuple formed by group by values.
std::vector<std::unique_ptr<TupleIdSequence>> partition_membership;
partition_membership.resize(num_partitions);
@@ -507,32 +558,57 @@ void AggregationOperationState::aggregateBlockHashTable(
// Create a tuple-id sequence for each partition.
for (std::size_t partition = 0; partition < num_partitions; ++partition) {
partition_membership[partition].reset(
- new TupleIdSequence(temp_result.getEndPosition()));
+ new TupleIdSequence(accessor->getEndPosition()));
}
// Iterate over ValueAccessor for each tuple,
// set a bit in the appropriate TupleIdSequence.
- temp_result.beginIteration();
- while (temp_result.next()) {
+ while (accessor->next()) {
// We need a unique_ptr because getTupleWithAttributes() uses "new".
- std::unique_ptr<Tuple> curr_tuple(temp_result.getTupleWithAttributes(key_ids));
+ std::unique_ptr<Tuple> curr_tuple(
+ accessor->getTupleWithAttributes(group_by_key_ids_));
const std::size_t curr_tuple_partition_id =
curr_tuple->getTupleHash() % num_partitions;
partition_membership[curr_tuple_partition_id]->set(
- temp_result.getCurrentPosition(), true);
+ accessor->getCurrentPosition(), true);
}
- // For each partition, create an adapter around Value Accessor and
- // TupleIdSequence.
- std::vector<std::unique_ptr<
- TupleIdSequenceAdapterValueAccessor<ColumnVectorsValueAccessor>>> adapter;
- adapter.resize(num_partitions);
+ // Aggregate each partition.
for (std::size_t partition = 0; partition < num_partitions; ++partition) {
- adapter[partition].reset(temp_result.createSharedTupleIdSequenceAdapter(
- *(partition_membership)[partition]));
+ std::unique_ptr<ValueAccessor> adapter(
+ accessor->createSharedTupleIdSequenceAdapter(
+ *(partition_membership)[partition]));
partitioned_group_by_hashtable_pool_->getHashTable(partition)
- ->upsertValueAccessorCompositeKeyFast(
- argument_ids, adapter[partition].get(), key_ids, true);
+ ->upsertValueAccessor(argument_ids_,
+ group_by_key_ids_,
+ adapter.get(),
+ aux_accessor);
}
+ });
+}
+
+void AggregationOperationState::aggregateBlockHashTableImplThreadPrivate(
+ ValueAccessor *accessor,
+ ColumnVectorsValueAccessor *aux_accessor) {
+ DCHECK(group_by_hashtable_pool_ != nullptr);
+
+ AggregationStateHashTableBase *agg_hash_table =
+ group_by_hashtable_pool_->getHashTable();
+
+ agg_hash_table->upsertValueAccessor(argument_ids_,
+ group_by_key_ids_,
+ accessor,
+ aux_accessor);
+ group_by_hashtable_pool_->returnHashTable(agg_hash_table);
+}
+
+void AggregationOperationState::finalizeAggregate(
+ const std::size_t partition_id,
+ InsertDestination *output_destination) {
+ if (group_by_key_ids_.empty()) {
+ DCHECK_EQ(0u, partition_id);
+ finalizeSingleState(output_destination);
+ } else {
+ finalizeHashTable(partition_id, output_destination);
}
}
@@ -543,12 +619,6 @@ void AggregationOperationState::finalizeSingleState(
std::vector<TypedValue> attribute_values;
for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
- if (is_distinct_[agg_idx]) {
- single_states_[agg_idx].reset(
- handles_[agg_idx]->aggregateOnDistinctifyHashTableForSingle(
- *distinctify_hashtables_[agg_idx]));
- }
-
attribute_values.emplace_back(
handles_[agg_idx]->finalize(*single_states_[agg_idx]));
}
@@ -556,80 +626,79 @@ void AggregationOperationState::finalizeSingleState(
output_destination->insertTuple(Tuple(std::move(attribute_values)));
}
-void AggregationOperationState::mergeGroupByHashTables(
- AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst) {
- HashTableMergerFast merger(dst);
- (static_cast<FastHashTable<true, false, true, false> *>(src))
- ->forEachCompositeKeyFast(&merger);
+void AggregationOperationState::finalizeHashTable(
+ const std::size_t partition_id,
+ InsertDestination *output_destination) {
+ if (is_aggregate_collision_free_) {
+ finalizeHashTableImplCollisionFree(partition_id, output_destination);
+ } else if (is_aggregate_partitioned_) {
+ finalizeHashTableImplPartitioned(partition_id, output_destination);
+ } else {
+ DCHECK_EQ(0u, partition_id);
+ finalizeHashTableImplThreadPrivate(output_destination);
+ }
}
-void AggregationOperationState::finalizeHashTable(
+void AggregationOperationState::finalizeHashTableImplCollisionFree(
+ const std::size_t partition_id,
InsertDestination *output_destination) {
- // Each element of 'group_by_keys' is a vector of values for a particular
- // group (which is also the prefix of the finalized Tuple for that group).
- std::vector<std::vector<TypedValue>> group_by_keys;
+ std::vector<std::unique_ptr<ColumnVector>> final_values;
+ CollisionFreeAggregationStateHashTable *hash_table =
+ static_cast<CollisionFreeAggregationStateHashTable *>(
+ collision_free_hashtable_.get());
- // TODO(harshad) - The merge phase may be slower when each hash table contains
- // large number of entries. We should find ways in which we can perform a
- // parallel merge.
+ // TODO
+ const std::size_t max_length =
+ hash_table->getNumTuplesInPartition(partition_id);
+ ColumnVectorsValueAccessor complete_result;
- // TODO(harshad) - Find heuristics for faster merge, even in a single thread.
- // e.g. Keep merging entries from smaller hash tables to larger.
+ DCHECK_EQ(1u, group_by_types_.size());
+ const Type *key_type = group_by_types_.front();
+ DCHECK(NativeColumnVector::UsableForType(*key_type));
- auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
- if (hash_tables->size() > 1) {
- for (int hash_table_index = 0;
- hash_table_index < static_cast<int>(hash_tables->size() - 1);
- ++hash_table_index) {
- // Merge each hash table to the last hash table.
- mergeGroupByHashTables((*hash_tables)[hash_table_index].get(),
- hash_tables->back().get());
+ std::unique_ptr<NativeColumnVector> key_cv(
+ new NativeColumnVector(*key_type, max_length));
+ hash_table->finalizeKey(partition_id, key_cv.get());
+ complete_result.addColumn(key_cv.release());
+
+ for (std::size_t i = 0; i < handles_.size(); ++i) {
+ if (handles_[i]->getAggregationID() == AggregationID::kDistinct) {
+ DCHECK_EQ(1u, handles_.size());
+ break;
}
+
+ const Type *result_type = handles_[i]->getResultType();
+ DCHECK(NativeColumnVector::UsableForType(*result_type));
+
+ std::unique_ptr<NativeColumnVector> result_cv(
+ new NativeColumnVector(*result_type, max_length));
+ hash_table->finalizeState(partition_id, i, result_cv.get());
+ complete_result.addColumn(result_cv.release());
}
+ // Bulk-insert the complete result.
+ output_destination->bulkInsertTuples(&complete_result);
+}
+
+void AggregationOperationState::finalizeHashTableImplPartitioned(
+ const std::size_t partition_id,
+ InsertDestination *output_destination) {
+ // Each element of 'group_by_keys' is a vector of values for a particular
+ // group (which is also the prefix of the finalized Tuple for that group).
+ std::vector<std::vector<TypedValue>> group_by_keys;
+
// Collect per-aggregate finalized values.
std::vector<std::unique_ptr<ColumnVector>> final_values;
+ AggregationStateHashTableBase *hash_table =
+ partitioned_group_by_hashtable_pool_->getHashTable(partition_id);
for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
- if (is_distinct_[agg_idx]) {
- DCHECK(group_by_hashtable_pool_ != nullptr);
- auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
- DCHECK(hash_tables != nullptr);
- if (hash_tables->empty()) {
- // We may have a case where hash_tables is empty, e.g. no input blocks.
- // However for aggregateOnDistinctifyHashTableForGroupBy to work
- // correctly, we should create an empty group by hash table.
- AggregationStateHashTableBase *new_hash_table =
- group_by_hashtable_pool_->getHashTableFast();
- group_by_hashtable_pool_->returnHashTable(new_hash_table);
- hash_tables = group_by_hashtable_pool_->getAllHashTables();
- }
- DCHECK(hash_tables->back() != nullptr);
- AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
- DCHECK(agg_hash_table != nullptr);
- handles_[agg_idx]->allowUpdate();
- handles_[agg_idx]->aggregateOnDistinctifyHashTableForGroupBy(
- *distinctify_hashtables_[agg_idx], agg_hash_table, agg_idx);
- }
-
- auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
- DCHECK(hash_tables != nullptr);
- if (hash_tables->empty()) {
- // We may have a case where hash_tables is empty, e.g. no input blocks.
- // However for aggregateOnDistinctifyHashTableForGroupBy to work
- // correctly, we should create an empty group by hash table.
- AggregationStateHashTableBase *new_hash_table =
- group_by_hashtable_pool_->getHashTableFast();
- group_by_hashtable_pool_->returnHashTable(new_hash_table);
- hash_tables = group_by_hashtable_pool_->getAllHashTables();
- }
- AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
- DCHECK(agg_hash_table != nullptr);
ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
- *agg_hash_table, &group_by_keys, agg_idx);
+ *hash_table, &group_by_keys, agg_idx);
if (agg_result_col != nullptr) {
final_values.emplace_back(agg_result_col);
}
}
+ hash_table->destroyPayload();
// Reorganize 'group_by_keys' in column-major order so that we can make a
// ColumnVectorsValueAccessor to bulk-insert results.
@@ -640,23 +709,20 @@ void AggregationOperationState::finalizeHashTable(
// in a single HashTable.
std::vector<std::unique_ptr<ColumnVector>> group_by_cvs;
std::size_t group_by_element_idx = 0;
- for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
- const Type &group_by_type = group_by_element->getType();
- if (NativeColumnVector::UsableForType(group_by_type)) {
+ for (const Type *group_by_type : group_by_types_) {
+ if (NativeColumnVector::UsableForType(*group_by_type)) {
NativeColumnVector *element_cv =
- new NativeColumnVector(group_by_type, group_by_keys.size());
+ new NativeColumnVector(*group_by_type, group_by_keys.size());
group_by_cvs.emplace_back(element_cv);
for (std::vector<TypedValue> &group_key : group_by_keys) {
- element_cv->appendTypedValue(
- std::move(group_key[group_by_element_idx]));
+ element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
}
} else {
IndirectColumnVector *element_cv =
- new IndirectColumnVector(group_by_type, group_by_keys.size());
+ new IndirectColumnVector(*group_by_type, group_by_keys.size());
group_by_cvs.emplace_back(element_cv);
for (std::vector<TypedValue> &group_key : group_by_keys) {
- element_cv->appendTypedValue(
- std::move(group_key[group_by_element_idx]));
+ element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
}
}
++group_by_element_idx;
@@ -676,42 +742,44 @@ void AggregationOperationState::finalizeHashTable(
output_destination->bulkInsertTuples(&complete_result);
}
-void AggregationOperationState::destroyAggregationHashTablePayload() {
- std::vector<std::unique_ptr<AggregationStateHashTableBase>> *all_hash_tables =
- nullptr;
- if (!is_aggregate_partitioned_) {
- if (group_by_hashtable_pool_ != nullptr) {
- all_hash_tables = group_by_hashtable_pool_->getAllHashTables();
- }
- } else {
- if (partitioned_group_by_hashtable_pool_ != nullptr) {
- all_hash_tables = partitioned_group_by_hashtable_pool_->getAllHashTables();
- }
- }
- if (all_hash_tables != nullptr) {
- for (std::size_t ht_index = 0; ht_index < all_hash_tables->size(); ++ht_index) {
- (*all_hash_tables)[ht_index]->destroyPayload();
- }
- }
-}
-
-void AggregationOperationState::finalizeAggregatePartitioned(
- const std::size_t partition_id, InsertDestination *output_destination) {
+void AggregationOperationState::finalizeHashTableImplThreadPrivate(
+ InsertDestination *output_destination) {
// Each element of 'group_by_keys' is a vector of values for a particular
// group (which is also the prefix of the finalized Tuple for that group).
std::vector<std::vector<TypedValue>> group_by_keys;
+ // TODO(harshad) - The merge phase may be slower when each hash table contains
+ // large number of entries. We should find ways in which we can perform a
+ // parallel merge.
+
+ // TODO(harshad) - Find heuristics for faster merge, even in a single thread.
+ // e.g. Keep merging entries from smaller hash tables to larger.
+
+ auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
+ DCHECK(hash_tables != nullptr);
+ if (hash_tables->empty()) {
+ return;
+ }
+
+ std::unique_ptr<AggregationStateHashTableBase> final_hash_table(
+ hash_tables->back().release());
+ for (std::size_t i = 0; i < hash_tables->size() - 1; ++i) {
+ std::unique_ptr<AggregationStateHashTableBase> hash_table(
+ hash_tables->at(i).release());
+ mergeGroupByHashTables(hash_table.get(), final_hash_table.get());
+ hash_table->destroyPayload();
+ }
+
// Collect per-aggregate finalized values.
std::vector<std::unique_ptr<ColumnVector>> final_values;
for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
- AggregationStateHashTableBase *hash_table =
- partitioned_group_by_hashtable_pool_->getHashTable(partition_id);
ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
- *hash_table, &group_by_keys, agg_idx);
+ *final_hash_table, &group_by_keys, agg_idx);
if (agg_result_col != nullptr) {
final_values.emplace_back(agg_result_col);
}
}
+ final_hash_table->destroyPayload();
// Reorganize 'group_by_keys' in column-major order so that we can make a
// ColumnVectorsValueAccessor to bulk-insert results.
@@ -722,19 +790,22 @@ void AggregationOperationState::finalizeAggregatePartitioned(
// in a single HashTable.
std::vector<std::unique_ptr<ColumnVector>> group_by_cvs;
std::size_t group_by_element_idx = 0;
- for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
- const Type &group_by_type = group_by_element->getType();
- if (NativeColumnVector::UsableForType(group_by_type)) {
- NativeColumnVector *element_cv = new NativeColumnVector(group_by_type, group_by_keys.size());
+ for (const Type *group_by_type : group_by_types_) {
+ if (NativeColumnVector::UsableForType(*group_by_type)) {
+ NativeColumnVector *element_cv =
+ new NativeColumnVector(*group_by_type, group_by_keys.size());
group_by_cvs.emplace_back(element_cv);
for (std::vector<TypedValue> &group_key : group_by_keys) {
- element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
+ element_cv->appendTypedValue(
+ std::move(group_key[group_by_element_idx]));
}
} else {
- IndirectColumnVector *element_cv = new IndirectColumnVector(group_by_type, group_by_keys.size());
+ IndirectColumnVector *element_cv =
+ new IndirectColumnVector(*group_by_type, group_by_keys.size());
group_by_cvs.emplace_back(element_cv);
for (std::vector<TypedValue> &group_key : group_by_keys) {
- element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
+ element_cv->appendTypedValue(
+ std::move(group_key[group_by_element_idx]));
}
}
++group_by_element_idx;