You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/09/16 18:29:44 UTC
[20/29] incubator-quickstep git commit: Single aggregateGroupBy
method in StorageBlock.
Single aggregateGroupBy method in StorageBlock.
- New methods for separating unary and nullary updates of aggregation states.
- Added a TODO to move upsertValueAccessorCompositeKeyFast out of the HashTableBase class.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/06f39905
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/06f39905
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/06f39905
Branch: refs/heads/partitioned-aggregation
Commit: 06f399057cc2eca5c29834d6eaa8a7ce43898642
Parents: 2a9efc4
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Mon Sep 12 16:03:01 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Thu Sep 15 15:41:05 2016 -0500
----------------------------------------------------------------------
catalog/CatalogTypedefs.hpp | 2 +
.../aggregation/AggregationConcreteHandle.cpp | 7 +-
expressions/aggregation/AggregationHandle.hpp | 32 ++++++-
.../aggregation/AggregationHandleAvg.hpp | 6 +-
.../aggregation/AggregationHandleCount.hpp | 15 ++--
.../aggregation/AggregationHandleMax.hpp | 6 +-
.../aggregation/AggregationHandleMin.hpp | 6 +-
.../aggregation/AggregationHandleSum.hpp | 6 +-
query_optimizer/ExecutionGenerator.cpp | 20 ++---
.../tests/AggregationOperator_unittest.cpp | 3 +-
storage/AggregationOperationState.cpp | 12 +--
storage/FastHashTable.hpp | 60 +++++++------
storage/FastHashTableFactory.hpp | 46 ----------
storage/HashTableBase.hpp | 19 +++-
storage/StorageBlock.cpp | 91 ++------------------
storage/StorageBlock.hpp | 25 ++----
16 files changed, 129 insertions(+), 227 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/06f39905/catalog/CatalogTypedefs.hpp
----------------------------------------------------------------------
diff --git a/catalog/CatalogTypedefs.hpp b/catalog/CatalogTypedefs.hpp
index f7a2d53..70bac84 100644
--- a/catalog/CatalogTypedefs.hpp
+++ b/catalog/CatalogTypedefs.hpp
@@ -49,6 +49,8 @@ constexpr int kInvalidCatalogId = -1;
// Used to indicate no preference for a NUMA Node ID.
constexpr numa_node_id kAnyNUMANodeID = -1;
+constexpr attribute_id kInvalidAttributeID = -1;
+
/** @} */
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/06f39905/expressions/aggregation/AggregationConcreteHandle.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.cpp b/expressions/aggregation/AggregationConcreteHandle.cpp
index ae677d9..e3fb520 100644
--- a/expressions/aggregation/AggregationConcreteHandle.cpp
+++ b/expressions/aggregation/AggregationConcreteHandle.cpp
@@ -56,13 +56,10 @@ void AggregationConcreteHandle::insertValueAccessorIntoDistinctifyHashTable(
AggregationStateFastHashTable *hash_table =
static_cast<AggregationStateFastHashTable *>(distinctify_hash_table);
if (key_ids.size() == 1) {
- std::vector<std::vector<attribute_id>> args;
- args.emplace_back(key_ids);
hash_table->upsertValueAccessorFast(
- args, accessor, key_ids[0], true /* check_for_null_keys */);
+ key_ids, accessor, key_ids[0], true /* check_for_null_keys */);
} else {
- std::vector<std::vector<attribute_id>> empty_args;
- empty_args.resize(1);
+ std::vector<attribute_id> empty_args {kInvalidAttributeID};
hash_table->upsertValueAccessorCompositeKeyFast(
empty_args, accessor, key_ids, true /* check_for_null_keys */);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/06f39905/expressions/aggregation/AggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandle.hpp b/expressions/aggregation/AggregationHandle.hpp
index d2cee6d..4435760 100644
--- a/expressions/aggregation/AggregationHandle.hpp
+++ b/expressions/aggregation/AggregationHandle.hpp
@@ -130,7 +130,7 @@ class AggregationHandle {
* A StorageBlob will be allocated to serve as the HashTable's
* in-memory storage.
* @return A new HashTable instance with the appropriate state type for this
- * aggregate as the ValueT.
+ * aggregate.
**/
virtual AggregationStateHashTableBase* createGroupByHashTable(
const HashTableImplType hash_table_impl,
@@ -297,7 +297,7 @@ class AggregationHandle {
* in-memory
* storage.
* @return A new HashTable instance with the appropriate state type for this
- * aggregate as the ValueT.
+ * aggregate.
*/
virtual AggregationStateHashTableBase* createDistinctifyHashTable(
const HashTableImplType hash_table_impl,
@@ -357,12 +357,36 @@ class AggregationHandle {
std::size_t index) const = 0;
virtual std::size_t getPayloadSize() const { return 1; }
- virtual void updateState(const std::vector<TypedValue> &arguments,
- std::uint8_t *byte_ptr) const {}
+
+ /**
+ * @brief Update the aggregation state for a nullary aggregation function,
+ * e.g. COUNT(*).
+ *
+ * @note This function should be overridden by those aggregation functions
+ * which can perform nullary operations, e.g. COUNT; the default is a no-op.
+ *
+ * @param byte_ptr The pointer where the aggregation state is stored.
+ **/
+ virtual void updateStateNullary(std::uint8_t *byte_ptr) const {}
+
+ /**
+ * @brief Update the aggregation state for a unary aggregation function,
+ * e.g. SUM(a).
+ *
+ * @param argument The argument which will be used to update the state of the
+ * aggregation function.
+ * @param byte_ptr The pointer where the aggregation state is stored.
+ **/
+ virtual void updateStateUnary(const TypedValue &argument,
+ std::uint8_t *byte_ptr) const {}
+
virtual void mergeStatesFast(const std::uint8_t *src,
std::uint8_t *dst) const {}
+
virtual void initPayload(std::uint8_t *byte_ptr) const {}
+
virtual void blockUpdate() {}
+
virtual void allowUpdate() {}
protected:
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/06f39905/expressions/aggregation/AggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.hpp b/expressions/aggregation/AggregationHandleAvg.hpp
index 3e49213..366ba8e 100644
--- a/expressions/aggregation/AggregationHandleAvg.hpp
+++ b/expressions/aggregation/AggregationHandleAvg.hpp
@@ -141,10 +141,10 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
++(*count_ptr);
}
- inline void updateState(const std::vector<TypedValue> &arguments,
- std::uint8_t *byte_ptr) const override {
+ inline void updateStateUnary(const TypedValue &argument,
+ std::uint8_t *byte_ptr) const override {
if (!block_update_) {
- iterateUnaryInlFast(arguments.front(), byte_ptr);
+ iterateUnaryInlFast(argument, byte_ptr);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/06f39905/expressions/aggregation/AggregationHandleCount.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.hpp b/expressions/aggregation/AggregationHandleCount.hpp
index 2c6d717..9b97590 100644
--- a/expressions/aggregation/AggregationHandleCount.hpp
+++ b/expressions/aggregation/AggregationHandleCount.hpp
@@ -135,13 +135,16 @@ class AggregationHandleCount : public AggregationConcreteHandle {
}
}
- inline void updateState(const std::vector<TypedValue> &arguments,
- std::uint8_t *byte_ptr) const override {
+ inline void updateStateUnary(const TypedValue &argument,
+ std::uint8_t *byte_ptr) const override {
if (!block_update_) {
- if (arguments.size())
- iterateUnaryInlFast(arguments.front(), byte_ptr);
- else
- iterateNullaryInlFast(byte_ptr);
+ iterateUnaryInlFast(argument, byte_ptr);
+ }
+ }
+
+ inline void updateStateNullary(std::uint8_t *byte_ptr) const override {
+ if (!block_update_) {
+ iterateNullaryInlFast(byte_ptr);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/06f39905/expressions/aggregation/AggregationHandleMax.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.hpp b/expressions/aggregation/AggregationHandleMax.hpp
index de173c9..6c54b9d 100644
--- a/expressions/aggregation/AggregationHandleMax.hpp
+++ b/expressions/aggregation/AggregationHandleMax.hpp
@@ -112,10 +112,10 @@ class AggregationHandleMax : public AggregationConcreteHandle {
compareAndUpdateFast(max_ptr, value);
}
- inline void updateState(const std::vector<TypedValue> &arguments,
- std::uint8_t *byte_ptr) const override {
+ inline void updateStateUnary(const TypedValue &argument,
+ std::uint8_t *byte_ptr) const override {
if (!block_update_) {
- iterateUnaryInlFast(arguments.front(), byte_ptr);
+ iterateUnaryInlFast(argument, byte_ptr);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/06f39905/expressions/aggregation/AggregationHandleMin.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.hpp b/expressions/aggregation/AggregationHandleMin.hpp
index 4a0eca4..9baf736 100644
--- a/expressions/aggregation/AggregationHandleMin.hpp
+++ b/expressions/aggregation/AggregationHandleMin.hpp
@@ -114,10 +114,10 @@ class AggregationHandleMin : public AggregationConcreteHandle {
compareAndUpdateFast(min_ptr, value);
}
- inline void updateState(const std::vector<TypedValue> &arguments,
- std::uint8_t *byte_ptr) const override {
+ inline void updateStateUnary(const TypedValue &argument,
+ std::uint8_t *byte_ptr) const override {
if (!block_update_) {
- iterateUnaryInlFast(arguments.front(), byte_ptr);
+ iterateUnaryInlFast(argument, byte_ptr);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/06f39905/expressions/aggregation/AggregationHandleSum.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.hpp b/expressions/aggregation/AggregationHandleSum.hpp
index 8d719ab..18d45d9 100644
--- a/expressions/aggregation/AggregationHandleSum.hpp
+++ b/expressions/aggregation/AggregationHandleSum.hpp
@@ -133,10 +133,10 @@ class AggregationHandleSum : public AggregationConcreteHandle {
*null_ptr = false;
}
- inline void updateState(const std::vector<TypedValue> &arguments,
- std::uint8_t *byte_ptr) const override {
+ inline void updateStateUnary(const TypedValue &argument,
+ std::uint8_t *byte_ptr) const override {
if (!block_update_) {
- iterateUnaryInlFast(arguments.front(), byte_ptr);
+ iterateUnaryInlFast(argument, byte_ptr);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/06f39905/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 130134c..968314e 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1371,13 +1371,9 @@ void ExecutionGenerator::convertAggregate(
}
if (!group_by_types.empty()) {
- // SimplifyHashTableImplTypeProto() switches the hash table implementation
- // from SeparateChaining to SimpleScalarSeparateChaining when there is a
- // single scalar key type with a reversible hash function.
+ // Right now, only SeparateChaining is supported.
aggr_state_proto->set_hash_table_impl_type(
- SimplifyHashTableImplTypeProto(
- HashTableImplTypeProtoFromString(FLAGS_aggregate_hashtable_type),
- group_by_types));
+ serialization::HashTableImplType::SEPARATE_CHAINING);
}
for (const E::AliasPtr &named_aggregate_expression : physical_plan->aggregate_expressions()) {
@@ -1404,15 +1400,9 @@ void ExecutionGenerator::convertAggregate(
if (unnamed_aggregate_expression->is_distinct()) {
const std::vector<E::ScalarPtr> &arguments = unnamed_aggregate_expression->getArguments();
DCHECK_GE(arguments.size(), 1u);
- if (group_by_types.empty() && arguments.size() == 1) {
- aggr_state_proto->add_distinctify_hash_table_impl_types(
- SimplifyHashTableImplTypeProto(
- HashTableImplTypeProtoFromString(FLAGS_aggregate_hashtable_type),
- {&arguments[0]->getValueType()}));
- } else {
- aggr_state_proto->add_distinctify_hash_table_impl_types(
- HashTableImplTypeProtoFromString(FLAGS_aggregate_hashtable_type));
- }
+ // Right now only SeparateChaining implementation is supported.
+ aggr_state_proto->add_distinctify_hash_table_impl_types(
+ serialization::HashTableImplType::SEPARATE_CHAINING);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/06f39905/relational_operators/tests/AggregationOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/AggregationOperator_unittest.cpp b/relational_operators/tests/AggregationOperator_unittest.cpp
index 0138362..6881dea 100644
--- a/relational_operators/tests/AggregationOperator_unittest.cpp
+++ b/relational_operators/tests/AggregationOperator_unittest.cpp
@@ -363,8 +363,9 @@ class AggregationOperatorTest : public ::testing::Test {
aggr_state_proto->set_estimated_num_entries(estimated_entries);
// Also need to set the HashTable implementation for GROUP BY.
+ // Right now, only SeparateChaining is supported.
aggr_state_proto->set_hash_table_impl_type(
- serialization::HashTableImplType::LINEAR_OPEN_ADDRESSING);
+ serialization::HashTableImplType::SEPARATE_CHAINING);
// Create Operators.
op_.reset(new AggregationOperator(0, *table_, true, aggr_state_index));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/06f39905/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index c5f59f9..e50d133 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -448,12 +448,12 @@ void AggregationOperationState::aggregateBlockHashTable(
AggregationStateHashTableBase *agg_hash_table =
group_by_hashtable_pools_[0]->getHashTableFast();
DCHECK(agg_hash_table != nullptr);
- block->aggregateGroupByFast(arguments_,
- group_by_list_,
- predicate_.get(),
- agg_hash_table,
- &reuse_matches,
- &reuse_group_by_vectors);
+ block->aggregateGroupBy(arguments_,
+ group_by_list_,
+ predicate_.get(),
+ agg_hash_table,
+ &reuse_matches,
+ &reuse_group_by_vectors);
group_by_hashtable_pools_[0]->returnHashTable(agg_hash_table);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/06f39905/storage/FastHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/FastHashTable.hpp b/storage/FastHashTable.hpp
index f1e8d1a..4a95cd9 100644
--- a/storage/FastHashTable.hpp
+++ b/storage/FastHashTable.hpp
@@ -456,7 +456,7 @@ class FastHashTable : public HashTableBase<resizable,
* not be inserted).
**/
bool upsertValueAccessorFast(
- const std::vector<std::vector<attribute_id>> &argument_ids,
+ const std::vector<attribute_id> &argument_ids,
ValueAccessor *accessor,
const attribute_id key_attr_id,
const bool check_for_null_keys);
@@ -509,7 +509,7 @@ class FastHashTable : public HashTableBase<resizable,
* not be inserted).
**/
bool upsertValueAccessorCompositeKeyFast(
- const std::vector<std::vector<attribute_id>> &argument,
+ const std::vector<attribute_id> &argument,
ValueAccessor *accessor,
const std::vector<attribute_id> &key_attr_ids,
const bool check_for_null_keys) override;
@@ -1866,13 +1866,12 @@ bool FastHashTable<resizable,
force_key_copy,
allow_duplicate_keys>::
upsertValueAccessorFast(
- const std::vector<std::vector<attribute_id>> &argument_ids,
+ const std::vector<attribute_id> &argument_ids,
ValueAccessor *accessor,
const attribute_id key_attr_id,
const bool check_for_null_keys) {
DEBUG_ASSERT(!allow_duplicate_keys);
std::size_t variable_size;
- std::vector<TypedValue> local;
return InvokeOnAnyValueAccessor(
accessor,
[&](auto *accessor) -> bool { // NOLINT(build/c++11)
@@ -1898,13 +1897,14 @@ bool FastHashTable<resizable,
} else {
SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
for (unsigned int k = 0; k < num_handles_; ++k) {
- local.clear();
- if (argument_ids[k].size()) {
- local.emplace_back(
- accessor->getTypedValue(argument_ids[k].front()));
+ if (argument_ids[k] != kInvalidAttributeID) {
+ handles_[k]->updateStateUnary(
+ accessor->getTypedValue(argument_ids[k]),
+ value + payload_offsets_[k]);
+ } else {
+ handles_[k]->updateStateNullary(value +
+ payload_offsets_[k]);
}
- handles_[k]->updateState(local,
- value + payload_offsets_[k]);
}
}
}
@@ -1929,12 +1929,14 @@ bool FastHashTable<resizable,
} else {
SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
for (unsigned int k = 0; k < num_handles_; ++k) {
- local.clear();
- if (argument_ids[k].size()) {
- local.emplace_back(
- accessor->getTypedValue(argument_ids[k].front()));
+ if (argument_ids[k] != kInvalidAttributeID) {
+ handles_[k]->updateStateUnary(
+ accessor->getTypedValue(argument_ids[k]),
+ value + payload_offsets_[k]);
+ } else {
+ handles_[k]->updateStateNullary(value +
+ payload_offsets_[k]);
}
- handles_[k]->updateState(local, value + payload_offsets_[k]);
}
}
}
@@ -1953,7 +1955,7 @@ bool FastHashTable<resizable,
force_key_copy,
allow_duplicate_keys>::
upsertValueAccessorCompositeKeyFast(
- const std::vector<std::vector<attribute_id>> &argument_ids,
+ const std::vector<attribute_id> &argument_ids,
ValueAccessor *accessor,
const std::vector<attribute_id> &key_attr_ids,
const bool check_for_null_keys) {
@@ -1961,7 +1963,6 @@ bool FastHashTable<resizable,
std::size_t variable_size;
std::vector<TypedValue> key_vector;
key_vector.resize(key_attr_ids.size());
- std::vector<TypedValue> local;
return InvokeOnAnyValueAccessor(
accessor,
[&](auto *accessor) -> bool { // NOLINT(build/c++11)
@@ -1989,13 +1990,14 @@ bool FastHashTable<resizable,
} else {
SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
for (unsigned int k = 0; k < num_handles_; ++k) {
- local.clear();
- if (argument_ids[k].size()) {
- local.emplace_back(
- accessor->getTypedValue(argument_ids[k].front()));
+ if (argument_ids[k] != kInvalidAttributeID) {
+ handles_[k]->updateStateUnary(
+ accessor->getTypedValue(argument_ids[k]),
+ value + payload_offsets_[k]);
+ } else {
+ handles_[k]->updateStateNullary(value +
+ payload_offsets_[k]);
}
- handles_[k]->updateState(local,
- value + payload_offsets_[k]);
}
}
}
@@ -2022,12 +2024,14 @@ bool FastHashTable<resizable,
} else {
SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
for (unsigned int k = 0; k < num_handles_; ++k) {
- local.clear();
- if (argument_ids[k].size()) {
- local.emplace_back(
- accessor->getTypedValue(argument_ids[k].front()));
+ if (argument_ids[k] != kInvalidAttributeID) {
+ handles_[k]->updateStateUnary(
+ accessor->getTypedValue(argument_ids[k]),
+ value + payload_offsets_[k]);
+ } else {
+ handles_[k]->updateStateNullary(value +
+ payload_offsets_[k]);
}
- handles_[k]->updateState(local, value + payload_offsets_[k]);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/06f39905/storage/FastHashTableFactory.hpp
----------------------------------------------------------------------
diff --git a/storage/FastHashTableFactory.hpp b/storage/FastHashTableFactory.hpp
index 6ad3212..dc4f893 100644
--- a/storage/FastHashTableFactory.hpp
+++ b/storage/FastHashTableFactory.hpp
@@ -90,30 +90,6 @@ class FastHashTableFactory {
serializable,
force_key_copy,
allow_duplicate_keys>(key_types, num_entries, payload_sizes, handles, storage_manager);
- case HashTableImplType::kLinearOpenAddressing:
-/* return new LinearOpenAddressingHashTable<
- ValueT,
- resizable,
- serializable,
- force_key_copy,
- allow_duplicate_keys>(key_types, num_entries, storage_manager);*/
- return new FastSeparateChainingHashTable<
- resizable,
- serializable,
- force_key_copy,
- allow_duplicate_keys>(key_types, num_entries, payload_sizes, handles, storage_manager);
- case HashTableImplType::kSimpleScalarSeparateChaining:
- return new FastSeparateChainingHashTable<
- resizable,
- serializable,
- force_key_copy,
- allow_duplicate_keys>(key_types, num_entries, payload_sizes, handles, storage_manager);
-/* return new SimpleScalarSeparateChainingHashTable<
- ValueT,
- resizable,
- serializable,
- force_key_copy,
- allow_duplicate_keys>(key_types, num_entries, storage_manager);*/
default: {
LOG(FATAL) << "Unrecognized HashTableImplType in HashTableFactory::createResizable()\n";
}
@@ -167,28 +143,6 @@ class FastHashTableFactory {
hash_table_memory_size,
new_hash_table,
hash_table_memory_zeroed);
- case HashTableImplType::kLinearOpenAddressing:
-/* return new LinearOpenAddressingHashTable<
- ValueT,
- resizable,
- serializable,
- force_key_copy,
- allow_duplicate_keys>(key_types,
- hash_table_memory,
- hash_table_memory_size,
- new_hash_table,
- hash_table_memory_zeroed);*/
- case HashTableImplType::kSimpleScalarSeparateChaining:
-/* return new SimpleScalarSeparateChainingHashTable<
- ValueT,
- resizable,
- serializable,
- force_key_copy,
- allow_duplicate_keys>(key_types,
- hash_table_memory,
- hash_table_memory_size,
- new_hash_table,
- hash_table_memory_zeroed);*/
default: {
LOG(FATAL) << "Unrecognized HashTableImplType\n";
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/06f39905/storage/HashTableBase.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableBase.hpp b/storage/HashTableBase.hpp
index b908d6f..cd0a141 100644
--- a/storage/HashTableBase.hpp
+++ b/storage/HashTableBase.hpp
@@ -74,8 +74,25 @@ class HashTableBase {
public:
virtual ~HashTableBase() {}
+ /**
+ * TODO(harshad): We should get rid of this function from here. We are
+ * postponing it because the amount of work to be done is significant.
+ * The steps are as follows:
+ * 1. Replace AggregationStateHashTableBase occurrences in HashTablePool with
+ * the FastHashTable implementation (i.e. an implementation specialized for
+ * aggregation).
+ * 2. Remove createGroupByHashTable from the AggregationHandle* classes.
+ * 3. Replace AggregationStateHashTableBase occurrences in AggregationHandle*
+ * classes with the FastHashTable implementation (i.e. an implementation
+ * specialized for aggregation).
+ * 4. Move this method to the FastHashTable class from here, so that it can
+ * be called from the AggregationHandle* classes.
+ *
+ * Optionally, we can also remove the AggregationStateHashTableBase
+ * specialization from this file.
+ **/
virtual bool upsertValueAccessorCompositeKeyFast(
- const std::vector<std::vector<attribute_id>> &argument,
+ const std::vector<attribute_id> &argument,
ValueAccessor *accessor,
const std::vector<attribute_id> &key_attr_ids,
const bool check_for_null_keys) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/06f39905/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index 8ff18b5..ec5990f 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -415,87 +415,6 @@ AggregationState* StorageBlock::aggregate(
}
void StorageBlock::aggregateGroupBy(
- const AggregationHandle &handle,
- const std::vector<std::unique_ptr<const Scalar>> &arguments,
- const std::vector<std::unique_ptr<const Scalar>> &group_by,
- const Predicate *predicate,
- AggregationStateHashTableBase *hash_table,
- std::unique_ptr<TupleIdSequence> *reuse_matches,
- std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
- DCHECK_GT(group_by.size(), 0u)
- << "Called aggregateGroupBy() with zero GROUP BY expressions";
-
- SubBlocksReference sub_blocks_ref(*tuple_store_,
- indices_,
- indices_consistent_);
-
- // IDs of 'arguments' as attributes in the ValueAccessor we create below.
- std::vector<attribute_id> argument_ids;
-
- // IDs of GROUP BY key element(s) in the ValueAccessor we create below.
- std::vector<attribute_id> key_ids;
-
- // An intermediate ValueAccessor that stores the materialized 'arguments' for
- // this aggregate, as well as the GROUP BY expression values.
- ColumnVectorsValueAccessor temp_result;
- {
- std::unique_ptr<ValueAccessor> accessor;
- if (predicate) {
- if (!*reuse_matches) {
- // If there is a filter predicate that hasn't already been evaluated,
- // evaluate it now and save the results for other aggregates on this
- // same block.
- reuse_matches->reset(getMatchesForPredicate(predicate));
- }
-
- // Create a filtered ValueAccessor that only iterates over predicate
- // matches.
- accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get()));
- } else {
- // Create a ValueAccessor that iterates over all tuples in this block
- accessor.reset(tuple_store_->createValueAccessor());
- }
-
- attribute_id attr_id = 0;
-
- // First, put GROUP BY keys into 'temp_result'.
- if (reuse_group_by_vectors->empty()) {
- // Compute GROUP BY values from group_by Scalars, and store them in
- // reuse_group_by_vectors for reuse by other aggregates on this same
- // block.
- reuse_group_by_vectors->reserve(group_by.size());
- for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
- reuse_group_by_vectors->emplace_back(
- group_by_element->getAllValues(accessor.get(), &sub_blocks_ref));
- temp_result.addColumn(reuse_group_by_vectors->back().get(), false);
- key_ids.push_back(attr_id++);
- }
- } else {
- // Reuse precomputed GROUP BY values from reuse_group_by_vectors.
- DCHECK_EQ(group_by.size(), reuse_group_by_vectors->size())
- << "Wrong number of reuse_group_by_vectors";
- for (const std::unique_ptr<ColumnVector> &reuse_cv : *reuse_group_by_vectors) {
- temp_result.addColumn(reuse_cv.get(), false);
- key_ids.push_back(attr_id++);
- }
- }
-
- // Compute argument vectors and add them to 'temp_result'.
- for (const std::unique_ptr<const Scalar> &argument : arguments) {
- temp_result.addColumn(argument->getAllValues(accessor.get(), &sub_blocks_ref));
- argument_ids.push_back(attr_id++);
- }
- }
-
- // Actually do aggregation into '*hash_table'.
- handle.aggregateValueAccessorIntoHashTable(&temp_result,
- argument_ids,
- key_ids,
- hash_table);
-}
-
-
-void StorageBlock::aggregateGroupByFast(
const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
const std::vector<std::unique_ptr<const Scalar>> &group_by,
const Predicate *predicate,
@@ -510,8 +429,7 @@ void StorageBlock::aggregateGroupByFast(
indices_consistent_);
// IDs of 'arguments' as attributes in the ValueAccessor we create below.
- std::vector<attribute_id> arg_ids;
- std::vector<std::vector<attribute_id>> argument_ids;
+ std::vector<attribute_id> argument_ids;
// IDs of GROUP BY key element(s) in the ValueAccessor we create below.
std::vector<attribute_id> key_ids;
@@ -563,12 +481,13 @@ void StorageBlock::aggregateGroupByFast(
// Compute argument vectors and add them to 'temp_result'.
for (const std::vector<std::unique_ptr<const Scalar>> &argument : arguments) {
- arg_ids.clear();
for (const std::unique_ptr<const Scalar> &args : argument) {
temp_result.addColumn(args->getAllValues(accessor.get(), &sub_blocks_ref));
- arg_ids.push_back(attr_id++);
+ argument_ids.push_back(attr_id++);
+ }
+ if (argument.empty()) {
+ argument_ids.push_back(kInvalidAttributeID);
}
- argument_ids.push_back(arg_ids);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/06f39905/storage/StorageBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp
index 8b59a3c..398008e 100644
--- a/storage/StorageBlock.hpp
+++ b/storage/StorageBlock.hpp
@@ -459,23 +459,14 @@ class StorageBlock : public StorageBlockBase {
* attributes as std::vector<attribute_id> (like in selectSimple()) for fast
* path when there are no expressions specified in the query.
*/
- void aggregateGroupBy(const AggregationHandle &handle,
- const std::vector<std::unique_ptr<const Scalar>> &arguments,
- const std::vector<std::unique_ptr<const Scalar>> &group_by,
- const Predicate *predicate,
- AggregationStateHashTableBase *hash_table,
- std::unique_ptr<TupleIdSequence> *reuse_matches,
- std::vector<std::unique_ptr<ColumnVector>>
- *reuse_group_by_vectors) const;
-
-
- void aggregateGroupByFast(const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
- const std::vector<std::unique_ptr<const Scalar>> &group_by,
- const Predicate *predicate,
- AggregationStateHashTableBase *hash_table,
- std::unique_ptr<TupleIdSequence> *reuse_matches,
- std::vector<std::unique_ptr<ColumnVector>>
- *reuse_group_by_vectors) const;
+ void aggregateGroupBy(
+ const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
+ const std::vector<std::unique_ptr<const Scalar>> &group_by,
+ const Predicate *predicate,
+ AggregationStateHashTableBase *hash_table,
+ std::unique_ptr<TupleIdSequence> *reuse_matches,
+ std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const;
+
/**
* @brief Inserts the GROUP BY expressions and aggregation arguments together
* as keys into the distinctify hash table.