You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/10/05 04:25:43 UTC
[09/17] incubator-quickstep git commit: Initial commit for
QUICKSTEP-28 and QUICKSTEP-29.
Initial commit for QUICKSTEP-28 and QUICKSTEP-29.
- This PR creates a specialized hash table implementation for
aggregation.
- A hash table's bucket can store multiple AggregationStates,
corresponding to various AggregationHandles in the query. Earlier we
created one hash table for each AggregationHandle.
- All Aggregation states in a single hash table bucket are protected
using a single mutex. Earlier AggregationStates provided internal
concurrency protection (mutex).
- Single aggregationGroupBy method in StorageBlock.
- New methods for separating unary and nullary updates of AggregationState.
- Added TODO to move method from HashTableBase class.
- Added doxygen for the AggregationHandle new functions.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/ac3512ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/ac3512ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/ac3512ce
Branch: refs/heads/lip-refactor
Commit: ac3512ceb16109702116aa2a51db382dc99ba23e
Parents: 43c7a42
Author: rathijit <ra...@node-2.hashtable.quickstep-pg0.wisc.cloudlab.us>
Authored: Mon Jul 4 02:44:48 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Sep 20 15:26:52 2016 -0500
----------------------------------------------------------------------
catalog/CatalogTypedefs.hpp | 2 +
.../aggregation/AggregationConcreteHandle.cpp | 21 +-
.../aggregation/AggregationConcreteHandle.hpp | 372 +--
expressions/aggregation/AggregationHandle.hpp | 118 +-
.../aggregation/AggregationHandleAvg.cpp | 118 +-
.../aggregation/AggregationHandleAvg.hpp | 118 +-
.../aggregation/AggregationHandleCount.cpp | 180 +-
.../aggregation/AggregationHandleCount.hpp | 111 +-
.../aggregation/AggregationHandleDistinct.cpp | 5 +-
.../aggregation/AggregationHandleDistinct.hpp | 39 +-
.../aggregation/AggregationHandleMax.cpp | 98 +-
.../aggregation/AggregationHandleMax.hpp | 112 +-
.../aggregation/AggregationHandleMin.cpp | 99 +-
.../aggregation/AggregationHandleMin.hpp | 106 +-
.../aggregation/AggregationHandleSum.cpp | 112 +-
.../aggregation/AggregationHandleSum.hpp | 108 +-
expressions/aggregation/CMakeLists.txt | 8 +
.../tests/AggregationHandleAvg_unittest.cpp | 255 +-
.../tests/AggregationHandleCount_unittest.cpp | 311 ++-
.../tests/AggregationHandleMax_unittest.cpp | 382 +--
.../tests/AggregationHandleMin_unittest.cpp | 378 +--
.../tests/AggregationHandleSum_unittest.cpp | 291 +-
query_optimizer/ExecutionGenerator.cpp | 20 +-
.../tests/AggregationOperator_unittest.cpp | 3 +-
storage/AggregationOperationState.cpp | 314 ++-
storage/AggregationOperationState.hpp | 47 +-
storage/CMakeLists.txt | 55 +
storage/FastHashTable.hpp | 2515 ++++++++++++++++++
storage/FastHashTableFactory.hpp | 257 ++
storage/FastSeparateChainingHashTable.hpp | 1750 ++++++++++++
storage/HashTable.hpp | 10 -
storage/HashTableBase.hpp | 37 +-
storage/HashTablePool.hpp | 65 +
storage/StorageBlock.cpp | 27 +-
storage/StorageBlock.hpp | 16 +-
35 files changed, 6803 insertions(+), 1657 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/catalog/CatalogTypedefs.hpp
----------------------------------------------------------------------
diff --git a/catalog/CatalogTypedefs.hpp b/catalog/CatalogTypedefs.hpp
index f7a2d53..70bac84 100644
--- a/catalog/CatalogTypedefs.hpp
+++ b/catalog/CatalogTypedefs.hpp
@@ -49,6 +49,8 @@ constexpr int kInvalidCatalogId = -1;
// Used to indicate no preference for a NUMA Node ID.
constexpr numa_node_id kAnyNUMANodeID = -1;
+constexpr attribute_id kInvalidAttributeID = -1;
+
/** @} */
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/AggregationConcreteHandle.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.cpp b/expressions/aggregation/AggregationConcreteHandle.cpp
index 719920f..e3fb520 100644
--- a/expressions/aggregation/AggregationConcreteHandle.cpp
+++ b/expressions/aggregation/AggregationConcreteHandle.cpp
@@ -23,6 +23,7 @@
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
+#include "storage/FastHashTable.hpp"
#include "storage/HashTable.hpp"
#include "storage/HashTableFactory.hpp"
@@ -51,22 +52,16 @@ void AggregationConcreteHandle::insertValueAccessorIntoDistinctifyHashTable(
AggregationStateHashTableBase *distinctify_hash_table) const {
// If the key-value pair is already there, we don't need to update the value,
// which should always be "true". I.e. the value is just a placeholder.
- const auto noop_upserter = [](const auto &accessor, const bool *value) -> void {};
- AggregationStateHashTable<bool> *hash_table =
- static_cast<AggregationStateHashTable<bool>*>(distinctify_hash_table);
+ AggregationStateFastHashTable *hash_table =
+ static_cast<AggregationStateFastHashTable *>(distinctify_hash_table);
if (key_ids.size() == 1) {
- hash_table->upsertValueAccessor(accessor,
- key_ids[0],
- true /* check_for_null_keys */,
- true /* initial_value */,
- &noop_upserter);
+ hash_table->upsertValueAccessorFast(
+ key_ids, accessor, key_ids[0], true /* check_for_null_keys */);
} else {
- hash_table->upsertValueAccessorCompositeKey(accessor,
- key_ids,
- true /* check_for_null_keys */,
- true /* initial_value */,
- &noop_upserter);
+ std::vector<attribute_id> empty_args {kInvalidAttributeID};
+ hash_table->upsertValueAccessorCompositeKeyFast(
+ empty_args, accessor, key_ids, true /* check_for_null_keys */);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/AggregationConcreteHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.hpp b/expressions/aggregation/AggregationConcreteHandle.hpp
index c5ca061..398a032 100644
--- a/expressions/aggregation/AggregationConcreteHandle.hpp
+++ b/expressions/aggregation/AggregationConcreteHandle.hpp
@@ -21,13 +21,15 @@
#define QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_CONCRETE_HANDLE_HPP_
#include <cstddef>
-#include <vector>
#include <utility>
+#include <vector>
#include "catalog/CatalogTypedefs.hpp"
#include "expressions/aggregation/AggregationHandle.hpp"
+#include "storage/FastHashTable.hpp"
#include "storage/HashTable.hpp"
#include "storage/HashTableBase.hpp"
+#include "threading/SpinMutex.hpp"
#include "types/TypedValue.hpp"
#include "types/containers/ColumnVector.hpp"
#include "utility/Macros.hpp"
@@ -48,8 +50,8 @@ class ValueAccessor;
* @brief An upserter class for modifying the destination hash table while
* merging two group by hash tables.
**/
-template <typename HandleT, typename StateT>
-class HashTableStateUpserter {
+template <typename HandleT>
+class HashTableStateUpserterFast {
public:
/**
* @brief Constructor.
@@ -59,7 +61,8 @@ class HashTableStateUpserter {
* table. The corresponding state (for the same key) in the destination
* hash table will be upserted.
**/
- HashTableStateUpserter(const HandleT &handle, const StateT &source_state)
+ HashTableStateUpserterFast(const HandleT &handle,
+ const std::uint8_t *source_state)
: handle_(handle), source_state_(source_state) {}
/**
@@ -68,65 +71,15 @@ class HashTableStateUpserter {
* @param destination_state The aggregation state in the aggregation hash
* table that is being upserted.
**/
- void operator()(StateT *destination_state) {
- handle_.mergeStates(source_state_, destination_state);
+ void operator()(std::uint8_t *destination_state) {
+ handle_.mergeStatesFast(source_state_, destination_state);
}
private:
const HandleT &handle_;
- const StateT &source_state_;
+ const std::uint8_t *source_state_;
- DISALLOW_COPY_AND_ASSIGN(HashTableStateUpserter);
-};
-
-/**
- * @brief A class to support the functor for merging group by hash tables.
- **/
-template <typename HandleT, typename StateT, typename HashTableT>
-class HashTableMerger {
- public:
- /**
- * @brief Constructor
- *
- * @param handle The Aggregation handle being used.
- * @param destination_hash_table The destination hash table to which other
- * hash tables will be merged.
- **/
- HashTableMerger(const HandleT &handle,
- AggregationStateHashTableBase *destination_hash_table)
- : handle_(handle),
- destination_hash_table_(
- static_cast<HashTableT *>(destination_hash_table)) {}
-
- /**
- * @brief The operator for the functor.
- *
- * @param group_by_key The group by key being merged.
- * @param source_state The aggregation state for the given key in the source
- * aggregation hash table.
- **/
- inline void operator()(const std::vector<TypedValue> &group_by_key,
- const StateT &source_state) {
- const StateT *original_state =
- destination_hash_table_->getSingleCompositeKey(group_by_key);
- if (original_state != nullptr) {
- HashTableStateUpserter<HandleT, StateT> upserter(
- handle_, source_state);
- // The CHECK is required as upsertCompositeKey can return false if the
- // hash table runs out of space during the upsert process. The ideal
- // solution will be to retry again if the upsert fails.
- CHECK(destination_hash_table_->upsertCompositeKey(
- group_by_key, *original_state, &upserter));
- } else {
- destination_hash_table_->putCompositeKey(group_by_key, source_state);
- }
- }
-
- private:
- const HandleT &handle_;
- HashTableT *destination_hash_table_;
-
- DISALLOW_COPY_AND_ASSIGN(HashTableMerger);
+ DISALLOW_COPY_AND_ASSIGN(HashTableStateUpserterFast);
};
/**
@@ -156,14 +109,15 @@ class AggregationConcreteHandle : public AggregationHandle {
*/
AggregationStateHashTableBase* createDistinctifyHashTable(
const HashTableImplType hash_table_impl,
- const std::vector<const Type*> &key_types,
+ const std::vector<const Type *> &key_types,
const std::size_t estimated_num_distinct_keys,
StorageManager *storage_manager) const override;
/**
- * @brief Implementaion for AggregationHandle::insertValueAccessorIntoDistinctifyHashTable()
- * that inserts the GROUP BY expressions and aggregation arguments together
- * as keys into the distinctify hash table.
+ * @brief Implementation for
+ * AggregationHandle::insertValueAccessorIntoDistinctifyHashTable()
+ * that inserts the GROUP BY expressions and aggregation arguments together
+ * as keys into the distinctify hash table.
*/
void insertValueAccessorIntoDistinctifyHashTable(
ValueAccessor *accessor,
@@ -171,61 +125,40 @@ class AggregationConcreteHandle : public AggregationHandle {
AggregationStateHashTableBase *distinctify_hash_table) const override;
protected:
- AggregationConcreteHandle() {
- }
+ AggregationConcreteHandle() {}
- template <typename HandleT,
- typename StateT,
- typename HashTableT>
- void aggregateValueAccessorIntoHashTableNullaryHelper(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &group_by_key_ids,
- const StateT &default_state,
- AggregationStateHashTableBase *hash_table) const;
-
- template <typename HandleT,
- typename StateT,
- typename HashTableT>
- void aggregateValueAccessorIntoHashTableUnaryHelper(
- ValueAccessor *accessor,
- const attribute_id argument_id,
- const std::vector<attribute_id> &group_by_key_ids,
- const StateT &default_state,
- AggregationStateHashTableBase *hash_table) const;
-
- template <typename HandleT,
- typename StateT>
- StateT* aggregateOnDistinctifyHashTableForSingleUnaryHelper(
+ template <typename HandleT, typename StateT>
+ StateT* aggregateOnDistinctifyHashTableForSingleUnaryHelperFast(
const AggregationStateHashTableBase &distinctify_hash_table) const;
- template <typename HandleT,
- typename StateT,
- typename HashTableT>
- void aggregateOnDistinctifyHashTableForGroupByUnaryHelper(
+ template <typename HandleT, typename HashTableT>
+ void aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast(
const AggregationStateHashTableBase &distinctify_hash_table,
- const StateT &default_state,
- AggregationStateHashTableBase *hash_table) const;
+ AggregationStateHashTableBase *hash_table,
+ std::size_t index) const;
- template <typename HandleT,
- typename HashTableT>
- ColumnVector* finalizeHashTableHelper(
+ template <typename HandleT, typename HashTableT>
+ ColumnVector* finalizeHashTableHelperFast(
const Type &result_type,
const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys) const;
+ std::vector<std::vector<TypedValue>> *group_by_keys,
+ int index) const;
template <typename HandleT, typename HashTableT>
- inline TypedValue finalizeGroupInHashTable(
+ inline TypedValue finalizeGroupInHashTableFast(
const AggregationStateHashTableBase &hash_table,
- const std::vector<TypedValue> &group_key) const {
- const AggregationState *group_state
- = static_cast<const HashTableT&>(hash_table).getSingleCompositeKey(group_key);
+ const std::vector<TypedValue> &group_key,
+ int index) const {
+ const std::uint8_t *group_state =
+ static_cast<const HashTableT &>(hash_table).getSingleCompositeKey(group_key, index);
DCHECK(group_state != nullptr)
<< "Could not find entry for specified group_key in HashTable";
- return static_cast<const HandleT*>(this)->finalizeHashTableEntry(*group_state);
+ return static_cast<const HandleT *>(this)->finalizeHashTableEntryFast(
+ group_state);
}
- template <typename HandleT, typename StateT, typename HashTableT>
- void mergeGroupByHashTablesHelper(
+ template <typename HandleT, typename HashTableT>
+ void mergeGroupByHashTablesHelperFast(
const AggregationStateHashTableBase &source_hash_table,
AggregationStateHashTableBase *destination_hash_table) const;
@@ -234,51 +167,6 @@ class AggregationConcreteHandle : public AggregationHandle {
};
/**
- * @brief Templated class to implement value-accessor-based upserter for each
- * aggregation state payload type. This version is for nullary
- * aggregates (those that take no arguments).
- **/
-template <typename HandleT, typename StateT>
-class NullaryAggregationStateValueAccessorUpserter {
- public:
- explicit NullaryAggregationStateValueAccessorUpserter(const HandleT &handle)
- : handle_(handle) {
- }
-
- template <typename ValueAccessorT>
- inline void operator()(const ValueAccessorT &accessor, StateT *state) {
- handle_.iterateNullaryInl(state);
- }
-
- private:
- const HandleT &handle_;
-};
-
-/**
- * @brief Templated class to implement value-accessor-based upserter for each
- * aggregation state payload type. This version is for unary aggregates
- * (those that take a single argument).
- **/
-template <typename HandleT, typename StateT>
-class UnaryAggregationStateValueAccessorUpserter {
- public:
- UnaryAggregationStateValueAccessorUpserter(const HandleT &handle,
- attribute_id value_id)
- : handle_(handle),
- value_id_(value_id) {
- }
-
- template <typename ValueAccessorT>
- inline void operator()(const ValueAccessorT &accessor, StateT *state) {
- handle_.iterateUnaryInl(state, accessor.getTypedValue(value_id_));
- }
-
- private:
- const HandleT &handle_;
- const attribute_id value_id_;
-};
-
-/**
* @brief Templated helper class used to implement
* AggregationHandle::finalizeHashTable() by visiting each entry (i.e.
* GROUP) in a HashTable, finalizing the aggregation for the GROUP, and
@@ -288,18 +176,26 @@ class UnaryAggregationStateValueAccessorUpserter {
template <typename HandleT, typename ColumnVectorT>
class HashTableAggregateFinalizer {
public:
- HashTableAggregateFinalizer(const HandleT &handle,
- std::vector<std::vector<TypedValue>> *group_by_keys,
- ColumnVectorT *output_column_vector)
+ HashTableAggregateFinalizer(
+ const HandleT &handle,
+ std::vector<std::vector<TypedValue>> *group_by_keys,
+ ColumnVectorT *output_column_vector)
: handle_(handle),
group_by_keys_(group_by_keys),
- output_column_vector_(output_column_vector) {
- }
+ output_column_vector_(output_column_vector) {}
inline void operator()(const std::vector<TypedValue> &group_by_key,
const AggregationState &group_state) {
group_by_keys_->emplace_back(group_by_key);
- output_column_vector_->appendTypedValue(handle_.finalizeHashTableEntry(group_state));
+ output_column_vector_->appendTypedValue(
+ handle_.finalizeHashTableEntry(group_state));
+ }
+
+ inline void operator()(const std::vector<TypedValue> &group_by_key,
+ const unsigned char *byte_ptr) {
+ group_by_keys_->emplace_back(group_by_key);
+ output_column_vector_->appendTypedValue(
+ handle_.finalizeHashTableEntryFast(byte_ptr));
}
private:
@@ -313,171 +209,117 @@ class HashTableAggregateFinalizer {
// ----------------------------------------------------------------------------
// Implementations of templated methods follow:
-template <typename HandleT,
- typename StateT,
- typename HashTableT>
-void AggregationConcreteHandle::aggregateValueAccessorIntoHashTableNullaryHelper(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &group_by_key_ids,
- const StateT &default_state,
- AggregationStateHashTableBase *hash_table) const {
- NullaryAggregationStateValueAccessorUpserter<HandleT, StateT>
- upserter(static_cast<const HandleT&>(*this));
- static_cast<HashTableT*>(hash_table)->upsertValueAccessorCompositeKey(
- accessor,
- group_by_key_ids,
- true,
- default_state,
- &upserter);
-}
-
-template <typename HandleT,
- typename StateT,
- typename HashTableT>
-void AggregationConcreteHandle::aggregateValueAccessorIntoHashTableUnaryHelper(
- ValueAccessor *accessor,
- const attribute_id argument_id,
- const std::vector<attribute_id> &group_by_key_ids,
- const StateT &default_state,
- AggregationStateHashTableBase *hash_table) const {
- UnaryAggregationStateValueAccessorUpserter<HandleT, StateT>
- upserter(static_cast<const HandleT&>(*this), argument_id);
- static_cast<HashTableT*>(hash_table)->upsertValueAccessorCompositeKey(
- accessor,
- group_by_key_ids,
- true,
- default_state,
- &upserter);
-}
-
-template <typename HandleT,
- typename StateT>
-StateT* AggregationConcreteHandle::aggregateOnDistinctifyHashTableForSingleUnaryHelper(
- const AggregationStateHashTableBase &distinctify_hash_table) const {
- const HandleT& handle = static_cast<const HandleT&>(*this);
- StateT *state = static_cast<StateT*>(createInitialState());
+template <typename HandleT, typename StateT>
+StateT* AggregationConcreteHandle::
+ aggregateOnDistinctifyHashTableForSingleUnaryHelperFast(
+ const AggregationStateHashTableBase &distinctify_hash_table) const {
+ const HandleT &handle = static_cast<const HandleT &>(*this);
+ StateT *state = static_cast<StateT *>(createInitialState());
// A lambda function which will be called on each key from the distinctify
// hash table.
- const auto aggregate_functor = [&handle, &state](const TypedValue &key,
- const bool &dumb_placeholder) {
+ const auto aggregate_functor = [&handle, &state](
+ const TypedValue &key, const std::uint8_t &dumb_placeholder) {
// For each (unary) key in the distinctify hash table, aggregate the key
// into "state".
handle.iterateUnaryInl(state, key);
};
- const AggregationStateHashTable<bool> &hash_table =
- static_cast<const AggregationStateHashTable<bool>&>(distinctify_hash_table);
- // Invoke the lambda function "aggregate_functor" on each key from the distinctify
- // hash table.
+ const AggregationStateFastHashTable &hash_table =
+ static_cast<const AggregationStateFastHashTable &>(
+ distinctify_hash_table);
+ // Invoke the lambda function "aggregate_functor" on each key from the
+ // distinctify hash table.
hash_table.forEach(&aggregate_functor);
return state;
}
-template <typename HandleT,
- typename StateT,
- typename HashTableT>
-void AggregationConcreteHandle::aggregateOnDistinctifyHashTableForGroupByUnaryHelper(
- const AggregationStateHashTableBase &distinctify_hash_table,
- const StateT &default_state,
- AggregationStateHashTableBase *aggregation_hash_table) const {
- const HandleT& handle = static_cast<const HandleT&>(*this);
- HashTableT *target_hash_table = static_cast<HashTableT*>(aggregation_hash_table);
+template <typename HandleT, typename HashTableT>
+void AggregationConcreteHandle::
+ aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast(
+ const AggregationStateHashTableBase &distinctify_hash_table,
+ AggregationStateHashTableBase *aggregation_hash_table,
+ std::size_t index) const {
+ const HandleT &handle = static_cast<const HandleT &>(*this);
+ HashTableT *target_hash_table =
+ static_cast<HashTableT *>(aggregation_hash_table);
// A lambda function which will be called on each key-value pair from the
// distinctify hash table.
- const auto aggregate_functor = [&handle, &target_hash_table, &default_state](
- std::vector<TypedValue> &key,
- const bool &dumb_placeholder) {
+ const auto aggregate_functor = [&handle, &target_hash_table, &index](
+ std::vector<TypedValue> &key, const bool &dumb_placeholder) {
// For each (composite) key vector in the distinctify hash table with size N.
- // The first N-1 entries are GROUP BY columns and the last entry is the argument
- // to be aggregated on.
+ // The first N-1 entries are GROUP BY columns and the last entry is the
+ // argument to be aggregated on.
const TypedValue argument(std::move(key.back()));
key.pop_back();
// An upserter as lambda function for aggregating the argument into its
// GROUP BY group's entry inside aggregation_hash_table.
- const auto upserter = [&handle, &argument](StateT *state) {
- handle.iterateUnaryInl(state, argument);
+ const auto upserter = [&handle, &argument](std::uint8_t *state) {
+ handle.iterateUnaryInlFast(argument, state);
};
- target_hash_table->upsertCompositeKey(key, default_state, &upserter);
+ target_hash_table->upsertCompositeKeyFast(key, nullptr, &upserter, index);
};
- const AggregationStateHashTable<bool> &source_hash_table =
- static_cast<const AggregationStateHashTable<bool>&>(distinctify_hash_table);
+ const HashTableT &source_hash_table =
+ static_cast<const HashTableT &>(distinctify_hash_table);
// Invoke the lambda function "aggregate_functor" on each composite key vector
// from the distinctify hash table.
- source_hash_table.forEachCompositeKey(&aggregate_functor);
+ source_hash_table.forEachCompositeKeyFast(&aggregate_functor);
}
-template <typename HandleT,
- typename HashTableT>
-ColumnVector* AggregationConcreteHandle::finalizeHashTableHelper(
+template <typename HandleT, typename HashTableT>
+ColumnVector* AggregationConcreteHandle::finalizeHashTableHelperFast(
const Type &result_type,
const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys) const {
- const HandleT &handle = static_cast<const HandleT&>(*this);
- const HashTableT &hash_table_concrete = static_cast<const HashTableT&>(hash_table);
+ std::vector<std::vector<TypedValue>> *group_by_keys,
+ int index) const {
+ const HandleT &handle = static_cast<const HandleT &>(*this);
+ const HashTableT &hash_table_concrete =
+ static_cast<const HashTableT &>(hash_table);
if (group_by_keys->empty()) {
if (NativeColumnVector::UsableForType(result_type)) {
- NativeColumnVector *result = new NativeColumnVector(result_type,
- hash_table_concrete.numEntries());
+ NativeColumnVector *result =
+ new NativeColumnVector(result_type, hash_table_concrete.numEntries());
HashTableAggregateFinalizer<HandleT, NativeColumnVector> finalizer(
- handle,
- group_by_keys,
- result);
- hash_table_concrete.forEachCompositeKey(&finalizer);
+ handle, group_by_keys, result);
+ hash_table_concrete.forEachCompositeKeyFast(&finalizer, index);
return result;
} else {
- IndirectColumnVector *result = new IndirectColumnVector(result_type,
- hash_table_concrete.numEntries());
+ IndirectColumnVector *result = new IndirectColumnVector(
+ result_type, hash_table_concrete.numEntries());
HashTableAggregateFinalizer<HandleT, IndirectColumnVector> finalizer(
- handle,
- group_by_keys,
- result);
- hash_table_concrete.forEachCompositeKey(&finalizer);
+ handle, group_by_keys, result);
+ hash_table_concrete.forEachCompositeKeyFast(&finalizer, index);
return result;
}
} else {
if (NativeColumnVector::UsableForType(result_type)) {
- NativeColumnVector *result = new NativeColumnVector(result_type,
- group_by_keys->size());
+ NativeColumnVector *result =
+ new NativeColumnVector(result_type, group_by_keys->size());
for (const std::vector<TypedValue> &group_by_key : *group_by_keys) {
- result->appendTypedValue(finalizeGroupInHashTable<HandleT, HashTableT>(hash_table,
- group_by_key));
+ result->appendTypedValue(
+ finalizeGroupInHashTableFast<HandleT, HashTableT>(
+ hash_table, group_by_key, index));
}
return result;
} else {
- IndirectColumnVector *result = new IndirectColumnVector(result_type,
- hash_table_concrete.numEntries());
+ IndirectColumnVector *result = new IndirectColumnVector(
+ result_type, hash_table_concrete.numEntries());
for (const std::vector<TypedValue> &group_by_key : *group_by_keys) {
- result->appendTypedValue(finalizeGroupInHashTable<HandleT, HashTableT>(hash_table,
- group_by_key));
+ result->appendTypedValue(
+ finalizeGroupInHashTableFast<HandleT, HashTableT>(
+ hash_table, group_by_key, index));
}
return result;
}
}
}
-template <typename HandleT,
- typename StateT,
- typename HashTableT>
-void AggregationConcreteHandle::mergeGroupByHashTablesHelper(
- const AggregationStateHashTableBase &source_hash_table,
- AggregationStateHashTableBase *destination_hash_table) const {
- const HandleT &handle = static_cast<const HandleT &>(*this);
- const HashTableT &source_hash_table_concrete =
- static_cast<const HashTableT &>(source_hash_table);
-
- HashTableMerger<HandleT, StateT, HashTableT> merger(handle,
- destination_hash_table);
-
- source_hash_table_concrete.forEachCompositeKey(&merger);
-}
-
} // namespace quickstep
#endif // QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_CONCRETE_HANDLE_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/AggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandle.hpp b/expressions/aggregation/AggregationHandle.hpp
index 3d6e872..48ce7fe 100644
--- a/expressions/aggregation/AggregationHandle.hpp
+++ b/expressions/aggregation/AggregationHandle.hpp
@@ -40,7 +40,6 @@ class ValueAccessor;
* @{
*/
-
/**
* @brief Abstract base class for aggregation state.
**/
@@ -107,8 +106,7 @@ class AggregationHandle {
* @brief Virtual destructor.
*
**/
- virtual ~AggregationHandle() {
- }
+ virtual ~AggregationHandle() {}
/**
* @brief Create an initial "blank" state for this aggregation.
@@ -132,11 +130,11 @@ class AggregationHandle {
* A StorageBlob will be allocated to serve as the HashTable's
* in-memory storage.
* @return A new HashTable instance with the appropriate state type for this
- * aggregate as the ValueT.
+ * aggregate.
**/
virtual AggregationStateHashTableBase* createGroupByHashTable(
const HashTableImplType hash_table_impl,
- const std::vector<const Type*> &group_by_types,
+ const std::vector<const Type *> &group_by_types,
const std::size_t estimated_num_groups,
StorageManager *storage_manager) const = 0;
@@ -261,14 +259,18 @@ class AggregationHandle {
* values returned in the ColumnVector. If this is already filled in,
* then this method will visit the GROUP BY keys in the exact order
* specified.
+ * @param index The index of the AggregationHandle to be finalized.
+ *
* @return A ColumnVector containing each group's finalized aggregate value.
**/
virtual ColumnVector* finalizeHashTable(
const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys) const = 0;
+ std::vector<std::vector<TypedValue>> *group_by_keys,
+ int index) const = 0;
/**
- * @brief Create a new HashTable for the distinctify step for DISTINCT aggregation.
+ * @brief Create a new HashTable for the distinctify step for DISTINCT
+ * aggregation.
*
* Distinctify is the first step for DISTINCT aggregation. This step inserts
* the GROUP BY expression values and aggregation arguments together as keys
@@ -281,8 +283,8 @@ class AggregationHandle {
* we simply treat it as a special GROUP BY case that the GROUP BY expression
* vector is empty.
*
- * @param hash_table_impl The choice of which concrete HashTable implementation
- * to use.
+ * @param hash_table_impl The choice of which concrete HashTable
+ * implementation to use.
* @param key_types The types of the GROUP BY expressions together with the
* types of the aggregation arguments.
* @param estimated_num_distinct_keys The estimated number of distinct keys
@@ -291,14 +293,15 @@ class AggregationHandle {
* This is an estimate only, and the HashTable will be resized if it
* becomes over-full.
* @param storage_manager The StorageManager to use to create the HashTable.
- * A StorageBlob will be allocated to serve as the HashTable's in-memory
- * storage.
+ * A StorageBlob will be allocated to serve as the HashTable's
+ * in-memory storage.
+ *
* @return A new HashTable instance with the appropriate state type for this
- * aggregate as the ValueT.
+ * aggregate.
*/
virtual AggregationStateHashTableBase* createDistinctifyHashTable(
const HashTableImplType hash_table_impl,
- const std::vector<const Type*> &key_types,
+ const std::vector<const Type *> &key_types,
const std::size_t estimated_num_distinct_keys,
StorageManager *storage_manager) const = 0;
@@ -306,15 +309,16 @@ class AggregationHandle {
* @brief Inserts the GROUP BY expressions and aggregation arguments together
* as keys into the distinctify hash table.
*
- * @param accessor The ValueAccessor that will be iterated over to read tuples.
+ * @param accessor The ValueAccessor that will be iterated over to read
+ * tuples.
* @param key_ids The attribute_ids of the GROUP BY expressions in accessor
* together with the attribute_ids of the arguments to this aggregate
* in accessor, in order.
- * @param distinctify_hash_table The HashTable to store the GROUP BY expressions
- * and the aggregation arguments together as hash table keys and a bool
- * constant \c true as hash table value (So the hash table actually
- * serves as a hash set). This should have been created by calling
- * createDistinctifyHashTable();
+ * @param distinctify_hash_table The HashTable to store the GROUP BY
+ * expressions and the aggregation arguments together as hash table
+ * keys and a bool constant \c true as hash table value (So the hash
+ * table actually serves as a hash set). This should have been created
+ * by calling createDistinctifyHashTable();
*/
virtual void insertValueAccessorIntoDistinctifyHashTable(
ValueAccessor *accessor,
@@ -339,32 +343,82 @@ class AggregationHandle {
* @brief Perform GROUP BY aggregation on the keys from the distinctify hash
* table and upserts states into the aggregation hash table.
*
- * @param distinctify_hash_table Hash table which stores the GROUP BY expression
- * values and aggregation arguments together as hash table keys.
+ * @param distinctify_hash_table Hash table which stores the GROUP BY
+ * expression values and aggregation arguments together as hash table
+ * keys.
* @param aggregation_hash_table The HashTable to upsert AggregationStates in.
* This should have been created by calling createGroupByHashTable() on
* this same AggregationHandle.
+ * @param index The index of the distinctify hash table for which we perform
+ * the DISTINCT aggregation.
*/
virtual void aggregateOnDistinctifyHashTableForGroupBy(
const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table) const = 0;
+ AggregationStateHashTableBase *aggregation_hash_table,
+ std::size_t index) const = 0;
+
+ /**
+ * @brief Get the number of bytes needed to store the aggregation handle's
+ * state.
+ **/
+ virtual std::size_t getPayloadSize() const { return 1; }
/**
- * @brief Merge two GROUP BY hash tables in one.
+ * @brief Update the aggregation state for a nullary aggregation function,
+ * e.g. COUNT(*).
*
- * @note Both the hash tables should have the same structure.
+ * @note This function should be overridden by those aggregation functions
+ * which can perform nullary operations, e.g. COUNT.
+ *
+ * @param byte_ptr The pointer where the aggregation state is stored.
+ **/
+ virtual void updateStateNullary(std::uint8_t *byte_ptr) const {}
+
+ /**
+ * @brief Update the aggregation state for a unary aggregation function,
+ * e.g. SUM(a).
*
- * @param source_hash_table The hash table which will get merged.
- * @param destination_hash_table The hash table to which we will merge the
- * other hash table.
+ * @param argument The argument which will be used to update the state of the
+ * aggregation function.
+ * @param byte_ptr The pointer where the aggregation state is stored.
+ **/
+ virtual void updateStateUnary(const TypedValue &argument,
+ std::uint8_t *byte_ptr) const {}
+
+ /**
+ * @brief Merge two aggregation states for this aggregation handle.
+ *
+ * @note This function should be used with the hash table specifically meant
+ * for aggregations only.
+ *
+ * @param src A pointer to the source aggregation state.
+ * @param dst A pointer to the destination aggregation state.
+ **/
+ virtual void mergeStatesFast(const std::uint8_t *src,
+ std::uint8_t *dst) const {}
+
+ /**
+ * @brief Initialize the payload (in the aggregation hash table) for the given
+ * aggregation handle.
+ *
+ * @param byte_ptr The pointer to the aggregation state in the hash table.
+ **/
+ virtual void initPayload(std::uint8_t *byte_ptr) const {}
+
+ /**
+ * @brief Inform the aggregation handle to block (prohibit) updates on the
+ * aggregation state.
+ **/
+ virtual void blockUpdate() {}
+
+ /**
+ * @brief Inform the aggregation handle to allow updates on the
+ * aggregation state.
**/
- virtual void mergeGroupByHashTables(
- const AggregationStateHashTableBase &source_hash_table,
- AggregationStateHashTableBase *destination_hash_table) const = 0;
+ virtual void allowUpdate() {}
protected:
- AggregationHandle() {
- }
+ AggregationHandle() {}
private:
DISALLOW_COPY_AND_ASSIGN(AggregationHandle);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/AggregationHandleAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.cpp b/expressions/aggregation/AggregationHandleAvg.cpp
index 4bd43d6..2481092 100644
--- a/expressions/aggregation/AggregationHandleAvg.cpp
+++ b/expressions/aggregation/AggregationHandleAvg.cpp
@@ -42,7 +42,7 @@ namespace quickstep {
class StorageManager;
AggregationHandleAvg::AggregationHandleAvg(const Type &type)
- : argument_type_(type) {
+ : argument_type_(type), block_update_(false) {
// We sum Int as Long and Float as Double so that we have more headroom when
// adding many values.
TypeID type_precision_id;
@@ -76,26 +76,24 @@ AggregationHandleAvg::AggregationHandleAvg(const Type &type)
// Divide operator for dividing sum by count to get final average.
divide_operator_.reset(
BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide)
- .makeUncheckedBinaryOperatorForTypes(sum_type, TypeFactory::GetType(kDouble)));
+ .makeUncheckedBinaryOperatorForTypes(sum_type,
+ TypeFactory::GetType(kDouble)));
// Result is nullable, because AVG() over 0 values (or all NULL values) is
// NULL.
- result_type_
- = &(BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide)
- .resultTypeForArgumentTypes(sum_type, TypeFactory::GetType(kDouble))
- ->getNullableVersion());
+ result_type_ =
+ &(BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide)
+ .resultTypeForArgumentTypes(sum_type, TypeFactory::GetType(kDouble))
+ ->getNullableVersion());
}
AggregationStateHashTableBase* AggregationHandleAvg::createGroupByHashTable(
const HashTableImplType hash_table_impl,
- const std::vector<const Type*> &group_by_types,
+ const std::vector<const Type *> &group_by_types,
const std::size_t estimated_num_groups,
StorageManager *storage_manager) const {
return AggregationStateHashTableFactory<AggregationStateAvg>::CreateResizable(
- hash_table_impl,
- group_by_types,
- estimated_num_groups,
- storage_manager);
+ hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
}
AggregationState* AggregationHandleAvg::accumulateColumnVectors(
@@ -105,9 +103,8 @@ AggregationState* AggregationHandleAvg::accumulateColumnVectors(
AggregationStateAvg *state = new AggregationStateAvg(blank_state_);
std::size_t count = 0;
- state->sum_ = fast_add_operator_->accumulateColumnVector(state->sum_,
- *column_vectors.front(),
- &count);
+ state->sum_ = fast_add_operator_->accumulateColumnVector(
+ state->sum_, *column_vectors.front(), &count);
state->count_ = count;
return state;
}
@@ -121,10 +118,8 @@ AggregationState* AggregationHandleAvg::accumulateValueAccessor(
AggregationStateAvg *state = new AggregationStateAvg(blank_state_);
std::size_t count = 0;
- state->sum_ = fast_add_operator_->accumulateValueAccessor(state->sum_,
- accessor,
- accessor_ids.front(),
- &count);
+ state->sum_ = fast_add_operator_->accumulateValueAccessor(
+ state->sum_, accessor, accessor_ids.front(), &count);
state->count_ = count;
return state;
}
@@ -137,79 +132,74 @@ void AggregationHandleAvg::aggregateValueAccessorIntoHashTable(
AggregationStateHashTableBase *hash_table) const {
DCHECK_EQ(1u, argument_ids.size())
<< "Got wrong number of arguments for AVG: " << argument_ids.size();
-
- aggregateValueAccessorIntoHashTableUnaryHelper<
- AggregationHandleAvg,
- AggregationStateAvg,
- AggregationStateHashTable<AggregationStateAvg>>(
- accessor,
- argument_ids.front(),
- group_by_key_ids,
- blank_state_,
- hash_table);
}
-void AggregationHandleAvg::mergeStates(
- const AggregationState &source,
- AggregationState *destination) const {
- const AggregationStateAvg &avg_source = static_cast<const AggregationStateAvg&>(source);
- AggregationStateAvg *avg_destination = static_cast<AggregationStateAvg*>(destination);
+void AggregationHandleAvg::mergeStates(const AggregationState &source,
+ AggregationState *destination) const {
+ const AggregationStateAvg &avg_source =
+ static_cast<const AggregationStateAvg &>(source);
+ AggregationStateAvg *avg_destination =
+ static_cast<AggregationStateAvg *>(destination);
SpinMutexLock lock(avg_destination->mutex_);
avg_destination->count_ += avg_source.count_;
- avg_destination->sum_ = merge_add_operator_->applyToTypedValues(avg_destination->sum_,
- avg_source.sum_);
+ avg_destination->sum_ = merge_add_operator_->applyToTypedValues(
+ avg_destination->sum_, avg_source.sum_);
+}
+
+void AggregationHandleAvg::mergeStatesFast(const std::uint8_t *source,
+ std::uint8_t *destination) const {
+ const TypedValue *src_sum_ptr =
+ reinterpret_cast<const TypedValue *>(source + blank_state_.sum_offset_);
+ const std::int64_t *src_count_ptr = reinterpret_cast<const std::int64_t *>(
+ source + blank_state_.count_offset_);
+ TypedValue *dst_sum_ptr =
+ reinterpret_cast<TypedValue *>(destination + blank_state_.sum_offset_);
+ std::int64_t *dst_count_ptr = reinterpret_cast<std::int64_t *>(
+ destination + blank_state_.count_offset_);
+ (*dst_count_ptr) += (*src_count_ptr);
+ *dst_sum_ptr =
+ merge_add_operator_->applyToTypedValues(*dst_sum_ptr, *src_sum_ptr);
}
TypedValue AggregationHandleAvg::finalize(const AggregationState &state) const {
- const AggregationStateAvg &agg_state = static_cast<const AggregationStateAvg&>(state);
+ const AggregationStateAvg &agg_state =
+ static_cast<const AggregationStateAvg &>(state);
if (agg_state.count_ == 0) {
// AVG() over no values is NULL.
return result_type_->makeNullValue();
} else {
// Divide sum by count to get final average.
- return divide_operator_->applyToTypedValues(agg_state.sum_,
- TypedValue(static_cast<double>(agg_state.count_)));
+ return divide_operator_->applyToTypedValues(
+ agg_state.sum_, TypedValue(static_cast<double>(agg_state.count_)));
}
}
ColumnVector* AggregationHandleAvg::finalizeHashTable(
const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys) const {
- return finalizeHashTableHelper<AggregationHandleAvg,
- AggregationStateHashTable<AggregationStateAvg>>(
- *result_type_,
- hash_table,
- group_by_keys);
+ std::vector<std::vector<TypedValue>> *group_by_keys,
+ int index) const {
+ return finalizeHashTableHelperFast<AggregationHandleAvg,
+ AggregationStateFastHashTable>(
+ *result_type_, hash_table, group_by_keys, index);
}
-AggregationState* AggregationHandleAvg::aggregateOnDistinctifyHashTableForSingle(
+AggregationState*
+AggregationHandleAvg::aggregateOnDistinctifyHashTableForSingle(
const AggregationStateHashTableBase &distinctify_hash_table) const {
- return aggregateOnDistinctifyHashTableForSingleUnaryHelper<
+ return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
AggregationHandleAvg,
- AggregationStateAvg>(
- distinctify_hash_table);
+ AggregationStateAvg>(distinctify_hash_table);
}
void AggregationHandleAvg::aggregateOnDistinctifyHashTableForGroupBy(
const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table) const {
- aggregateOnDistinctifyHashTableForGroupByUnaryHelper<
+ AggregationStateHashTableBase *aggregation_hash_table,
+ std::size_t index) const {
+ aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
AggregationHandleAvg,
- AggregationStateAvg,
- AggregationStateHashTable<AggregationStateAvg>>(
- distinctify_hash_table,
- blank_state_,
- aggregation_hash_table);
-}
-
-void AggregationHandleAvg::mergeGroupByHashTables(
- const AggregationStateHashTableBase &source_hash_table,
- AggregationStateHashTableBase *destination_hash_table) const {
- mergeGroupByHashTablesHelper<AggregationHandleAvg,
- AggregationStateAvg,
- AggregationStateHashTable<AggregationStateAvg>>(
- source_hash_table, destination_hash_table);
+ AggregationStateFastHashTable>(
+ distinctify_hash_table, aggregation_hash_table, index);
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/AggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.hpp b/expressions/aggregation/AggregationHandleAvg.hpp
index 31997b1..3c6e0c2 100644
--- a/expressions/aggregation/AggregationHandleAvg.hpp
+++ b/expressions/aggregation/AggregationHandleAvg.hpp
@@ -28,6 +28,7 @@
#include "catalog/CatalogTypedefs.hpp"
#include "expressions/aggregation/AggregationConcreteHandle.hpp"
#include "expressions/aggregation/AggregationHandle.hpp"
+#include "storage/FastHashTable.hpp"
#include "storage/HashTableBase.hpp"
#include "threading/SpinMutex.hpp"
#include "types/Type.hpp"
@@ -57,26 +58,45 @@ class AggregationStateAvg : public AggregationState {
*/
AggregationStateAvg(const AggregationStateAvg &orig)
: sum_(orig.sum_),
- count_(orig.count_) {
- }
+ count_(orig.count_),
+ sum_offset_(orig.sum_offset_),
+ count_offset_(orig.count_offset_),
+ mutex_offset_(orig.mutex_offset_) {}
/**
* @brief Destructor.
*/
~AggregationStateAvg() override {}
+ std::size_t getPayloadSize() const {
+ std::size_t p1 = reinterpret_cast<std::size_t>(&sum_);
+ std::size_t p2 = reinterpret_cast<std::size_t>(&mutex_);
+ return (p2 - p1);
+ }
+
+ const std::uint8_t *getPayloadAddress() const {
+ return reinterpret_cast<const uint8_t *>(&sum_);
+ }
+
private:
friend class AggregationHandleAvg;
AggregationStateAvg()
- : sum_(0), count_(0) {
- }
+ : sum_(0),
+ count_(0),
+ sum_offset_(0),
+ count_offset_(reinterpret_cast<std::uint8_t *>(&count_) -
+ reinterpret_cast<std::uint8_t *>(&sum_)),
+ mutex_offset_(reinterpret_cast<std::uint8_t *>(&mutex_) -
+ reinterpret_cast<std::uint8_t *>(&sum_)) {}
// TODO(shoban): We might want to specialize sum_ and count_ to use atomics
// for int types similar to in AggregationStateCount.
TypedValue sum_;
std::int64_t count_;
SpinMutex mutex_;
+
+ int sum_offset_, count_offset_, mutex_offset_;
};
/**
@@ -84,8 +104,7 @@ class AggregationStateAvg : public AggregationState {
**/
class AggregationHandleAvg : public AggregationConcreteHandle {
public:
- ~AggregationHandleAvg() override {
- }
+ ~AggregationHandleAvg() override {}
AggregationState* createInitialState() const override {
return new AggregationStateAvg(blank_state_);
@@ -93,14 +112,15 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
AggregationStateHashTableBase* createGroupByHashTable(
const HashTableImplType hash_table_impl,
- const std::vector<const Type*> &group_by_types,
+ const std::vector<const Type *> &group_by_types,
const std::size_t estimated_num_groups,
StorageManager *storage_manager) const override;
/**
* @brief Iterate method with average aggregation state.
**/
- inline void iterateUnaryInl(AggregationStateAvg *state, const TypedValue &value) const {
+ inline void iterateUnaryInl(AggregationStateAvg *state,
+ const TypedValue &value) const {
DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
if (value.isNull()) return;
@@ -109,8 +129,41 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
++state->count_;
}
+ inline void iterateUnaryInlFast(const TypedValue &value,
+ std::uint8_t *byte_ptr) const {
+ DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
+ if (value.isNull()) return;
+ TypedValue *sum_ptr =
+ reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset_);
+ std::int64_t *count_ptr =
+ reinterpret_cast<std::int64_t *>(byte_ptr + blank_state_.count_offset_);
+ *sum_ptr = fast_add_operator_->applyToTypedValues(*sum_ptr, value);
+ ++(*count_ptr);
+ }
+
+ inline void updateStateUnary(const TypedValue &argument,
+ std::uint8_t *byte_ptr) const override {
+ if (!block_update_) {
+ iterateUnaryInlFast(argument, byte_ptr);
+ }
+ }
+
+ void blockUpdate() override { block_update_ = true; }
+
+ void allowUpdate() override { block_update_ = false; }
+
+ void initPayload(std::uint8_t *byte_ptr) const override {
+ TypedValue *sum_ptr =
+ reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset_);
+ std::int64_t *count_ptr =
+ reinterpret_cast<std::int64_t *>(byte_ptr + blank_state_.count_offset_);
+ *sum_ptr = blank_state_.sum_;
+ *count_ptr = blank_state_.count_;
+ }
+
AggregationState* accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const override;
+ const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
+ const override;
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
AggregationState* accumulateValueAccessor(
@@ -127,40 +180,61 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
void mergeStates(const AggregationState &source,
AggregationState *destination) const override;
+ void mergeStatesFast(const std::uint8_t *source,
+ std::uint8_t *destination) const override;
+
TypedValue finalize(const AggregationState &state) const override;
- inline TypedValue finalizeHashTableEntry(const AggregationState &state) const {
- const AggregationStateAvg &agg_state = static_cast<const AggregationStateAvg&>(state);
+ inline TypedValue finalizeHashTableEntry(
+ const AggregationState &state) const {
+ const AggregationStateAvg &agg_state =
+ static_cast<const AggregationStateAvg &>(state);
// TODO(chasseur): Could improve performance further if we made a special
// version of finalizeHashTable() that collects all the sums into one
// ColumnVector and all the counts into another and then applies
// '*divide_operator_' to them in bulk.
- return divide_operator_->applyToTypedValues(agg_state.sum_,
- TypedValue(static_cast<double>(agg_state.count_)));
+ return divide_operator_->applyToTypedValues(
+ agg_state.sum_, TypedValue(static_cast<double>(agg_state.count_)));
+ }
+
+ inline TypedValue finalizeHashTableEntryFast(
+ const std::uint8_t *byte_ptr) const {
+ std::uint8_t *value_ptr = const_cast<std::uint8_t *>(byte_ptr);
+ TypedValue *sum_ptr =
+ reinterpret_cast<TypedValue *>(value_ptr + blank_state_.sum_offset_);
+ std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(
+ value_ptr + blank_state_.count_offset_);
+ return divide_operator_->applyToTypedValues(
+ *sum_ptr, TypedValue(static_cast<double>(*count_ptr)));
}
ColumnVector* finalizeHashTable(
const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys) const override;
+ std::vector<std::vector<TypedValue>> *group_by_keys,
+ int index) const override;
/**
- * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
+ * @brief Implementation of
+ * AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
* for AVG aggregation.
*/
AggregationState* aggregateOnDistinctifyHashTableForSingle(
- const AggregationStateHashTableBase &distinctify_hash_table) const override;
+ const AggregationStateHashTableBase &distinctify_hash_table)
+ const override;
/**
- * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
+ * @brief Implementation of
+ * AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
* for AVG aggregation.
*/
void aggregateOnDistinctifyHashTableForGroupBy(
const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table) const override;
+ AggregationStateHashTableBase *aggregation_hash_table,
+ std::size_t index) const override;
- void mergeGroupByHashTables(
- const AggregationStateHashTableBase &source_hash_table,
- AggregationStateHashTableBase *destination_hash_table) const override;
+ std::size_t getPayloadSize() const override {
+ return blank_state_.getPayloadSize();
+ }
private:
friend class AggregateFunctionAvg;
@@ -179,6 +253,8 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
std::unique_ptr<UncheckedBinaryOperator> merge_add_operator_;
std::unique_ptr<UncheckedBinaryOperator> divide_operator_;
+ bool block_update_;
+
DISALLOW_COPY_AND_ASSIGN(AggregationHandleAvg);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/AggregationHandleCount.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.cpp b/expressions/aggregation/AggregationHandleCount.cpp
index dfcf131..034c942 100644
--- a/expressions/aggregation/AggregationHandleCount.cpp
+++ b/expressions/aggregation/AggregationHandleCount.cpp
@@ -49,48 +49,47 @@ class ValueAccessor;
template <bool count_star, bool nullable_type>
AggregationStateHashTableBase*
- AggregationHandleCount<count_star, nullable_type>::createGroupByHashTable(
- const HashTableImplType hash_table_impl,
- const std::vector<const Type*> &group_by_types,
- const std::size_t estimated_num_groups,
- StorageManager *storage_manager) const {
- return AggregationStateHashTableFactory<AggregationStateCount>::CreateResizable(
- hash_table_impl,
- group_by_types,
- estimated_num_groups,
- storage_manager);
+AggregationHandleCount<count_star, nullable_type>::createGroupByHashTable(
+ const HashTableImplType hash_table_impl,
+ const std::vector<const Type *> &group_by_types,
+ const std::size_t estimated_num_groups,
+ StorageManager *storage_manager) const {
+ return AggregationStateHashTableFactory<
+ AggregationStateCount>::CreateResizable(hash_table_impl,
+ group_by_types,
+ estimated_num_groups,
+ storage_manager);
}
template <bool count_star, bool nullable_type>
AggregationState*
- AggregationHandleCount<count_star, nullable_type>::accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
+AggregationHandleCount<count_star, nullable_type>::accumulateColumnVectors(
+ const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
DCHECK(!count_star)
<< "Called non-nullary accumulation method on an AggregationHandleCount "
<< "set up for nullary COUNT(*)";
DCHECK_EQ(1u, column_vectors.size())
- << "Got wrong number of ColumnVectors for COUNT: " << column_vectors.size();
+ << "Got wrong number of ColumnVectors for COUNT: "
+ << column_vectors.size();
std::size_t count = 0;
InvokeOnColumnVector(
*column_vectors.front(),
[&](const auto &column_vector) -> void { // NOLINT(build/c++11)
- if (nullable_type) {
- // TODO(shoban): Iterating over the ColumnVector is a rather slow way to
- // do this. We should look at extending the ColumnVector interface to do
- // a quick count of the non-null values (i.e. the length minus the
- // population count of the null bitmap). We should do something similar
- // for ValueAccessor too.
- for (std::size_t pos = 0;
- pos < column_vector.size();
- ++pos) {
- count += !column_vector.getTypedValue(pos).isNull();
- }
- } else {
- count = column_vector.size();
- }
- });
+ if (nullable_type) {
+ // TODO(shoban): Iterating over the ColumnVector is a rather slow way
+ // to do this. We should look at extending the ColumnVector interface
+ // to do a quick count of the non-null values (i.e. the length minus
+ // the population count of the null bitmap). We should do something
+ // similar for ValueAccessor too.
+ for (std::size_t pos = 0; pos < column_vector.size(); ++pos) {
+ count += !column_vector.getTypedValue(pos).isNull();
+ }
+ } else {
+ count = column_vector.size();
+ }
+ });
return new AggregationStateCount(count);
}
@@ -98,9 +97,9 @@ AggregationState*
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
template <bool count_star, bool nullable_type>
AggregationState*
- AggregationHandleCount<count_star, nullable_type>::accumulateValueAccessor(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &accessor_ids) const {
+AggregationHandleCount<count_star, nullable_type>::accumulateValueAccessor(
+ ValueAccessor *accessor,
+ const std::vector<attribute_id> &accessor_ids) const {
DCHECK(!count_star)
<< "Called non-nullary accumulation method on an AggregationHandleCount "
<< "set up for nullary COUNT(*)";
@@ -113,108 +112,91 @@ AggregationState*
InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
accessor,
[&accessor_id, &count](auto *accessor) -> void { // NOLINT(build/c++11)
- if (nullable_type) {
- while (accessor->next()) {
- count += !accessor->getTypedValue(accessor_id).isNull();
- }
- } else {
- count = accessor->getNumTuples();
- }
- });
+ if (nullable_type) {
+ while (accessor->next()) {
+ count += !accessor->getTypedValue(accessor_id).isNull();
+ }
+ } else {
+ count = accessor->getNumTuples();
+ }
+ });
return new AggregationStateCount(count);
}
#endif
template <bool count_star, bool nullable_type>
- void AggregationHandleCount<count_star, nullable_type>::aggregateValueAccessorIntoHashTable(
+void AggregationHandleCount<count_star, nullable_type>::
+ aggregateValueAccessorIntoHashTable(
ValueAccessor *accessor,
const std::vector<attribute_id> &argument_ids,
const std::vector<attribute_id> &group_by_key_ids,
AggregationStateHashTableBase *hash_table) const {
if (count_star) {
DCHECK_EQ(0u, argument_ids.size())
- << "Got wrong number of arguments for COUNT(*): " << argument_ids.size();
- aggregateValueAccessorIntoHashTableNullaryHelper<
- AggregationHandleCount<count_star, nullable_type>,
- AggregationStateCount,
- AggregationStateHashTable<AggregationStateCount>>(
- accessor,
- group_by_key_ids,
- AggregationStateCount(),
- hash_table);
+ << "Got wrong number of arguments for COUNT(*): "
+ << argument_ids.size();
} else {
DCHECK_EQ(1u, argument_ids.size())
<< "Got wrong number of arguments for COUNT: " << argument_ids.size();
- aggregateValueAccessorIntoHashTableUnaryHelper<
- AggregationHandleCount<count_star, nullable_type>,
- AggregationStateCount,
- AggregationStateHashTable<AggregationStateCount>>(
- accessor,
- argument_ids.front(),
- group_by_key_ids,
- AggregationStateCount(),
- hash_table);
}
}
template <bool count_star, bool nullable_type>
- void AggregationHandleCount<count_star, nullable_type>::mergeStates(
- const AggregationState &source,
- AggregationState *destination) const {
- const AggregationStateCount &count_source = static_cast<const AggregationStateCount&>(source);
- AggregationStateCount *count_destination = static_cast<AggregationStateCount*>(destination);
-
- count_destination->count_.fetch_add(count_source.count_.load(std::memory_order_relaxed),
- std::memory_order_relaxed);
+void AggregationHandleCount<count_star, nullable_type>::mergeStates(
+ const AggregationState &source, AggregationState *destination) const {
+ const AggregationStateCount &count_source =
+ static_cast<const AggregationStateCount &>(source);
+ AggregationStateCount *count_destination =
+ static_cast<AggregationStateCount *>(destination);
+
+ count_destination->count_.fetch_add(
+ count_source.count_.load(std::memory_order_relaxed),
+ std::memory_order_relaxed);
}
template <bool count_star, bool nullable_type>
- ColumnVector* AggregationHandleCount<count_star, nullable_type>::finalizeHashTable(
- const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys) const {
- return finalizeHashTableHelper<AggregationHandleCount<count_star, nullable_type>,
- AggregationStateHashTable<AggregationStateCount>>(
- TypeFactory::GetType(kLong),
- hash_table,
- group_by_keys);
+void AggregationHandleCount<count_star, nullable_type>::mergeStatesFast(
+ const std::uint8_t *source, std::uint8_t *destination) const {
+ const std::int64_t *src_count_ptr =
+ reinterpret_cast<const std::int64_t *>(source);
+ std::int64_t *dst_count_ptr = reinterpret_cast<std::int64_t *>(destination);
+ (*dst_count_ptr) += (*src_count_ptr);
}
template <bool count_star, bool nullable_type>
-AggregationState* AggregationHandleCount<count_star, nullable_type>
- ::aggregateOnDistinctifyHashTableForSingle(
- const AggregationStateHashTableBase &distinctify_hash_table) const {
- DCHECK_EQ(count_star, false);
- return aggregateOnDistinctifyHashTableForSingleUnaryHelper<
+ColumnVector*
+AggregationHandleCount<count_star, nullable_type>::finalizeHashTable(
+ const AggregationStateHashTableBase &hash_table,
+ std::vector<std::vector<TypedValue>> *group_by_keys,
+ int index) const {
+ return finalizeHashTableHelperFast<
AggregationHandleCount<count_star, nullable_type>,
- AggregationStateCount>(
- distinctify_hash_table);
+ AggregationStateFastHashTable>(
+ TypeFactory::GetType(kLong), hash_table, group_by_keys, index);
}
template <bool count_star, bool nullable_type>
-void AggregationHandleCount<count_star, nullable_type>
- ::aggregateOnDistinctifyHashTableForGroupBy(
- const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table) const {
+AggregationState* AggregationHandleCount<count_star, nullable_type>::
+ aggregateOnDistinctifyHashTableForSingle(
+ const AggregationStateHashTableBase &distinctify_hash_table) const {
DCHECK_EQ(count_star, false);
- aggregateOnDistinctifyHashTableForGroupByUnaryHelper<
+ return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
AggregationHandleCount<count_star, nullable_type>,
- AggregationStateCount,
- AggregationStateHashTable<AggregationStateCount>>(
- distinctify_hash_table,
- AggregationStateCount(),
- aggregation_hash_table);
+ AggregationStateCount>(distinctify_hash_table);
}
template <bool count_star, bool nullable_type>
-void AggregationHandleCount<count_star, nullable_type>::mergeGroupByHashTables(
- const AggregationStateHashTableBase &source_hash_table,
- AggregationStateHashTableBase *destination_hash_table) const {
- mergeGroupByHashTablesHelper<
- AggregationHandleCount,
- AggregationStateCount,
- AggregationStateHashTable<AggregationStateCount>>(source_hash_table,
- destination_hash_table);
+void AggregationHandleCount<count_star, nullable_type>::
+ aggregateOnDistinctifyHashTableForGroupBy(
+ const AggregationStateHashTableBase &distinctify_hash_table,
+ AggregationStateHashTableBase *aggregation_hash_table,
+ std::size_t index) const {
+ DCHECK_EQ(count_star, false);
+ aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
+ AggregationHandleCount<count_star, nullable_type>,
+ AggregationStateFastHashTable>(
+ distinctify_hash_table, aggregation_hash_table, index);
}
// Explicitly instantiate and compile in the different versions of
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/AggregationHandleCount.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.hpp b/expressions/aggregation/AggregationHandleCount.hpp
index 1cd5bda..6aab0cd 100644
--- a/expressions/aggregation/AggregationHandleCount.hpp
+++ b/expressions/aggregation/AggregationHandleCount.hpp
@@ -29,6 +29,7 @@
#include "catalog/CatalogTypedefs.hpp"
#include "expressions/aggregation/AggregationConcreteHandle.hpp"
#include "expressions/aggregation/AggregationHandle.hpp"
+#include "storage/FastHashTable.hpp"
#include "storage/HashTableBase.hpp"
#include "types/TypedValue.hpp"
#include "utility/Macros.hpp"
@@ -40,7 +41,8 @@ class StorageManager;
class Type;
class ValueAccessor;
-template <bool, bool> class AggregationHandleCount;
+template <bool, bool>
+class AggregationHandleCount;
/** \addtogroup Expressions
* @{
@@ -62,19 +64,22 @@ class AggregationStateCount : public AggregationState {
*/
~AggregationStateCount() override {}
+ std::size_t getPayloadSize() const { return sizeof(count_); }
+
+ const std::uint8_t* getPayloadAddress() const {
+ return reinterpret_cast<const uint8_t *>(&count_);
+ }
+
private:
friend class AggregationHandleCount<false, false>;
friend class AggregationHandleCount<false, true>;
friend class AggregationHandleCount<true, false>;
friend class AggregationHandleCount<true, true>;
- AggregationStateCount()
- : count_(0) {
- }
+ AggregationStateCount() : count_(0) {}
explicit AggregationStateCount(const std::int64_t initial_count)
- : count_(initial_count) {
- }
+ : count_(initial_count) {}
std::atomic<std::int64_t> count_;
};
@@ -91,8 +96,7 @@ class AggregationStateCount : public AggregationState {
template <bool count_star, bool nullable_type>
class AggregationHandleCount : public AggregationConcreteHandle {
public:
- ~AggregationHandleCount() override {
- }
+ ~AggregationHandleCount() override {}
AggregationState* createInitialState() const override {
return new AggregationStateCount();
@@ -100,7 +104,7 @@ class AggregationHandleCount : public AggregationConcreteHandle {
AggregationStateHashTableBase* createGroupByHashTable(
const HashTableImplType hash_table_impl,
- const std::vector<const Type*> &group_by_types,
+ const std::vector<const Type *> &group_by_types,
const std::size_t estimated_num_groups,
StorageManager *storage_manager) const override;
@@ -108,21 +112,59 @@ class AggregationHandleCount : public AggregationConcreteHandle {
state->count_.fetch_add(1, std::memory_order_relaxed);
}
+ inline void iterateNullaryInlFast(std::uint8_t *byte_ptr) const {
+ std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr);
+ (*count_ptr)++;
+ }
+
/**
* @brief Iterate with count aggregation state.
*/
- inline void iterateUnaryInl(AggregationStateCount *state, const TypedValue &value) const {
+ inline void iterateUnaryInl(AggregationStateCount *state,
+ const TypedValue &value) const {
if ((!nullable_type) || (!value.isNull())) {
state->count_.fetch_add(1, std::memory_order_relaxed);
}
}
- AggregationState* accumulateNullary(const std::size_t num_tuples) const override {
+ inline void iterateUnaryInlFast(const TypedValue &value,
+ std::uint8_t *byte_ptr) const {
+ if ((!nullable_type) || (!value.isNull())) {
+ std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr);
+ (*count_ptr)++;
+ }
+ }
+
+ inline void updateStateUnary(const TypedValue &argument,
+ std::uint8_t *byte_ptr) const override {
+ if (!block_update_) {
+ iterateUnaryInlFast(argument, byte_ptr);
+ }
+ }
+
+ inline void updateStateNullary(std::uint8_t *byte_ptr) const override {
+ if (!block_update_) {
+ iterateNullaryInlFast(byte_ptr);
+ }
+ }
+
+ void blockUpdate() override { block_update_ = true; }
+
+ void allowUpdate() override { block_update_ = false; }
+
+ void initPayload(std::uint8_t *byte_ptr) const override {
+ std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr);
+ *count_ptr = 0;
+ }
+
+ AggregationState* accumulateNullary(
+ const std::size_t num_tuples) const override {
return new AggregationStateCount(num_tuples);
}
AggregationState* accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const override;
+ const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
+ const override;
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
AggregationState* accumulateValueAccessor(
@@ -139,36 +181,54 @@ class AggregationHandleCount : public AggregationConcreteHandle {
void mergeStates(const AggregationState &source,
AggregationState *destination) const override;
+ void mergeStatesFast(const std::uint8_t *source,
+ std::uint8_t *destination) const override;
+
TypedValue finalize(const AggregationState &state) const override {
- return TypedValue(static_cast<const AggregationStateCount&>(state).count_.load(std::memory_order_relaxed));
+ return TypedValue(
+ static_cast<const AggregationStateCount &>(state).count_.load(
+ std::memory_order_relaxed));
+ }
+
+ inline TypedValue finalizeHashTableEntry(
+ const AggregationState &state) const {
+ return TypedValue(
+ static_cast<const AggregationStateCount &>(state).count_.load(
+ std::memory_order_relaxed));
}
- inline TypedValue finalizeHashTableEntry(const AggregationState &state) const {
- return TypedValue(static_cast<const AggregationStateCount&>(state).count_.load(std::memory_order_relaxed));
+ inline TypedValue finalizeHashTableEntryFast(
+ const std::uint8_t *byte_ptr) const {
+ const std::int64_t *count_ptr =
+ reinterpret_cast<const std::int64_t *>(byte_ptr);
+ return TypedValue(*count_ptr);
}
ColumnVector* finalizeHashTable(
const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys) const override;
+ std::vector<std::vector<TypedValue>> *group_by_keys,
+ int index) const override;
/**
- * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
+ * @brief Implementation of
+ * AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
* for COUNT aggregation.
*/
AggregationState* aggregateOnDistinctifyHashTableForSingle(
- const AggregationStateHashTableBase &distinctify_hash_table) const override;
+ const AggregationStateHashTableBase &distinctify_hash_table)
+ const override;
/**
- * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
+ * @brief Implementation of
+ * AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
* for COUNT aggregation.
*/
void aggregateOnDistinctifyHashTableForGroupBy(
const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table) const override;
+ AggregationStateHashTableBase *aggregation_hash_table,
+ std::size_t index) const override;
- void mergeGroupByHashTables(
- const AggregationStateHashTableBase &source_hash_table,
- AggregationStateHashTableBase *destination_hash_table) const override;
+ std::size_t getPayloadSize() const override { return sizeof(std::int64_t); }
private:
friend class AggregateFunctionCount;
@@ -176,8 +236,9 @@ class AggregationHandleCount : public AggregationConcreteHandle {
/**
* @brief Constructor.
**/
- AggregationHandleCount() {
- }
+ AggregationHandleCount() : block_update_(false) {}
+
+ bool block_update_;
DISALLOW_COPY_AND_ASSIGN(AggregationHandleCount);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/AggregationHandleDistinct.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleDistinct.cpp b/expressions/aggregation/AggregationHandleDistinct.cpp
index 68fcd4c..0dc8b56 100644
--- a/expressions/aggregation/AggregationHandleDistinct.cpp
+++ b/expressions/aggregation/AggregationHandleDistinct.cpp
@@ -65,14 +65,15 @@ void AggregationHandleDistinct::aggregateValueAccessorIntoHashTable(
ColumnVector* AggregationHandleDistinct::finalizeHashTable(
const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys) const {
+ std::vector<std::vector<TypedValue>> *group_by_keys,
+ int index) const {
DCHECK(group_by_keys->empty());
const auto keys_retriever = [&group_by_keys](std::vector<TypedValue> &group_by_key,
const bool &dumb_placeholder) -> void {
group_by_keys->emplace_back(std::move(group_by_key));
};
- static_cast<const AggregationStateHashTable<bool>&>(hash_table).forEachCompositeKey(&keys_retriever);
+ static_cast<const AggregationStateFastHashTable&>(hash_table).forEachCompositeKeyFast(&keys_retriever);
return nullptr;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/AggregationHandleDistinct.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleDistinct.hpp b/expressions/aggregation/AggregationHandleDistinct.hpp
index 8524fcc..838bfdd 100644
--- a/expressions/aggregation/AggregationHandleDistinct.hpp
+++ b/expressions/aggregation/AggregationHandleDistinct.hpp
@@ -49,27 +49,32 @@ class AggregationHandleDistinct : public AggregationConcreteHandle {
/**
* @brief Constructor.
**/
- AggregationHandleDistinct() {
- }
+ AggregationHandleDistinct() {}
AggregationState* createInitialState() const override {
- LOG(FATAL) << "AggregationHandleDistinct does not support createInitialState().";
+ LOG(FATAL)
+ << "AggregationHandleDistinct does not support createInitialState().";
}
- AggregationState* accumulateNullary(const std::size_t num_tuples) const override {
- LOG(FATAL) << "AggregationHandleDistinct does not support accumulateNullary().";
+ AggregationState* accumulateNullary(
+ const std::size_t num_tuples) const override {
+ LOG(FATAL)
+ << "AggregationHandleDistinct does not support accumulateNullary().";
}
AggregationState* accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const override {
- LOG(FATAL) << "AggregationHandleDistinct does not support accumulateColumnVectors().";
+ const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
+ const override {
+ LOG(FATAL) << "AggregationHandleDistinct does not support "
+ "accumulateColumnVectors().";
}
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
AggregationState* accumulateValueAccessor(
ValueAccessor *accessor,
const std::vector<attribute_id> &accessor_ids) const override {
- LOG(FATAL) << "AggregationHandleDistinct does not support accumulateValueAccessor().";
+ LOG(FATAL) << "AggregationHandleDistinct does not support "
+ "accumulateValueAccessor().";
}
#endif
@@ -83,21 +88,23 @@ class AggregationHandleDistinct : public AggregationConcreteHandle {
}
AggregationState* aggregateOnDistinctifyHashTableForSingle(
- const AggregationStateHashTableBase &distinctify_hash_table) const override {
+ const AggregationStateHashTableBase &distinctify_hash_table)
+ const override {
LOG(FATAL) << "AggregationHandleDistinct does not support "
<< "aggregateOnDistinctifyHashTableForSingle().";
}
void aggregateOnDistinctifyHashTableForGroupBy(
const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *groupby_hash_table) const override {
+ AggregationStateHashTableBase *groupby_hash_table,
+ std::size_t index) const override {
LOG(FATAL) << "AggregationHandleDistinct does not support "
<< "aggregateOnDistinctifyHashTableForGroupBy().";
}
AggregationStateHashTableBase* createGroupByHashTable(
const HashTableImplType hash_table_impl,
- const std::vector<const Type*> &group_by_types,
+ const std::vector<const Type *> &group_by_types,
const std::size_t estimated_num_groups,
StorageManager *storage_manager) const override;
@@ -109,14 +116,8 @@ class AggregationHandleDistinct : public AggregationConcreteHandle {
ColumnVector* finalizeHashTable(
const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys) const override;
-
- void mergeGroupByHashTables(
- const AggregationStateHashTableBase &source_hash_table,
- AggregationStateHashTableBase *destination_hash_table) const override {
- LOG(FATAL)
- << "AggregationHandleDistinct does not support mergeGroupByHashTables";
- }
+ std::vector<std::vector<TypedValue>> *group_by_keys,
+ int index) const override;
private:
DISALLOW_COPY_AND_ASSIGN(AggregationHandleDistinct);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/AggregationHandleMax.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.cpp b/expressions/aggregation/AggregationHandleMax.cpp
index 435f5f2..c2d571b 100644
--- a/expressions/aggregation/AggregationHandleMax.cpp
+++ b/expressions/aggregation/AggregationHandleMax.cpp
@@ -39,22 +39,19 @@ namespace quickstep {
class StorageManager;
AggregationHandleMax::AggregationHandleMax(const Type &type)
- : type_(type) {
- fast_comparator_.reset(ComparisonFactory::GetComparison(ComparisonID::kGreater)
- .makeUncheckedComparatorForTypes(type,
- type.getNonNullableVersion()));
+ : type_(type), block_update_(false) {
+ fast_comparator_.reset(
+ ComparisonFactory::GetComparison(ComparisonID::kGreater)
+ .makeUncheckedComparatorForTypes(type, type.getNonNullableVersion()));
}
AggregationStateHashTableBase* AggregationHandleMax::createGroupByHashTable(
const HashTableImplType hash_table_impl,
- const std::vector<const Type*> &group_by_types,
+ const std::vector<const Type *> &group_by_types,
const std::size_t estimated_num_groups,
StorageManager *storage_manager) const {
return AggregationStateHashTableFactory<AggregationStateMax>::CreateResizable(
- hash_table_impl,
- group_by_types,
- estimated_num_groups,
- storage_manager);
+ hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
}
AggregationState* AggregationHandleMax::accumulateColumnVectors(
@@ -62,9 +59,8 @@ AggregationState* AggregationHandleMax::accumulateColumnVectors(
DCHECK_EQ(1u, column_vectors.size())
<< "Got wrong number of ColumnVectors for MAX: " << column_vectors.size();
- return new AggregationStateMax(
- fast_comparator_->accumulateColumnVector(type_.getNullableVersion().makeNullValue(),
- *column_vectors.front()));
+ return new AggregationStateMax(fast_comparator_->accumulateColumnVector(
+ type_.getNullableVersion().makeNullValue(), *column_vectors.front()));
}
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
@@ -74,10 +70,10 @@ AggregationState* AggregationHandleMax::accumulateValueAccessor(
DCHECK_EQ(1u, accessor_ids.size())
<< "Got wrong number of attributes for MAX: " << accessor_ids.size();
- return new AggregationStateMax(
- fast_comparator_->accumulateValueAccessor(type_.getNullableVersion().makeNullValue(),
- accessor,
- accessor_ids.front()));
+ return new AggregationStateMax(fast_comparator_->accumulateValueAccessor(
+ type_.getNullableVersion().makeNullValue(),
+ accessor,
+ accessor_ids.front()));
}
#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
@@ -88,66 +84,54 @@ void AggregationHandleMax::aggregateValueAccessorIntoHashTable(
AggregationStateHashTableBase *hash_table) const {
DCHECK_EQ(1u, argument_ids.size())
<< "Got wrong number of arguments for MAX: " << argument_ids.size();
-
- aggregateValueAccessorIntoHashTableUnaryHelper<
- AggregationHandleMax,
- AggregationStateMax,
- AggregationStateHashTable<AggregationStateMax>>(
- accessor,
- argument_ids.front(),
- group_by_key_ids,
- AggregationStateMax(type_),
- hash_table);
}
-void AggregationHandleMax::mergeStates(
- const AggregationState &source,
- AggregationState *destination) const {
- const AggregationStateMax &max_source = static_cast<const AggregationStateMax&>(source);
- AggregationStateMax *max_destination = static_cast<AggregationStateMax*>(destination);
+void AggregationHandleMax::mergeStates(const AggregationState &source,
+ AggregationState *destination) const {
+ const AggregationStateMax &max_source =
+ static_cast<const AggregationStateMax &>(source);
+ AggregationStateMax *max_destination =
+ static_cast<AggregationStateMax *>(destination);
if (!max_source.max_.isNull()) {
compareAndUpdate(max_destination, max_source.max_);
}
}
+void AggregationHandleMax::mergeStatesFast(const std::uint8_t *source,
+ std::uint8_t *destination) const {
+ const TypedValue *src_max_ptr = reinterpret_cast<const TypedValue *>(source);
+ TypedValue *dst_max_ptr = reinterpret_cast<TypedValue *>(destination);
+ if (!(src_max_ptr->isNull())) {
+ compareAndUpdateFast(dst_max_ptr, *src_max_ptr);
+ }
+}
+
ColumnVector* AggregationHandleMax::finalizeHashTable(
const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys) const {
- return finalizeHashTableHelper<AggregationHandleMax,
- AggregationStateHashTable<AggregationStateMax>>(
- type_.getNullableVersion(),
- hash_table,
- group_by_keys);
+ std::vector<std::vector<TypedValue>> *group_by_keys,
+ int index) const {
+ return finalizeHashTableHelperFast<AggregationHandleMax,
+ AggregationStateFastHashTable>(
+ type_.getNullableVersion(), hash_table, group_by_keys, index);
}
-AggregationState* AggregationHandleMax::aggregateOnDistinctifyHashTableForSingle(
+AggregationState*
+AggregationHandleMax::aggregateOnDistinctifyHashTableForSingle(
const AggregationStateHashTableBase &distinctify_hash_table) const {
- return aggregateOnDistinctifyHashTableForSingleUnaryHelper<
+ return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
AggregationHandleMax,
- AggregationStateMax>(
- distinctify_hash_table);
+ AggregationStateMax>(distinctify_hash_table);
}
void AggregationHandleMax::aggregateOnDistinctifyHashTableForGroupBy(
const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table) const {
- aggregateOnDistinctifyHashTableForGroupByUnaryHelper<
+ AggregationStateHashTableBase *aggregation_hash_table,
+ std::size_t index) const {
+ aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
AggregationHandleMax,
- AggregationStateMax,
- AggregationStateHashTable<AggregationStateMax>>(
- distinctify_hash_table,
- AggregationStateMax(type_),
- aggregation_hash_table);
-}
-
-void AggregationHandleMax::mergeGroupByHashTables(
- const AggregationStateHashTableBase &source_hash_table,
- AggregationStateHashTableBase *destination_hash_table) const {
- mergeGroupByHashTablesHelper<AggregationHandleMax,
- AggregationStateMax,
- AggregationStateHashTable<AggregationStateMax>>(
- source_hash_table, destination_hash_table);
+ AggregationStateFastHashTable>(
+ distinctify_hash_table, aggregation_hash_table, index);
}
} // namespace quickstep