You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/10/17 21:56:03 UTC
[5/6] incubator-quickstep git commit: Updates
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationHandleMax.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.cpp b/expressions/aggregation/AggregationHandleMax.cpp
index c2d571b..d40ae9f 100644
--- a/expressions/aggregation/AggregationHandleMax.cpp
+++ b/expressions/aggregation/AggregationHandleMax.cpp
@@ -38,100 +38,100 @@ namespace quickstep {
class StorageManager;
-AggregationHandleMax::AggregationHandleMax(const Type &type)
- : type_(type), block_update_(false) {
- fast_comparator_.reset(
- ComparisonFactory::GetComparison(ComparisonID::kGreater)
- .makeUncheckedComparatorForTypes(type, type.getNonNullableVersion()));
-}
-
-AggregationStateHashTableBase* AggregationHandleMax::createGroupByHashTable(
- const HashTableImplType hash_table_impl,
- const std::vector<const Type *> &group_by_types,
- const std::size_t estimated_num_groups,
- StorageManager *storage_manager) const {
- return AggregationStateHashTableFactory<AggregationStateMax>::CreateResizable(
- hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
-}
-
-AggregationState* AggregationHandleMax::accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
- DCHECK_EQ(1u, column_vectors.size())
- << "Got wrong number of ColumnVectors for MAX: " << column_vectors.size();
-
- return new AggregationStateMax(fast_comparator_->accumulateColumnVector(
- type_.getNullableVersion().makeNullValue(), *column_vectors.front()));
-}
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-AggregationState* AggregationHandleMax::accumulateValueAccessor(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &accessor_ids) const {
- DCHECK_EQ(1u, accessor_ids.size())
- << "Got wrong number of attributes for MAX: " << accessor_ids.size();
-
- return new AggregationStateMax(fast_comparator_->accumulateValueAccessor(
- type_.getNullableVersion().makeNullValue(),
- accessor,
- accessor_ids.front()));
-}
-#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-
-void AggregationHandleMax::aggregateValueAccessorIntoHashTable(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &argument_ids,
- const std::vector<attribute_id> &group_by_key_ids,
- AggregationStateHashTableBase *hash_table) const {
- DCHECK_EQ(1u, argument_ids.size())
- << "Got wrong number of arguments for MAX: " << argument_ids.size();
-}
-
-void AggregationHandleMax::mergeStates(const AggregationState &source,
- AggregationState *destination) const {
- const AggregationStateMax &max_source =
- static_cast<const AggregationStateMax &>(source);
- AggregationStateMax *max_destination =
- static_cast<AggregationStateMax *>(destination);
-
- if (!max_source.max_.isNull()) {
- compareAndUpdate(max_destination, max_source.max_);
- }
-}
-
-void AggregationHandleMax::mergeStatesFast(const std::uint8_t *source,
- std::uint8_t *destination) const {
- const TypedValue *src_max_ptr = reinterpret_cast<const TypedValue *>(source);
- TypedValue *dst_max_ptr = reinterpret_cast<TypedValue *>(destination);
- if (!(src_max_ptr->isNull())) {
- compareAndUpdateFast(dst_max_ptr, *src_max_ptr);
- }
-}
-
-ColumnVector* AggregationHandleMax::finalizeHashTable(
- const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys,
- int index) const {
- return finalizeHashTableHelperFast<AggregationHandleMax,
- AggregationStateFastHashTable>(
- type_.getNullableVersion(), hash_table, group_by_keys, index);
-}
-
-AggregationState*
-AggregationHandleMax::aggregateOnDistinctifyHashTableForSingle(
- const AggregationStateHashTableBase &distinctify_hash_table) const {
- return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
- AggregationHandleMax,
- AggregationStateMax>(distinctify_hash_table);
-}
-
-void AggregationHandleMax::aggregateOnDistinctifyHashTableForGroupBy(
- const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table,
- std::size_t index) const {
- aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
- AggregationHandleMax,
- AggregationStateFastHashTable>(
- distinctify_hash_table, aggregation_hash_table, index);
-}
+AggregationHandleMax::AggregationHandleMax(const Type &type) {}
+// : type_(type), block_update_(false) {
+// fast_comparator_.reset(
+// ComparisonFactory::GetComparison(ComparisonID::kGreater)
+// .makeUncheckedComparatorForTypes(type, type.getNonNullableVersion()));
+//}
+//
+//AggregationStateHashTableBase* AggregationHandleMax::createGroupByHashTable(
+// const HashTableImplType hash_table_impl,
+// const std::vector<const Type *> &group_by_types,
+// const std::size_t estimated_num_groups,
+// StorageManager *storage_manager) const {
+// return AggregationStateHashTableFactory<AggregationStateMax>::CreateResizable(
+// hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
+//}
+//
+//AggregationState* AggregationHandleMax::accumulateColumnVectors(
+// const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
+// DCHECK_EQ(1u, column_vectors.size())
+// << "Got wrong number of ColumnVectors for MAX: " << column_vectors.size();
+//
+// return new AggregationStateMax(fast_comparator_->accumulateColumnVector(
+// type_.getNullableVersion().makeNullValue(), *column_vectors.front()));
+//}
+//
+//#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
+//AggregationState* AggregationHandleMax::accumulateValueAccessor(
+// ValueAccessor *accessor,
+// const std::vector<attribute_id> &accessor_ids) const {
+// DCHECK_EQ(1u, accessor_ids.size())
+// << "Got wrong number of attributes for MAX: " << accessor_ids.size();
+//
+// return new AggregationStateMax(fast_comparator_->accumulateValueAccessor(
+// type_.getNullableVersion().makeNullValue(),
+// accessor,
+// accessor_ids.front()));
+//}
+//#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
+//
+//void AggregationHandleMax::aggregateValueAccessorIntoHashTable(
+// ValueAccessor *accessor,
+// const std::vector<attribute_id> &argument_ids,
+// const std::vector<attribute_id> &group_by_key_ids,
+// AggregationStateHashTableBase *hash_table) const {
+// DCHECK_EQ(1u, argument_ids.size())
+// << "Got wrong number of arguments for MAX: " << argument_ids.size();
+//}
+//
+//void AggregationHandleMax::mergeStates(const AggregationState &source,
+// AggregationState *destination) const {
+// const AggregationStateMax &max_source =
+// static_cast<const AggregationStateMax &>(source);
+// AggregationStateMax *max_destination =
+// static_cast<AggregationStateMax *>(destination);
+//
+// if (!max_source.max_.isNull()) {
+// compareAndUpdate(max_destination, max_source.max_);
+// }
+//}
+//
+//void AggregationHandleMax::mergeStatesFast(const std::uint8_t *source,
+// std::uint8_t *destination) const {
+// const TypedValue *src_max_ptr = reinterpret_cast<const TypedValue *>(source);
+// TypedValue *dst_max_ptr = reinterpret_cast<TypedValue *>(destination);
+// if (!(src_max_ptr->isNull())) {
+// compareAndUpdateFast(dst_max_ptr, *src_max_ptr);
+// }
+//}
+//
+//ColumnVector* AggregationHandleMax::finalizeHashTable(
+// const AggregationStateHashTableBase &hash_table,
+// std::vector<std::vector<TypedValue>> *group_by_keys,
+// int index) const {
+// return finalizeHashTableHelperFast<AggregationHandleMax,
+// AggregationStateFastHashTable>(
+// type_.getNullableVersion(), hash_table, group_by_keys, index);
+//}
+//
+//AggregationState*
+//AggregationHandleMax::aggregateOnDistinctifyHashTableForSingle(
+// const AggregationStateHashTableBase &distinctify_hash_table) const {
+// return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
+// AggregationHandleMax,
+// AggregationStateMax>(distinctify_hash_table);
+//}
+//
+//void AggregationHandleMax::aggregateOnDistinctifyHashTableForGroupBy(
+// const AggregationStateHashTableBase &distinctify_hash_table,
+// AggregationStateHashTableBase *aggregation_hash_table,
+// std::size_t index) const {
+// aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
+// AggregationHandleMax,
+// AggregationStateFastHashTable>(
+// distinctify_hash_table, aggregation_hash_table, index);
+//}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationHandleMax.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.hpp b/expressions/aggregation/AggregationHandleMax.hpp
index 5fb9f44..effc38f 100644
--- a/expressions/aggregation/AggregationHandleMax.hpp
+++ b/expressions/aggregation/AggregationHandleMax.hpp
@@ -26,9 +26,7 @@
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregationConcreteHandle.hpp"
#include "expressions/aggregation/AggregationHandle.hpp"
-#include "storage/FastHashTable.hpp"
#include "storage/HashTableBase.hpp"
#include "threading/SpinMutex.hpp"
#include "types/Type.hpp"
@@ -49,149 +47,12 @@ class ValueAccessor;
*/
/**
- * @brief Aggregation state for max.
- */
-class AggregationStateMax : public AggregationState {
- public:
- /**
- * @brief Copy constructor (ignores mutex).
- */
- AggregationStateMax(const AggregationStateMax &orig) : max_(orig.max_) {}
-
- /**
- * @brief Destructor.
- */
- ~AggregationStateMax() override{};
-
- const std::uint8_t* getPayloadAddress() const {
- return reinterpret_cast<const uint8_t *>(&max_);
- }
-
- private:
- friend class AggregationHandleMax;
-
- explicit AggregationStateMax(const Type &type)
- : max_(type.getNullableVersion().makeNullValue()) {}
-
- explicit AggregationStateMax(TypedValue &&value) : max_(std::move(value)) {}
-
- TypedValue max_;
- SpinMutex mutex_;
-};
-
-/**
* @brief An aggregationhandle for max.
**/
-class AggregationHandleMax : public AggregationConcreteHandle {
+class AggregationHandleMax : public AggregationHandle {
public:
~AggregationHandleMax() override {}
- AggregationState* createInitialState() const override {
- return new AggregationStateMax(type_);
- }
-
- AggregationStateHashTableBase* createGroupByHashTable(
- const HashTableImplType hash_table_impl,
- const std::vector<const Type *> &group_by_types,
- const std::size_t estimated_num_groups,
- StorageManager *storage_manager) const override;
-
- /**
- * @brief Iterate with max aggregation state.
- */
- inline void iterateUnaryInl(AggregationStateMax *state,
- const TypedValue &value) const {
- DCHECK(value.isPlausibleInstanceOf(type_.getSignature()));
- compareAndUpdate(static_cast<AggregationStateMax *>(state), value);
- }
-
- inline void iterateUnaryInlFast(const TypedValue &value,
- std::uint8_t *byte_ptr) const {
- DCHECK(value.isPlausibleInstanceOf(type_.getSignature()));
- TypedValue *max_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
- compareAndUpdateFast(max_ptr, value);
- }
-
- inline void updateStateUnary(const TypedValue &argument,
- std::uint8_t *byte_ptr) const override {
- if (!block_update_) {
- iterateUnaryInlFast(argument, byte_ptr);
- }
- }
-
- void blockUpdate() override { block_update_ = true; }
-
- void allowUpdate() override { block_update_ = false; }
-
- void initPayload(std::uint8_t *byte_ptr) const override {
- TypedValue *max_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
- TypedValue t1 = (type_.getNullableVersion().makeNullValue());
- *max_ptr = t1;
- }
-
- AggregationState* accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
- const override;
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- AggregationState* accumulateValueAccessor(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &accessor_ids) const override;
-#endif
-
- void aggregateValueAccessorIntoHashTable(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &argument_ids,
- const std::vector<attribute_id> &group_by_key_ids,
- AggregationStateHashTableBase *hash_table) const override;
-
- void mergeStates(const AggregationState &source,
- AggregationState *destination) const override;
-
- void mergeStatesFast(const std::uint8_t *source,
- std::uint8_t *destination) const override;
-
- TypedValue finalize(const AggregationState &state) const override {
- return TypedValue(static_cast<const AggregationStateMax &>(state).max_);
- }
-
- inline TypedValue finalizeHashTableEntry(
- const AggregationState &state) const {
- return TypedValue(static_cast<const AggregationStateMax &>(state).max_);
- }
-
- inline TypedValue finalizeHashTableEntryFast(
- const std::uint8_t *byte_ptr) const {
- const TypedValue *max_ptr = reinterpret_cast<const TypedValue *>(byte_ptr);
- return TypedValue(*max_ptr);
- }
-
- ColumnVector* finalizeHashTable(
- const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys,
- int index) const override;
-
- /**
- * @brief Implementation of
- * AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
- * for MAX aggregation.
- */
- AggregationState* aggregateOnDistinctifyHashTableForSingle(
- const AggregationStateHashTableBase &distinctify_hash_table)
- const override;
-
- /**
- * @brief Implementation of
- * AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
- * for MAX aggregation.
- */
- void aggregateOnDistinctifyHashTableForGroupBy(
- const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table,
- std::size_t index) const override;
-
- std::size_t getPayloadSize() const override { return sizeof(TypedValue); }
-
private:
friend class AggregateFunctionMax;
@@ -202,37 +63,8 @@ class AggregationHandleMax : public AggregationConcreteHandle {
**/
explicit AggregationHandleMax(const Type &type);
- /**
- * @brief compare the value with max_ and update it if the value is larger
- * than current maximum. NULLs are ignored.
- *
- * @param value A TypedValue to compare
- **/
- inline void compareAndUpdate(AggregationStateMax *state,
- const TypedValue &value) const {
- // TODO(chasseur): Avoid null-checks when aggregating a non-nullable Type.
- if (value.isNull()) return;
-
- SpinMutexLock lock(state->mutex_);
- if (state->max_.isNull() ||
- fast_comparator_->compareTypedValues(value, state->max_)) {
- state->max_ = value;
- }
- }
-
- inline void compareAndUpdateFast(TypedValue *max_ptr,
- const TypedValue &value) const {
- if (value.isNull()) return;
- if (max_ptr->isNull() ||
- fast_comparator_->compareTypedValues(value, *max_ptr)) {
- *max_ptr = value;
- }
- }
-
- const Type &type_;
- std::unique_ptr<UncheckedComparator> fast_comparator_;
-
- bool block_update_;
+// const Type &type_;
+// std::unique_ptr<UncheckedComparator> fast_comparator_;
DISALLOW_COPY_AND_ASSIGN(AggregationHandleMax);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationHandleMin.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.cpp b/expressions/aggregation/AggregationHandleMin.cpp
index a07f299..4765c93 100644
--- a/expressions/aggregation/AggregationHandleMin.cpp
+++ b/expressions/aggregation/AggregationHandleMin.cpp
@@ -38,101 +38,101 @@ namespace quickstep {
class StorageManager;
-AggregationHandleMin::AggregationHandleMin(const Type &type)
- : type_(type), block_update_(false) {
- fast_comparator_.reset(
- ComparisonFactory::GetComparison(ComparisonID::kLess)
- .makeUncheckedComparatorForTypes(type, type.getNonNullableVersion()));
-}
-
-AggregationStateHashTableBase* AggregationHandleMin::createGroupByHashTable(
- const HashTableImplType hash_table_impl,
- const std::vector<const Type *> &group_by_types,
- const std::size_t estimated_num_groups,
- StorageManager *storage_manager) const {
- return AggregationStateHashTableFactory<AggregationStateMin>::CreateResizable(
- hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
-}
-
-AggregationState* AggregationHandleMin::accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
- DCHECK_EQ(1u, column_vectors.size())
- << "Got wrong number of ColumnVectors for MIN: " << column_vectors.size();
-
- return new AggregationStateMin(fast_comparator_->accumulateColumnVector(
- type_.getNullableVersion().makeNullValue(), *column_vectors.front()));
-}
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-AggregationState* AggregationHandleMin::accumulateValueAccessor(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &accessor_ids) const {
- DCHECK_EQ(1u, accessor_ids.size())
- << "Got wrong number of attributes for MIN: " << accessor_ids.size();
-
- return new AggregationStateMin(fast_comparator_->accumulateValueAccessor(
- type_.getNullableVersion().makeNullValue(),
- accessor,
- accessor_ids.front()));
-}
-#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-
-void AggregationHandleMin::aggregateValueAccessorIntoHashTable(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &argument_ids,
- const std::vector<attribute_id> &group_by_key_ids,
- AggregationStateHashTableBase *hash_table) const {
- DCHECK_EQ(1u, argument_ids.size())
- << "Got wrong number of arguments for MIN: " << argument_ids.size();
-}
-
-void AggregationHandleMin::mergeStates(const AggregationState &source,
- AggregationState *destination) const {
- const AggregationStateMin &min_source =
- static_cast<const AggregationStateMin &>(source);
- AggregationStateMin *min_destination =
- static_cast<AggregationStateMin *>(destination);
-
- if (!min_source.min_.isNull()) {
- compareAndUpdate(min_destination, min_source.min_);
- }
-}
-
-void AggregationHandleMin::mergeStatesFast(const std::uint8_t *source,
- std::uint8_t *destination) const {
- const TypedValue *src_min_ptr = reinterpret_cast<const TypedValue *>(source);
- TypedValue *dst_min_ptr = reinterpret_cast<TypedValue *>(destination);
-
- if (!(src_min_ptr->isNull())) {
- compareAndUpdateFast(dst_min_ptr, *src_min_ptr);
- }
-}
-
-ColumnVector* AggregationHandleMin::finalizeHashTable(
- const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys,
- int index) const {
- return finalizeHashTableHelperFast<AggregationHandleMin,
- AggregationStateFastHashTable>(
- type_.getNonNullableVersion(), hash_table, group_by_keys, index);
-}
-
-AggregationState*
-AggregationHandleMin::aggregateOnDistinctifyHashTableForSingle(
- const AggregationStateHashTableBase &distinctify_hash_table) const {
- return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
- AggregationHandleMin,
- AggregationStateMin>(distinctify_hash_table);
-}
-
-void AggregationHandleMin::aggregateOnDistinctifyHashTableForGroupBy(
- const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table,
- std::size_t index) const {
- aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
- AggregationHandleMin,
- AggregationStateFastHashTable>(
- distinctify_hash_table, aggregation_hash_table, index);
-}
+AggregationHandleMin::AggregationHandleMin(const Type &type) {}
+// : type_(type), block_update_(false) {
+// fast_comparator_.reset(
+// ComparisonFactory::GetComparison(ComparisonID::kLess)
+// .makeUncheckedComparatorForTypes(type, type.getNonNullableVersion()));
+//}
+//
+//AggregationStateHashTableBase* AggregationHandleMin::createGroupByHashTable(
+// const HashTableImplType hash_table_impl,
+// const std::vector<const Type *> &group_by_types,
+// const std::size_t estimated_num_groups,
+// StorageManager *storage_manager) const {
+// return AggregationStateHashTableFactory<AggregationStateMin>::CreateResizable(
+// hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
+//}
+//
+//AggregationState* AggregationHandleMin::accumulateColumnVectors(
+// const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
+// DCHECK_EQ(1u, column_vectors.size())
+// << "Got wrong number of ColumnVectors for MIN: " << column_vectors.size();
+//
+// return new AggregationStateMin(fast_comparator_->accumulateColumnVector(
+// type_.getNullableVersion().makeNullValue(), *column_vectors.front()));
+//}
+//
+//#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
+//AggregationState* AggregationHandleMin::accumulateValueAccessor(
+// ValueAccessor *accessor,
+// const std::vector<attribute_id> &accessor_ids) const {
+// DCHECK_EQ(1u, accessor_ids.size())
+// << "Got wrong number of attributes for MIN: " << accessor_ids.size();
+//
+// return new AggregationStateMin(fast_comparator_->accumulateValueAccessor(
+// type_.getNullableVersion().makeNullValue(),
+// accessor,
+// accessor_ids.front()));
+//}
+//#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
+//
+//void AggregationHandleMin::aggregateValueAccessorIntoHashTable(
+// ValueAccessor *accessor,
+// const std::vector<attribute_id> &argument_ids,
+// const std::vector<attribute_id> &group_by_key_ids,
+// AggregationStateHashTableBase *hash_table) const {
+// DCHECK_EQ(1u, argument_ids.size())
+// << "Got wrong number of arguments for MIN: " << argument_ids.size();
+//}
+//
+//void AggregationHandleMin::mergeStates(const AggregationState &source,
+// AggregationState *destination) const {
+// const AggregationStateMin &min_source =
+// static_cast<const AggregationStateMin &>(source);
+// AggregationStateMin *min_destination =
+// static_cast<AggregationStateMin *>(destination);
+//
+// if (!min_source.min_.isNull()) {
+// compareAndUpdate(min_destination, min_source.min_);
+// }
+//}
+//
+//void AggregationHandleMin::mergeStatesFast(const std::uint8_t *source,
+// std::uint8_t *destination) const {
+// const TypedValue *src_min_ptr = reinterpret_cast<const TypedValue *>(source);
+// TypedValue *dst_min_ptr = reinterpret_cast<TypedValue *>(destination);
+//
+// if (!(src_min_ptr->isNull())) {
+// compareAndUpdateFast(dst_min_ptr, *src_min_ptr);
+// }
+//}
+//
+//ColumnVector* AggregationHandleMin::finalizeHashTable(
+// const AggregationStateHashTableBase &hash_table,
+// std::vector<std::vector<TypedValue>> *group_by_keys,
+// int index) const {
+// return finalizeHashTableHelperFast<AggregationHandleMin,
+// AggregationStateFastHashTable>(
+// type_.getNonNullableVersion(), hash_table, group_by_keys, index);
+//}
+//
+//AggregationState*
+//AggregationHandleMin::aggregateOnDistinctifyHashTableForSingle(
+// const AggregationStateHashTableBase &distinctify_hash_table) const {
+// return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
+// AggregationHandleMin,
+// AggregationStateMin>(distinctify_hash_table);
+//}
+//
+//void AggregationHandleMin::aggregateOnDistinctifyHashTableForGroupBy(
+// const AggregationStateHashTableBase &distinctify_hash_table,
+// AggregationStateHashTableBase *aggregation_hash_table,
+// std::size_t index) const {
+// aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
+// AggregationHandleMin,
+// AggregationStateFastHashTable>(
+// distinctify_hash_table, aggregation_hash_table, index);
+//}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationHandleMin.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.hpp b/expressions/aggregation/AggregationHandleMin.hpp
index 173911d..64fddea 100644
--- a/expressions/aggregation/AggregationHandleMin.hpp
+++ b/expressions/aggregation/AggregationHandleMin.hpp
@@ -26,11 +26,8 @@
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregationConcreteHandle.hpp"
#include "expressions/aggregation/AggregationHandle.hpp"
-#include "storage/FastHashTable.hpp"
#include "storage/HashTableBase.hpp"
-#include "threading/SpinMutex.hpp"
#include "types/Type.hpp"
#include "types/TypedValue.hpp"
#include "types/operations/comparisons/Comparison.hpp"
@@ -49,151 +46,12 @@ class ValueAccessor;
*/
/**
- * @brief Aggregation state for min.
- */
-class AggregationStateMin : public AggregationState {
- public:
- /**
- * @brief Copy constructor (ignores mutex).
- */
- AggregationStateMin(const AggregationStateMin &orig) : min_(orig.min_) {}
-
- /**
- * @brief Destructor.
- */
- ~AggregationStateMin() override {}
-
- std::size_t getPayloadSize() const { return sizeof(TypedValue); }
-
- const std::uint8_t *getPayloadAddress() const {
- return reinterpret_cast<const uint8_t *>(&min_);
- }
-
- private:
- friend class AggregationHandleMin;
-
- explicit AggregationStateMin(const Type &type)
- : min_(type.getNullableVersion().makeNullValue()) {}
-
- explicit AggregationStateMin(TypedValue &&value) : min_(std::move(value)) {}
-
- TypedValue min_;
- SpinMutex mutex_;
-};
-
-/**
* @brief An aggregationhandle for min.
**/
-class AggregationHandleMin : public AggregationConcreteHandle {
+class AggregationHandleMin : public AggregationHandle {
public:
~AggregationHandleMin() override {}
- AggregationState* createInitialState() const override {
- return new AggregationStateMin(type_);
- }
-
- AggregationStateHashTableBase* createGroupByHashTable(
- const HashTableImplType hash_table_impl,
- const std::vector<const Type *> &group_by_types,
- const std::size_t estimated_num_groups,
- StorageManager *storage_manager) const override;
-
- /**
- * @brief Iterate with min aggregation state.
- */
- inline void iterateUnaryInl(AggregationStateMin *state,
- const TypedValue &value) const {
- DCHECK(value.isPlausibleInstanceOf(type_.getSignature()));
- compareAndUpdate(state, value);
- }
-
- inline void iterateUnaryInlFast(const TypedValue &value,
- std::uint8_t *byte_ptr) const {
- DCHECK(value.isPlausibleInstanceOf(type_.getSignature()));
- TypedValue *min_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
- compareAndUpdateFast(min_ptr, value);
- }
-
- inline void updateStateUnary(const TypedValue &argument,
- std::uint8_t *byte_ptr) const override {
- if (!block_update_) {
- iterateUnaryInlFast(argument, byte_ptr);
- }
- }
-
- void blockUpdate() override { block_update_ = true; }
-
- void allowUpdate() override { block_update_ = false; }
-
- void initPayload(std::uint8_t *byte_ptr) const override {
- TypedValue *min_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
- TypedValue t1 = (type_.getNullableVersion().makeNullValue());
- *min_ptr = t1;
- }
-
- AggregationState* accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
- const override;
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- AggregationState* accumulateValueAccessor(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &accessor_ids) const override;
-#endif
-
- void aggregateValueAccessorIntoHashTable(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &argument_ids,
- const std::vector<attribute_id> &group_by_key_ids,
- AggregationStateHashTableBase *hash_table) const override;
-
- void mergeStates(const AggregationState &source,
- AggregationState *destination) const override;
-
- void mergeStatesFast(const std::uint8_t *source,
- std::uint8_t *destination) const override;
-
- TypedValue finalize(const AggregationState &state) const override {
- return static_cast<const AggregationStateMin &>(state).min_;
- }
-
- inline TypedValue finalizeHashTableEntry(
- const AggregationState &state) const {
- return static_cast<const AggregationStateMin &>(state).min_;
- }
-
- inline TypedValue finalizeHashTableEntryFast(
- const std::uint8_t *byte_ptr) const {
- const TypedValue *min_ptr = reinterpret_cast<const TypedValue *>(byte_ptr);
- return TypedValue(*min_ptr);
- }
-
- ColumnVector* finalizeHashTable(
- const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys,
- int index) const override;
-
- /**
- * @brief Implementation of
- * AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
- * for MIN aggregation.
- */
- AggregationState* aggregateOnDistinctifyHashTableForSingle(
- const AggregationStateHashTableBase &distinctify_hash_table)
- const override;
-
- /**
- * @brief Implementation of
- * AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
- * for MIN aggregation.
- */
- void aggregateOnDistinctifyHashTableForGroupBy(
- const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table,
- std::size_t index) const override;
-
- std::size_t getPayloadSize() const override { return sizeof(TypedValue); }
-
private:
friend class AggregateFunctionMin;
@@ -204,36 +62,8 @@ class AggregationHandleMin : public AggregationConcreteHandle {
**/
explicit AggregationHandleMin(const Type &type);
- /**
- * @brief compare the value with min_ and update it if the value is smaller
- * than current minimum. NULLs are ignored.
- *
- * @param value A TypedValue to compare.
- **/
- inline void compareAndUpdate(AggregationStateMin *state,
- const TypedValue &value) const {
- if (value.isNull()) return;
-
- SpinMutexLock lock(state->mutex_);
- if (state->min_.isNull() ||
- fast_comparator_->compareTypedValues(value, state->min_)) {
- state->min_ = value;
- }
- }
-
- inline void compareAndUpdateFast(TypedValue *min_ptr,
- const TypedValue &value) const {
- if (value.isNull()) return;
- if (min_ptr->isNull() ||
- fast_comparator_->compareTypedValues(value, *min_ptr)) {
- *min_ptr = value;
- }
- }
-
- const Type &type_;
- std::unique_ptr<UncheckedComparator> fast_comparator_;
-
- bool block_update_;
+// const Type &type_;
+// std::unique_ptr<UncheckedComparator> fast_comparator_;
DISALLOW_COPY_AND_ASSIGN(AggregationHandleMin);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationHandleSum.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.cpp b/expressions/aggregation/AggregationHandleSum.cpp
index 642d88d..4e77ed0 100644
--- a/expressions/aggregation/AggregationHandleSum.cpp
+++ b/expressions/aggregation/AggregationHandleSum.cpp
@@ -20,6 +20,7 @@
#include "expressions/aggregation/AggregationHandleSum.hpp"
#include <cstddef>
+#include <cstring>
#include <memory>
#include <utility>
#include <vector>
@@ -35,6 +36,7 @@
#include "types/operations/binary_operations/BinaryOperation.hpp"
#include "types/operations/binary_operations/BinaryOperationFactory.hpp"
#include "types/operations/binary_operations/BinaryOperationID.hpp"
+#include "types/TypeFunctors.hpp"
#include "glog/logging.h"
@@ -42,12 +44,11 @@ namespace quickstep {
class StorageManager;
-AggregationHandleSum::AggregationHandleSum(const Type &type)
- : argument_type_(type), block_update_(false) {
+AggregationHandleSum::AggregationHandleSum(const Type &argument_type) {
// We sum Int as Long and Float as Double so that we have more headroom when
// adding many values.
TypeID type_precision_id;
- switch (argument_type_.getTypeID()) {
+ switch (argument_type.getTypeID()) {
case kInt:
case kLong:
type_precision_id = kLong;
@@ -57,134 +58,57 @@ AggregationHandleSum::AggregationHandleSum(const Type &type)
type_precision_id = kDouble;
break;
default:
- type_precision_id = type.getTypeID();
+ type_precision_id = argument_type.getTypeID();
break;
}
const Type &sum_type = TypeFactory::GetType(type_precision_id);
- blank_state_.sum_ = sum_type.makeZeroValue();
+ state_size_ = sum_type.maximumByteLength();
+ blank_state_.reset(state_size_, false);
+
+ tv_blank_state_ = sum_type.makeZeroValue();
// Make operators to do arithmetic:
// Add operator for summing argument values.
- fast_operator_.reset(
+ accumulate_operator_.reset(
BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
- .makeUncheckedBinaryOperatorForTypes(sum_type, argument_type_));
+ .makeUncheckedBinaryOperatorForTypes(sum_type, argument_type));
+ accumulate_functor_ = accumulate_operator_->getMergeFunctor();
+
// Add operator for merging states.
merge_operator_.reset(
BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
.makeUncheckedBinaryOperatorForTypes(sum_type, sum_type));
+ merge_functor_ = merge_operator_->getMergeFunctor();
- // Result is nullable, because SUM() over 0 values (or all NULL values) is
- // NULL.
- result_type_ = &sum_type.getNullableVersion();
+ finalize_functor_ = MakeUntypedCopyFunctor(&sum_type);
+ result_type_ = &sum_type;
}
-AggregationStateHashTableBase* AggregationHandleSum::createGroupByHashTable(
- const HashTableImplType hash_table_impl,
- const std::vector<const Type *> &group_by_types,
- const std::size_t estimated_num_groups,
- StorageManager *storage_manager) const {
- return AggregationStateHashTableFactory<AggregationStateSum>::CreateResizable(
- hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
-}
-
-AggregationState* AggregationHandleSum::accumulateColumnVectors(
+void AggregationHandleSum::accumulateColumnVectors(
+ void *state,
const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
DCHECK_EQ(1u, column_vectors.size())
<< "Got wrong number of ColumnVectors for SUM: " << column_vectors.size();
std::size_t num_tuples = 0;
- TypedValue cv_sum = fast_operator_->accumulateColumnVector(
- blank_state_.sum_, *column_vectors.front(), &num_tuples);
- return new AggregationStateSum(std::move(cv_sum), num_tuples == 0);
+ TypedValue cv_sum = accumulate_operator_->accumulateColumnVector(
+ tv_blank_state_, *column_vectors.front(), &num_tuples);
+ cv_sum.copyInto(state);
}
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-AggregationState* AggregationHandleSum::accumulateValueAccessor(
+void AggregationHandleSum::accumulateValueAccessor(
+ void *state,
ValueAccessor *accessor,
const std::vector<attribute_id> &accessor_ids) const {
DCHECK_EQ(1u, accessor_ids.size())
<< "Got wrong number of attributes for SUM: " << accessor_ids.size();
std::size_t num_tuples = 0;
- TypedValue va_sum = fast_operator_->accumulateValueAccessor(
- blank_state_.sum_, accessor, accessor_ids.front(), &num_tuples);
- return new AggregationStateSum(std::move(va_sum), num_tuples == 0);
+ TypedValue va_sum = accumulate_operator_->accumulateValueAccessor(
+ tv_blank_state_, accessor, accessor_ids.front(), &num_tuples);
+ va_sum.copyInto(state);
}
#endif
-void AggregationHandleSum::aggregateValueAccessorIntoHashTable(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &argument_ids,
- const std::vector<attribute_id> &group_by_key_ids,
- AggregationStateHashTableBase *hash_table) const {
- DCHECK_EQ(1u, argument_ids.size())
- << "Got wrong number of arguments for SUM: " << argument_ids.size();
-}
-
-void AggregationHandleSum::mergeStates(const AggregationState &source,
- AggregationState *destination) const {
- const AggregationStateSum &sum_source =
- static_cast<const AggregationStateSum &>(source);
- AggregationStateSum *sum_destination =
- static_cast<AggregationStateSum *>(destination);
-
- SpinMutexLock lock(sum_destination->mutex_);
- sum_destination->sum_ = merge_operator_->applyToTypedValues(
- sum_destination->sum_, sum_source.sum_);
- sum_destination->null_ = sum_destination->null_ && sum_source.null_;
-}
-
-void AggregationHandleSum::mergeStatesFast(const std::uint8_t *source,
- std::uint8_t *destination) const {
- const TypedValue *src_sum_ptr =
- reinterpret_cast<const TypedValue *>(source + blank_state_.sum_offset_);
- const bool *src_null_ptr =
- reinterpret_cast<const bool *>(source + blank_state_.null_offset_);
- TypedValue *dst_sum_ptr =
- reinterpret_cast<TypedValue *>(destination + blank_state_.sum_offset_);
- bool *dst_null_ptr =
- reinterpret_cast<bool *>(destination + blank_state_.null_offset_);
- *dst_sum_ptr =
- merge_operator_->applyToTypedValues(*dst_sum_ptr, *src_sum_ptr);
- *dst_null_ptr = (*dst_null_ptr) && (*src_null_ptr);
-}
-
-TypedValue AggregationHandleSum::finalize(const AggregationState &state) const {
- const AggregationStateSum &agg_state =
- static_cast<const AggregationStateSum &>(state);
- if (agg_state.null_) {
- // SUM() over no values is NULL.
- return result_type_->makeNullValue();
- } else {
- return agg_state.sum_;
- }
-}
-
-ColumnVector* AggregationHandleSum::finalizeHashTable(
- const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys,
- int index) const {
- return finalizeHashTableHelperFast<AggregationHandleSum,
- AggregationStateFastHashTable>(
- *result_type_, hash_table, group_by_keys, index);
-}
-
-AggregationState*
-AggregationHandleSum::aggregateOnDistinctifyHashTableForSingle(
- const AggregationStateHashTableBase &distinctify_hash_table) const {
- return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
- AggregationHandleSum,
- AggregationStateSum>(distinctify_hash_table);
-}
-
-void AggregationHandleSum::aggregateOnDistinctifyHashTableForGroupBy(
- const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table,
- std::size_t index) const {
- aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
- AggregationHandleSum,
- AggregationStateFastHashTable>(
- distinctify_hash_table, aggregation_hash_table, index);
-}
-
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationHandleSum.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.hpp b/expressions/aggregation/AggregationHandleSum.hpp
index 6c334a6..f45e87e 100644
--- a/expressions/aggregation/AggregationHandleSum.hpp
+++ b/expressions/aggregation/AggregationHandleSum.hpp
@@ -26,198 +26,39 @@
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregationConcreteHandle.hpp"
#include "expressions/aggregation/AggregationHandle.hpp"
-#include "storage/FastHashTable.hpp"
-#include "storage/HashTableBase.hpp"
-#include "threading/SpinMutex.hpp"
#include "types/Type.hpp"
#include "types/TypedValue.hpp"
#include "types/operations/binary_operations/BinaryOperation.hpp"
+#include "utility/ScopedBuffer.hpp"
#include "utility/Macros.hpp"
#include "glog/logging.h"
namespace quickstep {
-class ColumnVector;
-class StorageManager;
-class ValueAccessor;
-
/** \addtogroup Expressions
* @{
*/
/**
- * @brief Aggregation state for sum.
- */
-class AggregationStateSum : public AggregationState {
- public:
- /**
- * @brief Copy constructor (ignores mutex).
- */
- AggregationStateSum(const AggregationStateSum &orig)
- : sum_(orig.sum_),
- null_(orig.null_),
- sum_offset_(orig.sum_offset_),
- null_offset_(orig.null_offset_) {}
-
- std::size_t getPayloadSize() const {
- std::size_t p1 = reinterpret_cast<std::size_t>(&sum_);
- std::size_t p2 = reinterpret_cast<std::size_t>(&mutex_);
- return (p2 - p1);
- }
-
- const std::uint8_t* getPayloadAddress() const {
- return reinterpret_cast<const uint8_t *>(&sum_);
- }
-
- private:
- friend class AggregationHandleSum;
-
- AggregationStateSum()
- : sum_(0),
- null_(true),
- sum_offset_(0),
- null_offset_(reinterpret_cast<std::uint8_t *>(&null_) -
- reinterpret_cast<std::uint8_t *>(&sum_)) {}
-
- AggregationStateSum(TypedValue &&sum, const bool is_null)
- : sum_(std::move(sum)), null_(is_null) {}
-
- // TODO(shoban): We might want to specialize sum_ to use atomics for int types
- // similar to in AggregationStateCount.
- TypedValue sum_;
- bool null_;
- SpinMutex mutex_;
-
- int sum_offset_, null_offset_;
-};
-
-/**
* @brief An aggregationhandle for sum.
**/
-class AggregationHandleSum : public AggregationConcreteHandle {
+class AggregationHandleSum : public AggregationHandle {
public:
~AggregationHandleSum() override {}
- AggregationState* createInitialState() const override {
- return new AggregationStateSum(blank_state_);
- }
-
- AggregationStateHashTableBase* createGroupByHashTable(
- const HashTableImplType hash_table_impl,
- const std::vector<const Type *> &group_by_types,
- const std::size_t estimated_num_groups,
- StorageManager *storage_manager) const override;
-
- inline void iterateUnaryInl(AggregationStateSum *state,
- const TypedValue &value) const {
- DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
- if (value.isNull()) return;
-
- SpinMutexLock lock(state->mutex_);
- state->sum_ = fast_operator_->applyToTypedValues(state->sum_, value);
- state->null_ = false;
- }
-
- inline void iterateUnaryInlFast(const TypedValue &value,
- std::uint8_t *byte_ptr) const {
- DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
- if (value.isNull()) return;
- TypedValue *sum_ptr =
- reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset_);
- bool *null_ptr =
- reinterpret_cast<bool *>(byte_ptr + blank_state_.null_offset_);
- *sum_ptr = fast_operator_->applyToTypedValues(*sum_ptr, value);
- *null_ptr = false;
- }
-
- inline void updateStateUnary(const TypedValue &argument,
- std::uint8_t *byte_ptr) const override {
- if (!block_update_) {
- iterateUnaryInlFast(argument, byte_ptr);
- }
- }
-
- void blockUpdate() override { block_update_ = true; }
-
- void allowUpdate() override { block_update_ = false; }
-
- void initPayload(std::uint8_t *byte_ptr) const override {
- TypedValue *sum_ptr =
- reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset_);
- bool *null_ptr =
- reinterpret_cast<bool *>(byte_ptr + blank_state_.null_offset_);
- *sum_ptr = blank_state_.sum_;
- *null_ptr = true;
- }
-
- AggregationState* accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
- const override;
+ void accumulateColumnVectors(
+ void *state,
+ const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const override;
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- AggregationState* accumulateValueAccessor(
+ void accumulateValueAccessor(
+ void *state,
ValueAccessor *accessor,
- const std::vector<attribute_id> &accessor_id) const override;
+ const std::vector<attribute_id> &accessor_ids) const override;
#endif
- void aggregateValueAccessorIntoHashTable(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &argument_ids,
- const std::vector<attribute_id> &group_by_key_ids,
- AggregationStateHashTableBase *hash_table) const override;
-
- void mergeStates(const AggregationState &source,
- AggregationState *destination) const override;
-
- void mergeStatesFast(const std::uint8_t *source,
- std::uint8_t *destination) const override;
-
- TypedValue finalize(const AggregationState &state) const override;
-
- inline TypedValue finalizeHashTableEntry(
- const AggregationState &state) const {
- return static_cast<const AggregationStateSum &>(state).sum_;
- }
-
- inline TypedValue finalizeHashTableEntryFast(
- const std::uint8_t *byte_ptr) const {
- std::uint8_t *value_ptr = const_cast<std::uint8_t *>(byte_ptr);
- TypedValue *sum_ptr =
- reinterpret_cast<TypedValue *>(value_ptr + blank_state_.sum_offset_);
- return *sum_ptr;
- }
-
- ColumnVector* finalizeHashTable(
- const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys,
- int index) const override;
-
- /**
- * @brief Implementation of
- * AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
- * for SUM aggregation.
- */
- AggregationState* aggregateOnDistinctifyHashTableForSingle(
- const AggregationStateHashTableBase &distinctify_hash_table)
- const override;
-
- /**
- * @brief Implementation of
- * AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
- * for SUM aggregation.
- */
- void aggregateOnDistinctifyHashTableForGroupBy(
- const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table,
- std::size_t index) const override;
-
- std::size_t getPayloadSize() const override {
- return blank_state_.getPayloadSize();
- }
-
private:
friend class AggregateFunctionSum;
@@ -226,15 +67,13 @@ class AggregationHandleSum : public AggregationConcreteHandle {
*
* @param type Type of the sum value.
**/
- explicit AggregationHandleSum(const Type &type);
+ explicit AggregationHandleSum(const Type &argument_type);
- const Type &argument_type_;
- const Type *result_type_;
- AggregationStateSum blank_state_;
- std::unique_ptr<UncheckedBinaryOperator> fast_operator_;
- std::unique_ptr<UncheckedBinaryOperator> merge_operator_;
+ // TODO: temporary
+ TypedValue tv_blank_state_;
- bool block_update_;
+ std::unique_ptr<UncheckedBinaryOperator> accumulate_operator_;
+ std::unique_ptr<UncheckedBinaryOperator> merge_operator_;
DISALLOW_COPY_AND_ASSIGN(AggregationHandleSum);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/aggregation/CMakeLists.txt b/expressions/aggregation/CMakeLists.txt
index e9503f7..7b369ae 100644
--- a/expressions/aggregation/CMakeLists.txt
+++ b/expressions/aggregation/CMakeLists.txt
@@ -43,9 +43,6 @@ add_library(quickstep_expressions_aggregation_AggregateFunctionMin
add_library(quickstep_expressions_aggregation_AggregateFunctionSum
AggregateFunctionSum.cpp
AggregateFunctionSum.hpp)
-add_library(quickstep_expressions_aggregation_AggregationConcreteHandle
- AggregationConcreteHandle.cpp
- AggregationConcreteHandle.hpp)
add_library(quickstep_expressions_aggregation_AggregationHandle
../../empty_src.cpp
AggregationHandle.hpp)
@@ -55,9 +52,6 @@ add_library(quickstep_expressions_aggregation_AggregationHandleAvg
add_library(quickstep_expressions_aggregation_AggregationHandleCount
AggregationHandleCount.cpp
AggregationHandleCount.hpp)
-add_library(quickstep_expressions_aggregation_AggregationHandleDistinct
- AggregationHandleDistinct.cpp
- AggregationHandleDistinct.hpp)
add_library(quickstep_expressions_aggregation_AggregationHandleMax
AggregationHandleMax.cpp
AggregationHandleMax.hpp)
@@ -142,34 +136,20 @@ target_link_libraries(quickstep_expressions_aggregation_AggregateFunctionSum
quickstep_types_operations_binaryoperations_BinaryOperationFactory
quickstep_types_operations_binaryoperations_BinaryOperationID
quickstep_utility_Macros)
-target_link_libraries(quickstep_expressions_aggregation_AggregationConcreteHandle
- glog
- quickstep_catalog_CatalogTypedefs
- quickstep_expressions_aggregation_AggregationHandle
- quickstep_storage_FastHashTable
- quickstep_storage_HashTable
- quickstep_storage_HashTableBase
- quickstep_storage_HashTableFactory
- quickstep_threading_SpinMutex
- quickstep_types_TypedValue
- quickstep_types_containers_ColumnVector
- quickstep_utility_Macros)
target_link_libraries(quickstep_expressions_aggregation_AggregationHandle
glog
quickstep_catalog_CatalogTypedefs
- quickstep_storage_HashTableBase
+ quickstep_types_Type
quickstep_types_TypedValue
- quickstep_utility_Macros)
+ quickstep_utility_Macros
+ quickstep_utility_ScopedBuffer)
target_link_libraries(quickstep_expressions_aggregation_AggregationHandleAvg
glog
quickstep_catalog_CatalogTypedefs
- quickstep_expressions_aggregation_AggregationConcreteHandle
quickstep_expressions_aggregation_AggregationHandle
- quickstep_storage_FastHashTable
quickstep_storage_HashTable
quickstep_storage_HashTableBase
quickstep_storage_HashTableFactory
- quickstep_threading_SpinMutex
quickstep_types_Type
quickstep_types_TypeFactory
quickstep_types_TypeID
@@ -181,34 +161,23 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleAvg
target_link_libraries(quickstep_expressions_aggregation_AggregationHandleCount
glog
quickstep_catalog_CatalogTypedefs
- quickstep_expressions_aggregation_AggregationConcreteHandle
quickstep_expressions_aggregation_AggregationHandle
- quickstep_storage_FastHashTable
quickstep_storage_HashTable
quickstep_storage_HashTableBase
quickstep_storage_HashTableFactory
quickstep_storage_ValueAccessor
quickstep_storage_ValueAccessorUtil
+ quickstep_types_LongType
quickstep_types_TypeFactory
quickstep_types_TypeID
quickstep_types_TypedValue
quickstep_types_containers_ColumnVector
quickstep_types_containers_ColumnVectorUtil
quickstep_utility_Macros)
-target_link_libraries(quickstep_expressions_aggregation_AggregationHandleDistinct
- glog
- quickstep_catalog_CatalogTypedefs
- quickstep_expressions_aggregation_AggregationConcreteHandle
- quickstep_storage_HashTable
- quickstep_storage_HashTableBase
- quickstep_types_TypedValue
- quickstep_utility_Macros)
target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMax
glog
quickstep_catalog_CatalogTypedefs
- quickstep_expressions_aggregation_AggregationConcreteHandle
quickstep_expressions_aggregation_AggregationHandle
- quickstep_storage_FastHashTable
quickstep_storage_HashTable
quickstep_storage_HashTableBase
quickstep_storage_HashTableFactory
@@ -223,9 +192,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMax
target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMin
glog
quickstep_catalog_CatalogTypedefs
- quickstep_expressions_aggregation_AggregationConcreteHandle
quickstep_expressions_aggregation_AggregationHandle
- quickstep_storage_FastHashTable
quickstep_storage_HashTable
quickstep_storage_HashTableBase
quickstep_storage_HashTableFactory
@@ -240,21 +207,21 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMin
target_link_libraries(quickstep_expressions_aggregation_AggregationHandleSum
glog
quickstep_catalog_CatalogTypedefs
- quickstep_expressions_aggregation_AggregationConcreteHandle
quickstep_expressions_aggregation_AggregationHandle
- quickstep_storage_FastHashTable
quickstep_storage_HashTable
quickstep_storage_HashTableBase
quickstep_storage_HashTableFactory
quickstep_threading_SpinMutex
quickstep_types_Type
quickstep_types_TypeFactory
+ quickstep_types_TypeFunctors
quickstep_types_TypeID
quickstep_types_TypedValue
quickstep_types_operations_binaryoperations_BinaryOperation
quickstep_types_operations_binaryoperations_BinaryOperationFactory
quickstep_types_operations_binaryoperations_BinaryOperationID
- quickstep_utility_Macros)
+ quickstep_utility_Macros
+ quickstep_utility_ScopedBuffer)
# Submodule all-in-one library:
add_library(quickstep_expressions_aggregation ../../empty_src.cpp)
@@ -267,11 +234,9 @@ target_link_libraries(quickstep_expressions_aggregation
quickstep_expressions_aggregation_AggregateFunctionMax
quickstep_expressions_aggregation_AggregateFunctionMin
quickstep_expressions_aggregation_AggregateFunctionSum
- quickstep_expressions_aggregation_AggregationConcreteHandle
quickstep_expressions_aggregation_AggregationHandle
quickstep_expressions_aggregation_AggregationHandleAvg
quickstep_expressions_aggregation_AggregationHandleCount
- quickstep_expressions_aggregation_AggregationHandleDistinct
quickstep_expressions_aggregation_AggregationHandleMax
quickstep_expressions_aggregation_AggregationHandleMin
quickstep_expressions_aggregation_AggregationHandleSum
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/storage/AggregationHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationHashTable.hpp b/storage/AggregationHashTable.hpp
deleted file mode 100644
index fca6d4c..0000000
--- a/storage/AggregationHashTable.hpp
+++ /dev/null
@@ -1,330 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_STORAGE_AGGREGATION_HASH_TABLE_HPP_
-#define QUICKSTEP_STORAGE_AGGREGATION_HASH_TABLE_HPP_
-
-#include <algorithm>
-#include <atomic>
-#include <cstddef>
-#include <cstdlib>
-#include <cstring>
-#include <limits>
-#include <memory>
-#include <unordered_map>
-#include <utility>
-#include <vector>
-
-#include "expressions/aggregation/AggregationHandle.hpp"
-#include "storage/HashTableBase.hpp"
-#include "storage/HashTableUntypedKeyManager.hpp"
-#include "storage/StorageBlob.hpp"
-#include "storage/StorageBlockInfo.hpp"
-#include "storage/StorageConstants.hpp"
-#include "storage/StorageManager.hpp"
-#include "storage/ValueAccessor.hpp"
-#include "storage/ValueAccessorUtil.hpp"
-#include "threading/SpinMutex.hpp"
-#include "threading/SpinSharedMutex.hpp"
-#include "types/Type.hpp"
-#include "types/TypeFunctors.hpp"
-#include "utility/Alignment.hpp"
-#include "utility/InlineMemcpy.hpp"
-#include "utility/Macros.hpp"
-#include "utility/PrimeNumber.hpp"
-
-namespace quickstep {
-
-/** \addtogroup Storage
- * @{
- */
-
-template <bool use_mutex>
-class AggregationHashTablePayloadManager {
- public:
- AggregationHashTablePayloadManager(const std::vector<AggregationHandle *> &handles)
- : handles_(handles),
- payload_size_in_bytes_(0) {
- if (use_mutex) {
- payload_size_in_bytes_ += sizeof(SpinMutex);
- }
- for (const AggregationHandle *handle : handles) {
- const std::size_t state_size = handle->getStateSize();
- agg_state_sizes_.emplace_back(state_size);
- agg_state_offsets_.emplace_back(payload_size_in_bytes_);
- payload_size_in_bytes_ += state_size;
- }
-
- initial_payload_ = std::malloc(payload_size_in_bytes_);
- if (use_mutex) {
- new(initial_payload_) Mutex;
- }
-// for (std::size_t i = 0; i < handles_.size(); ++i) {
-// handles_[i]->initPayload(
-// static_cast<std::uint8_t *>(initial_payload_) + agg_state_offsets_[i]);
-// }
- }
-
- ~AggregationHashTablePayloadManager() {
- std::free(initial_payload_);
- }
-
- inline std::size_t getPayloadSizeInBytes() const {
- return payload_size_in_bytes_;
- }
-
- inline void updatePayload(void *payload) const {
- }
-
- inline void initPayload(void *payload) const {
- }
-
- private:
- std::vector<AggregationHandle *> handles_;
-
- std::vector<std::size_t> agg_state_sizes_;
- std::vector<std::size_t> agg_state_offsets_;
- std::size_t payload_size_in_bytes_;
-
- void *initial_payload_;
-
- DISALLOW_COPY_AND_ASSIGN(AggregationHashTablePayloadManager);
-};
-
-class ThreadPrivateAggregationHashTable : public AggregationHashTableBase {
- public:
- ThreadPrivateAggregationHashTable(const std::vector<const Type *> &key_types,
- const std::size_t num_entries,
- const std::vector<AggregationHandle *> &handles,
- StorageManager *storage_manager)
- : payload_manager_(handles),
- key_types_(key_types),
- key_manager_(this->key_types_, payload_manager_.getPayloadSizeInBytes()),
- slots_(num_entries * kHashTableLoadFactor,
- key_manager_.getUntypedKeyHashFunctor(),
- key_manager_.getUntypedKeyEqualityFunctor()),
- bucket_size_(ComputeBucketSize(key_manager_.getFixedKeySize(),
- payload_manager_.getPayloadSizeInBytes())),
- buckets_allocated_(0),
- storage_manager_(storage_manager) {
- std::size_t num_storage_slots =
- this->storage_manager_->SlotsNeededForBytes(num_entries);
-
- // Get a StorageBlob to hold the hash table.
- const block_id blob_id = this->storage_manager_->createBlob(num_storage_slots);
- this->blob_ = this->storage_manager_->getBlobMutable(blob_id);
-
- buckets_ = this->blob_->getMemoryMutable();
- num_buckets_ = num_storage_slots * kSlotSizeBytes / bucket_size_;
- }
-
- void resize() {
- const std::size_t resized_memory_required = num_buckets_ * bucket_size_ * 2;
- const std::size_t resized_storage_slots =
- this->storage_manager_->SlotsNeededForBytes(resized_memory_required);
- const block_id resized_blob_id =
- this->storage_manager_->createBlob(resized_storage_slots);
- MutableBlobReference resized_blob =
- this->storage_manager_->getBlobMutable(resized_blob_id);
-
- void *resized_buckets = resized_blob->getMemoryMutable();
- std::memcpy(resized_buckets, buckets_, buckets_allocated_ * bucket_size_);
-
- for (auto &pair : slots_) {
- pair.second =
- (static_cast<const char *>(pair.first) - static_cast<char *>(buckets_))
- + static_cast<char *>(resized_buckets);
- }
-
- buckets_ = resized_buckets;
- num_buckets_ = resized_storage_slots * kSlotSizeBytes / bucket_size_;
- std::swap(this->blob_, resized_blob);
- }
-
- bool upsertValueAccessor(ValueAccessor *accessor,
- const attribute_id key_attr_id,
- const std::vector<attribute_id> &argument_ids,
- const bool check_for_null_keys) override {
- if (check_for_null_keys) {
- return upsertValueAccessorInternal<true>(
- accessor, key_attr_id, argument_ids);
- } else {
- return upsertValueAccessorInternal<false>(
- accessor, key_attr_id, argument_ids);
- }
- }
-
- template <bool check_for_null_keys>
- bool upsertValueAccessorInternal(ValueAccessor *accessor,
- const attribute_id key_attr_id,
- const std::vector<attribute_id> &argument_ids) {
- return InvokeOnAnyValueAccessor(
- accessor,
- [&](auto *accessor) -> bool { // NOLINT(build/c++11)
- accessor->beginIteration();
- while (accessor->next()) {
- const void *key = accessor->template getUntypedValue<check_for_null_keys>(key_attr_id);
- if (check_for_null_keys && key == nullptr) {
- continue;
- }
- bool is_empty;
- void *bucket = locateBucket(key, &is_empty);
- if (is_empty) {
- payload_manager_.initPayload(bucket);
- } else {
- payload_manager_.updatePayload(bucket);
- }
- }
- return true;
- });
- }
-
- bool upsertValueAccessorCompositeKey(ValueAccessor *accessor,
- const std::vector<attribute_id> &key_attr_ids,
- const std::vector<attribute_id> &argument_ids,
- const bool check_for_null_keys) override {
- if (check_for_null_keys) {
- return upsertValueAccessorCompositeKeyInternal<true>(
- accessor, key_attr_ids, argument_ids);
- } else {
- return upsertValueAccessorCompositeKeyInternal<false>(
- accessor, key_attr_ids, argument_ids);
- }
- }
-
- template <bool check_for_null_keys>
- bool upsertValueAccessorCompositeKeyInternal(ValueAccessor *accessor,
- const std::vector<attribute_id> &key_attr_ids,
- const std::vector<attribute_id> &argument_ids) {
- return InvokeOnAnyValueAccessor(
- accessor,
- [&](auto *accessor) -> bool { // NOLINT(build/c++11)
- accessor->beginIteration();
- void *prealloc_bucket = allocateBucket();
- while (accessor->next()) {
- if (check_for_null_keys) {
- const bool is_null =
- key_manager_.writeNullableUntypedKeyFromValueAccessorToBucket(
- accessor,
- key_attr_ids,
- prealloc_bucket);
- if (is_null) {
- continue;
- }
- } else {
- key_manager_.writeUntypedKeyFromValueAccessorToBucket(
- accessor,
- key_attr_ids,
- prealloc_bucket);
- }
- void *bucket = locateBucketWithPrealloc(prealloc_bucket);
- if (bucket != prealloc_bucket) {
- payload_manager_.initPayload(bucket);
- prealloc_bucket = allocateBucket();
- } else {
- payload_manager_.updatePayload(bucket);
- }
- }
- // Reclaim the last unused bucket
- --buckets_allocated_;
- return true;
- });
- }
-
- inline void* locateBucket(const void *key, bool *is_empty) {
- auto slot_it = slots_.find(key);
- if (slot_it == slots_.end()) {
- void *bucket = allocateBucket();
- key_manager_.writeUntypedKeyToBucket(key, bucket);
- slots_.emplace(key_manager_.getUntypedKeyComponent(bucket), bucket);
- *is_empty = true;
- return bucket;
- } else {
- *is_empty = false;
- return slot_it->second;
- }
- }
-
- inline void* locateBucketWithPrealloc(void *prealloc_bucket) {
- const void *key = key_manager_.getUntypedKeyComponent(prealloc_bucket);
- auto slot_it = slots_.find(key);
- if (slot_it == slots_.end()) {
- slots_.emplace(key, prealloc_bucket);
- return prealloc_bucket;
- } else {
- return slot_it->second;
- }
- }
-
- inline void* allocateBucket() {
- if (buckets_allocated_ >= num_buckets_) {
- resize();
- }
- void *bucket = static_cast<char *>(buckets_) + buckets_allocated_ * bucket_size_;
- ++buckets_allocated_;
- return bucket;
- }
-
- void print() const override {
- std::cerr << "Bucket size = " << bucket_size_ << "\n";
- std::cerr << "Buckets: \n";
- for (const auto &pair : slots_) {
- std::cerr << pair.first << " -- " << pair.second << "\n";
- std::cerr << *static_cast<const int *>(pair.second) << "\n";
- }
- }
-
- private:
- // Helper object to manage hash table payloads (i.e. aggregation states).
- AggregationHashTablePayloadManager<false> payload_manager_;
-
- // Type(s) of keys.
- const std::vector<const Type*> key_types_;
-
- // Helper object to manage key storage.
- HashTableUntypedKeyManager key_manager_;
-
- // Round bucket size up to a multiple of kBucketAlignment.
- static std::size_t ComputeBucketSize(const std::size_t fixed_key_size,
- const std::size_t total_payload_size) {
- constexpr std::size_t kBucketAlignment = 4;
- return (((fixed_key_size + total_payload_size - 1)
- / kBucketAlignment) + 1) * kBucketAlignment;
- }
-
- std::unordered_map<const void *, void *,
- UntypedKeyHashFunctor,
- UntypedKeyEqualityFunctor> slots_;
-
- void *buckets_;
- const std::size_t bucket_size_;
- std::size_t num_buckets_;
- std::size_t buckets_allocated_;
-
- StorageManager *storage_manager_;
- MutableBlobReference blob_;
-
- DISALLOW_COPY_AND_ASSIGN(ThreadPrivateAggregationHashTable);
-};
-
-
-} // namespace quickstep
-
-#endif // QUICKSTEP_STORAGE_AGGREGATION_HASH_TABLE_HPP_
-
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index fe16fc4..50e7c06 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -34,13 +34,11 @@
#include "expressions/aggregation/AggregateFunction.hpp"
#include "expressions/aggregation/AggregateFunctionFactory.hpp"
#include "expressions/aggregation/AggregationHandle.hpp"
-#include "expressions/aggregation/AggregationHandleDistinct.hpp"
#include "expressions/aggregation/AggregationID.hpp"
#include "expressions/predicate/Predicate.hpp"
#include "expressions/scalar/Scalar.hpp"
-#include "storage/AggregationHashTable.hpp"
#include "storage/AggregationOperationState.pb.h"
-#include "storage/HashTable.hpp"
+#include "storage/AggregationStateHashTable.hpp"
#include "storage/HashTableBase.hpp"
#include "storage/HashTableFactory.hpp"
#include "storage/InsertDestination.hpp"
@@ -88,122 +86,66 @@ AggregationOperationState::AggregationOperationState(
std::vector<AggregationHandle *> group_by_handles;
group_by_handles.clear();
- if (aggregate_functions.size() == 0) {
- // If there is no aggregation function, then it is a distinctify operation
- // on the group-by expressions.
- DCHECK_GT(group_by_list_.size(), 0u);
-
- handles_.emplace_back(new AggregationHandleDistinct());
- arguments_.push_back({});
- is_distinct_.emplace_back(false);
- group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
- hash_table_impl_type,
- group_by_types,
- {1},
- handles_,
- storage_manager));
- } else {
- // Set up each individual aggregate in this operation.
- std::vector<const AggregateFunction *>::const_iterator agg_func_it =
- aggregate_functions.begin();
- std::vector<std::vector<std::unique_ptr<const Scalar>>>::const_iterator
- args_it = arguments_.begin();
- std::vector<bool>::const_iterator is_distinct_it = is_distinct_.begin();
- std::vector<HashTableImplType>::const_iterator
- distinctify_hash_table_impl_types_it =
- distinctify_hash_table_impl_types.begin();
- std::vector<std::size_t> payload_sizes;
- for (; agg_func_it != aggregate_functions.end();
- ++agg_func_it, ++args_it, ++is_distinct_it) {
- // Get the Types of this aggregate's arguments so that we can create an
- // AggregationHandle.
- std::vector<const Type *> argument_types;
- for (const std::unique_ptr<const Scalar> &argument : *args_it) {
- argument_types.emplace_back(&argument->getType());
- }
-
- // Sanity checks: aggregate function exists and can apply to the specified
- // arguments.
- DCHECK(*agg_func_it != nullptr);
- DCHECK((*agg_func_it)->canApplyToTypes(argument_types));
-
- // Have the AggregateFunction create an AggregationHandle that we can use
- // to do actual aggregate computation.
- handles_.emplace_back((*agg_func_it)->createHandle(argument_types));
+ // Set up each individual aggregate in this operation.
+ for (std::size_t i = 0; i < aggregate_functions.size(); ++i) {
+ // Get the Types of this aggregate's arguments so that we can create an
+ // AggregationHandle.
+ std::vector<const Type *> argument_types;
+ for (const std::unique_ptr<const Scalar> &argument : arguments[i]) {
+ argument_types.emplace_back(&argument->getType());
+ }
- if (!group_by_list_.empty()) {
- // Aggregation with GROUP BY: combined payload is partially updated in
- // the presence of DISTINCT.
- if (*is_distinct_it) {
- handles_.back()->blockUpdate();
- }
- group_by_handles.emplace_back(handles_.back());
- payload_sizes.emplace_back(group_by_handles.back()->getPayloadSize());
- } else {
- // Aggregation without GROUP BY: create a single global state.
- single_states_.emplace_back(handles_.back()->createInitialState());
+ // Sanity checks: aggregate function exists and can apply to the specified
+ // arguments.
+ const AggregateFunction *agg_func = aggregate_functions[i];
+ DCHECK(agg_func != nullptr);
+ DCHECK(agg_func->canApplyToTypes(argument_types));
+
+ // Have the AggregateFunction create an AggregationHandle that we can use
+ // to do actual aggregate computation.
+ handles_.emplace_back(agg_func->createHandle(argument_types));
+
+ if (!group_by_list_.empty()) {
+ // TODO(jianqiao): handle DISTINCT aggregation.
+ // if (is_distinct[i]) {
+ // }
+ group_by_handles.emplace_back(handles_.back());
+ } else {
+ // Aggregation without GROUP BY: create a single global state.
+ single_states_.emplace_back(handles_.back()->createInitialState());
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- // See if all of this aggregate's arguments are attributes in the input
- // relation. If so, remember the attribute IDs so that we can do copy
- // elision when actually performing the aggregation.
- std::vector<attribute_id> local_arguments_as_attributes;
- local_arguments_as_attributes.reserve(args_it->size());
- for (const std::unique_ptr<const Scalar> &argument : *args_it) {
- const attribute_id argument_id =
- argument->getAttributeIdForValueAccessor();
- if (argument_id == -1) {
- local_arguments_as_attributes.clear();
- break;
- } else {
- DCHECK_EQ(input_relation_.getID(),
- argument->getRelationIdForValueAccessor());
- local_arguments_as_attributes.push_back(argument_id);
- }
+ // See if all of this aggregate's arguments are attributes in the input
+ // relation. If so, remember the attribute IDs so that we can do copy
+ // elision when actually performing the aggregation.
+ std::vector<attribute_id> local_arguments_as_attributes;
+ local_arguments_as_attributes.reserve(arguments[i].size());
+ for (const std::unique_ptr<const Scalar> &argument : arguments[i]) {
+ const attribute_id argument_id =
+ argument->getAttributeIdForValueAccessor();
+ if (argument_id == -1) {
+ local_arguments_as_attributes.clear();
+ break;
+ } else {
+ DCHECK_EQ(input_relation_.getID(),
+ argument->getRelationIdForValueAccessor());
+ local_arguments_as_attributes.push_back(argument_id);
}
-
- arguments_as_attributes_.emplace_back(
- std::move(local_arguments_as_attributes));
-#endif
}
- // Initialize the corresponding distinctify hash table if this is a
- // DISTINCT
- // aggregation.
- if (*is_distinct_it) {
- std::vector<const Type *> key_types(group_by_types);
- key_types.insert(
- key_types.end(), argument_types.begin(), argument_types.end());
- // TODO(jianqiao): estimated_num_entries is quite inaccurate for
- // estimating
- // the number of entries in the distinctify hash table. We may estimate
- // for each distinct aggregation an estimated_num_distinct_keys value
- // during
- // query optimization, if it worths.
- distinctify_hashtables_.emplace_back(
- AggregationStateFastHashTableFactory::CreateResizable(
- *distinctify_hash_table_impl_types_it,
- key_types,
- estimated_num_entries,
- {0},
- {},
- storage_manager));
- ++distinctify_hash_table_impl_types_it;
- } else {
- distinctify_hashtables_.emplace_back(nullptr);
- }
+ arguments_as_attributes_.emplace_back(
+ std::move(local_arguments_as_attributes));
+#endif
}
+ }
- if (!group_by_handles.empty()) {
- // Aggregation with GROUP BY: create a HashTable pool for per-group
- // states.
- group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
- hash_table_impl_type,
- group_by_types,
- payload_sizes,
- group_by_handles,
- storage_manager));
- }
+ if (!group_by_handles.empty()) {
+ // Aggregation with GROUP BY: create a HashTable pool for per-group states.
+ group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
+ hash_table_impl_type,
+ group_by_types,
+ group_by_handles,
+ storage_manager));
}
}
@@ -352,12 +294,12 @@ void AggregationOperationState::finalizeAggregate(
}
void AggregationOperationState::mergeSingleState(
- const std::vector<std::unique_ptr<AggregationState>> &local_state) {
+ const std::vector<ScopedBuffer> &local_state) {
DEBUG_ASSERT(local_state.size() == single_states_.size());
for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
if (!is_distinct_[agg_idx]) {
- handles_[agg_idx]->mergeStates(*local_state[agg_idx],
- single_states_[agg_idx].get());
+ handles_[agg_idx]->mergeStates(single_states_[agg_idx].get(),
+ local_state[agg_idx].get());
}
}
}
@@ -365,7 +307,7 @@ void AggregationOperationState::mergeSingleState(
void AggregationOperationState::aggregateBlockSingleState(
const block_id input_block) {
// Aggregate per-block state for each aggregate.
- std::vector<std::unique_ptr<AggregationState>> local_state;
+ std::vector<ScopedBuffer> local_state;
BlockReference block(
storage_manager_->getBlock(input_block, input_relation_));
@@ -386,14 +328,7 @@ void AggregationOperationState::aggregateBlockSingleState(
// Call StorageBlock::aggregateDistinct() to put the arguments as keys
// directly into the (threadsafe) shared global distinctify HashTable
// for this aggregate.
- block->aggregateDistinct(*handles_[agg_idx],
- arguments_[agg_idx],
- local_arguments_as_attributes,
- {}, /* group_by */
- predicate_.get(),
- distinctify_hashtables_[agg_idx].get(),
- &reuse_matches,
- nullptr /* reuse_group_by_vectors */);
+ // TODO(jianqiao): handle DISTINCT aggregation.
local_state.emplace_back(nullptr);
} else {
// Call StorageBlock::aggregate() to actually do the aggregation.
@@ -426,18 +361,10 @@ void AggregationOperationState::aggregateBlockHashTable(
for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
if (is_distinct_[agg_idx]) {
- // Call StorageBlock::aggregateDistinct() to insert the GROUP BY
- // expression
+ // Call StorageBlock::aggregateDistinct() to insert the GROUP BY expression
// values and the aggregation arguments together as keys directly into the
// (threadsafe) shared global distinctify HashTable for this aggregate.
- block->aggregateDistinct(*handles_[agg_idx],
- arguments_[agg_idx],
- nullptr, /* arguments_as_attributes */
- group_by_list_,
- predicate_.get(),
- distinctify_hashtables_[agg_idx].get(),
- &reuse_matches,
- &reuse_group_by_vectors);
+ // TODO(jianqiao): handle DISTINCT aggregation.
}
}
@@ -445,16 +372,13 @@ void AggregationOperationState::aggregateBlockHashTable(
// directly into the (threadsafe) shared global HashTable for this
// aggregate.
DCHECK(group_by_hashtable_pool_ != nullptr);
- AggregationStateHashTableBase *agg_hash_table =
- group_by_hashtable_pool_->getHashTableFast();
+ auto *agg_hash_table = group_by_hashtable_pool_->getHashTable();
DCHECK(agg_hash_table != nullptr);
block->aggregateGroupBy(arguments_,
group_by_list_,
predicate_.get(),
agg_hash_table,
- group_by_hashtable_pool_->createNewThreadPrivateHashTable(),
-// nullptr,
&reuse_matches,
&reuse_group_by_vectors);
group_by_hashtable_pool_->returnHashTable(agg_hash_table);
@@ -468,23 +392,23 @@ void AggregationOperationState::finalizeSingleState(
for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
if (is_distinct_[agg_idx]) {
- single_states_[agg_idx].reset(
- handles_[agg_idx]->aggregateOnDistinctifyHashTableForSingle(
- *distinctify_hashtables_[agg_idx]));
+ // TODO(jianqiao): handle DISTINCT aggregation
}
attribute_values.emplace_back(
- handles_[agg_idx]->finalize(*single_states_[agg_idx]));
+ handles_[agg_idx]->finalize(single_states_[agg_idx].get()));
}
output_destination->insertTuple(Tuple(std::move(attribute_values)));
}
void AggregationOperationState::mergeGroupByHashTables(
- AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst) {
- HashTableMergerFast merger(dst);
- (static_cast<FastHashTable<true, false, true, false> *>(src))
- ->forEachCompositeKeyFast(&merger);
+ AggregationStateHashTableBase *destination_hash_table,
+ const AggregationStateHashTableBase *source_hash_table) {
+ static_cast<ThreadPrivateAggregationStateHashTable *>(
+ destination_hash_table)->mergeHashTable(
+ static_cast<const ThreadPrivateAggregationStateHashTable *>(
+ source_hash_table));
}
void AggregationOperationState::finalizeHashTable(
@@ -501,103 +425,22 @@ void AggregationOperationState::finalizeHashTable(
// e.g. Keep merging entries from smaller hash tables to larger.
auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
- if (hash_tables->size() > 1) {
- for (int hash_table_index = 0;
- hash_table_index < static_cast<int>(hash_tables->size() - 1);
- ++hash_table_index) {
- // Merge each hash table to the last hash table.
- mergeGroupByHashTables((*hash_tables)[hash_table_index].get(),
- hash_tables->back().get());
- }
- }
-
- // Collect per-aggregate finalized values.
- std::vector<std::unique_ptr<ColumnVector>> final_values;
- for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
- if (is_distinct_[agg_idx]) {
- DCHECK(group_by_hashtable_pool_ != nullptr);
- auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
- DCHECK(hash_tables != nullptr);
- if (hash_tables->empty()) {
- // We may have a case where hash_tables is empty, e.g. no input blocks.
- // However for aggregateOnDistinctifyHashTableForGroupBy to work
- // correctly, we should create an empty group by hash table.
- AggregationStateHashTableBase *new_hash_table =
- group_by_hashtable_pool_->getHashTableFast();
- group_by_hashtable_pool_->returnHashTable(new_hash_table);
- hash_tables = group_by_hashtable_pool_->getAllHashTables();
- }
- DCHECK(hash_tables->back() != nullptr);
- AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
- DCHECK(agg_hash_table != nullptr);
- handles_[agg_idx]->allowUpdate();
- handles_[agg_idx]->aggregateOnDistinctifyHashTableForGroupBy(
- *distinctify_hashtables_[agg_idx], agg_hash_table, agg_idx);
- }
-
- auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
- DCHECK(hash_tables != nullptr);
- if (hash_tables->empty()) {
- // We may have a case where hash_tables is empty, e.g. no input blocks.
- // However for aggregateOnDistinctifyHashTableForGroupBy to work
- // correctly, we should create an empty group by hash table.
- AggregationStateHashTableBase *new_hash_table =
- group_by_hashtable_pool_->getHashTableFast();
- group_by_hashtable_pool_->returnHashTable(new_hash_table);
- hash_tables = group_by_hashtable_pool_->getAllHashTables();
- }
- AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
- DCHECK(agg_hash_table != nullptr);
- ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
- *agg_hash_table, &group_by_keys, agg_idx);
- if (agg_result_col != nullptr) {
- final_values.emplace_back(agg_result_col);
- }
- }
-
- // Reorganize 'group_by_keys' in column-major order so that we can make a
- // ColumnVectorsValueAccessor to bulk-insert results.
- //
- // TODO(chasseur): Shuffling around the GROUP BY keys like this is suboptimal
- // if there is only one aggregate. The need to do this should hopefully go
- // away when we work out storing composite structures for multiple aggregates
- // in a single HashTable.
- std::vector<std::unique_ptr<ColumnVector>> group_by_cvs;
- std::size_t group_by_element_idx = 0;
- for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
- const Type &group_by_type = group_by_element->getType();
- if (NativeColumnVector::UsableForType(group_by_type)) {
- NativeColumnVector *element_cv =
- new NativeColumnVector(group_by_type, group_by_keys.size());
- group_by_cvs.emplace_back(element_cv);
- for (std::vector<TypedValue> &group_key : group_by_keys) {
- element_cv->appendTypedValue(
- std::move(group_key[group_by_element_idx]));
- }
- } else {
- IndirectColumnVector *element_cv =
- new IndirectColumnVector(group_by_type, group_by_keys.size());
- group_by_cvs.emplace_back(element_cv);
- for (std::vector<TypedValue> &group_key : group_by_keys) {
- element_cv->appendTypedValue(
- std::move(group_key[group_by_element_idx]));
- }
- }
- ++group_by_element_idx;
+ if (hash_tables->size() == 0) {
+ return;
}
- // Stitch together a ColumnVectorsValueAccessor combining the GROUP BY keys
- // and the finalized aggregates.
- ColumnVectorsValueAccessor complete_result;
- for (std::unique_ptr<ColumnVector> &group_by_cv : group_by_cvs) {
- complete_result.addColumn(group_by_cv.release());
- }
- for (std::unique_ptr<ColumnVector> &final_value_cv : final_values) {
- complete_result.addColumn(final_value_cv.release());
+ std::unique_ptr<AggregationStateHashTableBase> final_hash_table(
+ hash_tables->back().release());
+ for (std::size_t i = 0; i < hash_tables->size() - 1; ++i) {
+ std::unique_ptr<AggregationStateHashTableBase> hash_table(
+ hash_tables->at(i).release());
+ mergeGroupByHashTables(final_hash_table.get(), hash_table.get());
}
// Bulk-insert the complete result.
- output_destination->bulkInsertTuples(&complete_result);
+ std::unique_ptr<AggregationResultIterator> results(
+ final_hash_table->createResultIterator());
+ output_destination->bulkInsertAggregationResults(results.get());
}
} // namespace quickstep