You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/02/06 01:47:15 UTC
[10/10] incubator-quickstep git commit: - Adds
CollisionFreeVectorTable to support specialized fast path aggregation for
range-bounded single integer group-by key. - Supports copy elision for
aggregation.
- Adds CollisionFreeVectorTable to support a specialized fast-path aggregation for a range-bounded, single-integer group-by key.
- Supports copy elision for aggregation.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c70485b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c70485b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c70485b1
Branch: refs/heads/collision-free-agg
Commit: c70485b1c116b1b64b1c1876ff2699f07716cb6c
Parents: 27a8055
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Mon Jan 30 14:46:39 2017 -0600
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Sun Feb 5 15:14:26 2017 -0600
----------------------------------------------------------------------
.../aggregation/AggregateFunctionCount.cpp | 6 +-
.../aggregation/AggregationConcreteHandle.cpp | 32 +-
.../aggregation/AggregationConcreteHandle.hpp | 158 +-
expressions/aggregation/AggregationHandle.hpp | 193 +-
.../aggregation/AggregationHandleAvg.cpp | 95 +-
.../aggregation/AggregationHandleAvg.hpp | 115 +-
.../aggregation/AggregationHandleCount.cpp | 147 +-
.../aggregation/AggregationHandleCount.hpp | 154 +-
.../aggregation/AggregationHandleDistinct.cpp | 81 -
.../aggregation/AggregationHandleDistinct.hpp | 130 -
.../aggregation/AggregationHandleMax.cpp | 91 +-
.../aggregation/AggregationHandleMax.hpp | 111 +-
.../aggregation/AggregationHandleMin.cpp | 91 +-
.../aggregation/AggregationHandleMin.hpp | 120 +-
.../aggregation/AggregationHandleSum.cpp | 93 +-
.../aggregation/AggregationHandleSum.hpp | 114 +-
expressions/aggregation/CMakeLists.txt | 58 +-
.../tests/AggregationHandleAvg_unittest.cpp | 110 +-
.../tests/AggregationHandleCount_unittest.cpp | 145 +-
.../tests/AggregationHandleMax_unittest.cpp | 158 +-
.../tests/AggregationHandleMin_unittest.cpp | 158 +-
.../tests/AggregationHandleSum_unittest.cpp | 109 +-
query_execution/QueryContext.hpp | 14 -
query_optimizer/CMakeLists.txt | 3 +
query_optimizer/ExecutionGenerator.cpp | 153 +-
query_optimizer/ExecutionGenerator.hpp | 20 +-
relational_operators/CMakeLists.txt | 15 +
.../DestroyAggregationStateOperator.cpp | 7 -
.../FinalizeAggregationOperator.cpp | 16 +-
.../FinalizeAggregationOperator.hpp | 14 +-
.../InitializeAggregationOperator.cpp | 72 +
.../InitializeAggregationOperator.hpp | 122 +
relational_operators/WorkOrderFactory.cpp | 1 +
storage/AggregationOperationState.cpp | 834 +++---
storage/AggregationOperationState.hpp | 178 +-
storage/CMakeLists.txt | 131 +-
storage/CollisionFreeVectorTable.cpp | 285 +++
storage/CollisionFreeVectorTable.hpp | 730 ++++++
storage/FastHashTable.hpp | 2403 ------------------
storage/FastHashTableFactory.hpp | 224 --
storage/FastSeparateChainingHashTable.hpp | 1551 -----------
storage/HashTable.proto | 7 +-
storage/HashTableBase.hpp | 42 +-
storage/HashTableFactory.hpp | 63 +-
storage/HashTablePool.hpp | 79 +-
storage/PackedPayloadHashTable.cpp | 463 ++++
storage/PackedPayloadHashTable.hpp | 995 ++++++++
storage/PartitionedHashTablePool.hpp | 56 +-
storage/StorageBlock.cpp | 274 +-
storage/StorageBlock.hpp | 167 --
storage/ValueAccessorMultiplexer.hpp | 145 ++
.../BarrieredReadWriteConcurrentBitVector.hpp | 282 ++
utility/CMakeLists.txt | 7 +
53 files changed, 4724 insertions(+), 7098 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c70485b1/expressions/aggregation/AggregateFunctionCount.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregateFunctionCount.cpp b/expressions/aggregation/AggregateFunctionCount.cpp
index 466ff2f..9795b4a 100644
--- a/expressions/aggregation/AggregateFunctionCount.cpp
+++ b/expressions/aggregation/AggregateFunctionCount.cpp
@@ -53,16 +53,16 @@ AggregationHandle* AggregateFunctionCount::createHandle(
if (argument_types.empty()) {
// COUNT(*)
- return new AggregationHandleCount<true, false>();
+ return new AggregationHandleCount<true, false>(nullptr);
} else if (argument_types.front()->isNullable()) {
// COUNT(some_nullable_argument)
- return new AggregationHandleCount<false, true>();
+ return new AggregationHandleCount<false, true>(argument_types.front());
} else {
// COUNT(non_nullable_argument)
//
// TODO(chasseur): Modify query optimizer to optimize-away COUNT with a
// non-nullable argument and convert it to COUNT(*).
- return new AggregationHandleCount<false, false>();
+ return new AggregationHandleCount<false, false>(argument_types.front());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c70485b1/expressions/aggregation/AggregationConcreteHandle.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.cpp b/expressions/aggregation/AggregationConcreteHandle.cpp
index e3fb520..fa21056 100644
--- a/expressions/aggregation/AggregationConcreteHandle.cpp
+++ b/expressions/aggregation/AggregationConcreteHandle.cpp
@@ -22,16 +22,14 @@
#include <cstddef>
#include <vector>
-#include "catalog/CatalogTypedefs.hpp"
-#include "storage/FastHashTable.hpp"
-#include "storage/HashTable.hpp"
#include "storage/HashTableFactory.hpp"
+#include "storage/PackedPayloadHashTable.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
namespace quickstep {
class StorageManager;
class Type;
-class ValueAccessor;
AggregationStateHashTableBase* AggregationConcreteHandle::createDistinctifyHashTable(
const HashTableImplType hash_table_impl,
@@ -39,30 +37,26 @@ AggregationStateHashTableBase* AggregationConcreteHandle::createDistinctifyHashT
const std::size_t estimated_num_distinct_keys,
StorageManager *storage_manager) const {
// Create a hash table with key types as key_types and value type as bool.
- return AggregationStateHashTableFactory<bool>::CreateResizable(
+ return AggregationStateHashTableFactory::CreateResizable(
hash_table_impl,
key_types,
estimated_num_distinct_keys,
+ {},
storage_manager);
}
void AggregationConcreteHandle::insertValueAccessorIntoDistinctifyHashTable(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &key_ids,
+ const std::vector<MultiSourceAttributeId> &argument_ids,
+ const std::vector<MultiSourceAttributeId> &key_ids,
+ const ValueAccessorMultiplexer &accessor_mux,
AggregationStateHashTableBase *distinctify_hash_table) const {
- // If the key-value pair is already there, we don't need to update the value,
- // which should always be "true". I.e. the value is just a placeholder.
-
- AggregationStateFastHashTable *hash_table =
- static_cast<AggregationStateFastHashTable *>(distinctify_hash_table);
- if (key_ids.size() == 1) {
- hash_table->upsertValueAccessorFast(
- key_ids, accessor, key_ids[0], true /* check_for_null_keys */);
- } else {
- std::vector<attribute_id> empty_args {kInvalidAttributeID};
- hash_table->upsertValueAccessorCompositeKeyFast(
- empty_args, accessor, key_ids, true /* check_for_null_keys */);
+ std::vector<MultiSourceAttributeId> concatenated_ids(key_ids);
+ for (const MultiSourceAttributeId &arg_id : argument_ids) {
+ concatenated_ids.emplace_back(arg_id);
}
+
+ static_cast<PackedPayloadHashTable *>(distinctify_hash_table)
+ ->upsertValueAccessorCompositeKey({}, concatenated_ids, accessor_mux);
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c70485b1/expressions/aggregation/AggregationConcreteHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.hpp b/expressions/aggregation/AggregationConcreteHandle.hpp
index 398a032..8f47105 100644
--- a/expressions/aggregation/AggregationConcreteHandle.hpp
+++ b/expressions/aggregation/AggregationConcreteHandle.hpp
@@ -21,14 +21,15 @@
#define QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_CONCRETE_HANDLE_HPP_
#include <cstddef>
+#include <cstdint>
#include <utility>
#include <vector>
-#include "catalog/CatalogTypedefs.hpp"
#include "expressions/aggregation/AggregationHandle.hpp"
-#include "storage/FastHashTable.hpp"
-#include "storage/HashTable.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
#include "storage/HashTableBase.hpp"
+#include "storage/PackedPayloadHashTable.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
#include "threading/SpinMutex.hpp"
#include "types/TypedValue.hpp"
#include "types/containers/ColumnVector.hpp"
@@ -40,7 +41,6 @@ namespace quickstep {
class StorageManager;
class Type;
-class ValueAccessor;
/** \addtogroup Expressions
* @{
@@ -51,7 +51,7 @@ class ValueAccessor;
* merging two group by hash tables.
**/
template <typename HandleT>
-class HashTableStateUpserterFast {
+class HashTableStateUpserter {
public:
/**
* @brief Constructor.
@@ -61,7 +61,7 @@ class HashTableStateUpserterFast {
* table. The corresponding state (for the same key) in the destination
* hash table will be upserted.
**/
- HashTableStateUpserterFast(const HandleT &handle,
+ HashTableStateUpserter(const HandleT &handle,
const std::uint8_t *source_state)
: handle_(handle), source_state_(source_state) {}
@@ -72,14 +72,14 @@ class HashTableStateUpserterFast {
* table that is being upserted.
**/
void operator()(std::uint8_t *destination_state) {
- handle_.mergeStatesFast(source_state_, destination_state);
+ handle_.mergeStates(source_state_, destination_state);
}
private:
const HandleT &handle_;
const std::uint8_t *source_state_;
- DISALLOW_COPY_AND_ASSIGN(HashTableStateUpserterFast);
+ DISALLOW_COPY_AND_ASSIGN(HashTableStateUpserter);
};
/**
@@ -93,74 +93,62 @@ class HashTableStateUpserterFast {
**/
class AggregationConcreteHandle : public AggregationHandle {
public:
- /**
- * @brief Default implementaion for AggregationHandle::accumulateNullary().
- */
- AggregationState* accumulateNullary(
- const std::size_t num_tuples) const override {
- LOG(FATAL) << "Called accumulateNullary on an AggregationHandle that "
- << "takes at least one argument.";
- }
-
- /**
- * @brief Implementaion for AggregationHandle::createDistinctifyHashTable()
- * that creates a new HashTable for the distinctify step for
- * DISTINCT aggregation.
- */
AggregationStateHashTableBase* createDistinctifyHashTable(
const HashTableImplType hash_table_impl,
const std::vector<const Type *> &key_types,
const std::size_t estimated_num_distinct_keys,
StorageManager *storage_manager) const override;
- /**
- * @brief Implementaion for
- * AggregationHandle::insertValueAccessorIntoDistinctifyHashTable()
- * that inserts the GROUP BY expressions and aggregation arguments together
- * as keys into the distinctify hash table.
- */
void insertValueAccessorIntoDistinctifyHashTable(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &key_ids,
+ const std::vector<MultiSourceAttributeId> &argument_ids,
+ const std::vector<MultiSourceAttributeId> &key_ids,
+ const ValueAccessorMultiplexer &accessor_mux,
AggregationStateHashTableBase *distinctify_hash_table) const override;
+ void blockUpdate() override {
+ block_update_ = true;
+ }
+
+ void allowUpdate() override {
+ block_update_ = false;
+ }
+
protected:
- AggregationConcreteHandle() {}
+ explicit AggregationConcreteHandle(const AggregationID agg_id)
+ : AggregationHandle(agg_id),
+ block_update_(false) {}
template <typename HandleT, typename StateT>
- StateT* aggregateOnDistinctifyHashTableForSingleUnaryHelperFast(
+ StateT* aggregateOnDistinctifyHashTableForSingleUnaryHelper(
const AggregationStateHashTableBase &distinctify_hash_table) const;
- template <typename HandleT, typename HashTableT>
- void aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast(
+ template <typename HandleT>
+ void aggregateOnDistinctifyHashTableForGroupByUnaryHelper(
const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *hash_table,
- std::size_t index) const;
+ const std::size_t index,
+ AggregationStateHashTableBase *hash_table) const;
- template <typename HandleT, typename HashTableT>
- ColumnVector* finalizeHashTableHelperFast(
+ template <typename HandleT>
+ ColumnVector* finalizeHashTableHelper(
const Type &result_type,
const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys,
- int index) const;
+ const std::size_t index,
+ std::vector<std::vector<TypedValue>> *group_by_keys) const;
- template <typename HandleT, typename HashTableT>
- inline TypedValue finalizeGroupInHashTableFast(
+ template <typename HandleT>
+ inline TypedValue finalizeGroupInHashTable(
const AggregationStateHashTableBase &hash_table,
- const std::vector<TypedValue> &group_key,
- int index) const {
+ const std::size_t index,
+ const std::vector<TypedValue> &group_key) const {
const std::uint8_t *group_state =
- static_cast<const HashTableT &>(hash_table).getSingleCompositeKey(group_key, index);
+ static_cast<const PackedPayloadHashTable &>(hash_table)
+ .getSingleCompositeKey(group_key, index);
DCHECK(group_state != nullptr)
<< "Could not find entry for specified group_key in HashTable";
- return static_cast<const HandleT *>(this)->finalizeHashTableEntryFast(
- group_state);
+ return static_cast<const HandleT *>(this)->finalizeHashTableEntry(group_state);
}
- template <typename HandleT, typename HashTableT>
- void mergeGroupByHashTablesHelperFast(
- const AggregationStateHashTableBase &source_hash_table,
- AggregationStateHashTableBase *destination_hash_table) const;
+ bool block_update_;
private:
DISALLOW_COPY_AND_ASSIGN(AggregationConcreteHandle);
@@ -185,17 +173,10 @@ class HashTableAggregateFinalizer {
output_column_vector_(output_column_vector) {}
inline void operator()(const std::vector<TypedValue> &group_by_key,
- const AggregationState &group_state) {
- group_by_keys_->emplace_back(group_by_key);
- output_column_vector_->appendTypedValue(
- handle_.finalizeHashTableEntry(group_state));
- }
-
- inline void operator()(const std::vector<TypedValue> &group_by_key,
- const unsigned char *byte_ptr) {
+ const std::uint8_t *byte_ptr) {
group_by_keys_->emplace_back(group_by_key);
output_column_vector_->appendTypedValue(
- handle_.finalizeHashTableEntryFast(byte_ptr));
+ handle_.finalizeHashTableEntry(byte_ptr));
}
private:
@@ -211,7 +192,7 @@ class HashTableAggregateFinalizer {
template <typename HandleT, typename StateT>
StateT* AggregationConcreteHandle::
- aggregateOnDistinctifyHashTableForSingleUnaryHelperFast(
+ aggregateOnDistinctifyHashTableForSingleUnaryHelper(
const AggregationStateHashTableBase &distinctify_hash_table) const {
const HandleT &handle = static_cast<const HandleT &>(*this);
StateT *state = static_cast<StateT *>(createInitialState());
@@ -219,15 +200,14 @@ StateT* AggregationConcreteHandle::
// A lambda function which will be called on each key from the distinctify
// hash table.
const auto aggregate_functor = [&handle, &state](
- const TypedValue &key, const std::uint8_t &dumb_placeholder) {
+ const TypedValue &key, const std::uint8_t *dumb_placeholder) {
// For each (unary) key in the distinctify hash table, aggregate the key
// into "state".
handle.iterateUnaryInl(state, key);
};
- const AggregationStateFastHashTable &hash_table =
- static_cast<const AggregationStateFastHashTable &>(
- distinctify_hash_table);
+ const auto &hash_table =
+ static_cast<const PackedPayloadHashTable &>(distinctify_hash_table);
// Invoke the lambda function "aggregate_functor" on each key from the
// distinctify hash table.
hash_table.forEach(&aggregate_functor);
@@ -235,20 +215,20 @@ StateT* AggregationConcreteHandle::
return state;
}
-template <typename HandleT, typename HashTableT>
+template <typename HandleT>
void AggregationConcreteHandle::
- aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast(
+ aggregateOnDistinctifyHashTableForGroupByUnaryHelper(
const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table,
- std::size_t index) const {
+ const std::size_t index,
+ AggregationStateHashTableBase *aggregation_hash_table) const {
const HandleT &handle = static_cast<const HandleT &>(*this);
- HashTableT *target_hash_table =
- static_cast<HashTableT *>(aggregation_hash_table);
+ PackedPayloadHashTable *target_hash_table =
+ static_cast<PackedPayloadHashTable *>(aggregation_hash_table);
// A lambda function which will be called on each key-value pair from the
// distinctify hash table.
const auto aggregate_functor = [&handle, &target_hash_table, &index](
- std::vector<TypedValue> &key, const bool &dumb_placeholder) {
+ std::vector<TypedValue> &key, const std::uint8_t *dumb_placeholder) {
// For each (composite) key vector in the distinctify hash table with size N.
// The first N-1 entries are GROUP BY columns and the last entry is the
// argument to be aggregated on.
@@ -258,28 +238,28 @@ void AggregationConcreteHandle::
// An upserter as lambda function for aggregating the argument into its
// GROUP BY group's entry inside aggregation_hash_table.
const auto upserter = [&handle, &argument](std::uint8_t *state) {
- handle.iterateUnaryInlFast(argument, state);
+ handle.iterateUnaryInl(argument, state);
};
- target_hash_table->upsertCompositeKeyFast(key, nullptr, &upserter, index);
+ target_hash_table->upsertCompositeKey(key, &upserter, index);
};
- const HashTableT &source_hash_table =
- static_cast<const HashTableT &>(distinctify_hash_table);
+ const PackedPayloadHashTable &source_hash_table =
+ static_cast<const PackedPayloadHashTable &>(distinctify_hash_table);
// Invoke the lambda function "aggregate_functor" on each composite key vector
// from the distinctify hash table.
- source_hash_table.forEachCompositeKeyFast(&aggregate_functor);
+ source_hash_table.forEachCompositeKey(&aggregate_functor);
}
-template <typename HandleT, typename HashTableT>
-ColumnVector* AggregationConcreteHandle::finalizeHashTableHelperFast(
+template <typename HandleT>
+ColumnVector* AggregationConcreteHandle::finalizeHashTableHelper(
const Type &result_type,
const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys,
- int index) const {
+ const std::size_t index,
+ std::vector<std::vector<TypedValue>> *group_by_keys) const {
const HandleT &handle = static_cast<const HandleT &>(*this);
- const HashTableT &hash_table_concrete =
- static_cast<const HashTableT &>(hash_table);
+ const PackedPayloadHashTable &hash_table_concrete =
+ static_cast<const PackedPayloadHashTable &>(hash_table);
if (group_by_keys->empty()) {
if (NativeColumnVector::UsableForType(result_type)) {
@@ -287,14 +267,14 @@ ColumnVector* AggregationConcreteHandle::finalizeHashTableHelperFast(
new NativeColumnVector(result_type, hash_table_concrete.numEntries());
HashTableAggregateFinalizer<HandleT, NativeColumnVector> finalizer(
handle, group_by_keys, result);
- hash_table_concrete.forEachCompositeKeyFast(&finalizer, index);
+ hash_table_concrete.forEachCompositeKey(&finalizer, index);
return result;
} else {
IndirectColumnVector *result = new IndirectColumnVector(
result_type, hash_table_concrete.numEntries());
HashTableAggregateFinalizer<HandleT, IndirectColumnVector> finalizer(
handle, group_by_keys, result);
- hash_table_concrete.forEachCompositeKeyFast(&finalizer, index);
+ hash_table_concrete.forEachCompositeKey(&finalizer, index);
return result;
}
} else {
@@ -303,8 +283,8 @@ ColumnVector* AggregationConcreteHandle::finalizeHashTableHelperFast(
new NativeColumnVector(result_type, group_by_keys->size());
for (const std::vector<TypedValue> &group_by_key : *group_by_keys) {
result->appendTypedValue(
- finalizeGroupInHashTableFast<HandleT, HashTableT>(
- hash_table, group_by_key, index));
+ finalizeGroupInHashTable<HandleT>(
+ hash_table, index, group_by_key));
}
return result;
} else {
@@ -312,8 +292,8 @@ ColumnVector* AggregationConcreteHandle::finalizeHashTableHelperFast(
result_type, hash_table_concrete.numEntries());
for (const std::vector<TypedValue> &group_by_key : *group_by_keys) {
result->appendTypedValue(
- finalizeGroupInHashTableFast<HandleT, HashTableT>(
- hash_table, group_by_key, index));
+ finalizeGroupInHashTable<HandleT>(
+ hash_table, index, group_by_key));
}
return result;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c70485b1/expressions/aggregation/AggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandle.hpp b/expressions/aggregation/AggregationHandle.hpp
index 4b51179..9c7b166 100644
--- a/expressions/aggregation/AggregationHandle.hpp
+++ b/expressions/aggregation/AggregationHandle.hpp
@@ -21,20 +21,21 @@
#define QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_HANDLE_HPP_
#include <cstddef>
-#include <memory>
#include <vector>
-#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
#include "storage/HashTableBase.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
#include "types/TypedValue.hpp"
#include "utility/Macros.hpp"
+#include "glog/logging.h"
+
namespace quickstep {
class ColumnVector;
class StorageManager;
class Type;
-class ValueAccessor;
/** \addtogroup Expressions
* @{
@@ -109,34 +110,34 @@ class AggregationHandle {
virtual ~AggregationHandle() {}
/**
- * @brief Create an initial "blank" state for this aggregation.
+ * @brief Get the ID of this aggregation.
*
- * @return An initial "blank" state for this particular aggregation.
+ * @return The AggregationID of this AggregationHandle.
**/
- virtual AggregationState* createInitialState() const = 0;
+ inline AggregationID getAggregationID() const {
+ return agg_id_;
+ }
/**
- * @brief Create a new HashTable for aggregation with GROUP BY.
+ * @brief Get the list of Types (in order) for arguments to this aggregation.
*
- * @param hash_table_impl The choice of which concrete HashTable
- * implementation to use.
- * @param group_by_types The types of the GROUP BY columns/expressions. These
- * correspond to the (composite) key type for the HashTable.
- * @param estimated_num_groups The estimated number of distinct groups for
- * the GROUP BY aggregation. This is used to size the initial
- * HashTable. This is an estimate only, and the HashTable will be
- * resized if it becomes over-full.
- * @param storage_manager The StorageManager to use to create the HashTable.
- * A StorageBlob will be allocated to serve as the HashTable's
- * in-memory storage.
- * @return A new HashTable instance with the appropriate state type for this
- * aggregate.
+ * @return The list of Types for arguments to this aggregation.
+ */
+ virtual std::vector<const Type *> getArgumentTypes() const = 0;
+
+ /**
+ * @brief Get the result Type of this aggregation.
+ *
+ * @return The result Type of this aggregation.
+ */
+ virtual const Type* getResultType() const = 0;
+
+ /**
+ * @brief Create an initial "blank" state for this aggregation.
+ *
+ * @return An initial "blank" state for this particular aggregation.
**/
- virtual AggregationStateHashTableBase* createGroupByHashTable(
- const HashTableImplType hash_table_impl,
- const std::vector<const Type *> &group_by_types,
- const std::size_t estimated_num_groups,
- StorageManager *storage_manager) const = 0;
+ virtual AggregationState* createInitialState() const = 0;
/**
* @brief Accumulate over tuples for a nullary aggregate function (one that
@@ -146,70 +147,31 @@ class AggregationHandle {
* data is accessed, the only thing that a nullary aggeregate can know
* about input is its cardinality.
* @return A new AggregationState which contains the accumulated results from
- * applying the (nullary) aggregate to the specified number of
- * tuples.
+ * applying the (nullary) aggregate to the specified number of tuples.
**/
virtual AggregationState* accumulateNullary(
- const std::size_t num_tuples) const = 0;
-
- /**
- * @brief Accumulate (iterate over) all values in one or more ColumnVectors
- * and return a new AggregationState which can be merged with other
- * states or finalized.
- *
- * @param column_vectors One or more ColumnVectors that the aggregate will be
- * applied to. These correspond to the aggregate function's arguments,
- * in order.
- * @return A new AggregationState which contains the accumulated results from
- * applying the aggregate to column_vectors. Caller is responsible
- * for deleting the returned AggregationState.
- **/
- virtual AggregationState* accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const = 0;
+ const std::size_t num_tuples) const {
+ LOG(FATAL) << "Called accumulateNullary on an AggregationHandle that "
+ << "takes at least one argument.";
+ }
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
/**
* @brief Accumulate (iterate over) all values in columns accessible through
- * a ValueAccessor and return a new AggregationState which can be
- * merged with other states or finalized.
+ * the ValueAccessors from a ValueAccessorMultiplexer and return a new
+ * AggregationState which can be merged with other states or finalized.
*
- * @param accessor A ValueAccessor that the columns to be aggregated can be
- * accessed through.
- * @param accessor_ids The attribute_ids that correspond to the columns in
- * accessor to aggeregate. These correspond to the aggregate
- * function's arguments, in order.
+ * @param argument_ids The multi-source attribute ids that correspond to the
+ * columns in \p accessor_mux to aggregate. These correspond to the
+ * aggregate function's arguments, in order.
+ * @param accessor_mux A ValueAccessorMultiplexer object that contains the
+ * ValueAccessors.
* @return A new AggregationState which contains the accumulated results from
- * applying the aggregate to the specified columns in accessor.
+ * applying the aggregate to the specified columns.
* Caller is responsible for deleting the returned AggregationState.
**/
virtual AggregationState* accumulateValueAccessor(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &accessor_ids) const = 0;
-#endif
-
- /**
- * @brief Perform an aggregation with GROUP BY over all the tuples accessible
- * through a ValueAccessor, upserting states in a HashTable.
- *
- * @note Implementations of this method are threadsafe with respect to
- * hash_table, and can be called concurrently from multiple threads
- * with the same HashTable object.
- *
- * @param accessor The ValueAccessor that will be iterated over to read
- * tuples.
- * @param argument_ids The attribute_ids of the arguments to this aggregate
- * in accessor, in order.
- * @param group_by_key_ids The attribute_ids of the group-by
- * columns/expressions in accessor.
- * @param hash_table The HashTable to upsert AggregationStates in. This
- * should have been created by calling createGroupByHashTable() on
- * this same AggregationHandle.
- **/
- virtual void aggregateValueAccessorIntoHashTable(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &argument_ids,
- const std::vector<attribute_id> &group_by_key_ids,
- AggregationStateHashTableBase *hash_table) const = 0;
+ const std::vector<MultiSourceAttributeId> &argument_ids,
+ const ValueAccessorMultiplexer &accessor_mux) const = 0;
/**
* @brief Merge two AggregationStates, updating one in-place. This computes a
@@ -253,24 +215,24 @@ class AggregationHandle {
* @param hash_table The HashTable to finalize states from. This should have
* have been created by calling createGroupByHashTable() on this same
* AggregationHandle.
+ * @param index The index of the AggregationHandle to be finalized.
* @param group_by_keys A pointer to a vector of vectors of GROUP BY keys. If
* this is initially empty, it will be filled in with the GROUP BY
* keys visited by this method in the same order as the finalized
* values returned in the ColumnVector. If this is already filled in,
* then this method will visit the GROUP BY keys in the exact order
* specified.
- * @param index The index of the AggregationHandle to be finalized.
*
* @return A ColumnVector containing each group's finalized aggregate value.
**/
virtual ColumnVector* finalizeHashTable(
const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys,
- int index) const = 0;
+ const std::size_t index,
+ std::vector<std::vector<TypedValue>> *group_by_keys) const = 0;
/**
* @brief Create a new HashTable for the distinctify step for DISTINCT
- * aggregation.
+ * aggregation.
*
* Distinctify is the first step for DISTINCT aggregation. This step inserts
* the GROUP BY expression values and aggregation arguments together as keys
@@ -283,8 +245,8 @@ class AggregationHandle {
* we simply treat it as a special GROUP BY case that the GROUP BY expression
* vector is empty.
*
- * @param hash_table_impl The choice of which concrete HashTable
- * implementation to use.
+ * @param hash_table_impl The choice of which concrete HashTable implementation
+ * to use.
* @param key_types The types of the GROUP BY expressions together with the
* types of the aggregation arguments.
* @param estimated_num_distinct_keys The estimated number of distinct keys
@@ -307,13 +269,14 @@ class AggregationHandle {
/**
* @brief Inserts the GROUP BY expressions and aggregation arguments together
- * as keys into the distinctify hash table.
+ * as keys into the distinctify hash table.
*
- * @param accessor The ValueAccessor that will be iterated over to read
- * tuples.
- * @param key_ids The attribute_ids of the GROUP BY expressions in accessor
- * together with the attribute_ids of the arguments to this aggregate
- * in accessor, in order.
+ * @param argument_ids The argument ids that correspond to the columns in
+ * \p accessor_mux.
+ * @param key_ids The group-by key ids that correspond to the columns in
+ * \p accessor_mux.
+ * @param accessor_mux A ValueAccessorMultiplexer object that contains the
+ * ValueAccessors to be iterated over to read tuples.
* @param distinctify_hash_table The HashTable to store the GROUP BY
* expressions and the aggregation arguments together as hash table
* keys and a bool constant \c true as hash table value (So the hash
@@ -321,13 +284,14 @@ class AggregationHandle {
* by calling createDistinctifyHashTable();
*/
virtual void insertValueAccessorIntoDistinctifyHashTable(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &key_ids,
+ const std::vector<MultiSourceAttributeId> &argument_ids,
+ const std::vector<MultiSourceAttributeId> &key_ids,
+ const ValueAccessorMultiplexer &accessor_mux,
AggregationStateHashTableBase *distinctify_hash_table) const = 0;
/**
* @brief Perform single (i.e. without GROUP BY) aggregation on the keys from
- * the distinctify hash table to actually compute the aggregated results.
+ * the distinctify hash table to actually compute the aggregated results.
*
* @param distinctify_hash_table Hash table which stores the distinctified
* aggregation arguments as hash table keys. This should have been
@@ -346,25 +310,26 @@ class AggregationHandle {
* @param distinctify_hash_table Hash table which stores the GROUP BY
* expression values and aggregation arguments together as hash table
* keys.
+ * @param index The index of the AggregationHandle for which to perform the aggregation.
* @param aggregation_hash_table The HashTable to upsert AggregationStates in.
* This should have been created by calling createGroupByHashTable() on
* this same AggregationHandle.
- * @param index The index of the distinctify hash table for which we perform
- * the DISTINCT aggregation.
*/
virtual void aggregateOnDistinctifyHashTableForGroupBy(
const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table,
- std::size_t index) const = 0;
+ const std::size_t index,
+ AggregationStateHashTableBase *aggregation_hash_table) const = 0;
/**
* @brief Get the number of bytes needed to store the aggregation handle's
* state.
**/
- virtual std::size_t getPayloadSize() const { return 1; }
+ virtual std::size_t getPayloadSize() const {
+ return 1u;
+ }
/**
- * @brief Update the aggregation state for nullary aggregation function e.g.
+ * @brief Update the aggregation state for nullary aggregation function, e.g.
* COUNT(*).
*
* @note This function should be overloaded by those aggregation function
@@ -372,7 +337,10 @@ class AggregationHandle {
*
* @param byte_ptr The pointer where the aggregation state is stored.
**/
- virtual void updateStateNullary(std::uint8_t *byte_ptr) const {}
+ virtual void updateStateNullary(std::uint8_t *byte_ptr) const {
+ LOG(FATAL) << "Called updateStateNullary on an AggregationHandle that "
+ << "takes at least one argument.";
+ }
/**
* @brief Update the aggregation state for unary aggregation function e.g.
@@ -383,7 +351,7 @@ class AggregationHandle {
* @param byte_ptr The pointer where the aggregation state is stored.
**/
virtual void updateStateUnary(const TypedValue &argument,
- std::uint8_t *byte_ptr) const {}
+ std::uint8_t *byte_ptr) const = 0;
/**
* @brief Merge two aggregation states for this aggregation handle.
@@ -394,8 +362,8 @@ class AggregationHandle {
* @param src A pointer to the source aggregation state.
* @param dst A pointer to the destination aggregation state.
**/
- virtual void mergeStatesFast(const std::uint8_t *src,
- std::uint8_t *dst) const {}
+ virtual void mergeStates(const std::uint8_t *src,
+ std::uint8_t *dst) const = 0;
/**
* @brief Initialize the payload (in the aggregation hash table) for the given
@@ -403,7 +371,7 @@ class AggregationHandle {
*
* @param byte_ptr The pointer to the aggregation state in the hash table.
**/
- virtual void initPayload(std::uint8_t *byte_ptr) const {}
+ virtual void initPayload(std::uint8_t *byte_ptr) const = 0;
/**
* @brief Destroy the payload (in the aggregation hash table) for the given
@@ -411,22 +379,25 @@ class AggregationHandle {
*
* @param byte_ptr The pointer to the aggregation state in the hash table.
**/
- virtual void destroyPayload(std::uint8_t *byte_ptr) const {}
+ virtual void destroyPayload(std::uint8_t *byte_ptr) const = 0;
/**
* @brief Inform the aggregation handle to block (prohibit) updates on the
* aggregation state.
**/
- virtual void blockUpdate() {}
+ virtual void blockUpdate() = 0;
/**
- * @brief Inform the aggregation handle to allow updates on the
- * aggregation state.
+ * @brief Inform the aggregation handle to allow updates on the aggregation
+ * state.
**/
- virtual void allowUpdate() {}
+ virtual void allowUpdate() = 0;
protected:
- AggregationHandle() {}
+ explicit AggregationHandle(const AggregationID agg_id)
+ : agg_id_(agg_id) {}
+
+ const AggregationID agg_id_;
private:
DISALLOW_COPY_AND_ASSIGN(AggregationHandle);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c70485b1/expressions/aggregation/AggregationHandleAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.cpp b/expressions/aggregation/AggregationHandleAvg.cpp
index 2481092..46bec1e 100644
--- a/expressions/aggregation/AggregationHandleAvg.cpp
+++ b/expressions/aggregation/AggregationHandleAvg.cpp
@@ -20,12 +20,13 @@
#include "expressions/aggregation/AggregationHandleAvg.hpp"
#include <cstddef>
+#include <cstdint>
#include <memory>
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
-#include "storage/HashTable.hpp"
-#include "storage/HashTableFactory.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
#include "threading/SpinMutex.hpp"
#include "types/Type.hpp"
#include "types/TypeFactory.hpp"
@@ -39,10 +40,11 @@
namespace quickstep {
-class StorageManager;
+class ColumnVector;
AggregationHandleAvg::AggregationHandleAvg(const Type &type)
- : argument_type_(type), block_update_(false) {
+ : AggregationConcreteHandle(AggregationID::kAvg),
+ argument_type_(type) {
// We sum Int as Long and Float as Double so that we have more headroom when
// adding many values.
TypeID type_precision_id;
@@ -87,52 +89,29 @@ AggregationHandleAvg::AggregationHandleAvg(const Type &type)
->getNullableVersion());
}
-AggregationStateHashTableBase* AggregationHandleAvg::createGroupByHashTable(
- const HashTableImplType hash_table_impl,
- const std::vector<const Type *> &group_by_types,
- const std::size_t estimated_num_groups,
- StorageManager *storage_manager) const {
- return AggregationStateHashTableFactory<AggregationStateAvg>::CreateResizable(
- hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
-}
+AggregationState* AggregationHandleAvg::accumulateValueAccessor(
+ const std::vector<MultiSourceAttributeId> &argument_ids,
+ const ValueAccessorMultiplexer &accessor_mux) const {
+ DCHECK_EQ(1u, argument_ids.size())
+ << "Got wrong number of attributes for AVG: " << argument_ids.size();
-AggregationState* AggregationHandleAvg::accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
- DCHECK_EQ(1u, column_vectors.size())
- << "Got wrong number of ColumnVectors for AVG: " << column_vectors.size();
+ const ValueAccessorSource argument_source = argument_ids.front().source;
+ const attribute_id argument_id = argument_ids.front().attr_id;
- AggregationStateAvg *state = new AggregationStateAvg(blank_state_);
- std::size_t count = 0;
- state->sum_ = fast_add_operator_->accumulateColumnVector(
- state->sum_, *column_vectors.front(), &count);
- state->count_ = count;
- return state;
-}
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-AggregationState* AggregationHandleAvg::accumulateValueAccessor(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &accessor_ids) const {
- DCHECK_EQ(1u, accessor_ids.size())
- << "Got wrong number of attributes for AVG: " << accessor_ids.size();
+ DCHECK(argument_source != ValueAccessorSource::kInvalid);
+ DCHECK_NE(argument_id, kInvalidAttributeID);
AggregationStateAvg *state = new AggregationStateAvg(blank_state_);
std::size_t count = 0;
- state->sum_ = fast_add_operator_->accumulateValueAccessor(
- state->sum_, accessor, accessor_ids.front(), &count);
+ state->sum_ =
+ fast_add_operator_->accumulateValueAccessor(
+ state->sum_,
+ accessor_mux.getValueAccessorBySource(argument_source),
+ argument_id,
+ &count);
state->count_ = count;
return state;
}
-#endif
-
-void AggregationHandleAvg::aggregateValueAccessorIntoHashTable(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &argument_ids,
- const std::vector<attribute_id> &group_by_key_ids,
- AggregationStateHashTableBase *hash_table) const {
- DCHECK_EQ(1u, argument_ids.size())
- << "Got wrong number of arguments for AVG: " << argument_ids.size();
-}
void AggregationHandleAvg::mergeStates(const AggregationState &source,
AggregationState *destination) const {
@@ -147,8 +126,8 @@ void AggregationHandleAvg::mergeStates(const AggregationState &source,
avg_destination->sum_, avg_source.sum_);
}
-void AggregationHandleAvg::mergeStatesFast(const std::uint8_t *source,
- std::uint8_t *destination) const {
+void AggregationHandleAvg::mergeStates(const std::uint8_t *source,
+ std::uint8_t *destination) const {
const TypedValue *src_sum_ptr =
reinterpret_cast<const TypedValue *>(source + blank_state_.sum_offset_);
const std::int64_t *src_count_ptr = reinterpret_cast<const std::int64_t *>(
@@ -177,29 +156,25 @@ TypedValue AggregationHandleAvg::finalize(const AggregationState &state) const {
ColumnVector* AggregationHandleAvg::finalizeHashTable(
const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys,
- int index) const {
- return finalizeHashTableHelperFast<AggregationHandleAvg,
- AggregationStateFastHashTable>(
- *result_type_, hash_table, group_by_keys, index);
+ const std::size_t index,
+ std::vector<std::vector<TypedValue>> *group_by_keys) const {
+ return finalizeHashTableHelper<AggregationHandleAvg>(
+ *result_type_, hash_table, index, group_by_keys);
}
-AggregationState*
-AggregationHandleAvg::aggregateOnDistinctifyHashTableForSingle(
+AggregationState* AggregationHandleAvg::aggregateOnDistinctifyHashTableForSingle(
const AggregationStateHashTableBase &distinctify_hash_table) const {
- return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
- AggregationHandleAvg,
- AggregationStateAvg>(distinctify_hash_table);
+ return aggregateOnDistinctifyHashTableForSingleUnaryHelper<
+ AggregationHandleAvg, AggregationStateAvg>(
+ distinctify_hash_table);
}
void AggregationHandleAvg::aggregateOnDistinctifyHashTableForGroupBy(
const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table,
- std::size_t index) const {
- aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
- AggregationHandleAvg,
- AggregationStateFastHashTable>(
- distinctify_hash_table, aggregation_hash_table, index);
+ const std::size_t index,
+ AggregationStateHashTableBase *aggregation_hash_table) const {
+ aggregateOnDistinctifyHashTableForGroupByUnaryHelper<AggregationHandleAvg>(
+ distinctify_hash_table, index, aggregation_hash_table);
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c70485b1/expressions/aggregation/AggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.hpp b/expressions/aggregation/AggregationHandleAvg.hpp
index 47132c6..970561c 100644
--- a/expressions/aggregation/AggregationHandleAvg.hpp
+++ b/expressions/aggregation/AggregationHandleAvg.hpp
@@ -25,11 +25,9 @@
#include <memory>
#include <vector>
-#include "catalog/CatalogTypedefs.hpp"
#include "expressions/aggregation/AggregationConcreteHandle.hpp"
#include "expressions/aggregation/AggregationHandle.hpp"
-#include "storage/FastHashTable.hpp"
-#include "storage/HashTableBase.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
#include "threading/SpinMutex.hpp"
#include "types/Type.hpp"
#include "types/TypedValue.hpp"
@@ -40,9 +38,8 @@
namespace quickstep {
+class AggregationStateHashTableBase;
class ColumnVector;
-class StorageManager;
-class ValueAccessor;
/** \addtogroup Expressions
* @{
@@ -106,19 +103,18 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
public:
~AggregationHandleAvg() override {}
+ std::vector<const Type *> getArgumentTypes() const override {
+ return {&argument_type_};
+ }
+
+ const Type* getResultType() const override {
+ return result_type_;
+ }
+
AggregationState* createInitialState() const override {
return new AggregationStateAvg(blank_state_);
}
- AggregationStateHashTableBase* createGroupByHashTable(
- const HashTableImplType hash_table_impl,
- const std::vector<const Type *> &group_by_types,
- const std::size_t estimated_num_groups,
- StorageManager *storage_manager) const override;
-
- /**
- * @brief Iterate method with average aggregation state.
- **/
inline void iterateUnaryInl(AggregationStateAvg *state,
const TypedValue &value) const {
DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
@@ -129,8 +125,8 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
++state->count_;
}
- inline void iterateUnaryInlFast(const TypedValue &value,
- std::uint8_t *byte_ptr) const {
+ inline void iterateUnaryInl(const TypedValue &value,
+ std::uint8_t *byte_ptr) const {
DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
if (value.isNull()) return;
TypedValue *sum_ptr =
@@ -141,16 +137,18 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
++(*count_ptr);
}
- inline void updateStateUnary(const TypedValue &argument,
- std::uint8_t *byte_ptr) const override {
- if (!block_update_) {
- iterateUnaryInlFast(argument, byte_ptr);
- }
- }
+ AggregationState* accumulateValueAccessor(
+ const std::vector<MultiSourceAttributeId> &argument_ids,
+ const ValueAccessorMultiplexer &accessor_mux) const override;
- void blockUpdate() override { block_update_ = true; }
+ void mergeStates(const AggregationState &source,
+ AggregationState *destination) const override;
- void allowUpdate() override { block_update_ = false; }
+ TypedValue finalize(const AggregationState &state) const override;
+
+ std::size_t getPayloadSize() const override {
+ return blank_state_.getPayloadSize();
+ }
void initPayload(std::uint8_t *byte_ptr) const override {
TypedValue *sum_ptr =
@@ -169,43 +167,17 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
}
}
- AggregationState* accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
- const override;
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- AggregationState* accumulateValueAccessor(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &accessor_id) const override;
-#endif
-
- void aggregateValueAccessorIntoHashTable(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &argument_ids,
- const std::vector<attribute_id> &group_by_key_ids,
- AggregationStateHashTableBase *hash_table) const override;
-
- void mergeStates(const AggregationState &source,
- AggregationState *destination) const override;
-
- void mergeStatesFast(const std::uint8_t *source,
- std::uint8_t *destination) const override;
+ inline void updateStateUnary(const TypedValue &argument,
+ std::uint8_t *byte_ptr) const override {
+ if (!block_update_) {
+ iterateUnaryInl(argument, byte_ptr);
+ }
+ }
- TypedValue finalize(const AggregationState &state) const override;
+ void mergeStates(const std::uint8_t *source,
+ std::uint8_t *destination) const override;
inline TypedValue finalizeHashTableEntry(
- const AggregationState &state) const {
- const AggregationStateAvg &agg_state =
- static_cast<const AggregationStateAvg &>(state);
- // TODO(chasseur): Could improve performance further if we made a special
- // version of finalizeHashTable() that collects all the sums into one
- // ColumnVector and all the counts into another and then applies
- // '*divide_operator_' to them in bulk.
- return divide_operator_->applyToTypedValues(
- agg_state.sum_, TypedValue(static_cast<double>(agg_state.count_)));
- }
-
- inline TypedValue finalizeHashTableEntryFast(
const std::uint8_t *byte_ptr) const {
std::uint8_t *value_ptr = const_cast<std::uint8_t *>(byte_ptr);
TypedValue *sum_ptr =
@@ -218,31 +190,16 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
ColumnVector* finalizeHashTable(
const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys,
- int index) const override;
+ const std::size_t index,
+ std::vector<std::vector<TypedValue>> *group_by_keys) const override;
- /**
- * @brief Implementation of
- * AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
- * for AVG aggregation.
- */
AggregationState* aggregateOnDistinctifyHashTableForSingle(
- const AggregationStateHashTableBase &distinctify_hash_table)
- const override;
+ const AggregationStateHashTableBase &distinctify_hash_table) const override;
- /**
- * @brief Implementation of
- * AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
- * for AVG aggregation.
- */
void aggregateOnDistinctifyHashTableForGroupBy(
const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table,
- std::size_t index) const override;
-
- std::size_t getPayloadSize() const override {
- return blank_state_.getPayloadSize();
- }
+ const std::size_t index,
+ AggregationStateHashTableBase *aggregation_hash_table) const override;
private:
friend class AggregateFunctionAvg;
@@ -261,8 +218,6 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
std::unique_ptr<UncheckedBinaryOperator> merge_add_operator_;
std::unique_ptr<UncheckedBinaryOperator> divide_operator_;
- bool block_update_;
-
DISALLOW_COPY_AND_ASSIGN(AggregationHandleAvg);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c70485b1/expressions/aggregation/AggregationHandleCount.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.cpp b/expressions/aggregation/AggregationHandleCount.cpp
index 034c942..cf92ec7 100644
--- a/expressions/aggregation/AggregationHandleCount.cpp
+++ b/expressions/aggregation/AggregationHandleCount.cpp
@@ -21,100 +21,48 @@
#include <atomic>
#include <cstddef>
-#include <memory>
+#include <cstdint>
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
-#include "storage/HashTable.hpp"
-#include "storage/HashTableFactory.hpp"
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
#include "storage/ValueAccessorUtil.hpp"
-#endif
-
#include "types/TypeFactory.hpp"
#include "types/TypeID.hpp"
#include "types/TypedValue.hpp"
-#include "types/containers/ColumnVector.hpp"
-#include "types/containers/ColumnVectorUtil.hpp"
#include "glog/logging.h"
namespace quickstep {
-class StorageManager;
-class Type;
-class ValueAccessor;
+class ColumnVector;
template <bool count_star, bool nullable_type>
-AggregationStateHashTableBase*
-AggregationHandleCount<count_star, nullable_type>::createGroupByHashTable(
- const HashTableImplType hash_table_impl,
- const std::vector<const Type *> &group_by_types,
- const std::size_t estimated_num_groups,
- StorageManager *storage_manager) const {
- return AggregationStateHashTableFactory<
- AggregationStateCount>::CreateResizable(hash_table_impl,
- group_by_types,
- estimated_num_groups,
- storage_manager);
-}
-
-template <bool count_star, bool nullable_type>
-AggregationState*
-AggregationHandleCount<count_star, nullable_type>::accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
+AggregationState* AggregationHandleCount<count_star, nullable_type>
+ ::accumulateValueAccessor(
+ const std::vector<MultiSourceAttributeId> &argument_ids,
+ const ValueAccessorMultiplexer &accessor_mux) const {
DCHECK(!count_star)
<< "Called non-nullary accumulation method on an AggregationHandleCount "
<< "set up for nullary COUNT(*)";
- DCHECK_EQ(1u, column_vectors.size())
- << "Got wrong number of ColumnVectors for COUNT: "
- << column_vectors.size();
+ DCHECK_EQ(1u, argument_ids.size())
+ << "Got wrong number of attributes for COUNT: " << argument_ids.size();
- std::size_t count = 0;
- InvokeOnColumnVector(
- *column_vectors.front(),
- [&](const auto &column_vector) -> void { // NOLINT(build/c++11)
- if (nullable_type) {
- // TODO(shoban): Iterating over the ColumnVector is a rather slow way
- // to do this. We should look at extending the ColumnVector interface
- // to do a quick count of the non-null values (i.e. the length minus
- // the population count of the null bitmap). We should do something
- // similar for ValueAccessor too.
- for (std::size_t pos = 0; pos < column_vector.size(); ++pos) {
- count += !column_vector.getTypedValue(pos).isNull();
- }
- } else {
- count = column_vector.size();
- }
- });
+ const ValueAccessorSource argument_source = argument_ids.front().source;
+ const attribute_id argument_id = argument_ids.front().attr_id;
- return new AggregationStateCount(count);
-}
+ DCHECK(argument_source != ValueAccessorSource::kInvalid);
+ DCHECK_NE(argument_id, kInvalidAttributeID);
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-template <bool count_star, bool nullable_type>
-AggregationState*
-AggregationHandleCount<count_star, nullable_type>::accumulateValueAccessor(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &accessor_ids) const {
- DCHECK(!count_star)
- << "Called non-nullary accumulation method on an AggregationHandleCount "
- << "set up for nullary COUNT(*)";
-
- DCHECK_EQ(1u, accessor_ids.size())
- << "Got wrong number of attributes for COUNT: " << accessor_ids.size();
-
- const attribute_id accessor_id = accessor_ids.front();
std::size_t count = 0;
InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
- accessor,
- [&accessor_id, &count](auto *accessor) -> void { // NOLINT(build/c++11)
+ accessor_mux.getValueAccessorBySource(argument_source),
+ [&argument_id, &count](auto *accessor) -> void { // NOLINT(build/c++11)
if (nullable_type) {
while (accessor->next()) {
- count += !accessor->getTypedValue(accessor_id).isNull();
+ count += !accessor->getTypedValue(argument_id).isNull();
}
} else {
count = accessor->getNumTuples();
@@ -123,24 +71,6 @@ AggregationHandleCount<count_star, nullable_type>::accumulateValueAccessor(
return new AggregationStateCount(count);
}
-#endif
-
-template <bool count_star, bool nullable_type>
-void AggregationHandleCount<count_star, nullable_type>::
- aggregateValueAccessorIntoHashTable(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &argument_ids,
- const std::vector<attribute_id> &group_by_key_ids,
- AggregationStateHashTableBase *hash_table) const {
- if (count_star) {
- DCHECK_EQ(0u, argument_ids.size())
- << "Got wrong number of arguments for COUNT(*): "
- << argument_ids.size();
- } else {
- DCHECK_EQ(1u, argument_ids.size())
- << "Got wrong number of arguments for COUNT: " << argument_ids.size();
- }
-}
template <bool count_star, bool nullable_type>
void AggregationHandleCount<count_star, nullable_type>::mergeStates(
@@ -156,7 +86,7 @@ void AggregationHandleCount<count_star, nullable_type>::mergeStates(
}
template <bool count_star, bool nullable_type>
-void AggregationHandleCount<count_star, nullable_type>::mergeStatesFast(
+void AggregationHandleCount<count_star, nullable_type>::mergeStates(
const std::uint8_t *source, std::uint8_t *destination) const {
const std::int64_t *src_count_ptr =
reinterpret_cast<const std::int64_t *>(source);
@@ -165,38 +95,35 @@ void AggregationHandleCount<count_star, nullable_type>::mergeStatesFast(
}
template <bool count_star, bool nullable_type>
-ColumnVector*
-AggregationHandleCount<count_star, nullable_type>::finalizeHashTable(
- const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys,
- int index) const {
- return finalizeHashTableHelperFast<
- AggregationHandleCount<count_star, nullable_type>,
- AggregationStateFastHashTable>(
- TypeFactory::GetType(kLong), hash_table, group_by_keys, index);
+ColumnVector* AggregationHandleCount<count_star, nullable_type>
+ ::finalizeHashTable(
+ const AggregationStateHashTableBase &hash_table,
+ const std::size_t index,
+ std::vector<std::vector<TypedValue>> *group_by_keys) const {
+ return finalizeHashTableHelper<
+ AggregationHandleCount<count_star, nullable_type>>(
+ TypeFactory::GetType(kLong), hash_table, index, group_by_keys);
}
template <bool count_star, bool nullable_type>
-AggregationState* AggregationHandleCount<count_star, nullable_type>::
- aggregateOnDistinctifyHashTableForSingle(
+AggregationState* AggregationHandleCount<count_star, nullable_type>
+ ::aggregateOnDistinctifyHashTableForSingle(
const AggregationStateHashTableBase &distinctify_hash_table) const {
- DCHECK_EQ(count_star, false);
- return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
+ return aggregateOnDistinctifyHashTableForSingleUnaryHelper<
AggregationHandleCount<count_star, nullable_type>,
- AggregationStateCount>(distinctify_hash_table);
+ AggregationStateCount>(
+ distinctify_hash_table);
}
template <bool count_star, bool nullable_type>
-void AggregationHandleCount<count_star, nullable_type>::
- aggregateOnDistinctifyHashTableForGroupBy(
+void AggregationHandleCount<count_star, nullable_type>
+ ::aggregateOnDistinctifyHashTableForGroupBy(
const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table,
- std::size_t index) const {
- DCHECK_EQ(count_star, false);
- aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
- AggregationHandleCount<count_star, nullable_type>,
- AggregationStateFastHashTable>(
- distinctify_hash_table, aggregation_hash_table, index);
+ const std::size_t index,
+ AggregationStateHashTableBase *aggregation_hash_table) const {
+ aggregateOnDistinctifyHashTableForGroupByUnaryHelper<
+ AggregationHandleCount<count_star, nullable_type>>(
+ distinctify_hash_table, index, aggregation_hash_table);
}
// Explicitly instantiate and compile in the different versions of
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c70485b1/expressions/aggregation/AggregationHandleCount.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.hpp b/expressions/aggregation/AggregationHandleCount.hpp
index 6aab0cd..72ea923 100644
--- a/expressions/aggregation/AggregationHandleCount.hpp
+++ b/expressions/aggregation/AggregationHandleCount.hpp
@@ -23,23 +23,21 @@
#include <atomic>
#include <cstddef>
#include <cstdint>
-#include <memory>
#include <vector>
-#include "catalog/CatalogTypedefs.hpp"
#include "expressions/aggregation/AggregationConcreteHandle.hpp"
#include "expressions/aggregation/AggregationHandle.hpp"
-#include "storage/FastHashTable.hpp"
-#include "storage/HashTableBase.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
+#include "types/LongType.hpp"
#include "types/TypedValue.hpp"
#include "utility/Macros.hpp"
namespace quickstep {
+class AggregationStateHashTableBase;
class ColumnVector;
-class StorageManager;
class Type;
-class ValueAccessor;
template <bool, bool>
class AggregationHandleCount;
@@ -98,28 +96,31 @@ class AggregationHandleCount : public AggregationConcreteHandle {
public:
~AggregationHandleCount() override {}
+ std::vector<const Type *> getArgumentTypes() const override {
+ if (argument_type_ == nullptr) {
+ return {};
+ } else {
+ return {argument_type_};
+ }
+ }
+
+ const Type* getResultType() const override {
+ return &LongType::InstanceNonNullable();
+ }
+
AggregationState* createInitialState() const override {
return new AggregationStateCount();
}
- AggregationStateHashTableBase* createGroupByHashTable(
- const HashTableImplType hash_table_impl,
- const std::vector<const Type *> &group_by_types,
- const std::size_t estimated_num_groups,
- StorageManager *storage_manager) const override;
-
inline void iterateNullaryInl(AggregationStateCount *state) const {
state->count_.fetch_add(1, std::memory_order_relaxed);
}
- inline void iterateNullaryInlFast(std::uint8_t *byte_ptr) const {
+ inline void iterateNullaryInl(std::uint8_t *byte_ptr) const {
std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr);
- (*count_ptr)++;
+ ++(*count_ptr);
}
- /**
- * @brief Iterate with count aggregation state.
- */
inline void iterateUnaryInl(AggregationStateCount *state,
const TypedValue &value) const {
if ((!nullable_type) || (!value.isNull())) {
@@ -127,118 +128,89 @@ class AggregationHandleCount : public AggregationConcreteHandle {
}
}
- inline void iterateUnaryInlFast(const TypedValue &value,
- std::uint8_t *byte_ptr) const {
+ inline void iterateUnaryInl(const TypedValue &value,
+ std::uint8_t *byte_ptr) const {
if ((!nullable_type) || (!value.isNull())) {
- std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr);
- (*count_ptr)++;
- }
- }
-
- inline void updateStateUnary(const TypedValue &argument,
- std::uint8_t *byte_ptr) const override {
- if (!block_update_) {
- iterateUnaryInlFast(argument, byte_ptr);
+ ++(*reinterpret_cast<std::int64_t *>(byte_ptr));
}
}
- inline void updateStateNullary(std::uint8_t *byte_ptr) const override {
- if (!block_update_) {
- iterateNullaryInlFast(byte_ptr);
- }
- }
-
- void blockUpdate() override { block_update_ = true; }
-
- void allowUpdate() override { block_update_ = false; }
-
- void initPayload(std::uint8_t *byte_ptr) const override {
- std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr);
- *count_ptr = 0;
- }
-
AggregationState* accumulateNullary(
const std::size_t num_tuples) const override {
return new AggregationStateCount(num_tuples);
}
- AggregationState* accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
- const override;
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
AggregationState* accumulateValueAccessor(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &accessor_id) const override;
-#endif
-
- void aggregateValueAccessorIntoHashTable(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &argument_ids,
- const std::vector<attribute_id> &group_by_key_ids,
- AggregationStateHashTableBase *hash_table) const override;
+ const std::vector<MultiSourceAttributeId> &argument_ids,
+ const ValueAccessorMultiplexer &accessor_mux) const override;
void mergeStates(const AggregationState &source,
AggregationState *destination) const override;
- void mergeStatesFast(const std::uint8_t *source,
- std::uint8_t *destination) const override;
-
TypedValue finalize(const AggregationState &state) const override {
return TypedValue(
static_cast<const AggregationStateCount &>(state).count_.load(
std::memory_order_relaxed));
}
- inline TypedValue finalizeHashTableEntry(
- const AggregationState &state) const {
- return TypedValue(
- static_cast<const AggregationStateCount &>(state).count_.load(
- std::memory_order_relaxed));
+ std::size_t getPayloadSize() const override {
+ return sizeof(std::int64_t);
+ }
+
+ void initPayload(std::uint8_t *byte_ptr) const override {
+ std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr);
+ *count_ptr = 0;
}
- inline TypedValue finalizeHashTableEntryFast(
- const std::uint8_t *byte_ptr) const {
- const std::int64_t *count_ptr =
- reinterpret_cast<const std::int64_t *>(byte_ptr);
- return TypedValue(*count_ptr);
+ void destroyPayload(std::uint8_t *byte_ptr) const override {}
+
+ inline void updateStateNullary(std::uint8_t *byte_ptr) const override {
+ if (!block_update_) {
+ iterateNullaryInl(byte_ptr);
+ }
+ }
+
+ inline void updateStateUnary(const TypedValue &argument,
+ std::uint8_t *byte_ptr) const override {
+ if (!block_update_) {
+ iterateUnaryInl(argument, byte_ptr);
+ }
+ }
+
+ void mergeStates(const std::uint8_t *source,
+ std::uint8_t *destination) const override;
+
+ inline TypedValue finalizeHashTableEntry(const std::uint8_t *byte_ptr) const {
+ return TypedValue(*reinterpret_cast<const std::int64_t *>(byte_ptr));
}
ColumnVector* finalizeHashTable(
const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys,
- int index) const override;
+ const std::size_t index,
+ std::vector<std::vector<TypedValue>> *group_by_keys) const override;
- /**
- * @brief Implementation of
- * AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
- * for SUM aggregation.
- */
AggregationState* aggregateOnDistinctifyHashTableForSingle(
- const AggregationStateHashTableBase &distinctify_hash_table)
- const override;
+ const AggregationStateHashTableBase &distinctify_hash_table) const override;
- /**
- * @brief Implementation of
- * AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
- * for SUM aggregation.
- */
void aggregateOnDistinctifyHashTableForGroupBy(
const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table,
- std::size_t index) const override;
-
- std::size_t getPayloadSize() const override { return sizeof(std::int64_t); }
+ const std::size_t index,
+ AggregationStateHashTableBase *aggregation_hash_table) const override;
private:
friend class AggregateFunctionCount;
/**
- * @brief Constructor.
+ * @brief Initialize handle for count.
+ *
+ * @param argument_type Type of the value to be counted. The parameter should
+ * be nullptr for nullary aggregation (i.e. COUNT(*)).
**/
- AggregationHandleCount() : block_update_(false) {}
+ explicit AggregationHandleCount(const Type *argument_type)
+ : AggregationConcreteHandle(AggregationID::kCount),
+ argument_type_(argument_type) {}
- bool block_update_;
+ const Type *argument_type_;
DISALLOW_COPY_AND_ASSIGN(AggregationHandleCount);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c70485b1/expressions/aggregation/AggregationHandleDistinct.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleDistinct.cpp b/expressions/aggregation/AggregationHandleDistinct.cpp
deleted file mode 100644
index 0dc8b56..0000000
--- a/expressions/aggregation/AggregationHandleDistinct.cpp
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include "expressions/aggregation/AggregationHandleDistinct.hpp"
-
-#include <cstddef>
-#include <memory>
-#include <vector>
-#include <utility>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "storage/HashTable.hpp"
-
-#include "types/TypedValue.hpp"
-
-#include "glog/logging.h"
-
-namespace quickstep {
-
-class ColumnVector;
-class StorageManager;
-class Type;
-class ValueAccessor;
-
-AggregationStateHashTableBase* AggregationHandleDistinct::createGroupByHashTable(
- const HashTableImplType hash_table_impl,
- const std::vector<const Type*> &group_by_types,
- const std::size_t estimated_num_groups,
- StorageManager *storage_manager) const {
- return createDistinctifyHashTable(
- hash_table_impl,
- group_by_types,
- estimated_num_groups,
- storage_manager);
-}
-
-void AggregationHandleDistinct::aggregateValueAccessorIntoHashTable(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &argument_ids,
- const std::vector<attribute_id> &group_by_key_ids,
- AggregationStateHashTableBase *hash_table) const {
- DCHECK_EQ(argument_ids.size(), 0u);
-
- insertValueAccessorIntoDistinctifyHashTable(
- accessor,
- group_by_key_ids,
- hash_table);
-}
-
-ColumnVector* AggregationHandleDistinct::finalizeHashTable(
- const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys,
- int index) const {
- DCHECK(group_by_keys->empty());
-
- const auto keys_retriever = [&group_by_keys](std::vector<TypedValue> &group_by_key,
- const bool &dumb_placeholder) -> void {
- group_by_keys->emplace_back(std::move(group_by_key));
- };
- static_cast<const AggregationStateFastHashTable&>(hash_table).forEachCompositeKeyFast(&keys_retriever);
-
- return nullptr;
-}
-
-} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c70485b1/expressions/aggregation/AggregationHandleDistinct.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleDistinct.hpp b/expressions/aggregation/AggregationHandleDistinct.hpp
deleted file mode 100644
index 838bfdd..0000000
--- a/expressions/aggregation/AggregationHandleDistinct.hpp
+++ /dev/null
@@ -1,130 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_HANDLE_DISTINCT_HPP_
-#define QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_HANDLE_DISTINCT_HPP_
-
-#include <cstddef>
-#include <memory>
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregationConcreteHandle.hpp"
-#include "storage/HashTableBase.hpp"
-#include "types/TypedValue.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-namespace quickstep {
-
-class AggregationState;
-class ColumnVector;
-class StorageManager;
-class Type;
-class ValueAccessor;
-
-/** \addtogroup Expressions
- * @{
- */
-
-class AggregationHandleDistinct : public AggregationConcreteHandle {
- public:
- /**
- * @brief Constructor.
- **/
- AggregationHandleDistinct() {}
-
- AggregationState* createInitialState() const override {
- LOG(FATAL)
- << "AggregationHandleDistinct does not support createInitialState().";
- }
-
- AggregationState* accumulateNullary(
- const std::size_t num_tuples) const override {
- LOG(FATAL)
- << "AggregationHandleDistinct does not support accumulateNullary().";
- }
-
- AggregationState* accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
- const override {
- LOG(FATAL) << "AggregationHandleDistinct does not support "
- "accumulateColumnVectors().";
- }
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- AggregationState* accumulateValueAccessor(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &accessor_ids) const override {
- LOG(FATAL) << "AggregationHandleDistinct does not support "
- "accumulateValueAccessor().";
- }
-#endif
-
- void mergeStates(const AggregationState &source,
- AggregationState *destination) const override {
- LOG(FATAL) << "AggregationHandleDistinct does not support mergeStates().";
- }
-
- TypedValue finalize(const AggregationState &state) const override {
- LOG(FATAL) << "AggregationHandleDistinct does not support finalize().";
- }
-
- AggregationState* aggregateOnDistinctifyHashTableForSingle(
- const AggregationStateHashTableBase &distinctify_hash_table)
- const override {
- LOG(FATAL) << "AggregationHandleDistinct does not support "
- << "aggregateOnDistinctifyHashTableForSingle().";
- }
-
- void aggregateOnDistinctifyHashTableForGroupBy(
- const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *groupby_hash_table,
- std::size_t index) const override {
- LOG(FATAL) << "AggregationHandleDistinct does not support "
- << "aggregateOnDistinctifyHashTableForGroupBy().";
- }
-
- AggregationStateHashTableBase* createGroupByHashTable(
- const HashTableImplType hash_table_impl,
- const std::vector<const Type *> &group_by_types,
- const std::size_t estimated_num_groups,
- StorageManager *storage_manager) const override;
-
- void aggregateValueAccessorIntoHashTable(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &argument_ids,
- const std::vector<attribute_id> &group_by_key_ids,
- AggregationStateHashTableBase *hash_table) const override;
-
- ColumnVector* finalizeHashTable(
- const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys,
- int index) const override;
-
- private:
- DISALLOW_COPY_AND_ASSIGN(AggregationHandleDistinct);
-};
-
-/** @} */
-
-} // namespace quickstep
-
-#endif // QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_HANDLE_DISTINCT_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c70485b1/expressions/aggregation/AggregationHandleMax.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.cpp b/expressions/aggregation/AggregationHandleMax.cpp
index c2d571b..fe1773f 100644
--- a/expressions/aggregation/AggregationHandleMax.cpp
+++ b/expressions/aggregation/AggregationHandleMax.cpp
@@ -19,15 +19,16 @@
#include "expressions/aggregation/AggregationHandleMax.hpp"
+#include <cstddef>
+#include <cstdint>
#include <memory>
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
-#include "storage/HashTable.hpp"
-#include "storage/HashTableFactory.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
#include "types/Type.hpp"
#include "types/TypedValue.hpp"
-#include "types/containers/ColumnVector.hpp"
#include "types/operations/comparisons/Comparison.hpp"
#include "types/operations/comparisons/ComparisonFactory.hpp"
#include "types/operations/comparisons/ComparisonID.hpp"
@@ -36,54 +37,32 @@
namespace quickstep {
-class StorageManager;
+class ColumnVector;
AggregationHandleMax::AggregationHandleMax(const Type &type)
- : type_(type), block_update_(false) {
+ : AggregationConcreteHandle(AggregationID::kMax),
+ type_(type) {
fast_comparator_.reset(
ComparisonFactory::GetComparison(ComparisonID::kGreater)
.makeUncheckedComparatorForTypes(type, type.getNonNullableVersion()));
}
-AggregationStateHashTableBase* AggregationHandleMax::createGroupByHashTable(
- const HashTableImplType hash_table_impl,
- const std::vector<const Type *> &group_by_types,
- const std::size_t estimated_num_groups,
- StorageManager *storage_manager) const {
- return AggregationStateHashTableFactory<AggregationStateMax>::CreateResizable(
- hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
-}
+AggregationState* AggregationHandleMax::accumulateValueAccessor(
+ const std::vector<MultiSourceAttributeId> &argument_ids,
+ const ValueAccessorMultiplexer &accessor_mux) const {
+ DCHECK_EQ(1u, argument_ids.size())
+ << "Got wrong number of attributes for MAX: " << argument_ids.size();
-AggregationState* AggregationHandleMax::accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
- DCHECK_EQ(1u, column_vectors.size())
- << "Got wrong number of ColumnVectors for MAX: " << column_vectors.size();
+ const ValueAccessorSource argument_source = argument_ids.front().source;
+ const attribute_id argument_id = argument_ids.front().attr_id;
- return new AggregationStateMax(fast_comparator_->accumulateColumnVector(
- type_.getNullableVersion().makeNullValue(), *column_vectors.front()));
-}
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-AggregationState* AggregationHandleMax::accumulateValueAccessor(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &accessor_ids) const {
- DCHECK_EQ(1u, accessor_ids.size())
- << "Got wrong number of attributes for MAX: " << accessor_ids.size();
+ DCHECK(argument_source != ValueAccessorSource::kInvalid);
+ DCHECK_NE(argument_id, kInvalidAttributeID);
return new AggregationStateMax(fast_comparator_->accumulateValueAccessor(
type_.getNullableVersion().makeNullValue(),
- accessor,
- accessor_ids.front()));
-}
-#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-
-void AggregationHandleMax::aggregateValueAccessorIntoHashTable(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &argument_ids,
- const std::vector<attribute_id> &group_by_key_ids,
- AggregationStateHashTableBase *hash_table) const {
- DCHECK_EQ(1u, argument_ids.size())
- << "Got wrong number of arguments for MAX: " << argument_ids.size();
+ accessor_mux.getValueAccessorBySource(argument_source),
+ argument_id));
}
void AggregationHandleMax::mergeStates(const AggregationState &source,
@@ -98,40 +77,36 @@ void AggregationHandleMax::mergeStates(const AggregationState &source,
}
}
-void AggregationHandleMax::mergeStatesFast(const std::uint8_t *source,
- std::uint8_t *destination) const {
+void AggregationHandleMax::mergeStates(const std::uint8_t *source,
+ std::uint8_t *destination) const {
const TypedValue *src_max_ptr = reinterpret_cast<const TypedValue *>(source);
TypedValue *dst_max_ptr = reinterpret_cast<TypedValue *>(destination);
if (!(src_max_ptr->isNull())) {
- compareAndUpdateFast(dst_max_ptr, *src_max_ptr);
+ compareAndUpdate(dst_max_ptr, *src_max_ptr);
}
}
ColumnVector* AggregationHandleMax::finalizeHashTable(
const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys,
- int index) const {
- return finalizeHashTableHelperFast<AggregationHandleMax,
- AggregationStateFastHashTable>(
- type_.getNullableVersion(), hash_table, group_by_keys, index);
+ const std::size_t index,
+ std::vector<std::vector<TypedValue>> *group_by_keys) const {
+ return finalizeHashTableHelper<AggregationHandleMax>(
+ type_, hash_table, index, group_by_keys);
}
-AggregationState*
-AggregationHandleMax::aggregateOnDistinctifyHashTableForSingle(
+AggregationState* AggregationHandleMax::aggregateOnDistinctifyHashTableForSingle(
const AggregationStateHashTableBase &distinctify_hash_table) const {
- return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
- AggregationHandleMax,
- AggregationStateMax>(distinctify_hash_table);
+ return aggregateOnDistinctifyHashTableForSingleUnaryHelper<
+ AggregationHandleMax, AggregationStateMax>(
+ distinctify_hash_table);
}
void AggregationHandleMax::aggregateOnDistinctifyHashTableForGroupBy(
const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table,
- std::size_t index) const {
- aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
- AggregationHandleMax,
- AggregationStateFastHashTable>(
- distinctify_hash_table, aggregation_hash_table, index);
+ const std::size_t index,
+ AggregationStateHashTableBase *aggregation_hash_table) const {
+ aggregateOnDistinctifyHashTableForGroupByUnaryHelper<AggregationHandleMax>(
+ distinctify_hash_table, index, aggregation_hash_table);
}
} // namespace quickstep