You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this copy.
Posted to commits@quickstep.apache.org by ra...@apache.org on 2016/08/05 22:52:53 UTC
[29/30] incubator-quickstep git commit: Initial commit for
QUICKSTEP-28 and QUICKSTEP-29. Code refactoring and cleanup;
some more optimizations are pending.
Initial commit for QUICKSTEP-28 and QUICKSTEP-29. Code refactoring and cleanup; some more optimizations are pending.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/39485d62
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/39485d62
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/39485d62
Branch: refs/heads/quickstep-28-29
Commit: 39485d620402273890e92072e05f856fd9da7f9d
Parents: ccea2ff
Author: rathijit <ra...@node-2.hashtable.quickstep-pg0.wisc.cloudlab.us>
Authored: Mon Jul 4 02:44:48 2016 -0500
Committer: rathijit <ra...@node-2.aggregation.quickstep-pg0.wisc.cloudlab.us>
Committed: Fri Aug 5 14:23:35 2016 -0500
----------------------------------------------------------------------
.../aggregation/AggregationConcreteHandle.cpp | 29 +-
.../aggregation/AggregationConcreteHandle.hpp | 223 ++
expressions/aggregation/AggregationHandle.hpp | 8 +-
.../aggregation/AggregationHandleAvg.cpp | 40 +-
.../aggregation/AggregationHandleAvg.hpp | 62 +-
.../aggregation/AggregationHandleCount.cpp | 38 +-
.../aggregation/AggregationHandleCount.hpp | 50 +-
.../aggregation/AggregationHandleDistinct.cpp | 2 +-
.../aggregation/AggregationHandleDistinct.hpp | 2 +-
.../aggregation/AggregationHandleMax.cpp | 29 +-
.../aggregation/AggregationHandleMax.hpp | 39 +-
.../aggregation/AggregationHandleMin.cpp | 30 +-
.../aggregation/AggregationHandleMin.hpp | 44 +-
.../aggregation/AggregationHandleSum.cpp | 31 +-
.../aggregation/AggregationHandleSum.hpp | 52 +-
expressions/aggregation/CMakeLists.txt | 7 +
storage/AggregationOperationState.cpp | 95 +-
storage/AggregationOperationState.hpp | 7 +-
storage/CMakeLists.txt | 58 +
storage/FastHashTable.hpp | 2640 ++++++++++++++++++
storage/FastHashTableFactory.hpp | 300 ++
storage/FastSeparateChainingHashTable.hpp | 1761 ++++++++++++
storage/HashTableBase.hpp | 2 +-
storage/HashTablePool.hpp | 42 +
storage/StorageBlock.cpp | 88 +-
storage/StorageBlock.hpp | 8 +
threading/SpinMutex.hpp | 2 +
27 files changed, 5587 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/expressions/aggregation/AggregationConcreteHandle.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.cpp b/expressions/aggregation/AggregationConcreteHandle.cpp
index 05ca58d..d808302 100644
--- a/expressions/aggregation/AggregationConcreteHandle.cpp
+++ b/expressions/aggregation/AggregationConcreteHandle.cpp
@@ -22,6 +22,7 @@
#include "catalog/CatalogTypedefs.hpp"
#include "storage/HashTable.hpp"
+#include "storage/FastHashTable.hpp"
#include "storage/HashTableFactory.hpp"
namespace quickstep {
@@ -49,22 +50,24 @@ void AggregationConcreteHandle::insertValueAccessorIntoDistinctifyHashTable(
AggregationStateHashTableBase *distinctify_hash_table) const {
// If the key-value pair is already there, we don't need to update the value,
// which should always be "true". I.e. the value is just a placeholder.
- const auto noop_upserter = [](const auto &accessor, const bool *value) -> void {};
+// const auto noop_upserter = [](const auto &accessor, const bool *value) -> void {};
- AggregationStateHashTable<bool> *hash_table =
- static_cast<AggregationStateHashTable<bool>*>(distinctify_hash_table);
+ AggregationStateFastHashTable *hash_table =
+ static_cast<AggregationStateFastHashTable *>(distinctify_hash_table);
if (key_ids.size() == 1) {
- hash_table->upsertValueAccessor(accessor,
- key_ids[0],
- true /* check_for_null_keys */,
- true /* initial_value */,
- &noop_upserter);
+// TODO(rathijit): fix
+// hash_table->upsertValueAccessor(accessor,
+// key_ids[0],
+// true /* check_for_null_keys */,
+// true /* initial_value */,
+// &noop_upserter);
} else {
- hash_table->upsertValueAccessorCompositeKey(accessor,
- key_ids,
- true /* check_for_null_keys */,
- true /* initial_value */,
- &noop_upserter);
+ std::vector<std::vector<attribute_id>> empty_args;
+ empty_args.resize(1);
+ hash_table->upsertValueAccessorCompositeKeyFast(empty_args,
+ accessor,
+ key_ids,
+ true /* check_for_null_keys */);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/expressions/aggregation/AggregationConcreteHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.hpp b/expressions/aggregation/AggregationConcreteHandle.hpp
index 0267e17..c5d7b3c 100644
--- a/expressions/aggregation/AggregationConcreteHandle.hpp
+++ b/expressions/aggregation/AggregationConcreteHandle.hpp
@@ -31,6 +31,7 @@
#include "types/TypedValue.hpp"
#include "types/containers/ColumnVector.hpp"
#include "utility/Macros.hpp"
+#include "threading/SpinMutex.hpp"
#include "glog/logging.h"
@@ -79,6 +80,37 @@ class HashTableStateUpserter {
DISALLOW_COPY_AND_ASSIGN(HashTableStateUpserter);
};
+template <typename HandleT>
+class HashTableStateUpserterFast {
+ public:
+ /**
+ * @brief Constructor.
+ *
+ * @param handle The aggregation handle being used.
+ * @param source_state The aggregation state in the source aggregation hash
+ * table. The corresponding state (for the same key) in the destination
+ * hash table will be upserted.
+ **/
+ HashTableStateUpserterFast(const HandleT &handle, const uint8_t *source_state)
+ : handle_(handle), source_state_(source_state) {}
+
+ /**
+ * @brief The operator for the functor required for the upsert.
+ *
+ * @param destination_state The aggregation state in the aggregation hash
+ * table that is being upserted.
+ **/
+ void operator()(uint8_t *destination_state) {
+ handle_.mergeStatesFast(source_state_, destination_state);
+ }
+
+ private:
+ const HandleT &handle_;
+ const uint8_t *source_state_;
+
+ DISALLOW_COPY_AND_ASSIGN(HashTableStateUpserterFast);
+};
+
/**
* @brief A class to support the functor for merging group by hash tables.
**/
@@ -129,6 +161,53 @@ class HashTableMerger {
DISALLOW_COPY_AND_ASSIGN(HashTableMerger);
};
+template <typename HandleT, typename HashTableT>
+class HashTableMergerFast {
+ public:
+ /**
+ * @brief Constructor
+ *
+ * @param handle The Aggregation handle being used.
+ * @param destination_hash_table The destination hash table to which other
+ * hash tables will be merged.
+ **/
+ HashTableMergerFast(const HandleT &handle,
+ AggregationStateHashTableBase *destination_hash_table)
+ : handle_(handle),
+ destination_hash_table_(
+ static_cast<HashTableT *>(destination_hash_table)) {}
+
+ /**
+ * @brief The operator for the functor.
+ *
+ * @param group_by_key The group by key being merged.
+ * @param source_state The aggregation state for the given key in the source
+ * aggregation hash table.
+ **/
+ inline void operator()(const std::vector<TypedValue> &group_by_key,
+ const uint8_t *source_state) {
+ const uint8_t *original_state =
+ destination_hash_table_->getSingleCompositeKey(group_by_key);
+ if (original_state != nullptr) {
+ HashTableStateUpserterFast<HandleT> upserter(
+ handle_, source_state);
+ // The CHECK is required as upsertCompositeKey can return false if the
+ // hash table runs out of space during the upsert process. The ideal
+ // solution will be to retry again if the upsert fails.
+ CHECK(destination_hash_table_->upsertCompositeKeyFast(
+ group_by_key, original_state, &upserter));
+ } else {
+ destination_hash_table_->putCompositeKeyFast(group_by_key, source_state);
+ }
+ }
+
+ private:
+ const HandleT &handle_;
+ HashTableT *destination_hash_table_;
+
+ DISALLOW_COPY_AND_ASSIGN(HashTableMergerFast);
+};
+
/**
* @brief The helper intermediate subclass of AggregationHandle that provides
* virtual method implementations as well as helper methods that are
@@ -208,11 +287,26 @@ class AggregationConcreteHandle : public AggregationHandle {
template <typename HandleT,
typename HashTableT>
+ void aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast(
+ const AggregationStateHashTableBase &distinctify_hash_table,
+ AggregationStateHashTableBase *hash_table) const;
+
+
+ template <typename HandleT,
+ typename HashTableT>
ColumnVector* finalizeHashTableHelper(
const Type &result_type,
const AggregationStateHashTableBase &hash_table,
std::vector<std::vector<TypedValue>> *group_by_keys) const;
+ template <typename HandleT,
+ typename HashTableT>
+ ColumnVector* finalizeHashTableHelperFast(
+ const Type &result_type,
+ const AggregationStateHashTableBase &hash_table,
+ std::vector<std::vector<TypedValue>> *group_by_keys,
+ int index) const;
+
template <typename HandleT, typename HashTableT>
inline TypedValue finalizeGroupInHashTable(
const AggregationStateHashTableBase &hash_table,
@@ -224,11 +318,29 @@ class AggregationConcreteHandle : public AggregationHandle {
return static_cast<const HandleT*>(this)->finalizeHashTableEntry(*group_state);
}
+ template <typename HandleT, typename HashTableT>
+ inline TypedValue finalizeGroupInHashTableFast(
+ const AggregationStateHashTableBase &hash_table,
+ const std::vector<TypedValue> &group_key,
+ int index) const {
+ const std::uint8_t *group_state
+ = static_cast<const HashTableT&>(hash_table).getSingleCompositeKey(group_key, index);
+ DCHECK(group_state != nullptr)
+ << "Could not find entry for specified group_key in HashTable";
+ return static_cast<const HandleT*>(this)->finalizeHashTableEntryFast(group_state);
+ }
+
template <typename HandleT, typename StateT, typename HashTableT>
void mergeGroupByHashTablesHelper(
const AggregationStateHashTableBase &source_hash_table,
AggregationStateHashTableBase *destination_hash_table) const;
+ template <typename HandleT, typename HashTableT>
+ void mergeGroupByHashTablesHelperFast(
+ const AggregationStateHashTableBase &source_hash_table,
+ AggregationStateHashTableBase *destination_hash_table) const;
+
+
private:
DISALLOW_COPY_AND_ASSIGN(AggregationConcreteHandle);
};
@@ -302,6 +414,12 @@ class HashTableAggregateFinalizer {
output_column_vector_->appendTypedValue(handle_.finalizeHashTableEntry(group_state));
}
+ inline void operator()(const std::vector<TypedValue> &group_by_key,
+ const unsigned char *byte_ptr) {
+ group_by_keys_->emplace_back(group_by_key);
+ output_column_vector_->appendTypedValue(handle_.finalizeHashTableEntryFast(byte_ptr));
+ }
+
private:
const HandleT &handle_;
std::vector<std::vector<TypedValue>> *group_by_keys_;
@@ -414,6 +532,42 @@ void AggregationConcreteHandle::aggregateOnDistinctifyHashTableForGroupByUnaryHe
template <typename HandleT,
typename HashTableT>
+void AggregationConcreteHandle::aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast(
+ const AggregationStateHashTableBase &distinctify_hash_table,
+ AggregationStateHashTableBase *aggregation_hash_table) const {
+ const HandleT& handle = static_cast<const HandleT&>(*this);
+ HashTableT *target_hash_table = static_cast<HashTableT*>(aggregation_hash_table);
+
+ // A lambda function which will be called on each key-value pair from the
+ // distinctify hash table.
+ const auto aggregate_functor = [&handle, &target_hash_table](
+ std::vector<TypedValue> &key,
+ const bool &dumb_placeholder) {
+ // For each (composite) key vector in the distinctify hash table with size N.
+ // The first N-1 entries are GROUP BY columns and the last entry is the argument
+ // to be aggregated on.
+ const TypedValue argument(std::move(key.back()));
+ key.pop_back();
+
+ // An upserter as lambda function for aggregating the argument into its
+ // GROUP BY group's entry inside aggregation_hash_table.
+ const auto upserter = [&handle, &argument](std::uint8_t *state) {
+ handle.iterateUnaryInlFast(argument, state+sizeof(SpinMutex));
+ };
+
+ target_hash_table->upsertCompositeKeyFast(key, nullptr, &upserter);
+ };
+
+ const HashTableT &source_hash_table =
+ static_cast<const HashTableT&>(distinctify_hash_table);
+ // Invoke the lambda function "aggregate_functor" on each composite key vector
+ // from the distinctify hash table.
+ source_hash_table.forEachCompositeKeyFast(&aggregate_functor);
+}
+
+
+template <typename HandleT,
+ typename HashTableT>
ColumnVector* AggregationConcreteHandle::finalizeHashTableHelper(
const Type &result_type,
const AggregationStateHashTableBase &hash_table,
@@ -463,6 +617,59 @@ ColumnVector* AggregationConcreteHandle::finalizeHashTableHelper(
}
template <typename HandleT,
+ typename HashTableT>
+ColumnVector* AggregationConcreteHandle::finalizeHashTableHelperFast(
+ const Type &result_type,
+ const AggregationStateHashTableBase &hash_table,
+ std::vector<std::vector<TypedValue>> *group_by_keys,
+ int index) const {
+ const HandleT &handle = static_cast<const HandleT&>(*this);
+ const HashTableT &hash_table_concrete = static_cast<const HashTableT&>(hash_table);
+
+ if (group_by_keys->empty()) {
+ if (NativeColumnVector::UsableForType(result_type)) {
+ NativeColumnVector *result = new NativeColumnVector(result_type,
+ hash_table_concrete.numEntries());
+ HashTableAggregateFinalizer<HandleT, NativeColumnVector> finalizer(
+ handle,
+ group_by_keys,
+ result);
+ hash_table_concrete.forEachCompositeKeyFast(&finalizer, index);
+ return result;
+ } else {
+ IndirectColumnVector *result = new IndirectColumnVector(result_type,
+ hash_table_concrete.numEntries());
+ HashTableAggregateFinalizer<HandleT, IndirectColumnVector> finalizer(
+ handle,
+ group_by_keys,
+ result);
+ hash_table_concrete.forEachCompositeKeyFast(&finalizer, index);
+ return result;
+ }
+ } else {
+ if (NativeColumnVector::UsableForType(result_type)) {
+ NativeColumnVector *result = new NativeColumnVector(result_type,
+ group_by_keys->size());
+ for (const std::vector<TypedValue> &group_by_key : *group_by_keys) {
+ result->appendTypedValue(finalizeGroupInHashTableFast<HandleT, HashTableT>(hash_table,
+ group_by_key,
+ index));
+ }
+ return result;
+ } else {
+ IndirectColumnVector *result = new IndirectColumnVector(result_type,
+ hash_table_concrete.numEntries());
+ for (const std::vector<TypedValue> &group_by_key : *group_by_keys) {
+ result->appendTypedValue(finalizeGroupInHashTableFast<HandleT, HashTableT>(hash_table,
+ group_by_key,
+ index));
+ }
+ return result;
+ }
+ }
+}
+
+template <typename HandleT,
typename StateT,
typename HashTableT>
void AggregationConcreteHandle::mergeGroupByHashTablesHelper(
@@ -478,6 +685,22 @@ void AggregationConcreteHandle::mergeGroupByHashTablesHelper(
source_hash_table_concrete.forEachCompositeKey(&merger);
}
+template <typename HandleT,
+ typename HashTableT>
+void AggregationConcreteHandle::mergeGroupByHashTablesHelperFast(
+ const AggregationStateHashTableBase &source_hash_table,
+ AggregationStateHashTableBase *destination_hash_table) const {
+ const HandleT &handle = static_cast<const HandleT &>(*this);
+ const HashTableT &source_hash_table_concrete =
+ static_cast<const HashTableT &>(source_hash_table);
+
+ HashTableMergerFast<HandleT, HashTableT> merger(handle,
+ destination_hash_table);
+
+ source_hash_table_concrete.forEachCompositeKeyFast(&merger);
+}
+
+
} // namespace quickstep
#endif // QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_CONCRETE_HANDLE_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/expressions/aggregation/AggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandle.hpp b/expressions/aggregation/AggregationHandle.hpp
index cdebb03..d9c3897 100644
--- a/expressions/aggregation/AggregationHandle.hpp
+++ b/expressions/aggregation/AggregationHandle.hpp
@@ -265,7 +265,7 @@ class AggregationHandle {
**/
virtual ColumnVector* finalizeHashTable(
const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys) const = 0;
+ std::vector<std::vector<TypedValue>> *group_by_keys, int index) const = 0;
/**
* @brief Create a new HashTable for the distinctify step for DISTINCT aggregation.
@@ -362,6 +362,12 @@ class AggregationHandle {
const AggregationStateHashTableBase &source_hash_table,
AggregationStateHashTableBase *destination_hash_table) const = 0;
+ virtual size_t getPayloadSize() const {return 8;}
+ virtual void setPayloadOffset(std::size_t) {}
+ virtual void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t *byte_ptr) {}
+ virtual void mergeStatesFast(const uint8_t *src, uint8_t *dst) const {}
+ virtual void initPayload(uint8_t *byte_ptr) {}
+
protected:
AggregationHandle() {
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/expressions/aggregation/AggregationHandleAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.cpp b/expressions/aggregation/AggregationHandleAvg.cpp
index 42a2fb9..d0bd3b8 100644
--- a/expressions/aggregation/AggregationHandleAvg.cpp
+++ b/expressions/aggregation/AggregationHandleAvg.cpp
@@ -137,8 +137,7 @@ void AggregationHandleAvg::aggregateValueAccessorIntoHashTable(
AggregationStateHashTableBase *hash_table) const {
DCHECK_EQ(1u, argument_ids.size())
<< "Got wrong number of arguments for AVG: " << argument_ids.size();
-
- aggregateValueAccessorIntoHashTableUnaryHelper<
+/* aggregateValueAccessorIntoHashTableUnaryHelper<
AggregationHandleAvg,
AggregationStateAvg,
AggregationStateHashTable<AggregationStateAvg>>(
@@ -146,7 +145,14 @@ void AggregationHandleAvg::aggregateValueAccessorIntoHashTable(
argument_ids.front(),
group_by_key_ids,
blank_state_,
- hash_table);
+ hash_table); */
+
+/* static_cast<AggregationStateFastHashTable *>(hash_table)->upsertValueAccessorCompositeKeyFast(
+ argument_ids.front(),
+ accessor,
+ group_by_key_ids,
+ true,
+ const_cast<AggregationHandleAvg *>(this));*/
}
void AggregationHandleAvg::mergeStates(
@@ -161,6 +167,19 @@ void AggregationHandleAvg::mergeStates(
avg_source.sum_);
}
+void AggregationHandleAvg::mergeStatesFast(
+ const uint8_t *source,
+ uint8_t *destination) const {
+ const TypedValue *src_sum_ptr = reinterpret_cast<const TypedValue *>(source + blank_state_.sum_offset);
+ const std::int64_t *src_count_ptr = reinterpret_cast<const std::int64_t *>(source + blank_state_.count_offset);
+ TypedValue *dst_sum_ptr = reinterpret_cast<TypedValue *>(destination+blank_state_.sum_offset);
+ std::int64_t *dst_count_ptr = reinterpret_cast<std::int64_t *>(destination + blank_state_.count_offset);
+ (*dst_count_ptr) += (*src_count_ptr);
+ *dst_sum_ptr = merge_add_operator_->applyToTypedValues(*dst_sum_ptr, *src_sum_ptr);
+}
+
+
+
TypedValue AggregationHandleAvg::finalize(const AggregationState &state) const {
const AggregationStateAvg &agg_state = static_cast<const AggregationStateAvg&>(state);
if (agg_state.count_ == 0) {
@@ -175,12 +194,14 @@ TypedValue AggregationHandleAvg::finalize(const AggregationState &state) const {
ColumnVector* AggregationHandleAvg::finalizeHashTable(
const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys) const {
- return finalizeHashTableHelper<AggregationHandleAvg,
- AggregationStateHashTable<AggregationStateAvg>>(
+ std::vector<std::vector<TypedValue>> *group_by_keys,
+ int index) const {
+ return finalizeHashTableHelperFast<AggregationHandleAvg,
+ AggregationStateFastHashTable>(
*result_type_,
hash_table,
- group_by_keys);
+ group_by_keys,
+ index);
}
AggregationState* AggregationHandleAvg::aggregateOnDistinctifyHashTableForSingle(
@@ -206,9 +227,8 @@ void AggregationHandleAvg::aggregateOnDistinctifyHashTableForGroupBy(
void AggregationHandleAvg::mergeGroupByHashTables(
const AggregationStateHashTableBase &source_hash_table,
AggregationStateHashTableBase *destination_hash_table) const {
- mergeGroupByHashTablesHelper<AggregationHandleAvg,
- AggregationStateAvg,
- AggregationStateHashTable<AggregationStateAvg>>(
+ mergeGroupByHashTablesHelperFast<AggregationHandleAvg,
+ AggregationStateFastHashTable>(
source_hash_table, destination_hash_table);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/expressions/aggregation/AggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.hpp b/expressions/aggregation/AggregationHandleAvg.hpp
index 4ad4b21..fe178f6 100644
--- a/expressions/aggregation/AggregationHandleAvg.hpp
+++ b/expressions/aggregation/AggregationHandleAvg.hpp
@@ -29,6 +29,7 @@
#include "expressions/aggregation/AggregationConcreteHandle.hpp"
#include "expressions/aggregation/AggregationHandle.hpp"
#include "storage/HashTableBase.hpp"
+#include "storage/FastHashTable.hpp"
#include "threading/SpinMutex.hpp"
#include "types/Type.hpp"
#include "types/TypedValue.hpp"
@@ -57,7 +58,10 @@ class AggregationStateAvg : public AggregationState {
*/
AggregationStateAvg(const AggregationStateAvg &orig)
: sum_(orig.sum_),
- count_(orig.count_) {
+ count_(orig.count_),
+ sum_offset(orig.sum_offset),
+ count_offset(orig.count_offset),
+ mutex_offset(orig.mutex_offset) {
}
/**
@@ -65,11 +69,19 @@ class AggregationStateAvg : public AggregationState {
*/
~AggregationStateAvg() override {}
+ size_t getPayloadSize() const {
+ size_t p1 = reinterpret_cast<size_t>(&sum_);
+ size_t p2 = reinterpret_cast<size_t>(&mutex_);
+ return (p2-p1);
+ }
+
private:
friend class AggregationHandleAvg;
AggregationStateAvg()
- : sum_(0), count_(0) {
+ : sum_(0), count_(0), sum_offset(0),
+ count_offset(reinterpret_cast<uint8_t *>(&count_)-reinterpret_cast<uint8_t *>(&sum_)),
+ mutex_offset(reinterpret_cast<uint8_t *>(&mutex_)-reinterpret_cast<uint8_t *>(&sum_)) {
}
// TODO(shoban): We might want to specialize sum_ and count_ to use atomics
@@ -77,6 +89,8 @@ class AggregationStateAvg : public AggregationState {
TypedValue sum_;
std::int64_t count_;
SpinMutex mutex_;
+
+ int sum_offset, count_offset, mutex_offset;
};
/**
@@ -109,6 +123,26 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
++state->count_;
}
+ inline void iterateUnaryInlFast(const TypedValue &value, uint8_t *byte_ptr) {
+ DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
+ if (value.isNull()) return;
+ TypedValue *sum_ptr = reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset);
+ std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr + blank_state_.count_offset);
+ *sum_ptr = fast_add_operator_->applyToTypedValues(*sum_ptr, value);
+ ++(*count_ptr);
+ }
+
+ inline void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t *byte_ptr) override {
+ iterateUnaryInlFast(arguments.front(), byte_ptr);
+ }
+
+ void initPayload(uint8_t *byte_ptr) override {
+ TypedValue *sum_ptr = reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset);
+ std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr + blank_state_.count_offset);
+ *sum_ptr = blank_state_.sum_;
+ *count_ptr = blank_state_.count_;
+ }
+
AggregationState* accumulateColumnVectors(
const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const override;
@@ -127,6 +161,9 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
void mergeStates(const AggregationState &source,
AggregationState *destination) const override;
+ void mergeStatesFast(const uint8_t *source,
+ uint8_t *destination) const override;
+
TypedValue finalize(const AggregationState &state) const override;
inline TypedValue finalizeHashTableEntry(const AggregationState &state) const {
@@ -139,9 +176,24 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
TypedValue(static_cast<double>(agg_state.count_)));
}
+ inline TypedValue finalizeHashTableEntryFast(const uint8_t *byte_ptr) const {
+// const AggregationStateAvg &agg_state = static_cast<const AggregationStateAvg&>(state);
+ // TODO(chasseur): Could improve performance further if we made a special
+ // version of finalizeHashTable() that collects all the sums into one
+ // ColumnVector and all the counts into another and then applies
+ // '*divide_operator_' to them in bulk.
+
+ uint8_t *value_ptr = const_cast<uint8_t*>(byte_ptr);
+ TypedValue *sum_ptr = reinterpret_cast<TypedValue *>(value_ptr + blank_state_.sum_offset);
+ std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(value_ptr + blank_state_.count_offset);
+ return divide_operator_->applyToTypedValues(*sum_ptr,
+ TypedValue(static_cast<double>(*count_ptr)));
+ }
+
ColumnVector* finalizeHashTable(
const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys) const override;
+ std::vector<std::vector<TypedValue>> *group_by_keys,
+ int index) const override;
/**
* @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
@@ -162,6 +214,10 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
const AggregationStateHashTableBase &source_hash_table,
AggregationStateHashTableBase *destination_hash_table) const override;
+ size_t getPayloadSize() const override {
+ return blank_state_.getPayloadSize();
+ }
+
private:
friend class AggregateFunctionAvg;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/expressions/aggregation/AggregationHandleCount.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.cpp b/expressions/aggregation/AggregationHandleCount.cpp
index 964b7c2..dc11dac 100644
--- a/expressions/aggregation/AggregationHandleCount.cpp
+++ b/expressions/aggregation/AggregationHandleCount.cpp
@@ -135,18 +135,18 @@ template <bool count_star, bool nullable_type>
if (count_star) {
DCHECK_EQ(0u, argument_ids.size())
<< "Got wrong number of arguments for COUNT(*): " << argument_ids.size();
- aggregateValueAccessorIntoHashTableNullaryHelper<
+/* aggregateValueAccessorIntoHashTableNullaryHelper<
AggregationHandleCount<count_star, nullable_type>,
AggregationStateCount,
AggregationStateHashTable<AggregationStateCount>>(
accessor,
group_by_key_ids,
AggregationStateCount(),
- hash_table);
+ hash_table);*/
} else {
DCHECK_EQ(1u, argument_ids.size())
<< "Got wrong number of arguments for COUNT: " << argument_ids.size();
- aggregateValueAccessorIntoHashTableUnaryHelper<
+/* aggregateValueAccessorIntoHashTableUnaryHelper<
AggregationHandleCount<count_star, nullable_type>,
AggregationStateCount,
AggregationStateHashTable<AggregationStateCount>>(
@@ -154,7 +154,7 @@ template <bool count_star, bool nullable_type>
argument_ids.front(),
group_by_key_ids,
AggregationStateCount(),
- hash_table);
+ hash_table); */
}
}
@@ -170,14 +170,25 @@ template <bool count_star, bool nullable_type>
}
template <bool count_star, bool nullable_type>
+void AggregationHandleCount<count_star, nullable_type>::mergeStatesFast(
+ const uint8_t *source,
+ uint8_t *destination) const {
+ const std::int64_t *src_count_ptr = reinterpret_cast<const std::int64_t *>(source);
+ std::int64_t *dst_count_ptr = reinterpret_cast<std::int64_t *>(destination);
+ (*dst_count_ptr) += (*src_count_ptr);
+}
+
+template <bool count_star, bool nullable_type>
ColumnVector* AggregationHandleCount<count_star, nullable_type>::finalizeHashTable(
const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys) const {
- return finalizeHashTableHelper<AggregationHandleCount<count_star, nullable_type>,
- AggregationStateHashTable<AggregationStateCount>>(
+ std::vector<std::vector<TypedValue>> *group_by_keys,
+ int index) const {
+ return finalizeHashTableHelperFast<AggregationHandleCount<count_star, nullable_type>,
+ AggregationStateFastHashTable>(
TypeFactory::GetType(kLong),
hash_table,
- group_by_keys);
+ group_by_keys,
+ index);
}
template <bool count_star, bool nullable_type>
@@ -197,12 +208,10 @@ void AggregationHandleCount<count_star, nullable_type>
const AggregationStateHashTableBase &distinctify_hash_table,
AggregationStateHashTableBase *aggregation_hash_table) const {
DCHECK_EQ(count_star, false);
- aggregateOnDistinctifyHashTableForGroupByUnaryHelper<
+ aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
AggregationHandleCount<count_star, nullable_type>,
- AggregationStateCount,
- AggregationStateHashTable<AggregationStateCount>>(
+ AggregationStateFastHashTable>(
distinctify_hash_table,
- AggregationStateCount(),
aggregation_hash_table);
}
@@ -210,10 +219,9 @@ template <bool count_star, bool nullable_type>
void AggregationHandleCount<count_star, nullable_type>::mergeGroupByHashTables(
const AggregationStateHashTableBase &source_hash_table,
AggregationStateHashTableBase *destination_hash_table) const {
- mergeGroupByHashTablesHelper<
+ mergeGroupByHashTablesHelperFast<
AggregationHandleCount,
- AggregationStateCount,
- AggregationStateHashTable<AggregationStateCount>>(source_hash_table,
+ AggregationStateFastHashTable>(source_hash_table,
destination_hash_table);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/expressions/aggregation/AggregationHandleCount.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.hpp b/expressions/aggregation/AggregationHandleCount.hpp
index 50138b9..4046106 100644
--- a/expressions/aggregation/AggregationHandleCount.hpp
+++ b/expressions/aggregation/AggregationHandleCount.hpp
@@ -30,6 +30,7 @@
#include "expressions/aggregation/AggregationConcreteHandle.hpp"
#include "expressions/aggregation/AggregationHandle.hpp"
#include "storage/HashTableBase.hpp"
+#include "storage/FastHashTable.hpp"
#include "types/TypedValue.hpp"
#include "utility/Macros.hpp"
@@ -62,6 +63,10 @@ class AggregationStateCount : public AggregationState {
*/
~AggregationStateCount() override {}
+ size_t getPayloadSize() const {
+ return sizeof(count_);
+ }
+
private:
friend class AggregationHandleCount<false, false>;
friend class AggregationHandleCount<false, true>;
@@ -108,6 +113,11 @@ class AggregationHandleCount : public AggregationConcreteHandle {
state->count_.fetch_add(1, std::memory_order_relaxed);
}
+ inline void iterateNullaryInlFast(uint8_t *byte_ptr) {
+ std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr);
+ (*count_ptr)++;
+ }
+
/**
* @brief Iterate with count aggregation state.
*/
@@ -117,6 +127,25 @@ class AggregationHandleCount : public AggregationConcreteHandle {
}
}
+ inline void iterateUnaryInlFast(const TypedValue &value, uint8_t *byte_ptr) const {
+ if ((!nullable_type) || (!value.isNull())) {
+ std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr);
+ (*count_ptr)++;
+ }
+ }
+
+ inline void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t *byte_ptr) override {
+ if (arguments.size())
+ iterateUnaryInlFast(arguments.front(), byte_ptr);
+ else
+ iterateNullaryInlFast(byte_ptr);
+ }
+
+ void initPayload(uint8_t *byte_ptr) override {
+ std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr);
+ *count_ptr = 0;
+ }
+
AggregationState* accumulateNullary(const std::size_t num_tuples) const override {
return new AggregationStateCount(num_tuples);
}
@@ -139,6 +168,9 @@ class AggregationHandleCount : public AggregationConcreteHandle {
void mergeStates(const AggregationState &source,
AggregationState *destination) const override;
+ void mergeStatesFast(const uint8_t *source,
+ uint8_t *destination) const override;
+
TypedValue finalize(const AggregationState &state) const override {
return TypedValue(static_cast<const AggregationStateCount&>(state).count_.load(std::memory_order_relaxed));
}
@@ -147,9 +179,21 @@ class AggregationHandleCount : public AggregationConcreteHandle {
return TypedValue(static_cast<const AggregationStateCount&>(state).count_.load(std::memory_order_relaxed));
}
+  /**
+   * @brief Reads the final COUNT value for one group out of its inline
+   *        hash-table payload.
+   *
+   * @param byte_ptr Start of this aggregate's payload (a single std::int64_t,
+   *        see getPayloadSize()).
+   * @return The count wrapped in a TypedValue.
+   **/
+  inline TypedValue finalizeHashTableEntryFast(const uint8_t *byte_ptr) const {
+    const std::int64_t *count_ptr = reinterpret_cast<const std::int64_t *>(byte_ptr);
+    return TypedValue(*count_ptr);
+  }
+
ColumnVector* finalizeHashTable(
const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys) const override;
+ std::vector<std::vector<TypedValue>> *group_by_keys,
+ int index) const override;
/**
* @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
@@ -170,6 +214,10 @@ class AggregationHandleCount : public AggregationConcreteHandle {
const AggregationStateHashTableBase &source_hash_table,
AggregationStateHashTableBase *destination_hash_table) const override;
+  /**
+   * @brief Inline payload width for COUNT: one std::int64_t counter.
+   **/
+  size_t getPayloadSize() const override { return sizeof(std::int64_t); }
+
private:
friend class AggregateFunctionCount;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/expressions/aggregation/AggregationHandleDistinct.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleDistinct.cpp b/expressions/aggregation/AggregationHandleDistinct.cpp
index fe8ffcf..0088239 100644
--- a/expressions/aggregation/AggregationHandleDistinct.cpp
+++ b/expressions/aggregation/AggregationHandleDistinct.cpp
@@ -63,7 +63,7 @@ void AggregationHandleDistinct::aggregateValueAccessorIntoHashTable(
ColumnVector* AggregationHandleDistinct::finalizeHashTable(
const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys) const {
+ std::vector<std::vector<TypedValue>> *group_by_keys, int index) const {
DCHECK(group_by_keys->empty());
const auto keys_retriever = [&group_by_keys](std::vector<TypedValue> &group_by_key,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/expressions/aggregation/AggregationHandleDistinct.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleDistinct.hpp b/expressions/aggregation/AggregationHandleDistinct.hpp
index 6342c2b..1e36845 100644
--- a/expressions/aggregation/AggregationHandleDistinct.hpp
+++ b/expressions/aggregation/AggregationHandleDistinct.hpp
@@ -107,7 +107,7 @@ class AggregationHandleDistinct : public AggregationConcreteHandle {
ColumnVector* finalizeHashTable(
const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys) const override;
+ std::vector<std::vector<TypedValue>> *group_by_keys, int index) const override;
void mergeGroupByHashTables(
const AggregationStateHashTableBase &source_hash_table,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/expressions/aggregation/AggregationHandleMax.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.cpp b/expressions/aggregation/AggregationHandleMax.cpp
index a7a4a52..f0eebde 100644
--- a/expressions/aggregation/AggregationHandleMax.cpp
+++ b/expressions/aggregation/AggregationHandleMax.cpp
@@ -87,7 +87,7 @@ void AggregationHandleMax::aggregateValueAccessorIntoHashTable(
DCHECK_EQ(1u, argument_ids.size())
<< "Got wrong number of arguments for MAX: " << argument_ids.size();
- aggregateValueAccessorIntoHashTableUnaryHelper<
+/* aggregateValueAccessorIntoHashTableUnaryHelper<
AggregationHandleMax,
AggregationStateMax,
AggregationStateHashTable<AggregationStateMax>>(
@@ -95,7 +95,7 @@ void AggregationHandleMax::aggregateValueAccessorIntoHashTable(
argument_ids.front(),
group_by_key_ids,
AggregationStateMax(type_),
- hash_table);
+ hash_table);*/
}
void AggregationHandleMax::mergeStates(
@@ -109,14 +109,26 @@ void AggregationHandleMax::mergeStates(
}
}
+void AggregationHandleMax::mergeStatesFast(
+    const std::uint8_t *source,
+    std::uint8_t *destination) const {
+  // Each payload is a single TypedValue holding a running maximum. A NULL
+  // source maximum carries no information, so it is skipped.
+  const TypedValue &source_max = *reinterpret_cast<const TypedValue *>(source);
+  if (source_max.isNull()) {
+    return;
+  }
+  compareAndUpdateFast(reinterpret_cast<TypedValue *>(destination), source_max);
+}
+
ColumnVector* AggregationHandleMax::finalizeHashTable(
const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys) const {
- return finalizeHashTableHelper<AggregationHandleMax,
- AggregationStateHashTable<AggregationStateMax>>(
+ std::vector<std::vector<TypedValue>> *group_by_keys,
+ int index) const {
+ return finalizeHashTableHelperFast<AggregationHandleMax,
+ AggregationStateFastHashTable>(
type_.getNullableVersion(),
hash_table,
- group_by_keys);
+ group_by_keys,
+ index);
}
AggregationState* AggregationHandleMax::aggregateOnDistinctifyHashTableForSingle(
@@ -142,9 +154,8 @@ void AggregationHandleMax::aggregateOnDistinctifyHashTableForGroupBy(
void AggregationHandleMax::mergeGroupByHashTables(
const AggregationStateHashTableBase &source_hash_table,
AggregationStateHashTableBase *destination_hash_table) const {
- mergeGroupByHashTablesHelper<AggregationHandleMax,
- AggregationStateMax,
- AggregationStateHashTable<AggregationStateMax>>(
+ mergeGroupByHashTablesHelperFast<AggregationHandleMax,
+ AggregationStateFastHashTable>(
source_hash_table, destination_hash_table);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/expressions/aggregation/AggregationHandleMax.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.hpp b/expressions/aggregation/AggregationHandleMax.hpp
index 5af5a12..ce7c702 100644
--- a/expressions/aggregation/AggregationHandleMax.hpp
+++ b/expressions/aggregation/AggregationHandleMax.hpp
@@ -29,6 +29,7 @@
#include "expressions/aggregation/AggregationConcreteHandle.hpp"
#include "expressions/aggregation/AggregationHandle.hpp"
#include "storage/HashTableBase.hpp"
+#include "storage/FastHashTable.hpp"
#include "threading/SpinMutex.hpp"
#include "types/Type.hpp"
#include "types/TypedValue.hpp"
@@ -105,6 +106,22 @@ class AggregationHandleMax : public AggregationConcreteHandle {
compareAndUpdate(static_cast<AggregationStateMax*>(state), value);
}
+  /**
+   * @brief Fast-path iteration: folds value into the running maximum stored
+   *        inline at byte_ptr (compareAndUpdateFast() ignores NULL values).
+   **/
+  inline void iterateUnaryInlFast(const TypedValue &value, std::uint8_t *byte_ptr) const {
+    DCHECK(value.isPlausibleInstanceOf(type_.getSignature()));
+    compareAndUpdateFast(reinterpret_cast<TypedValue *>(byte_ptr), value);
+  }
+
+  /**
+   * @brief Virtual fast-path entry point; MAX aggregates its single argument.
+   **/
+  inline void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t *byte_ptr) override {
+    const TypedValue &argument = arguments.front();
+    iterateUnaryInlFast(argument, byte_ptr);
+  }
+
+  /**
+   * @brief Seeds a fresh inline payload slot with NULL ("no maximum yet").
+   *
+   * NOTE(review): this assigns through a TypedValue* into raw payload bytes,
+   * which assumes those bytes already form a valid TypedValue (e.g. zeroed
+   * memory). Placement-new would not need that assumption — confirm how
+   * FastHashTable prepares payload memory.
+   **/
+  void initPayload(uint8_t *byte_ptr) override {
+    const TypedValue null_max = type_.getNullableVersion().makeNullValue();
+    *reinterpret_cast<TypedValue *>(byte_ptr) = null_max;
+  }
+
AggregationState* accumulateColumnVectors(
const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const override;
@@ -123,6 +140,9 @@ class AggregationHandleMax : public AggregationConcreteHandle {
void mergeStates(const AggregationState &source,
AggregationState *destination) const override;
+  /**
+   * @brief Fast-path counterpart of mergeStates() that operates on raw inline
+   *        payload bytes, folding the maximum at source into destination.
+   **/
+ void mergeStatesFast(const std::uint8_t *source,
+ std::uint8_t *destination) const override;
+
TypedValue finalize(const AggregationState &state) const override {
return TypedValue(static_cast<const AggregationStateMax&>(state).max_);
}
@@ -131,9 +151,15 @@ class AggregationHandleMax : public AggregationConcreteHandle {
return TypedValue(static_cast<const AggregationStateMax&>(state).max_);
}
+  /**
+   * @brief Returns a copy of the maximum stored inline at byte_ptr. The value
+   *        is NULL if the group never saw a non-NULL input.
+   **/
+  inline TypedValue finalizeHashTableEntryFast(const std::uint8_t *byte_ptr) const {
+    const TypedValue &stored_max = *reinterpret_cast<const TypedValue *>(byte_ptr);
+    return stored_max;
+  }
+
ColumnVector* finalizeHashTable(
const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys) const override;
+ std::vector<std::vector<TypedValue>> *group_by_keys,
+ int index) const override;
/**
* @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
@@ -155,6 +181,10 @@ class AggregationHandleMax : public AggregationConcreteHandle {
const AggregationStateHashTableBase &source_hash_table,
AggregationStateHashTableBase *destination_hash_table) const override;
+  /**
+   * @brief Inline payload width for MAX: one TypedValue.
+   **/
+  size_t getPayloadSize() const override { return sizeof(TypedValue); }
+
private:
friend class AggregateFunctionMax;
@@ -181,6 +211,13 @@ class AggregationHandleMax : public AggregationConcreteHandle {
}
}
+  // Replaces *max_ptr with value when value is non-NULL and either no maximum
+  // has been recorded yet or fast_comparator_ says value should win.
+  // NOTE(review): nothing here synchronizes; presumably the hash table
+  // serializes access to an entry's payload — confirm.
+  inline void compareAndUpdateFast(TypedValue *max_ptr, const TypedValue &value) const {
+    if (value.isNull()) {
+      return;
+    }
+    const bool has_max = !max_ptr->isNull();
+    if (has_max && !fast_comparator_->compareTypedValues(value, *max_ptr)) {
+      return;
+    }
+    *max_ptr = value;
+  }
+
const Type &type_;
std::unique_ptr<UncheckedComparator> fast_comparator_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/expressions/aggregation/AggregationHandleMin.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.cpp b/expressions/aggregation/AggregationHandleMin.cpp
index ca9b163..cbedd9b 100644
--- a/expressions/aggregation/AggregationHandleMin.cpp
+++ b/expressions/aggregation/AggregationHandleMin.cpp
@@ -89,7 +89,7 @@ void AggregationHandleMin::aggregateValueAccessorIntoHashTable(
DCHECK_EQ(1u, argument_ids.size())
<< "Got wrong number of arguments for MIN: " << argument_ids.size();
- aggregateValueAccessorIntoHashTableUnaryHelper<
+/* aggregateValueAccessorIntoHashTableUnaryHelper<
AggregationHandleMin,
AggregationStateMin,
AggregationStateHashTable<AggregationStateMin>>(
@@ -97,7 +97,7 @@ void AggregationHandleMin::aggregateValueAccessorIntoHashTable(
argument_ids.front(),
group_by_key_ids,
AggregationStateMin(type_),
- hash_table);
+ hash_table);*/
}
void AggregationHandleMin::mergeStates(
@@ -111,14 +111,27 @@ void AggregationHandleMin::mergeStates(
}
}
+void AggregationHandleMin::mergeStatesFast(
+    const std::uint8_t *source,
+    std::uint8_t *destination) const {
+  // Each payload is a single TypedValue holding a running minimum; only a
+  // non-NULL incoming minimum can change the destination.
+  const TypedValue &incoming_min = *reinterpret_cast<const TypedValue *>(source);
+  if (!incoming_min.isNull()) {
+    TypedValue *current_min = reinterpret_cast<TypedValue *>(destination);
+    compareAndUpdateFast(current_min, incoming_min);
+  }
+}
+
ColumnVector* AggregationHandleMin::finalizeHashTable(
const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys) const {
- return finalizeHashTableHelper<AggregationHandleMin,
- AggregationStateHashTable<AggregationStateMin>>(
+ std::vector<std::vector<TypedValue>> *group_by_keys,
+ int index) const {
+  // MIN can finalize to NULL: initPayload() seeds every group with NULL and
+  // compareAndUpdateFast() ignores NULL inputs, so a group whose arguments were
+  // all NULL stays NULL. The result column therefore has to be nullable,
+  // matching AggregationHandleMax::finalizeHashTable().
+ return finalizeHashTableHelperFast<AggregationHandleMin,
+ AggregationStateFastHashTable>(
-      type_.getNonNullableVersion(),
+      type_.getNullableVersion(),
hash_table,
- group_by_keys);
+ group_by_keys,
+ index);
}
AggregationState* AggregationHandleMin::aggregateOnDistinctifyHashTableForSingle(
@@ -144,9 +157,8 @@ void AggregationHandleMin::aggregateOnDistinctifyHashTableForGroupBy(
void AggregationHandleMin::mergeGroupByHashTables(
const AggregationStateHashTableBase &source_hash_table,
AggregationStateHashTableBase *destination_hash_table) const {
- mergeGroupByHashTablesHelper<AggregationHandleMin,
- AggregationStateMin,
- AggregationStateHashTable<AggregationStateMin>>(
+ mergeGroupByHashTablesHelperFast<AggregationHandleMin,
+ AggregationStateFastHashTable>(
source_hash_table, destination_hash_table);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/expressions/aggregation/AggregationHandleMin.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.hpp b/expressions/aggregation/AggregationHandleMin.hpp
index f68bb9d..7c7869e 100644
--- a/expressions/aggregation/AggregationHandleMin.hpp
+++ b/expressions/aggregation/AggregationHandleMin.hpp
@@ -29,6 +29,7 @@
#include "expressions/aggregation/AggregationConcreteHandle.hpp"
#include "expressions/aggregation/AggregationHandle.hpp"
#include "storage/HashTableBase.hpp"
+#include "storage/FastHashTable.hpp"
#include "threading/SpinMutex.hpp"
#include "types/Type.hpp"
#include "types/TypedValue.hpp"
@@ -64,6 +65,11 @@ class AggregationStateMin : public AggregationState {
*/
~AggregationStateMin() override {}
+  /**
+   * @brief Bytes needed to hold a MIN state inline: one TypedValue.
+   **/
+  size_t getPayloadSize() const { return sizeof(TypedValue); }
+
private:
friend class AggregationHandleMin;
@@ -104,6 +110,22 @@ class AggregationHandleMin : public AggregationConcreteHandle {
compareAndUpdate(state, value);
}
+  /**
+   * @brief Fast-path iteration: folds value into the running minimum stored
+   *        inline at byte_ptr (compareAndUpdateFast() ignores NULL values).
+   *
+   * Declared const to match AggregationHandleMax::iterateUnaryInlFast(); only
+   * the payload bytes are written, never the handle.
+   **/
+  inline void iterateUnaryInlFast(const TypedValue &value, uint8_t *byte_ptr) const {
+    DCHECK(value.isPlausibleInstanceOf(type_.getSignature()));
+    TypedValue *min_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
+    compareAndUpdateFast(min_ptr, value);
+  }
+
+  /**
+   * @brief Virtual fast-path entry point; MIN aggregates its single argument.
+   **/
+  inline void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t *byte_ptr) override {
+    const TypedValue &argument = arguments.front();
+    iterateUnaryInlFast(argument, byte_ptr);
+  }
+
+  /**
+   * @brief Seeds a fresh inline payload slot with NULL ("no minimum yet").
+   *
+   * NOTE(review): assigns through a TypedValue* into raw payload bytes, which
+   * assumes those bytes already form a valid TypedValue — confirm how
+   * FastHashTable prepares payload memory (placement-new would avoid this).
+   **/
+  void initPayload(uint8_t *byte_ptr) override {
+    const TypedValue null_min = type_.getNullableVersion().makeNullValue();
+    *reinterpret_cast<TypedValue *>(byte_ptr) = null_min;
+  }
+
AggregationState* accumulateColumnVectors(
const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const override;
@@ -122,6 +144,9 @@ class AggregationHandleMin : public AggregationConcreteHandle {
void mergeStates(const AggregationState &source,
AggregationState *destination) const override;
+  /**
+   * @brief Fast-path counterpart of mergeStates() that operates on raw inline
+   *        payload bytes, folding the minimum at source into destination.
+   **/
+ void mergeStatesFast(const uint8_t *source,
+ uint8_t *destination) const override;
+
TypedValue finalize(const AggregationState &state) const override {
return static_cast<const AggregationStateMin&>(state).min_;
}
@@ -130,9 +155,15 @@ class AggregationHandleMin : public AggregationConcreteHandle {
return static_cast<const AggregationStateMin&>(state).min_;
}
+  /**
+   * @brief Returns a copy of the minimum stored inline at byte_ptr. The value
+   *        is NULL if the group never saw a non-NULL input.
+   **/
+  inline TypedValue finalizeHashTableEntryFast(const std::uint8_t *byte_ptr) const {
+    const TypedValue &stored_min = *reinterpret_cast<const TypedValue *>(byte_ptr);
+    return stored_min;
+  }
+
ColumnVector* finalizeHashTable(
const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys) const override;
+ std::vector<std::vector<TypedValue>> *group_by_keys,
+ int index) const override;
/**
* @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
@@ -153,6 +184,10 @@ class AggregationHandleMin : public AggregationConcreteHandle {
const AggregationStateHashTableBase &source_hash_table,
AggregationStateHashTableBase *destination_hash_table) const override;
+  /**
+   * @brief Inline payload width for MIN: one TypedValue.
+   **/
+  size_t getPayloadSize() const override { return sizeof(TypedValue); }
+
private:
friend class AggregateFunctionMin;
@@ -178,6 +213,13 @@ class AggregationHandleMin : public AggregationConcreteHandle {
}
}
+  // Replaces *min_ptr with value when value is non-NULL and either no minimum
+  // has been recorded yet or fast_comparator_ says value should win.
+  // NOTE(review): nothing here synchronizes; presumably the hash table
+  // serializes access to an entry's payload — confirm.
+  inline void compareAndUpdateFast(TypedValue *min_ptr, const TypedValue &value) const {
+    if (value.isNull()) {
+      return;
+    }
+    const bool has_min = !min_ptr->isNull();
+    if (has_min && !fast_comparator_->compareTypedValues(value, *min_ptr)) {
+      return;
+    }
+    *min_ptr = value;
+  }
+
const Type &type_;
std::unique_ptr<UncheckedComparator> fast_comparator_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/expressions/aggregation/AggregationHandleSum.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.cpp b/expressions/aggregation/AggregationHandleSum.cpp
index 691ff39..ad486eb 100644
--- a/expressions/aggregation/AggregationHandleSum.cpp
+++ b/expressions/aggregation/AggregationHandleSum.cpp
@@ -93,7 +93,6 @@ AggregationState* AggregationHandleSum::accumulateColumnVectors(
const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
DCHECK_EQ(1u, column_vectors.size())
<< "Got wrong number of ColumnVectors for SUM: " << column_vectors.size();
-
std::size_t num_tuples = 0;
TypedValue cv_sum = fast_operator_->accumulateColumnVector(
blank_state_.sum_,
@@ -127,7 +126,7 @@ void AggregationHandleSum::aggregateValueAccessorIntoHashTable(
DCHECK_EQ(1u, argument_ids.size())
<< "Got wrong number of arguments for SUM: " << argument_ids.size();
- aggregateValueAccessorIntoHashTableUnaryHelper<
+/* aggregateValueAccessorIntoHashTableUnaryHelper<
AggregationHandleSum,
AggregationStateSum,
AggregationStateHashTable<AggregationStateSum>>(
@@ -135,7 +134,7 @@ void AggregationHandleSum::aggregateValueAccessorIntoHashTable(
argument_ids.front(),
group_by_key_ids,
blank_state_,
- hash_table);
+ hash_table);*/
}
void AggregationHandleSum::mergeStates(
@@ -150,6 +149,17 @@ void AggregationHandleSum::mergeStates(
sum_destination->null_ = sum_destination->null_ && sum_source.null_;
}
+void AggregationHandleSum::mergeStatesFast(
+    const uint8_t *source,
+    uint8_t *destination) const {
+  // The payload mirrors AggregationStateSum: a TypedValue running sum at
+  // sum_offset and a bool "no non-NULL input yet" flag at null_offset.
+  const TypedValue &source_sum =
+      *reinterpret_cast<const TypedValue *>(source + blank_state_.sum_offset);
+  const bool &source_is_null =
+      *reinterpret_cast<const bool *>(source + blank_state_.null_offset);
+  TypedValue &destination_sum =
+      *reinterpret_cast<TypedValue *>(destination + blank_state_.sum_offset);
+  bool &destination_is_null =
+      *reinterpret_cast<bool *>(destination + blank_state_.null_offset);
+
+  destination_sum = merge_operator_->applyToTypedValues(destination_sum, source_sum);
+  // The merged group stays NULL only if both sides were NULL.
+  destination_is_null = destination_is_null && source_is_null;
+}
+
TypedValue AggregationHandleSum::finalize(const AggregationState &state) const {
const AggregationStateSum &agg_state = static_cast<const AggregationStateSum&>(state);
if (agg_state.null_) {
@@ -162,12 +172,14 @@ TypedValue AggregationHandleSum::finalize(const AggregationState &state) const {
ColumnVector* AggregationHandleSum::finalizeHashTable(
const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys) const {
- return finalizeHashTableHelper<AggregationHandleSum,
- AggregationStateHashTable<AggregationStateSum>>(
+ std::vector<std::vector<TypedValue>> *group_by_keys,
+ int index) const {
+ return finalizeHashTableHelperFast<AggregationHandleSum,
+ AggregationStateFastHashTable>(
*result_type_,
hash_table,
- group_by_keys);
+ group_by_keys,
+ index);
}
AggregationState* AggregationHandleSum::aggregateOnDistinctifyHashTableForSingle(
@@ -193,9 +205,8 @@ void AggregationHandleSum::aggregateOnDistinctifyHashTableForGroupBy(
void AggregationHandleSum::mergeGroupByHashTables(
const AggregationStateHashTableBase &source_hash_table,
AggregationStateHashTableBase *destination_hash_table) const {
- mergeGroupByHashTablesHelper<AggregationHandleSum,
- AggregationStateSum,
- AggregationStateHashTable<AggregationStateSum>>(
+ mergeGroupByHashTablesHelperFast<AggregationHandleSum,
+ AggregationStateFastHashTable>(
source_hash_table, destination_hash_table);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/expressions/aggregation/AggregationHandleSum.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.hpp b/expressions/aggregation/AggregationHandleSum.hpp
index fdc0884..a8d2b5a 100644
--- a/expressions/aggregation/AggregationHandleSum.hpp
+++ b/expressions/aggregation/AggregationHandleSum.hpp
@@ -29,6 +29,7 @@
#include "expressions/aggregation/AggregationConcreteHandle.hpp"
#include "expressions/aggregation/AggregationHandle.hpp"
#include "storage/HashTableBase.hpp"
+#include "storage/FastHashTable.hpp"
#include "threading/SpinMutex.hpp"
#include "types/Type.hpp"
#include "types/TypedValue.hpp"
@@ -57,27 +58,39 @@ class AggregationStateSum : public AggregationState {
*/
AggregationStateSum(const AggregationStateSum &orig)
: sum_(orig.sum_),
- null_(orig.null_) {
+ null_(orig.null_),
+ sum_offset(orig.sum_offset),
+ null_offset(orig.null_offset) {
}
private:
friend class AggregationHandleSum;
AggregationStateSum()
- : sum_(0), null_(true) {
+ : sum_(0), null_(true), sum_offset(0),
+ null_offset(reinterpret_cast<uint8_t *>(&null_)-reinterpret_cast<uint8_t *>(&sum_)) {
}
-  AggregationStateSum(TypedValue &&sum, const bool is_null)
-      : sum_(std::move(sum)), null_(is_null) {
-  }
+  AggregationStateSum(TypedValue &&sum, const bool is_null)
+      : sum_(std::move(sum)),
+        null_(is_null),
+        // Every constructor sets the payload offsets, so that any copy of a
+        // state can be used to address fields inside an inline payload.
+        sum_offset(0),
+        null_offset(reinterpret_cast<uint8_t *>(&null_) - reinterpret_cast<uint8_t *>(&sum_)) {
+  }
+
+  /**
+   * @brief Bytes spanned by the inline payload: sum_, null_ and any padding
+   *        before mutex_, measured from this object's own layout.
+   *
+   * NOTE(review): the size is not rounded up to alignof(TypedValue). If the
+   * payloads of several handles are packed back to back, a TypedValue payload
+   * following this one may be misaligned — confirm against FastHashTable.
+   **/
+  size_t getPayloadSize() const {
+    return static_cast<size_t>(reinterpret_cast<const uint8_t *>(&mutex_) -
+                               reinterpret_cast<const uint8_t *>(&sum_));
+  }
+
// TODO(shoban): We might want to specialize sum_ to use atomics for int types
// similar to in AggregationStateCount.
TypedValue sum_;
bool null_;
SpinMutex mutex_;
+
+  // Byte offsets of sum_ and null_ relative to sum_; AggregationHandleSum uses
+  // them to address the same fields inside an inline hash-table payload.
+  int sum_offset;
+  int null_offset;
};
+
/**
* @brief An aggregationhandle for sum.
**/
@@ -105,6 +118,26 @@ class AggregationHandleSum : public AggregationConcreteHandle {
state->null_ = false;
}
+  /**
+   * @brief Fast-path iteration: adds a non-NULL value into the inline running
+   *        sum and clears the payload's null flag. NULL inputs leave the
+   *        payload untouched.
+   *
+   * Declared const like the other handles' unary fast paths; only the payload
+   * bytes are written, never the handle.
+   **/
+  inline void iterateUnaryInlFast(const TypedValue &value, uint8_t *byte_ptr) const {
+    DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
+    if (value.isNull()) return;
+    TypedValue *sum_ptr = reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset);
+    bool *null_ptr = reinterpret_cast<bool *>(byte_ptr + blank_state_.null_offset);
+    *sum_ptr = fast_operator_->applyToTypedValues(*sum_ptr, value);
+    *null_ptr = false;
+  }
+
+  /**
+   * @brief Virtual fast-path entry point; SUM aggregates its single argument.
+   **/
+  inline void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t *byte_ptr) override {
+    const TypedValue &argument = arguments.front();
+    iterateUnaryInlFast(argument, byte_ptr);
+  }
+
+  /**
+   * @brief Seeds a fresh inline payload slot from blank_state_: the blank sum
+   *        value plus a set null flag ("no non-NULL input yet").
+   *
+   * NOTE(review): assigns through a TypedValue* into raw payload bytes, which
+   * assumes those bytes already hold a valid TypedValue — confirm how
+   * FastHashTable prepares payload memory.
+   **/
+  void initPayload(uint8_t *byte_ptr) override {
+    TypedValue &sum_slot = *reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset);
+    bool &null_slot = *reinterpret_cast<bool *>(byte_ptr + blank_state_.null_offset);
+    sum_slot = blank_state_.sum_;
+    null_slot = true;
+  }
+
AggregationState* accumulateColumnVectors(
const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const override;
@@ -123,15 +156,24 @@ class AggregationHandleSum : public AggregationConcreteHandle {
void mergeStates(const AggregationState &source,
AggregationState *destination) const override;
+  /**
+   * @brief Fast-path counterpart of mergeStates() that operates on raw inline
+   *        payload bytes, merging the sum and null flag at source into
+   *        destination.
+   **/
+ void mergeStatesFast(const uint8_t *source,
+ uint8_t *destination) const override;
+
TypedValue finalize(const AggregationState &state) const override;
inline TypedValue finalizeHashTableEntry(const AggregationState &state) const {
return static_cast<const AggregationStateSum&>(state).sum_;
}
+  /**
+   * @brief Produces the final SUM for one group from its inline payload.
+   *
+   * Like finalize(), this checks the null flag. If no non-NULL value was ever
+   * added, the result is NULL rather than the zero the sum slot was seeded
+   * with. The payload is only read, so no const_cast is needed.
+   *
+   * @param byte_ptr Start of this aggregate's payload (layout as in
+   *        AggregationStateSum: sum at sum_offset, null flag at null_offset).
+   **/
+  inline TypedValue finalizeHashTableEntryFast(const uint8_t *byte_ptr) const {
+    const bool is_null = *reinterpret_cast<const bool *>(byte_ptr + blank_state_.null_offset);
+    if (is_null) {
+      return result_type_->makeNullValue();
+    }
+    return *reinterpret_cast<const TypedValue *>(byte_ptr + blank_state_.sum_offset);
+  }
+
ColumnVector* finalizeHashTable(
const AggregationStateHashTableBase &hash_table,
- std::vector<std::vector<TypedValue>> *group_by_keys) const override;
+ std::vector<std::vector<TypedValue>> *group_by_keys, int index) const override;
/**
* @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
@@ -152,6 +194,10 @@ class AggregationHandleSum : public AggregationConcreteHandle {
const AggregationStateHashTableBase &source_hash_table,
AggregationStateHashTableBase *destination_hash_table) const override;
+  /**
+   * @brief Inline payload width for SUM, taken from the layout of the blank
+   *        AggregationStateSum.
+   **/
+  size_t getPayloadSize() const override { return blank_state_.getPayloadSize(); }
+
private:
friend class AggregateFunctionSum;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/expressions/aggregation/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/aggregation/CMakeLists.txt b/expressions/aggregation/CMakeLists.txt
index 5744c52..6f259fa 100644
--- a/expressions/aggregation/CMakeLists.txt
+++ b/expressions/aggregation/CMakeLists.txt
@@ -144,9 +144,11 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationConcreteHandl
glog
quickstep_catalog_CatalogTypedefs
quickstep_expressions_aggregation_AggregationHandle
+ quickstep_storage_FastHashTable
quickstep_storage_HashTable
quickstep_storage_HashTableBase
quickstep_storage_HashTableFactory
+ quickstep_threading_SpinMutex
quickstep_types_TypedValue
quickstep_types_containers_ColumnVector
quickstep_utility_Macros)
@@ -161,6 +163,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleAvg
quickstep_catalog_CatalogTypedefs
quickstep_expressions_aggregation_AggregationConcreteHandle
quickstep_expressions_aggregation_AggregationHandle
+ quickstep_storage_FastHashTable
quickstep_storage_HashTable
quickstep_storage_HashTableBase
quickstep_storage_HashTableFactory
@@ -178,6 +181,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleCount
quickstep_catalog_CatalogTypedefs
quickstep_expressions_aggregation_AggregationConcreteHandle
quickstep_expressions_aggregation_AggregationHandle
+ quickstep_storage_FastHashTable
quickstep_storage_HashTable
quickstep_storage_HashTableBase
quickstep_storage_HashTableFactory
@@ -202,6 +206,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMax
quickstep_catalog_CatalogTypedefs
quickstep_expressions_aggregation_AggregationConcreteHandle
quickstep_expressions_aggregation_AggregationHandle
+ quickstep_storage_FastHashTable
quickstep_storage_HashTable
quickstep_storage_HashTableBase
quickstep_storage_HashTableFactory
@@ -218,6 +223,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMin
quickstep_catalog_CatalogTypedefs
quickstep_expressions_aggregation_AggregationConcreteHandle
quickstep_expressions_aggregation_AggregationHandle
+ quickstep_storage_FastHashTable
quickstep_storage_HashTable
quickstep_storage_HashTableBase
quickstep_storage_HashTableFactory
@@ -234,6 +240,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleSum
quickstep_catalog_CatalogTypedefs
quickstep_expressions_aggregation_AggregationConcreteHandle
quickstep_expressions_aggregation_AggregationHandle
+ quickstep_storage_FastHashTable
quickstep_storage_HashTable
quickstep_storage_HashTableBase
quickstep_storage_HashTableFactory
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 4878cf1..185c6b1 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -83,6 +83,9 @@ AggregationOperationState::AggregationOperationState(
group_by_types.emplace_back(&group_by_element->getType());
}
+ std::vector<AggregationHandle *> group_by_handles;
+ group_by_handles.clear();
+
if (aggregate_functions.size() == 0) {
// If there is no aggregation function, then it is a distinctify operation
// on the group-by expressions.
@@ -92,11 +95,17 @@ AggregationOperationState::AggregationOperationState(
arguments_.push_back({});
is_distinct_.emplace_back(false);
- group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>(
+ /* group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>(
new HashTablePool(estimated_num_entries,
hash_table_impl_type,
group_by_types,
handles_.back().get(),
+ storage_manager)));*/
+ group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>(
+ new HashTablePool(estimated_num_entries,
+ hash_table_impl_type,
+ group_by_types,
+ handles_.back(),
storage_manager)));
} else {
// Set up each individual aggregate in this operation.
@@ -107,6 +116,7 @@ AggregationOperationState::AggregationOperationState(
std::vector<bool>::const_iterator is_distinct_it = is_distinct_.begin();
std::vector<HashTableImplType>::const_iterator distinctify_hash_table_impl_types_it
= distinctify_hash_table_impl_types.begin();
+ std::vector<std::size_t> payload_sizes;
for (; agg_func_it != aggregate_functions.end(); ++agg_func_it, ++args_it, ++is_distinct_it) {
// Get the Types of this aggregate's arguments so that we can create an
// AggregationHandle.
@@ -126,12 +136,15 @@ AggregationOperationState::AggregationOperationState(
if (!group_by_list_.empty()) {
// Aggregation with GROUP BY: create a HashTable pool for per-group states.
- group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>(
+ /* group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>(
new HashTablePool(estimated_num_entries,
hash_table_impl_type,
group_by_types,
handles_.back().get(),
- storage_manager)));
+ storage_manager)));*/
+ group_by_handles.emplace_back(handles_.back());
+ payload_sizes.emplace_back(handles_.back()->getPayloadSize());
+
} else {
// Aggregation without GROUP BY: create a single global state.
single_states_.emplace_back(handles_.back()->createInitialState());
@@ -166,17 +179,40 @@ AggregationOperationState::AggregationOperationState(
// the number of entries in the distinctify hash table. We may estimate
// for each distinct aggregation an estimated_num_distinct_keys value during
// query optimization, if it worths.
- distinctify_hashtables_.emplace_back(
+ /* distinctify_hashtables_.emplace_back(
handles_.back()->createDistinctifyHashTable(
*distinctify_hash_table_impl_types_it,
key_types,
estimated_num_entries,
+ storage_manager));*/
+
+std::vector<AggregationHandle *> local;
+local.emplace_back(handles_.back());
+ distinctify_hashtables_.emplace_back(
+AggregationStateFastHashTableFactory::CreateResizable(
+ *distinctify_hash_table_impl_types_it,
+ key_types,
+ estimated_num_entries,
+ {0},
+ local,
storage_manager));
+
++distinctify_hash_table_impl_types_it;
} else {
distinctify_hashtables_.emplace_back(nullptr);
}
}
+
+ if (!group_by_handles.empty()) {
+ // Aggregation with GROUP BY: create a HashTable pool for per-group states.
+ group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>(
+ new HashTablePool(estimated_num_entries,
+ hash_table_impl_type,
+ group_by_types,
+ payload_sizes,
+ group_by_handles,
+ storage_manager)));
+ }
}
}
@@ -410,17 +446,24 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
// Call StorageBlock::aggregateGroupBy() to aggregate this block's values
// directly into the (threadsafe) shared global HashTable for this
// aggregate.
- DCHECK(group_by_hashtable_pools_[agg_idx] != nullptr);
- AggregationStateHashTableBase *agg_hash_table = group_by_hashtable_pools_[agg_idx]->getHashTable();
+ DCHECK(group_by_hashtable_pools_[0] != nullptr);
+ AggregationStateHashTableBase *agg_hash_table = group_by_hashtable_pools_[0]->getHashTableFast();
DCHECK(agg_hash_table != nullptr);
- block->aggregateGroupBy(*handles_[agg_idx],
+ /* block->aggregateGroupBy(*handles_[agg_idx],
arguments_[agg_idx],
group_by_list_,
predicate_.get(),
agg_hash_table,
&reuse_matches,
+ &reuse_group_by_vectors);*/
+ block->aggregateGroupByFast(arguments_,
+ group_by_list_,
+ predicate_.get(),
+ agg_hash_table,
+ &reuse_matches,
&reuse_group_by_vectors);
- group_by_hashtable_pools_[agg_idx]->returnHashTable(agg_hash_table);
+ group_by_hashtable_pools_[0]->returnHashTable(agg_hash_table);
+ break;
}
}
}
@@ -444,6 +487,12 @@ void AggregationOperationState::finalizeSingleState(InsertDestination *output_de
output_destination->insertTuple(Tuple(std::move(attribute_values)));
}
+void AggregationOperationState::mergeGroupByHashTables(AggregationStateHashTableBase *src,
+                                                       AggregationStateHashTableBase *dst) {
+  // Hands every composite-key entry of src to a merger bound to dst
+  // (presumably folding each entry's payloads into dst).
+  // NOTE(review): the static_cast assumes src is a
+  // FastHashTable<true, false, true, false> and is unchecked — confirm this
+  // matches what HashTablePool::getHashTableFast() creates.
+  auto *fast_src = static_cast<FastHashTable<true, false, true, false> *>(src);
+  HashTableMergerNewFast merger(dst);
+  fast_src->forEachCompositeKeyFast(&merger);
+}
+
void AggregationOperationState::finalizeHashTable(InsertDestination *output_destination) {
// Each element of 'group_by_keys' is a vector of values for a particular
// group (which is also the prefix of the finalized Tuple for that group).
@@ -455,18 +504,21 @@ void AggregationOperationState::finalizeHashTable(InsertDestination *output_dest
// TODO(harshad) - Find heuristics for faster merge, even in a single thread.
// e.g. Keep merging entries from smaller hash tables to larger.
+// auto *hash_tables = group_by_hashtable_pools_[0]->getAllHashTables();
+
+ auto *hash_tables = group_by_hashtable_pools_[0]->getAllHashTables();
for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
- auto *hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables();
if (hash_tables->size() > 1) {
for (int hash_table_index = 0;
hash_table_index < static_cast<int>(hash_tables->size() - 1);
++hash_table_index) {
// Merge each hash table to the last hash table.
- handles_[agg_idx]->mergeGroupByHashTables(
- (*(*hash_tables)[hash_table_index]),
+ mergeGroupByHashTables(
+ (*hash_tables)[hash_table_index].get(),
hash_tables->back().get());
}
}
+ break;
}
// Collect per-aggregate finalized values.
@@ -475,16 +527,16 @@ void AggregationOperationState::finalizeHashTable(InsertDestination *output_dest
agg_idx < handles_.size();
++agg_idx) {
if (is_distinct_[agg_idx]) {
- DCHECK(group_by_hashtable_pools_[agg_idx] != nullptr);
- auto *hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables();
+ DCHECK(group_by_hashtable_pools_[0] != nullptr);
+ auto *hash_tables = group_by_hashtable_pools_[0]->getAllHashTables();
DCHECK(hash_tables != nullptr);
if (hash_tables->empty()) {
// We may have a case where hash_tables is empty, e.g. no input blocks.
// However for aggregateOnDistinctifyHashTableForGroupBy to work
// correctly, we should create an empty group by hash table.
- AggregationStateHashTableBase *new_hash_table = group_by_hashtable_pools_[agg_idx]->getHashTable();
- group_by_hashtable_pools_[agg_idx]->returnHashTable(new_hash_table);
- hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables();
+ AggregationStateHashTableBase *new_hash_table = group_by_hashtable_pools_[0]->getHashTableFast();
+ group_by_hashtable_pools_[0]->returnHashTable(new_hash_table);
+ hash_tables = group_by_hashtable_pools_[0]->getAllHashTables();
}
DCHECK(hash_tables->back() != nullptr);
AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
@@ -494,21 +546,22 @@ void AggregationOperationState::finalizeHashTable(InsertDestination *output_dest
agg_hash_table);
}
- auto *hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables();
+ auto *hash_tables = group_by_hashtable_pools_[0]->getAllHashTables();
DCHECK(hash_tables != nullptr);
if (hash_tables->empty()) {
// We may have a case where hash_tables is empty, e.g. no input blocks.
// However for aggregateOnDistinctifyHashTableForGroupBy to work
// correctly, we should create an empty group by hash table.
- AggregationStateHashTableBase *new_hash_table = group_by_hashtable_pools_[agg_idx]->getHashTable();
- group_by_hashtable_pools_[agg_idx]->returnHashTable(new_hash_table);
- hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables();
+ AggregationStateHashTableBase *new_hash_table = group_by_hashtable_pools_[0]->getHashTable();
+ group_by_hashtable_pools_[0]->returnHashTable(new_hash_table);
+ hash_tables = group_by_hashtable_pools_[0]->getAllHashTables();
}
AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
DCHECK(agg_hash_table != nullptr);
ColumnVector* agg_result_col =
handles_[agg_idx]->finalizeHashTable(*agg_hash_table,
- &group_by_keys);
+ &group_by_keys,
+ agg_idx);
if (agg_result_col != nullptr) {
final_values.emplace_back(agg_result_col);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 0199749..8934cda 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -165,6 +165,8 @@ class AggregationOperationState {
**/
void finalizeAggregate(InsertDestination *output_destination);
+ int dflag;
+
private:
// Merge locally (per storage block) aggregated states with global aggregation
// states.
@@ -185,7 +187,8 @@ class AggregationOperationState {
// Each individual aggregate in this operation has an AggregationHandle and
// some number of Scalar arguments.
- std::vector<std::unique_ptr<AggregationHandle>> handles_;
+// std::vector<std::unique_ptr<AggregationHandle>> handles_;
+ std::vector<AggregationHandle *> handles_;
std::vector<std::vector<std::unique_ptr<const Scalar>>> arguments_;
// For each aggregate, whether DISTINCT should be applied to the aggregate's
@@ -215,6 +218,8 @@ class AggregationOperationState {
StorageManager *storage_manager_;
+ void mergeGroupByHashTables(AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst);
+
DISALLOW_COPY_AND_ASSIGN(AggregationOperationState);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 582effd..e67b21f 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -196,6 +196,9 @@ if (ENABLE_DISTRIBUTED)
endif()
add_library(quickstep_storage_EvictionPolicy EvictionPolicy.cpp EvictionPolicy.hpp)
+add_library(quickstep_storage_FastHashTable ../empty_src.cpp FastHashTable.hpp)  # Header-only library; ../empty_src.cpp presumably supplies an empty translation unit.
+add_library(quickstep_storage_FastHashTableFactory ../empty_src.cpp FastHashTableFactory.hpp)  # Header-only library.
+add_library(quickstep_storage_FastSeparateChainingHashTable ../empty_src.cpp FastSeparateChainingHashTable.hpp)  # Header-only library.
add_library(quickstep_storage_FileManager ../empty_src.cpp FileManager.hpp)
if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
add_library(quickstep_storage_FileManagerHdfs FileManagerHdfs.cpp FileManagerHdfs.hpp)
@@ -624,6 +627,55 @@ target_link_libraries(quickstep_storage_EvictionPolicy
quickstep_threading_SpinMutex
quickstep_threading_SpinSharedMutex
quickstep_utility_Macros)
+target_link_libraries(quickstep_storage_FastHashTable  # Link dependencies of the header-only FastHashTable.hpp library.
+ quickstep_catalog_CatalogTypedefs
+ quickstep_expressions_aggregation_AggregationHandleAvg  # NOTE(review): storage -> expressions/aggregation dependency; confirm this does not introduce a link cycle.
+ quickstep_storage_HashTable
+ quickstep_storage_HashTableBase
+ quickstep_storage_StorageBlob
+ quickstep_storage_StorageBlockInfo
+ quickstep_storage_StorageConstants
+ quickstep_storage_StorageManager
+ quickstep_storage_TupleReference
+ quickstep_storage_ValueAccessor
+ quickstep_storage_ValueAccessorUtil
+ quickstep_threading_SpinMutex
+ quickstep_threading_SpinSharedMutex
+ quickstep_types_Type
+ quickstep_types_TypedValue
+ quickstep_utility_BloomFilter
+ quickstep_utility_HashPair
+ quickstep_utility_Macros)
+target_link_libraries(quickstep_storage_FastHashTableFactory  # Link dependencies of the header-only FastHashTableFactory.hpp library.
+ glog
+ quickstep_storage_FastHashTable
+ quickstep_storage_FastSeparateChainingHashTable
+ quickstep_storage_HashTable
+ quickstep_storage_HashTable_proto
+ quickstep_storage_HashTableBase
+ quickstep_storage_HashTableFactory
+ quickstep_storage_LinearOpenAddressingHashTable  # NOTE(review): non-Fast hash table implementations are linked here too; confirm FastHashTableFactory.hpp still needs them.
+ quickstep_storage_SeparateChainingHashTable
+ quickstep_storage_SimpleScalarSeparateChainingHashTable
+ quickstep_storage_TupleReference
+ quickstep_types_TypeFactory
+ quickstep_utility_BloomFilter
+ quickstep_utility_Macros)
+target_link_libraries(quickstep_storage_FastSeparateChainingHashTable  # Link dependencies of the header-only FastSeparateChainingHashTable.hpp library.
+ quickstep_storage_FastHashTable
+ quickstep_storage_HashTable
+ quickstep_storage_HashTableBase
+ quickstep_storage_HashTableKeyManager
+ quickstep_storage_StorageBlob
+ quickstep_storage_StorageBlockInfo
+ quickstep_storage_StorageConstants
+ quickstep_storage_StorageManager
+ quickstep_threading_SpinSharedMutex
+ quickstep_types_Type
+ quickstep_types_TypedValue
+ quickstep_utility_Alignment
+ quickstep_utility_Macros
+ quickstep_utility_PrimeNumber)
target_link_libraries(quickstep_storage_FileManager
quickstep_storage_StorageBlockInfo
quickstep_utility_Macros
@@ -709,6 +761,8 @@ target_link_libraries(quickstep_storage_HashTableKeyManager
target_link_libraries(quickstep_storage_HashTablePool
glog
quickstep_expressions_aggregation_AggregationHandle
+ quickstep_storage_FastHashTable
+ quickstep_storage_FastHashTableFactory
quickstep_storage_HashTableBase
quickstep_threading_SpinMutex
quickstep_utility_Macros
@@ -913,6 +967,7 @@ target_link_libraries(quickstep_storage_StorageBlock
quickstep_storage_CompressedColumnStoreTupleStorageSubBlock
quickstep_storage_CompressedPackedRowStoreTupleStorageSubBlock
quickstep_storage_CountedReference
+ quickstep_storage_FastHashTable
quickstep_storage_HashTableBase
quickstep_storage_IndexSubBlock
quickstep_storage_InsertDestinationInterface
@@ -1096,6 +1151,9 @@ target_link_libraries(quickstep_storage
quickstep_storage_EvictionPolicy
quickstep_storage_FileManager
quickstep_storage_FileManagerLocal
+ quickstep_storage_FastHashTable
+ quickstep_storage_FastHashTableFactory
+ quickstep_storage_FastSeparateChainingHashTable
quickstep_storage_HashTable
quickstep_storage_HashTable_proto
quickstep_storage_HashTableBase