You are viewing a plain-text version of this content; the hyperlink to its canonical version was lost in the plain-text conversion.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/02/01 03:52:13 UTC
[1/2] incubator-quickstep git commit: Updates for distinct
Repository: incubator-quickstep
Updated Branches:
refs/heads/collision-free-agg 34ea858db -> 963a60428
Updates for distinct
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/3fc85b28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/3fc85b28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/3fc85b28
Branch: refs/heads/collision-free-agg
Commit: 3fc85b28909fe49635325ea70d52d234ae2c957d
Parents: 34ea858
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Tue Jan 31 21:05:13 2017 -0600
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Tue Jan 31 21:05:13 2017 -0600
----------------------------------------------------------------------
.../aggregation/AggregationConcreteHandle.cpp | 44 ++++++++
.../aggregation/AggregationConcreteHandle.hpp | 108 ++++++++++++++++--
expressions/aggregation/AggregationHandle.hpp | 111 ++++++++++++++++++-
.../aggregation/AggregationHandleDistinct.cpp | 4 +-
.../aggregation/AggregationHandleSum.cpp | 17 +++
.../aggregation/AggregationHandleSum.hpp | 21 ++++
storage/AggregationOperationState.cpp | 13 +--
.../PackedPayloadAggregationStateHashTable.hpp | 94 +++++++++++++++-
8 files changed, 381 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3fc85b28/expressions/aggregation/AggregationConcreteHandle.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.cpp b/expressions/aggregation/AggregationConcreteHandle.cpp
index 3151a91..c3d133a 100644
--- a/expressions/aggregation/AggregationConcreteHandle.cpp
+++ b/expressions/aggregation/AggregationConcreteHandle.cpp
@@ -19,6 +19,50 @@
#include "expressions/aggregation/AggregationConcreteHandle.hpp"
+#include <cstddef>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "storage/HashTable.hpp"
+#include "storage/HashTableFactory.hpp"
+#include "storage/PackedPayloadAggregationStateHashTable.hpp"
+
namespace quickstep {
+class StorageManager;
+class Type;
+class ValueAccessor;
+
+AggregationStateHashTableBase* AggregationConcreteHandle::createDistinctifyHashTable(
+ const HashTableImplType hash_table_impl,
+ const std::vector<const Type*> &key_types,
+ const std::size_t estimated_num_distinct_keys,
+ StorageManager *storage_manager) const {
+ // Create a hash table with key types as key_types and value type as bool.
+ return AggregationStateHashTableFactory::CreateResizable(
+ hash_table_impl,
+ key_types,
+ estimated_num_distinct_keys,
+ {},
+ storage_manager);
+}
+
+void AggregationConcreteHandle::insertValueAccessorIntoDistinctifyHashTable(
+ ValueAccessor *accessor,
+ const std::vector<attribute_id> &key_ids,
+ AggregationStateHashTableBase *distinctify_hash_table) const {
+ // If the key-value pair is already there, we don't need to update the value,
+ // which should always be "true". I.e. the value is just a placeholder.
+// AggregationStateFastHashTable *hash_table =
+// static_cast<AggregationStateFastHashTable *>(distinctify_hash_table);
+// if (key_ids.size() == 1) {
+// hash_table->upsertValueAccessorFast(
+// key_ids, accessor, key_ids[0], true /* check_for_null_keys */);
+// } else {
+// std::vector<attribute_id> empty_args {kInvalidAttributeID};
+// hash_table->upsertValueAccessorCompositeKeyFast(
+// empty_args, accessor, key_ids, true /* check_for_null_keys */);
+// }
+}
+
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3fc85b28/expressions/aggregation/AggregationConcreteHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.hpp b/expressions/aggregation/AggregationConcreteHandle.hpp
index 93e9bd0..04be232 100644
--- a/expressions/aggregation/AggregationConcreteHandle.hpp
+++ b/expressions/aggregation/AggregationConcreteHandle.hpp
@@ -21,6 +21,7 @@
#define QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_CONCRETE_HANDLE_HPP_
#include <cstddef>
+#include <cstdint>
#include <utility>
#include <vector>
@@ -28,6 +29,7 @@
#include "expressions/aggregation/AggregationHandle.hpp"
#include "expressions/aggregation/AggregationID.hpp"
#include "storage/HashTableBase.hpp"
+#include "storage/PackedPayloadAggregationStateHashTable.hpp"
#include "threading/SpinMutex.hpp"
#include "types/TypedValue.hpp"
#include "types/containers/ColumnVector.hpp"
@@ -101,10 +103,39 @@ class AggregationConcreteHandle : public AggregationHandle {
<< "takes at least one argument.";
}
+ AggregationStateHashTableBase* createDistinctifyHashTable(
+ const HashTableImplType hash_table_impl,
+ const std::vector<const Type *> &key_types,
+ const std::size_t estimated_num_distinct_keys,
+ StorageManager *storage_manager) const override;
+
+ void insertValueAccessorIntoDistinctifyHashTable(
+ ValueAccessor *accessor,
+ const std::vector<attribute_id> &key_ids,
+ AggregationStateHashTableBase *distinctify_hash_table) const override;
+
+ void blockUpdate() override {
+ block_update_ = true;
+ }
+
+ void allowUpdate() override {
+ block_update_ = false;
+ }
+
protected:
AggregationConcreteHandle(const AggregationID agg_id)
: AggregationHandle(agg_id) {}
+ template <typename HandleT, typename StateT>
+ StateT* aggregateOnDistinctifyHashTableForSingleUnaryHelperFast(
+ const AggregationStateHashTableBase &distinctify_hash_table) const;
+
+ template <typename HandleT, typename HashTableT>
+ void aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast(
+ const AggregationStateHashTableBase &distinctify_hash_table,
+ AggregationStateHashTableBase *hash_table,
+ std::size_t index) const;
+
template <typename HandleT, typename HashTableT>
ColumnVector* finalizeHashTableHelper(
const Type &result_type,
@@ -125,6 +156,8 @@ class AggregationConcreteHandle : public AggregationHandle {
group_state);
}
+ bool block_update_;
+
private:
DISALLOW_COPY_AND_ASSIGN(AggregationConcreteHandle);
};
@@ -148,14 +181,7 @@ class HashTableAggregateFinalizer {
output_column_vector_(output_column_vector) {}
inline void operator()(const std::vector<TypedValue> &group_by_key,
- const AggregationState &group_state) {
- group_by_keys_->emplace_back(group_by_key);
- output_column_vector_->appendTypedValue(
- handle_.finalizeHashTableEntry(group_state));
- }
-
- inline void operator()(const std::vector<TypedValue> &group_by_key,
- const unsigned char *byte_ptr) {
+ const std::uint8_t *byte_ptr) {
group_by_keys_->emplace_back(group_by_key);
output_column_vector_->appendTypedValue(
handle_.finalizeHashTableEntry(byte_ptr));
@@ -172,6 +198,68 @@ class HashTableAggregateFinalizer {
// ----------------------------------------------------------------------------
// Implementations of templated methods follow:
+template <typename HandleT, typename StateT>
+StateT* AggregationConcreteHandle::
+ aggregateOnDistinctifyHashTableForSingleUnaryHelperFast(
+ const AggregationStateHashTableBase &distinctify_hash_table) const {
+ const HandleT &handle = static_cast<const HandleT &>(*this);
+ StateT *state = static_cast<StateT *>(createInitialState());
+
+ // A lambda function which will be called on each key from the distinctify
+ // hash table.
+ const auto aggregate_functor = [&handle, &state](
+ const TypedValue &key, const std::uint8_t *dumb_placeholder) {
+ // For each (unary) key in the distinctify hash table, aggregate the key
+ // into "state".
+ handle.iterateUnaryInl(key, reinterpret_cast<std::uint8_t *>(state));
+ };
+
+ const auto &hash_table =
+ static_cast<const PackedPayloadSeparateChainingAggregationStateHashTable &>(
+ distinctify_hash_table);
+ // Invoke the lambda function "aggregate_functor" on each key from the
+ // distinctify hash table.
+ hash_table.forEach(&aggregate_functor);
+
+ return state;
+}
+
+template <typename HandleT, typename HashTableT>
+void AggregationConcreteHandle::
+ aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast(
+ const AggregationStateHashTableBase &distinctify_hash_table,
+ AggregationStateHashTableBase *aggregation_hash_table,
+ std::size_t index) const {
+ const HandleT &handle = static_cast<const HandleT &>(*this);
+ HashTableT *target_hash_table =
+ static_cast<HashTableT *>(aggregation_hash_table);
+
+ // A lambda function which will be called on each key-value pair from the
+ // distinctify hash table.
+ const auto aggregate_functor = [&handle, &target_hash_table, &index](
+ std::vector<TypedValue> &key, const std::uint8_t *dumb_placeholder) {
+ // For each (composite) key vector in the distinctify hash table with size N.
+ // The first N-1 entries are GROUP BY columns and the last entry is the
+ // argument to be aggregated on.
+ const TypedValue argument(std::move(key.back()));
+ key.pop_back();
+
+ // An upserter as lambda function for aggregating the argument into its
+ // GROUP BY group's entry inside aggregation_hash_table.
+ const auto upserter = [&handle, &argument](std::uint8_t *state) {
+ handle.iterateUnaryInl(argument, state);
+ };
+
+ target_hash_table->upsertCompositeKey(key, &upserter, index);
+ };
+
+ const HashTableT &source_hash_table =
+ static_cast<const HashTableT &>(distinctify_hash_table);
+ // Invoke the lambda function "aggregate_functor" on each composite key vector
+ // from the distinctify hash table.
+ source_hash_table.forEachCompositeKey(&aggregate_functor);
+}
+
template <typename HandleT, typename HashTableT>
ColumnVector* AggregationConcreteHandle::finalizeHashTableHelper(
const Type &result_type,
@@ -188,14 +276,14 @@ ColumnVector* AggregationConcreteHandle::finalizeHashTableHelper(
new NativeColumnVector(result_type, hash_table_concrete.numEntries());
HashTableAggregateFinalizer<HandleT, NativeColumnVector> finalizer(
handle, group_by_keys, result);
- hash_table_concrete.forEach(&finalizer, index);
+ hash_table_concrete.forEachCompositeKey(&finalizer, index);
return result;
} else {
IndirectColumnVector *result = new IndirectColumnVector(
result_type, hash_table_concrete.numEntries());
HashTableAggregateFinalizer<HandleT, IndirectColumnVector> finalizer(
handle, group_by_keys, result);
- hash_table_concrete.forEach(&finalizer, index);
+ hash_table_concrete.forEachCompositeKey(&finalizer, index);
return result;
}
} else {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3fc85b28/expressions/aggregation/AggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandle.hpp b/expressions/aggregation/AggregationHandle.hpp
index 8e2aea6..bc9c27f 100644
--- a/expressions/aggregation/AggregationHandle.hpp
+++ b/expressions/aggregation/AggregationHandle.hpp
@@ -209,6 +209,97 @@ class AggregationHandle {
int index) const = 0;
/**
+ * @brief Create a new HashTable for the distinctify step for DISTINCT
+ * aggregation.
+ *
+ * Distinctify is the first step for DISTINCT aggregation. This step inserts
+ * the GROUP BY expression values and aggregation arguments together as keys
+ * into the distinctify hash table, so that arguments are distinctified within
+ * each GROUP BY group. Later, a second-round aggregation on the distinctify
+ * hash table will be performed to actually compute the aggregated result for
+ * each GROUP BY group.
+ *
+ * In the case of single aggregation where there is no GROUP BY expressions,
+ * we simply treat it as a special GROUP BY case that the GROUP BY expression
+ * vector is empty.
+ *
+ * @param hash_table_impl The choice of which concrete HashTable
+ * implementation to use.
+ * @param key_types The types of the GROUP BY expressions together with the
+ * types of the aggregation arguments.
+ * @param estimated_num_distinct_keys The estimated number of distinct keys
+ * (i.e. GROUP BY expressions together with aggregation arguments) for
+ * the distinctify step. This is used to size the initial HashTable.
+ * This is an estimate only, and the HashTable will be resized if it
+ * becomes over-full.
+ * @param storage_manager The StorageManager to use to create the HashTable.
+ * A StorageBlob will be allocated to serve as the HashTable's
+ * in-memory storage.
+ *
+ * @return A new HashTable instance with the appropriate state type for this
+ * aggregate.
+ */
+ virtual AggregationStateHashTableBase* createDistinctifyHashTable(
+ const HashTableImplType hash_table_impl,
+ const std::vector<const Type *> &key_types,
+ const std::size_t estimated_num_distinct_keys,
+ StorageManager *storage_manager) const = 0;
+
+ /**
+ * @brief Inserts the GROUP BY expressions and aggregation arguments together
+ * as keys into the distinctify hash table.
+ *
+ * @param accessor The ValueAccessor that will be iterated over to read
+ * tuples.
+ * @param key_ids The attribute_ids of the GROUP BY expressions in accessor
+ * together with the attribute_ids of the arguments to this aggregate
+ * in accessor, in order.
+ * @param distinctify_hash_table The HashTable to store the GROUP BY
+ * expressions and the aggregation arguments together as hash table
+ * keys and a bool constant \c true as hash table value (So the hash
+ * table actually serves as a hash set). This should have been created
+ * by calling createDistinctifyHashTable();
+ */
+ virtual void insertValueAccessorIntoDistinctifyHashTable(
+ ValueAccessor *accessor,
+ const std::vector<attribute_id> &key_ids,
+ AggregationStateHashTableBase *distinctify_hash_table) const = 0;
+
+ /**
+ * @brief Perform single (i.e. without GROUP BY) aggregation on the keys from
+ * the distinctify hash table to actually compute the aggregated results.
+ *
+ * @param distinctify_hash_table Hash table which stores the distinctified
+ * aggregation arguments as hash table keys. This should have been
+ * created by calling createDistinctifyHashTable();
+ * @return A new AggregationState which contains the aggregated results from
+ * applying the aggregate to the distinctify hash table.
+ * Caller is responsible for deleting the returned AggregationState.
+ */
+ virtual AggregationState* aggregateOnDistinctifyHashTableForSingle(
+ const AggregationStateHashTableBase &distinctify_hash_table) const {
+ return nullptr;
+ }
+
+ /**
+ * @brief Perform GROUP BY aggregation on the keys from the distinctify hash
+ * table and upserts states into the aggregation hash table.
+ *
+ * @param distinctify_hash_table Hash table which stores the GROUP BY
+ * expression values and aggregation arguments together as hash table
+ * keys.
+ * @param aggregation_hash_table The HashTable to upsert AggregationStates in.
+ * This should have been created by calling createGroupByHashTable() on
+ * this same AggregationHandle.
+ * @param index The index of the distinctify hash table for which we perform
+ * the DISTINCT aggregation.
+ */
+ virtual void aggregateOnDistinctifyHashTableForGroupBy(
+ const AggregationStateHashTableBase &distinctify_hash_table,
+ AggregationStateHashTableBase *aggregation_hash_table,
+ std::size_t index) const {}
+
+ /**
* @brief Get the number of bytes needed to store the aggregation handle's
* state.
**/
@@ -236,7 +327,7 @@ class AggregationHandle {
* @param byte_ptr The pointer where the aggregation state is stored.
**/
virtual void updateStateUnary(const TypedValue &argument,
- std::uint8_t *byte_ptr) const {}
+ std::uint8_t *byte_ptr) const = 0;
/**
* @brief Merge two aggregation states for this aggregation handle.
@@ -248,7 +339,7 @@ class AggregationHandle {
* @param dst A pointer to the destination aggregation state.
**/
virtual void mergeStates(const std::uint8_t *src,
- std::uint8_t *dst) const {}
+ std::uint8_t *dst) const = 0;
/**
* @brief Initialize the payload (in the aggregation hash table) for the given
@@ -256,7 +347,7 @@ class AggregationHandle {
*
* @param byte_ptr The pointer to the aggregation state in the hash table.
**/
- virtual void initPayload(std::uint8_t *byte_ptr) const {}
+ virtual void initPayload(std::uint8_t *byte_ptr) const = 0;
/**
* @brief Destroy the payload (in the aggregation hash table) for the given
@@ -264,7 +355,19 @@ class AggregationHandle {
*
* @param byte_ptr The pointer to the aggregation state in the hash table.
**/
- virtual void destroyPayload(std::uint8_t *byte_ptr) const {}
+ virtual void destroyPayload(std::uint8_t *byte_ptr) const {};
+
+ /**
+ * @brief Inform the aggregation handle to block (prohibit) updates on the
+ * aggregation state.
+ **/
+ virtual void blockUpdate() = 0;
+
+ /**
+ * @brief Inform the aggregation handle to allow updates on the
+ * aggregation state.
+ **/
+ virtual void allowUpdate() = 0;
protected:
AggregationHandle(const AggregationID agg_id)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3fc85b28/expressions/aggregation/AggregationHandleDistinct.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleDistinct.cpp b/expressions/aggregation/AggregationHandleDistinct.cpp
index c6c47c7..1886335 100644
--- a/expressions/aggregation/AggregationHandleDistinct.cpp
+++ b/expressions/aggregation/AggregationHandleDistinct.cpp
@@ -41,11 +41,11 @@ ColumnVector* AggregationHandleDistinct::finalizeHashTable(
DCHECK(group_by_keys->empty());
const auto keys_retriever = [&group_by_keys](std::vector<TypedValue> &group_by_key,
- const bool &dumb_placeholder) -> void {
+ const std::uint8_t *dumb_placeholder) -> void {
group_by_keys->emplace_back(std::move(group_by_key));
};
static_cast<const PackedPayloadSeparateChainingAggregationStateHashTable &>(
- hash_table).forEach(&keys_retriever);
+ hash_table).forEachCompositeKey(&keys_retriever);
return nullptr;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3fc85b28/expressions/aggregation/AggregationHandleSum.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.cpp b/expressions/aggregation/AggregationHandleSum.cpp
index 00b229e..29a986f 100644
--- a/expressions/aggregation/AggregationHandleSum.cpp
+++ b/expressions/aggregation/AggregationHandleSum.cpp
@@ -150,4 +150,21 @@ ColumnVector* AggregationHandleSum::finalizeHashTable(
*result_type_, hash_table, group_by_keys, index);
}
+AggregationState* AggregationHandleSum::aggregateOnDistinctifyHashTableForSingle(
+ const AggregationStateHashTableBase &distinctify_hash_table) const {
+ return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
+ AggregationHandleSum,
+ AggregationStateSum>(distinctify_hash_table);
+}
+
+void AggregationHandleSum::aggregateOnDistinctifyHashTableForGroupBy(
+ const AggregationStateHashTableBase &distinctify_hash_table,
+ AggregationStateHashTableBase *aggregation_hash_table,
+ std::size_t index) const {
+ aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
+ AggregationHandleSum,
+ PackedPayloadSeparateChainingAggregationStateHashTable>(
+ distinctify_hash_table, aggregation_hash_table, index);
+}
+
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3fc85b28/expressions/aggregation/AggregationHandleSum.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.hpp b/expressions/aggregation/AggregationHandleSum.hpp
index 9fb7706..cdcec4b 100644
--- a/expressions/aggregation/AggregationHandleSum.hpp
+++ b/expressions/aggregation/AggregationHandleSum.hpp
@@ -123,6 +123,18 @@ class AggregationHandleSum : public AggregationConcreteHandle {
state->null_ = false;
}
+ inline void iterateUnaryInl(const TypedValue &value,
+ std::uint8_t *byte_ptr) const {
+ DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
+ if (value.isNull()) return;
+ TypedValue *sum_ptr =
+ reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset_);
+ bool *null_ptr =
+ reinterpret_cast<bool *>(byte_ptr + blank_state_.null_offset_);
+ *sum_ptr = fast_operator_->applyToTypedValues(*sum_ptr, value);
+ *null_ptr = false;
+ }
+
AggregationState* accumulate(
ValueAccessor *accessor,
ColumnVectorsValueAccessor *aux_accessor,
@@ -178,6 +190,15 @@ class AggregationHandleSum : public AggregationConcreteHandle {
std::vector<std::vector<TypedValue>> *group_by_keys,
int index) const override;
+ AggregationState* aggregateOnDistinctifyHashTableForSingle(
+ const AggregationStateHashTableBase &distinctify_hash_table)
+ const override;
+
+ void aggregateOnDistinctifyHashTableForGroupBy(
+ const AggregationStateHashTableBase &distinctify_hash_table,
+ AggregationStateHashTableBase *aggregation_hash_table,
+ std::size_t index) const override;
+
private:
friend class AggregateFunctionSum;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3fc85b28/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 5de2653..d04af81 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -123,12 +123,10 @@ AggregationOperationState::AggregationOperationState(
// on the group-by expressions.
DCHECK_GT(group_by_key_ids_.size(), 0u);
- handles_.emplace_back(new AggregationHandleDistinct());
- is_distinct_.emplace_back(false);
group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
hash_table_impl_type,
group_by_types_,
- {handles_.front().get()},
+ {},
storage_manager));
} else {
std::vector<AggregationHandle *> group_by_handles;
@@ -471,9 +469,9 @@ void AggregationOperationState::mergeSingleState(
void AggregationOperationState::mergeGroupByHashTables(
AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst) const {
- HashTableMergerFast merger(dst);
+ HashTableMerger merger(dst);
static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(src)
- ->forEach(&merger);
+ ->forEachCompositeKey(&merger);
}
void AggregationOperationState::aggregateBlockHashTable(
@@ -662,11 +660,6 @@ void AggregationOperationState::finalizeHashTableImplCollisionFree(
complete_result.addColumn(key_cv.release());
for (std::size_t i = 0; i < handles_.size(); ++i) {
- if (handles_[i]->getAggregationID() == AggregationID::kDistinct) {
- DCHECK_EQ(1u, handles_.size());
- break;
- }
-
const Type *result_type = handles_[i]->getResultType();
DCHECK(NativeColumnVector::UsableForType(*result_type));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3fc85b28/storage/PackedPayloadAggregationStateHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadAggregationStateHashTable.hpp b/storage/PackedPayloadAggregationStateHashTable.hpp
index 70152e7..8364bb4 100644
--- a/storage/PackedPayloadAggregationStateHashTable.hpp
+++ b/storage/PackedPayloadAggregationStateHashTable.hpp
@@ -89,6 +89,11 @@ class PackedPayloadSeparateChainingAggregationStateHashTable
inline bool upsertCompositeKey(const std::vector<TypedValue> &key,
const std::uint8_t *source_state);
+ template <typename FunctorT>
+ inline bool upsertCompositeKey(const std::vector<TypedValue> &key,
+ FunctorT *functor,
+ int index);
+
inline const std::uint8_t* getSingleCompositeKey(
const std::vector<TypedValue> &key) const;
@@ -102,6 +107,13 @@ class PackedPayloadSeparateChainingAggregationStateHashTable
template <typename FunctorT>
inline std::size_t forEach(FunctorT *functor, const int index) const;
+ template <typename FunctorT>
+ inline std::size_t forEachCompositeKey(FunctorT *functor) const;
+
+ template <typename FunctorT>
+ inline std::size_t forEachCompositeKey(FunctorT *functor,
+ const int index) const;
+
private:
void resize(const std::size_t extra_buckets,
const std::size_t extra_variable_storage,
@@ -118,6 +130,10 @@ class PackedPayloadSeparateChainingAggregationStateHashTable
return total;
}
+ inline bool getNextEntry(TypedValue *key,
+ const std::uint8_t **value,
+ std::size_t *entry_num) const;
+
inline bool getNextEntryCompositeKey(std::vector<TypedValue> *key,
const std::uint8_t **value,
std::size_t *entry_num) const;
@@ -301,7 +317,7 @@ class PackedPayloadSeparateChainingAggregationStateHashTable
// ----------------------------------------------------------------------------
// Implementations of template class methods follow.
-class HashTableMergerFast {
+class HashTableMerger {
public:
/**
* @brief Constructor
@@ -310,7 +326,7 @@ class HashTableMergerFast {
* @param destination_hash_table The destination hash table to which other
* hash tables will be merged.
**/
- explicit HashTableMergerFast(
+ explicit HashTableMerger(
AggregationStateHashTableBase *destination_hash_table)
: destination_hash_table_(
static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(
@@ -331,7 +347,7 @@ class HashTableMergerFast {
private:
PackedPayloadSeparateChainingAggregationStateHashTable *destination_hash_table_;
- DISALLOW_COPY_AND_ASSIGN(HashTableMergerFast);
+ DISALLOW_COPY_AND_ASSIGN(HashTableMerger);
};
inline std::size_t PackedPayloadSeparateChainingAggregationStateHashTable
@@ -348,6 +364,23 @@ inline std::size_t PackedPayloadSeparateChainingAggregationStateHashTable
}
inline bool PackedPayloadSeparateChainingAggregationStateHashTable
+ ::getNextEntry(TypedValue *key,
+ const std::uint8_t **value,
+ std::size_t *entry_num) const {
+ if (*entry_num < header_->buckets_allocated.load(std::memory_order_relaxed)) {
+ const char *bucket =
+ static_cast<const char *>(buckets_) + (*entry_num) * bucket_size_;
+ *key = key_manager_.getKeyComponentTyped(bucket, 0);
+ *value = reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset);
+ ++(*entry_num);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+
+inline bool PackedPayloadSeparateChainingAggregationStateHashTable
::getNextEntryCompositeKey(std::vector<TypedValue> *key,
const std::uint8_t **value,
std::size_t *entry_num) const {
@@ -513,6 +546,28 @@ inline bool PackedPayloadSeparateChainingAggregationStateHashTable
}
}
+template <typename FunctorT>
+inline bool PackedPayloadSeparateChainingAggregationStateHashTable
+ ::upsertCompositeKey(const std::vector<TypedValue> &key,
+ FunctorT *functor,
+ int index) {
+ const std::size_t variable_size =
+ calculateVariableLengthCompositeKeyCopySize(key);
+ for (;;) {
+ {
+ SpinSharedMutexSharedLock<true> resize_lock(resize_shared_mutex_);
+ std::uint8_t *value =
+ upsertCompositeKeyInternal(key, variable_size);
+ if (value != nullptr) {
+ (*functor)(value + payload_offsets_[index]);
+ return true;
+ }
+ }
+ resize(0, variable_size);
+ }
+}
+
+
inline std::uint8_t* PackedPayloadSeparateChainingAggregationStateHashTable
::upsertCompositeKeyInternal(const std::vector<TypedValue> &key,
const std::size_t variable_key_size) {
@@ -690,6 +745,35 @@ inline std::size_t PackedPayloadSeparateChainingAggregationStateHashTable
::forEach(FunctorT *functor) const {
std::size_t entries_visited = 0;
std::size_t entry_num = 0;
+ TypedValue key;
+ const std::uint8_t *value_ptr;
+ while (getNextEntry(&key, &value_ptr, &entry_num)) {
+ ++entries_visited;
+ (*functor)(key, value_ptr);
+ }
+ return entries_visited;
+}
+
+template <typename FunctorT>
+inline std::size_t PackedPayloadSeparateChainingAggregationStateHashTable
+ ::forEach(FunctorT *functor, const int index) const {
+ std::size_t entries_visited = 0;
+ std::size_t entry_num = 0;
+ TypedValue key;
+ const std::uint8_t *value_ptr;
+ while (getNextEntry(&key, &value_ptr, &entry_num)) {
+ ++entries_visited;
+ (*functor)(key, value_ptr + payload_offsets_[index]);
+ key.clear();
+ }
+ return entries_visited;
+}
+
+template <typename FunctorT>
+inline std::size_t PackedPayloadSeparateChainingAggregationStateHashTable
+ ::forEachCompositeKey(FunctorT *functor) const {
+ std::size_t entries_visited = 0;
+ std::size_t entry_num = 0;
std::vector<TypedValue> key;
const std::uint8_t *value_ptr;
while (getNextEntryCompositeKey(&key, &value_ptr, &entry_num)) {
@@ -702,7 +786,8 @@ inline std::size_t PackedPayloadSeparateChainingAggregationStateHashTable
template <typename FunctorT>
inline std::size_t PackedPayloadSeparateChainingAggregationStateHashTable
- ::forEach(FunctorT *functor, const int index) const {
+ ::forEachCompositeKey(FunctorT *functor,
+ const int index) const {
std::size_t entries_visited = 0;
std::size_t entry_num = 0;
std::vector<TypedValue> key;
@@ -715,7 +800,6 @@ inline std::size_t PackedPayloadSeparateChainingAggregationStateHashTable
return entries_visited;
}
-
} // namespace quickstep
#endif // QUICKSTEP_STORAGE_PACKED_PAYLOAD_AGGREGATION_STATE_HASH_TABLE_HPP_
[2/2] incubator-quickstep git commit: updates
Posted by ji...@apache.org.
updates
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/963a6042
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/963a6042
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/963a6042
Branch: refs/heads/collision-free-agg
Commit: 963a604288e158b953f99fac16ff03e0015d9860
Parents: 3fc85b2
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Tue Jan 31 21:52:01 2017 -0600
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Tue Jan 31 21:52:01 2017 -0600
----------------------------------------------------------------------
.../aggregation/AggregationHandleDistinct.cpp | 53 ----------
.../aggregation/AggregationHandleDistinct.hpp | 106 -------------------
expressions/aggregation/CMakeLists.txt | 13 ---
query_optimizer/ExecutionGenerator.cpp | 5 +
storage/AggregationOperationState.cpp | 42 ++++++--
storage/CMakeLists.txt | 1 -
6 files changed, 38 insertions(+), 182 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/963a6042/expressions/aggregation/AggregationHandleDistinct.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleDistinct.cpp b/expressions/aggregation/AggregationHandleDistinct.cpp
deleted file mode 100644
index 1886335..0000000
--- a/expressions/aggregation/AggregationHandleDistinct.cpp
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include "expressions/aggregation/AggregationHandleDistinct.hpp"
-
-#include <cstddef>
-#include <memory>
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "storage/PackedPayloadAggregationStateHashTable.hpp"
-
-#include "types/TypedValue.hpp"
-
-#include "glog/logging.h"
-
-namespace quickstep {
-
-class ColumnVector;
-
-ColumnVector* AggregationHandleDistinct::finalizeHashTable(
- const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys,
- int index) const {
- DCHECK(group_by_keys->empty());
-
- const auto keys_retriever = [&group_by_keys](std::vector<TypedValue> &group_by_key,
- const std::uint8_t *dumb_placeholder) -> void {
- group_by_keys->emplace_back(std::move(group_by_key));
- };
- static_cast<const PackedPayloadSeparateChainingAggregationStateHashTable &>(
- hash_table).forEachCompositeKey(&keys_retriever);
-
- return nullptr;
-}
-
-} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/963a6042/expressions/aggregation/AggregationHandleDistinct.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleDistinct.hpp b/expressions/aggregation/AggregationHandleDistinct.hpp
deleted file mode 100644
index 0d8905b..0000000
--- a/expressions/aggregation/AggregationHandleDistinct.hpp
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_HANDLE_DISTINCT_HPP_
-#define QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_HANDLE_DISTINCT_HPP_
-
-#include <cstddef>
-#include <memory>
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregationConcreteHandle.hpp"
-#include "expressions/aggregation/AggregationID.hpp"
-#include "storage/HashTableBase.hpp"
-#include "types/TypedValue.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-namespace quickstep {
-
-class AggregationState;
-class ColumnVector;
-class StorageManager;
-class Type;
-class ValueAccessor;
-
-/** \addtogroup Expressions
- * @{
- */
-
-class AggregationHandleDistinct : public AggregationConcreteHandle {
- public:
- /**
- * @brief Constructor.
- **/
- AggregationHandleDistinct()
- : AggregationConcreteHandle(AggregationID::kDistinct) {}
-
- std::vector<const Type *> getArgumentTypes() const override {
- return {};
- }
-
- const Type* getResultType() const override {
- LOG(FATAL)
- << "AggregationHandleDistinct does not support getResultType().";
- }
-
- AggregationState* createInitialState() const override {
- LOG(FATAL)
- << "AggregationHandleDistinct does not support createInitialState().";
- }
-
- AggregationState* accumulateNullary(
- const std::size_t num_tuples) const override {
- LOG(FATAL)
- << "AggregationHandleDistinct does not support accumulateNullary().";
- }
-
- AggregationState* accumulate(
- ValueAccessor *accessor,
- ColumnVectorsValueAccessor *aux_accessor,
- const std::vector<attribute_id> &argument_ids) const override {
- LOG(FATAL) << "AggregationHandleDistinct does not support "
- "accumulate().";
- }
-
- void mergeStates(const AggregationState &source,
- AggregationState *destination) const override {
- LOG(FATAL) << "AggregationHandleDistinct does not support mergeStates().";
- }
-
- TypedValue finalize(const AggregationState &state) const override {
- LOG(FATAL) << "AggregationHandleDistinct does not support finalize().";
- }
-
- ColumnVector* finalizeHashTable(
- const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys,
- int index) const override;
-
- private:
- DISALLOW_COPY_AND_ASSIGN(AggregationHandleDistinct);
-};
-
-/** @} */
-
-} // namespace quickstep
-
-#endif // QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_HANDLE_DISTINCT_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/963a6042/expressions/aggregation/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/aggregation/CMakeLists.txt b/expressions/aggregation/CMakeLists.txt
index bd239d4..432da09 100644
--- a/expressions/aggregation/CMakeLists.txt
+++ b/expressions/aggregation/CMakeLists.txt
@@ -55,9 +55,6 @@ add_library(quickstep_expressions_aggregation_AggregationHandleAvg
add_library(quickstep_expressions_aggregation_AggregationHandleCount
AggregationHandleCount.cpp
AggregationHandleCount.hpp)
-add_library(quickstep_expressions_aggregation_AggregationHandleDistinct
- AggregationHandleDistinct.cpp
- AggregationHandleDistinct.hpp)
add_library(quickstep_expressions_aggregation_AggregationHandleMax
AggregationHandleMax.cpp
AggregationHandleMax.hpp)
@@ -193,15 +190,6 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleCount
quickstep_types_containers_ColumnVector
quickstep_types_containers_ColumnVectorUtil
quickstep_utility_Macros)
-target_link_libraries(quickstep_expressions_aggregation_AggregationHandleDistinct
- glog
- quickstep_catalog_CatalogTypedefs
- quickstep_expressions_aggregation_AggregationConcreteHandle
- quickstep_expressions_aggregation_AggregationID
- quickstep_storage_HashTableBase
- quickstep_storage_PackedPayloadAggregationStateHashTable
- quickstep_types_TypedValue
- quickstep_utility_Macros)
target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMax
glog
quickstep_catalog_CatalogTypedefs
@@ -267,7 +255,6 @@ target_link_libraries(quickstep_expressions_aggregation
quickstep_expressions_aggregation_AggregationHandle
quickstep_expressions_aggregation_AggregationHandleAvg
quickstep_expressions_aggregation_AggregationHandleCount
- quickstep_expressions_aggregation_AggregationHandleDistinct
quickstep_expressions_aggregation_AggregationHandleMax
quickstep_expressions_aggregation_AggregationHandleMin
quickstep_expressions_aggregation_AggregationHandleSum
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/963a6042/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 6694001..d32505b 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -429,6 +429,11 @@ bool ExecutionGenerator::canUseCollisionFreeAggregation(
for (const auto &agg_expr : aggregate->aggregate_expressions()) {
const E::AggregateFunctionPtr agg_func =
std::static_pointer_cast<const E::AggregateFunction>(agg_expr->expression());
+
+ if (agg_func->is_distinct()) {
+ return false;
+ }
+
switch (agg_func->getAggregate().getAggregationID()) {
case AggregationID::kCount: // Fall through
case AggregationID::kSum:
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/963a6042/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index d04af81..1bc5832 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -34,7 +34,6 @@
#include "expressions/aggregation/AggregateFunction.hpp"
#include "expressions/aggregation/AggregateFunctionFactory.hpp"
#include "expressions/aggregation/AggregationHandle.hpp"
-#include "expressions/aggregation/AggregationHandleDistinct.hpp"
#include "expressions/aggregation/AggregationID.hpp"
#include "expressions/predicate/Predicate.hpp"
#include "expressions/scalar/Scalar.hpp"
@@ -676,14 +675,25 @@ void AggregationOperationState::finalizeHashTableImplCollisionFree(
void AggregationOperationState::finalizeHashTableImplPartitioned(
const std::size_t partition_id,
InsertDestination *output_destination) {
+ PackedPayloadSeparateChainingAggregationStateHashTable *hash_table =
+ static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(
+ partitioned_group_by_hashtable_pool_->getHashTable(partition_id));
+
// Each element of 'group_by_keys' is a vector of values for a particular
// group (which is also the prefix of the finalized Tuple for that group).
std::vector<std::vector<TypedValue>> group_by_keys;
+ if (handles_.empty()) {
+ const auto keys_retriever = [&group_by_keys](std::vector<TypedValue> &group_by_key,
+ const std::uint8_t *dumb_placeholder) -> void {
+ group_by_keys.emplace_back(std::move(group_by_key));
+ };
+
+ hash_table->forEachCompositeKey(&keys_retriever);
+ }
+
// Collect per-aggregate finalized values.
std::vector<std::unique_ptr<ColumnVector>> final_values;
- AggregationStateHashTableBase *hash_table =
- partitioned_group_by_hashtable_pool_->getHashTable(partition_id);
for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
*hash_table, &group_by_keys, agg_idx);
@@ -737,10 +747,6 @@ void AggregationOperationState::finalizeHashTableImplPartitioned(
void AggregationOperationState::finalizeHashTableImplThreadPrivate(
InsertDestination *output_destination) {
- // Each element of 'group_by_keys' is a vector of values for a particular
- // group (which is also the prefix of the finalized Tuple for that group).
- std::vector<std::vector<TypedValue>> group_by_keys;
-
// TODO(harshad) - The merge phase may be slower when each hash table contains
// large number of entries. We should find ways in which we can perform a
// parallel merge.
@@ -754,15 +760,33 @@ void AggregationOperationState::finalizeHashTableImplThreadPrivate(
return;
}
- std::unique_ptr<AggregationStateHashTableBase> final_hash_table(
+ std::unique_ptr<AggregationStateHashTableBase> final_hash_table_ptr(
hash_tables->back().release());
for (std::size_t i = 0; i < hash_tables->size() - 1; ++i) {
std::unique_ptr<AggregationStateHashTableBase> hash_table(
hash_tables->at(i).release());
- mergeGroupByHashTables(hash_table.get(), final_hash_table.get());
+ mergeGroupByHashTables(hash_table.get(), final_hash_table_ptr.get());
hash_table->destroyPayload();
}
+ PackedPayloadSeparateChainingAggregationStateHashTable *final_hash_table =
+ static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(
+ final_hash_table_ptr.get());
+
+ // Each element of 'group_by_keys' is a vector of values for a particular
+ // group (which is also the prefix of the finalized Tuple for that group).
+ std::vector<std::vector<TypedValue>> group_by_keys;
+
+ if (handles_.empty()) {
+ const auto keys_retriever = [&group_by_keys](std::vector<TypedValue> &group_by_key,
+ const std::uint8_t *dumb_placeholder) -> void {
+ group_by_keys.emplace_back(std::move(group_by_key));
+ };
+
+ final_hash_table->forEachCompositeKey(&keys_retriever);
+ }
+
+
// Collect per-aggregate finalized values.
std::vector<std::unique_ptr<ColumnVector>> final_values;
for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/963a6042/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index c7bc28f..4ff612e 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -274,7 +274,6 @@ target_link_libraries(quickstep_storage_AggregationOperationState
quickstep_expressions_aggregation_AggregateFunction
quickstep_expressions_aggregation_AggregateFunctionFactory
quickstep_expressions_aggregation_AggregationHandle
- quickstep_expressions_aggregation_AggregationHandleDistinct
quickstep_expressions_aggregation_AggregationID
quickstep_expressions_predicate_Predicate
quickstep_expressions_scalar_Scalar