You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink target was not preserved in this plain-text copy).
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/10/31 20:13:58 UTC
[1/2] incubator-quickstep git commit: Initial update
Repository: incubator-quickstep
Updated Branches:
refs/heads/collision-free-agg [created] c1baa0150
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index ea74ee6..44c5989 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -373,94 +373,6 @@ void StorageBlock::selectSimple(const std::vector<attribute_id> &selection,
accessor.get());
}
-AggregationState* StorageBlock::aggregate(
- const AggregationHandle &handle,
- const std::vector<std::unique_ptr<const Scalar>> &arguments,
- const std::vector<attribute_id> *arguments_as_attributes,
- const TupleIdSequence *filter) const {
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- // If all the arguments to this aggregate are plain relation attributes,
- // aggregate directly on a ValueAccessor from this block to avoid a copy.
- if ((arguments_as_attributes != nullptr) && (!arguments_as_attributes->empty())) {
- DCHECK_EQ(arguments.size(), arguments_as_attributes->size())
- << "Mismatch between number of arguments and number of attribute_ids";
- return aggregateHelperValueAccessor(handle, *arguments_as_attributes, filter);
- }
- // TODO(shoban): We may want to optimize for ScalarLiteral here.
-#endif
-
- // Call aggregateHelperColumnVector() to materialize each argument as a
- // ColumnVector, then aggregate over those.
- return aggregateHelperColumnVector(handle, arguments, filter);
-}
-
-void StorageBlock::aggregateGroupBy(
- const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
- const std::vector<std::unique_ptr<const Scalar>> &group_by,
- const TupleIdSequence *filter,
- AggregationStateHashTableBase *hash_table,
- std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
- DCHECK_GT(group_by.size(), 0u)
- << "Called aggregateGroupBy() with zero GROUP BY expressions";
-
- SubBlocksReference sub_blocks_ref(*tuple_store_,
- indices_,
- indices_consistent_);
-
- // IDs of 'arguments' as attributes in the ValueAccessor we create below.
- std::vector<attribute_id> argument_ids;
-
- // IDs of GROUP BY key element(s) in the ValueAccessor we create below.
- std::vector<attribute_id> key_ids;
-
- // An intermediate ValueAccessor that stores the materialized 'arguments' for
- // this aggregate, as well as the GROUP BY expression values.
- ColumnVectorsValueAccessor temp_result;
- {
- std::unique_ptr<ValueAccessor> accessor(tuple_store_->createValueAccessor(filter));
- attribute_id attr_id = 0;
-
- // First, put GROUP BY keys into 'temp_result'.
- if (reuse_group_by_vectors->empty()) {
- // Compute GROUP BY values from group_by Scalars, and store them in
- // reuse_group_by_vectors for reuse by other aggregates on this same
- // block.
- reuse_group_by_vectors->reserve(group_by.size());
- for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
- reuse_group_by_vectors->emplace_back(
- group_by_element->getAllValues(accessor.get(), &sub_blocks_ref));
- temp_result.addColumn(reuse_group_by_vectors->back().get(), false);
- key_ids.push_back(attr_id++);
- }
- } else {
- // Reuse precomputed GROUP BY values from reuse_group_by_vectors.
- DCHECK_EQ(group_by.size(), reuse_group_by_vectors->size())
- << "Wrong number of reuse_group_by_vectors";
- for (const std::unique_ptr<ColumnVector> &reuse_cv : *reuse_group_by_vectors) {
- temp_result.addColumn(reuse_cv.get(), false);
- key_ids.push_back(attr_id++);
- }
- }
-
- // Compute argument vectors and add them to 'temp_result'.
- for (const std::vector<std::unique_ptr<const Scalar>> &argument : arguments) {
- for (const std::unique_ptr<const Scalar> &args : argument) {
- temp_result.addColumn(args->getAllValues(accessor.get(), &sub_blocks_ref));
- argument_ids.push_back(attr_id++);
- }
- if (argument.empty()) {
- argument_ids.push_back(kInvalidAttributeID);
- }
- }
- }
-
- hash_table->upsertValueAccessorCompositeKeyFast(argument_ids,
- &temp_result,
- key_ids,
- true);
-}
-
-
void StorageBlock::aggregateDistinct(
const AggregationHandle &handle,
const std::vector<std::unique_ptr<const Scalar>> &arguments,
@@ -1245,61 +1157,6 @@ std::unordered_map<attribute_id, TypedValue>* StorageBlock::generateUpdatedValue
return update_map;
}
-AggregationState* StorageBlock::aggregateHelperColumnVector(
- const AggregationHandle &handle,
- const std::vector<std::unique_ptr<const Scalar>> &arguments,
- const TupleIdSequence *matches) const {
- if (arguments.empty()) {
- // Special case. This is a nullary aggregate (i.e. COUNT(*)).
- return handle.accumulateNullary(matches == nullptr ? tuple_store_->numTuples()
- : matches->size());
- } else {
- // Set up a ValueAccessor that will be used when materializing argument
- // values below (possibly filtered based on the '*matches' to a filter
- // predicate).
- std::unique_ptr<ValueAccessor> accessor;
- if (matches == nullptr) {
- accessor.reset(tuple_store_->createValueAccessor());
- } else {
- accessor.reset(tuple_store_->createValueAccessor(matches));
- }
-
- SubBlocksReference sub_blocks_ref(*tuple_store_,
- indices_,
- indices_consistent_);
-
- // Materialize each argument's values for this block as a ColumnVector.
- std::vector<std::unique_ptr<ColumnVector>> column_vectors;
- for (const std::unique_ptr<const Scalar> &argument : arguments) {
- column_vectors.emplace_back(argument->getAllValues(accessor.get(), &sub_blocks_ref));
- }
-
- // Have the AggregationHandle actually do the aggregation.
- return handle.accumulateColumnVectors(column_vectors);
- }
-}
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-AggregationState* StorageBlock::aggregateHelperValueAccessor(
- const AggregationHandle &handle,
- const std::vector<attribute_id> &argument_ids,
- const TupleIdSequence *matches) const {
- // Set up a ValueAccessor to aggregate over (possibly filtered based on the
- // '*matches' to a filter predicate).
- std::unique_ptr<ValueAccessor> accessor;
- if (matches == nullptr) {
- accessor.reset(tuple_store_->createValueAccessor());
- } else {
- accessor.reset(tuple_store_->createValueAccessor(matches));
- }
-
- // Have the AggregationHandle actually do the aggregation.
- return handle.accumulateValueAccessor(
- accessor.get(),
- argument_ids);
-}
-#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-
void StorageBlock::updateHeader() {
DEBUG_ASSERT(*static_cast<const int*>(block_memory_) == block_header_.ByteSize());
@@ -1329,59 +1186,4 @@ const std::size_t StorageBlock::getNumTuples() const {
return tuple_store_->numTuples();
}
-void StorageBlock::aggregateGroupByPartitioned(
- const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
- const std::vector<std::unique_ptr<const Scalar>> &group_by,
- const TupleIdSequence *filter,
- const std::size_t num_partitions,
- ColumnVectorsValueAccessor *temp_result,
- std::vector<attribute_id> *argument_ids,
- std::vector<attribute_id> *key_ids,
- std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
- DCHECK(!group_by.empty())
- << "Called aggregateGroupByPartitioned() with zero GROUP BY expressions";
-
- SubBlocksReference sub_blocks_ref(*tuple_store_,
- indices_,
- indices_consistent_);
-
- std::unique_ptr<ValueAccessor> accessor(
- tuple_store_->createValueAccessor(filter));
-
- attribute_id attr_id = 0;
-
- // First, put GROUP BY keys into 'temp_result'.
- if (reuse_group_by_vectors->empty()) {
- // Compute GROUP BY values from group_by Scalars, and store them in
- // reuse_group_by_vectors for reuse by other aggregates on this same
- // block.
- reuse_group_by_vectors->reserve(group_by.size());
- for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
- reuse_group_by_vectors->emplace_back(
- group_by_element->getAllValues(accessor.get(), &sub_blocks_ref));
- temp_result->addColumn(reuse_group_by_vectors->back().get(), false);
- key_ids->push_back(attr_id++);
- }
- } else {
- // Reuse precomputed GROUP BY values from reuse_group_by_vectors.
- DCHECK_EQ(group_by.size(), reuse_group_by_vectors->size())
- << "Wrong number of reuse_group_by_vectors";
- for (const std::unique_ptr<ColumnVector> &reuse_cv : *reuse_group_by_vectors) {
- temp_result->addColumn(reuse_cv.get(), false);
- key_ids->push_back(attr_id++);
- }
- }
-
- // Compute argument vectors and add them to 'temp_result'.
- for (const std::vector<std::unique_ptr<const Scalar>> &argument : arguments) {
- for (const std::unique_ptr<const Scalar> &args : argument) {
- temp_result->addColumn(args->getAllValues(accessor.get(), &sub_blocks_ref));
- argument_ids->push_back(attr_id++);
- }
- if (argument.empty()) {
- argument_ids->push_back(kInvalidAttributeID);
- }
- }
-}
-
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/storage/StorageBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp
index 56b3bdc..67aa1bf 100644
--- a/storage/StorageBlock.hpp
+++ b/storage/StorageBlock.hpp
@@ -387,121 +387,6 @@ class StorageBlock : public StorageBlockBase {
InsertDestinationInterface *destination) const;
/**
- * @brief Perform non GROUP BY aggregation on the tuples in the this storage
- * block, returning the aggregated result (for this block) in an
- * AggregationState.
- *
- * @param handle Aggregation handle that will be used to compute aggregate.
- * @param arguments The arguments of the aggregate function as expressions.
- * @param arguments_as_attributes If non-NULL, indicates a valid attribute_id
- * for each of the elements in arguments, and is used to elide a copy.
- * Has no effect if NULL, or if VECTOR_COPY_ELISION_LEVEL is NONE.
- * @param filter If non-NULL, then only tuple IDs which are set in the
- * filter will be checked (all others will be assumed to be false).
- *
- * @return Aggregated state for this block in the form of an
- * AggregationState. AggregationHandle::mergeStates() can be called
- * to merge with states from other blocks, and
- * AggregationHandle::finalize() can be used to generate a final
- * result.
- **/
- AggregationState* aggregate(
- const AggregationHandle &handle,
- const std::vector<std::unique_ptr<const Scalar>> &arguments,
- const std::vector<attribute_id> *arguments_as_attributes,
- const TupleIdSequence *filter) const;
-
- /**
- * @brief Perform GROUP BY aggregation on the tuples in the this storage
- * block.
- *
- * @param arguments The arguments to the aggregation function as Scalars.
- * @param group_by The list of GROUP BY attributes/expressions. The tuples in
- * this storage block are grouped by these attributes before
- * aggregation.
- * @param filter If non-NULL, then only tuple IDs which are set in the
- * filter will be checked (all others will be assumed to be false).
- * @param hash_table Hash table to store aggregation state mapped based on
- * GROUP BY value list (defined by \c group_by).
- * @param reuse_group_by_vectors This parameter is used to store and reuse
- * GROUP BY attribute vectors pre-computed in an earlier invocation of
- * aggregateGroupBy(). \c reuse_group_by_vectors is never \c nullptr
- * for ease of use. Current invocation of aggregateGroupBy() will reuse
- * ColumnVectors if non-empty, otherwise computes ColumnVectors based
- * on \c group_by and stores them in \c reuse_group_by_vectors.
- *
- * For sample usage of aggregateGroupBy, see this relevant pseudo-C++ code:
- * \code
- * std::vector<std::unique_ptr<ColumnVector>> group_by_vectors;
- * for each aggregate {
- * block.aggregateGroupBy(..., &group_by_vectors);
- * }
- * \endcode
- **/
- /*
- * TODO(shoban): Currently, we use ColumnVectorsValueAccessor to compute
- * temporary result for Scalars of aggregation attributes and GROUP BY
- * attributes. We will have to support specifying aggregation and GROUP BY
- * attributes as std::vector<attribute_id> (like in selectSimple()) for fast
- * path when there are no expressions specified in the query.
- */
- void aggregateGroupBy(
- const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
- const std::vector<std::unique_ptr<const Scalar>> &group_by,
- const TupleIdSequence *filter,
- AggregationStateHashTableBase *hash_table,
- std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const;
-
-
- /**
- * @brief Perform the GROUP BY aggregation for the case when aggregation is
- * partitioned.
- *
- * TODO(harshad) - Refactor this class to use only one function
- * aggregateGroupBy.
- * @note The difference between this method and the aggregateGroupBy method
- * is that in this method, the tuples are routed to different HashTables
- * based on the partition to which they belong to. The partition is
- * determined by the GROUP BY attributes. Right now hash based
- * partitioning is performed.
- *
- * @note This function only creates the ColumnVectorsValueAccessor needed for
- * the insertion in the hash table. The actual insertion in respective
- * hash tables should be handled by the caller. See
- * AggregationOperationState::aggregateHashTable() for one such
- * implementation.
- *
- * @param arguments The arguments to the aggregation function as Scalars.
- * @param group_by The list of GROUP BY attributes/expressions. The tuples in
- * this storage block are grouped by these attributes before
- * aggregation.
- * @param filter If non-NULL, then only tuple IDs which are set in the
- * filter will be checked (all others will be assumed to be false).
- * @param num_partitions The number of partitions used for the aggregation.
- * @param temp_result The ColumnVectorsValueAccessor used for collecting
- * the attribute values from this StorageBlock.
- * @param arguments_ids The attribute IDs used for the aggregation, which
- * come from the arguments vector. If arguments is empty, this vector
- * is filled with invalid attribute IDs.
- * @param key_ids The attribute IDs of the group by attributes.
- * @param reuse_group_by_vectors This parameter is used to store and reuse
- * GROUP BY attribute vectors pre-computed in an earlier invocation of
- * aggregateGroupBy(). \c reuse_group_by_vectors is never \c nullptr
- * for ease of use. Current invocation of aggregateGroupBy() will reuse
- * ColumnVectors if non-empty, otherwise computes ColumnVectors based
- * on \c group_by and stores them in \c reuse_group_by_vectors.
- **/
- void aggregateGroupByPartitioned(
- const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
- const std::vector<std::unique_ptr<const Scalar>> &group_by,
- const TupleIdSequence *filter,
- const std::size_t num_partitions,
- ColumnVectorsValueAccessor *temp_result,
- std::vector<attribute_id> *argument_ids,
- std::vector<attribute_id> *key_ids,
- std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const;
-
- /**
* @brief Inserts the GROUP BY expressions and aggregation arguments together
* as keys into the distinctify hash table.
*
[2/2] incubator-quickstep git commit: Initial update
Posted by ji...@apache.org.
Initial update
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c1baa015
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c1baa015
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c1baa015
Branch: refs/heads/collision-free-agg
Commit: c1baa0150bd949b3bff3af39c262106836c4a168
Parents: bc3d8a5
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Thu Oct 27 12:23:03 2016 -0500
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Mon Oct 31 15:12:53 2016 -0500
----------------------------------------------------------------------
.../aggregation/AggregationConcreteHandle.cpp | 22 +-
expressions/aggregation/AggregationHandle.hpp | 32 +-
.../aggregation/AggregationHandleAvg.cpp | 32 +-
.../aggregation/AggregationHandleAvg.hpp | 11 +-
.../aggregation/AggregationHandleCount.cpp | 57 +--
.../aggregation/AggregationHandleCount.hpp | 11 +-
.../aggregation/AggregationHandleDistinct.hpp | 16 +-
.../aggregation/AggregationHandleMax.cpp | 30 +-
.../aggregation/AggregationHandleMax.hpp | 11 +-
.../aggregation/AggregationHandleMin.cpp | 30 +-
.../aggregation/AggregationHandleMin.hpp | 11 +-
.../aggregation/AggregationHandleSum.cpp | 31 +-
.../aggregation/AggregationHandleSum.hpp | 12 +-
query_execution/QueryContext.hpp | 14 -
.../DestroyAggregationStateOperator.cpp | 7 -
storage/AggregationOperationState.cpp | 395 +++++++++----------
storage/AggregationOperationState.hpp | 68 ++--
storage/FastHashTable.hpp | 364 ++++++-----------
storage/HashTableBase.hpp | 7 +-
storage/StorageBlock.cpp | 198 ----------
storage/StorageBlock.hpp | 115 ------
21 files changed, 460 insertions(+), 1014 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/expressions/aggregation/AggregationConcreteHandle.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.cpp b/expressions/aggregation/AggregationConcreteHandle.cpp
index e3fb520..fd23c7e 100644
--- a/expressions/aggregation/AggregationConcreteHandle.cpp
+++ b/expressions/aggregation/AggregationConcreteHandle.cpp
@@ -52,17 +52,17 @@ void AggregationConcreteHandle::insertValueAccessorIntoDistinctifyHashTable(
AggregationStateHashTableBase *distinctify_hash_table) const {
// If the key-value pair is already there, we don't need to update the value,
// which should always be "true". I.e. the value is just a placeholder.
-
- AggregationStateFastHashTable *hash_table =
- static_cast<AggregationStateFastHashTable *>(distinctify_hash_table);
- if (key_ids.size() == 1) {
- hash_table->upsertValueAccessorFast(
- key_ids, accessor, key_ids[0], true /* check_for_null_keys */);
- } else {
- std::vector<attribute_id> empty_args {kInvalidAttributeID};
- hash_table->upsertValueAccessorCompositeKeyFast(
- empty_args, accessor, key_ids, true /* check_for_null_keys */);
- }
+ LOG(FATAL) << "Not supported";
+// AggregationStateFastHashTable *hash_table =
+// static_cast<AggregationStateFastHashTable *>(distinctify_hash_table);
+// if (key_ids.size() == 1) {
+// hash_table->upsertValueAccessorFast(
+// key_ids, accessor, key_ids[0], true /* check_for_null_keys */);
+// } else {
+// std::vector<attribute_id> empty_args {kInvalidAttributeID};
+// hash_table->upsertValueAccessorCompositeKeyFast(
+// empty_args, accessor, key_ids, true /* check_for_null_keys */);
+// }
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/expressions/aggregation/AggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandle.hpp b/expressions/aggregation/AggregationHandle.hpp
index 4b51179..751b390 100644
--- a/expressions/aggregation/AggregationHandle.hpp
+++ b/expressions/aggregation/AggregationHandle.hpp
@@ -32,6 +32,7 @@
namespace quickstep {
class ColumnVector;
+class ColumnVectorsValueAccessor;
class StorageManager;
class Type;
class ValueAccessor;
@@ -153,39 +154,16 @@ class AggregationHandle {
const std::size_t num_tuples) const = 0;
/**
- * @brief Accumulate (iterate over) all values in one or more ColumnVectors
- * and return a new AggregationState which can be merged with other
- * states or finalized.
+ * @brief TODO
*
- * @param column_vectors One or more ColumnVectors that the aggregate will be
- * applied to. These correspond to the aggregate function's arguments,
- * in order.
* @return A new AggregationState which contains the accumulated results from
* applying the aggregate to column_vectors. Caller is responsible
* for deleting the returned AggregationState.
**/
- virtual AggregationState* accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const = 0;
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- /**
- * @brief Accumulate (iterate over) all values in columns accessible through
- * a ValueAccessor and return a new AggregationState which can be
- * merged with other states or finalized.
- *
- * @param accessor A ValueAccessor that the columns to be aggregated can be
- * accessed through.
- * @param accessor_ids The attribute_ids that correspond to the columns in
- * accessor to aggeregate. These correspond to the aggregate
- * function's arguments, in order.
- * @return A new AggregationState which contains the accumulated results from
- * applying the aggregate to the specified columns in accessor.
- * Caller is responsible for deleting the returned AggregationState.
- **/
- virtual AggregationState* accumulateValueAccessor(
+ virtual AggregationState* accumulate(
ValueAccessor *accessor,
- const std::vector<attribute_id> &accessor_ids) const = 0;
-#endif
+ ColumnVectorsValueAccessor *aux_accessor,
+ const std::vector<attribute_id> &argument_ids) const = 0;
/**
* @brief Perform an aggregation with GROUP BY over all the tuples accessible
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/expressions/aggregation/AggregationHandleAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.cpp b/expressions/aggregation/AggregationHandleAvg.cpp
index 2481092..672db1f 100644
--- a/expressions/aggregation/AggregationHandleAvg.cpp
+++ b/expressions/aggregation/AggregationHandleAvg.cpp
@@ -96,34 +96,28 @@ AggregationStateHashTableBase* AggregationHandleAvg::createGroupByHashTable(
hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
}
-AggregationState* AggregationHandleAvg::accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
- DCHECK_EQ(1u, column_vectors.size())
- << "Got wrong number of ColumnVectors for AVG: " << column_vectors.size();
+AggregationState* AggregationHandleAvg::accumulate(
+ ValueAccessor *accessor,
+ ColumnVectorsValueAccessor *aux_accessor,
+ const std::vector<attribute_id> &argument_ids) const {
+ DCHECK_EQ(1u, argument_ids.size())
+ << "Got wrong number of attributes for AVG: " << argument_ids.size();
- AggregationStateAvg *state = new AggregationStateAvg(blank_state_);
- std::size_t count = 0;
- state->sum_ = fast_add_operator_->accumulateColumnVector(
- state->sum_, *column_vectors.front(), &count);
- state->count_ = count;
- return state;
-}
+ const attribute_id argument_id = argument_ids.front();
+ DCHECK_NE(argument_id, kInvalidAttributeID);
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-AggregationState* AggregationHandleAvg::accumulateValueAccessor(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &accessor_ids) const {
- DCHECK_EQ(1u, accessor_ids.size())
- << "Got wrong number of attributes for AVG: " << accessor_ids.size();
+ ValueAccessor *target_accessor =
+ argument_id >= 0 ? accessor : aux_accessor;
+ const attribute_id target_argument_id =
+ argument_id >= 0 ? argument_id : -(argument_id+2);
AggregationStateAvg *state = new AggregationStateAvg(blank_state_);
std::size_t count = 0;
state->sum_ = fast_add_operator_->accumulateValueAccessor(
- state->sum_, accessor, accessor_ids.front(), &count);
+ state->sum_, target_accessor, target_argument_id, &count);
state->count_ = count;
return state;
}
-#endif
void AggregationHandleAvg::aggregateValueAccessorIntoHashTable(
ValueAccessor *accessor,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/expressions/aggregation/AggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.hpp b/expressions/aggregation/AggregationHandleAvg.hpp
index 47132c6..46983be 100644
--- a/expressions/aggregation/AggregationHandleAvg.hpp
+++ b/expressions/aggregation/AggregationHandleAvg.hpp
@@ -169,15 +169,10 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
}
}
- AggregationState* accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
- const override;
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- AggregationState* accumulateValueAccessor(
+ AggregationState* accumulate(
ValueAccessor *accessor,
- const std::vector<attribute_id> &accessor_id) const override;
-#endif
+ ColumnVectorsValueAccessor *aux_accessor,
+ const std::vector<attribute_id> &argument_ids) const override;
void aggregateValueAccessorIntoHashTable(
ValueAccessor *accessor,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/expressions/aggregation/AggregationHandleCount.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.cpp b/expressions/aggregation/AggregationHandleCount.cpp
index 034c942..9c00449 100644
--- a/expressions/aggregation/AggregationHandleCount.cpp
+++ b/expressions/aggregation/AggregationHandleCount.cpp
@@ -63,58 +63,32 @@ AggregationHandleCount<count_star, nullable_type>::createGroupByHashTable(
template <bool count_star, bool nullable_type>
AggregationState*
-AggregationHandleCount<count_star, nullable_type>::accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
+AggregationHandleCount<count_star, nullable_type>::accumulate(
+ ValueAccessor *accessor,
+ ColumnVectorsValueAccessor *aux_accessor,
+ const std::vector<attribute_id> &argument_ids) const {
DCHECK(!count_star)
<< "Called non-nullary accumulation method on an AggregationHandleCount "
<< "set up for nullary COUNT(*)";
- DCHECK_EQ(1u, column_vectors.size())
- << "Got wrong number of ColumnVectors for COUNT: "
- << column_vectors.size();
+ DCHECK_EQ(1u, argument_ids.size())
+ << "Got wrong number of attributes for COUNT: " << argument_ids.size();
- std::size_t count = 0;
- InvokeOnColumnVector(
- *column_vectors.front(),
- [&](const auto &column_vector) -> void { // NOLINT(build/c++11)
- if (nullable_type) {
- // TODO(shoban): Iterating over the ColumnVector is a rather slow way
- // to do this. We should look at extending the ColumnVector interface
- // to do a quick count of the non-null values (i.e. the length minus
- // the population count of the null bitmap). We should do something
- // similar for ValueAccessor too.
- for (std::size_t pos = 0; pos < column_vector.size(); ++pos) {
- count += !column_vector.getTypedValue(pos).isNull();
- }
- } else {
- count = column_vector.size();
- }
- });
+ const attribute_id argument_id = argument_ids.front();
+ DCHECK_NE(argument_id, kInvalidAttributeID);
- return new AggregationStateCount(count);
-}
+ ValueAccessor *target_accessor =
+ argument_id >= 0 ? accessor : aux_accessor;
+ const attribute_id target_argument_id =
+ argument_id >= 0 ? argument_id : -(argument_id+2);
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-template <bool count_star, bool nullable_type>
-AggregationState*
-AggregationHandleCount<count_star, nullable_type>::accumulateValueAccessor(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &accessor_ids) const {
- DCHECK(!count_star)
- << "Called non-nullary accumulation method on an AggregationHandleCount "
- << "set up for nullary COUNT(*)";
-
- DCHECK_EQ(1u, accessor_ids.size())
- << "Got wrong number of attributes for COUNT: " << accessor_ids.size();
-
- const attribute_id accessor_id = accessor_ids.front();
std::size_t count = 0;
InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
- accessor,
- [&accessor_id, &count](auto *accessor) -> void { // NOLINT(build/c++11)
+ target_accessor,
+ [&target_argument_id, &count](auto *accessor) -> void { // NOLINT(build/c++11)
if (nullable_type) {
while (accessor->next()) {
- count += !accessor->getTypedValue(accessor_id).isNull();
+ count += !accessor->getTypedValue(target_argument_id).isNull();
}
} else {
count = accessor->getNumTuples();
@@ -123,7 +97,6 @@ AggregationHandleCount<count_star, nullable_type>::accumulateValueAccessor(
return new AggregationStateCount(count);
}
-#endif
template <bool count_star, bool nullable_type>
void AggregationHandleCount<count_star, nullable_type>::
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/expressions/aggregation/AggregationHandleCount.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.hpp b/expressions/aggregation/AggregationHandleCount.hpp
index 6aab0cd..52e63f5 100644
--- a/expressions/aggregation/AggregationHandleCount.hpp
+++ b/expressions/aggregation/AggregationHandleCount.hpp
@@ -162,15 +162,10 @@ class AggregationHandleCount : public AggregationConcreteHandle {
return new AggregationStateCount(num_tuples);
}
- AggregationState* accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
- const override;
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- AggregationState* accumulateValueAccessor(
+ AggregationState* accumulate(
ValueAccessor *accessor,
- const std::vector<attribute_id> &accessor_id) const override;
-#endif
+ ColumnVectorsValueAccessor *aux_accessor,
+ const std::vector<attribute_id> &argument_ids) const override;
void aggregateValueAccessorIntoHashTable(
ValueAccessor *accessor,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/expressions/aggregation/AggregationHandleDistinct.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleDistinct.hpp b/expressions/aggregation/AggregationHandleDistinct.hpp
index 838bfdd..5d45219 100644
--- a/expressions/aggregation/AggregationHandleDistinct.hpp
+++ b/expressions/aggregation/AggregationHandleDistinct.hpp
@@ -62,21 +62,13 @@ class AggregationHandleDistinct : public AggregationConcreteHandle {
<< "AggregationHandleDistinct does not support accumulateNullary().";
}
- AggregationState* accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
- const override {
- LOG(FATAL) << "AggregationHandleDistinct does not support "
- "accumulateColumnVectors().";
- }
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- AggregationState* accumulateValueAccessor(
+ AggregationState* accumulate(
ValueAccessor *accessor,
- const std::vector<attribute_id> &accessor_ids) const override {
+ ColumnVectorsValueAccessor *aux_accessor,
+ const std::vector<attribute_id> &argument_ids) const override {
LOG(FATAL) << "AggregationHandleDistinct does not support "
- "accumulateValueAccessor().";
+ "accumulate().";
}
-#endif
void mergeStates(const AggregationState &source,
AggregationState *destination) const override {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/expressions/aggregation/AggregationHandleMax.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.cpp b/expressions/aggregation/AggregationHandleMax.cpp
index c2d571b..aa12ed2 100644
--- a/expressions/aggregation/AggregationHandleMax.cpp
+++ b/expressions/aggregation/AggregationHandleMax.cpp
@@ -54,28 +54,26 @@ AggregationStateHashTableBase* AggregationHandleMax::createGroupByHashTable(
hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
}
-AggregationState* AggregationHandleMax::accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
- DCHECK_EQ(1u, column_vectors.size())
- << "Got wrong number of ColumnVectors for MAX: " << column_vectors.size();
+AggregationState* AggregationHandleMax::accumulate(
+ ValueAccessor *accessor,
+ ColumnVectorsValueAccessor *aux_accessor,
+ const std::vector<attribute_id> &argument_ids) const {
+ DCHECK_EQ(1u, argument_ids.size())
+ << "Got wrong number of attributes for MAX: " << argument_ids.size();
- return new AggregationStateMax(fast_comparator_->accumulateColumnVector(
- type_.getNullableVersion().makeNullValue(), *column_vectors.front()));
-}
+ const attribute_id argument_id = argument_ids.front();
+ DCHECK_NE(argument_id, kInvalidAttributeID);
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-AggregationState* AggregationHandleMax::accumulateValueAccessor(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &accessor_ids) const {
- DCHECK_EQ(1u, accessor_ids.size())
- << "Got wrong number of attributes for MAX: " << accessor_ids.size();
+ ValueAccessor *target_accessor =
+ argument_id >= 0 ? accessor : aux_accessor;
+ const attribute_id target_argument_id =
+ argument_id >= 0 ? argument_id : -(argument_id+2);
return new AggregationStateMax(fast_comparator_->accumulateValueAccessor(
type_.getNullableVersion().makeNullValue(),
- accessor,
- accessor_ids.front()));
+ target_accessor,
+ target_argument_id));
}
-#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
void AggregationHandleMax::aggregateValueAccessorIntoHashTable(
ValueAccessor *accessor,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/expressions/aggregation/AggregationHandleMax.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.hpp b/expressions/aggregation/AggregationHandleMax.hpp
index d851a0c..ab7da0a 100644
--- a/expressions/aggregation/AggregationHandleMax.hpp
+++ b/expressions/aggregation/AggregationHandleMax.hpp
@@ -136,15 +136,10 @@ class AggregationHandleMax : public AggregationConcreteHandle {
}
}
- AggregationState* accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
- const override;
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- AggregationState* accumulateValueAccessor(
+ AggregationState* accumulate(
ValueAccessor *accessor,
- const std::vector<attribute_id> &accessor_ids) const override;
-#endif
+ ColumnVectorsValueAccessor *aux_accessor,
+ const std::vector<attribute_id> &argument_ids) const override;
void aggregateValueAccessorIntoHashTable(
ValueAccessor *accessor,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/expressions/aggregation/AggregationHandleMin.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.cpp b/expressions/aggregation/AggregationHandleMin.cpp
index a07f299..12701f2 100644
--- a/expressions/aggregation/AggregationHandleMin.cpp
+++ b/expressions/aggregation/AggregationHandleMin.cpp
@@ -54,28 +54,26 @@ AggregationStateHashTableBase* AggregationHandleMin::createGroupByHashTable(
hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
}
-AggregationState* AggregationHandleMin::accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
- DCHECK_EQ(1u, column_vectors.size())
- << "Got wrong number of ColumnVectors for MIN: " << column_vectors.size();
+AggregationState* AggregationHandleMin::accumulate(
+ ValueAccessor *accessor,
+ ColumnVectorsValueAccessor *aux_accessor,
+ const std::vector<attribute_id> &argument_ids) const {
+ DCHECK_EQ(1u, argument_ids.size())
+ << "Got wrong number of attributes for MIN: " << argument_ids.size();
- return new AggregationStateMin(fast_comparator_->accumulateColumnVector(
- type_.getNullableVersion().makeNullValue(), *column_vectors.front()));
-}
+ const attribute_id argument_id = argument_ids.front();
+ DCHECK_NE(argument_id, kInvalidAttributeID);
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-AggregationState* AggregationHandleMin::accumulateValueAccessor(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &accessor_ids) const {
- DCHECK_EQ(1u, accessor_ids.size())
- << "Got wrong number of attributes for MIN: " << accessor_ids.size();
+ ValueAccessor *target_accessor =
+ argument_id >= 0 ? accessor : aux_accessor;
+ const attribute_id target_argument_id =
+ argument_id >= 0 ? argument_id : -(argument_id+2);
return new AggregationStateMin(fast_comparator_->accumulateValueAccessor(
type_.getNullableVersion().makeNullValue(),
- accessor,
- accessor_ids.front()));
+ target_accessor,
+ target_argument_id));
}
-#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
void AggregationHandleMin::aggregateValueAccessorIntoHashTable(
ValueAccessor *accessor,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/expressions/aggregation/AggregationHandleMin.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.hpp b/expressions/aggregation/AggregationHandleMin.hpp
index e3472ec..dcadb80 100644
--- a/expressions/aggregation/AggregationHandleMin.hpp
+++ b/expressions/aggregation/AggregationHandleMin.hpp
@@ -138,15 +138,10 @@ class AggregationHandleMin : public AggregationConcreteHandle {
}
}
- AggregationState* accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
- const override;
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- AggregationState* accumulateValueAccessor(
+ AggregationState* accumulate(
ValueAccessor *accessor,
- const std::vector<attribute_id> &accessor_ids) const override;
-#endif
+ ColumnVectorsValueAccessor *aux_accessor,
+ const std::vector<attribute_id> &argument_ids) const override;
void aggregateValueAccessorIntoHashTable(
ValueAccessor *accessor,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/expressions/aggregation/AggregationHandleSum.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.cpp b/expressions/aggregation/AggregationHandleSum.cpp
index 642d88d..f00e00c 100644
--- a/expressions/aggregation/AggregationHandleSum.cpp
+++ b/expressions/aggregation/AggregationHandleSum.cpp
@@ -88,29 +88,26 @@ AggregationStateHashTableBase* AggregationHandleSum::createGroupByHashTable(
hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
}
-AggregationState* AggregationHandleSum::accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
- DCHECK_EQ(1u, column_vectors.size())
- << "Got wrong number of ColumnVectors for SUM: " << column_vectors.size();
- std::size_t num_tuples = 0;
- TypedValue cv_sum = fast_operator_->accumulateColumnVector(
- blank_state_.sum_, *column_vectors.front(), &num_tuples);
- return new AggregationStateSum(std::move(cv_sum), num_tuples == 0);
-}
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-AggregationState* AggregationHandleSum::accumulateValueAccessor(
+AggregationState* AggregationHandleSum::accumulate(
ValueAccessor *accessor,
- const std::vector<attribute_id> &accessor_ids) const {
- DCHECK_EQ(1u, accessor_ids.size())
- << "Got wrong number of attributes for SUM: " << accessor_ids.size();
+ ColumnVectorsValueAccessor *aux_accessor,
+ const std::vector<attribute_id> &argument_ids) const {
+ DCHECK_EQ(1u, argument_ids.size())
+ << "Got wrong number of attributes for SUM: " << argument_ids.size();
+
+ const attribute_id argument_id = argument_ids.front();
+ DCHECK_NE(argument_id, kInvalidAttributeID);
+
+ ValueAccessor *target_accessor =
+ argument_id >= 0 ? accessor : aux_accessor;
+ const attribute_id target_argument_id =
+ argument_id >= 0 ? argument_id : -(argument_id+2);
std::size_t num_tuples = 0;
TypedValue va_sum = fast_operator_->accumulateValueAccessor(
- blank_state_.sum_, accessor, accessor_ids.front(), &num_tuples);
+ blank_state_.sum_, target_accessor, target_argument_id, &num_tuples);
return new AggregationStateSum(std::move(va_sum), num_tuples == 0);
}
-#endif
void AggregationHandleSum::aggregateValueAccessorIntoHashTable(
ValueAccessor *accessor,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/expressions/aggregation/AggregationHandleSum.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.hpp b/expressions/aggregation/AggregationHandleSum.hpp
index f0d23e1..cc24335 100644
--- a/expressions/aggregation/AggregationHandleSum.hpp
+++ b/expressions/aggregation/AggregationHandleSum.hpp
@@ -41,6 +41,7 @@
namespace quickstep {
class ColumnVector;
+class ColumnVectorsValueAccessor;
class StorageManager;
class ValueAccessor;
@@ -161,15 +162,10 @@ class AggregationHandleSum : public AggregationConcreteHandle {
}
}
- AggregationState* accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
- const override;
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- AggregationState* accumulateValueAccessor(
+ AggregationState* accumulate(
ValueAccessor *accessor,
- const std::vector<attribute_id> &accessor_id) const override;
-#endif
+ ColumnVectorsValueAccessor *aux_accessor,
+ const std::vector<attribute_id> &argument_ids) const override;
void aggregateValueAccessorIntoHashTable(
ValueAccessor *accessor,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/query_execution/QueryContext.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp
index 7ad8fa1..8733093 100644
--- a/query_execution/QueryContext.hpp
+++ b/query_execution/QueryContext.hpp
@@ -200,20 +200,6 @@ class QueryContext {
}
/**
- * @brief Destroy the payloads from the aggregation hash tables.
- *
- * @warning After calling these methods, the hash table will be in an invalid
- * state. No other operation should be performed on them.
- *
- * @param id The ID of the AggregationOperationState.
- **/
- inline void destroyAggregationHashTablePayload(const aggregation_state_id id) {
- DCHECK_LT(id, aggregation_states_.size());
- DCHECK(aggregation_states_[id]);
- aggregation_states_[id]->destroyAggregationHashTablePayload();
- }
-
- /**
* @brief Whether the given GeneratorFunctionHandle id is valid.
*
* @param id The GeneratorFunctionHandle id.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/relational_operators/DestroyAggregationStateOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/DestroyAggregationStateOperator.cpp b/relational_operators/DestroyAggregationStateOperator.cpp
index 49be43d..62ca9e7 100644
--- a/relational_operators/DestroyAggregationStateOperator.cpp
+++ b/relational_operators/DestroyAggregationStateOperator.cpp
@@ -58,13 +58,6 @@ bool DestroyAggregationStateOperator::getAllWorkOrderProtos(WorkOrderProtosConta
}
void DestroyAggregationStateWorkOrder::execute() {
- // NOTE(harshad) : The destroyAggregationHashTablePayload call is separate
- // from the destroyAggregationState call. The reason is that the aggregation
- // hash tables don't own the AggregationHandle objects. However the hash table
- // class requires the handles for destroying the payload (see the
- // destroyPayload methods in AggregationHandle classes). Therefore, we first
- // destroy the payloads in the hash table and then destroy the hash table.
- query_context_->destroyAggregationHashTablePayload(aggr_state_index_);
query_context_->destroyAggregationState(aggr_state_index_);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index b942c1b..6470f33 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -39,13 +39,14 @@
#include "expressions/predicate/Predicate.hpp"
#include "expressions/scalar/Scalar.hpp"
#include "storage/AggregationOperationState.pb.h"
-#include "storage/HashTable.hpp"
+#include "storage/FastHashTable.hpp"
+#include "storage/FastHashTableFactory.hpp"
#include "storage/HashTableBase.hpp"
-#include "storage/HashTableFactory.hpp"
#include "storage/InsertDestination.hpp"
#include "storage/StorageBlock.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "storage/StorageManager.hpp"
+#include "storage/SubBlocksReference.hpp"
#include "storage/TupleIdSequence.hpp"
#include "storage/ValueAccessor.hpp"
#include "types/TypedValue.hpp"
@@ -83,33 +84,42 @@ AggregationOperationState::AggregationOperationState(
is_aggregate_partitioned_(checkAggregatePartitioned(
estimated_num_entries, is_distinct, group_by, aggregate_functions)),
predicate_(predicate),
- group_by_list_(std::move(group_by)),
- arguments_(std::move(arguments)),
is_distinct_(std::move(is_distinct)),
storage_manager_(storage_manager) {
// Sanity checks: each aggregate has a corresponding list of arguments.
- DCHECK(aggregate_functions.size() == arguments_.size());
+ DCHECK(aggregate_functions.size() == arguments.size());
// Get the types of GROUP BY expressions for creating HashTables below.
- std::vector<const Type *> group_by_types;
- for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
- group_by_types.emplace_back(&group_by_element->getType());
+ for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
+ group_by_types_.emplace_back(&group_by_element->getType());
+ }
+
+ // Prepare group-by element attribute ids and non-trivial expressions.
+ for (std::unique_ptr<const Scalar> &group_by_element : group_by) {
+ const attribute_id attr_id =
+ group_by_element->getAttributeIdForValueAccessor();
+ if (attr_id == kInvalidAttributeID) {
+ const attribute_id non_trivial_attr_id =
+ -(static_cast<attribute_id>(non_trivial_expressions_.size()) + 2);
+ non_trivial_expressions_.emplace_back(group_by_element.release());
+ group_by_key_ids_.emplace_back(non_trivial_attr_id);
+ } else {
+ group_by_key_ids_.emplace_back(attr_id);
+ }
}
std::vector<AggregationHandle *> group_by_handles;
- group_by_handles.clear();
if (aggregate_functions.size() == 0) {
// If there is no aggregation function, then it is a distinctify operation
// on the group-by expressions.
- DCHECK_GT(group_by_list_.size(), 0u);
+ DCHECK_GT(group_by_key_ids_.size(), 0u);
handles_.emplace_back(new AggregationHandleDistinct());
- arguments_.push_back({});
is_distinct_.emplace_back(false);
group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
hash_table_impl_type,
- group_by_types,
+ group_by_types_,
{1},
handles_,
storage_manager));
@@ -117,8 +127,8 @@ AggregationOperationState::AggregationOperationState(
// Set up each individual aggregate in this operation.
std::vector<const AggregateFunction *>::const_iterator agg_func_it =
aggregate_functions.begin();
- std::vector<std::vector<std::unique_ptr<const Scalar>>>::const_iterator
- args_it = arguments_.begin();
+ std::vector<std::vector<std::unique_ptr<const Scalar>>>::iterator
+ args_it = arguments.begin();
std::vector<bool>::const_iterator is_distinct_it = is_distinct_.begin();
std::vector<HashTableImplType>::const_iterator
distinctify_hash_table_impl_types_it =
@@ -133,6 +143,22 @@ AggregationOperationState::AggregationOperationState(
argument_types.emplace_back(&argument->getType());
}
+ // Prepare argument attribute ids and non-trivial expressions.
+ std::vector<attribute_id> argument_ids;
+ for (std::unique_ptr<const Scalar> &argument : *args_it) {
+ const attribute_id attr_id =
+ argument->getAttributeIdForValueAccessor();
+ if (attr_id == kInvalidAttributeID) {
+ const attribute_id non_trivial_attr_id =
+ -(static_cast<attribute_id>(non_trivial_expressions_.size()) + 2);
+ non_trivial_expressions_.emplace_back(argument.release());
+ argument_ids.emplace_back(non_trivial_attr_id);
+ } else {
+ argument_ids.emplace_back(attr_id);
+ }
+ }
+ argument_ids_.emplace_back(std::move(argument_ids));
+
// Sanity checks: aggregate function exists and can apply to the specified
// arguments.
DCHECK(*agg_func_it != nullptr);
@@ -142,7 +168,7 @@ AggregationOperationState::AggregationOperationState(
// to do actual aggregate computation.
handles_.emplace_back((*agg_func_it)->createHandle(argument_types));
- if (!group_by_list_.empty()) {
+ if (!group_by_key_ids_.empty()) {
// Aggregation with GROUP BY: combined payload is partially updated in
// the presence of DISTINCT.
if (*is_distinct_it) {
@@ -153,35 +179,12 @@ AggregationOperationState::AggregationOperationState(
} else {
// Aggregation without GROUP BY: create a single global state.
single_states_.emplace_back(handles_.back()->createInitialState());
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- // See if all of this aggregate's arguments are attributes in the input
- // relation. If so, remember the attribute IDs so that we can do copy
- // elision when actually performing the aggregation.
- std::vector<attribute_id> local_arguments_as_attributes;
- local_arguments_as_attributes.reserve(args_it->size());
- for (const std::unique_ptr<const Scalar> &argument : *args_it) {
- const attribute_id argument_id =
- argument->getAttributeIdForValueAccessor();
- if (argument_id == -1) {
- local_arguments_as_attributes.clear();
- break;
- } else {
- DCHECK_EQ(input_relation_.getID(),
- argument->getRelationIdForValueAccessor());
- local_arguments_as_attributes.push_back(argument_id);
- }
- }
-
- arguments_as_attributes_.emplace_back(
- std::move(local_arguments_as_attributes));
-#endif
}
// Initialize the corresponding distinctify hash table if this is a
// DISTINCT aggregation.
if (*is_distinct_it) {
- std::vector<const Type *> key_types(group_by_types);
+ std::vector<const Type *> key_types(group_by_types_);
key_types.insert(
key_types.end(), argument_types.begin(), argument_types.end());
// TODO(jianqiao): estimated_num_entries is quite inaccurate for
@@ -203,12 +206,12 @@ AggregationOperationState::AggregationOperationState(
}
}
- if (!group_by_handles.empty()) {
- // Aggregation with GROUP BY: create a HashTable pool.
+ // Aggregation with GROUP BY: create a HashTable pool.
+ if (!group_by_key_ids_.empty()) {
if (!is_aggregate_partitioned_) {
group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
hash_table_impl_type,
- group_by_types,
+ group_by_types_,
payload_sizes,
group_by_handles,
storage_manager));
@@ -217,7 +220,7 @@ AggregationOperationState::AggregationOperationState(
new PartitionedHashTablePool(estimated_num_entries,
FLAGS_num_aggregation_partitions,
hash_table_impl_type,
- group_by_types,
+ group_by_types_,
payload_sizes,
group_by_handles,
storage_manager));
@@ -355,7 +358,7 @@ bool AggregationOperationState::ProtoIsValid(
void AggregationOperationState::aggregateBlock(const block_id input_block,
LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
- if (group_by_list_.empty()) {
+ if (group_by_key_ids_.empty()) {
aggregateBlockSingleState(input_block);
} else {
aggregateBlockHashTable(input_block, lip_filter_adaptive_prober);
@@ -364,7 +367,7 @@ void AggregationOperationState::aggregateBlock(const block_id input_block,
void AggregationOperationState::finalizeAggregate(
InsertDestination *output_destination) {
- if (group_by_list_.empty()) {
+ if (group_by_key_ids_.empty()) {
finalizeSingleState(output_destination);
} else {
finalizeHashTable(output_destination);
@@ -392,37 +395,53 @@ void AggregationOperationState::aggregateBlockSingleState(
std::unique_ptr<TupleIdSequence> matches;
if (predicate_ != nullptr) {
- std::unique_ptr<ValueAccessor> accessor(
- block->getTupleStorageSubBlock().createValueAccessor());
matches.reset(block->getMatchesForPredicate(predicate_.get(), matches.get()));
}
- for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
- const std::vector<attribute_id> *local_arguments_as_attributes = nullptr;
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- // If all arguments are attributes of the input relation, elide a copy.
- if (!arguments_as_attributes_[agg_idx].empty()) {
- local_arguments_as_attributes = &(arguments_as_attributes_[agg_idx]);
+ const auto &tuple_store = block->getTupleStorageSubBlock();
+ std::unique_ptr<ValueAccessor> accessor(
+ tuple_store.createValueAccessor(matches.get()));
+
+ ColumnVectorsValueAccessor non_trivial_results;
+ if (!non_trivial_expressions_.empty()) {
+ SubBlocksReference sub_blocks_ref(tuple_store,
+ block->getIndices(),
+ block->getIndicesConsistent());
+ for (const auto &expression : non_trivial_expressions_) {
+ non_trivial_results.addColumn(
+ expression->getAllValues(accessor.get(), &sub_blocks_ref));
}
-#endif
+ }
+
+ for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
if (is_distinct_[agg_idx]) {
+ LOG(FATAL) << "Distinct aggregation not supported";
// Call StorageBlock::aggregateDistinct() to put the arguments as keys
// directly into the (threadsafe) shared global distinctify HashTable
// for this aggregate.
- block->aggregateDistinct(*handles_[agg_idx],
- arguments_[agg_idx],
- local_arguments_as_attributes,
- {}, /* group_by */
- matches.get(),
- distinctify_hashtables_[agg_idx].get(),
- nullptr /* reuse_group_by_vectors */);
+// block->aggregateDistinct(*handles_[agg_idx],
+// arguments_[agg_idx],
+// local_arguments_as_attributes,
+// {}, /* group_by */
+// matches.get(),
+// distinctify_hashtables_[agg_idx].get(),
+// nullptr /* reuse_group_by_vectors */);
+ // TODO(jianqiao): handle distinct
local_state.emplace_back(nullptr);
} else {
- // Call StorageBlock::aggregate() to actually do the aggregation.
- local_state.emplace_back(block->aggregate(*handles_[agg_idx],
- arguments_[agg_idx],
- local_arguments_as_attributes,
- matches.get()));
+ const auto &argument_ids = argument_ids_[agg_idx];
+ const auto *handle = handles_[agg_idx];
+
+ AggregationState *state;
+ if (argument_ids.empty()) {
+ // Special case. This is a nullary aggregate (i.e. COUNT(*)).
+ state = handle->accumulateNullary(matches == nullptr ? tuple_store.numTuples()
+ : matches->size());
+ } else {
+ // Have the AggregationHandle actually do the aggregation.
+ state = handle->accumulate(accessor.get(), &non_trivial_results, argument_ids);
+ }
+ local_state.emplace_back(state);
}
}
@@ -435,104 +454,107 @@ void AggregationOperationState::aggregateBlockHashTable(
LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
BlockReference block(
storage_manager_->getBlock(input_block, input_relation_));
+ const auto &tuple_store = block->getTupleStorageSubBlock();
+ std::unique_ptr<ValueAccessor> base_accessor(tuple_store.createValueAccessor());
+ std::unique_ptr<ValueAccessor> shared_accessor;
+ ValueAccessor *accessor = base_accessor.get();
// Apply the predicate first, then the LIPFilters, to generate a TupleIdSequence
// as the existence map for the tuples.
std::unique_ptr<TupleIdSequence> matches;
if (predicate_ != nullptr) {
matches.reset(block->getMatchesForPredicate(predicate_.get()));
+ shared_accessor.reset(
+ base_accessor->createSharedTupleIdSequenceAdapterVirtual(*matches));
+ accessor = shared_accessor.get();
}
if (lip_filter_adaptive_prober != nullptr) {
- std::unique_ptr<ValueAccessor> accessor(
- block->getTupleStorageSubBlock().createValueAccessor(matches.get()));
- matches.reset(lip_filter_adaptive_prober->filterValueAccessor(accessor.get()));
+ matches.reset(lip_filter_adaptive_prober->filterValueAccessor(accessor));
+ shared_accessor.reset(
+ base_accessor->createSharedTupleIdSequenceAdapterVirtual(*matches));
+ accessor = shared_accessor.get();
}
- // This holds values of all the GROUP BY attributes so that the can be reused
- // across multiple aggregates (i.e. we only pay the cost of evaluatin the
- // GROUP BY expressions once).
- std::vector<std::unique_ptr<ColumnVector>> reuse_group_by_vectors;
+ ColumnVectorsValueAccessor non_trivial_results;
+ if (!non_trivial_expressions_.empty()) {
+ SubBlocksReference sub_blocks_ref(tuple_store,
+ block->getIndices(),
+ block->getIndicesConsistent());
+ for (const auto &expression : non_trivial_expressions_) {
+ non_trivial_results.addColumn(
+ expression->getAllValues(accessor, &sub_blocks_ref));
+ }
+ }
for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
if (is_distinct_[agg_idx]) {
+ LOG(FATAL) << "Distinct aggregation not supported";
// Call StorageBlock::aggregateDistinct() to insert the GROUP BY
// expression
// values and the aggregation arguments together as keys directly into the
// (threadsafe) shared global distinctify HashTable for this aggregate.
- block->aggregateDistinct(*handles_[agg_idx],
- arguments_[agg_idx],
- nullptr, /* arguments_as_attributes */
- group_by_list_,
- matches.get(),
- distinctify_hashtables_[agg_idx].get(),
- &reuse_group_by_vectors);
+// block->aggregateDistinct(*handles_[agg_idx],
+// arguments_[agg_idx],
+// nullptr, /* arguments_as_attributes */
+// group_by_list_,
+// matches.get(),
+// distinctify_hashtables_[agg_idx].get(),
+// &reuse_group_by_vectors);
+ // TODO(jianqiao): handle distinct
}
}
if (!is_aggregate_partitioned_) {
- // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
- // directly into the (threadsafe) shared global HashTable for this
- // aggregate.
DCHECK(group_by_hashtable_pool_ != nullptr);
+
AggregationStateHashTableBase *agg_hash_table =
- group_by_hashtable_pool_->getHashTableFast();
+ group_by_hashtable_pool_->getHashTableFast();
DCHECK(agg_hash_table != nullptr);
- block->aggregateGroupBy(arguments_,
- group_by_list_,
- matches.get(),
- agg_hash_table,
- &reuse_group_by_vectors);
+
+ accessor->beginIterationVirtual();
+ agg_hash_table->upsertValueAccessorCompositeKeyFast(accessor,
+ &non_trivial_results,
+ argument_ids_,
+ group_by_key_ids_,
+ true /* check_for_null_keys */);
group_by_hashtable_pool_->returnHashTable(agg_hash_table);
} else {
- ColumnVectorsValueAccessor temp_result;
- // IDs of 'arguments' as attributes in the ValueAccessor we create below.
- std::vector<attribute_id> argument_ids;
-
- // IDs of GROUP BY key element(s) in the ValueAccessor we create below.
- std::vector<attribute_id> key_ids;
- const std::size_t num_partitions = partitioned_group_by_hashtable_pool_->getNumPartitions();
- block->aggregateGroupByPartitioned(
- arguments_,
- group_by_list_,
- matches.get(),
- num_partitions,
- &temp_result,
- &argument_ids,
- &key_ids,
- &reuse_group_by_vectors);
- // Compute the partitions for the tuple formed by group by values.
- std::vector<std::unique_ptr<TupleIdSequence>> partition_membership;
- partition_membership.resize(num_partitions);
-
- // Create a tuple-id sequence for each partition.
- for (std::size_t partition = 0; partition < num_partitions; ++partition) {
- partition_membership[partition].reset(
- new TupleIdSequence(temp_result.getEndPosition()));
- }
-
- // Iterate over ValueAccessor for each tuple,
- // set a bit in the appropriate TupleIdSequence.
- temp_result.beginIteration();
- while (temp_result.next()) {
- // We need a unique_ptr because getTupleWithAttributes() uses "new".
- std::unique_ptr<Tuple> curr_tuple(temp_result.getTupleWithAttributes(key_ids));
- const std::size_t curr_tuple_partition_id =
- curr_tuple->getTupleHash() % num_partitions;
- partition_membership[curr_tuple_partition_id]->set(
- temp_result.getCurrentPosition(), true);
- }
- // For each partition, create an adapter around Value Accessor and
- // TupleIdSequence.
- std::vector<std::unique_ptr<
- TupleIdSequenceAdapterValueAccessor<ColumnVectorsValueAccessor>>> adapter;
- adapter.resize(num_partitions);
- for (std::size_t partition = 0; partition < num_partitions; ++partition) {
- adapter[partition].reset(temp_result.createSharedTupleIdSequenceAdapter(
- *(partition_membership)[partition]));
- partitioned_group_by_hashtable_pool_->getHashTable(partition)
- ->upsertValueAccessorCompositeKeyFast(
- argument_ids, adapter[partition].get(), key_ids, true);
- }
+ LOG(FATAL) << "Partitioned aggregation not supported";
+// const std::size_t num_partitions = partitioned_group_by_hashtable_pool_->getNumPartitions();
+//
+// // Compute the partitions for the tuple formed by group by values.
+// std::vector<std::unique_ptr<TupleIdSequence>> partition_membership;
+// partition_membership.resize(num_partitions);
+//
+// // Create a tuple-id sequence for each partition.
+// for (std::size_t partition = 0; partition < num_partitions; ++partition) {
+// partition_membership[partition].reset(
+// new TupleIdSequence(temp_result.getEndPosition()));
+// }
+//
+// // Iterate over ValueAccessor for each tuple,
+// // set a bit in the appropriate TupleIdSequence.
+// temp_result.beginIteration();
+// while (temp_result.next()) {
+// // We need a unique_ptr because getTupleWithAttributes() uses "new".
+// std::unique_ptr<Tuple> curr_tuple(temp_result.getTupleWithAttributes(key_ids));
+// const std::size_t curr_tuple_partition_id =
+// curr_tuple->getTupleHash() % num_partitions;
+// partition_membership[curr_tuple_partition_id]->set(
+// temp_result.getCurrentPosition(), true);
+// }
+// // For each partition, create an adapter around Value Accessor and
+// // TupleIdSequence.
+// std::vector<std::unique_ptr<
+// TupleIdSequenceAdapterValueAccessor<ColumnVectorsValueAccessor>>> adapter;
+// adapter.resize(num_partitions);
+// for (std::size_t partition = 0; partition < num_partitions; ++partition) {
+// adapter[partition].reset(temp_result.createSharedTupleIdSequenceAdapter(
+// *(partition_membership)[partition]));
+// partitioned_group_by_hashtable_pool_->getHashTable(partition)
+// ->upsertValueAccessorCompositeKeyFast(
+// argument_ids, adapter[partition].get(), key_ids, true);
+// }
}
}
@@ -577,59 +599,36 @@ void AggregationOperationState::finalizeHashTable(
// e.g. Keep merging entries from smaller hash tables to larger.
auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
- if (hash_tables->size() > 1) {
- for (int hash_table_index = 0;
- hash_table_index < static_cast<int>(hash_tables->size() - 1);
- ++hash_table_index) {
- // Merge each hash table to the last hash table.
- mergeGroupByHashTables((*hash_tables)[hash_table_index].get(),
- hash_tables->back().get());
- }
+ DCHECK(hash_tables != nullptr);
+ if (hash_tables->empty()) {
+ return;
+ }
+
+ std::unique_ptr<AggregationStateHashTableBase> final_hash_table(
+ hash_tables->back().release());
+ for (std::size_t i = 0; i < hash_tables->size() - 1; ++i) {
+ std::unique_ptr<AggregationStateHashTableBase> hash_table(
+ hash_tables->at(i).release());
+ mergeGroupByHashTables(hash_table.get(), final_hash_table.get());
+ hash_table->destroyPayload();
}
// Collect per-aggregate finalized values.
std::vector<std::unique_ptr<ColumnVector>> final_values;
for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
if (is_distinct_[agg_idx]) {
- DCHECK(group_by_hashtable_pool_ != nullptr);
- auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
- DCHECK(hash_tables != nullptr);
- if (hash_tables->empty()) {
- // We may have a case where hash_tables is empty, e.g. no input blocks.
- // However for aggregateOnDistinctifyHashTableForGroupBy to work
- // correctly, we should create an empty group by hash table.
- AggregationStateHashTableBase *new_hash_table =
- group_by_hashtable_pool_->getHashTableFast();
- group_by_hashtable_pool_->returnHashTable(new_hash_table);
- hash_tables = group_by_hashtable_pool_->getAllHashTables();
- }
- DCHECK(hash_tables->back() != nullptr);
- AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
- DCHECK(agg_hash_table != nullptr);
handles_[agg_idx]->allowUpdate();
handles_[agg_idx]->aggregateOnDistinctifyHashTableForGroupBy(
- *distinctify_hashtables_[agg_idx], agg_hash_table, agg_idx);
+ *distinctify_hashtables_[agg_idx], final_hash_table.get(), agg_idx);
}
- auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
- DCHECK(hash_tables != nullptr);
- if (hash_tables->empty()) {
- // We may have a case where hash_tables is empty, e.g. no input blocks.
- // However for aggregateOnDistinctifyHashTableForGroupBy to work
- // correctly, we should create an empty group by hash table.
- AggregationStateHashTableBase *new_hash_table =
- group_by_hashtable_pool_->getHashTableFast();
- group_by_hashtable_pool_->returnHashTable(new_hash_table);
- hash_tables = group_by_hashtable_pool_->getAllHashTables();
- }
- AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
- DCHECK(agg_hash_table != nullptr);
ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
- *agg_hash_table, &group_by_keys, agg_idx);
+ *final_hash_table, &group_by_keys, agg_idx);
if (agg_result_col != nullptr) {
final_values.emplace_back(agg_result_col);
}
}
+ final_hash_table->destroyPayload();
// Reorganize 'group_by_keys' in column-major order so that we can make a
// ColumnVectorsValueAccessor to bulk-insert results.
@@ -640,11 +639,10 @@ void AggregationOperationState::finalizeHashTable(
// in a single HashTable.
std::vector<std::unique_ptr<ColumnVector>> group_by_cvs;
std::size_t group_by_element_idx = 0;
- for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
- const Type &group_by_type = group_by_element->getType();
- if (NativeColumnVector::UsableForType(group_by_type)) {
+ for (const Type *group_by_type : group_by_types_) {
+ if (NativeColumnVector::UsableForType(*group_by_type)) {
NativeColumnVector *element_cv =
- new NativeColumnVector(group_by_type, group_by_keys.size());
+ new NativeColumnVector(*group_by_type, group_by_keys.size());
group_by_cvs.emplace_back(element_cv);
for (std::vector<TypedValue> &group_key : group_by_keys) {
element_cv->appendTypedValue(
@@ -652,7 +650,7 @@ void AggregationOperationState::finalizeHashTable(
}
} else {
IndirectColumnVector *element_cv =
- new IndirectColumnVector(group_by_type, group_by_keys.size());
+ new IndirectColumnVector(*group_by_type, group_by_keys.size());
group_by_cvs.emplace_back(element_cv);
for (std::vector<TypedValue> &group_key : group_by_keys) {
element_cv->appendTypedValue(
@@ -676,25 +674,6 @@ void AggregationOperationState::finalizeHashTable(
output_destination->bulkInsertTuples(&complete_result);
}
-void AggregationOperationState::destroyAggregationHashTablePayload() {
- std::vector<std::unique_ptr<AggregationStateHashTableBase>> *all_hash_tables =
- nullptr;
- if (!is_aggregate_partitioned_) {
- if (group_by_hashtable_pool_ != nullptr) {
- all_hash_tables = group_by_hashtable_pool_->getAllHashTables();
- }
- } else {
- if (partitioned_group_by_hashtable_pool_ != nullptr) {
- all_hash_tables = partitioned_group_by_hashtable_pool_->getAllHashTables();
- }
- }
- if (all_hash_tables != nullptr) {
- for (std::size_t ht_index = 0; ht_index < all_hash_tables->size(); ++ht_index) {
- (*all_hash_tables)[ht_index]->destroyPayload();
- }
- }
-}
-
void AggregationOperationState::finalizeAggregatePartitioned(
const std::size_t partition_id, InsertDestination *output_destination) {
// Each element of 'group_by_keys' is a vector of values for a particular
@@ -703,15 +682,16 @@ void AggregationOperationState::finalizeAggregatePartitioned(
// Collect per-aggregate finalized values.
std::vector<std::unique_ptr<ColumnVector>> final_values;
+ AggregationStateHashTableBase *hash_table =
+ partitioned_group_by_hashtable_pool_->getHashTable(partition_id);
for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
- AggregationStateHashTableBase *hash_table =
- partitioned_group_by_hashtable_pool_->getHashTable(partition_id);
ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
*hash_table, &group_by_keys, agg_idx);
if (agg_result_col != nullptr) {
final_values.emplace_back(agg_result_col);
}
}
+ hash_table->destroyPayload();
// Reorganize 'group_by_keys' in column-major order so that we can make a
// ColumnVectorsValueAccessor to bulk-insert results.
@@ -722,16 +702,17 @@ void AggregationOperationState::finalizeAggregatePartitioned(
// in a single HashTable.
std::vector<std::unique_ptr<ColumnVector>> group_by_cvs;
std::size_t group_by_element_idx = 0;
- for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
- const Type &group_by_type = group_by_element->getType();
- if (NativeColumnVector::UsableForType(group_by_type)) {
- NativeColumnVector *element_cv = new NativeColumnVector(group_by_type, group_by_keys.size());
+ for (const Type *group_by_type : group_by_types_) {
+ if (NativeColumnVector::UsableForType(*group_by_type)) {
+ NativeColumnVector *element_cv =
+ new NativeColumnVector(*group_by_type, group_by_keys.size());
group_by_cvs.emplace_back(element_cv);
for (std::vector<TypedValue> &group_key : group_by_keys) {
element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
}
} else {
- IndirectColumnVector *element_cv = new IndirectColumnVector(group_by_type, group_by_keys.size());
+ IndirectColumnVector *element_cv =
+ new IndirectColumnVector(*group_by_type, group_by_keys.size());
group_by_cvs.emplace_back(element_cv);
for (std::vector<TypedValue> &group_key : group_by_keys) {
element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index e0826b0..9f2bd92 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -33,6 +33,7 @@
#include "storage/HashTableBase.hpp"
#include "storage/HashTablePool.hpp"
#include "storage/PartitionedHashTablePool.hpp"
+#include "storage/StorageBlock.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "utility/Macros.hpp"
@@ -46,6 +47,7 @@ class CatalogRelationSchema;
class InsertDestination;
class LIPFilterAdaptiveProber;
class StorageManager;
+class TupleIdSequence;
DECLARE_int32(num_aggregation_partitions);
DECLARE_int32(partition_aggregation_num_groups_threshold);
@@ -178,11 +180,6 @@ class AggregationOperationState {
void finalizeAggregate(InsertDestination *output_destination);
/**
- * @brief Destroy the payloads in the aggregation hash tables.
- **/
- void destroyAggregationHashTablePayload();
-
- /**
* @brief Generate the final results for the aggregates managed by this
* AggregationOperationState and write them out to StorageBlock(s).
* In this implementation, each thread picks a hash table belonging to
@@ -236,27 +233,28 @@ class AggregationOperationState {
const std::vector<bool> &is_distinct,
const std::vector<std::unique_ptr<const Scalar>> &group_by,
const std::vector<const AggregateFunction *> &aggregate_functions) const {
- // If there's no aggregation, return false.
- if (aggregate_functions.empty()) {
- return false;
- }
- // Check if there's a distinct operation involved in any aggregate, if so
- // the aggregate can't be partitioned.
- for (auto distinct : is_distinct) {
- if (distinct) {
- return false;
- }
- }
- // There's no distinct aggregation involved, Check if there's at least one
- // GROUP BY operation.
- if (group_by.empty()) {
- return false;
- }
- // There are GROUP BYs without DISTINCT. Check if the estimated number of
- // groups is large enough to warrant a partitioned aggregation.
- return estimated_num_groups >
- static_cast<std::size_t>(
- FLAGS_partition_aggregation_num_groups_threshold);
+// // If there's no aggregation, return false.
+// if (aggregate_functions.empty()) {
+// return false;
+// }
+// // Check if there's a distinct operation involved in any aggregate, if so
+// // the aggregate can't be partitioned.
+// for (auto distinct : is_distinct) {
+// if (distinct) {
+// return false;
+// }
+// }
+// // There's no distinct aggregation involved, Check if there's at least one
+// // GROUP BY operation.
+// if (group_by.empty()) {
+// return false;
+// }
+// // There are GROUP BYs without DISTINCT. Check if the estimated number of
+// // groups is large enough to warrant a partitioned aggregation.
+// return estimated_num_groups >
+// static_cast<std::size_t>(
+// FLAGS_partition_aggregation_num_groups_threshold);
+ return false;
}
// Common state for all aggregates in this operation: the input relation, the
@@ -267,27 +265,27 @@ class AggregationOperationState {
const bool is_aggregate_partitioned_;
std::unique_ptr<const Predicate> predicate_;
- std::vector<std::unique_ptr<const Scalar>> group_by_list_;
// Each individual aggregate in this operation has an AggregationHandle and
- // some number of Scalar arguments.
+ // zero (indicated by -1) or one argument.
std::vector<AggregationHandle *> handles_;
- std::vector<std::vector<std::unique_ptr<const Scalar>>> arguments_;
// For each aggregate, whether DISTINCT should be applied to the aggregate's
// arguments.
std::vector<bool> is_distinct_;
+ // Non-trivial group-by/argument expressions that need to be evaluated.
+ std::vector<std::unique_ptr<const Scalar>> non_trivial_expressions_;
+
+ std::vector<attribute_id> group_by_key_ids_;
+ std::vector<std::vector<attribute_id>> argument_ids_;
+
+ std::vector<const Type *> group_by_types_;
+
// Hash table for obtaining distinct (i.e. unique) arguments.
std::vector<std::unique_ptr<AggregationStateHashTableBase>>
distinctify_hashtables_;
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- // If all an aggregate's argument expressions are simply attributes in
- // 'input_relation_', then this caches the attribute IDs of those arguments.
- std::vector<std::vector<attribute_id>> arguments_as_attributes_;
-#endif
-
// Per-aggregate global states for aggregation without GROUP BY.
std::vector<std::unique_ptr<AggregationState>> single_states_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/storage/FastHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/FastHashTable.hpp b/storage/FastHashTable.hpp
index 4a82a62..04e33bd 100644
--- a/storage/FastHashTable.hpp
+++ b/storage/FastHashTable.hpp
@@ -39,6 +39,7 @@
#include "threading/SpinSharedMutex.hpp"
#include "types/Type.hpp"
#include "types/TypedValue.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
#include "utility/HashPair.hpp"
#include "utility/Macros.hpp"
@@ -407,109 +408,17 @@ class FastHashTable : public HashTableBase<resizable,
const std::uint8_t *init_value_ptr,
const std::uint8_t *source_state);
- /**
- * @brief Apply a functor to (multiple) entries in this hash table, with keys
- * drawn from a ValueAccessor. New values are first inserted if not
- * already present.
- *
- * @warning This method is only usable if allow_duplicate_keys is false.
- * @warning This method is threadsafe with regard to other calls to upsert(),
- * upsertCompositeKey(), upsertValueAccessor(), and
- * upsertValueAccessorCompositeKey(), but should not be used
- * simultaneously with put(), putCompositeKey(), putValueAccessor(),
- * or putValueAccessorCompositeKey().
- * @warning The ValueAccessor reference and ValueT* pointer passed to
- * functor's call operator are only guaranteed to be valid for the
- * duration of the call. The functor should not store a copy of
- * these pointers and assume that they remain valid.
- * @warning Although this method itself is threadsafe, the ValueT object
- * accessed by functor is not guaranteed to be (although it is
- * guaranteed that its initial insertion will be atomic). If it is
- * possible for multiple threads to call upsertValueAccessor() with
- * the same key at the same time, then their access to ValueT should
- * be made threadsafe (e.g. with the use of atomic types, mutexes,
- * or some other external synchronization).
- * @note This version is for single scalar keys, see also
- * upsertValueAccessorCompositeKey().
- * @note If the hash table is (close to) full and resizable is true, this
- * routine might result in rebuilding the entire hash table.
- *
- * @param accessor A ValueAccessor which will be used to access keys.
- * beginIteration() should be called on accessor before calling this
- * method.
- * @param key_attr_id The attribute ID of the keys to be read from accessor.
- * @param check_for_null_keys If true, each key will be checked to see if it
- * is null before upserting it (null keys are skipped). This must be
- * set to true if some of the keys that will be read from accessor may
- * be null.
- * @param functor A pointer to a functor, which should provide a call
- * operator that takes two arguments: const ValueAccessor& (or better
- * yet, a templated call operator which takes a const reference to
- * some subclass of ValueAccessor as its first argument) and ValueT*.
- * The call operator will be invoked once for every tuple with a
- * non-null key in accessor.
- * @return True on success, false if upsert failed because there was not
- * enough space to insert new entries for all the keys in accessor
- * (note that some entries may still have been upserted, and
- * accessor's iteration will be left on the first tuple which could
- * not be inserted).
- **/
bool upsertValueAccessorFast(
- const std::vector<attribute_id> &argument_ids,
ValueAccessor *accessor,
+ ColumnVectorsValueAccessor *aux_accessor,
+ const std::vector<attribute_id> &argument_ids,
const attribute_id key_attr_id,
const bool check_for_null_keys);
- /**
- * @brief Apply a functor to (multiple) entries in this hash table, with keys
- * drawn from a ValueAccessor. New values are first inserted if not
- * already present. Composite key version.
- *
- * @warning This method is only usable if allow_duplicate_keys is false.
- * @warning This method is threadsafe with regard to other calls to upsert(),
- * upsertCompositeKey(), upsertValueAccessor(), and
- * upsertValueAccessorCompositeKey(), but should not be used
- * simultaneously with put(), putCompositeKey(), putValueAccessor(),
- * or putValueAccessorCompositeKey().
- * @warning The ValueAccessor reference and ValueT* pointer passed to
- * functor's call operator are only guaranteed to be valid for the
- * duration of the call. The functor should not store a copy of
- * these pointers and assume that they remain valid.
- * @warning Although this method itself is threadsafe, the ValueT object
- * accessed by functor is not guaranteed to be (although it is
- * guaranteed that its initial insertion will be atomic). If it is
- * possible for multiple threads to call upsertValueAccessor() with
- * the same key at the same time, then their access to ValueT should
- * be made threadsafe (e.g. with the use of atomic types, mutexes,
- * or some other external synchronization).
- * @note This version is for composite keys, see also upsertValueAccessor().
- * @note If the hash table is (close to) full and resizable is true, this
- * routine might result in rebuilding the entire hash table.
- *
- * @param accessor A ValueAccessor which will be used to access keys.
- * beginIteration() should be called on accessor before calling this
- * method.
- * @param key_attr_ids The attribute IDs of each key component to be read
- * from accessor.
- * @param check_for_null_keys If true, each key will be checked to see if it
- * is null before upserting it (null keys are skipped). This must be
- * set to true if some of the keys that will be read from accessor may
- * be null.
- * @param functor A pointer to a functor, which should provide a call
- * operator that takes two arguments: const ValueAccessor& (or better
- * yet, a templated call operator which takes a const reference to
- * some subclass of ValueAccessor as its first argument) and ValueT*.
- * The call operator will be invoked once for every tuple with a
- * non-null key in accessor.
- * @return True on success, false if upsert failed because there was not
- * enough space to insert new entries for all the keys in accessor
- * (note that some entries may still have been upserted, and
- * accessor's iteration will be left on the first tuple which could
- * not be inserted).
- **/
bool upsertValueAccessorCompositeKeyFast(
- const std::vector<attribute_id> &argument,
ValueAccessor *accessor,
+ ColumnVectorsValueAccessor *aux_accessor,
+ const std::vector<std::vector<attribute_id>> &argument_ids,
const std::vector<attribute_id> &key_attr_ids,
const bool check_for_null_keys) override;
@@ -1779,84 +1688,87 @@ bool FastHashTable<resizable,
force_key_copy,
allow_duplicate_keys>::
upsertValueAccessorFast(
- const std::vector<attribute_id> &argument_ids,
ValueAccessor *accessor,
+ ColumnVectorsValueAccessor *aux_accessor,
+ const std::vector<attribute_id> &argument_ids,
const attribute_id key_attr_id,
const bool check_for_null_keys) {
- DEBUG_ASSERT(!allow_duplicate_keys);
- std::size_t variable_size;
- return InvokeOnAnyValueAccessor(
- accessor,
- [&](auto *accessor) -> bool { // NOLINT(build/c++11)
- if (resizable) {
- bool continuing = true;
- while (continuing) {
- {
- continuing = false;
- SpinSharedMutexSharedLock<true> lock(resize_shared_mutex_);
- while (accessor->next()) {
- TypedValue key = accessor->getTypedValue(key_attr_id);
- if (check_for_null_keys && key.isNull()) {
- continue;
- }
- variable_size = (force_key_copy && !scalar_key_inline_)
- ? key.getDataSize()
- : 0;
- std::uint8_t *value =
- this->upsertInternalFast(key, variable_size, nullptr);
- if (value == nullptr) {
- continuing = true;
- break;
- } else {
- SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
- for (unsigned int k = 0; k < num_handles_; ++k) {
- if (argument_ids[k] != kInvalidAttributeID) {
- handles_[k]->updateStateUnary(
- accessor->getTypedValue(argument_ids[k]),
- value + payload_offsets_[k]);
- } else {
- handles_[k]->updateStateNullary(value +
- payload_offsets_[k]);
- }
- }
- }
- }
- }
- if (continuing) {
- this->resize(0, variable_size);
- accessor->previous();
- }
- }
- } else {
- while (accessor->next()) {
- TypedValue key = accessor->getTypedValue(key_attr_id);
- if (check_for_null_keys && key.isNull()) {
- continue;
- }
- variable_size =
- (force_key_copy && !scalar_key_inline_) ? key.getDataSize() : 0;
- std::uint8_t *value =
- this->upsertInternalFast(key, variable_size, nullptr);
- if (value == nullptr) {
- return false;
- } else {
- SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
- for (unsigned int k = 0; k < num_handles_; ++k) {
- if (argument_ids[k] != kInvalidAttributeID) {
- handles_[k]->updateStateUnary(
- accessor->getTypedValue(argument_ids[k]),
- value + payload_offsets_[k]);
- } else {
- handles_[k]->updateStateNullary(value +
- payload_offsets_[k]);
- }
- }
- }
- }
- }
-
- return true;
- });
+ LOG(FATAL) << "Not implemented";
+ return true;
+// DEBUG_ASSERT(!allow_duplicate_keys);
+// std::size_t variable_size;
+// return InvokeOnAnyValueAccessor(
+// accessor,
+// [&](auto *accessor) -> bool { // NOLINT(build/c++11)
+// if (resizable) {
+// bool continuing = true;
+// while (continuing) {
+// {
+// continuing = false;
+// SpinSharedMutexSharedLock<true> lock(resize_shared_mutex_);
+// while (accessor->next()) {
+// TypedValue key = accessor->getTypedValue(key_attr_id);
+// if (check_for_null_keys && key.isNull()) {
+// continue;
+// }
+// variable_size = (force_key_copy && !scalar_key_inline_)
+// ? key.getDataSize()
+// : 0;
+// std::uint8_t *value =
+// this->upsertInternalFast(key, variable_size, nullptr);
+// if (value == nullptr) {
+// continuing = true;
+// break;
+// } else {
+// SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
+// for (unsigned int k = 0; k < num_handles_; ++k) {
+// if (argument_ids[k] != kInvalidAttributeID) {
+// handles_[k]->updateStateUnary(
+// accessor->getTypedValue(argument_ids[k]),
+// value + payload_offsets_[k]);
+// } else {
+// handles_[k]->updateStateNullary(value +
+// payload_offsets_[k]);
+// }
+// }
+// }
+// }
+// }
+// if (continuing) {
+// this->resize(0, variable_size);
+// accessor->previous();
+// }
+// }
+// } else {
+// while (accessor->next()) {
+// TypedValue key = accessor->getTypedValue(key_attr_id);
+// if (check_for_null_keys && key.isNull()) {
+// continue;
+// }
+// variable_size =
+// (force_key_copy && !scalar_key_inline_) ? key.getDataSize() : 0;
+// std::uint8_t *value =
+// this->upsertInternalFast(key, variable_size, nullptr);
+// if (value == nullptr) {
+// return false;
+// } else {
+// SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
+// for (unsigned int k = 0; k < num_handles_; ++k) {
+// if (argument_ids[k] != kInvalidAttributeID) {
+// handles_[k]->updateStateUnary(
+// accessor->getTypedValue(argument_ids[k]),
+// value + payload_offsets_[k]);
+// } else {
+// handles_[k]->updateStateNullary(value +
+// payload_offsets_[k]);
+// }
+// }
+// }
+// }
+// }
+//
+// return true;
+// });
}
template <bool resizable,
@@ -1868,90 +1780,70 @@ bool FastHashTable<resizable,
force_key_copy,
allow_duplicate_keys>::
upsertValueAccessorCompositeKeyFast(
- const std::vector<attribute_id> &argument_ids,
ValueAccessor *accessor,
+ ColumnVectorsValueAccessor *aux_accessor,
+ const std::vector<std::vector<attribute_id>> &argument_ids,
const std::vector<attribute_id> &key_attr_ids,
const bool check_for_null_keys) {
+ DEBUG_ASSERT(resizable);
DEBUG_ASSERT(!allow_duplicate_keys);
+
std::size_t variable_size;
std::vector<TypedValue> key_vector;
key_vector.resize(key_attr_ids.size());
return InvokeOnAnyValueAccessor(
accessor,
[&](auto *accessor) -> bool { // NOLINT(build/c++11)
- if (resizable) {
- bool continuing = true;
- while (continuing) {
- {
- continuing = false;
- SpinSharedMutexSharedLock<true> lock(resize_shared_mutex_);
- while (accessor->next()) {
- if (this->GetCompositeKeyFromValueAccessor(*accessor,
- key_attr_ids,
- check_for_null_keys,
- &key_vector)) {
- continue;
- }
- variable_size =
- this->calculateVariableLengthCompositeKeyCopySize(
- key_vector);
- std::uint8_t *value = this->upsertCompositeKeyInternalFast(
- key_vector, nullptr, variable_size);
- if (value == nullptr) {
- continuing = true;
- break;
- } else {
- SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
- for (unsigned int k = 0; k < num_handles_; ++k) {
- if (argument_ids[k] != kInvalidAttributeID) {
- handles_[k]->updateStateUnary(
- accessor->getTypedValue(argument_ids[k]),
- value + payload_offsets_[k]);
- } else {
- handles_[k]->updateStateNullary(value +
- payload_offsets_[k]);
- }
- }
- }
- }
- }
- if (continuing) {
- this->resize(0, variable_size);
- accessor->previous();
- }
+ bool continuing = true;
+ while (continuing) {
+ {
+ continuing = false;
+ SpinSharedMutexSharedLock<true> lock(resize_shared_mutex_);
+ while (accessor->next()) {
+ aux_accessor->next();
+ // TODO(jianqiao): templatize to involve aux_accessor
+ if (this->GetCompositeKeyFromValueAccessor(*accessor,
+ key_attr_ids,
+ check_for_null_keys,
+ &key_vector)) {
+ continue;
}
- } else {
- while (accessor->next()) {
- if (this->GetCompositeKeyFromValueAccessor(*accessor,
- key_attr_ids,
- check_for_null_keys,
- &key_vector)) {
- continue;
- }
- variable_size =
- this->calculateVariableLengthCompositeKeyCopySize(key_vector);
- std::uint8_t *value = this->upsertCompositeKeyInternalFast(
- key_vector, nullptr, variable_size);
- if (value == nullptr) {
- return false;
- } else {
- SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
- for (unsigned int k = 0; k < num_handles_; ++k) {
- if (argument_ids[k] != kInvalidAttributeID) {
+ variable_size = this->calculateVariableLengthCompositeKeyCopySize(key_vector);
+ std::uint8_t *value = this->upsertCompositeKeyInternalFast(
+ key_vector, nullptr, variable_size);
+ if (value == nullptr) {
+ continuing = true;
+ break;
+ } else {
+ SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
+ for (unsigned int k = 0; k < num_handles_; ++k) {
+ const auto &ids = argument_ids[k];
+ if (ids.empty()) {
+ handles_[k]->updateStateNullary(value +
+ payload_offsets_[k]);
+ } else {
+ const attribute_id argument_id = ids.front();
+ if (argument_id >= 0) {
handles_[k]->updateStateUnary(
- accessor->getTypedValue(argument_ids[k]),
+ accessor->getTypedValue(argument_id),
value + payload_offsets_[k]);
} else {
- handles_[k]->updateStateNullary(value +
- payload_offsets_[k]);
+ handles_[k]->updateStateUnary(
+ aux_accessor->getTypedValue(-(argument_id+2)),
+ value + payload_offsets_[k]);
}
}
}
}
}
-
- return true;
- });
+ }
+ if (continuing) {
+ this->resize(0, variable_size);
+ accessor->previous();
+ }
+ }
+ return true;
+ });
}
template <bool resizable,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1baa015/storage/HashTableBase.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableBase.hpp b/storage/HashTableBase.hpp
index a3180bb..e3b8dbd 100644
--- a/storage/HashTableBase.hpp
+++ b/storage/HashTableBase.hpp
@@ -23,11 +23,13 @@
#include <cstddef>
#include <vector>
-#include "ValueAccessor.hpp"
#include "utility/Macros.hpp"
namespace quickstep {
+class ColumnVectorsValueAccessor;
+class ValueAccessor;
+
/** \addtogroup Storage
* @{
*/
@@ -92,8 +94,9 @@ class HashTableBase {
* specialization from this file.
**/
virtual bool upsertValueAccessorCompositeKeyFast(
- const std::vector<attribute_id> &argument,
ValueAccessor *accessor,
+ ColumnVectorsValueAccessor *aux_accessor,
+ const std::vector<std::vector<attribute_id>> &argument_ids,
const std::vector<attribute_id> &key_attr_ids,
const bool check_for_null_keys) {
return false;