You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/10/17 21:56:02 UTC
[4/6] incubator-quickstep git commit: Updates
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index cbbfc22..9fa3bd2 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -34,6 +34,7 @@
#include "storage/HashTablePool.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "utility/Macros.hpp"
+#include "utility/ScopedBuffer.hpp"
namespace quickstep {
@@ -167,8 +168,8 @@ class AggregationOperationState {
**/
void finalizeAggregate(InsertDestination *output_destination);
- static void mergeGroupByHashTables(AggregationStateHashTableBase *src,
- AggregationStateHashTableBase *dst);
+ static void mergeGroupByHashTables(AggregationStateHashTableBase *destination_hash_table,
+ const AggregationStateHashTableBase *source_hash_table);
int dflag;
@@ -176,7 +177,7 @@ class AggregationOperationState {
// Merge locally (per storage block) aggregated states with global aggregation
// states.
void mergeSingleState(
- const std::vector<std::unique_ptr<AggregationState>> &local_state);
+ const std::vector<ScopedBuffer> &local_state);
// Aggregate on input block.
void aggregateBlockSingleState(const block_id input_block);
@@ -201,10 +202,6 @@ class AggregationOperationState {
// arguments.
std::vector<bool> is_distinct_;
- // Hash table for obtaining distinct (i.e. unique) arguments.
- std::vector<std::unique_ptr<AggregationStateHashTableBase>>
- distinctify_hashtables_;
-
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
// If all an aggregate's argument expressions are simply attributes in
// 'input_relation_', then this caches the attribute IDs of those arguments.
@@ -212,14 +209,7 @@ class AggregationOperationState {
#endif
// Per-aggregate global states for aggregation without GROUP BY.
- std::vector<std::unique_ptr<AggregationState>> single_states_;
-
- // Per-aggregate HashTables for aggregation with GROUP BY.
- //
- // TODO(shoban): We should ideally store the aggregation state together in one
- // hash table to prevent multiple lookups.
- std::vector<std::unique_ptr<AggregationStateHashTableBase>>
- group_by_hashtables_;
+ std::vector<ScopedBuffer> single_states_;
// A vector of group by hash table pools.
std::unique_ptr<HashTablePool> group_by_hashtable_pool_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/storage/AggregationResultIterator.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationResultIterator.hpp b/storage/AggregationResultIterator.hpp
new file mode 100644
index 0000000..259c533
--- /dev/null
+++ b/storage/AggregationResultIterator.hpp
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_AGGREGATION_RESULT_ITERATOR_HPP_
+#define QUICKSTEP_STORAGE_AGGREGATION_RESULT_ITERATOR_HPP_
+
+#include <cstddef>
+#include <limits>
+#include <vector>
+
+#include "storage/AggregationStateManager.hpp"
+#include "storage/HashTableUntypedKeyManager.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+/** \addtogroup Storage
+ * @{
+ */
+
+/**
+ * @brief Cursor over the packed buckets of an aggregation hash table.
+ *
+ * Each position corresponds to one group: writeKeyTo() copies its key and
+ * writeResultsTo() finalizes its aggregation states. The iterator does not
+ * own the bucket memory or the two managers; they must outlive it.
+ **/
+class AggregationResultIterator {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param buckets Start of the contiguous bucket array.
+   * @param bucket_size Size in bytes of one bucket.
+   * @param num_entries Number of occupied buckets starting at buckets.
+   * @param key_manager Helper used to locate and copy a bucket's key.
+   * @param state_manager Helper used to finalize a bucket's states.
+   **/
+  AggregationResultIterator(const void *buckets,
+                            const std::size_t bucket_size,
+                            const std::size_t num_entries,
+                            const HashTableUntypedKeyManager &key_manager,
+                            const AggregationStateManager<false> &state_manager)
+      : current_position_(std::numeric_limits<std::size_t>::max()),
+        buckets_(buckets),
+        bucket_size_(bucket_size),
+        num_entries_(num_entries),
+        key_manager_(key_manager),
+        state_manager_(state_manager) {}
+
+  /**
+   * @brief Size in bytes of the key produced by writeKeyTo().
+   **/
+  inline std::size_t getKeySize() const {
+    return key_manager_.getFixedKeySize();
+  }
+
+  /**
+   * @brief Size in bytes of the results produced by writeResultsTo().
+   **/
+  inline std::size_t getResultsSize() const {
+    return state_manager_.getResultsSizeInBytes();
+  }
+
+  /**
+   * @brief Rewind to just before the first entry; call next() to reach it.
+   **/
+  inline void beginIteration() {
+    current_position_ = std::numeric_limits<std::size_t>::max();
+  }
+
+  /**
+   * @brief Whether the cursor is on (or past) the last entry, i.e. whether a
+   *        further next() would fail.
+   **/
+  inline bool iterationFinished() const {
+    // Before the first entry current_position_ + 1 wraps around to 0.
+    return current_position_ + 1 >= num_entries_;
+  }
+
+  /**
+   * @brief Advance to the next entry.
+   *
+   * @return true if the new position is a valid entry.
+   **/
+  inline bool next() {
+    ++current_position_;
+    return current_position_ < num_entries_;
+  }
+
+  /**
+   * @brief Step back to the previous entry.
+   **/
+  inline void previous() {
+    --current_position_;
+  }
+
+  /**
+   * @brief Copy the current entry's key into destination (assumed to have
+   *        room for getKeySize() bytes).
+   **/
+  inline void writeKeyTo(void *destination) const {
+    key_manager_.copyUntypedKey(
+        destination,
+        key_manager_.getUntypedKeyComponent(getCurrentBucket()));
+  }
+
+  /**
+   * @brief Finalize the current entry's states into destination (assumed to
+   *        have room for getResultsSize() bytes).
+   **/
+  inline void writeResultsTo(void *destination) const {
+    state_manager_.finalizeStates(destination, getCurrentBucket());
+  }
+
+ private:
+  // Address of the bucket at current_position_.
+  inline const void* getCurrentBucket() const {
+    return static_cast<const char *>(buckets_) + current_position_ * bucket_size_;
+  }
+
+  friend class ThreadPrivateAggregationStateHashTable;
+
+  // Index of the current bucket. Initialized to SIZE_MAX ("before the first
+  // entry") so the cursor is well defined even before beginIteration().
+  std::size_t current_position_;
+
+  const void *buckets_;
+  const std::size_t bucket_size_;
+  const std::size_t num_entries_;
+  const HashTableUntypedKeyManager &key_manager_;
+  const AggregationStateManager<false> &state_manager_;
+
+  DISALLOW_COPY_AND_ASSIGN(AggregationResultIterator);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_STORAGE_AGGREGATION_RESULT_ITERATOR_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/storage/AggregationStateHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationStateHashTable.hpp b/storage/AggregationStateHashTable.hpp
new file mode 100644
index 0000000..85a6bdc
--- /dev/null
+++ b/storage/AggregationStateHashTable.hpp
@@ -0,0 +1,428 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_AGGREGATION_STATE_HASH_TABLE_HPP_
+#define QUICKSTEP_STORAGE_AGGREGATION_STATE_HASH_TABLE_HPP_
+
+#include <algorithm>
+#include <atomic>
+#include <cstddef>
+#include <cstdlib>
+#include <cstring>
+#include <iostream>
+#include <limits>
+#include <memory>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "storage/AggregationResultIterator.hpp"
+#include "storage/AggregationStateManager.hpp"
+#include "storage/HashTableBase.hpp"
+#include "storage/HashTableUntypedKeyManager.hpp"
+#include "storage/StorageBlob.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageConstants.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "threading/SpinMutex.hpp"
+#include "threading/SpinSharedMutex.hpp"
+#include "types/Type.hpp"
+#include "types/TypeFunctors.hpp"
+#include "utility/Alignment.hpp"
+#include "utility/InlineMemcpy.hpp"
+#include "utility/Macros.hpp"
+#include "utility/PrimeNumber.hpp"
+#include "utility/ScopedBuffer.hpp"
+
+namespace quickstep {
+
+/** \addtogroup Storage
+ * @{
+ */
+
+/**
+ * @brief Hash table, for use by a single thread, that maps group-by keys to
+ *        aggregation states stored in densely packed buckets.
+ *
+ * Buckets are handed out in insertion order from one StorageBlob. Each bucket
+ * starts with the aggregation states (laid out by payload_manager_) and also
+ * holds the key (located by key_manager_). slots_ maps a pointer to a
+ * bucket's key to the bucket. No internal synchronization is performed.
+ **/
+class ThreadPrivateAggregationStateHashTable : public AggregationStateHashTableBase {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param key_types The types of the group-by key components.
+   * @param num_entries Expected number of groups, used for initial sizing.
+   * @param handles The aggregation handles whose states each bucket holds.
+   * @param storage_manager Allocates (and frees) the bucket blob.
+   **/
+  ThreadPrivateAggregationStateHashTable(const std::vector<const Type *> &key_types,
+                                         const std::size_t num_entries,
+                                         const std::vector<AggregationHandle *> &handles,
+                                         StorageManager *storage_manager)
+      : payload_manager_(handles),
+        key_types_(key_types),
+        key_manager_(key_types_, payload_manager_.getStatesSizeInBytes()),
+        slots_(num_entries * kHashTableLoadFactor,
+               key_manager_.getUntypedKeyHashFunctor(),
+               key_manager_.getUntypedKeyEqualityFunctor()),
+        buckets_(nullptr),
+        bucket_size_(ComputeBucketSize(key_manager_.getFixedKeySize(),
+                                       payload_manager_.getStatesSizeInBytes())),
+        num_buckets_(0),
+        buckets_allocated_(0),
+        storage_manager_(storage_manager) {
+    // Size the blob for num_entries whole buckets (not num_entries bytes), and
+    // always request at least one slot.
+    const std::size_t num_storage_slots = std::max<std::size_t>(
+        1, storage_manager_->SlotsNeededForBytes(num_entries * bucket_size_));
+
+    const block_id blob_id = storage_manager_->createBlob(num_storage_slots);
+    blob_ = storage_manager_->getBlobMutable(blob_id);
+
+    buckets_ = blob_->getMemoryMutable();
+    num_buckets_ = num_storage_slots * kSlotSizeBytes / bucket_size_;
+  }
+
+  /**
+   * @brief Destructor. Returns the bucket blob to the StorageManager.
+   **/
+  ~ThreadPrivateAggregationStateHashTable() {
+    if (blob_.valid()) {
+      const block_id blob_id = blob_->getID();
+      blob_.release();
+      storage_manager_->deleteBlockOrBlobFile(blob_id);
+    }
+  }
+
+  /**
+   * @brief Number of groups (occupied buckets) in the table.
+   **/
+  inline std::size_t numEntries() const {
+    return buckets_allocated_;
+  }
+
+  inline std::size_t getKeySizeInBytes() const {
+    return key_manager_.getFixedKeySize();
+  }
+
+  inline std::size_t getStatesSizeInBytes() const {
+    return payload_manager_.getStatesSizeInBytes();
+  }
+
+  inline std::size_t getResultsSizeInBytes() const {
+    return payload_manager_.getResultsSizeInBytes();
+  }
+
+  /**
+   * @brief Create an iterator over all groups. The caller owns the result; it
+   *        must not outlive this table and is invalidated by later insertions.
+   **/
+  AggregationResultIterator* createResultIterator() const override {
+    return new AggregationResultIterator(buckets_,
+                                         bucket_size_,
+                                         buckets_allocated_,
+                                         key_manager_,
+                                         payload_manager_);
+  }
+
+  /**
+   * @brief Aggregate every row of accessor into the group identified by the
+   *        single key attribute key_attr_id.
+   **/
+  bool upsertValueAccessor(ValueAccessor *accessor,
+                           const attribute_id key_attr_id,
+                           const std::vector<attribute_id> &argument_ids) override {
+    if (key_manager_.isKeyNullable()) {
+      return upsertValueAccessorInternal<true>(
+          accessor, key_attr_id, argument_ids);
+    } else {
+      return upsertValueAccessorInternal<false>(
+          accessor, key_attr_id, argument_ids);
+    }
+  }
+
+  template <bool check_for_null_keys>
+  bool upsertValueAccessorInternal(ValueAccessor *accessor,
+                                   const attribute_id key_attr_id,
+                                   const std::vector<attribute_id> &argument_ids) {
+    return InvokeOnAnyValueAccessor(
+        accessor,
+        [&](auto *accessor) -> bool { // NOLINT(build/c++11)
+      accessor->beginIteration();
+      while (accessor->next()) {
+        const void *key = accessor->template getUntypedValue<check_for_null_keys>(key_attr_id);
+        if (check_for_null_keys && key == nullptr) {
+          continue;  // Rows with a NULL key are skipped.
+        }
+        bool is_empty;
+        void *bucket = locateBucket(key, &is_empty);
+        if (is_empty) {
+          payload_manager_.initializeStates(bucket);
+        }
+        // The row that creates a group must be accumulated as well. Argument
+        // nullability is unrelated to key nullability, so NULL arguments are
+        // always checked for.
+        payload_manager_.template updateStates<true>(
+            bucket, accessor, argument_ids);
+      }
+      return true;
+    });
+  }
+
+  /**
+   * @brief Like upsertValueAccessor(), but the key is the composite of the
+   *        attributes key_attr_ids.
+   **/
+  bool upsertValueAccessorCompositeKey(ValueAccessor *accessor,
+                                       const std::vector<attribute_id> &key_attr_ids,
+                                       const std::vector<attribute_id> &argument_ids) override {
+    if (key_manager_.isKeyNullable()) {
+      return upsertValueAccessorCompositeKeyInternal<true>(
+          accessor, key_attr_ids, argument_ids);
+    } else {
+      return upsertValueAccessorCompositeKeyInternal<false>(
+          accessor, key_attr_ids, argument_ids);
+    }
+  }
+
+  template <bool check_for_null_keys>
+  bool upsertValueAccessorCompositeKeyInternal(ValueAccessor *accessor,
+                                               const std::vector<attribute_id> &key_attr_ids,
+                                               const std::vector<attribute_id> &argument_ids) {
+    return InvokeOnAnyValueAccessor(
+        accessor,
+        [&](auto *accessor) -> bool { // NOLINT(build/c++11)
+      accessor->beginIteration();
+      // The composite key is written straight into a spare bucket before
+      // probing; the spare is only consumed when the key starts a new group.
+      void *prealloc_bucket = allocateBucket();
+      while (accessor->next()) {
+        if (check_for_null_keys) {
+          const bool is_null =
+              key_manager_.writeNullableUntypedKeyFromValueAccessorToBucket(
+                  accessor,
+                  key_attr_ids,
+                  prealloc_bucket);
+          if (is_null) {
+            continue;
+          }
+        } else {
+          key_manager_.writeUntypedKeyFromValueAccessorToBucket(
+              accessor,
+              key_attr_ids,
+              prealloc_bucket);
+        }
+        void *bucket = locateBucketWithPrealloc(prealloc_bucket);
+        const bool is_new_group = (bucket == prealloc_bucket);
+        if (is_new_group) {
+          payload_manager_.initializeStates(bucket);
+        }
+        // As in upsertValueAccessorInternal(), every row is accumulated and
+        // NULL arguments are always checked for.
+        payload_manager_.template updateStates<true>(
+            bucket, accessor, argument_ids);
+        if (is_new_group) {
+          // Done last: allocateBucket() may resize() and relocate 'bucket'.
+          prealloc_bucket = allocateBucket();
+        }
+      }
+      // Reclaim the last, unused spare bucket.
+      --buckets_allocated_;
+      return true;
+    });
+  }
+
+  /**
+   * @brief Merge every group of source_hash_table into this table, combining
+   *        the states of groups present in both. Assumes both tables were
+   *        built with the same key types and aggregation handles.
+   **/
+  void mergeHashTable(const ThreadPrivateAggregationStateHashTable *source_hash_table) {
+    source_hash_table->forEachKeyAndStates(
+        [&](const void *source_key, const void *source_states) -> void {
+          bool is_empty;
+          void *bucket = locateBucket(source_key, &is_empty);
+          if (is_empty) {
+            payload_manager_.copyStates(bucket, source_states);
+          } else {
+            payload_manager_.mergeStates(bucket, source_states);
+          }
+        });
+  }
+
+  // Invoke functor(key) for every occupied bucket, in allocation order.
+  template <typename FunctorT>
+  inline void forEachKey(const FunctorT &functor) const {
+    for (std::size_t i = 0; i < buckets_allocated_; ++i) {
+      functor(key_manager_.getUntypedKeyComponent(locateBucket(i)));
+    }
+  }
+
+  // Invoke functor(key, states) for every occupied bucket, in allocation order.
+  template <typename FunctorT>
+  inline void forEachKeyAndStates(const FunctorT &functor) const {
+    for (std::size_t i = 0; i < buckets_allocated_; ++i) {
+      const char *bucket = static_cast<const char *>(locateBucket(i));
+      functor(key_manager_.getUntypedKeyComponent(bucket), bucket);
+    }
+  }
+
+  // Address of bucket number bucket_id (no bounds check).
+  inline void* locateBucket(const std::size_t bucket_id) const {
+    return static_cast<char *>(buckets_) + bucket_id * bucket_size_;
+  }
+
+  // Find the bucket for key, creating (and writing the key into) a new one if
+  // absent. *is_empty reports whether the bucket was just created.
+  inline void* locateBucket(const void *key, bool *is_empty) {
+    auto slot_it = slots_.find(key);
+    if (slot_it == slots_.end()) {
+      void *bucket = allocateBucket();
+      key_manager_.writeUntypedKeyToBucket(key, bucket);
+      slots_.emplace(key_manager_.getUntypedKeyComponent(bucket), bucket);
+      *is_empty = true;
+      return bucket;
+    } else {
+      *is_empty = false;
+      return slot_it->second;
+    }
+  }
+
+  // Find the bucket whose key equals the key already written into
+  // prealloc_bucket; if there is none, register prealloc_bucket itself.
+  inline void* locateBucketWithPrealloc(void *prealloc_bucket) {
+    const void *key = key_manager_.getUntypedKeyComponent(prealloc_bucket);
+    auto slot_it = slots_.find(key);
+    if (slot_it == slots_.end()) {
+      slots_.emplace(key, prealloc_bucket);
+      return prealloc_bucket;
+    } else {
+      return slot_it->second;
+    }
+  }
+
+  // Hand out the next unused bucket, growing the blob first if it is full.
+  inline void* allocateBucket() {
+    if (buckets_allocated_ >= num_buckets_) {
+      resize();
+    }
+    void *bucket = locateBucket(buckets_allocated_);
+    ++buckets_allocated_;
+    return bucket;
+  }
+
+  // Move all buckets into a blob at least twice as large and re-point slots_.
+  void resize() {
+    // The max() guarantees progress even if num_buckets_ is 0.
+    const std::size_t resized_num_buckets =
+        std::max<std::size_t>(num_buckets_ * 2, buckets_allocated_ + 1);
+    const std::size_t resized_storage_slots =
+        storage_manager_->SlotsNeededForBytes(resized_num_buckets * bucket_size_);
+    const block_id resized_blob_id =
+        storage_manager_->createBlob(resized_storage_slots);
+    MutableBlobReference resized_blob =
+        storage_manager_->getBlobMutable(resized_blob_id);
+
+    const char *old_base = static_cast<const char *>(buckets_);
+    char *new_base = static_cast<char *>(resized_blob->getMemoryMutable());
+    std::memcpy(new_base, old_base, buckets_allocated_ * bucket_size_);
+
+    // Both the keys and the values of slots_ point into the old bucket array,
+    // and map keys cannot be modified in place, so the index is rebuilt
+    // against the new array (hashing the copied keys) before the old blob is
+    // released.
+    decltype(slots_) resized_slots(slots_.bucket_count(),
+                                   slots_.hash_function(),
+                                   slots_.key_eq());
+    for (const auto &slot : slots_) {
+      const std::ptrdiff_t key_offset =
+          static_cast<const char *>(slot.first) - old_base;
+      const std::ptrdiff_t bucket_offset =
+          static_cast<const char *>(slot.second) - old_base;
+      resized_slots.emplace(new_base + key_offset, new_base + bucket_offset);
+    }
+    slots_.swap(resized_slots);
+
+    buckets_ = new_base;
+    num_buckets_ = resized_storage_slots * kSlotSizeBytes / bucket_size_;
+
+    // Drop the old blob.
+    const block_id old_blob_id = blob_->getID();
+    std::swap(blob_, resized_blob);
+    resized_blob.release();
+    storage_manager_->deleteBlockOrBlobFile(old_blob_id);
+  }
+
+  // Debugging aid: dump the slot index and the first int of each bucket.
+  void print() const override {
+    std::cerr << "Bucket size = " << bucket_size_ << "\n";
+    std::cerr << "Buckets: \n";
+    for (const auto &pair : slots_) {
+      std::cerr << pair.first << " -- " << pair.second << "\n";
+      std::cerr << *static_cast<const int *>(pair.second) << "\n";
+    }
+  }
+
+ private:
+  // Helper object to manage hash table payloads (i.e. aggregation states).
+  AggregationStateManager<false> payload_manager_;
+
+  // Type(s) of keys.
+  const std::vector<const Type*> key_types_;
+
+  // Helper object to manage key storage.
+  HashTableUntypedKeyManager key_manager_;
+
+  // Round bucket size up to a multiple of kBucketAlignment.
+  static std::size_t ComputeBucketSize(const std::size_t fixed_key_size,
+                                       const std::size_t total_payload_size) {
+    constexpr std::size_t kBucketAlignment = 4;
+    return (((fixed_key_size + total_payload_size - 1)
+               / kBucketAlignment) + 1) * kBucketAlignment;
+  }
+
+  // Maps a pointer to each bucket's key to the start of that bucket.
+  std::unordered_map<const void *, void *,
+                     UntypedHashFunctor,
+                     UntypedEqualityFunctor> slots_;
+
+  void *buckets_;
+  const std::size_t bucket_size_;
+  std::size_t num_buckets_;
+  std::size_t buckets_allocated_;
+
+  StorageManager *storage_manager_;
+  MutableBlobReference blob_;
+
+  DISALLOW_COPY_AND_ASSIGN(ThreadPrivateAggregationStateHashTable);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_STORAGE_AGGREGATION_STATE_HASH_TABLE_HPP_
+
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/storage/AggregationStateManager.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationStateManager.hpp b/storage/AggregationStateManager.hpp
new file mode 100644
index 0000000..98dca90
--- /dev/null
+++ b/storage/AggregationStateManager.hpp
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_AGGREGATION_STATE_MANAGER_HPP_
+#define QUICKSTEP_STORAGE_AGGREGATION_STATE_MANAGER_HPP_
+
+#include <cstddef>
+#include <cstring>
+#include <new>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "threading/SpinMutex.hpp"
+#include "threading/SpinSharedMutex.hpp"
+#include "utility/InlineMemcpy.hpp"
+#include "utility/Macros.hpp"
+#include "utility/ScopedBuffer.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/** \addtogroup Storage
+ * @{
+ */
+
+/**
+ * @brief Describes and manipulates the packed aggregation states of a group.
+ *
+ * For a list of aggregation handles, this computes the byte layout of their
+ * states (and of their finalized results) as one contiguous record, and
+ * applies each handle's accumulate/merge/finalize functor to the matching
+ * component of such a record.
+ *
+ * @tparam use_mutex If true, each states record starts with a SpinMutex that
+ *         precedes all aggregation states. This class only reserves and
+ *         constructs it; locking is left to the caller.
+ **/
+template <bool use_mutex>
+class AggregationStateManager {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param handles The aggregation handles, in state order. The handles are
+   *        not owned by this object.
+   **/
+  explicit AggregationStateManager(const std::vector<AggregationHandle *> &handles)
+      : handles_(handles),
+        states_size_in_bytes_(0),
+        results_size_in_bytes_(0) {
+    if (use_mutex) {
+      // Reserve room for the per-record mutex in front of all states.
+      states_size_in_bytes_ += sizeof(SpinMutex);
+    }
+    for (const AggregationHandle *handle : handles) {
+      const std::size_t state_size = handle->getStateSize();
+      state_sizes_.emplace_back(state_size);
+      state_offsets_.emplace_back(states_size_in_bytes_);
+      states_size_in_bytes_ += state_size;
+
+      const std::size_t result_size = handle->getResultSize();
+      result_sizes_.emplace_back(result_size);
+      result_offsets_.emplace_back(results_size_in_bytes_);
+      results_size_in_bytes_ += result_size;
+
+      accumulate_functors_.emplace_back(handle->getStateAccumulateFunctor());
+      merge_functors_.emplace_back(handle->getStateMergeFunctor());
+      finalize_functors_.emplace_back(handle->getStateFinalizeFunctor());
+    }
+
+    // Build a template record holding the initial value of every state;
+    // initializeStates() simply copies it.
+    initial_states_.reset(states_size_in_bytes_, false);
+    if (use_mutex) {
+      new(initial_states_.get()) SpinMutex;
+    }
+    for (std::size_t i = 0; i < handles_.size(); ++i) {
+      handles_[i]->initializeState(
+          static_cast<char *>(initial_states_.get()) + state_offsets_[i]);
+    }
+  }
+
+  inline std::size_t getStatesSizeInBytes() const {
+    return states_size_in_bytes_;
+  }
+
+  inline std::size_t getResultsSizeInBytes() const {
+    return results_size_in_bytes_;
+  }
+
+  /**
+   * @brief Reset a states record to the initial value of every state.
+   **/
+  inline void initializeStates(void *states) const {
+    copyStates(states, initial_states_.get());
+  }
+
+  /**
+   * @brief Accumulate one argument value into the first aggregation state.
+   *
+   * @tparam check_for_null If true, a NULL argument value is ignored.
+   **/
+  template <bool check_for_null, typename ValueAccessorT>
+  inline void updateState(void *states,
+                          ValueAccessorT *accessor,
+                          const attribute_id argument_id) const {
+    // TODO: templates on whether to check invalid attribute id
+    DCHECK_NE(argument_id, kInvalidAttributeID);
+
+    const void *value =
+        accessor->template getUntypedValue<check_for_null>(argument_id);
+    if (check_for_null && value == nullptr) {
+      return;
+    }
+    // Use the state's offset so a leading mutex (use_mutex) is skipped.
+    accumulate_functors_.front()(getStateComponent(states, 0), value);
+  }
+
+  /**
+   * @brief Accumulate the current row of accessor into every state: the
+   *        value of argument_ids[i] goes into state i.
+   *
+   * @tparam check_for_null If true, a NULL argument value leaves only its own
+   *         state untouched; the remaining states are still updated.
+   **/
+  template <bool check_for_null, typename ValueAccessorT>
+  inline void updateStates(void *states,
+                           ValueAccessorT *accessor,
+                           const std::vector<attribute_id> &argument_ids) const {
+    for (std::size_t i = 0; i < argument_ids.size(); ++i) {
+      // TODO: templates on whether to check invalid attribute id
+      DCHECK_NE(argument_ids[i], kInvalidAttributeID);
+
+      const void *value =
+          accessor->template getUntypedValue<check_for_null>(argument_ids[i]);
+      if (check_for_null && value == nullptr) {
+        continue;
+      }
+      accumulate_functors_[i](getStateComponent(states, i), value);
+    }
+  }
+
+  inline void copyStates(void *destination_states,
+                         const void *source_states) const {
+    InlineMemcpy(destination_states, source_states, states_size_in_bytes_);
+  }
+
+  /**
+   * @brief Merge every state of source_states into the matching state of
+   *        destination_states.
+   **/
+  inline void mergeStates(void *destination_states,
+                          const void *source_states) const {
+    for (std::size_t i = 0; i < merge_functors_.size(); ++i) {
+      merge_functors_[i](getStateComponent(destination_states, i),
+                         getStateComponent(source_states, i));
+    }
+  }
+
+  /**
+   * @brief Write the finalized result of every state into results, laid out
+   *        according to the result offsets (getResultsSizeInBytes() total).
+   **/
+  inline void finalizeStates(void *results, const void *states) const {
+    for (std::size_t i = 0; i < finalize_functors_.size(); ++i) {
+      finalize_functors_[i](getResultComponent(results, i),
+                            getStateComponent(states, i));
+    }
+  }
+
+  inline const void* getStateComponent(const void *states,
+                                       const std::size_t component_id) const {
+    return static_cast<const char *>(states) + state_offsets_[component_id];
+  }
+
+  inline void* getStateComponent(void *states,
+                                 const std::size_t component_id) const {
+    return static_cast<char *>(states) + state_offsets_[component_id];
+  }
+
+  inline void* getResultComponent(void *results,
+                                  const std::size_t component_id) const {
+    return static_cast<char *>(results) + result_offsets_[component_id];
+  }
+
+ private:
+  std::vector<AggregationHandle *> handles_;
+
+  std::vector<std::size_t> state_sizes_;
+  std::vector<std::size_t> state_offsets_;
+  std::size_t states_size_in_bytes_;
+
+  std::vector<std::size_t> result_sizes_;
+  std::vector<std::size_t> result_offsets_;
+  std::size_t results_size_in_bytes_;
+
+  std::vector<AggregationStateAccumulateFunctor> accumulate_functors_;
+  std::vector<AggregationStateMergeFunctor> merge_functors_;
+  std::vector<AggregationStateFinalizeFunctor> finalize_functors_;
+
+  // A states record holding the initial value of every state.
+  ScopedBuffer initial_states_;
+
+  DISALLOW_COPY_AND_ASSIGN(AggregationStateManager);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_STORAGE_AGGREGATION_STATE_MANAGER_HPP_
+
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index bdc7596..0aaaca4 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -145,11 +145,13 @@ if (ENABLE_DISTRIBUTED)
endif()
# Declare micro-libs:
-add_library(quickstep_storage_AggregationHashTable ../empty_src.cpp AggregationHashTable.hpp)
+add_library(quickstep_storage_AggregationStateHashTable ../empty_src.cpp AggregationStateHashTable.hpp)
+add_library(quickstep_storage_AggregationStateManager ../empty_src.cpp AggregationStateManager.hpp)
add_library(quickstep_storage_AggregationOperationState
AggregationOperationState.cpp
AggregationOperationState.hpp)
add_library(quickstep_storage_AggregationOperationState_proto ${storage_AggregationOperationState_proto_srcs})
+add_library(quickstep_storage_AggregationResultIterator ../empty_src.cpp AggregationResultIterator.hpp)
add_library(quickstep_storage_BasicColumnStoreTupleStorageSubBlock
BasicColumnStoreTupleStorageSubBlock.cpp
BasicColumnStoreTupleStorageSubBlock.hpp)
@@ -199,9 +201,6 @@ if (ENABLE_DISTRIBUTED)
endif()
add_library(quickstep_storage_EvictionPolicy EvictionPolicy.cpp EvictionPolicy.hpp)
-add_library(quickstep_storage_FastHashTable ../empty_src.cpp FastHashTable.hpp)
-add_library(quickstep_storage_FastHashTableFactory ../empty_src.cpp FastHashTableFactory.hpp)
-add_library(quickstep_storage_FastSeparateChainingHashTable ../empty_src.cpp FastSeparateChainingHashTable.hpp)
add_library(quickstep_storage_FileManager ../empty_src.cpp FileManager.hpp)
if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
add_library(quickstep_storage_FileManagerHdfs FileManagerHdfs.cpp FileManagerHdfs.hpp)
@@ -271,9 +270,11 @@ add_library(quickstep_storage_WindowAggregationOperationState_proto ${storage_Wi
# Link dependencies:
-target_link_libraries(quickstep_storage_AggregationHashTable
+target_link_libraries(quickstep_storage_AggregationStateHashTable
quickstep_catalog_CatalogTypedefs
quickstep_expressions_aggregation_AggregationHandle
+ quickstep_storage_AggregationResultIterator
+ quickstep_storage_AggregationStateManager
quickstep_storage_HashTableBase
quickstep_storage_HashTableUntypedKeyManager
quickstep_storage_StorageBlob
@@ -290,8 +291,18 @@ target_link_libraries(quickstep_storage_AggregationHashTable
quickstep_types_TypedValue
quickstep_utility_Alignment
quickstep_utility_HashPair
+ quickstep_utility_InlineMemcpy
quickstep_utility_Macros
- quickstep_utility_PrimeNumber)
+ quickstep_utility_PrimeNumber
+ quickstep_utility_ScopedBuffer)
+target_link_libraries(quickstep_storage_AggregationStateManager
+ quickstep_catalog_CatalogTypedefs
+ quickstep_expressions_aggregation_AggregationHandle
+ quickstep_threading_SpinMutex
+ quickstep_threading_SpinSharedMutex
+ quickstep_utility_InlineMemcpy
+ quickstep_utility_Macros
+ quickstep_utility_ScopedBuffer)
target_link_libraries(quickstep_storage_AggregationOperationState
glog
quickstep_catalog_CatalogDatabaseLite
@@ -302,13 +313,11 @@ target_link_libraries(quickstep_storage_AggregationOperationState
quickstep_expressions_aggregation_AggregateFunction
quickstep_expressions_aggregation_AggregateFunctionFactory
quickstep_expressions_aggregation_AggregationHandle
- quickstep_expressions_aggregation_AggregationHandleDistinct
quickstep_expressions_aggregation_AggregationID
quickstep_expressions_predicate_Predicate
quickstep_expressions_scalar_Scalar
- quickstep_storage_AggregationHashTable
+ quickstep_storage_AggregationStateHashTable
quickstep_storage_AggregationOperationState_proto
- quickstep_storage_HashTable
quickstep_storage_HashTableBase
quickstep_storage_HashTableFactory
quickstep_storage_HashTablePool
@@ -321,12 +330,17 @@ target_link_libraries(quickstep_storage_AggregationOperationState
quickstep_types_containers_ColumnVector
quickstep_types_containers_ColumnVectorsValueAccessor
quickstep_types_containers_Tuple
- quickstep_utility_Macros)
+ quickstep_utility_Macros
+ quickstep_utility_ScopedBuffer)
target_link_libraries(quickstep_storage_AggregationOperationState_proto
quickstep_expressions_Expressions_proto
quickstep_expressions_aggregation_AggregateFunction_proto
quickstep_storage_HashTable_proto
${PROTOBUF_LIBRARY})
+target_link_libraries(quickstep_storage_AggregationResultIterator
+ quickstep_storage_AggregationStateManager
+ quickstep_storage_HashTableUntypedKeyManager
+ quickstep_utility_Macros)
target_link_libraries(quickstep_storage_BasicColumnStoreTupleStorageSubBlock
quickstep_catalog_CatalogAttribute
quickstep_catalog_CatalogRelationSchema
@@ -654,53 +668,6 @@ target_link_libraries(quickstep_storage_EvictionPolicy
quickstep_threading_SpinMutex
quickstep_threading_SpinSharedMutex
quickstep_utility_Macros)
-target_link_libraries(quickstep_storage_FastHashTable
- quickstep_catalog_CatalogTypedefs
- quickstep_storage_HashTableBase
- quickstep_storage_StorageBlob
- quickstep_storage_StorageBlockInfo
- quickstep_storage_StorageConstants
- quickstep_storage_StorageManager
- quickstep_storage_TupleReference
- quickstep_storage_ValueAccessor
- quickstep_storage_ValueAccessorUtil
- quickstep_threading_SpinMutex
- quickstep_threading_SpinSharedMutex
- quickstep_types_Type
- quickstep_types_TypedValue
- quickstep_utility_BloomFilter
- quickstep_utility_HashPair
- quickstep_utility_Macros)
-target_link_libraries(quickstep_storage_FastHashTableFactory
- glog
- quickstep_storage_FastHashTable
- quickstep_storage_FastSeparateChainingHashTable
- quickstep_storage_HashTable
- quickstep_storage_HashTable_proto
- quickstep_storage_HashTableBase
- quickstep_storage_HashTableFactory
- quickstep_storage_LinearOpenAddressingHashTable
- quickstep_storage_SeparateChainingHashTable
- quickstep_storage_SimpleScalarSeparateChainingHashTable
- quickstep_storage_TupleReference
- quickstep_types_TypeFactory
- quickstep_utility_BloomFilter
- quickstep_utility_Macros)
-target_link_libraries(quickstep_storage_FastSeparateChainingHashTable
- quickstep_storage_FastHashTable
- quickstep_storage_HashTable
- quickstep_storage_HashTableBase
- quickstep_storage_HashTableKeyManager
- quickstep_storage_StorageBlob
- quickstep_storage_StorageBlockInfo
- quickstep_storage_StorageConstants
- quickstep_storage_StorageManager
- quickstep_threading_SpinSharedMutex
- quickstep_types_Type
- quickstep_types_TypedValue
- quickstep_utility_Alignment
- quickstep_utility_Macros
- quickstep_utility_PrimeNumber)
target_link_libraries(quickstep_storage_FileManager
quickstep_storage_StorageBlockInfo
quickstep_utility_Macros
@@ -786,9 +753,7 @@ target_link_libraries(quickstep_storage_HashTableKeyManager
target_link_libraries(quickstep_storage_HashTablePool
glog
quickstep_expressions_aggregation_AggregationHandle
- quickstep_storage_AggregationHashTable
- quickstep_storage_FastHashTable
- quickstep_storage_FastHashTableFactory
+ quickstep_storage_AggregationStateHashTable
quickstep_storage_HashTableBase
quickstep_threading_SpinMutex
quickstep_utility_Macros
@@ -799,6 +764,7 @@ target_link_libraries(quickstep_storage_HashTableUntypedKeyManager
quickstep_types_Type
quickstep_types_TypeFunctors
quickstep_types_TypedValue
+ quickstep_utility_InlineMemcpy
quickstep_utility_Macros)
target_link_libraries(quickstep_storage_IndexSubBlock
quickstep_catalog_CatalogTypedefs
@@ -820,6 +786,7 @@ target_link_libraries(quickstep_storage_InsertDestination
quickstep_queryexecution_QueryExecutionMessages_proto
quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryexecution_QueryExecutionUtil
+ quickstep_storage_AggregationResultIterator
quickstep_storage_InsertDestinationInterface
quickstep_storage_InsertDestination_proto
quickstep_storage_StorageBlock
@@ -859,6 +826,7 @@ target_link_libraries(quickstep_storage_PackedRowStoreTupleStorageSubBlock
quickstep_catalog_CatalogRelationSchema
quickstep_catalog_CatalogTypedefs
quickstep_expressions_predicate_PredicateCost
+ quickstep_storage_AggregationResultIterator
quickstep_storage_PackedRowStoreValueAccessor
quickstep_storage_StorageBlockInfo
quickstep_storage_StorageBlockLayout_proto
@@ -994,6 +962,7 @@ target_link_libraries(quickstep_storage_StorageBlock
quickstep_expressions_aggregation_AggregationHandle
quickstep_expressions_predicate_Predicate
quickstep_expressions_scalar_Scalar
+ quickstep_storage_AggregationResultIterator
quickstep_storage_BasicColumnStoreTupleStorageSubBlock
quickstep_storage_BloomFilterIndexSubBlock
quickstep_storage_CSBTreeIndexSubBlock
@@ -1022,7 +991,8 @@ target_link_libraries(quickstep_storage_StorageBlock
quickstep_types_containers_Tuple
quickstep_types_operations_comparisons_ComparisonUtil
quickstep_utility_Macros
- quickstep_utility_PtrVector)
+ quickstep_utility_PtrVector
+ quickstep_utility_ScopedBuffer)
# CMAKE_VALIDATE_IGNORE_BEGIN
if(QUICKSTEP_HAVE_BITWEAVING)
target_link_libraries(quickstep_storage_StorageBlock
@@ -1184,9 +1154,6 @@ target_link_libraries(quickstep_storage
quickstep_storage_EvictionPolicy
quickstep_storage_FileManager
quickstep_storage_FileManagerLocal
- quickstep_storage_FastHashTable
- quickstep_storage_FastHashTableFactory
- quickstep_storage_FastSeparateChainingHashTable
quickstep_storage_HashTable
quickstep_storage_HashTable_proto
quickstep_storage_HashTableBase
@@ -1208,6 +1175,9 @@ target_link_libraries(quickstep_storage
quickstep_storage_SimpleScalarSeparateChainingHashTable
quickstep_storage_SplitRowStoreTupleStorageSubBlock
quickstep_storage_SplitRowStoreValueAccessor
+ quickstep_storage_AggregationResultIterator
+ quickstep_storage_AggregationStateHashTable
+ quickstep_storage_AggregationStateManager
quickstep_storage_StorageBlob
quickstep_storage_StorageBlock
quickstep_storage_StorageBlockBase