You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/09/20 20:30:35 UTC
[7/8] incubator-quickstep git commit: Initial commit for QUICKSTEP-28
and QUICKSTEP-29.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/AggregationHandleMax.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.hpp b/expressions/aggregation/AggregationHandleMax.hpp
index 7e38473..5fb9f44 100644
--- a/expressions/aggregation/AggregationHandleMax.hpp
+++ b/expressions/aggregation/AggregationHandleMax.hpp
@@ -28,6 +28,7 @@
#include "catalog/CatalogTypedefs.hpp"
#include "expressions/aggregation/AggregationConcreteHandle.hpp"
#include "expressions/aggregation/AggregationHandle.hpp"
+#include "storage/FastHashTable.hpp"
#include "storage/HashTableBase.hpp"
#include "threading/SpinMutex.hpp"
#include "types/Type.hpp"
@@ -55,25 +56,24 @@ class AggregationStateMax : public AggregationState {
/**
* @brief Copy constructor (ignores mutex).
*/
- AggregationStateMax(const AggregationStateMax &orig)
- : max_(orig.max_) {
- }
+ AggregationStateMax(const AggregationStateMax &orig) : max_(orig.max_) {}
/**
* @brief Destructor.
*/
- ~AggregationStateMax() override {};
+ ~AggregationStateMax() override{};
+
+ const std::uint8_t* getPayloadAddress() const {
+ return reinterpret_cast<const uint8_t *>(&max_);
+ }
private:
friend class AggregationHandleMax;
explicit AggregationStateMax(const Type &type)
- : max_(type.getNullableVersion().makeNullValue()) {
- }
+ : max_(type.getNullableVersion().makeNullValue()) {}
- explicit AggregationStateMax(TypedValue &&value)
- : max_(std::move(value)) {
- }
+ explicit AggregationStateMax(TypedValue &&value) : max_(std::move(value)) {}
TypedValue max_;
SpinMutex mutex_;
@@ -84,8 +84,7 @@ class AggregationStateMax : public AggregationState {
**/
class AggregationHandleMax : public AggregationConcreteHandle {
public:
- ~AggregationHandleMax() override {
- }
+ ~AggregationHandleMax() override {}
AggregationState* createInitialState() const override {
return new AggregationStateMax(type_);
@@ -93,20 +92,46 @@ class AggregationHandleMax : public AggregationConcreteHandle {
AggregationStateHashTableBase* createGroupByHashTable(
const HashTableImplType hash_table_impl,
- const std::vector<const Type*> &group_by_types,
+ const std::vector<const Type *> &group_by_types,
const std::size_t estimated_num_groups,
StorageManager *storage_manager) const override;
/**
* @brief Iterate with max aggregation state.
*/
- inline void iterateUnaryInl(AggregationStateMax *state, const TypedValue &value) const {
+ inline void iterateUnaryInl(AggregationStateMax *state,
+ const TypedValue &value) const {
DCHECK(value.isPlausibleInstanceOf(type_.getSignature()));
- compareAndUpdate(static_cast<AggregationStateMax*>(state), value);
+ compareAndUpdate(static_cast<AggregationStateMax *>(state), value);
+ }
+
+ inline void iterateUnaryInlFast(const TypedValue &value,
+ std::uint8_t *byte_ptr) const {
+ DCHECK(value.isPlausibleInstanceOf(type_.getSignature()));
+ TypedValue *max_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
+ compareAndUpdateFast(max_ptr, value);
+ }
+
+ inline void updateStateUnary(const TypedValue &argument,
+ std::uint8_t *byte_ptr) const override {
+ if (!block_update_) {
+ iterateUnaryInlFast(argument, byte_ptr);
+ }
+ }
+
+ void blockUpdate() override { block_update_ = true; }
+
+ void allowUpdate() override { block_update_ = false; }
+
+ void initPayload(std::uint8_t *byte_ptr) const override {
+ TypedValue *max_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
+ TypedValue t1 = (type_.getNullableVersion().makeNullValue());
+ *max_ptr = t1;
}
AggregationState* accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const override;
+ const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
+ const override;
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
AggregationState* accumulateValueAccessor(
@@ -123,37 +148,49 @@ class AggregationHandleMax : public AggregationConcreteHandle {
void mergeStates(const AggregationState &source,
AggregationState *destination) const override;
+ void mergeStatesFast(const std::uint8_t *source,
+ std::uint8_t *destination) const override;
+
TypedValue finalize(const AggregationState &state) const override {
- return TypedValue(static_cast<const AggregationStateMax&>(state).max_);
+ return TypedValue(static_cast<const AggregationStateMax &>(state).max_);
+ }
+
+ inline TypedValue finalizeHashTableEntry(
+ const AggregationState &state) const {
+ return TypedValue(static_cast<const AggregationStateMax &>(state).max_);
}
- inline TypedValue finalizeHashTableEntry(const AggregationState &state) const {
- return TypedValue(static_cast<const AggregationStateMax&>(state).max_);
+ inline TypedValue finalizeHashTableEntryFast(
+ const std::uint8_t *byte_ptr) const {
+ const TypedValue *max_ptr = reinterpret_cast<const TypedValue *>(byte_ptr);
+ return TypedValue(*max_ptr);
}
ColumnVector* finalizeHashTable(
const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys) const override;
+ std::vector<std::vector<TypedValue>> *group_by_keys,
+ int index) const override;
/**
- * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
+ * @brief Implementation of
+ * AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
* for MAX aggregation.
*/
AggregationState* aggregateOnDistinctifyHashTableForSingle(
- const AggregationStateHashTableBase &distinctify_hash_table) const override;
-
+ const AggregationStateHashTableBase &distinctify_hash_table)
+ const override;
/**
- * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
+ * @brief Implementation of
+ * AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
* for MAX aggregation.
*/
void aggregateOnDistinctifyHashTableForGroupBy(
const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table) const override;
+ AggregationStateHashTableBase *aggregation_hash_table,
+ std::size_t index) const override;
- void mergeGroupByHashTables(
- const AggregationStateHashTableBase &source_hash_table,
- AggregationStateHashTableBase *destination_hash_table) const override;
+ std::size_t getPayloadSize() const override { return sizeof(TypedValue); }
private:
friend class AggregateFunctionMax;
@@ -166,24 +203,37 @@ class AggregationHandleMax : public AggregationConcreteHandle {
explicit AggregationHandleMax(const Type &type);
/**
- * @brief compare the value with max_ and update it if the value is larger than
- * current maximum. NULLs are ignored.
+ * @brief compare the value with max_ and update it if the value is larger
+ * than current maximum. NULLs are ignored.
*
* @param value A TypedValue to compare
**/
- inline void compareAndUpdate(AggregationStateMax *state, const TypedValue &value) const {
+ inline void compareAndUpdate(AggregationStateMax *state,
+ const TypedValue &value) const {
// TODO(chasseur): Avoid null-checks when aggregating a non-nullable Type.
if (value.isNull()) return;
SpinMutexLock lock(state->mutex_);
- if (state->max_.isNull() || fast_comparator_->compareTypedValues(value, state->max_)) {
+ if (state->max_.isNull() ||
+ fast_comparator_->compareTypedValues(value, state->max_)) {
state->max_ = value;
}
}
+ inline void compareAndUpdateFast(TypedValue *max_ptr,
+ const TypedValue &value) const {
+ if (value.isNull()) return;
+ if (max_ptr->isNull() ||
+ fast_comparator_->compareTypedValues(value, *max_ptr)) {
+ *max_ptr = value;
+ }
+ }
+
const Type &type_;
std::unique_ptr<UncheckedComparator> fast_comparator_;
+ bool block_update_;
+
DISALLOW_COPY_AND_ASSIGN(AggregationHandleMax);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/AggregationHandleMin.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.cpp b/expressions/aggregation/AggregationHandleMin.cpp
index e860d8d..a07f299 100644
--- a/expressions/aggregation/AggregationHandleMin.cpp
+++ b/expressions/aggregation/AggregationHandleMin.cpp
@@ -39,22 +39,19 @@ namespace quickstep {
class StorageManager;
AggregationHandleMin::AggregationHandleMin(const Type &type)
- : type_(type) {
- fast_comparator_.reset(ComparisonFactory::GetComparison(ComparisonID::kLess)
- .makeUncheckedComparatorForTypes(type,
- type.getNonNullableVersion()));
+ : type_(type), block_update_(false) {
+ fast_comparator_.reset(
+ ComparisonFactory::GetComparison(ComparisonID::kLess)
+ .makeUncheckedComparatorForTypes(type, type.getNonNullableVersion()));
}
AggregationStateHashTableBase* AggregationHandleMin::createGroupByHashTable(
const HashTableImplType hash_table_impl,
- const std::vector<const Type*> &group_by_types,
+ const std::vector<const Type *> &group_by_types,
const std::size_t estimated_num_groups,
StorageManager *storage_manager) const {
return AggregationStateHashTableFactory<AggregationStateMin>::CreateResizable(
- hash_table_impl,
- group_by_types,
- estimated_num_groups,
- storage_manager);
+ hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
}
AggregationState* AggregationHandleMin::accumulateColumnVectors(
@@ -62,9 +59,8 @@ AggregationState* AggregationHandleMin::accumulateColumnVectors(
DCHECK_EQ(1u, column_vectors.size())
<< "Got wrong number of ColumnVectors for MIN: " << column_vectors.size();
- return new AggregationStateMin(
- fast_comparator_->accumulateColumnVector(type_.getNullableVersion().makeNullValue(),
- *column_vectors.front()));
+ return new AggregationStateMin(fast_comparator_->accumulateColumnVector(
+ type_.getNullableVersion().makeNullValue(), *column_vectors.front()));
}
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
@@ -74,10 +70,10 @@ AggregationState* AggregationHandleMin::accumulateValueAccessor(
DCHECK_EQ(1u, accessor_ids.size())
<< "Got wrong number of attributes for MIN: " << accessor_ids.size();
- return new AggregationStateMin(
- fast_comparator_->accumulateValueAccessor(type_.getNullableVersion().makeNullValue(),
- accessor,
- accessor_ids.front()));
+ return new AggregationStateMin(fast_comparator_->accumulateValueAccessor(
+ type_.getNullableVersion().makeNullValue(),
+ accessor,
+ accessor_ids.front()));
}
#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
@@ -88,66 +84,55 @@ void AggregationHandleMin::aggregateValueAccessorIntoHashTable(
AggregationStateHashTableBase *hash_table) const {
DCHECK_EQ(1u, argument_ids.size())
<< "Got wrong number of arguments for MIN: " << argument_ids.size();
-
- aggregateValueAccessorIntoHashTableUnaryHelper<
- AggregationHandleMin,
- AggregationStateMin,
- AggregationStateHashTable<AggregationStateMin>>(
- accessor,
- argument_ids.front(),
- group_by_key_ids,
- AggregationStateMin(type_),
- hash_table);
}
-void AggregationHandleMin::mergeStates(
- const AggregationState &source,
- AggregationState *destination) const {
- const AggregationStateMin &min_source = static_cast<const AggregationStateMin&>(source);
- AggregationStateMin *min_destination = static_cast<AggregationStateMin*>(destination);
+void AggregationHandleMin::mergeStates(const AggregationState &source,
+ AggregationState *destination) const {
+ const AggregationStateMin &min_source =
+ static_cast<const AggregationStateMin &>(source);
+ AggregationStateMin *min_destination =
+ static_cast<AggregationStateMin *>(destination);
if (!min_source.min_.isNull()) {
compareAndUpdate(min_destination, min_source.min_);
}
}
+void AggregationHandleMin::mergeStatesFast(const std::uint8_t *source,
+ std::uint8_t *destination) const {
+ const TypedValue *src_min_ptr = reinterpret_cast<const TypedValue *>(source);
+ TypedValue *dst_min_ptr = reinterpret_cast<TypedValue *>(destination);
+
+ if (!(src_min_ptr->isNull())) {
+ compareAndUpdateFast(dst_min_ptr, *src_min_ptr);
+ }
+}
+
ColumnVector* AggregationHandleMin::finalizeHashTable(
const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys) const {
- return finalizeHashTableHelper<AggregationHandleMin,
- AggregationStateHashTable<AggregationStateMin>>(
- type_.getNonNullableVersion(),
- hash_table,
- group_by_keys);
+ std::vector<std::vector<TypedValue>> *group_by_keys,
+ int index) const {
+ return finalizeHashTableHelperFast<AggregationHandleMin,
+ AggregationStateFastHashTable>(
+ type_.getNonNullableVersion(), hash_table, group_by_keys, index);
}
-AggregationState* AggregationHandleMin::aggregateOnDistinctifyHashTableForSingle(
+AggregationState*
+AggregationHandleMin::aggregateOnDistinctifyHashTableForSingle(
const AggregationStateHashTableBase &distinctify_hash_table) const {
- return aggregateOnDistinctifyHashTableForSingleUnaryHelper<
+ return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
AggregationHandleMin,
- AggregationStateMin>(
- distinctify_hash_table);
+ AggregationStateMin>(distinctify_hash_table);
}
void AggregationHandleMin::aggregateOnDistinctifyHashTableForGroupBy(
const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table) const {
- aggregateOnDistinctifyHashTableForGroupByUnaryHelper<
+ AggregationStateHashTableBase *aggregation_hash_table,
+ std::size_t index) const {
+ aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
AggregationHandleMin,
- AggregationStateMin,
- AggregationStateHashTable<AggregationStateMin>>(
- distinctify_hash_table,
- AggregationStateMin(type_),
- aggregation_hash_table);
-}
-
-void AggregationHandleMin::mergeGroupByHashTables(
- const AggregationStateHashTableBase &source_hash_table,
- AggregationStateHashTableBase *destination_hash_table) const {
- mergeGroupByHashTablesHelper<AggregationHandleMin,
- AggregationStateMin,
- AggregationStateHashTable<AggregationStateMin>>(
- source_hash_table, destination_hash_table);
+ AggregationStateFastHashTable>(
+ distinctify_hash_table, aggregation_hash_table, index);
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/AggregationHandleMin.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.hpp b/expressions/aggregation/AggregationHandleMin.hpp
index 924698c..173911d 100644
--- a/expressions/aggregation/AggregationHandleMin.hpp
+++ b/expressions/aggregation/AggregationHandleMin.hpp
@@ -28,6 +28,7 @@
#include "catalog/CatalogTypedefs.hpp"
#include "expressions/aggregation/AggregationConcreteHandle.hpp"
#include "expressions/aggregation/AggregationHandle.hpp"
+#include "storage/FastHashTable.hpp"
#include "storage/HashTableBase.hpp"
#include "threading/SpinMutex.hpp"
#include "types/Type.hpp"
@@ -55,24 +56,26 @@ class AggregationStateMin : public AggregationState {
/**
* @brief Copy constructor (ignores mutex).
*/
- AggregationStateMin(const AggregationStateMin &orig)
- : min_(orig.min_) {
- }
+ AggregationStateMin(const AggregationStateMin &orig) : min_(orig.min_) {}
/**
* @brief Destructor.
*/
~AggregationStateMin() override {}
+ std::size_t getPayloadSize() const { return sizeof(TypedValue); }
+
+ const std::uint8_t *getPayloadAddress() const {
+ return reinterpret_cast<const uint8_t *>(&min_);
+ }
+
private:
friend class AggregationHandleMin;
explicit AggregationStateMin(const Type &type)
: min_(type.getNullableVersion().makeNullValue()) {}
- explicit AggregationStateMin(TypedValue &&value)
- : min_(std::move(value)) {
- }
+ explicit AggregationStateMin(TypedValue &&value) : min_(std::move(value)) {}
TypedValue min_;
SpinMutex mutex_;
@@ -83,8 +86,7 @@ class AggregationStateMin : public AggregationState {
**/
class AggregationHandleMin : public AggregationConcreteHandle {
public:
- ~AggregationHandleMin() override {
- }
+ ~AggregationHandleMin() override {}
AggregationState* createInitialState() const override {
return new AggregationStateMin(type_);
@@ -92,20 +94,46 @@ class AggregationHandleMin : public AggregationConcreteHandle {
AggregationStateHashTableBase* createGroupByHashTable(
const HashTableImplType hash_table_impl,
- const std::vector<const Type*> &group_by_types,
+ const std::vector<const Type *> &group_by_types,
const std::size_t estimated_num_groups,
StorageManager *storage_manager) const override;
/**
* @brief Iterate with min aggregation state.
*/
- inline void iterateUnaryInl(AggregationStateMin *state, const TypedValue &value) const {
+ inline void iterateUnaryInl(AggregationStateMin *state,
+ const TypedValue &value) const {
DCHECK(value.isPlausibleInstanceOf(type_.getSignature()));
compareAndUpdate(state, value);
}
+ inline void iterateUnaryInlFast(const TypedValue &value,
+ std::uint8_t *byte_ptr) const {
+ DCHECK(value.isPlausibleInstanceOf(type_.getSignature()));
+ TypedValue *min_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
+ compareAndUpdateFast(min_ptr, value);
+ }
+
+ inline void updateStateUnary(const TypedValue &argument,
+ std::uint8_t *byte_ptr) const override {
+ if (!block_update_) {
+ iterateUnaryInlFast(argument, byte_ptr);
+ }
+ }
+
+ void blockUpdate() override { block_update_ = true; }
+
+ void allowUpdate() override { block_update_ = false; }
+
+ void initPayload(std::uint8_t *byte_ptr) const override {
+ TypedValue *min_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
+ TypedValue t1 = (type_.getNullableVersion().makeNullValue());
+ *min_ptr = t1;
+ }
+
AggregationState* accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const override;
+ const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
+ const override;
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
AggregationState* accumulateValueAccessor(
@@ -122,36 +150,49 @@ class AggregationHandleMin : public AggregationConcreteHandle {
void mergeStates(const AggregationState &source,
AggregationState *destination) const override;
+ void mergeStatesFast(const std::uint8_t *source,
+ std::uint8_t *destination) const override;
+
TypedValue finalize(const AggregationState &state) const override {
- return static_cast<const AggregationStateMin&>(state).min_;
+ return static_cast<const AggregationStateMin &>(state).min_;
+ }
+
+ inline TypedValue finalizeHashTableEntry(
+ const AggregationState &state) const {
+ return static_cast<const AggregationStateMin &>(state).min_;
}
- inline TypedValue finalizeHashTableEntry(const AggregationState &state) const {
- return static_cast<const AggregationStateMin&>(state).min_;
+ inline TypedValue finalizeHashTableEntryFast(
+ const std::uint8_t *byte_ptr) const {
+ const TypedValue *min_ptr = reinterpret_cast<const TypedValue *>(byte_ptr);
+ return TypedValue(*min_ptr);
}
ColumnVector* finalizeHashTable(
const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys) const override;
+ std::vector<std::vector<TypedValue>> *group_by_keys,
+ int index) const override;
/**
- * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
+ * @brief Implementation of
+ * AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
* for MIN aggregation.
*/
AggregationState* aggregateOnDistinctifyHashTableForSingle(
- const AggregationStateHashTableBase &distinctify_hash_table) const override;
+ const AggregationStateHashTableBase &distinctify_hash_table)
+ const override;
/**
- * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
+ * @brief Implementation of
+ * AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
* for MIN aggregation.
*/
void aggregateOnDistinctifyHashTableForGroupBy(
const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table) const override;
+ AggregationStateHashTableBase *aggregation_hash_table,
+ std::size_t index) const override;
- void mergeGroupByHashTables(
- const AggregationStateHashTableBase &source_hash_table,
- AggregationStateHashTableBase *destination_hash_table) const override;
+ std::size_t getPayloadSize() const override { return sizeof(TypedValue); }
private:
friend class AggregateFunctionMin;
@@ -164,23 +205,36 @@ class AggregationHandleMin : public AggregationConcreteHandle {
explicit AggregationHandleMin(const Type &type);
/**
- * @brief compare the value with min_ and update it if the value is smaller than
- * current minimum. NULLs are ignored.
+ * @brief compare the value with min_ and update it if the value is smaller
+ * than current minimum. NULLs are ignored.
*
* @param value A TypedValue to compare.
**/
- inline void compareAndUpdate(AggregationStateMin *state, const TypedValue &value) const {
+ inline void compareAndUpdate(AggregationStateMin *state,
+ const TypedValue &value) const {
if (value.isNull()) return;
SpinMutexLock lock(state->mutex_);
- if (state->min_.isNull() || fast_comparator_->compareTypedValues(value, state->min_)) {
+ if (state->min_.isNull() ||
+ fast_comparator_->compareTypedValues(value, state->min_)) {
state->min_ = value;
}
}
+ inline void compareAndUpdateFast(TypedValue *min_ptr,
+ const TypedValue &value) const {
+ if (value.isNull()) return;
+ if (min_ptr->isNull() ||
+ fast_comparator_->compareTypedValues(value, *min_ptr)) {
+ *min_ptr = value;
+ }
+ }
+
const Type &type_;
std::unique_ptr<UncheckedComparator> fast_comparator_;
+ bool block_update_;
+
DISALLOW_COPY_AND_ASSIGN(AggregationHandleMin);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/AggregationHandleSum.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.cpp b/expressions/aggregation/AggregationHandleSum.cpp
index b5036a8..642d88d 100644
--- a/expressions/aggregation/AggregationHandleSum.cpp
+++ b/expressions/aggregation/AggregationHandleSum.cpp
@@ -43,7 +43,7 @@ namespace quickstep {
class StorageManager;
AggregationHandleSum::AggregationHandleSum(const Type &type)
- : argument_type_(type) {
+ : argument_type_(type), block_update_(false) {
// We sum Int as Long and Float as Double so that we have more headroom when
// adding many values.
TypeID type_precision_id;
@@ -66,11 +66,13 @@ AggregationHandleSum::AggregationHandleSum(const Type &type)
// Make operators to do arithmetic:
// Add operator for summing argument values.
- fast_operator_.reset(BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
- .makeUncheckedBinaryOperatorForTypes(sum_type, argument_type_));
+ fast_operator_.reset(
+ BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
+ .makeUncheckedBinaryOperatorForTypes(sum_type, argument_type_));
// Add operator for merging states.
- merge_operator_.reset(BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
- .makeUncheckedBinaryOperatorForTypes(sum_type, sum_type));
+ merge_operator_.reset(
+ BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
+ .makeUncheckedBinaryOperatorForTypes(sum_type, sum_type));
// Result is nullable, because SUM() over 0 values (or all NULL values) is
// NULL.
@@ -79,26 +81,20 @@ AggregationHandleSum::AggregationHandleSum(const Type &type)
AggregationStateHashTableBase* AggregationHandleSum::createGroupByHashTable(
const HashTableImplType hash_table_impl,
- const std::vector<const Type*> &group_by_types,
+ const std::vector<const Type *> &group_by_types,
const std::size_t estimated_num_groups,
StorageManager *storage_manager) const {
return AggregationStateHashTableFactory<AggregationStateSum>::CreateResizable(
- hash_table_impl,
- group_by_types,
- estimated_num_groups,
- storage_manager);
+ hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
}
AggregationState* AggregationHandleSum::accumulateColumnVectors(
const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
DCHECK_EQ(1u, column_vectors.size())
<< "Got wrong number of ColumnVectors for SUM: " << column_vectors.size();
-
std::size_t num_tuples = 0;
TypedValue cv_sum = fast_operator_->accumulateColumnVector(
- blank_state_.sum_,
- *column_vectors.front(),
- &num_tuples);
+ blank_state_.sum_, *column_vectors.front(), &num_tuples);
return new AggregationStateSum(std::move(cv_sum), num_tuples == 0);
}
@@ -111,10 +107,7 @@ AggregationState* AggregationHandleSum::accumulateValueAccessor(
std::size_t num_tuples = 0;
TypedValue va_sum = fast_operator_->accumulateValueAccessor(
- blank_state_.sum_,
- accessor,
- accessor_ids.front(),
- &num_tuples);
+ blank_state_.sum_, accessor, accessor_ids.front(), &num_tuples);
return new AggregationStateSum(std::move(va_sum), num_tuples == 0);
}
#endif
@@ -126,32 +119,39 @@ void AggregationHandleSum::aggregateValueAccessorIntoHashTable(
AggregationStateHashTableBase *hash_table) const {
DCHECK_EQ(1u, argument_ids.size())
<< "Got wrong number of arguments for SUM: " << argument_ids.size();
-
- aggregateValueAccessorIntoHashTableUnaryHelper<
- AggregationHandleSum,
- AggregationStateSum,
- AggregationStateHashTable<AggregationStateSum>>(
- accessor,
- argument_ids.front(),
- group_by_key_ids,
- blank_state_,
- hash_table);
}
-void AggregationHandleSum::mergeStates(
- const AggregationState &source,
- AggregationState *destination) const {
- const AggregationStateSum &sum_source = static_cast<const AggregationStateSum&>(source);
- AggregationStateSum *sum_destination = static_cast<AggregationStateSum*>(destination);
+void AggregationHandleSum::mergeStates(const AggregationState &source,
+ AggregationState *destination) const {
+ const AggregationStateSum &sum_source =
+ static_cast<const AggregationStateSum &>(source);
+ AggregationStateSum *sum_destination =
+ static_cast<AggregationStateSum *>(destination);
SpinMutexLock lock(sum_destination->mutex_);
- sum_destination->sum_ = merge_operator_->applyToTypedValues(sum_destination->sum_,
- sum_source.sum_);
+ sum_destination->sum_ = merge_operator_->applyToTypedValues(
+ sum_destination->sum_, sum_source.sum_);
sum_destination->null_ = sum_destination->null_ && sum_source.null_;
}
+void AggregationHandleSum::mergeStatesFast(const std::uint8_t *source,
+ std::uint8_t *destination) const {
+ const TypedValue *src_sum_ptr =
+ reinterpret_cast<const TypedValue *>(source + blank_state_.sum_offset_);
+ const bool *src_null_ptr =
+ reinterpret_cast<const bool *>(source + blank_state_.null_offset_);
+ TypedValue *dst_sum_ptr =
+ reinterpret_cast<TypedValue *>(destination + blank_state_.sum_offset_);
+ bool *dst_null_ptr =
+ reinterpret_cast<bool *>(destination + blank_state_.null_offset_);
+ *dst_sum_ptr =
+ merge_operator_->applyToTypedValues(*dst_sum_ptr, *src_sum_ptr);
+ *dst_null_ptr = (*dst_null_ptr) && (*src_null_ptr);
+}
+
TypedValue AggregationHandleSum::finalize(const AggregationState &state) const {
- const AggregationStateSum &agg_state = static_cast<const AggregationStateSum&>(state);
+ const AggregationStateSum &agg_state =
+ static_cast<const AggregationStateSum &>(state);
if (agg_state.null_) {
// SUM() over no values is NULL.
return result_type_->makeNullValue();
@@ -162,41 +162,29 @@ TypedValue AggregationHandleSum::finalize(const AggregationState &state) const {
ColumnVector* AggregationHandleSum::finalizeHashTable(
const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys) const {
- return finalizeHashTableHelper<AggregationHandleSum,
- AggregationStateHashTable<AggregationStateSum>>(
- *result_type_,
- hash_table,
- group_by_keys);
+ std::vector<std::vector<TypedValue>> *group_by_keys,
+ int index) const {
+ return finalizeHashTableHelperFast<AggregationHandleSum,
+ AggregationStateFastHashTable>(
+ *result_type_, hash_table, group_by_keys, index);
}
-AggregationState* AggregationHandleSum::aggregateOnDistinctifyHashTableForSingle(
+AggregationState*
+AggregationHandleSum::aggregateOnDistinctifyHashTableForSingle(
const AggregationStateHashTableBase &distinctify_hash_table) const {
- return aggregateOnDistinctifyHashTableForSingleUnaryHelper<
+ return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
AggregationHandleSum,
- AggregationStateSum>(
- distinctify_hash_table);
+ AggregationStateSum>(distinctify_hash_table);
}
void AggregationHandleSum::aggregateOnDistinctifyHashTableForGroupBy(
const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table) const {
- aggregateOnDistinctifyHashTableForGroupByUnaryHelper<
+ AggregationStateHashTableBase *aggregation_hash_table,
+ std::size_t index) const {
+ aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
AggregationHandleSum,
- AggregationStateSum,
- AggregationStateHashTable<AggregationStateSum>>(
- distinctify_hash_table,
- blank_state_,
- aggregation_hash_table);
-}
-
-void AggregationHandleSum::mergeGroupByHashTables(
- const AggregationStateHashTableBase &source_hash_table,
- AggregationStateHashTableBase *destination_hash_table) const {
- mergeGroupByHashTablesHelper<AggregationHandleSum,
- AggregationStateSum,
- AggregationStateHashTable<AggregationStateSum>>(
- source_hash_table, destination_hash_table);
+ AggregationStateFastHashTable>(
+ distinctify_hash_table, aggregation_hash_table, index);
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/AggregationHandleSum.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.hpp b/expressions/aggregation/AggregationHandleSum.hpp
index 3382646..6c334a6 100644
--- a/expressions/aggregation/AggregationHandleSum.hpp
+++ b/expressions/aggregation/AggregationHandleSum.hpp
@@ -28,6 +28,7 @@
#include "catalog/CatalogTypedefs.hpp"
#include "expressions/aggregation/AggregationConcreteHandle.hpp"
#include "expressions/aggregation/AggregationHandle.hpp"
+#include "storage/FastHashTable.hpp"
#include "storage/HashTableBase.hpp"
#include "threading/SpinMutex.hpp"
#include "types/Type.hpp"
@@ -57,25 +58,40 @@ class AggregationStateSum : public AggregationState {
*/
AggregationStateSum(const AggregationStateSum &orig)
: sum_(orig.sum_),
- null_(orig.null_) {
+ null_(orig.null_),
+ sum_offset_(orig.sum_offset_),
+ null_offset_(orig.null_offset_) {}
+
+ std::size_t getPayloadSize() const {
+ std::size_t p1 = reinterpret_cast<std::size_t>(&sum_);
+ std::size_t p2 = reinterpret_cast<std::size_t>(&mutex_);
+ return (p2 - p1);
+ }
+
+ const std::uint8_t* getPayloadAddress() const {
+ return reinterpret_cast<const uint8_t *>(&sum_);
}
private:
friend class AggregationHandleSum;
AggregationStateSum()
- : sum_(0), null_(true) {
- }
+ : sum_(0),
+ null_(true),
+ sum_offset_(0),
+ null_offset_(reinterpret_cast<std::uint8_t *>(&null_) -
+ reinterpret_cast<std::uint8_t *>(&sum_)) {}
AggregationStateSum(TypedValue &&sum, const bool is_null)
- : sum_(std::move(sum)), null_(is_null) {
- }
+ : sum_(std::move(sum)), null_(is_null) {}
// TODO(shoban): We might want to specialize sum_ to use atomics for int types
// similar to in AggregationStateCount.
TypedValue sum_;
bool null_;
SpinMutex mutex_;
+
+ int sum_offset_, null_offset_;
};
/**
@@ -83,8 +99,7 @@ class AggregationStateSum : public AggregationState {
**/
class AggregationHandleSum : public AggregationConcreteHandle {
public:
- ~AggregationHandleSum() override {
- }
+ ~AggregationHandleSum() override {}
AggregationState* createInitialState() const override {
return new AggregationStateSum(blank_state_);
@@ -92,11 +107,12 @@ class AggregationHandleSum : public AggregationConcreteHandle {
AggregationStateHashTableBase* createGroupByHashTable(
const HashTableImplType hash_table_impl,
- const std::vector<const Type*> &group_by_types,
+ const std::vector<const Type *> &group_by_types,
const std::size_t estimated_num_groups,
StorageManager *storage_manager) const override;
- inline void iterateUnaryInl(AggregationStateSum *state, const TypedValue &value) const {
+ inline void iterateUnaryInl(AggregationStateSum *state,
+ const TypedValue &value) const {
DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
if (value.isNull()) return;
@@ -105,8 +121,41 @@ class AggregationHandleSum : public AggregationConcreteHandle {
state->null_ = false;
}
+ inline void iterateUnaryInlFast(const TypedValue &value,
+ std::uint8_t *byte_ptr) const {
+ DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
+ if (value.isNull()) return;
+ TypedValue *sum_ptr =
+ reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset_);
+ bool *null_ptr =
+ reinterpret_cast<bool *>(byte_ptr + blank_state_.null_offset_);
+ *sum_ptr = fast_operator_->applyToTypedValues(*sum_ptr, value);
+ *null_ptr = false;
+ }
+
+ inline void updateStateUnary(const TypedValue &argument,
+ std::uint8_t *byte_ptr) const override {
+ if (!block_update_) {
+ iterateUnaryInlFast(argument, byte_ptr);
+ }
+ }
+
+ void blockUpdate() override { block_update_ = true; }
+
+ void allowUpdate() override { block_update_ = false; }
+
+ void initPayload(std::uint8_t *byte_ptr) const override {
+ TypedValue *sum_ptr =
+ reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset_);
+ bool *null_ptr =
+ reinterpret_cast<bool *>(byte_ptr + blank_state_.null_offset_);
+ *sum_ptr = blank_state_.sum_;
+ *null_ptr = true;
+ }
+
AggregationState* accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const override;
+ const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
+ const override;
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
AggregationState* accumulateValueAccessor(
@@ -123,34 +172,51 @@ class AggregationHandleSum : public AggregationConcreteHandle {
void mergeStates(const AggregationState &source,
AggregationState *destination) const override;
+ void mergeStatesFast(const std::uint8_t *source,
+ std::uint8_t *destination) const override;
+
TypedValue finalize(const AggregationState &state) const override;
- inline TypedValue finalizeHashTableEntry(const AggregationState &state) const {
- return static_cast<const AggregationStateSum&>(state).sum_;
+ inline TypedValue finalizeHashTableEntry(
+ const AggregationState &state) const {
+ return static_cast<const AggregationStateSum &>(state).sum_;
+ }
+
+ inline TypedValue finalizeHashTableEntryFast(
+ const std::uint8_t *byte_ptr) const {
+ std::uint8_t *value_ptr = const_cast<std::uint8_t *>(byte_ptr);
+ TypedValue *sum_ptr =
+ reinterpret_cast<TypedValue *>(value_ptr + blank_state_.sum_offset_);
+ return *sum_ptr;
}
ColumnVector* finalizeHashTable(
const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys) const override;
+ std::vector<std::vector<TypedValue>> *group_by_keys,
+ int index) const override;
/**
- * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
+ * @brief Implementation of
+ * AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
* for SUM aggregation.
*/
AggregationState* aggregateOnDistinctifyHashTableForSingle(
- const AggregationStateHashTableBase &distinctify_hash_table) const override;
+ const AggregationStateHashTableBase &distinctify_hash_table)
+ const override;
/**
- * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
+ * @brief Implementation of
+ * AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
* for SUM aggregation.
*/
void aggregateOnDistinctifyHashTableForGroupBy(
const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table) const override;
+ AggregationStateHashTableBase *aggregation_hash_table,
+ std::size_t index) const override;
- void mergeGroupByHashTables(
- const AggregationStateHashTableBase &source_hash_table,
- AggregationStateHashTableBase *destination_hash_table) const override;
+ std::size_t getPayloadSize() const override {
+ return blank_state_.getPayloadSize();
+ }
private:
friend class AggregateFunctionSum;
@@ -168,6 +234,8 @@ class AggregationHandleSum : public AggregationConcreteHandle {
std::unique_ptr<UncheckedBinaryOperator> fast_operator_;
std::unique_ptr<UncheckedBinaryOperator> merge_operator_;
+ bool block_update_;
+
DISALLOW_COPY_AND_ASSIGN(AggregationHandleSum);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/aggregation/CMakeLists.txt b/expressions/aggregation/CMakeLists.txt
index 888d95c..e9503f7 100644
--- a/expressions/aggregation/CMakeLists.txt
+++ b/expressions/aggregation/CMakeLists.txt
@@ -146,9 +146,11 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationConcreteHandl
glog
quickstep_catalog_CatalogTypedefs
quickstep_expressions_aggregation_AggregationHandle
+ quickstep_storage_FastHashTable
quickstep_storage_HashTable
quickstep_storage_HashTableBase
quickstep_storage_HashTableFactory
+ quickstep_threading_SpinMutex
quickstep_types_TypedValue
quickstep_types_containers_ColumnVector
quickstep_utility_Macros)
@@ -163,6 +165,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleAvg
quickstep_catalog_CatalogTypedefs
quickstep_expressions_aggregation_AggregationConcreteHandle
quickstep_expressions_aggregation_AggregationHandle
+ quickstep_storage_FastHashTable
quickstep_storage_HashTable
quickstep_storage_HashTableBase
quickstep_storage_HashTableFactory
@@ -180,6 +183,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleCount
quickstep_catalog_CatalogTypedefs
quickstep_expressions_aggregation_AggregationConcreteHandle
quickstep_expressions_aggregation_AggregationHandle
+ quickstep_storage_FastHashTable
quickstep_storage_HashTable
quickstep_storage_HashTableBase
quickstep_storage_HashTableFactory
@@ -204,6 +208,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMax
quickstep_catalog_CatalogTypedefs
quickstep_expressions_aggregation_AggregationConcreteHandle
quickstep_expressions_aggregation_AggregationHandle
+ quickstep_storage_FastHashTable
quickstep_storage_HashTable
quickstep_storage_HashTableBase
quickstep_storage_HashTableFactory
@@ -220,6 +225,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMin
quickstep_catalog_CatalogTypedefs
quickstep_expressions_aggregation_AggregationConcreteHandle
quickstep_expressions_aggregation_AggregationHandle
+ quickstep_storage_FastHashTable
quickstep_storage_HashTable
quickstep_storage_HashTableBase
quickstep_storage_HashTableFactory
@@ -236,6 +242,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleSum
quickstep_catalog_CatalogTypedefs
quickstep_expressions_aggregation_AggregationConcreteHandle
quickstep_expressions_aggregation_AggregationHandle
+ quickstep_storage_FastHashTable
quickstep_storage_HashTable
quickstep_storage_HashTableBase
quickstep_storage_HashTableFactory
@@ -292,6 +299,7 @@ target_link_libraries(AggregationHandle_tests
quickstep_expressions_aggregation_AggregationHandleMin
quickstep_expressions_aggregation_AggregationHandleSum
quickstep_expressions_aggregation_AggregationID
+ quickstep_storage_AggregationOperationState
quickstep_storage_HashTableBase
quickstep_storage_StorageManager
quickstep_types_CharType
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp b/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp
index afc02ec..79d4448 100644
--- a/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp
+++ b/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp
@@ -28,6 +28,8 @@
#include "expressions/aggregation/AggregationHandle.hpp"
#include "expressions/aggregation/AggregationHandleAvg.hpp"
#include "expressions/aggregation/AggregationID.hpp"
+#include "storage/AggregationOperationState.hpp"
+#include "storage/FastHashTableFactory.hpp"
#include "storage/StorageManager.hpp"
#include "types/CharType.hpp"
#include "types/DateOperatorOverloads.hpp"
@@ -53,51 +55,56 @@
namespace quickstep {
-class AggregationHandleAvgTest : public::testing::Test {
+class AggregationHandleAvgTest : public ::testing::Test {
protected:
static const int kNumSamples = 100;
// Helper method that calls AggregationHandleAvg::iterateUnaryInl() to
// aggregate 'value' into '*state'.
void iterateHandle(AggregationState *state, const TypedValue &value) {
- static_cast<const AggregationHandleAvg&>(*aggregation_handle_avg_).iterateUnaryInl(
- static_cast<AggregationStateAvg*>(state),
- value);
+ static_cast<const AggregationHandleAvg &>(*aggregation_handle_avg_)
+ .iterateUnaryInl(static_cast<AggregationStateAvg *>(state), value);
}
void initializeHandle(const Type &type) {
aggregation_handle_avg_.reset(
- AggregateFunctionFactory::Get(AggregationID::kAvg).createHandle(
- std::vector<const Type*>(1, &type)));
+ AggregateFunctionFactory::Get(AggregationID::kAvg)
+ .createHandle(std::vector<const Type *>(1, &type)));
aggregation_handle_avg_state_.reset(
aggregation_handle_avg_->createInitialState());
}
static bool ApplyToTypesTest(TypeID typeID) {
- const Type &type = (typeID == kChar || typeID == kVarChar) ?
- TypeFactory::GetType(typeID, static_cast<std::size_t>(10)) :
- TypeFactory::GetType(typeID);
+ const Type &type =
+ (typeID == kChar || typeID == kVarChar)
+ ? TypeFactory::GetType(typeID, static_cast<std::size_t>(10))
+ : TypeFactory::GetType(typeID);
- return AggregateFunctionFactory::Get(AggregationID::kAvg).canApplyToTypes(
- std::vector<const Type*>(1, &type));
+ return AggregateFunctionFactory::Get(AggregationID::kAvg)
+ .canApplyToTypes(std::vector<const Type *>(1, &type));
}
static bool ResultTypeForArgumentTypeTest(TypeID input_type_id,
TypeID output_type_id) {
- const Type *result_type
- = AggregateFunctionFactory::Get(AggregationID::kAvg).resultTypeForArgumentTypes(
- std::vector<const Type*>(1, &TypeFactory::GetType(input_type_id)));
+ const Type *result_type =
+ AggregateFunctionFactory::Get(AggregationID::kAvg)
+ .resultTypeForArgumentTypes(std::vector<const Type *>(
+ 1, &TypeFactory::GetType(input_type_id)));
return (result_type->getTypeID() == output_type_id);
}
template <typename CppType>
- static void CheckAvgValue(
- CppType expected,
- const AggregationHandle &handle,
- const AggregationState &state) {
+ static void CheckAvgValue(CppType expected,
+ const AggregationHandle &handle,
+ const AggregationState &state) {
EXPECT_EQ(expected, handle.finalize(state).getLiteral<CppType>());
}
+ template <typename CppType>
+ static void CheckAvgValue(CppType expected, const TypedValue &value) {
+ EXPECT_EQ(expected, value.getLiteral<CppType>());
+ }
+
// Static templated method for set a meaningful value to data types.
template <typename CppType>
static void SetDataType(int value, CppType *data) {
@@ -108,7 +115,9 @@ class AggregationHandleAvgTest : public::testing::Test {
void checkAggregationAvgGeneric() {
const GenericType &type = GenericType::Instance(true);
initializeHandle(type);
- EXPECT_TRUE(aggregation_handle_avg_->finalize(*aggregation_handle_avg_state_).isNull());
+ EXPECT_TRUE(
+ aggregation_handle_avg_->finalize(*aggregation_handle_avg_state_)
+ .isNull());
typename GenericType::cpptype val;
typename GenericType::cpptype sum;
@@ -119,15 +128,16 @@ class AggregationHandleAvgTest : public::testing::Test {
if (type.getTypeID() == kInt || type.getTypeID() == kLong) {
SetDataType(i - 10, &val);
} else {
- SetDataType(static_cast<float>(i - 10)/10, &val);
+ SetDataType(static_cast<float>(i - 10) / 10, &val);
}
iterateHandle(aggregation_handle_avg_state_.get(), type.makeValue(&val));
sum += val;
}
iterateHandle(aggregation_handle_avg_state_.get(), type.makeNullValue());
- CheckAvgValue<typename OutputType::cpptype>(static_cast<typename OutputType::cpptype>(sum) / kNumSamples,
- *aggregation_handle_avg_,
- *aggregation_handle_avg_state_);
+ CheckAvgValue<typename OutputType::cpptype>(
+ static_cast<typename OutputType::cpptype>(sum) / kNumSamples,
+ *aggregation_handle_avg_,
+ *aggregation_handle_avg_state_);
// Test mergeStates().
std::unique_ptr<AggregationState> merge_state(
@@ -140,7 +150,7 @@ class AggregationHandleAvgTest : public::testing::Test {
if (type.getTypeID() == kInt || type.getTypeID() == kLong) {
SetDataType(i - 10, &val);
} else {
- SetDataType(static_cast<float>(i - 10)/10, &val);
+ SetDataType(static_cast<float>(i - 10) / 10, &val);
}
iterateHandle(merge_state.get(), type.makeValue(&val));
sum += val;
@@ -155,7 +165,8 @@ class AggregationHandleAvgTest : public::testing::Test {
}
template <typename GenericType>
- ColumnVector *createColumnVectorGeneric(const Type &type, typename GenericType::cpptype *sum) {
+ ColumnVector* createColumnVectorGeneric(const Type &type,
+ typename GenericType::cpptype *sum) {
NativeColumnVector *column = new NativeColumnVector(type, kNumSamples + 3);
typename GenericType::cpptype val;
@@ -166,12 +177,12 @@ class AggregationHandleAvgTest : public::testing::Test {
if (type.getTypeID() == kInt || type.getTypeID() == kLong) {
SetDataType(i - 10, &val);
} else {
- SetDataType(static_cast<float>(i - 10)/10, &val);
+ SetDataType(static_cast<float>(i - 10) / 10, &val);
}
column->appendTypedValue(type.makeValue(&val));
*sum += val;
// One NULL in the middle.
- if (i == kNumSamples/2) {
+ if (i == kNumSamples / 2) {
column->appendTypedValue(type.makeNullValue());
}
}
@@ -184,12 +195,15 @@ class AggregationHandleAvgTest : public::testing::Test {
void checkAggregationAvgGenericColumnVector() {
const GenericType &type = GenericType::Instance(true);
initializeHandle(type);
- EXPECT_TRUE(aggregation_handle_avg_->finalize(*aggregation_handle_avg_state_).isNull());
+ EXPECT_TRUE(
+ aggregation_handle_avg_->finalize(*aggregation_handle_avg_state_)
+ .isNull());
typename GenericType::cpptype sum;
SetDataType(0, &sum);
std::vector<std::unique_ptr<ColumnVector>> column_vectors;
- column_vectors.emplace_back(createColumnVectorGeneric<GenericType>(type, &sum));
+ column_vectors.emplace_back(
+ createColumnVectorGeneric<GenericType>(type, &sum));
std::unique_ptr<AggregationState> cv_state(
aggregation_handle_avg_->accumulateColumnVectors(column_vectors));
@@ -201,7 +215,8 @@ class AggregationHandleAvgTest : public::testing::Test {
*aggregation_handle_avg_,
*cv_state);
- aggregation_handle_avg_->mergeStates(*cv_state, aggregation_handle_avg_state_.get());
+ aggregation_handle_avg_->mergeStates(*cv_state,
+ aggregation_handle_avg_state_.get());
CheckAvgValue<typename OutputType::cpptype>(
static_cast<typename OutputType::cpptype>(sum) / kNumSamples,
*aggregation_handle_avg_,
@@ -213,16 +228,19 @@ class AggregationHandleAvgTest : public::testing::Test {
void checkAggregationAvgGenericValueAccessor() {
const GenericType &type = GenericType::Instance(true);
initializeHandle(type);
- EXPECT_TRUE(aggregation_handle_avg_->finalize(*aggregation_handle_avg_state_).isNull());
+ EXPECT_TRUE(
+ aggregation_handle_avg_->finalize(*aggregation_handle_avg_state_)
+ .isNull());
typename GenericType::cpptype sum;
SetDataType(0, &sum);
- std::unique_ptr<ColumnVectorsValueAccessor> accessor(new ColumnVectorsValueAccessor());
+ std::unique_ptr<ColumnVectorsValueAccessor> accessor(
+ new ColumnVectorsValueAccessor());
accessor->addColumn(createColumnVectorGeneric<GenericType>(type, &sum));
std::unique_ptr<AggregationState> va_state(
- aggregation_handle_avg_->accumulateValueAccessor(accessor.get(),
- std::vector<attribute_id>(1, 0)));
+ aggregation_handle_avg_->accumulateValueAccessor(
+ accessor.get(), std::vector<attribute_id>(1, 0)));
// Test the state generated directly by accumulateValueAccessor(), and also
// test after merging back.
@@ -231,7 +249,8 @@ class AggregationHandleAvgTest : public::testing::Test {
*aggregation_handle_avg_,
*va_state);
- aggregation_handle_avg_->mergeStates(*va_state, aggregation_handle_avg_state_.get());
+ aggregation_handle_avg_->mergeStates(*va_state,
+ aggregation_handle_avg_state_.get());
CheckAvgValue<typename OutputType::cpptype>(
static_cast<typename OutputType::cpptype>(sum) / kNumSamples,
*aggregation_handle_avg_,
@@ -255,12 +274,14 @@ void AggregationHandleAvgTest::CheckAvgValue<double>(
}
template <>
-void AggregationHandleAvgTest::SetDataType<DatetimeIntervalLit>(int value, DatetimeIntervalLit *data) {
+void AggregationHandleAvgTest::SetDataType<DatetimeIntervalLit>(
+ int value, DatetimeIntervalLit *data) {
data->interval_ticks = value;
}
template <>
-void AggregationHandleAvgTest::SetDataType<YearMonthIntervalLit>(int value, YearMonthIntervalLit *data) {
+void AggregationHandleAvgTest::SetDataType<YearMonthIntervalLit>(
+ int value, YearMonthIntervalLit *data) {
data->months = value;
}
@@ -307,11 +328,13 @@ TEST_F(AggregationHandleAvgTest, DoubleTypeColumnVectorTest) {
}
TEST_F(AggregationHandleAvgTest, DatetimeIntervalTypeColumnVectorTest) {
- checkAggregationAvgGenericColumnVector<DatetimeIntervalType, DatetimeIntervalType>();
+ checkAggregationAvgGenericColumnVector<DatetimeIntervalType,
+ DatetimeIntervalType>();
}
TEST_F(AggregationHandleAvgTest, YearMonthIntervalTypeColumnVectorTest) {
- checkAggregationAvgGenericColumnVector<YearMonthIntervalType, YearMonthIntervalType>();
+ checkAggregationAvgGenericColumnVector<YearMonthIntervalType,
+ YearMonthIntervalType>();
}
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
@@ -332,11 +355,13 @@ TEST_F(AggregationHandleAvgTest, DoubleTypeValueAccessorTest) {
}
TEST_F(AggregationHandleAvgTest, DatetimeIntervalTypeValueAccessorTest) {
- checkAggregationAvgGenericValueAccessor<DatetimeIntervalType, DatetimeIntervalType>();
+ checkAggregationAvgGenericValueAccessor<DatetimeIntervalType,
+ DatetimeIntervalType>();
}
TEST_F(AggregationHandleAvgTest, YearMonthIntervalTypeValueAccessorTest) {
- checkAggregationAvgGenericValueAccessor<YearMonthIntervalType, YearMonthIntervalType>();
+ checkAggregationAvgGenericValueAccessor<YearMonthIntervalType,
+ YearMonthIntervalType>();
}
#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
@@ -365,38 +390,53 @@ TEST_F(AggregationHandleAvgDeathTest, WrongTypeTest) {
double double_val = 0;
float float_val = 0;
- iterateHandle(aggregation_handle_avg_state_.get(), int_non_null_type.makeValue(&int_val));
+ iterateHandle(aggregation_handle_avg_state_.get(),
+ int_non_null_type.makeValue(&int_val));
- EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), long_type.makeValue(&long_val)), "");
- EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), double_type.makeValue(&double_val)), "");
- EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), float_type.makeValue(&float_val)), "");
- EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), char_type.makeValue("asdf", 5)), "");
- EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), varchar_type.makeValue("asdf", 5)), "");
+ EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(),
+ long_type.makeValue(&long_val)),
+ "");
+ EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(),
+ double_type.makeValue(&double_val)),
+ "");
+ EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(),
+ float_type.makeValue(&float_val)),
+ "");
+ EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(),
+ char_type.makeValue("asdf", 5)),
+ "");
+ EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(),
+ varchar_type.makeValue("asdf", 5)),
+ "");
// Test mergeStates() with incorrectly typed handles.
std::unique_ptr<AggregationHandle> aggregation_handle_avg_double(
- AggregateFunctionFactory::Get(AggregationID::kAvg).createHandle(
- std::vector<const Type*>(1, &double_type)));
+ AggregateFunctionFactory::Get(AggregationID::kAvg)
+ .createHandle(std::vector<const Type *>(1, &double_type)));
std::unique_ptr<AggregationState> aggregation_state_avg_merge_double(
aggregation_handle_avg_double->createInitialState());
- static_cast<const AggregationHandleAvg&>(*aggregation_handle_avg_double).iterateUnaryInl(
- static_cast<AggregationStateAvg*>(aggregation_state_avg_merge_double.get()),
- double_type.makeValue(&double_val));
- EXPECT_DEATH(aggregation_handle_avg_->mergeStates(*aggregation_state_avg_merge_double,
- aggregation_handle_avg_state_.get()),
- "");
+ static_cast<const AggregationHandleAvg &>(*aggregation_handle_avg_double)
+ .iterateUnaryInl(static_cast<AggregationStateAvg *>(
+ aggregation_state_avg_merge_double.get()),
+ double_type.makeValue(&double_val));
+ EXPECT_DEATH(
+ aggregation_handle_avg_->mergeStates(*aggregation_state_avg_merge_double,
+ aggregation_handle_avg_state_.get()),
+ "");
std::unique_ptr<AggregationHandle> aggregation_handle_avg_float(
- AggregateFunctionFactory::Get(AggregationID::kAvg).createHandle(
- std::vector<const Type*>(1, &float_type)));
+ AggregateFunctionFactory::Get(AggregationID::kAvg)
+ .createHandle(std::vector<const Type *>(1, &float_type)));
std::unique_ptr<AggregationState> aggregation_state_avg_merge_float(
aggregation_handle_avg_float->createInitialState());
- static_cast<const AggregationHandleAvg&>(*aggregation_handle_avg_float).iterateUnaryInl(
- static_cast<AggregationStateAvg*>(aggregation_state_avg_merge_float.get()),
- float_type.makeValue(&float_val));
- EXPECT_DEATH(aggregation_handle_avg_->mergeStates(*aggregation_state_avg_merge_float,
- aggregation_handle_avg_state_.get()),
- "");
+ static_cast<const AggregationHandleAvg &>(*aggregation_handle_avg_float)
+ .iterateUnaryInl(static_cast<AggregationStateAvg *>(
+ aggregation_state_avg_merge_float.get()),
+ float_type.makeValue(&float_val));
+ EXPECT_DEATH(
+ aggregation_handle_avg_->mergeStates(*aggregation_state_avg_merge_float,
+ aggregation_handle_avg_state_.get()),
+ "");
}
#endif
@@ -417,8 +457,10 @@ TEST_F(AggregationHandleAvgTest, ResultTypeForArgumentTypeTest) {
EXPECT_TRUE(ResultTypeForArgumentTypeTest(kLong, kDouble));
EXPECT_TRUE(ResultTypeForArgumentTypeTest(kFloat, kDouble));
EXPECT_TRUE(ResultTypeForArgumentTypeTest(kDouble, kDouble));
- EXPECT_TRUE(ResultTypeForArgumentTypeTest(kDatetimeInterval, kDatetimeInterval));
- EXPECT_TRUE(ResultTypeForArgumentTypeTest(kYearMonthInterval, kYearMonthInterval));
+ EXPECT_TRUE(
+ ResultTypeForArgumentTypeTest(kDatetimeInterval, kDatetimeInterval));
+ EXPECT_TRUE(
+ ResultTypeForArgumentTypeTest(kYearMonthInterval, kYearMonthInterval));
}
TEST_F(AggregationHandleAvgTest, GroupByTableMergeTestAvg) {
@@ -426,25 +468,28 @@ TEST_F(AggregationHandleAvgTest, GroupByTableMergeTestAvg) {
initializeHandle(long_non_null_type);
storage_manager_.reset(new StorageManager("./test_avg_data"));
std::unique_ptr<AggregationStateHashTableBase> source_hash_table(
- aggregation_handle_avg_->createGroupByHashTable(
- HashTableImplType::kSimpleScalarSeparateChaining,
+ AggregationStateFastHashTableFactory::CreateResizable(
+ HashTableImplType::kSeparateChaining,
std::vector<const Type *>(1, &long_non_null_type),
10,
+ {aggregation_handle_avg_.get()->getPayloadSize()},
+ {aggregation_handle_avg_.get()},
storage_manager_.get()));
std::unique_ptr<AggregationStateHashTableBase> destination_hash_table(
- aggregation_handle_avg_->createGroupByHashTable(
- HashTableImplType::kSimpleScalarSeparateChaining,
+ AggregationStateFastHashTableFactory::CreateResizable(
+ HashTableImplType::kSeparateChaining,
std::vector<const Type *>(1, &long_non_null_type),
10,
+ {aggregation_handle_avg_.get()->getPayloadSize()},
+ {aggregation_handle_avg_.get()},
storage_manager_.get()));
- AggregationStateHashTable<AggregationStateAvg> *destination_hash_table_derived =
- static_cast<AggregationStateHashTable<AggregationStateAvg> *>(
+ AggregationStateFastHashTable *destination_hash_table_derived =
+ static_cast<AggregationStateFastHashTable *>(
destination_hash_table.get());
- AggregationStateHashTable<AggregationStateAvg> *source_hash_table_derived =
- static_cast<AggregationStateHashTable<AggregationStateAvg> *>(
- source_hash_table.get());
+ AggregationStateFastHashTable *source_hash_table_derived =
+ static_cast<AggregationStateFastHashTable *>(source_hash_table.get());
AggregationHandleAvg *aggregation_handle_avg_derived =
static_cast<AggregationHandleAvg *>(aggregation_handle_avg_.get());
@@ -496,36 +541,56 @@ TEST_F(AggregationHandleAvgTest, GroupByTableMergeTestAvg) {
exclusive_key_source_state.get(), exclusive_key_source_avg_val);
// Add the key-state pairs to the hash tables.
- source_hash_table_derived->putCompositeKey(common_key,
- *common_key_source_state);
- destination_hash_table_derived->putCompositeKey(
- common_key, *common_key_destination_state);
- source_hash_table_derived->putCompositeKey(exclusive_source_key,
- *exclusive_key_source_state);
- destination_hash_table_derived->putCompositeKey(
- exclusive_destination_key, *exclusive_key_destination_state);
+ unsigned char buffer[100];
+ buffer[0] = '\0';
+ memcpy(buffer + 1,
+ common_key_source_state.get()->getPayloadAddress(),
+ aggregation_handle_avg_.get()->getPayloadSize());
+ source_hash_table_derived->putCompositeKey(common_key, buffer);
+
+ memcpy(buffer + 1,
+ common_key_destination_state.get()->getPayloadAddress(),
+ aggregation_handle_avg_.get()->getPayloadSize());
+ destination_hash_table_derived->putCompositeKey(common_key, buffer);
+
+ memcpy(buffer + 1,
+ exclusive_key_source_state.get()->getPayloadAddress(),
+ aggregation_handle_avg_.get()->getPayloadSize());
+ source_hash_table_derived->putCompositeKey(exclusive_source_key, buffer);
+
+ memcpy(buffer + 1,
+ exclusive_key_destination_state.get()->getPayloadAddress(),
+ aggregation_handle_avg_.get()->getPayloadSize());
+ destination_hash_table_derived->putCompositeKey(exclusive_destination_key,
+ buffer);
EXPECT_EQ(2u, destination_hash_table_derived->numEntries());
EXPECT_EQ(2u, source_hash_table_derived->numEntries());
- aggregation_handle_avg_->mergeGroupByHashTables(*source_hash_table,
- destination_hash_table.get());
+ AggregationOperationState::mergeGroupByHashTables(
+ source_hash_table.get(), destination_hash_table.get());
EXPECT_EQ(3u, destination_hash_table_derived->numEntries());
CheckAvgValue<double>(
(common_key_destination_avg_val.getLiteral<std::int64_t>() +
- common_key_source_avg_val.getLiteral<std::int64_t>()) / static_cast<double>(2),
- *aggregation_handle_avg_derived,
- *(destination_hash_table_derived->getSingleCompositeKey(common_key)));
- CheckAvgValue<double>(exclusive_key_destination_avg_val.getLiteral<std::int64_t>(),
- *aggregation_handle_avg_derived,
- *(destination_hash_table_derived->getSingleCompositeKey(
- exclusive_destination_key)));
- CheckAvgValue<double>(exclusive_key_source_avg_val.getLiteral<std::int64_t>(),
- *aggregation_handle_avg_derived,
- *(source_hash_table_derived->getSingleCompositeKey(
- exclusive_source_key)));
+ common_key_source_avg_val.getLiteral<std::int64_t>()) /
+ static_cast<double>(2),
+ aggregation_handle_avg_derived->finalizeHashTableEntryFast(
+ destination_hash_table_derived->getSingleCompositeKey(common_key) +
+ 1));
+ CheckAvgValue<double>(
+ exclusive_key_destination_avg_val.getLiteral<std::int64_t>(),
+ aggregation_handle_avg_derived->finalizeHashTableEntryFast(
+ destination_hash_table_derived->getSingleCompositeKey(
+ exclusive_destination_key) +
+ 1));
+ CheckAvgValue<double>(
+ exclusive_key_source_avg_val.getLiteral<std::int64_t>(),
+ aggregation_handle_avg_derived->finalizeHashTableEntryFast(
+ source_hash_table_derived->getSingleCompositeKey(
+ exclusive_source_key) +
+ 1));
}
} // namespace quickstep