You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/10/17 21:56:04 UTC
[6/6] incubator-quickstep git commit: Updates
Updates
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/9ccd5a31
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/9ccd5a31
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/9ccd5a31
Branch: refs/heads/untyped-agg
Commit: 9ccd5a31189823f0e63b0a34ec02978c8695b25f
Parents: 140069b
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Mon Oct 17 16:55:49 2016 -0500
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Mon Oct 17 16:55:49 2016 -0500
----------------------------------------------------------------------
.../aggregation/AggregationConcreteHandle.cpp | 68 -
.../aggregation/AggregationConcreteHandle.hpp | 325 ---
expressions/aggregation/AggregationHandle.hpp | 405 +--
.../aggregation/AggregationHandleAvg.cpp | 321 ++-
.../aggregation/AggregationHandleAvg.hpp | 200 +-
.../aggregation/AggregationHandleCount.cpp | 134 +-
.../aggregation/AggregationHandleCount.hpp | 183 +-
.../aggregation/AggregationHandleDistinct.cpp | 81 -
.../aggregation/AggregationHandleDistinct.hpp | 130 -
.../aggregation/AggregationHandleMax.cpp | 190 +-
.../aggregation/AggregationHandleMax.hpp | 174 +-
.../aggregation/AggregationHandleMin.cpp | 192 +-
.../aggregation/AggregationHandleMin.hpp | 176 +-
.../aggregation/AggregationHandleSum.cpp | 128 +-
.../aggregation/AggregationHandleSum.hpp | 187 +-
expressions/aggregation/CMakeLists.txt | 49 +-
storage/AggregationHashTable.hpp | 330 ---
storage/AggregationOperationState.cpp | 317 +--
storage/AggregationOperationState.hpp | 20 +-
storage/AggregationResultIterator.hpp | 104 +
storage/AggregationStateHashTable.hpp | 338 +++
storage/AggregationStateManager.hpp | 181 ++
storage/CMakeLists.txt | 98 +-
storage/FastHashTable.hpp | 2515 ------------------
storage/FastHashTableFactory.hpp | 257 --
storage/FastSeparateChainingHashTable.hpp | 1734 ------------
storage/HashTableBase.hpp | 43 +-
storage/HashTablePool.hpp | 109 +-
storage/HashTableUntypedKeyManager.hpp | 34 +-
storage/InsertDestination.cpp | 18 +
storage/InsertDestination.hpp | 3 +
storage/PackedRowStoreTupleStorageSubBlock.cpp | 30 +
storage/PackedRowStoreTupleStorageSubBlock.hpp | 3 +
storage/StorageBlock.cpp | 152 +-
storage/StorageBlock.hpp | 55 +-
storage/TupleStorageSubBlock.hpp | 7 +
types/CharType.hpp | 12 +
types/DateType.hpp | 13 +-
types/DatetimeIntervalType.hpp | 12 +-
types/DatetimeLit.hpp | 4 +
types/DatetimeType.hpp | 12 +-
types/NullType.hpp | 4 +
types/NumericSuperType.hpp | 11 +-
types/Type.hpp | 9 +-
types/TypeFunctors.cpp | 54 +-
types/TypeFunctors.hpp | 15 +-
types/VarCharType.hpp | 12 +
types/YearMonthIntervalType.hpp | 12 +-
.../ArithmeticBinaryOperators.hpp | 20 +
.../binary_operations/BinaryOperation.hpp | 7 +
50 files changed, 1592 insertions(+), 7896 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationConcreteHandle.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.cpp b/expressions/aggregation/AggregationConcreteHandle.cpp
deleted file mode 100644
index e3fb520..0000000
--- a/expressions/aggregation/AggregationConcreteHandle.cpp
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include "expressions/aggregation/AggregationConcreteHandle.hpp"
-
-#include <cstddef>
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "storage/FastHashTable.hpp"
-#include "storage/HashTable.hpp"
-#include "storage/HashTableFactory.hpp"
-
-namespace quickstep {
-
-class StorageManager;
-class Type;
-class ValueAccessor;
-
-AggregationStateHashTableBase* AggregationConcreteHandle::createDistinctifyHashTable(
- const HashTableImplType hash_table_impl,
- const std::vector<const Type*> &key_types,
- const std::size_t estimated_num_distinct_keys,
- StorageManager *storage_manager) const {
- // Create a hash table with key types as key_types and value type as bool.
- return AggregationStateHashTableFactory<bool>::CreateResizable(
- hash_table_impl,
- key_types,
- estimated_num_distinct_keys,
- storage_manager);
-}
-
-void AggregationConcreteHandle::insertValueAccessorIntoDistinctifyHashTable(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &key_ids,
- AggregationStateHashTableBase *distinctify_hash_table) const {
- // If the key-value pair is already there, we don't need to update the value,
- // which should always be "true". I.e. the value is just a placeholder.
-
- AggregationStateFastHashTable *hash_table =
- static_cast<AggregationStateFastHashTable *>(distinctify_hash_table);
- if (key_ids.size() == 1) {
- hash_table->upsertValueAccessorFast(
- key_ids, accessor, key_ids[0], true /* check_for_null_keys */);
- } else {
- std::vector<attribute_id> empty_args {kInvalidAttributeID};
- hash_table->upsertValueAccessorCompositeKeyFast(
- empty_args, accessor, key_ids, true /* check_for_null_keys */);
- }
-}
-
-} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationConcreteHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.hpp b/expressions/aggregation/AggregationConcreteHandle.hpp
deleted file mode 100644
index 398a032..0000000
--- a/expressions/aggregation/AggregationConcreteHandle.hpp
+++ /dev/null
@@ -1,325 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_CONCRETE_HANDLE_HPP_
-#define QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_CONCRETE_HANDLE_HPP_
-
-#include <cstddef>
-#include <utility>
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregationHandle.hpp"
-#include "storage/FastHashTable.hpp"
-#include "storage/HashTable.hpp"
-#include "storage/HashTableBase.hpp"
-#include "threading/SpinMutex.hpp"
-#include "types/TypedValue.hpp"
-#include "types/containers/ColumnVector.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-namespace quickstep {
-
-class StorageManager;
-class Type;
-class ValueAccessor;
-
-/** \addtogroup Expressions
- * @{
- */
-
-/**
- * @brief An upserter class for modifying the destination hash table while
- * merging two group by hash tables.
- **/
-template <typename HandleT>
-class HashTableStateUpserterFast {
- public:
- /**
- * @brief Constructor.
- *
- * @param handle The aggregation handle being used.
- * @param source_state The aggregation state in the source aggregation hash
- * table. The corresponding state (for the same key) in the destination
- * hash table will be upserted.
- **/
- HashTableStateUpserterFast(const HandleT &handle,
- const std::uint8_t *source_state)
- : handle_(handle), source_state_(source_state) {}
-
- /**
- * @brief The operator for the functor required for the upsert.
- *
- * @param destination_state The aggregation state in the aggregation hash
- * table that is being upserted.
- **/
- void operator()(std::uint8_t *destination_state) {
- handle_.mergeStatesFast(source_state_, destination_state);
- }
-
- private:
- const HandleT &handle_;
- const std::uint8_t *source_state_;
-
- DISALLOW_COPY_AND_ASSIGN(HashTableStateUpserterFast);
-};
-
-/**
- * @brief The helper intermediate subclass of AggregationHandle that provides
- * virtual method implementations as well as helper methods that are
- * shared among all its subclasses.
- *
- * @note The reason that we have this intermediate class instead of putting
- * everything inside AggregationHandle is to avoid cyclic dependency, e.g.
- * when HashTable has to be used.
- **/
-class AggregationConcreteHandle : public AggregationHandle {
- public:
- /**
- * @brief Default implementaion for AggregationHandle::accumulateNullary().
- */
- AggregationState* accumulateNullary(
- const std::size_t num_tuples) const override {
- LOG(FATAL) << "Called accumulateNullary on an AggregationHandle that "
- << "takes at least one argument.";
- }
-
- /**
- * @brief Implementaion for AggregationHandle::createDistinctifyHashTable()
- * that creates a new HashTable for the distinctify step for
- * DISTINCT aggregation.
- */
- AggregationStateHashTableBase* createDistinctifyHashTable(
- const HashTableImplType hash_table_impl,
- const std::vector<const Type *> &key_types,
- const std::size_t estimated_num_distinct_keys,
- StorageManager *storage_manager) const override;
-
- /**
- * @brief Implementaion for
- * AggregationHandle::insertValueAccessorIntoDistinctifyHashTable()
- * that inserts the GROUP BY expressions and aggregation arguments together
- * as keys into the distinctify hash table.
- */
- void insertValueAccessorIntoDistinctifyHashTable(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &key_ids,
- AggregationStateHashTableBase *distinctify_hash_table) const override;
-
- protected:
- AggregationConcreteHandle() {}
-
- template <typename HandleT, typename StateT>
- StateT* aggregateOnDistinctifyHashTableForSingleUnaryHelperFast(
- const AggregationStateHashTableBase &distinctify_hash_table) const;
-
- template <typename HandleT, typename HashTableT>
- void aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast(
- const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *hash_table,
- std::size_t index) const;
-
- template <typename HandleT, typename HashTableT>
- ColumnVector* finalizeHashTableHelperFast(
- const Type &result_type,
- const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys,
- int index) const;
-
- template <typename HandleT, typename HashTableT>
- inline TypedValue finalizeGroupInHashTableFast(
- const AggregationStateHashTableBase &hash_table,
- const std::vector<TypedValue> &group_key,
- int index) const {
- const std::uint8_t *group_state =
- static_cast<const HashTableT &>(hash_table).getSingleCompositeKey(group_key, index);
- DCHECK(group_state != nullptr)
- << "Could not find entry for specified group_key in HashTable";
- return static_cast<const HandleT *>(this)->finalizeHashTableEntryFast(
- group_state);
- }
-
- template <typename HandleT, typename HashTableT>
- void mergeGroupByHashTablesHelperFast(
- const AggregationStateHashTableBase &source_hash_table,
- AggregationStateHashTableBase *destination_hash_table) const;
-
- private:
- DISALLOW_COPY_AND_ASSIGN(AggregationConcreteHandle);
-};
-
-/**
- * @brief Templated helper class used to implement
- * AggregationHandle::finalizeHashTable() by visiting each entry (i.e.
- * GROUP) in a HashTable, finalizing the aggregation for the GROUP, and
- * collecting the GROUP BY key values and the final aggregate values in
- * a ColumnVector.
- **/
-template <typename HandleT, typename ColumnVectorT>
-class HashTableAggregateFinalizer {
- public:
- HashTableAggregateFinalizer(
- const HandleT &handle,
- std::vector<std::vector<TypedValue>> *group_by_keys,
- ColumnVectorT *output_column_vector)
- : handle_(handle),
- group_by_keys_(group_by_keys),
- output_column_vector_(output_column_vector) {}
-
- inline void operator()(const std::vector<TypedValue> &group_by_key,
- const AggregationState &group_state) {
- group_by_keys_->emplace_back(group_by_key);
- output_column_vector_->appendTypedValue(
- handle_.finalizeHashTableEntry(group_state));
- }
-
- inline void operator()(const std::vector<TypedValue> &group_by_key,
- const unsigned char *byte_ptr) {
- group_by_keys_->emplace_back(group_by_key);
- output_column_vector_->appendTypedValue(
- handle_.finalizeHashTableEntryFast(byte_ptr));
- }
-
- private:
- const HandleT &handle_;
- std::vector<std::vector<TypedValue>> *group_by_keys_;
- ColumnVectorT *output_column_vector_;
-};
-
-/** @} */
-
-// ----------------------------------------------------------------------------
-// Implementations of templated methods follow:
-
-template <typename HandleT, typename StateT>
-StateT* AggregationConcreteHandle::
- aggregateOnDistinctifyHashTableForSingleUnaryHelperFast(
- const AggregationStateHashTableBase &distinctify_hash_table) const {
- const HandleT &handle = static_cast<const HandleT &>(*this);
- StateT *state = static_cast<StateT *>(createInitialState());
-
- // A lambda function which will be called on each key from the distinctify
- // hash table.
- const auto aggregate_functor = [&handle, &state](
- const TypedValue &key, const std::uint8_t &dumb_placeholder) {
- // For each (unary) key in the distinctify hash table, aggregate the key
- // into "state".
- handle.iterateUnaryInl(state, key);
- };
-
- const AggregationStateFastHashTable &hash_table =
- static_cast<const AggregationStateFastHashTable &>(
- distinctify_hash_table);
- // Invoke the lambda function "aggregate_functor" on each key from the
- // distinctify hash table.
- hash_table.forEach(&aggregate_functor);
-
- return state;
-}
-
-template <typename HandleT, typename HashTableT>
-void AggregationConcreteHandle::
- aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast(
- const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table,
- std::size_t index) const {
- const HandleT &handle = static_cast<const HandleT &>(*this);
- HashTableT *target_hash_table =
- static_cast<HashTableT *>(aggregation_hash_table);
-
- // A lambda function which will be called on each key-value pair from the
- // distinctify hash table.
- const auto aggregate_functor = [&handle, &target_hash_table, &index](
- std::vector<TypedValue> &key, const bool &dumb_placeholder) {
- // For each (composite) key vector in the distinctify hash table with size N.
- // The first N-1 entries are GROUP BY columns and the last entry is the
- // argument to be aggregated on.
- const TypedValue argument(std::move(key.back()));
- key.pop_back();
-
- // An upserter as lambda function for aggregating the argument into its
- // GROUP BY group's entry inside aggregation_hash_table.
- const auto upserter = [&handle, &argument](std::uint8_t *state) {
- handle.iterateUnaryInlFast(argument, state);
- };
-
- target_hash_table->upsertCompositeKeyFast(key, nullptr, &upserter, index);
- };
-
- const HashTableT &source_hash_table =
- static_cast<const HashTableT &>(distinctify_hash_table);
- // Invoke the lambda function "aggregate_functor" on each composite key vector
- // from the distinctify hash table.
- source_hash_table.forEachCompositeKeyFast(&aggregate_functor);
-}
-
-template <typename HandleT, typename HashTableT>
-ColumnVector* AggregationConcreteHandle::finalizeHashTableHelperFast(
- const Type &result_type,
- const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys,
- int index) const {
- const HandleT &handle = static_cast<const HandleT &>(*this);
- const HashTableT &hash_table_concrete =
- static_cast<const HashTableT &>(hash_table);
-
- if (group_by_keys->empty()) {
- if (NativeColumnVector::UsableForType(result_type)) {
- NativeColumnVector *result =
- new NativeColumnVector(result_type, hash_table_concrete.numEntries());
- HashTableAggregateFinalizer<HandleT, NativeColumnVector> finalizer(
- handle, group_by_keys, result);
- hash_table_concrete.forEachCompositeKeyFast(&finalizer, index);
- return result;
- } else {
- IndirectColumnVector *result = new IndirectColumnVector(
- result_type, hash_table_concrete.numEntries());
- HashTableAggregateFinalizer<HandleT, IndirectColumnVector> finalizer(
- handle, group_by_keys, result);
- hash_table_concrete.forEachCompositeKeyFast(&finalizer, index);
- return result;
- }
- } else {
- if (NativeColumnVector::UsableForType(result_type)) {
- NativeColumnVector *result =
- new NativeColumnVector(result_type, group_by_keys->size());
- for (const std::vector<TypedValue> &group_by_key : *group_by_keys) {
- result->appendTypedValue(
- finalizeGroupInHashTableFast<HandleT, HashTableT>(
- hash_table, group_by_key, index));
- }
- return result;
- } else {
- IndirectColumnVector *result = new IndirectColumnVector(
- result_type, hash_table_concrete.numEntries());
- for (const std::vector<TypedValue> &group_by_key : *group_by_keys) {
- result->appendTypedValue(
- finalizeGroupInHashTableFast<HandleT, HashTableT>(
- hash_table, group_by_key, index));
- }
- return result;
- }
- }
-}
-
-} // namespace quickstep
-
-#endif // QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_CONCRETE_HANDLE_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandle.hpp b/expressions/aggregation/AggregationHandle.hpp
index 19f28ff..40d5e26 100644
--- a/expressions/aggregation/AggregationHandle.hpp
+++ b/expressions/aggregation/AggregationHandle.hpp
@@ -21,14 +21,19 @@
#define QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_HANDLE_HPP_
#include <cstddef>
+#include <cstring>
+#include <functional>
#include <memory>
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
-#include "storage/HashTableBase.hpp"
+#include "types/Type.hpp"
#include "types/TypedValue.hpp"
+#include "utility/ScopedBuffer.hpp"
#include "utility/Macros.hpp"
+#include "glog/logging.h"
+
namespace quickstep {
class ColumnVector;
@@ -40,66 +45,10 @@ class ValueAccessor;
* @{
*/
-/**
- * @brief Abstract base class for aggregation state.
- **/
-class AggregationState {
- public:
- /**
- * @brief Default constructor.
- **/
- AggregationState() {}
+typedef std::function<void (void *, const void *)> AggregationStateAccumulateFunctor;
+typedef std::function<void (void *, const void *)> AggregationStateMergeFunctor;
+typedef std::function<void (void *, const void *)> AggregationStateFinalizeFunctor;
- /**
- * @brief Pure virtual destructor.
- **/
- virtual ~AggregationState() = 0;
-};
-
-// Destructor should be defined. This will be called when derived class
-// destructor is called.
-inline AggregationState::~AggregationState() {}
-
-/**
- * @brief AggregationHandle encapsulates logic for actually computing
- * aggregates with particular argument(s).
- * @note See also AggregateFunction, which represents a SQL aggregate function
- * in the abstract sense.
- *
- * An AggregationHandle is created by calling
- * AggregateFunction::createHandle(). The AggregationHandle object provides
- * methods that are used to actually compute the aggregate, storing
- * intermediate results in AggregationState objects.
- *
- * I. The work-flow for computing an aggregate without GROUP BY is as follows:
- * 1. Create a global state for the aggregate with createInitialState().
- * 2. For each block in a relation (parallelizable):
- * a. Call StorageBlock::aggregate() to accumulate results from the
- * block (under the covers, this calls either
- * accumulateColumnVectors() or accumulateValueAccessor() to do the
- * actual per-block aggregation in a vectorized fashion).
- * b. Merge the per-block results back with the global state by calling
- * mergeStates() (this is threadsafe).
- * 3. Generate the final result by calling finalize() on the global state.
- *
- * II. The work-flow for computing an aggregate with GROUP BY is as follows:
- * 1. Create a HashTable to hold per-group states by calling
- * createGroupByHashTable().
- * 2. For each block in a relation (parallelizable):
- * a. Call StorageBlock::aggregateGroupBy() to update the states in the
- * HashTable according to the values in the block (under the covers,
- * this calls aggregateValueAccessorIntoHashTable() to aggregate over
- * all the values/groups in a block in one shot; this is threadsafe).
- * 3. Generate the final set of groups and their corresponding results by
- * calling finalizeHashTable().
- *
- * See also AggregationOperationState, which holds 1 or more global states or
- * HashTables for an aggregate query, and has some logic to re-use common
- * information across multiple aggregates being computed on the same block
- * (e.g. the set of matches for a predicate, or the values of computed GROUP BY
- * expressions). AggregationOperationState also has a method to write out
- * finalized aggregate values to an InsertDestination.
- **/
class AggregationHandle {
public:
/**
@@ -109,67 +58,25 @@ class AggregationHandle {
virtual ~AggregationHandle() {}
/**
- * @brief Create an initial "blank" state for this aggregation.
- *
- * @return An initial "blank" state for this particular aggregation.
- **/
- virtual AggregationState* createInitialState() const = 0;
-
- virtual std::size_t getStateSize() const {
- return 0;
- }
-
- /**
- * @brief Create a new HashTable for aggregation with GROUP BY.
- *
- * @param hash_table_impl The choice of which concrete HashTable
- * implementation to use.
- * @param group_by_types The types of the GROUP BY columns/expressions. These
- * correspond to the (composite) key type for the HashTable.
- * @param estimated_num_groups The estimated number of distinct groups for
- * the GROUP BY aggregation. This is used to size the initial
- * HashTable. This is an estimate only, and the HashTable will be
- * resized if it becomes over-full.
- * @param storage_manager The StorageManager to use to create the HashTable.
- * A StorageBlob will be allocated to serve as the HashTable's
- * in-memory storage.
- * @return A new HashTable instance with the appropriate state type for this
- * aggregate.
- **/
- virtual AggregationStateHashTableBase* createGroupByHashTable(
- const HashTableImplType hash_table_impl,
- const std::vector<const Type *> &group_by_types,
- const std::size_t estimated_num_groups,
- StorageManager *storage_manager) const = 0;
-
- /**
* @brief Accumulate over tuples for a nullary aggregate function (one that
* has zero arguments, i.e. COUNT(*)).
- *
- * @param num_tuples The number of tuples to "accumulate". No actual tuple
- * data is accessed, the only thing that a nullary aggeregate can know
- * about input is its cardinality.
- * @return A new AggregationState which contains the accumulated results from
- * applying the (nullary) aggregate to the specified number of
- * tuples.
**/
- virtual AggregationState* accumulateNullary(
- const std::size_t num_tuples) const = 0;
+ virtual void accumulateNullary(void *state,
+ const std::size_t num_tuples) const {
+ LOG(FATAL) << "Called accumulateNullary on an AggregationHandle that "
+ << "takes at least one argument.";
+ }
/**
* @brief Accumulate (iterate over) all values in one or more ColumnVectors
* and return a new AggregationState which can be merged with other
* states or finalized.
- *
- * @param column_vectors One or more ColumnVectors that the aggregate will be
- * applied to. These correspond to the aggregate function's arguments,
- * in order.
- * @return A new AggregationState which contains the accumulated results from
- * applying the aggregate to column_vectors. Caller is responsible
- * for deleting the returned AggregationState.
**/
- virtual AggregationState* accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const = 0;
+ virtual void accumulateColumnVectors(
+ void *state,
+ const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
+ LOG(FATAL) << "Not implemented\n";
+ }
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
/**
@@ -186,243 +93,79 @@ class AggregationHandle {
* applying the aggregate to the specified columns in accessor.
* Caller is responsible for deleting the returned AggregationState.
**/
- virtual AggregationState* accumulateValueAccessor(
+ virtual void accumulateValueAccessor(
+ void *state,
ValueAccessor *accessor,
- const std::vector<attribute_id> &accessor_ids) const = 0;
+ const std::vector<attribute_id> &accessor_ids) const {
+ LOG(FATAL) << "Not implemented\n";
+ }
#endif
/**
- * @brief Perform an aggregation with GROUP BY over all the tuples accessible
- * through a ValueAccessor, upserting states in a HashTable.
- *
- * @note Implementations of this method are threadsafe with respect to
- * hash_table, and can be called concurrently from multiple threads
- * with the same HashTable object.
- *
- * @param accessor The ValueAccessor that will be iterated over to read
- * tuples.
- * @param argument_ids The attribute_ids of the arguments to this aggregate
- * in accessor, in order.
- * @param group_by_key_ids The attribute_ids of the group-by
- * columns/expressions in accessor.
- * @param hash_table The HashTable to upsert AggregationStates in. This
- * should have been created by calling createGroupByHashTable() on
- * this same AggregationHandle.
- **/
- virtual void aggregateValueAccessorIntoHashTable(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &argument_ids,
- const std::vector<attribute_id> &group_by_key_ids,
- AggregationStateHashTableBase *hash_table) const = 0;
-
- /**
- * @brief Merge two AggregationStates, updating one in-place. This computes a
- * new intermediate result and stores it in the destination
- * AggregationState.
- *
- * @note The order of arguments (i.e. which AggregationState is source and
- * which is destination) is not important for the result of the merging
- * process, but it does determine which of the two states is
- * overwritten (destination) and which remains unchanged (source).
- * @note Implementations of this method are threadsafe with respect to the
- * destination state, and can be called concurrently from multiple
- * threads with the same state object.
- *
- * @param source The AggregationState to merge "from".
- * @param destination The AggregationState to merge "to". The internal state
- * will be overwritten with the merged result.
- **/
- virtual void mergeStates(const AggregationState &source,
- AggregationState *destination) const = 0;
-
- /**
- * @brief Computes and returns the resulting aggregate by using intermediate
- * result saved in this handle.
- *
- * @note Except for count, SQL89 aggeregates return NULL when no rows are
- * selected.
- * @warning It is dangerous to assume that calling mergeStates() or some
- * iterate method after previous finalize call will work correctly
- * when the aggregate function is not one of SQL89 aggregates (SUM,
- * COUNT, MIN, MAX, AVG).
- *
- * @return The result of this aggregation.
+ * @brief Get the number of bytes needed to store the aggregation handle's
+ * state.
**/
- virtual TypedValue finalize(const AggregationState &state) const = 0;
+ inline std::size_t getStateSize() const {
+ return state_size_;
+ }
- /**
- * @brief Compute and return finalized aggregates for all groups in a
- * HashTable.
- *
- * @param hash_table The HashTable to finalize states from. This should have
- * have been created by calling createGroupByHashTable() on this same
- * AggregationHandle.
- * @param group_by_keys A pointer to a vector of vectors of GROUP BY keys. If
- * this is initially empty, it will be filled in with the GROUP BY
- * keys visited by this method in the same order as the finalized
- * values returned in the ColumnVector. If this is already filled in,
- * then this method will visit the GROUP BY keys in the exact order
- * specified.
- * @param index The index of the AggregationHandle to be finalized.
- *
- * @return A ColumnVector containing each group's finalized aggregate value.
- **/
- virtual ColumnVector* finalizeHashTable(
- const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys,
- int index) const = 0;
+ inline void initializeState(void *state) const {
+ std::memcpy(state, blank_state_.get(), state_size_);
+ }
- /**
- * @brief Create a new HashTable for the distinctify step for DISTINCT
- * aggregation.
- *
- * Distinctify is the first step for DISTINCT aggregation. This step inserts
- * the GROUP BY expression values and aggregation arguments together as keys
- * into the distinctify hash table, so that arguments are distinctified within
- * each GROUP BY group. Later, a second-round aggregation on the distinctify
- * hash table will be performed to actually compute the aggregated result for
- * each GROUP BY group.
- *
- * In the case of single aggregation where there is no GROUP BY expressions,
- * we simply treat it as a special GROUP BY case that the GROUP BY expression
- * vector is empty.
- *
- * @param hash_table_impl The choice of which concrete HashTable
- * implementation to use.
- * @param key_types The types of the GROUP BY expressions together with the
- * types of the aggregation arguments.
- * @param estimated_num_distinct_keys The estimated number of distinct keys
- * (i.e. GROUP BY expressions together with aggregation arguments) for
- * the distinctify step. This is used to size the initial HashTable.
- * This is an estimate only, and the HashTable will be resized if it
- * becomes over-full.
- * @param storage_manager The StorageManager to use to create the HashTable.
- * A StorageBlob will be allocated to serve as the HashTable's
- * in-memory storage.
- *
- * @return A new HashTable instance with the appropriate state type for this
- * aggregate.
- */
- virtual AggregationStateHashTableBase* createDistinctifyHashTable(
- const HashTableImplType hash_table_impl,
- const std::vector<const Type *> &key_types,
- const std::size_t estimated_num_distinct_keys,
- StorageManager *storage_manager) const = 0;
+ inline ScopedBuffer createInitialState() const {
+ ScopedBuffer state(state_size_, false);
+ initializeState(state.get());
+ return state;
+ }
- /**
- * @brief Inserts the GROUP BY expressions and aggregation arguments together
- * as keys into the distinctify hash table.
- *
- * @param accessor The ValueAccessor that will be iterated over to read
- * tuples.
- * @param key_ids The attribute_ids of the GROUP BY expressions in accessor
- * together with the attribute_ids of the arguments to this aggregate
- * in accessor, in order.
- * @param distinctify_hash_table The HashTable to store the GROUP BY
- * expressions and the aggregation arguments together as hash table
- * keys and a bool constant \c true as hash table value (So the hash
- * table actually serves as a hash set). This should have been created
- * by calling createDistinctifyHashTable();
- */
- virtual void insertValueAccessorIntoDistinctifyHashTable(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &key_ids,
- AggregationStateHashTableBase *distinctify_hash_table) const = 0;
+ inline void mergeStates(void *destination_state,
+ const void *source_state) const {
+ merge_functor_(destination_state, source_state);
+ }
- /**
- * @brief Perform single (i.e. without GROUP BY) aggregation on the keys from
- * the distinctify hash table to actually compute the aggregated results.
- *
- * @param distinctify_hash_table Hash table which stores the distinctified
- * aggregation arguments as hash table keys. This should have been
- * created by calling createDistinctifyHashTable();
- * @return A new AggregationState which contains the aggregated results from
- * applying the aggregate to the distinctify hash table.
- * Caller is responsible for deleting the returned AggregationState.
- */
- virtual AggregationState* aggregateOnDistinctifyHashTableForSingle(
- const AggregationStateHashTableBase &distinctify_hash_table) const = 0;
+ inline std::size_t getResultSize() const {
+ return result_type_->maximumByteLength();
+ }
- /**
- * @brief Perform GROUP BY aggregation on the keys from the distinctify hash
- * table and upserts states into the aggregation hash table.
- *
- * @param distinctify_hash_table Hash table which stores the GROUP BY
- * expression values and aggregation arguments together as hash table
- * keys.
- * @param aggregation_hash_table The HashTable to upsert AggregationStates in.
- * This should have been created by calling createGroupByHashTable() on
- * this same AggregationHandle.
- * @param index The index of the distinctify hash table for which we perform
- * the DISTINCT aggregation.
- */
- virtual void aggregateOnDistinctifyHashTableForGroupBy(
- const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table,
- std::size_t index) const = 0;
+ inline void finalize(void *value, const void *state) const {
+ finalize_functor_(value, state);
+ }
- /**
- * @brief Get the number of bytes needed to store the aggregation handle's
- * state.
- **/
- virtual std::size_t getPayloadSize() const { return 1; }
+ inline TypedValue finalize(const void *state) const {
+ ScopedBuffer value(state_size_, false);
+ finalize(value.get(), state);
+ TypedValue result = result_type_->makeValue(value.get());
+ result.ensureNotReference();
+ return result;
+ }
- /**
- * @brief Update the aggregation state for nullary aggregation function e.g.
- * COUNT(*).
- *
- * @note This function should be overloaded by those aggregation function
- * which can perform nullary operations, e.g. COUNT.
- *
- * @param byte_ptr The pointer where the aggregation state is stored.
- **/
- virtual void updateStateNullary(std::uint8_t *byte_ptr) const {}
+ inline const AggregationStateMergeFunctor& getStateAccumulateFunctor() const {
+ DCHECK(accumulate_functor_);
+ return accumulate_functor_;
+ }
- /**
- * @brief Update the aggregation state for unary aggregation function e.g.
- * SUM(a).
- *
- * @param argument The argument which will be used to update the state of the
- * aggregation function.
- * @param byte_ptr The pointer where the aggregation state is stored.
- **/
- virtual void updateStateUnary(const TypedValue &argument,
- std::uint8_t *byte_ptr) const {}
+ inline const AggregationStateMergeFunctor& getStateMergeFunctor() const {
+ DCHECK(merge_functor_);
+ return merge_functor_;
+ }
- /**
- * @brief Merge two aggregation states for this aggregation handle.
- *
- * @note This function should be used with the hash table specifically meant
- * for aggregations only.
- *
- * @param src A pointer to the source aggregation state.
- * @param dst A pointer to the destination aggregation state.
- **/
- virtual void mergeStatesFast(const std::uint8_t *src,
- std::uint8_t *dst) const {}
+ inline const AggregationStateFinalizeFunctor& getStateFinalizeFunctor() const {
+ DCHECK(finalize_functor_);
+ return finalize_functor_;
+ }
- /**
- * @brief Initialize the payload (in the aggregation hash table) for the given
- * aggregation handle.
- *
- * @param byte_ptr The pointer to the aggregation state in the hash table.
- **/
- virtual void initPayload(std::uint8_t *byte_ptr) const {}
+ protected:
+ AggregationHandle() {}
- /**
- * @brief Inform the aggregation handle to block (prohibit) updates on the
- * aggregation state.
- **/
- virtual void blockUpdate() {}
+ std::size_t state_size_;
+ ScopedBuffer blank_state_;
- /**
- * @brief Inform the aggregation handle to allow updates on the
- * aggregation state.
- **/
- virtual void allowUpdate() {}
+ AggregationStateAccumulateFunctor accumulate_functor_;
+ AggregationStateMergeFunctor merge_functor_;
- protected:
- AggregationHandle() {}
+ const Type *result_type_;
+ AggregationStateFinalizeFunctor finalize_functor_;
private:
DISALLOW_COPY_AND_ASSIGN(AggregationHandle);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationHandleAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.cpp b/expressions/aggregation/AggregationHandleAvg.cpp
index 2481092..47f3f41 100644
--- a/expressions/aggregation/AggregationHandleAvg.cpp
+++ b/expressions/aggregation/AggregationHandleAvg.cpp
@@ -26,7 +26,6 @@
#include "catalog/CatalogTypedefs.hpp"
#include "storage/HashTable.hpp"
#include "storage/HashTableFactory.hpp"
-#include "threading/SpinMutex.hpp"
#include "types/Type.hpp"
#include "types/TypeFactory.hpp"
#include "types/TypeID.hpp"
@@ -41,165 +40,165 @@ namespace quickstep {
class StorageManager;
-AggregationHandleAvg::AggregationHandleAvg(const Type &type)
- : argument_type_(type), block_update_(false) {
- // We sum Int as Long and Float as Double so that we have more headroom when
- // adding many values.
- TypeID type_precision_id;
- switch (type.getTypeID()) {
- case kInt:
- case kLong:
- type_precision_id = kLong;
- break;
- case kFloat:
- case kDouble:
- type_precision_id = kDouble;
- break;
- default:
- type_precision_id = type.getTypeID();
- break;
- }
-
- const Type &sum_type = TypeFactory::GetType(type_precision_id);
- blank_state_.sum_ = sum_type.makeZeroValue();
- blank_state_.count_ = 0;
-
- // Make operators to do arithmetic:
- // Add operator for summing argument values.
- fast_add_operator_.reset(
- BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
- .makeUncheckedBinaryOperatorForTypes(sum_type, argument_type_));
- // Add operator for merging states.
- merge_add_operator_.reset(
- BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
- .makeUncheckedBinaryOperatorForTypes(sum_type, sum_type));
- // Divide operator for dividing sum by count to get final average.
- divide_operator_.reset(
- BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide)
- .makeUncheckedBinaryOperatorForTypes(sum_type,
- TypeFactory::GetType(kDouble)));
-
- // Result is nullable, because AVG() over 0 values (or all NULL values) is
- // NULL.
- result_type_ =
- &(BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide)
- .resultTypeForArgumentTypes(sum_type, TypeFactory::GetType(kDouble))
- ->getNullableVersion());
-}
-
-AggregationStateHashTableBase* AggregationHandleAvg::createGroupByHashTable(
- const HashTableImplType hash_table_impl,
- const std::vector<const Type *> &group_by_types,
- const std::size_t estimated_num_groups,
- StorageManager *storage_manager) const {
- return AggregationStateHashTableFactory<AggregationStateAvg>::CreateResizable(
- hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
-}
-
-AggregationState* AggregationHandleAvg::accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
- DCHECK_EQ(1u, column_vectors.size())
- << "Got wrong number of ColumnVectors for AVG: " << column_vectors.size();
-
- AggregationStateAvg *state = new AggregationStateAvg(blank_state_);
- std::size_t count = 0;
- state->sum_ = fast_add_operator_->accumulateColumnVector(
- state->sum_, *column_vectors.front(), &count);
- state->count_ = count;
- return state;
-}
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-AggregationState* AggregationHandleAvg::accumulateValueAccessor(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &accessor_ids) const {
- DCHECK_EQ(1u, accessor_ids.size())
- << "Got wrong number of attributes for AVG: " << accessor_ids.size();
-
- AggregationStateAvg *state = new AggregationStateAvg(blank_state_);
- std::size_t count = 0;
- state->sum_ = fast_add_operator_->accumulateValueAccessor(
- state->sum_, accessor, accessor_ids.front(), &count);
- state->count_ = count;
- return state;
-}
-#endif
-
-void AggregationHandleAvg::aggregateValueAccessorIntoHashTable(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &argument_ids,
- const std::vector<attribute_id> &group_by_key_ids,
- AggregationStateHashTableBase *hash_table) const {
- DCHECK_EQ(1u, argument_ids.size())
- << "Got wrong number of arguments for AVG: " << argument_ids.size();
-}
-
-void AggregationHandleAvg::mergeStates(const AggregationState &source,
- AggregationState *destination) const {
- const AggregationStateAvg &avg_source =
- static_cast<const AggregationStateAvg &>(source);
- AggregationStateAvg *avg_destination =
- static_cast<AggregationStateAvg *>(destination);
-
- SpinMutexLock lock(avg_destination->mutex_);
- avg_destination->count_ += avg_source.count_;
- avg_destination->sum_ = merge_add_operator_->applyToTypedValues(
- avg_destination->sum_, avg_source.sum_);
-}
-
-void AggregationHandleAvg::mergeStatesFast(const std::uint8_t *source,
- std::uint8_t *destination) const {
- const TypedValue *src_sum_ptr =
- reinterpret_cast<const TypedValue *>(source + blank_state_.sum_offset_);
- const std::int64_t *src_count_ptr = reinterpret_cast<const std::int64_t *>(
- source + blank_state_.count_offset_);
- TypedValue *dst_sum_ptr =
- reinterpret_cast<TypedValue *>(destination + blank_state_.sum_offset_);
- std::int64_t *dst_count_ptr = reinterpret_cast<std::int64_t *>(
- destination + blank_state_.count_offset_);
- (*dst_count_ptr) += (*src_count_ptr);
- *dst_sum_ptr =
- merge_add_operator_->applyToTypedValues(*dst_sum_ptr, *src_sum_ptr);
-}
-
-TypedValue AggregationHandleAvg::finalize(const AggregationState &state) const {
- const AggregationStateAvg &agg_state =
- static_cast<const AggregationStateAvg &>(state);
- if (agg_state.count_ == 0) {
- // AVG() over no values is NULL.
- return result_type_->makeNullValue();
- } else {
- // Divide sum by count to get final average.
- return divide_operator_->applyToTypedValues(
- agg_state.sum_, TypedValue(static_cast<double>(agg_state.count_)));
- }
-}
-
-ColumnVector* AggregationHandleAvg::finalizeHashTable(
- const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys,
- int index) const {
- return finalizeHashTableHelperFast<AggregationHandleAvg,
- AggregationStateFastHashTable>(
- *result_type_, hash_table, group_by_keys, index);
-}
-
-AggregationState*
-AggregationHandleAvg::aggregateOnDistinctifyHashTableForSingle(
- const AggregationStateHashTableBase &distinctify_hash_table) const {
- return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
- AggregationHandleAvg,
- AggregationStateAvg>(distinctify_hash_table);
-}
-
-void AggregationHandleAvg::aggregateOnDistinctifyHashTableForGroupBy(
- const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table,
- std::size_t index) const {
- aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
- AggregationHandleAvg,
- AggregationStateFastHashTable>(
- distinctify_hash_table, aggregation_hash_table, index);
-}
+AggregationHandleAvg::AggregationHandleAvg(const Type &type) {}
+// : argument_type_(type), block_update_(false) {
+// // We sum Int as Long and Float as Double so that we have more headroom when
+// // adding many values.
+// TypeID type_precision_id;
+// switch (type.getTypeID()) {
+// case kInt:
+// case kLong:
+// type_precision_id = kLong;
+// break;
+// case kFloat:
+// case kDouble:
+// type_precision_id = kDouble;
+// break;
+// default:
+// type_precision_id = type.getTypeID();
+// break;
+// }
+//
+// const Type &sum_type = TypeFactory::GetType(type_precision_id);
+// blank_state_.sum_ = sum_type.makeZeroValue();
+// blank_state_.count_ = 0;
+//
+// // Make operators to do arithmetic:
+// // Add operator for summing argument values.
+// fast_add_operator_.reset(
+// BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
+// .makeUncheckedBinaryOperatorForTypes(sum_type, argument_type_));
+// // Add operator for merging states.
+// merge_add_operator_.reset(
+// BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
+// .makeUncheckedBinaryOperatorForTypes(sum_type, sum_type));
+// // Divide operator for dividing sum by count to get final average.
+// divide_operator_.reset(
+// BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide)
+// .makeUncheckedBinaryOperatorForTypes(sum_type,
+// TypeFactory::GetType(kDouble)));
+//
+// // Result is nullable, because AVG() over 0 values (or all NULL values) is
+// // NULL.
+// result_type_ =
+// &(BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide)
+// .resultTypeForArgumentTypes(sum_type, TypeFactory::GetType(kDouble))
+// ->getNullableVersion());
+//}
+//
+//AggregationStateHashTableBase* AggregationHandleAvg::createGroupByHashTable(
+// const HashTableImplType hash_table_impl,
+// const std::vector<const Type *> &group_by_types,
+// const std::size_t estimated_num_groups,
+// StorageManager *storage_manager) const {
+// return AggregationStateHashTableFactory<AggregationStateAvg>::CreateResizable(
+// hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
+//}
+//
+//AggregationState* AggregationHandleAvg::accumulateColumnVectors(
+// const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
+// DCHECK_EQ(1u, column_vectors.size())
+// << "Got wrong number of ColumnVectors for AVG: " << column_vectors.size();
+//
+// AggregationStateAvg *state = new AggregationStateAvg(blank_state_);
+// std::size_t count = 0;
+// state->sum_ = fast_add_operator_->accumulateColumnVector(
+// state->sum_, *column_vectors.front(), &count);
+// state->count_ = count;
+// return state;
+//}
+//
+//#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
+//AggregationState* AggregationHandleAvg::accumulateValueAccessor(
+// ValueAccessor *accessor,
+// const std::vector<attribute_id> &accessor_ids) const {
+// DCHECK_EQ(1u, accessor_ids.size())
+// << "Got wrong number of attributes for AVG: " << accessor_ids.size();
+//
+// AggregationStateAvg *state = new AggregationStateAvg(blank_state_);
+// std::size_t count = 0;
+// state->sum_ = fast_add_operator_->accumulateValueAccessor(
+// state->sum_, accessor, accessor_ids.front(), &count);
+// state->count_ = count;
+// return state;
+//}
+//#endif
+//
+//void AggregationHandleAvg::aggregateValueAccessorIntoHashTable(
+// ValueAccessor *accessor,
+// const std::vector<attribute_id> &argument_ids,
+// const std::vector<attribute_id> &group_by_key_ids,
+// AggregationStateHashTableBase *hash_table) const {
+// DCHECK_EQ(1u, argument_ids.size())
+// << "Got wrong number of arguments for AVG: " << argument_ids.size();
+//}
+//
+//void AggregationHandleAvg::mergeStates(const AggregationState &source,
+// AggregationState *destination) const {
+// const AggregationStateAvg &avg_source =
+// static_cast<const AggregationStateAvg &>(source);
+// AggregationStateAvg *avg_destination =
+// static_cast<AggregationStateAvg *>(destination);
+//
+// SpinMutexLock lock(avg_destination->mutex_);
+// avg_destination->count_ += avg_source.count_;
+// avg_destination->sum_ = merge_add_operator_->applyToTypedValues(
+// avg_destination->sum_, avg_source.sum_);
+//}
+//
+//void AggregationHandleAvg::mergeStatesFast(const std::uint8_t *source,
+// std::uint8_t *destination) const {
+// const TypedValue *src_sum_ptr =
+// reinterpret_cast<const TypedValue *>(source + blank_state_.sum_offset_);
+// const std::int64_t *src_count_ptr = reinterpret_cast<const std::int64_t *>(
+// source + blank_state_.count_offset_);
+// TypedValue *dst_sum_ptr =
+// reinterpret_cast<TypedValue *>(destination + blank_state_.sum_offset_);
+// std::int64_t *dst_count_ptr = reinterpret_cast<std::int64_t *>(
+// destination + blank_state_.count_offset_);
+// (*dst_count_ptr) += (*src_count_ptr);
+// *dst_sum_ptr =
+// merge_add_operator_->applyToTypedValues(*dst_sum_ptr, *src_sum_ptr);
+//}
+//
+//TypedValue AggregationHandleAvg::finalize(const AggregationState &state) const {
+// const AggregationStateAvg &agg_state =
+// static_cast<const AggregationStateAvg &>(state);
+// if (agg_state.count_ == 0) {
+// // AVG() over no values is NULL.
+// return result_type_->makeNullValue();
+// } else {
+// // Divide sum by count to get final average.
+// return divide_operator_->applyToTypedValues(
+// agg_state.sum_, TypedValue(static_cast<double>(agg_state.count_)));
+// }
+//}
+//
+//ColumnVector* AggregationHandleAvg::finalizeHashTable(
+// const AggregationStateHashTableBase &hash_table,
+// std::vector<std::vector<TypedValue>> *group_by_keys,
+// int index) const {
+// return finalizeHashTableHelperFast<AggregationHandleAvg,
+// AggregationStateFastHashTable>(
+// *result_type_, hash_table, group_by_keys, index);
+//}
+//
+//AggregationState*
+//AggregationHandleAvg::aggregateOnDistinctifyHashTableForSingle(
+// const AggregationStateHashTableBase &distinctify_hash_table) const {
+// return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
+// AggregationHandleAvg,
+// AggregationStateAvg>(distinctify_hash_table);
+//}
+//
+//void AggregationHandleAvg::aggregateOnDistinctifyHashTableForGroupBy(
+// const AggregationStateHashTableBase &distinctify_hash_table,
+// AggregationStateHashTableBase *aggregation_hash_table,
+// std::size_t index) const {
+// aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
+// AggregationHandleAvg,
+// AggregationStateFastHashTable>(
+// distinctify_hash_table, aggregation_hash_table, index);
+//}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.hpp b/expressions/aggregation/AggregationHandleAvg.hpp
index 3c6e0c2..cc5adc8 100644
--- a/expressions/aggregation/AggregationHandleAvg.hpp
+++ b/expressions/aggregation/AggregationHandleAvg.hpp
@@ -26,11 +26,8 @@
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregationConcreteHandle.hpp"
#include "expressions/aggregation/AggregationHandle.hpp"
-#include "storage/FastHashTable.hpp"
#include "storage/HashTableBase.hpp"
-#include "threading/SpinMutex.hpp"
#include "types/Type.hpp"
#include "types/TypedValue.hpp"
#include "types/operations/binary_operations/BinaryOperation.hpp"
@@ -49,193 +46,12 @@ class ValueAccessor;
*/
/**
- * @brief Aggregation state for average.
- */
-class AggregationStateAvg : public AggregationState {
- public:
- /**
- * @brief Copy constructor (ignores mutex).
- */
- AggregationStateAvg(const AggregationStateAvg &orig)
- : sum_(orig.sum_),
- count_(orig.count_),
- sum_offset_(orig.sum_offset_),
- count_offset_(orig.count_offset_),
- mutex_offset_(orig.mutex_offset_) {}
-
- /**
- * @brief Destructor.
- */
- ~AggregationStateAvg() override {}
-
- std::size_t getPayloadSize() const {
- std::size_t p1 = reinterpret_cast<std::size_t>(&sum_);
- std::size_t p2 = reinterpret_cast<std::size_t>(&mutex_);
- return (p2 - p1);
- }
-
- const std::uint8_t *getPayloadAddress() const {
- return reinterpret_cast<const uint8_t *>(&sum_);
- }
-
- private:
- friend class AggregationHandleAvg;
-
- AggregationStateAvg()
- : sum_(0),
- count_(0),
- sum_offset_(0),
- count_offset_(reinterpret_cast<std::uint8_t *>(&count_) -
- reinterpret_cast<std::uint8_t *>(&sum_)),
- mutex_offset_(reinterpret_cast<std::uint8_t *>(&mutex_) -
- reinterpret_cast<std::uint8_t *>(&sum_)) {}
-
- // TODO(shoban): We might want to specialize sum_ and count_ to use atomics
- // for int types similar to in AggregationStateCount.
- TypedValue sum_;
- std::int64_t count_;
- SpinMutex mutex_;
-
- int sum_offset_, count_offset_, mutex_offset_;
-};
-
-/**
* @brief An aggregationhandle for avg.
**/
-class AggregationHandleAvg : public AggregationConcreteHandle {
+class AggregationHandleAvg : public AggregationHandle {
public:
~AggregationHandleAvg() override {}
- AggregationState* createInitialState() const override {
- return new AggregationStateAvg(blank_state_);
- }
-
- AggregationStateHashTableBase* createGroupByHashTable(
- const HashTableImplType hash_table_impl,
- const std::vector<const Type *> &group_by_types,
- const std::size_t estimated_num_groups,
- StorageManager *storage_manager) const override;
-
- /**
- * @brief Iterate method with average aggregation state.
- **/
- inline void iterateUnaryInl(AggregationStateAvg *state,
- const TypedValue &value) const {
- DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
- if (value.isNull()) return;
-
- SpinMutexLock lock(state->mutex_);
- state->sum_ = fast_add_operator_->applyToTypedValues(state->sum_, value);
- ++state->count_;
- }
-
- inline void iterateUnaryInlFast(const TypedValue &value,
- std::uint8_t *byte_ptr) const {
- DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
- if (value.isNull()) return;
- TypedValue *sum_ptr =
- reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset_);
- std::int64_t *count_ptr =
- reinterpret_cast<std::int64_t *>(byte_ptr + blank_state_.count_offset_);
- *sum_ptr = fast_add_operator_->applyToTypedValues(*sum_ptr, value);
- ++(*count_ptr);
- }
-
- inline void updateStateUnary(const TypedValue &argument,
- std::uint8_t *byte_ptr) const override {
- if (!block_update_) {
- iterateUnaryInlFast(argument, byte_ptr);
- }
- }
-
- void blockUpdate() override { block_update_ = true; }
-
- void allowUpdate() override { block_update_ = false; }
-
- void initPayload(std::uint8_t *byte_ptr) const override {
- TypedValue *sum_ptr =
- reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset_);
- std::int64_t *count_ptr =
- reinterpret_cast<std::int64_t *>(byte_ptr + blank_state_.count_offset_);
- *sum_ptr = blank_state_.sum_;
- *count_ptr = blank_state_.count_;
- }
-
- AggregationState* accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
- const override;
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- AggregationState* accumulateValueAccessor(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &accessor_id) const override;
-#endif
-
- void aggregateValueAccessorIntoHashTable(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &argument_ids,
- const std::vector<attribute_id> &group_by_key_ids,
- AggregationStateHashTableBase *hash_table) const override;
-
- void mergeStates(const AggregationState &source,
- AggregationState *destination) const override;
-
- void mergeStatesFast(const std::uint8_t *source,
- std::uint8_t *destination) const override;
-
- TypedValue finalize(const AggregationState &state) const override;
-
- inline TypedValue finalizeHashTableEntry(
- const AggregationState &state) const {
- const AggregationStateAvg &agg_state =
- static_cast<const AggregationStateAvg &>(state);
- // TODO(chasseur): Could improve performance further if we made a special
- // version of finalizeHashTable() that collects all the sums into one
- // ColumnVector and all the counts into another and then applies
- // '*divide_operator_' to them in bulk.
- return divide_operator_->applyToTypedValues(
- agg_state.sum_, TypedValue(static_cast<double>(agg_state.count_)));
- }
-
- inline TypedValue finalizeHashTableEntryFast(
- const std::uint8_t *byte_ptr) const {
- std::uint8_t *value_ptr = const_cast<std::uint8_t *>(byte_ptr);
- TypedValue *sum_ptr =
- reinterpret_cast<TypedValue *>(value_ptr + blank_state_.sum_offset_);
- std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(
- value_ptr + blank_state_.count_offset_);
- return divide_operator_->applyToTypedValues(
- *sum_ptr, TypedValue(static_cast<double>(*count_ptr)));
- }
-
- ColumnVector* finalizeHashTable(
- const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys,
- int index) const override;
-
- /**
- * @brief Implementation of
- * AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
- * for AVG aggregation.
- */
- AggregationState* aggregateOnDistinctifyHashTableForSingle(
- const AggregationStateHashTableBase &distinctify_hash_table)
- const override;
-
- /**
- * @brief Implementation of
- * AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
- * for AVG aggregation.
- */
- void aggregateOnDistinctifyHashTableForGroupBy(
- const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table,
- std::size_t index) const override;
-
- std::size_t getPayloadSize() const override {
- return blank_state_.getPayloadSize();
- }
-
private:
friend class AggregateFunctionAvg;
@@ -246,14 +62,12 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
**/
explicit AggregationHandleAvg(const Type &type);
- const Type &argument_type_;
- const Type *result_type_;
- AggregationStateAvg blank_state_;
- std::unique_ptr<UncheckedBinaryOperator> fast_add_operator_;
- std::unique_ptr<UncheckedBinaryOperator> merge_add_operator_;
- std::unique_ptr<UncheckedBinaryOperator> divide_operator_;
-
- bool block_update_;
+// const Type &argument_type_;
+// const Type *result_type_;
+// AggregationStateAvg blank_state_;
+// std::unique_ptr<UncheckedBinaryOperator> fast_add_operator_;
+// std::unique_ptr<UncheckedBinaryOperator> merge_add_operator_;
+// std::unique_ptr<UncheckedBinaryOperator> divide_operator_;
DISALLOW_COPY_AND_ASSIGN(AggregationHandleAvg);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationHandleCount.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.cpp b/expressions/aggregation/AggregationHandleCount.cpp
index 034c942..c095a82 100644
--- a/expressions/aggregation/AggregationHandleCount.cpp
+++ b/expressions/aggregation/AggregationHandleCount.cpp
@@ -21,12 +21,11 @@
#include <atomic>
#include <cstddef>
+#include <cstdint>
#include <memory>
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
-#include "storage/HashTable.hpp"
-#include "storage/HashTableFactory.hpp"
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
#include "storage/ValueAccessor.hpp"
@@ -48,22 +47,37 @@ class Type;
class ValueAccessor;
template <bool count_star, bool nullable_type>
-AggregationStateHashTableBase*
-AggregationHandleCount<count_star, nullable_type>::createGroupByHashTable(
- const HashTableImplType hash_table_impl,
- const std::vector<const Type *> &group_by_types,
- const std::size_t estimated_num_groups,
- StorageManager *storage_manager) const {
- return AggregationStateHashTableFactory<
- AggregationStateCount>::CreateResizable(hash_table_impl,
- group_by_types,
- estimated_num_groups,
- storage_manager);
+AggregationHandleCount<count_star, nullable_type>::AggregationHandleCount() {
+ state_size_ = sizeof(ResultCppType);
+ blank_state_.reset(state_size_, true);
+
+ accumulate_functor_ = [](void *state, const void *value) {
+ *static_cast<ResultCppType *>(state) += 1;
+ };
+
+ merge_functor_ = [](void *state, const void *value) {
+ *static_cast<ResultCppType *>(state) +=
+ *static_cast<const ResultCppType *>(value);
+ };
+
+ finalize_functor_ = [](void *result, const void *state) {
+ *static_cast<ResultCppType *>(result) =
+ *static_cast<const ResultCppType *>(state);
+ };
+
+ result_type_ = &TypeFactory::GetType(ResultType::kStaticTypeID);
+}
+
+template <bool count_star, bool nullable_type>
+void AggregationHandleCount<count_star, nullable_type>::accumulateNullary(
+ void *state,
+ const std::size_t num_tuples) const {
+ *static_cast<ResultCppType *>(state) = num_tuples;
}
template <bool count_star, bool nullable_type>
-AggregationState*
-AggregationHandleCount<count_star, nullable_type>::accumulateColumnVectors(
+void AggregationHandleCount<count_star, nullable_type>::accumulateColumnVectors(
+ void *state,
const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
DCHECK(!count_star)
<< "Called non-nullary accumulation method on an AggregationHandleCount "
@@ -84,20 +98,22 @@ AggregationHandleCount<count_star, nullable_type>::accumulateColumnVectors(
// the population count of the null bitmap). We should do something
// similar for ValueAccessor too.
for (std::size_t pos = 0; pos < column_vector.size(); ++pos) {
- count += !column_vector.getTypedValue(pos).isNull();
+ if (column_vector.getUntypedValue(pos) != nullptr) {
+ ++count;
+ }
}
} else {
count = column_vector.size();
}
});
- return new AggregationStateCount(count);
+ *static_cast<ResultCppType *>(state) = count;
}
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
template <bool count_star, bool nullable_type>
-AggregationState*
-AggregationHandleCount<count_star, nullable_type>::accumulateValueAccessor(
+void AggregationHandleCount<count_star, nullable_type>::accumulateValueAccessor(
+ void *state,
ValueAccessor *accessor,
const std::vector<attribute_id> &accessor_ids) const {
DCHECK(!count_star)
@@ -114,91 +130,19 @@ AggregationHandleCount<count_star, nullable_type>::accumulateValueAccessor(
[&accessor_id, &count](auto *accessor) -> void { // NOLINT(build/c++11)
if (nullable_type) {
while (accessor->next()) {
- count += !accessor->getTypedValue(accessor_id).isNull();
+ if (accessor->getUntypedValue(accessor_id) != nullptr) {
+ ++count;
+ }
}
} else {
count = accessor->getNumTuples();
}
});
- return new AggregationStateCount(count);
+ *static_cast<ResultCppType *>(state) = count;
}
#endif
-template <bool count_star, bool nullable_type>
-void AggregationHandleCount<count_star, nullable_type>::
- aggregateValueAccessorIntoHashTable(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &argument_ids,
- const std::vector<attribute_id> &group_by_key_ids,
- AggregationStateHashTableBase *hash_table) const {
- if (count_star) {
- DCHECK_EQ(0u, argument_ids.size())
- << "Got wrong number of arguments for COUNT(*): "
- << argument_ids.size();
- } else {
- DCHECK_EQ(1u, argument_ids.size())
- << "Got wrong number of arguments for COUNT: " << argument_ids.size();
- }
-}
-
-template <bool count_star, bool nullable_type>
-void AggregationHandleCount<count_star, nullable_type>::mergeStates(
- const AggregationState &source, AggregationState *destination) const {
- const AggregationStateCount &count_source =
- static_cast<const AggregationStateCount &>(source);
- AggregationStateCount *count_destination =
- static_cast<AggregationStateCount *>(destination);
-
- count_destination->count_.fetch_add(
- count_source.count_.load(std::memory_order_relaxed),
- std::memory_order_relaxed);
-}
-
-template <bool count_star, bool nullable_type>
-void AggregationHandleCount<count_star, nullable_type>::mergeStatesFast(
- const std::uint8_t *source, std::uint8_t *destination) const {
- const std::int64_t *src_count_ptr =
- reinterpret_cast<const std::int64_t *>(source);
- std::int64_t *dst_count_ptr = reinterpret_cast<std::int64_t *>(destination);
- (*dst_count_ptr) += (*src_count_ptr);
-}
-
-template <bool count_star, bool nullable_type>
-ColumnVector*
-AggregationHandleCount<count_star, nullable_type>::finalizeHashTable(
- const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys,
- int index) const {
- return finalizeHashTableHelperFast<
- AggregationHandleCount<count_star, nullable_type>,
- AggregationStateFastHashTable>(
- TypeFactory::GetType(kLong), hash_table, group_by_keys, index);
-}
-
-template <bool count_star, bool nullable_type>
-AggregationState* AggregationHandleCount<count_star, nullable_type>::
- aggregateOnDistinctifyHashTableForSingle(
- const AggregationStateHashTableBase &distinctify_hash_table) const {
- DCHECK_EQ(count_star, false);
- return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
- AggregationHandleCount<count_star, nullable_type>,
- AggregationStateCount>(distinctify_hash_table);
-}
-
-template <bool count_star, bool nullable_type>
-void AggregationHandleCount<count_star, nullable_type>::
- aggregateOnDistinctifyHashTableForGroupBy(
- const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table,
- std::size_t index) const {
- DCHECK_EQ(count_star, false);
- aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
- AggregationHandleCount<count_star, nullable_type>,
- AggregationStateFastHashTable>(
- distinctify_hash_table, aggregation_hash_table, index);
-}
-
// Explicitly instantiate and compile in the different versions of
// AggregationHandleCount we need. Note that we do not compile a version with
// 'count_star == true' and 'nullable_type == true', as that combination is
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationHandleCount.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.hpp b/expressions/aggregation/AggregationHandleCount.hpp
index 6aab0cd..629cf11 100644
--- a/expressions/aggregation/AggregationHandleCount.hpp
+++ b/expressions/aggregation/AggregationHandleCount.hpp
@@ -27,11 +27,8 @@
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregationConcreteHandle.hpp"
#include "expressions/aggregation/AggregationHandle.hpp"
-#include "storage/FastHashTable.hpp"
-#include "storage/HashTableBase.hpp"
-#include "types/TypedValue.hpp"
+#include "types/LongType.hpp"
#include "utility/Macros.hpp"
namespace quickstep {
@@ -49,42 +46,6 @@ class AggregationHandleCount;
*/
/**
- * @brief Aggregation state of count.
- */
-class AggregationStateCount : public AggregationState {
- public:
- /**
- * @brief Copy constructor.
- */
- AggregationStateCount(const AggregationStateCount &state)
- : count_(state.count_.load(std::memory_order_relaxed)) {}
-
- /**
- * @brief Destructor.
- */
- ~AggregationStateCount() override {}
-
- std::size_t getPayloadSize() const { return sizeof(count_); }
-
- const std::uint8_t* getPayloadAddress() const {
- return reinterpret_cast<const uint8_t *>(&count_);
- }
-
- private:
- friend class AggregationHandleCount<false, false>;
- friend class AggregationHandleCount<false, true>;
- friend class AggregationHandleCount<true, false>;
- friend class AggregationHandleCount<true, true>;
-
- AggregationStateCount() : count_(0) {}
-
- explicit AggregationStateCount(const std::int64_t initial_count)
- : count_(initial_count) {}
-
- std::atomic<std::int64_t> count_;
-};
-
-/**
* @brief An aggregationhandle for count.
*
* @param count_star If true, this AggregationHandleCount is for nullary
@@ -94,151 +55,35 @@ class AggregationStateCount : public AggregationState {
* not nullable and NULL-checks can safely be skipped.
**/
template <bool count_star, bool nullable_type>
-class AggregationHandleCount : public AggregationConcreteHandle {
+class AggregationHandleCount : public AggregationHandle {
public:
~AggregationHandleCount() override {}
- AggregationState* createInitialState() const override {
- return new AggregationStateCount();
- }
-
- AggregationStateHashTableBase* createGroupByHashTable(
- const HashTableImplType hash_table_impl,
- const std::vector<const Type *> &group_by_types,
- const std::size_t estimated_num_groups,
- StorageManager *storage_manager) const override;
+ void accumulateNullary(
+ void *state,
+ const std::size_t num_tuples) const override;
- inline void iterateNullaryInl(AggregationStateCount *state) const {
- state->count_.fetch_add(1, std::memory_order_relaxed);
- }
-
- inline void iterateNullaryInlFast(std::uint8_t *byte_ptr) const {
- std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr);
- (*count_ptr)++;
- }
-
- /**
- * @brief Iterate with count aggregation state.
- */
- inline void iterateUnaryInl(AggregationStateCount *state,
- const TypedValue &value) const {
- if ((!nullable_type) || (!value.isNull())) {
- state->count_.fetch_add(1, std::memory_order_relaxed);
- }
- }
-
- inline void iterateUnaryInlFast(const TypedValue &value,
- std::uint8_t *byte_ptr) const {
- if ((!nullable_type) || (!value.isNull())) {
- std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr);
- (*count_ptr)++;
- }
- }
-
- inline void updateStateUnary(const TypedValue &argument,
- std::uint8_t *byte_ptr) const override {
- if (!block_update_) {
- iterateUnaryInlFast(argument, byte_ptr);
- }
- }
-
- inline void updateStateNullary(std::uint8_t *byte_ptr) const override {
- if (!block_update_) {
- iterateNullaryInlFast(byte_ptr);
- }
- }
-
- void blockUpdate() override { block_update_ = true; }
-
- void allowUpdate() override { block_update_ = false; }
-
- void initPayload(std::uint8_t *byte_ptr) const override {
- std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr);
- *count_ptr = 0;
- }
-
- AggregationState* accumulateNullary(
- const std::size_t num_tuples) const override {
- return new AggregationStateCount(num_tuples);
- }
-
- AggregationState* accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
- const override;
+ void accumulateColumnVectors(
+ void *state,
+ const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const override;
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- AggregationState* accumulateValueAccessor(
+ void accumulateValueAccessor(
+ void *state,
ValueAccessor *accessor,
const std::vector<attribute_id> &accessor_id) const override;
#endif
- void aggregateValueAccessorIntoHashTable(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &argument_ids,
- const std::vector<attribute_id> &group_by_key_ids,
- AggregationStateHashTableBase *hash_table) const override;
-
- void mergeStates(const AggregationState &source,
- AggregationState *destination) const override;
-
- void mergeStatesFast(const std::uint8_t *source,
- std::uint8_t *destination) const override;
-
- TypedValue finalize(const AggregationState &state) const override {
- return TypedValue(
- static_cast<const AggregationStateCount &>(state).count_.load(
- std::memory_order_relaxed));
- }
-
- inline TypedValue finalizeHashTableEntry(
- const AggregationState &state) const {
- return TypedValue(
- static_cast<const AggregationStateCount &>(state).count_.load(
- std::memory_order_relaxed));
- }
-
- inline TypedValue finalizeHashTableEntryFast(
- const std::uint8_t *byte_ptr) const {
- const std::int64_t *count_ptr =
- reinterpret_cast<const std::int64_t *>(byte_ptr);
- return TypedValue(*count_ptr);
- }
-
- ColumnVector* finalizeHashTable(
- const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys,
- int index) const override;
-
- /**
- * @brief Implementation of
- * AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
- * for SUM aggregation.
- */
- AggregationState* aggregateOnDistinctifyHashTableForSingle(
- const AggregationStateHashTableBase &distinctify_hash_table)
- const override;
-
- /**
- * @brief Implementation of
- * AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
- * for SUM aggregation.
- */
- void aggregateOnDistinctifyHashTableForGroupBy(
- const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table,
- std::size_t index) const override;
-
- std::size_t getPayloadSize() const override { return sizeof(std::int64_t); }
-
private:
friend class AggregateFunctionCount;
+ typedef LongType ResultType;
+ typedef ResultType::cpptype ResultCppType;
+
/**
* @brief Constructor.
**/
- AggregationHandleCount() : block_update_(false) {}
-
- bool block_update_;
+ AggregationHandleCount();
DISALLOW_COPY_AND_ASSIGN(AggregationHandleCount);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationHandleDistinct.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleDistinct.cpp b/expressions/aggregation/AggregationHandleDistinct.cpp
deleted file mode 100644
index 0dc8b56..0000000
--- a/expressions/aggregation/AggregationHandleDistinct.cpp
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include "expressions/aggregation/AggregationHandleDistinct.hpp"
-
-#include <cstddef>
-#include <memory>
-#include <vector>
-#include <utility>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "storage/HashTable.hpp"
-
-#include "types/TypedValue.hpp"
-
-#include "glog/logging.h"
-
-namespace quickstep {
-
-class ColumnVector;
-class StorageManager;
-class Type;
-class ValueAccessor;
-
-AggregationStateHashTableBase* AggregationHandleDistinct::createGroupByHashTable(
- const HashTableImplType hash_table_impl,
- const std::vector<const Type*> &group_by_types,
- const std::size_t estimated_num_groups,
- StorageManager *storage_manager) const {
- return createDistinctifyHashTable(
- hash_table_impl,
- group_by_types,
- estimated_num_groups,
- storage_manager);
-}
-
-void AggregationHandleDistinct::aggregateValueAccessorIntoHashTable(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &argument_ids,
- const std::vector<attribute_id> &group_by_key_ids,
- AggregationStateHashTableBase *hash_table) const {
- DCHECK_EQ(argument_ids.size(), 0u);
-
- insertValueAccessorIntoDistinctifyHashTable(
- accessor,
- group_by_key_ids,
- hash_table);
-}
-
-ColumnVector* AggregationHandleDistinct::finalizeHashTable(
- const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys,
- int index) const {
- DCHECK(group_by_keys->empty());
-
- const auto keys_retriever = [&group_by_keys](std::vector<TypedValue> &group_by_key,
- const bool &dumb_placeholder) -> void {
- group_by_keys->emplace_back(std::move(group_by_key));
- };
- static_cast<const AggregationStateFastHashTable&>(hash_table).forEachCompositeKeyFast(&keys_retriever);
-
- return nullptr;
-}
-
-} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationHandleDistinct.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleDistinct.hpp b/expressions/aggregation/AggregationHandleDistinct.hpp
deleted file mode 100644
index 838bfdd..0000000
--- a/expressions/aggregation/AggregationHandleDistinct.hpp
+++ /dev/null
@@ -1,130 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_HANDLE_DISTINCT_HPP_
-#define QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_HANDLE_DISTINCT_HPP_
-
-#include <cstddef>
-#include <memory>
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregationConcreteHandle.hpp"
-#include "storage/HashTableBase.hpp"
-#include "types/TypedValue.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-namespace quickstep {
-
-class AggregationState;
-class ColumnVector;
-class StorageManager;
-class Type;
-class ValueAccessor;
-
-/** \addtogroup Expressions
- * @{
- */
-
-class AggregationHandleDistinct : public AggregationConcreteHandle {
- public:
- /**
- * @brief Constructor.
- **/
- AggregationHandleDistinct() {}
-
- AggregationState* createInitialState() const override {
- LOG(FATAL)
- << "AggregationHandleDistinct does not support createInitialState().";
- }
-
- AggregationState* accumulateNullary(
- const std::size_t num_tuples) const override {
- LOG(FATAL)
- << "AggregationHandleDistinct does not support accumulateNullary().";
- }
-
- AggregationState* accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
- const override {
- LOG(FATAL) << "AggregationHandleDistinct does not support "
- "accumulateColumnVectors().";
- }
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- AggregationState* accumulateValueAccessor(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &accessor_ids) const override {
- LOG(FATAL) << "AggregationHandleDistinct does not support "
- "accumulateValueAccessor().";
- }
-#endif
-
- void mergeStates(const AggregationState &source,
- AggregationState *destination) const override {
- LOG(FATAL) << "AggregationHandleDistinct does not support mergeStates().";
- }
-
- TypedValue finalize(const AggregationState &state) const override {
- LOG(FATAL) << "AggregationHandleDistinct does not support finalize().";
- }
-
- AggregationState* aggregateOnDistinctifyHashTableForSingle(
- const AggregationStateHashTableBase &distinctify_hash_table)
- const override {
- LOG(FATAL) << "AggregationHandleDistinct does not support "
- << "aggregateOnDistinctifyHashTableForSingle().";
- }
-
- void aggregateOnDistinctifyHashTableForGroupBy(
- const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *groupby_hash_table,
- std::size_t index) const override {
- LOG(FATAL) << "AggregationHandleDistinct does not support "
- << "aggregateOnDistinctifyHashTableForGroupBy().";
- }
-
- AggregationStateHashTableBase* createGroupByHashTable(
- const HashTableImplType hash_table_impl,
- const std::vector<const Type *> &group_by_types,
- const std::size_t estimated_num_groups,
- StorageManager *storage_manager) const override;
-
- void aggregateValueAccessorIntoHashTable(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &argument_ids,
- const std::vector<attribute_id> &group_by_key_ids,
- AggregationStateHashTableBase *hash_table) const override;
-
- ColumnVector* finalizeHashTable(
- const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys,
- int index) const override;
-
- private:
- DISALLOW_COPY_AND_ASSIGN(AggregationHandleDistinct);
-};
-
-/** @} */
-
-} // namespace quickstep
-
-#endif // QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_HANDLE_DISTINCT_HPP_