You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") that is not preserved in this plain-text view.
Posted to commits@quickstep.apache.org by ra...@apache.org on 2016/08/05 22:52:54 UTC
[30/30] incubator-quickstep git commit: Fixed 4 failures on unit tests
Fixed 4 failures on unit tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/d9c9e686
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/d9c9e686
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/d9c9e686
Branch: refs/heads/quickstep-28-29
Commit: d9c9e6867d3cbd6ed6209e944a2b7d91748e40d3
Parents: 39485d6
Author: rathijit <ra...@node-2.aggregation.quickstep-pg0.wisc.cloudlab.us>
Authored: Fri Aug 5 06:00:12 2016 -0500
Committer: rathijit <ra...@node-2.aggregation.quickstep-pg0.wisc.cloudlab.us>
Committed: Fri Aug 5 14:23:35 2016 -0500
----------------------------------------------------------------------
.../aggregation/AggregationConcreteHandle.cpp | 14 +++---
.../aggregation/AggregationConcreteHandle.hpp | 41 ++++++++++++++--
expressions/aggregation/AggregationHandle.hpp | 6 ++-
.../aggregation/AggregationHandleAvg.cpp | 14 +++---
.../aggregation/AggregationHandleAvg.hpp | 15 +++++-
.../aggregation/AggregationHandleCount.cpp | 7 +--
.../aggregation/AggregationHandleCount.hpp | 19 ++++++--
.../aggregation/AggregationHandleDistinct.cpp | 2 +-
.../aggregation/AggregationHandleDistinct.hpp | 2 +-
.../aggregation/AggregationHandleMax.cpp | 14 +++---
.../aggregation/AggregationHandleMax.hpp | 13 ++++-
.../aggregation/AggregationHandleMin.cpp | 14 +++---
.../aggregation/AggregationHandleMin.hpp | 15 +++++-
.../aggregation/AggregationHandleSum.cpp | 15 +++---
.../aggregation/AggregationHandleSum.hpp | 15 +++++-
storage/AggregationOperationState.cpp | 51 +++++++++++---------
storage/CMakeLists.txt | 1 -
storage/FastHashTable.hpp | 41 +++++++++++++++-
storage/FastSeparateChainingHashTable.hpp | 16 +++---
19 files changed, 221 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9c9e686/expressions/aggregation/AggregationConcreteHandle.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.cpp b/expressions/aggregation/AggregationConcreteHandle.cpp
index d808302..9bc8fc8 100644
--- a/expressions/aggregation/AggregationConcreteHandle.cpp
+++ b/expressions/aggregation/AggregationConcreteHandle.cpp
@@ -50,17 +50,17 @@ void AggregationConcreteHandle::insertValueAccessorIntoDistinctifyHashTable(
AggregationStateHashTableBase *distinctify_hash_table) const {
// If the key-value pair is already there, we don't need to update the value,
// which should always be "true". I.e. the value is just a placeholder.
-// const auto noop_upserter = [](const auto &accessor, const bool *value) -> void {};
+ // const auto noop_upserter = [](const auto &accessor, const bool *value) -> void {};
AggregationStateFastHashTable *hash_table =
static_cast<AggregationStateFastHashTable *>(distinctify_hash_table);
if (key_ids.size() == 1) {
-// TODO(rathijit): fix
-// hash_table->upsertValueAccessor(accessor,
-// key_ids[0],
-// true /* check_for_null_keys */,
-// true /* initial_value */,
-// &noop_upserter);
+ std::vector<std::vector<attribute_id>> args;
+ args.emplace_back(key_ids);
+ hash_table->upsertValueAccessorFast(args,
+ accessor,
+ key_ids[0],
+ true /* check_for_null_keys */);
} else {
std::vector<std::vector<attribute_id>> empty_args;
empty_args.resize(1);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9c9e686/expressions/aggregation/AggregationConcreteHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.hpp b/expressions/aggregation/AggregationConcreteHandle.hpp
index c5d7b3c..df02721 100644
--- a/expressions/aggregation/AggregationConcreteHandle.hpp
+++ b/expressions/aggregation/AggregationConcreteHandle.hpp
@@ -27,6 +27,7 @@
#include "catalog/CatalogTypedefs.hpp"
#include "expressions/aggregation/AggregationHandle.hpp"
#include "storage/HashTable.hpp"
+#include "storage/FastHashTable.hpp"
#include "storage/HashTableBase.hpp"
#include "types/TypedValue.hpp"
#include "types/containers/ColumnVector.hpp"
@@ -278,6 +279,11 @@ class AggregationConcreteHandle : public AggregationHandle {
const AggregationStateHashTableBase &distinctify_hash_table) const;
template <typename HandleT,
+ typename StateT>
+ StateT* aggregateOnDistinctifyHashTableForSingleUnaryHelperFast(
+ const AggregationStateHashTableBase &distinctify_hash_table) const;
+
+ template <typename HandleT,
typename StateT,
typename HashTableT>
void aggregateOnDistinctifyHashTableForGroupByUnaryHelper(
@@ -289,7 +295,7 @@ class AggregationConcreteHandle : public AggregationHandle {
typename HashTableT>
void aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast(
const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *hash_table) const;
+ AggregationStateHashTableBase *hash_table, int index) const;
template <typename HandleT,
@@ -494,6 +500,31 @@ StateT* AggregationConcreteHandle::aggregateOnDistinctifyHashTableForSingleUnary
}
template <typename HandleT,
+ typename StateT>
+StateT* AggregationConcreteHandle::aggregateOnDistinctifyHashTableForSingleUnaryHelperFast(
+ const AggregationStateHashTableBase &distinctify_hash_table) const {
+ const HandleT& handle = static_cast<const HandleT&>(*this);
+ StateT *state = static_cast<StateT*>(createInitialState());
+
+ // A lambda function which will be called on each key from the distinctify
+ // hash table.
+ const auto aggregate_functor = [&handle, &state](const TypedValue &key,
+ const std::uint8_t &dumb_placeholder) {
+ // For each (unary) key in the distinctify hash table, aggregate the key
+ // into "state".
+ handle.iterateUnaryInl(state, key);
+ };
+
+ const AggregationStateFastHashTable &hash_table =
+ static_cast<const AggregationStateFastHashTable &>(distinctify_hash_table);
+ // Invoke the lambda function "aggregate_functor" on each key from the distinctify
+ // hash table.
+ hash_table.forEach(&aggregate_functor);
+
+ return state;
+}
+
+template <typename HandleT,
typename StateT,
typename HashTableT>
void AggregationConcreteHandle::aggregateOnDistinctifyHashTableForGroupByUnaryHelper(
@@ -534,13 +565,13 @@ template <typename HandleT,
typename HashTableT>
void AggregationConcreteHandle::aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast(
const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table) const {
+ AggregationStateHashTableBase *aggregation_hash_table, int index) const {
const HandleT& handle = static_cast<const HandleT&>(*this);
HashTableT *target_hash_table = static_cast<HashTableT*>(aggregation_hash_table);
// A lambda function which will be called on each key-value pair from the
// distinctify hash table.
- const auto aggregate_functor = [&handle, &target_hash_table](
+ const auto aggregate_functor = [&handle, &target_hash_table, &index](
std::vector<TypedValue> &key,
const bool &dumb_placeholder) {
// For each (composite) key vector in the distinctify hash table with size N.
@@ -552,10 +583,10 @@ void AggregationConcreteHandle::aggregateOnDistinctifyHashTableForGroupByUnaryHe
// An upserter as lambda function for aggregating the argument into its
// GROUP BY group's entry inside aggregation_hash_table.
const auto upserter = [&handle, &argument](std::uint8_t *state) {
- handle.iterateUnaryInlFast(argument, state+sizeof(SpinMutex));
+ handle.iterateUnaryInlFast(argument, state);
};
- target_hash_table->upsertCompositeKeyFast(key, nullptr, &upserter);
+ target_hash_table->upsertCompositeKeyFast(key, nullptr, &upserter, index);
};
const HashTableT &source_hash_table =
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9c9e686/expressions/aggregation/AggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandle.hpp b/expressions/aggregation/AggregationHandle.hpp
index d9c3897..368b94e 100644
--- a/expressions/aggregation/AggregationHandle.hpp
+++ b/expressions/aggregation/AggregationHandle.hpp
@@ -347,7 +347,7 @@ class AggregationHandle {
*/
virtual void aggregateOnDistinctifyHashTableForGroupBy(
const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table) const = 0;
+ AggregationStateHashTableBase *aggregation_hash_table, int index) const = 0;
/**
* @brief Merge two GROUP BY hash tables in one.
@@ -362,11 +362,13 @@ class AggregationHandle {
const AggregationStateHashTableBase &source_hash_table,
AggregationStateHashTableBase *destination_hash_table) const = 0;
- virtual size_t getPayloadSize() const {return 8;}
+ virtual size_t getPayloadSize() const {return 1;}
virtual void setPayloadOffset(std::size_t) {}
virtual void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t *byte_ptr) {}
virtual void mergeStatesFast(const uint8_t *src, uint8_t *dst) const {}
virtual void initPayload(uint8_t *byte_ptr) {}
+ virtual void BlockUpdate() {}
+ virtual void AllowUpdate() {}
protected:
AggregationHandle() {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9c9e686/expressions/aggregation/AggregationHandleAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.cpp b/expressions/aggregation/AggregationHandleAvg.cpp
index d0bd3b8..93adf55 100644
--- a/expressions/aggregation/AggregationHandleAvg.cpp
+++ b/expressions/aggregation/AggregationHandleAvg.cpp
@@ -42,7 +42,7 @@ namespace quickstep {
class StorageManager;
AggregationHandleAvg::AggregationHandleAvg(const Type &type)
- : argument_type_(type) {
+ : argument_type_(type), block_update(false) {
// We sum Int as Long and Float as Double so that we have more headroom when
// adding many values.
TypeID type_precision_id;
@@ -206,7 +206,7 @@ ColumnVector* AggregationHandleAvg::finalizeHashTable(
AggregationState* AggregationHandleAvg::aggregateOnDistinctifyHashTableForSingle(
const AggregationStateHashTableBase &distinctify_hash_table) const {
- return aggregateOnDistinctifyHashTableForSingleUnaryHelper<
+ return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
AggregationHandleAvg,
AggregationStateAvg>(
distinctify_hash_table);
@@ -214,14 +214,12 @@ AggregationState* AggregationHandleAvg::aggregateOnDistinctifyHashTableForSingle
void AggregationHandleAvg::aggregateOnDistinctifyHashTableForGroupBy(
const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table) const {
- aggregateOnDistinctifyHashTableForGroupByUnaryHelper<
+ AggregationStateHashTableBase *aggregation_hash_table, int index) const {
+ aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
AggregationHandleAvg,
- AggregationStateAvg,
- AggregationStateHashTable<AggregationStateAvg>>(
+ AggregationStateFastHashTable>(
distinctify_hash_table,
- blank_state_,
- aggregation_hash_table);
+ aggregation_hash_table, index);
}
void AggregationHandleAvg::mergeGroupByHashTables(
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9c9e686/expressions/aggregation/AggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.hpp b/expressions/aggregation/AggregationHandleAvg.hpp
index fe178f6..3de0f2b 100644
--- a/expressions/aggregation/AggregationHandleAvg.hpp
+++ b/expressions/aggregation/AggregationHandleAvg.hpp
@@ -123,7 +123,7 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
++state->count_;
}
- inline void iterateUnaryInlFast(const TypedValue &value, uint8_t *byte_ptr) {
+ inline void iterateUnaryInlFast(const TypedValue &value, uint8_t *byte_ptr) const {
DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
if (value.isNull()) return;
TypedValue *sum_ptr = reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset);
@@ -133,9 +133,18 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
}
inline void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t *byte_ptr) override {
+ if (block_update) return;
iterateUnaryInlFast(arguments.front(), byte_ptr);
}
+ void BlockUpdate() override {
+ block_update = true;
+ }
+
+ void AllowUpdate() override {
+ block_update = false;
+ }
+
void initPayload(uint8_t *byte_ptr) override {
TypedValue *sum_ptr = reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset);
std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr + blank_state_.count_offset);
@@ -208,7 +217,7 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
*/
void aggregateOnDistinctifyHashTableForGroupBy(
const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table) const override;
+ AggregationStateHashTableBase *aggregation_hash_table, int index) const override;
void mergeGroupByHashTables(
const AggregationStateHashTableBase &source_hash_table,
@@ -235,6 +244,8 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
std::unique_ptr<UncheckedBinaryOperator> merge_add_operator_;
std::unique_ptr<UncheckedBinaryOperator> divide_operator_;
+ bool block_update;
+
DISALLOW_COPY_AND_ASSIGN(AggregationHandleAvg);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9c9e686/expressions/aggregation/AggregationHandleCount.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.cpp b/expressions/aggregation/AggregationHandleCount.cpp
index dc11dac..a1d5388 100644
--- a/expressions/aggregation/AggregationHandleCount.cpp
+++ b/expressions/aggregation/AggregationHandleCount.cpp
@@ -196,7 +196,7 @@ AggregationState* AggregationHandleCount<count_star, nullable_type>
::aggregateOnDistinctifyHashTableForSingle(
const AggregationStateHashTableBase &distinctify_hash_table) const {
DCHECK_EQ(count_star, false);
- return aggregateOnDistinctifyHashTableForSingleUnaryHelper<
+ return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
AggregationHandleCount<count_star, nullable_type>,
AggregationStateCount>(
distinctify_hash_table);
@@ -206,13 +206,14 @@ template <bool count_star, bool nullable_type>
void AggregationHandleCount<count_star, nullable_type>
::aggregateOnDistinctifyHashTableForGroupBy(
const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table) const {
+ AggregationStateHashTableBase *aggregation_hash_table, int index) const {
DCHECK_EQ(count_star, false);
aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
AggregationHandleCount<count_star, nullable_type>,
AggregationStateFastHashTable>(
distinctify_hash_table,
- aggregation_hash_table);
+ aggregation_hash_table,
+ index);
}
template <bool count_star, bool nullable_type>
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9c9e686/expressions/aggregation/AggregationHandleCount.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.hpp b/expressions/aggregation/AggregationHandleCount.hpp
index 4046106..64d03a3 100644
--- a/expressions/aggregation/AggregationHandleCount.hpp
+++ b/expressions/aggregation/AggregationHandleCount.hpp
@@ -135,15 +135,24 @@ class AggregationHandleCount : public AggregationConcreteHandle {
}
inline void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t *byte_ptr) override {
+ if (block_update) return;
if (arguments.size())
iterateUnaryInlFast(arguments.front(), byte_ptr);
else
iterateNullaryInlFast(byte_ptr);
}
+ void BlockUpdate() override {
+ block_update = true;
+ }
+
+ void AllowUpdate() override {
+ block_update = false;
+ }
+
void initPayload(uint8_t *byte_ptr) override {
- std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr);
- *count_ptr = 0;
+ std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr);
+ *count_ptr = 0;
}
AggregationState* accumulateNullary(const std::size_t num_tuples) const override {
@@ -208,7 +217,7 @@ class AggregationHandleCount : public AggregationConcreteHandle {
*/
void aggregateOnDistinctifyHashTableForGroupBy(
const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table) const override;
+ AggregationStateHashTableBase *aggregation_hash_table, int index) const override;
void mergeGroupByHashTables(
const AggregationStateHashTableBase &source_hash_table,
@@ -224,9 +233,11 @@ class AggregationHandleCount : public AggregationConcreteHandle {
/**
* @brief Constructor.
**/
- AggregationHandleCount() {
+ AggregationHandleCount() : block_update(false) {
}
+ bool block_update;
+
DISALLOW_COPY_AND_ASSIGN(AggregationHandleCount);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9c9e686/expressions/aggregation/AggregationHandleDistinct.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleDistinct.cpp b/expressions/aggregation/AggregationHandleDistinct.cpp
index 0088239..cfcedcf 100644
--- a/expressions/aggregation/AggregationHandleDistinct.cpp
+++ b/expressions/aggregation/AggregationHandleDistinct.cpp
@@ -70,7 +70,7 @@ ColumnVector* AggregationHandleDistinct::finalizeHashTable(
const bool &dumb_placeholder) -> void {
group_by_keys->emplace_back(std::move(group_by_key));
};
- static_cast<const AggregationStateHashTable<bool>&>(hash_table).forEachCompositeKey(&keys_retriever);
+ static_cast<const AggregationStateFastHashTable&>(hash_table).forEachCompositeKey(&keys_retriever);
return nullptr;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9c9e686/expressions/aggregation/AggregationHandleDistinct.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleDistinct.hpp b/expressions/aggregation/AggregationHandleDistinct.hpp
index 1e36845..2947d12 100644
--- a/expressions/aggregation/AggregationHandleDistinct.hpp
+++ b/expressions/aggregation/AggregationHandleDistinct.hpp
@@ -88,7 +88,7 @@ class AggregationHandleDistinct : public AggregationConcreteHandle {
void aggregateOnDistinctifyHashTableForGroupBy(
const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *groupby_hash_table) const override {
+ AggregationStateHashTableBase *groupby_hash_table, int index) const override {
LOG(FATAL) << "AggregationHandleDistinct does not support "
<< "aggregateOnDistinctifyHashTableForGroupBy().";
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9c9e686/expressions/aggregation/AggregationHandleMax.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.cpp b/expressions/aggregation/AggregationHandleMax.cpp
index f0eebde..ef44742 100644
--- a/expressions/aggregation/AggregationHandleMax.cpp
+++ b/expressions/aggregation/AggregationHandleMax.cpp
@@ -37,7 +37,7 @@ namespace quickstep {
class StorageManager;
AggregationHandleMax::AggregationHandleMax(const Type &type)
- : type_(type) {
+ : type_(type), block_update(false) {
fast_comparator_.reset(ComparisonFactory::GetComparison(ComparisonID::kGreater)
.makeUncheckedComparatorForTypes(type,
type.getNonNullableVersion()));
@@ -133,7 +133,7 @@ ColumnVector* AggregationHandleMax::finalizeHashTable(
AggregationState* AggregationHandleMax::aggregateOnDistinctifyHashTableForSingle(
const AggregationStateHashTableBase &distinctify_hash_table) const {
- return aggregateOnDistinctifyHashTableForSingleUnaryHelper<
+ return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
AggregationHandleMax,
AggregationStateMax>(
distinctify_hash_table);
@@ -141,14 +141,12 @@ AggregationState* AggregationHandleMax::aggregateOnDistinctifyHashTableForSingle
void AggregationHandleMax::aggregateOnDistinctifyHashTableForGroupBy(
const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table) const {
- aggregateOnDistinctifyHashTableForGroupByUnaryHelper<
+ AggregationStateHashTableBase *aggregation_hash_table, int index) const {
+ aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
AggregationHandleMax,
- AggregationStateMax,
- AggregationStateHashTable<AggregationStateMax>>(
+ AggregationStateFastHashTable>(
distinctify_hash_table,
- AggregationStateMax(type_),
- aggregation_hash_table);
+ aggregation_hash_table, index);
}
void AggregationHandleMax::mergeGroupByHashTables(
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9c9e686/expressions/aggregation/AggregationHandleMax.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.hpp b/expressions/aggregation/AggregationHandleMax.hpp
index ce7c702..d82b9a8 100644
--- a/expressions/aggregation/AggregationHandleMax.hpp
+++ b/expressions/aggregation/AggregationHandleMax.hpp
@@ -113,9 +113,18 @@ class AggregationHandleMax : public AggregationConcreteHandle {
}
inline void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t *byte_ptr) override {
+ if (block_update) return;
iterateUnaryInlFast(arguments.front(), byte_ptr);
}
+ void BlockUpdate() override {
+ block_update = true;
+ }
+
+ void AllowUpdate() override {
+ block_update = false;
+ }
+
void initPayload(uint8_t *byte_ptr) override {
TypedValue *max_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
TypedValue t1 = (type_.getNullableVersion().makeNullValue());
@@ -175,7 +184,7 @@ class AggregationHandleMax : public AggregationConcreteHandle {
*/
void aggregateOnDistinctifyHashTableForGroupBy(
const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table) const override;
+ AggregationStateHashTableBase *aggregation_hash_table, int index) const override;
void mergeGroupByHashTables(
const AggregationStateHashTableBase &source_hash_table,
@@ -221,6 +230,8 @@ class AggregationHandleMax : public AggregationConcreteHandle {
const Type &type_;
std::unique_ptr<UncheckedComparator> fast_comparator_;
+ bool block_update;
+
DISALLOW_COPY_AND_ASSIGN(AggregationHandleMax);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9c9e686/expressions/aggregation/AggregationHandleMin.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.cpp b/expressions/aggregation/AggregationHandleMin.cpp
index cbedd9b..b6e46f4 100644
--- a/expressions/aggregation/AggregationHandleMin.cpp
+++ b/expressions/aggregation/AggregationHandleMin.cpp
@@ -39,7 +39,7 @@ namespace quickstep {
class StorageManager;
AggregationHandleMin::AggregationHandleMin(const Type &type)
- : type_(type) {
+ : type_(type), block_update(false) {
fast_comparator_.reset(ComparisonFactory::GetComparison(ComparisonID::kLess)
.makeUncheckedComparatorForTypes(type,
type.getNonNullableVersion()));
@@ -136,7 +136,7 @@ ColumnVector* AggregationHandleMin::finalizeHashTable(
AggregationState* AggregationHandleMin::aggregateOnDistinctifyHashTableForSingle(
const AggregationStateHashTableBase &distinctify_hash_table) const {
- return aggregateOnDistinctifyHashTableForSingleUnaryHelper<
+ return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
AggregationHandleMin,
AggregationStateMin>(
distinctify_hash_table);
@@ -144,14 +144,12 @@ AggregationState* AggregationHandleMin::aggregateOnDistinctifyHashTableForSingle
void AggregationHandleMin::aggregateOnDistinctifyHashTableForGroupBy(
const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table) const {
- aggregateOnDistinctifyHashTableForGroupByUnaryHelper<
+ AggregationStateHashTableBase *aggregation_hash_table, int index) const {
+ aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
AggregationHandleMin,
- AggregationStateMin,
- AggregationStateHashTable<AggregationStateMin>>(
+ AggregationStateFastHashTable>(
distinctify_hash_table,
- AggregationStateMin(type_),
- aggregation_hash_table);
+ aggregation_hash_table, index);
}
void AggregationHandleMin::mergeGroupByHashTables(
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9c9e686/expressions/aggregation/AggregationHandleMin.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.hpp b/expressions/aggregation/AggregationHandleMin.hpp
index 7c7869e..adf79d7 100644
--- a/expressions/aggregation/AggregationHandleMin.hpp
+++ b/expressions/aggregation/AggregationHandleMin.hpp
@@ -110,16 +110,25 @@ class AggregationHandleMin : public AggregationConcreteHandle {
compareAndUpdate(state, value);
}
- inline void iterateUnaryInlFast(const TypedValue &value, uint8_t *byte_ptr) {
+ inline void iterateUnaryInlFast(const TypedValue &value, uint8_t *byte_ptr) const {
DCHECK(value.isPlausibleInstanceOf(type_.getSignature()));
TypedValue *min_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
compareAndUpdateFast(min_ptr, value);
}
inline void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t *byte_ptr) override {
+ if (block_update) return;
iterateUnaryInlFast(arguments.front(), byte_ptr);
}
+ void BlockUpdate() override {
+ block_update = true;
+ }
+
+ void AllowUpdate() override {
+ block_update = false;
+ }
+
void initPayload(uint8_t *byte_ptr) override {
TypedValue *min_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
TypedValue t1 = (type_.getNullableVersion().makeNullValue());
@@ -178,7 +187,7 @@ class AggregationHandleMin : public AggregationConcreteHandle {
*/
void aggregateOnDistinctifyHashTableForGroupBy(
const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table) const override;
+ AggregationStateHashTableBase *aggregation_hash_table, int index) const override;
void mergeGroupByHashTables(
const AggregationStateHashTableBase &source_hash_table,
@@ -223,6 +232,8 @@ class AggregationHandleMin : public AggregationConcreteHandle {
const Type &type_;
std::unique_ptr<UncheckedComparator> fast_comparator_;
+ bool block_update;
+
DISALLOW_COPY_AND_ASSIGN(AggregationHandleMin);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9c9e686/expressions/aggregation/AggregationHandleSum.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.cpp b/expressions/aggregation/AggregationHandleSum.cpp
index ad486eb..1b0e355 100644
--- a/expressions/aggregation/AggregationHandleSum.cpp
+++ b/expressions/aggregation/AggregationHandleSum.cpp
@@ -43,7 +43,7 @@ namespace quickstep {
class StorageManager;
AggregationHandleSum::AggregationHandleSum(const Type &type)
- : argument_type_(type) {
+ : argument_type_(type), block_update(false) {
// We sum Int as Long and Float as Double so that we have more headroom when
// adding many values.
TypeID type_precision_id;
@@ -184,7 +184,7 @@ ColumnVector* AggregationHandleSum::finalizeHashTable(
AggregationState* AggregationHandleSum::aggregateOnDistinctifyHashTableForSingle(
const AggregationStateHashTableBase &distinctify_hash_table) const {
- return aggregateOnDistinctifyHashTableForSingleUnaryHelper<
+ return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
AggregationHandleSum,
AggregationStateSum>(
distinctify_hash_table);
@@ -192,14 +192,13 @@ AggregationState* AggregationHandleSum::aggregateOnDistinctifyHashTableForSingle
void AggregationHandleSum::aggregateOnDistinctifyHashTableForGroupBy(
const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table) const {
- aggregateOnDistinctifyHashTableForGroupByUnaryHelper<
+ AggregationStateHashTableBase *aggregation_hash_table, int index) const {
+ aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
AggregationHandleSum,
- AggregationStateSum,
- AggregationStateHashTable<AggregationStateSum>>(
+ AggregationStateFastHashTable>(
distinctify_hash_table,
- blank_state_,
- aggregation_hash_table);
+ aggregation_hash_table,
+ index);
}
void AggregationHandleSum::mergeGroupByHashTables(
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9c9e686/expressions/aggregation/AggregationHandleSum.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.hpp b/expressions/aggregation/AggregationHandleSum.hpp
index a8d2b5a..53f43ce 100644
--- a/expressions/aggregation/AggregationHandleSum.hpp
+++ b/expressions/aggregation/AggregationHandleSum.hpp
@@ -118,7 +118,7 @@ class AggregationHandleSum : public AggregationConcreteHandle {
state->null_ = false;
}
- inline void iterateUnaryInlFast(const TypedValue &value, uint8_t *byte_ptr) {
+ inline void iterateUnaryInlFast(const TypedValue &value, uint8_t *byte_ptr) const {
DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
if (value.isNull()) return;
TypedValue *sum_ptr = reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset);
@@ -128,9 +128,18 @@ class AggregationHandleSum : public AggregationConcreteHandle {
}
inline void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t *byte_ptr) override {
+ if (block_update) return;
iterateUnaryInlFast(arguments.front(), byte_ptr);
}
+ void BlockUpdate() override {
+ block_update = true;
+ }
+
+ void AllowUpdate() override {
+ block_update = false;
+ }
+
void initPayload(uint8_t *byte_ptr) override {
TypedValue *sum_ptr = reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset);
bool *null_ptr = reinterpret_cast<bool *>(byte_ptr + blank_state_.null_offset);
@@ -188,7 +197,7 @@ class AggregationHandleSum : public AggregationConcreteHandle {
*/
void aggregateOnDistinctifyHashTableForGroupBy(
const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table) const override;
+ AggregationStateHashTableBase *aggregation_hash_table, int index) const override;
void mergeGroupByHashTables(
const AggregationStateHashTableBase &source_hash_table,
@@ -214,6 +223,8 @@ class AggregationHandleSum : public AggregationConcreteHandle {
std::unique_ptr<UncheckedBinaryOperator> fast_operator_;
std::unique_ptr<UncheckedBinaryOperator> merge_operator_;
+ bool block_update;
+
DISALLOW_COPY_AND_ASSIGN(AggregationHandleSum);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9c9e686/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 185c6b1..14d4ea6 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -105,7 +105,8 @@ AggregationOperationState::AggregationOperationState(
new HashTablePool(estimated_num_entries,
hash_table_impl_type,
group_by_types,
- handles_.back(),
+ {1},
+ handles_,
storage_manager)));
} else {
// Set up each individual aggregate in this operation.
@@ -142,8 +143,11 @@ AggregationOperationState::AggregationOperationState(
group_by_types,
handles_.back().get(),
storage_manager)));*/
+ if (*is_distinct_it) {
+ handles_.back()->BlockUpdate();
+ }
group_by_handles.emplace_back(handles_.back());
- payload_sizes.emplace_back(handles_.back()->getPayloadSize());
+ payload_sizes.emplace_back(group_by_handles.back()->getPayloadSize());
} else {
// Aggregation without GROUP BY: create a single global state.
@@ -186,26 +190,26 @@ AggregationOperationState::AggregationOperationState(
estimated_num_entries,
storage_manager));*/
-std::vector<AggregationHandle *> local;
-local.emplace_back(handles_.back());
+ std::vector<AggregationHandle *> local;
+ // local.emplace_back(handles_.back());
+ local.clear();
distinctify_hashtables_.emplace_back(
-AggregationStateFastHashTableFactory::CreateResizable(
+ AggregationStateFastHashTableFactory::CreateResizable(
*distinctify_hash_table_impl_types_it,
key_types,
estimated_num_entries,
{0},
local,
storage_manager));
-
++distinctify_hash_table_impl_types_it;
} else {
distinctify_hashtables_.emplace_back(nullptr);
}
}
- if (!group_by_handles.empty()) {
- // Aggregation with GROUP BY: create a HashTable pool for per-group states.
- group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>(
+ if (!group_by_handles.empty()) {
+ // Aggregation with GROUP BY: create a HashTable pool for per-group states.
+ group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>(
new HashTablePool(estimated_num_entries,
hash_table_impl_type,
group_by_types,
@@ -213,7 +217,7 @@ AggregationStateFastHashTableFactory::CreateResizable(
group_by_handles,
storage_manager)));
}
- }
+ }
}
AggregationOperationState* AggregationOperationState::ReconstructFromProto(
@@ -442,13 +446,15 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
distinctify_hashtables_[agg_idx].get(),
&reuse_matches,
&reuse_group_by_vectors);
- } else {
- // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
- // directly into the (threadsafe) shared global HashTable for this
- // aggregate.
- DCHECK(group_by_hashtable_pools_[0] != nullptr);
- AggregationStateHashTableBase *agg_hash_table = group_by_hashtable_pools_[0]->getHashTableFast();
- DCHECK(agg_hash_table != nullptr);
+ }
+ }
+
+ // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
+ // directly into the (threadsafe) shared global HashTable for this
+ // aggregate.
+ DCHECK(group_by_hashtable_pools_[0] != nullptr);
+ AggregationStateHashTableBase *agg_hash_table = group_by_hashtable_pools_[0]->getHashTableFast();
+ DCHECK(agg_hash_table != nullptr);
/* block->aggregateGroupBy(*handles_[agg_idx],
arguments_[agg_idx],
group_by_list_,
@@ -456,16 +462,13 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
agg_hash_table,
&reuse_matches,
&reuse_group_by_vectors);*/
- block->aggregateGroupByFast(arguments_,
+ block->aggregateGroupByFast(arguments_,
group_by_list_,
predicate_.get(),
agg_hash_table,
&reuse_matches,
&reuse_group_by_vectors);
- group_by_hashtable_pools_[0]->returnHashTable(agg_hash_table);
- break;
- }
- }
+ group_by_hashtable_pools_[0]->returnHashTable(agg_hash_table);
}
void AggregationOperationState::finalizeSingleState(InsertDestination *output_destination) {
@@ -541,9 +544,11 @@ void AggregationOperationState::finalizeHashTable(InsertDestination *output_dest
DCHECK(hash_tables->back() != nullptr);
AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
DCHECK(agg_hash_table != nullptr);
+ handles_[agg_idx]->AllowUpdate();
handles_[agg_idx]->aggregateOnDistinctifyHashTableForGroupBy(
*distinctify_hashtables_[agg_idx],
- agg_hash_table);
+ agg_hash_table,
+ agg_idx);
}
auto *hash_tables = group_by_hashtable_pools_[0]->getAllHashTables();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9c9e686/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index e67b21f..74b4df5 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -629,7 +629,6 @@ target_link_libraries(quickstep_storage_EvictionPolicy
quickstep_utility_Macros)
target_link_libraries(quickstep_storage_FastHashTable
quickstep_catalog_CatalogTypedefs
- quickstep_expressions_aggregation_AggregationHandleAvg
quickstep_storage_HashTable
quickstep_storage_HashTableBase
quickstep_storage_StorageBlob
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9c9e686/storage/FastHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/FastHashTable.hpp b/storage/FastHashTable.hpp
index 12e447f..cba039a 100644
--- a/storage/FastHashTable.hpp
+++ b/storage/FastHashTable.hpp
@@ -35,7 +35,6 @@
#include "storage/TupleReference.hpp"
#include "storage/ValueAccessor.hpp"
#include "storage/ValueAccessorUtil.hpp"
-#include "expressions/aggregation/AggregationHandleAvg.hpp"
#include "threading/SpinSharedMutex.hpp"
#include "threading/SpinMutex.hpp"
#include "types/Type.hpp"
@@ -434,6 +433,11 @@ class FastHashTable : public HashTableBase<resizable,
const uint8_t *init_value_ptr,
FunctorT *functor);
+ template <typename FunctorT>
+ bool upsertCompositeKeyFast(const std::vector<TypedValue> &key,
+ const uint8_t *init_value_ptr,
+ FunctorT *functor, int index);
+
bool upsertCompositeKeyNewFast(const std::vector<TypedValue> &key,
const uint8_t *init_value_ptr,
const uint8_t *source_state);
@@ -1851,6 +1855,41 @@ template <bool resizable,
bool serializable,
bool force_key_copy,
bool allow_duplicate_keys>
+template <typename FunctorT>
+bool FastHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+ ::upsertCompositeKeyFast(const std::vector<TypedValue> &key,
+ const std::uint8_t *init_value_ptr,
+ FunctorT *functor, int index) {
+ DEBUG_ASSERT(!allow_duplicate_keys);
+ const std::size_t variable_size = calculateVariableLengthCompositeKeyCopySize(key);
+ if (resizable) {
+ for (;;) {
+ {
+ SpinSharedMutexSharedLock<true> resize_lock(resize_shared_mutex_);
+ uint8_t *value = upsertCompositeKeyInternalFast(key, init_value_ptr, variable_size);
+ if (value != nullptr) {
+ (*functor)(value+payload_offsets_[index]);
+ return true;
+ }
+ }
+ resize(0, variable_size);
+ }
+ } else {
+ uint8_t *value = upsertCompositeKeyInternalFast(key, init_value_ptr, variable_size);
+ if (value == nullptr) {
+ return false;
+ } else {
+ (*functor)(value+payload_offsets_[index]);
+ return true;
+ }
+ }
+}
+
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
bool FastHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
::upsertCompositeKeyNewFast(const std::vector<TypedValue> &key,
const std::uint8_t *init_value_ptr,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9c9e686/storage/FastSeparateChainingHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/FastSeparateChainingHashTable.hpp b/storage/FastSeparateChainingHashTable.hpp
index 64c4979..756d6e5 100644
--- a/storage/FastSeparateChainingHashTable.hpp
+++ b/storage/FastSeparateChainingHashTable.hpp
@@ -308,8 +308,11 @@ FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_dup
key_manager_(this->key_types_, kValueOffset + this->total_payload_size_),
bucket_size_(ComputeBucketSize(key_manager_.getFixedKeySize())) {
init_payload_ = static_cast<std::uint8_t *>(calloc(this->total_payload_size_, 1));
- for (auto handle : handles)
- handle->initPayload(init_payload_);
+ int k = 0;
+ for (auto handle : handles) {
+ handle->initPayload(init_payload_+this->payload_offsets_[k]);
+ k++;
+ }
// Bucket size always rounds up to the alignment requirement of the atomic
// size_t "next" pointer at the front or a ValueT, whichever is larger.
//
@@ -437,8 +440,7 @@ FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_dup
true),
kBucketAlignment(alignof(std::atomic<std::size_t>) < alignof(uint8_t) ? alignof(uint8_t)
: alignof(std::atomic<std::size_t>)),
- kValueOffset((((sizeof(std::atomic<std::size_t>) + sizeof(std::size_t) - 1) /
- alignof(uint8_t)) + 1) * alignof(uint8_t)),
+ kValueOffset(sizeof(std::atomic<std::size_t>) + sizeof(std::size_t)),
key_manager_(this->key_types_, kValueOffset + sizeof(uint8_t)),
bucket_size_(ComputeBucketSize(key_manager_.getFixedKeySize())) {
// Bucket size always rounds up to the alignment requirement of the atomic
@@ -1046,7 +1048,6 @@ uint8_t* FastSeparateChainingHashTable<resizable, serializable, force_key_copy,
else
memcpy(value, init_value_ptr, this->total_payload_size_);
-
// Update the previous chain pointer to point to the new bucket.
pending_chain_ptr->store(pending_chain_ptr_finish_value, std::memory_order_release);
@@ -1168,10 +1169,11 @@ uint8_t* FastSeparateChainingHashTable<resizable, serializable, force_key_copy,
// uint8_t *value;
// value = static_cast<unsigned char*>(bucket) + kValueOffset;
uint8_t *value = static_cast<unsigned char*>(bucket) + kValueOffset;
- if (init_value_ptr == nullptr)
+ if (init_value_ptr == nullptr) {
memcpy(value, init_payload_, this->total_payload_size_);
- else
+ } else {
memcpy(value, init_value_ptr, this->total_payload_size_);
+ }
// Update the previous chaing pointer to point to the new bucket.
pending_chain_ptr->store(pending_chain_ptr_finish_value, std::memory_order_release);