You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/02/05 01:36:06 UTC
[2/8] incubator-quickstep git commit: Initial commit.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/27380a69/storage/PackedPayloadHashTable.cpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadHashTable.cpp b/storage/PackedPayloadHashTable.cpp
new file mode 100644
index 0000000..43a295c
--- /dev/null
+++ b/storage/PackedPayloadHashTable.cpp
@@ -0,0 +1,460 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "storage/PackedPayloadHashTable.hpp"
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <cstdlib>
+#include <vector>
+
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "storage/HashTableKeyManager.hpp"
+#include "storage/StorageBlob.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageConstants.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
+#include "threading/SpinMutex.hpp"
+#include "threading/SpinSharedMutex.hpp"
+#include "types/Type.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "utility/Alignment.hpp"
+#include "utility/Macros.hpp"
+#include "utility/PrimeNumber.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+PackedPayloadHashTable::PackedPayloadHashTable(
+ const std::vector<const Type *> &key_types,
+ const std::size_t num_entries,
+ const std::vector<AggregationHandle *> &handles,
+ StorageManager *storage_manager)
+ : key_types_(key_types),
+ num_handles_(handles.size()),
+ handles_(handles),
+ total_payload_size_(ComputeTotalPayloadSize(handles)),
+ storage_manager_(storage_manager),
+ kBucketAlignment(alignof(std::atomic<std::size_t>)),
+ kValueOffset(sizeof(std::atomic<std::size_t>) + sizeof(std::size_t)),
+ key_manager_(key_types_, kValueOffset + total_payload_size_),
+ bucket_size_(ComputeBucketSize(key_manager_.getFixedKeySize())) {
+ std::size_t payload_offset_running_sum = sizeof(SpinMutex);
+ for (const auto *handle : handles) {
+ payload_offsets_.emplace_back(payload_offset_running_sum);
+ payload_offset_running_sum += handle->getPayloadSize();
+ }
+
+ // NOTE(jianqiao): Potential memory leak / double freeing by copying from
+ // init_payload to buckets if payload contains out of line data.
+ init_payload_ =
+ static_cast<std::uint8_t *>(calloc(this->total_payload_size_, 1));
+ DCHECK(init_payload_ != nullptr);
+
+ for (std::size_t i = 0; i < num_handles_; ++i) {
+ handles_[i]->initPayload(init_payload_ + payload_offsets_[i]);
+ }
+
+ // Bucket size always rounds up to the alignment requirement of the atomic
+ // size_t "next" pointer at the front or a ValueT, whichever is larger.
+ //
+ // Give base HashTable information about what key components are stored
+ // inline from 'key_manager_'.
+ setKeyInline(key_manager_.getKeyInline());
+
+ // Pick out a prime number of slots and calculate storage requirements.
+ std::size_t num_slots_tmp =
+ get_next_prime_number(num_entries * kHashTableLoadFactor);
+ std::size_t required_memory =
+ sizeof(Header) + num_slots_tmp * sizeof(std::atomic<std::size_t>) +
+ (num_slots_tmp / kHashTableLoadFactor) *
+ (bucket_size_ + key_manager_.getEstimatedVariableKeySize());
+ std::size_t num_storage_slots =
+ this->storage_manager_->SlotsNeededForBytes(required_memory);
+ if (num_storage_slots == 0) {
+ FATAL_ERROR(
+ "Storage requirement for SeparateChainingHashTable "
+ "exceeds maximum allocation size.");
+ }
+
+ // Get a StorageBlob to hold the hash table.
+ const block_id blob_id =
+ this->storage_manager_->createBlob(num_storage_slots);
+ this->blob_ = this->storage_manager_->getBlobMutable(blob_id);
+
+ void *aligned_memory_start = this->blob_->getMemoryMutable();
+ std::size_t available_memory = num_storage_slots * kSlotSizeBytes;
+ if (align(alignof(Header),
+ sizeof(Header),
+ aligned_memory_start,
+ available_memory) == nullptr) {
+ // With current values from StorageConstants.hpp, this should be
+ // impossible. A blob is at least 1 MB, while a Header has alignment
+ // requirement of just kCacheLineBytes (64 bytes).
+ FATAL_ERROR(
+ "StorageBlob used to hold resizable "
+ "SeparateChainingHashTable is too small to meet alignment "
+ "requirements of SeparateChainingHashTable::Header.");
+ } else if (aligned_memory_start != this->blob_->getMemoryMutable()) {
+ // This should also be impossible, since the StorageManager allocates slots
+ // aligned to kCacheLineBytes.
+ DEV_WARNING("StorageBlob memory adjusted by "
+ << (num_storage_slots * kSlotSizeBytes - available_memory)
+ << " bytes to meet alignment requirement for "
+ << "SeparateChainingHashTable::Header.");
+ }
+
+ // Locate the header.
+ header_ = static_cast<Header *>(aligned_memory_start);
+ aligned_memory_start =
+ static_cast<char *>(aligned_memory_start) + sizeof(Header);
+ available_memory -= sizeof(Header);
+
+ // Recompute the number of slots & buckets using the actual available memory.
+ // Most likely, we got some extra free bucket space due to "rounding up" to
+ // the storage blob's size. It's also possible (though very unlikely) that we
+ // will wind up with fewer buckets than we initially wanted because of screwy
+ // alignment requirements for ValueT.
+ std::size_t num_buckets_tmp =
+ available_memory /
+ (kHashTableLoadFactor * sizeof(std::atomic<std::size_t>) + bucket_size_ +
+ key_manager_.getEstimatedVariableKeySize());
+ num_slots_tmp =
+ get_previous_prime_number(num_buckets_tmp * kHashTableLoadFactor);
+ num_buckets_tmp = num_slots_tmp / kHashTableLoadFactor;
+ DEBUG_ASSERT(num_slots_tmp > 0);
+ DEBUG_ASSERT(num_buckets_tmp > 0);
+
+ // Locate the slot array.
+ slots_ = static_cast<std::atomic<std::size_t> *>(aligned_memory_start);
+ aligned_memory_start = static_cast<char *>(aligned_memory_start) +
+ sizeof(std::atomic<std::size_t>) * num_slots_tmp;
+ available_memory -= sizeof(std::atomic<std::size_t>) * num_slots_tmp;
+
+ // Locate the buckets.
+ buckets_ = aligned_memory_start;
+ // Extra-paranoid: If ValueT has an alignment requirement greater than that
+ // of std::atomic<std::size_t>, we may need to adjust the start of the bucket
+ // array.
+ if (align(kBucketAlignment, bucket_size_, buckets_, available_memory) ==
+ nullptr) {
+ FATAL_ERROR(
+ "StorageBlob used to hold resizable "
+ "SeparateChainingHashTable is too small to meet "
+ "alignment requirements of buckets.");
+ } else if (buckets_ != aligned_memory_start) {
+ DEV_WARNING(
+ "Bucket array start position adjusted to meet alignment "
+ "requirement for SeparateChainingHashTable's value type.");
+ if (num_buckets_tmp * bucket_size_ > available_memory) {
+ --num_buckets_tmp;
+ }
+ }
+
+ // Fill in the header.
+ header_->num_slots = num_slots_tmp;
+ header_->num_buckets = num_buckets_tmp;
+ header_->buckets_allocated.store(0, std::memory_order_relaxed);
+ header_->variable_length_bytes_allocated.store(0, std::memory_order_relaxed);
+ available_memory -= bucket_size_ * (header_->num_buckets);
+
+ // Locate variable-length key storage region, and give it all the remaining
+ // bytes in the blob.
+ key_manager_.setVariableLengthStorageInfo(
+ static_cast<char *>(buckets_) + header_->num_buckets * bucket_size_,
+ available_memory,
+ &(header_->variable_length_bytes_allocated));
+}
+
+PackedPayloadHashTable::~PackedPayloadHashTable() {
+ if (blob_.valid()) {
+ const block_id blob_id = blob_->getID();
+ blob_.release();
+ storage_manager_->deleteBlockOrBlobFile(blob_id);
+ }
+ std::free(init_payload_);
+}
+
+void PackedPayloadHashTable::clear() {
+ const std::size_t used_buckets =
+ header_->buckets_allocated.load(std::memory_order_relaxed);
+ // Destroy existing values, if necessary.
+ destroyPayload();
+
+ // Zero-out slot array.
+ std::memset(
+ slots_, 0x0, sizeof(std::atomic<std::size_t>) * header_->num_slots);
+
+ // Zero-out used buckets.
+ std::memset(buckets_, 0x0, used_buckets * bucket_size_);
+
+ header_->buckets_allocated.store(0, std::memory_order_relaxed);
+ header_->variable_length_bytes_allocated.store(0, std::memory_order_relaxed);
+ key_manager_.zeroNextVariableLengthKeyOffset();
+}
+
+void PackedPayloadHashTable::destroyPayload() {
+ const std::size_t num_buckets =
+ header_->buckets_allocated.load(std::memory_order_relaxed);
+ void *bucket_ptr = static_cast<char *>(buckets_) + kValueOffset;
+ for (std::size_t bucket_num = 0; bucket_num < num_buckets; ++bucket_num) {
+ for (std::size_t handle_id = 0; handle_id < num_handles_; ++handle_id) {
+ void *value_internal_ptr =
+ static_cast<char *>(bucket_ptr) + this->payload_offsets_[handle_id];
+ handles_[handle_id]->destroyPayload(static_cast<std::uint8_t *>(value_internal_ptr));
+ }
+ bucket_ptr = static_cast<char *>(bucket_ptr) + bucket_size_;
+ }
+}
+
+bool PackedPayloadHashTable::upsertValueAccessorCompositeKey(
+ const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
+ const std::vector<MultiSourceAttributeId> &key_attr_ids,
+ const ValueAccessorMultiplexer &accessor_mux) {
+ DCHECK(accessor_mux.getDerivedAccessor()->getImplementationType()
+ == ValueAccessor::Implementation::kColumnVectors);
+ ValueAccessor *base_accessor = accessor_mux.getBaseAccessor();
+ ColumnVectorsValueAccessor *derived_accessor =
+ static_cast<ColumnVectorsValueAccessor *>(accessor_mux.getDerivedAccessor());
+
+ if (derived_accessor == nullptr) {
+ return upsertValueAccessorCompositeKeyInternal<false>(argument_ids,
+ key_attr_ids,
+ base_accessor,
+ derived_accessor);
+ } else {
+ return upsertValueAccessorCompositeKeyInternal<true>(argument_ids,
+ key_attr_ids,
+ base_accessor,
+ derived_accessor);
+ }
+}
+
+void PackedPayloadHashTable::resize(const std::size_t extra_buckets,
+ const std::size_t extra_variable_storage,
+ const std::size_t retry_num) {
+ // A retry should never be necessary with this implementation of HashTable.
+ // Separate chaining ensures that any resized hash table with more buckets
+ // than the original table will be able to hold more entries than the
+ // original.
+ DEBUG_ASSERT(retry_num == 0);
+
+ SpinSharedMutexExclusiveLock<true> write_lock(this->resize_shared_mutex_);
+
+ // Recheck whether the hash table is still full. Note that multiple threads
+ // might wait to rebuild this hash table simultaneously. Only the first one
+ // should do the rebuild.
+ if (!isFull(extra_variable_storage)) {
+ return;
+ }
+
+ // Approximately double the number of buckets and slots.
+ //
+ // TODO(chasseur): It may be worth it to more than double the number of
+ // buckets here so that we can maintain a good, sparse fill factor for a
+ // longer time as more values are inserted. Such behavior should take into
+ // account kHashTableLoadFactor.
+ std::size_t resized_num_slots = get_next_prime_number(
+ (header_->num_buckets + extra_buckets / 2) * kHashTableLoadFactor * 2);
+ std::size_t variable_storage_required =
+ (resized_num_slots / kHashTableLoadFactor) *
+ key_manager_.getEstimatedVariableKeySize();
+ const std::size_t original_variable_storage_used =
+ header_->variable_length_bytes_allocated.load(std::memory_order_relaxed);
+ // If this resize was triggered by a too-large variable-length key, bump up
+ // the variable-length storage requirement.
+ if ((extra_variable_storage > 0) &&
+ (extra_variable_storage + original_variable_storage_used >
+ key_manager_.getVariableLengthKeyStorageSize())) {
+ variable_storage_required += extra_variable_storage;
+ }
+
+ const std::size_t resized_memory_required =
+ sizeof(Header) + resized_num_slots * sizeof(std::atomic<std::size_t>) +
+ (resized_num_slots / kHashTableLoadFactor) * bucket_size_ +
+ variable_storage_required;
+ const std::size_t resized_storage_slots =
+ this->storage_manager_->SlotsNeededForBytes(resized_memory_required);
+ if (resized_storage_slots == 0) {
+ FATAL_ERROR(
+ "Storage requirement for resized SeparateChainingHashTable "
+ "exceeds maximum allocation size.");
+ }
+
+ // Get a new StorageBlob to hold the resized hash table.
+ const block_id resized_blob_id =
+ this->storage_manager_->createBlob(resized_storage_slots);
+ MutableBlobReference resized_blob =
+ this->storage_manager_->getBlobMutable(resized_blob_id);
+
+ // Locate data structures inside the new StorageBlob.
+ void *aligned_memory_start = resized_blob->getMemoryMutable();
+ std::size_t available_memory = resized_storage_slots * kSlotSizeBytes;
+ if (align(alignof(Header),
+ sizeof(Header),
+ aligned_memory_start,
+ available_memory) == nullptr) {
+ // Should be impossible, as noted in constructor.
+ FATAL_ERROR(
+ "StorageBlob used to hold resized SeparateChainingHashTable "
+ "is too small to meet alignment requirements of "
+ "LinearOpenAddressingHashTable::Header.");
+ } else if (aligned_memory_start != resized_blob->getMemoryMutable()) {
+ // Again, should be impossible.
+ DEV_WARNING("In SeparateChainingHashTable::resize(), StorageBlob "
+ << "memory adjusted by "
+ << (resized_num_slots * kSlotSizeBytes - available_memory)
+ << " bytes to meet alignment requirement for "
+ << "LinearOpenAddressingHashTable::Header.");
+ }
+
+ Header *resized_header = static_cast<Header *>(aligned_memory_start);
+ aligned_memory_start =
+ static_cast<char *>(aligned_memory_start) + sizeof(Header);
+ available_memory -= sizeof(Header);
+
+ // As in constructor, recompute the number of slots and buckets using the
+ // actual available memory.
+ std::size_t resized_num_buckets =
+ (available_memory - extra_variable_storage) /
+ (kHashTableLoadFactor * sizeof(std::atomic<std::size_t>) + bucket_size_ +
+ key_manager_.getEstimatedVariableKeySize());
+ resized_num_slots =
+ get_previous_prime_number(resized_num_buckets * kHashTableLoadFactor);
+ resized_num_buckets = resized_num_slots / kHashTableLoadFactor;
+
+ // Locate slot array.
+ std::atomic<std::size_t> *resized_slots =
+ static_cast<std::atomic<std::size_t> *>(aligned_memory_start);
+ aligned_memory_start = static_cast<char *>(aligned_memory_start) +
+ sizeof(std::atomic<std::size_t>) * resized_num_slots;
+ available_memory -= sizeof(std::atomic<std::size_t>) * resized_num_slots;
+
+ // As in constructor, we will be extra paranoid and use align() to locate the
+ // start of the array of buckets, as well.
+ void *resized_buckets = aligned_memory_start;
+ if (align(
+ kBucketAlignment, bucket_size_, resized_buckets, available_memory) ==
+ nullptr) {
+ FATAL_ERROR(
+ "StorageBlob used to hold resized SeparateChainingHashTable "
+ "is too small to meet alignment requirements of buckets.");
+ } else if (resized_buckets != aligned_memory_start) {
+ DEV_WARNING(
+ "Bucket array start position adjusted to meet alignment "
+ "requirement for SeparateChainingHashTable's value type.");
+ if (resized_num_buckets * bucket_size_ + variable_storage_required >
+ available_memory) {
+ --resized_num_buckets;
+ }
+ }
+ aligned_memory_start = static_cast<char *>(aligned_memory_start) +
+ resized_num_buckets * bucket_size_;
+ available_memory -= resized_num_buckets * bucket_size_;
+
+ void *resized_variable_length_key_storage = aligned_memory_start;
+ const std::size_t resized_variable_length_key_storage_size = available_memory;
+
+ const std::size_t original_buckets_used =
+ header_->buckets_allocated.load(std::memory_order_relaxed);
+
+ // Initialize the header.
+ resized_header->num_slots = resized_num_slots;
+ resized_header->num_buckets = resized_num_buckets;
+ resized_header->buckets_allocated.store(original_buckets_used,
+ std::memory_order_relaxed);
+ resized_header->variable_length_bytes_allocated.store(
+ original_variable_storage_used, std::memory_order_relaxed);
+
+ // Bulk-copy buckets. This is safe because:
+ // 1. The "next" pointers will be adjusted when rebuilding chains below.
+ // 2. The hash codes will stay the same.
+ // 3. For key components:
+ // a. Inline keys will stay exactly the same.
+ // b. Offsets into variable-length storage will remain valid, because
+ // we also do a byte-for-byte copy of variable-length storage below.
+ // c. Absolute external pointers will still point to the same address.
+ // d. Relative pointers are not used with resizable hash tables.
+ // 4. If values are not trivially copyable, then we invoke ValueT's copy
+ // or move constructor with placement new.
+ // NOTE(harshad) - Regarding point 4 above, as this is a specialized
+ // hash table implemented for aggregation, the values are trivially copyable,
+ // therefore we don't need to invoke payload values' copy/move constructors.
+ std::memcpy(resized_buckets, buckets_, original_buckets_used * bucket_size_);
+
+ // Copy over variable-length key components, if any.
+ if (original_variable_storage_used > 0) {
+ DEBUG_ASSERT(original_variable_storage_used ==
+ key_manager_.getNextVariableLengthKeyOffset());
+ DEBUG_ASSERT(original_variable_storage_used <=
+ resized_variable_length_key_storage_size);
+ std::memcpy(resized_variable_length_key_storage,
+ key_manager_.getVariableLengthKeyStorage(),
+ original_variable_storage_used);
+ }
+
+ destroyPayload();
+
+ // Make resized structures active.
+ std::swap(this->blob_, resized_blob);
+ header_ = resized_header;
+ slots_ = resized_slots;
+ buckets_ = resized_buckets;
+ key_manager_.setVariableLengthStorageInfo(
+ resized_variable_length_key_storage,
+ resized_variable_length_key_storage_size,
+ &(resized_header->variable_length_bytes_allocated));
+
+ // Drop the old blob.
+ const block_id old_blob_id = resized_blob->getID();
+ resized_blob.release();
+ this->storage_manager_->deleteBlockOrBlobFile(old_blob_id);
+
+ // Rebuild chains.
+ void *current_bucket = buckets_;
+ for (std::size_t bucket_num = 0; bucket_num < original_buckets_used;
+ ++bucket_num) {
+ std::atomic<std::size_t> *next_ptr =
+ static_cast<std::atomic<std::size_t> *>(current_bucket);
+ const std::size_t hash_code = *reinterpret_cast<const std::size_t *>(
+ static_cast<const char *>(current_bucket) +
+ sizeof(std::atomic<std::size_t>));
+
+ const std::size_t slot_number = hash_code % header_->num_slots;
+ std::size_t slot_ptr_value = 0;
+ if (slots_[slot_number].compare_exchange_strong(
+ slot_ptr_value, bucket_num + 1, std::memory_order_relaxed)) {
+ // This bucket is the first in the chain for this block, so reset its
+ // next pointer to 0.
+ next_ptr->store(0, std::memory_order_relaxed);
+ } else {
+ // A chain already exists starting from this slot, so put this bucket at
+ // the head.
+ next_ptr->store(slot_ptr_value, std::memory_order_relaxed);
+ slots_[slot_number].store(bucket_num + 1, std::memory_order_relaxed);
+ }
+ current_bucket = static_cast<char *>(current_bucket) + bucket_size_;
+ }
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/27380a69/storage/PackedPayloadHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadHashTable.hpp b/storage/PackedPayloadHashTable.hpp
new file mode 100644
index 0000000..b2becab
--- /dev/null
+++ b/storage/PackedPayloadHashTable.hpp
@@ -0,0 +1,997 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_PACKED_PAYLOAD_HASH_TABLE_HPP_
+#define QUICKSTEP_STORAGE_PACKED_PAYLOAD_HASH_TABLE_HPP_
+
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
+#include <limits>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "storage/HashTableBase.hpp"
+#include "storage/HashTableKeyManager.hpp"
+#include "storage/StorageBlob.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "threading/SpinMutex.hpp"
+#include "threading/SpinSharedMutex.hpp"
+#include "types/TypedValue.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "utility/HashPair.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class StorageManager;
+class Type;
+class ValueAccessor;
+
+/** \addtogroup Storage
+ * @{
+ */
+
+/**
+ * @brief Aggregation hash table implementation in which the payload can be just
+ * a bunch of bytes. This implementation is suitable for aggregation with
+ * multiple aggregation handles (e.g. SUM, MAX, MIN etc).
+ *
+ * At present the hash table uses separate chaining to resolve collisions, i.e.
+ * Keys/values are stored in a separate region of memory from the base hash
+ * table slot array. Every bucket has a "next" pointer so that entries that
+ * collide (i.e. map to the same base slot) form chains of pointers with each
+ * other.
+ **/
+class PackedPayloadHashTable : public AggregationStateHashTableBase {
+ public:
+ /**
+ * @brief Constructor.
+ *
+ * @param key_types A vector of one or more types (>1 indicates a composite
+ * key).
+ * @param num_entries The estimated number of entries this hash table will
+ * hold.
+ * @param handles The aggregation handles.
+ * @param storage_manager The StorageManager to use (a StorageBlob will be
+ * allocated to hold this hash table's contents).
+ **/
+ PackedPayloadHashTable(
+ const std::vector<const Type *> &key_types,
+ const std::size_t num_entries,
+ const std::vector<AggregationHandle *> &handles,
+ StorageManager *storage_manager);
+
+ ~PackedPayloadHashTable() override;
+
+ /**
+ * @brief Erase all entries in this hash table.
+ *
+ * @warning This method is not guaranteed to be threadsafe.
+ **/
+ void clear();
+
+ void destroyPayload() override;
+
+ /**
+ * @brief Use aggregation handles to update (multiple) aggregation states in
+ * this hash table, with group-by keys and arguments drawn from the
+ * given ValueAccessors. New states are first inserted if not already
+ * present.
+ *
+ * @note This method is threadsafe with regard to other calls to
+ * upsertCompositeKey() and upsertValueAccessorCompositeKey().
+ *
+ * @param argument_ids The multi-source attribute IDs of each argument
+ * component to be read from \p accessor_mux.
+ * @param key_ids The multi-source attribute IDs of each group-by key
+ * component to be read from \p accessor_mux.
+ * @param accessor_mux A ValueAccessorMultiplexer object that contains the
+ * ValueAccessors which will be used to access keys. beginIteration()
+ * should be called on the accessors before calling this method.
+ * @return True on success, false if upsert failed because there was not
+ * enough space to insert new entries for all the keys in accessor
+ * (note that some entries may still have been upserted, and
+ * accessors' iterations will be left on the first tuple which could
+ * not be inserted).
+ **/
+ bool upsertValueAccessorCompositeKey(
+ const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
+ const std::vector<MultiSourceAttributeId> &key_ids,
+ const ValueAccessorMultiplexer &accessor_mux) override;
+
+ /**
+ * @return The ID of the StorageBlob used to store this hash table.
+ **/
+ inline block_id getBlobId() const {
+ return blob_->getID();
+ }
+
+ /**
+ * @warning This method assumes that no concurrent calls to
+ * upsertCompositeKey() or upsertValueAccessorCompositeKey() are
+ * taking place (i.e. that this HashTable is immutable for the
+ * duration of the call).
+ * Concurrent calls to getSingleCompositeKey(), forEach(), and
+ * forEachCompositeKey() are safe.
+ *
+ * @return The number of entries in this HashTable.
+ **/
+ inline std::size_t numEntries() const {
+ return header_->buckets_allocated.load(std::memory_order_relaxed);
+ }
+
+ /**
+ * @brief Use aggregation handles to merge the given aggregation states into
+ * the aggregation states mapped to the given key. New states are first
+ * inserted if not already present.
+ *
+ * @warning The key must not be null.
+ * @note This method is threadsafe with regard to other calls to
+ * upsertCompositeKey() and upsertValueAccessorCompositeKey().
+ *
+ * @param key The key.
+ * @param source_state The source aggregation states to be merged into this
+ * hash table.
+ * @return True on success, false if upsert failed because there was not
+ * enough space to insert a new entry in this hash table.
+ **/
+ inline bool upsertCompositeKey(const std::vector<TypedValue> &key,
+ const std::uint8_t *source_state);
+
+ /**
+ * @brief Apply a functor to an aggregation state mapped to the given key.
+ * First inserting a new state if one is not already present.
+ *
+ * @warning The key must not be null.
+ * @note This method is threadsafe with regard to other calls to
+ * upsertCompositeKey() and upsertValueAccessorCompositeKey().
+ *
+ * @param key The key.
+ * @param functor A pointer to a functor, which should provide a call
+ * operator which takes an aggregation state (of type std::uint8_t *)
+ * as an argument.
+ * @param index The index of the target aggregation state among those states
+ * mapped to \p key.
+ * @return True on success, false if upsert failed because there was not
+ * enough space to insert a new entry in this hash table.
+ **/
+ template <typename FunctorT>
+ inline bool upsertCompositeKey(const std::vector<TypedValue> &key,
+ FunctorT *functor,
+ const std::size_t index);
+
+ /**
+ * @brief Lookup a composite key against this hash table to find a matching
+ * entry.
+ *
+ * @warning The key must not be null.
+ * @warning This method assumes that no concurrent calls to
+ * upsertCompositeKey() or upsertValueAccessorCompositeKey() are
+ * taking place (i.e. that this HashTable is immutable for the
+ * duration of the call and as long as the returned pointer may be
+ * dereferenced). Concurrent calls to getSingleCompositeKey(),
+ * forEach(), and forEachCompositeKey() are safe.
+ *
+ * @param key The key to look up.
+ * @return The value of a matched entry if a matching key is found.
+ * Otherwise, return NULL.
+ **/
+ inline const std::uint8_t* getSingleCompositeKey(
+ const std::vector<TypedValue> &key) const;
+
+ /**
+ * @brief Lookup a composite key against this hash table to find a matching
+ * entry. Then return the aggregation state component with the
+ * specified index.
+ *
+ * @warning The key must not be null.
+ * @warning This method assumes that no concurrent calls to
+ * upsertCompositeKey() or upsertValueAccessorCompositeKey() are
+ * taking place (i.e. that this HashTable is immutable for the
+ * duration of the call and as long as the returned pointer may be
+ * dereferenced). Concurrent calls to getSingleCompositeKey(),
+ * forEach(), and forEachCompositeKey() are safe.
+ *
+ * @param key The key to look up.
+ * @param index The index of the target aggregation state among those states
+ * mapped to \p key.
+ * @return The aggregation state of the specified index if a matching key is
+ * found. Otherwise, return NULL.
+ **/
+ inline const std::uint8_t* getSingleCompositeKey(
+ const std::vector<TypedValue> &key,
+ const std::size_t index) const;
+
+ /**
+ * @brief Apply a functor to each (key, value) pair in this hash table.
+ *
+ * @warning This method assumes that no concurrent calls to
+ * upsertCompositeKey() or upsertValueAccessorCompositeKey() are
+ * taking place (i.e. that this HashTable is immutable for the
+ * duration of the call and as long as the returned pointer may be
+ * dereferenced). Concurrent calls to getSingleCompositeKey(),
+ * forEach(), and forEachCompositeKey() are safe.
+ *
+ * @param functor A pointer to a functor, which should provide a call operator
+ * which takes 2 arguments: const TypedValue&, const std::uint8_t*.
+ * The call operator will be invoked once on each key, value pair in
+ * this hash table.
+ * @return The number of key-value pairs visited.
+ **/
+ template <typename FunctorT>
+ inline std::size_t forEach(FunctorT *functor) const;
+
+ /**
+ * @brief Apply a functor to each (key, aggregation state) pair in this hash
+ * table, where the aggregation state is retrieved from the value
+ * that maps to the corresponding key with the specified index.
+ *
+ * @warning This method assumes that no concurrent calls to
+ * upsertCompositeKey() or upsertValueAccessorCompositeKey() are
+ * taking place (i.e. that this HashTable is immutable for the
+ * duration of the call and as long as the returned pointer may be
+ * dereferenced). Concurrent calls to getSingleCompositeKey(),
+ * forEach(), and forEachCompositeKey() are safe.
+ *
+ * @param functor A pointer to a functor, which should provide a call operator
+ * which takes 2 arguments: const TypedValue&, const std::uint8_t*.
+ * The call operator will be invoked once on each (key, aggregation state)
+ * pair in this hash table.
+ * @param index The index of the target aggregation state among those states
+ * mapped to \p key.
+ * @return The number of key-value pairs visited.
+ **/
+ template <typename FunctorT>
+ inline std::size_t forEach(FunctorT *functor, const int index) const;
+
+ /**
+ * @brief Apply a functor to each key, value pair in this hash table.
+ * Composite key version.
+ *
+ * @warning This method assumes that no concurrent calls to
+ * upsertCompositeKey() or upsertValueAccessorCompositeKey() are
+ * taking place (i.e. that this HashTable is immutable for the
+ * duration of the call and as long as the returned pointer may be
+ * dereferenced). Concurrent calls to getSingleCompositeKey(),
+ * forEach(), and forEachCompositeKey() are safe.
+ *
+ * @param functor A pointer to a functor, which should provide a call operator
+ * which takes 2 arguments: const TypedValue&, const std::uint8_t*.
+ * The call operator will be invoked once on each key, value pair in
+ * this hash table.
+ * @return The number of key-value pairs visited.
+ **/
+ template <typename FunctorT>
+ inline std::size_t forEachCompositeKey(FunctorT *functor) const;
+
+ /**
+ * @brief Apply a functor to each (key, aggregation state) pair in this hash
+ * table, where the aggregation state is retrieved from the value
+ * that maps to the corresponding key with the specified index.
+ * Composite key version.
+ *
+ * @warning This method assumes that no concurrent calls to
+ * upsertCompositeKey() or upsertValueAccessorCompositeKey() are
+ * taking place (i.e. that this HashTable is immutable for the
+ * duration of the call and for as long as the state pointers passed
+ * to the functor may be dereferenced). Concurrent calls to
+ * getSingleCompositeKey(), forEach(), and forEachCompositeKey() are
+ * safe.
+ *
+ * @param functor A pointer to a functor, which should provide a call operator
+ * which takes 2 arguments: const std::vector<TypedValue>& (the
+ * composite key) and const std::uint8_t* (a pointer to the
+ * aggregation state at position \p index within that key's payload).
+ * The call operator will be invoked once on each (key, aggregation
+ * state) pair in this hash table. The key vector is cleared and
+ * reused between calls.
+ * @param index The index of the target aggregation state among those states
+ * mapped to \p key.
+ * @return The number of key-value pairs visited.
+ **/
+ template <typename FunctorT>
+ inline std::size_t forEachCompositeKey(FunctorT *functor,
+ const std::size_t index) const;
+
+ private:
+ // Grow the table (defined out of line, not shown here). Callers in this
+ // header invoke resize(0, <variable-length key bytes>) after an upsert runs
+ // out of buckets or variable-length key storage, and only after releasing
+ // their shared lock on resize_shared_mutex_. The meaning of 'retry_num' is
+ // not visible here (presumably a retry counter) -- TODO confirm.
+ void resize(const std::size_t extra_buckets,
+ const std::size_t extra_variable_storage,
+ const std::size_t retry_num = 0);
+
+  // Number of bytes of variable-length key storage needed to copy 'key':
+  // the summed data sizes of every component that is NOT stored inline.
+  inline std::size_t calculateVariableLengthCompositeKeyCopySize(
+      const std::vector<TypedValue> &key) const {
+    std::size_t bytes_needed = 0;
+    std::vector<TypedValue>::size_type component = 0;
+    for (const TypedValue &value : key) {
+      if (!(*key_inline_)[component]) {
+        bytes_needed += value.getDataSize();
+      }
+      ++component;
+    }
+    return bytes_needed;
+  }
+
+ // Iteration helper for forEach(): if bucket '*entry_num' has been allocated,
+ // stores its scalar key in '*key' and a pointer to its payload in '*value',
+ // increments '*entry_num' and returns true; otherwise returns false.
+ inline bool getNextEntry(TypedValue *key,
+ const std::uint8_t **value,
+ std::size_t *entry_num) const;
+
+ // Composite-key version of getNextEntry(). Appends the key components to
+ // '*key' rather than overwriting it, so callers clear '*key' between calls.
+ inline bool getNextEntryCompositeKey(std::vector<TypedValue> *key,
+ const std::uint8_t **value,
+ std::size_t *entry_num) const;
+
+ // Find or create the entry for 'key' and return a pointer to its payload.
+ // 'variable_key_size' is the number of variable-length key bytes 'key'
+ // needs. Returns nullptr if the table is out of buckets or variable-length
+ // key storage (callers then resize and retry).
+ inline std::uint8_t* upsertCompositeKeyInternal(
+ const std::vector<TypedValue> &key,
+ const std::size_t variable_key_size);
+
+ // Upsert every tuple of 'base_accessor' (advancing 'derived_accessor' in
+ // lock-step when 'use_two_accessors' is true), updating each handle's
+ // state from the arguments named in 'argument_ids'. Tuples with a NULL key
+ // component are skipped. Returns true.
+ template <bool use_two_accessors>
+ inline bool upsertValueAccessorCompositeKeyInternal(
+ const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
+ const std::vector<MultiSourceAttributeId> &key_ids,
+ ValueAccessor *base_accessor,
+ ColumnVectorsValueAccessor *derived_accessor);
+
+ // Generate a hash for a composite key by hashing each component of 'key' and
+ // mixing their bits with CombineHashes().
+ inline std::size_t hashCompositeKey(const std::vector<TypedValue> &key) const;
+
+  // Record which key components are stored inline in the fixed-length part
+  // of a bucket rather than in the variable-length storage region. The
+  // vector usually comes from a HashTableKeyManager and is set by the
+  // constructor. Only the pointer is kept (the vector is not copied), and its
+  // first entry is additionally cached in scalar_key_inline_.
+  inline void setKeyInline(const std::vector<bool> *key_inline) {
+    key_inline_ = key_inline;
+    scalar_key_inline_ = (*key_inline)[0];
+  }
+
+  // Bytes needed for one bucket's payload: a leading SpinMutex (locked by
+  // the upsert paths while states are updated) plus the payload size
+  // reported by each aggregation handle.
+  inline static std::size_t ComputeTotalPayloadSize(
+      const std::vector<AggregationHandle *> &handles) {
+    std::size_t payload_bytes = sizeof(SpinMutex);
+    for (std::size_t i = 0; i < handles.size(); ++i) {
+      payload_bytes += handles[i]->getPayloadSize();
+    }
+    return payload_bytes;
+  }
+
+  // Fill '*key_vector' (which must already hold key_ids.size() elements; the
+  // caller in this file resizes it up front) with the values of the
+  // attributes named by 'key_ids' at the accessors' current position.
+  // Components whose source is kDerived are read from 'derived_accessor' when
+  // 'use_two_accessors' is true; every other component comes from
+  // 'accessor'. If 'check_for_null_keys' is true, returns true at the first
+  // NULL component (later components are left unassigned); otherwise always
+  // returns false.
+  template <bool use_two_accessors,
+            bool check_for_null_keys,
+            typename ValueAccessorT>
+  inline static bool GetCompositeKeyFromValueAccessor(
+      const std::vector<MultiSourceAttributeId> &key_ids,
+      const ValueAccessorT *accessor,
+      const ColumnVectorsValueAccessor *derived_accessor,
+      std::vector<TypedValue> *key_vector) {
+    const std::size_t num_components = key_ids.size();
+    for (std::size_t i = 0; i < num_components; ++i) {
+      const MultiSourceAttributeId &source_attr = key_ids[i];
+      TypedValue &component = (*key_vector)[i];
+      const bool read_derived =
+          use_two_accessors &&
+          source_attr.source == ValueAccessorSource::kDerived;
+      if (read_derived) {
+        component = derived_accessor->getTypedValue(source_attr.attr_id);
+      } else {
+        component = accessor->getTypedValue(source_attr.attr_id);
+      }
+      if (check_for_null_keys && component.isNull()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+ // Header stored at the beginning of the table's memory. The two allocation
+ // counters are atomics that concurrent inserters bump; each is placed on
+ // its own cache line (presumably to avoid false sharing between them).
+ struct Header {
+ // Number of entries in the slot array.
+ std::size_t num_slots;
+ // Total number of buckets available.
+ std::size_t num_buckets;
+ // Number of buckets handed out so far. May transiently exceed num_buckets
+ // while a failed bucket allocation is being rolled back.
+ alignas(kCacheLineBytes) std::atomic<std::size_t> buckets_allocated;
+ // Bytes of variable-length key storage allocated so far.
+ alignas(kCacheLineBytes)
+ std::atomic<std::size_t> variable_length_bytes_allocated;
+ };
+
+ // Type(s) of keys.
+ const std::vector<const Type *> key_types_;
+
+ // Information about whether key components are stored inline or in a
+ // separate variable-length storage region. This is usually determined by a
+ // HashTableKeyManager and set by calling setKeyInline(). 'key_inline_' is
+ // a non-owning pointer; 'scalar_key_inline_' caches its first element.
+ bool scalar_key_inline_;
+ const std::vector<bool> *key_inline_;
+
+ // Aggregation handles whose states are packed into every bucket's payload,
+ // and how many there are.
+ const std::size_t num_handles_;
+ const std::vector<AggregationHandle *> handles_;
+
+ // Bytes per payload; offset of handle k's state within a payload; and the
+ // bytes copied into the payload of every newly created bucket.
+ std::size_t total_payload_size_;
+ std::vector<std::size_t> payload_offsets_;
+ std::uint8_t *init_payload_;
+
+ // Presumably the storage manager and blob backing this table's memory
+ // (not used in this part of the file) -- TODO confirm.
+ StorageManager *storage_manager_;
+ MutableBlobReference blob_;
+
+ // Locked in shared mode for most operations, exclusive mode during resize.
+ // Not locked at all for non-resizable HashTables.
+ alignas(kCacheLineBytes) SpinSharedMutex<true> resize_shared_mutex_;
+
+ // Alignment (in bytes) that every bucket size is rounded up to. Despite the
+ // k-prefix this is a runtime member, presumably set by the constructor.
+ std::size_t kBucketAlignment;
+
+ // Offset of the payload within a bucket: the first suitably aligned
+ // boundary after the "next" pointer and hash code (computed elsewhere).
+ std::size_t kValueOffset;
+
+  // Bucket size: the bytes up to the payload (kValueOffset), the payload
+  // itself and the fixed-length key, rounded up to a multiple of
+  // kBucketAlignment.
+  constexpr std::size_t ComputeBucketSize(const std::size_t fixed_key_size) {
+    const std::size_t last_byte_index =
+        kValueOffset + this->total_payload_size_ + fixed_key_size - 1;
+    const std::size_t alignment_units =
+        last_byte_index / kBucketAlignment + 1;
+    return alignment_units * kBucketAlignment;
+  }
+
+ // Attempt to find an empty bucket to insert 'hash_code' into, starting after
+ // '*bucket' in the chain (or, if '*bucket' is NULL, starting from the slot
+ // array). Outcomes:
+ //   - Returns true if an empty bucket was claimed: '*bucket' points at it,
+ //     the chain link '**pending_chain_ptr' holds SIZE_T_MAX ("pending"), and
+ //     '*pending_chain_ptr_finish_value' holds the bucket number plus one.
+ //     The caller must fill the bucket and then store that value into
+ //     '**pending_chain_ptr' to publish it.
+ //   - Returns false with '*bucket' non-NULL if a bucket with an equal hash
+ //     was found (caller should then check whether there is a genuine key
+ //     collision or the hash collision is spurious, and in the latter case
+ //     call again with the same '*bucket' to keep searching the chain).
+ //   - Returns false and sets '*bucket' to NULL if there are no more empty
+ //     buckets or not enough variable-length key storage.
+ // If 'variable_key_allocation_required' is nonzero, this method will
+ // attempt to allocate storage for a variable-length key BEFORE allocating a
+ // bucket, so that no bucket number below 'header_->num_buckets' is ever
+ // deallocated after being allocated.
+ inline bool locateBucketForInsertion(
+ const std::size_t hash_code,
+ const std::size_t variable_key_allocation_required,
+ void **bucket,
+ std::atomic<std::size_t> **pending_chain_ptr,
+ std::size_t *pending_chain_ptr_finish_value);
+
+ // Write a scalar 'key' and its 'hash_code' into the '*bucket', which was
+ // found by locateBucketForInsertion(). Assumes that storage for a
+ // variable-length key copy (if any) was already allocated by a successful
+ // call to allocateVariableLengthKeyStorage().
+ inline void writeScalarKeyToBucket(
+ const TypedValue &key,
+ const std::size_t hash_code,
+ void *bucket);
+
+ // Write a composite 'key' and its 'hash_code' into the '*bucket', which was
+ // found by locateBucketForInsertion(). Assumes that storage for
+ // variable-length key copies (if any) was already allocated by a successful
+ // call to allocateVariableLengthKeyStorage().
+ inline void writeCompositeKeyToBucket(
+ const std::vector<TypedValue> &key,
+ const std::size_t hash_code,
+ void *bucket);
+
+ // Determine whether it is actually necessary to resize this hash table.
+ // Returns true if every bucket is already allocated, or if fewer than
+ // 'extra_variable_storage' bytes of variable-length storage remain free.
+ inline bool isFull(const std::size_t extra_variable_storage) const;
+
+ // Helper object to manage key storage: reading, writing and comparing key
+ // components, and allocating variable-length key storage.
+ HashTableKeyManager<false, true> key_manager_;
+
+ // In-memory structure is as follows:
+ //   - PackedPayloadHashTable::Header
+ //   - Array of slots, interpreted as follows:
+ //       - 0 = Points to nothing (empty)
+ //       - SIZE_T_MAX = Pending (some thread is starting a chain from this
+ //         slot and will overwrite it soon)
+ //       - Anything else = The number of the first bucket in the chain for
+ //         this slot PLUS ONE (i.e. subtract one to get the actual bucket
+ //         number).
+ //   - Array of buckets, each of which is:
+ //       - atomic size_t "next" pointer, interpreted the same as slots above.
+ //       - size_t hash value
+ //       - possibly some unused bytes so that the payload starts at
+ //         kValueOffset
+ //       - payload of total_payload_size_ bytes (beginning with a SpinMutex,
+ //         followed by the aggregation states at payload_offsets_)
+ //       - fixed-length key storage (which may include pointers to external
+ //         memory or offsets of variable length keys stored within this hash
+ //         table)
+ //       - possibly some additional unused bytes so that the bucket size is
+ //         a multiple of kBucketAlignment
+ //   - Variable-length key storage region (referenced by offsets stored in
+ //     fixed-length keys).
+ Header *header_;
+
+ // Slot array and bucket array described above; each bucket occupies
+ // 'bucket_size_' bytes.
+ std::atomic<std::size_t> *slots_;
+ void *buckets_;
+ const std::size_t bucket_size_;
+
+ DISALLOW_COPY_AND_ASSIGN(PackedPayloadHashTable);
+};
+
+/** @} */
+
+// ----------------------------------------------------------------------------
+// HashTableMerger, followed by implementations of inline and template class
+// methods.
+
+/**
+ * @brief Functor that merges aggregation states into a destination
+ *        PackedPayloadHashTable. Its call operator matches the functor
+ *        signature used by PackedPayloadHashTable::forEachCompositeKey(),
+ *        so a source table can be merged via
+ *        source->forEachCompositeKey(&merger).
+ **/
+class HashTableMerger {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param destination_hash_table The destination hash table into which other
+   *        hash tables will be merged. Not owned; it must outlive this
+   *        functor.
+   **/
+  explicit HashTableMerger(PackedPayloadHashTable *destination_hash_table)
+      : destination_hash_table_(destination_hash_table) {}
+
+  /**
+   * @brief Merge one source entry into the destination hash table.
+   *
+   * The destination entry for \p group_by_key is created if absent, and each
+   * aggregation state in \p source_state is merged into it.
+   *
+   * @param group_by_key The group-by key being merged.
+   * @param source_state Pointer to the start of the source entry's packed
+   *        payload (all aggregation states for \p group_by_key).
+   **/
+  inline void operator()(const std::vector<TypedValue> &group_by_key,
+                         const std::uint8_t *source_state) {
+    destination_hash_table_->upsertCompositeKey(group_by_key, source_state);
+  }
+
+ private:
+  PackedPayloadHashTable *destination_hash_table_;
+
+  DISALLOW_COPY_AND_ASSIGN(HashTableMerger);
+};
+
+// Fold the hashes of all key components, left to right, with CombineHashes().
+// 'key' must be non-empty and have one component per entry of key_types_.
+inline std::size_t PackedPayloadHashTable::hashCompositeKey(
+    const std::vector<TypedValue> &key) const {
+  DEBUG_ASSERT(!key.empty());
+  DEBUG_ASSERT(key.size() == key_types_.size());
+  std::size_t combined = key[0].getHash();
+  for (std::size_t i = 1; i < key.size(); ++i) {
+    combined = CombineHashes(combined, key[i].getHash());
+  }
+  return combined;
+}
+
+// If bucket '*entry_num' has been allocated, store its scalar key in '*key'
+// and a pointer to its payload in '*value', advance '*entry_num' and return
+// true. Returns false once every allocated bucket has been visited.
+inline bool PackedPayloadHashTable::getNextEntry(TypedValue *key,
+                                                 const std::uint8_t **value,
+                                                 std::size_t *entry_num) const {
+  const std::size_t current = *entry_num;
+  if (current >= header_->buckets_allocated.load(std::memory_order_relaxed)) {
+    return false;
+  }
+  const char *bucket =
+      static_cast<const char *>(buckets_) + current * bucket_size_;
+  *key = key_manager_.getKeyComponentTyped(bucket, 0);
+  *value = reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset);
+  *entry_num = current + 1;
+  return true;
+}
+
+
+// Composite-key counterpart of getNextEntry(). The key components of bucket
+// '*entry_num' are APPENDED to '*key', so callers clear '*key' between calls.
+inline bool PackedPayloadHashTable::getNextEntryCompositeKey(
+    std::vector<TypedValue> *key,
+    const std::uint8_t **value,
+    std::size_t *entry_num) const {
+  const std::size_t current = *entry_num;
+  if (current >= header_->buckets_allocated.load(std::memory_order_relaxed)) {
+    return false;
+  }
+  const char *bucket =
+      static_cast<const char *>(buckets_) + current * bucket_size_;
+  const std::size_t num_components = this->key_types_.size();
+  for (std::size_t component = 0; component < num_components; ++component) {
+    key->emplace_back(key_manager_.getKeyComponentTyped(bucket, component));
+  }
+  *value = reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset);
+  *entry_num = current + 1;
+  return true;
+}
+
+
+// Lock-free search/append on the chain for 'hash_code'. '*pending_chain_ptr'
+// always points at the chain link currently being examined: the slot itself
+// on the first call, otherwise the "next" field at the start of '*bucket'.
+inline bool PackedPayloadHashTable::locateBucketForInsertion(
+ const std::size_t hash_code,
+ const std::size_t variable_key_allocation_required,
+ void **bucket,
+ std::atomic<std::size_t> **pending_chain_ptr,
+ std::size_t *pending_chain_ptr_finish_value) {
+ if (*bucket == nullptr) {
+ *pending_chain_ptr = &(slots_[hash_code % header_->num_slots]);
+ } else {
+ // A bucket's "next" pointer is its first field.
+ *pending_chain_ptr = static_cast<std::atomic<std::size_t> *>(*bucket);
+ }
+ for (;;) {
+ // Try to claim an empty link (0) by marking it pending (SIZE_T_MAX). On
+ // failure, 'existing_chain_ptr' receives the link's current value.
+ std::size_t existing_chain_ptr = 0;
+ if ((*pending_chain_ptr)
+ ->compare_exchange_strong(existing_chain_ptr,
+ std::numeric_limits<std::size_t>::max(),
+ std::memory_order_acq_rel)) {
+ // Got to the end of the chain. Allocate a new bucket.
+
+ // First, allocate variable-length key storage, if needed (i.e. if this
+ // is an upsert and we didn't allocate up-front).
+ if (!key_manager_.allocateVariableLengthKeyStorage(
+ variable_key_allocation_required)) {
+ // Ran out of variable-length storage. Clear the pending marker so that
+ // threads spinning on this link retry.
+ (*pending_chain_ptr)->store(0, std::memory_order_release);
+ *bucket = nullptr;
+ return false;
+ }
+
+ const std::size_t allocated_bucket_num =
+ header_->buckets_allocated.fetch_add(1, std::memory_order_relaxed);
+ if (allocated_bucket_num >= header_->num_buckets) {
+ // Ran out of buckets. Undo this thread's increment and clear the pending
+ // marker. The variable-length storage allocated above is not released
+ // on this path.
+ header_->buckets_allocated.fetch_sub(1, std::memory_order_relaxed);
+ (*pending_chain_ptr)->store(0, std::memory_order_release);
+ *bucket = nullptr;
+ return false;
+ } else {
+ // Success. The link stays pending until the caller stores
+ // '*pending_chain_ptr_finish_value' (bucket number plus one) into it.
+ *bucket =
+ static_cast<char *>(buckets_) + allocated_bucket_num * bucket_size_;
+ *pending_chain_ptr_finish_value = allocated_bucket_num + 1;
+ return true;
+ }
+ }
+ // Spin until the real "next" pointer is available.
+ while (existing_chain_ptr == std::numeric_limits<std::size_t>::max()) {
+ existing_chain_ptr =
+ (*pending_chain_ptr)->load(std::memory_order_acquire);
+ }
+ if (existing_chain_ptr == 0) {
+ // Other thread had to roll back, so try again.
+ continue;
+ }
+ // Chase the next pointer.
+ *bucket =
+ static_cast<char *>(buckets_) + (existing_chain_ptr - 1) * bucket_size_;
+ *pending_chain_ptr = static_cast<std::atomic<std::size_t> *>(*bucket);
+ // The hash code is stored immediately after the "next" pointer.
+ const std::size_t hash_in_bucket = *reinterpret_cast<const std::size_t *>(
+ static_cast<const char *>(*bucket) +
+ sizeof(std::atomic<std::size_t>));
+ if (hash_in_bucket == hash_code) {
+ // Hash match: let the caller compare the full key. On a spurious match
+ // the caller calls again with this '*bucket' to continue down the chain.
+ return false;
+ }
+ }
+}
+
+// Look up 'key' and return a pointer to the start of its packed payload, or
+// nullptr if it is not present. Walks the chain for the key's slot, comparing
+// stored hashes first and full keys only when the hashes match.
+inline const std::uint8_t* PackedPayloadHashTable::getSingleCompositeKey(
+    const std::vector<TypedValue> &key) const {
+  DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+  const std::size_t hash_code = this->hashCompositeKey(key);
+  const std::atomic<std::size_t> *link =
+      &slots_[hash_code % header_->num_slots];
+  for (std::size_t bucket_ref = link->load(std::memory_order_relaxed);
+       bucket_ref != 0;
+       bucket_ref = link->load(std::memory_order_relaxed)) {
+    DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
+    const char *bucket =
+        static_cast<const char *>(buckets_) + (bucket_ref - 1) * bucket_size_;
+    const std::size_t stored_hash = *reinterpret_cast<const std::size_t *>(
+        bucket + sizeof(std::atomic<std::size_t>));
+    if (stored_hash == hash_code &&
+        key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+      return reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset);
+    }
+    // Move on to this bucket's "next" pointer.
+    link = reinterpret_cast<const std::atomic<std::size_t> *>(bucket);
+  }
+
+  // End of the chain reached without a match.
+  return nullptr;
+}
+
+// Look up 'key' and return a pointer to the aggregation state at position
+// 'index' within its packed payload, or nullptr if the key is not present.
+// The chain walk is delegated to the single-argument overload so the lookup
+// logic lives in exactly one place.
+inline const std::uint8_t* PackedPayloadHashTable::getSingleCompositeKey(
+    const std::vector<TypedValue> &key,
+    const std::size_t index) const {
+  DEBUG_ASSERT(index < this->payload_offsets_.size());
+  const std::uint8_t *payload = this->getSingleCompositeKey(key);
+  if (payload == nullptr) {
+    // Reached the end of the chain and didn't find a match.
+    return nullptr;
+  }
+  return payload + this->payload_offsets_[index];
+}
+
+// Merge the packed aggregation states at 'source_state' (laid out like this
+// table's payloads) into the entry for 'key', creating the entry first if it
+// does not exist. All states are merged while holding the SpinMutex at the
+// start of the destination payload. If the table runs out of buckets or
+// variable-length key storage, it is resized and the upsert retried. Always
+// returns true.
+inline bool PackedPayloadHashTable::upsertCompositeKey(
+    const std::vector<TypedValue> &key,
+    const std::uint8_t *source_state) {
+  const std::size_t variable_size =
+      calculateVariableLengthCompositeKeyCopySize(key);
+  for (;;) {
+    {
+      // Shared mode: concurrent upserts may proceed, resize() may not.
+      SpinSharedMutexSharedLock<true> resize_lock(resize_shared_mutex_);
+      std::uint8_t *value = upsertCompositeKeyInternal(key, variable_size);
+      if (value != nullptr) {
+        SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
+        // Index type matches num_handles_ (std::size_t).
+        for (std::size_t k = 0; k < num_handles_; ++k) {
+          handles_[k]->mergeStates(source_state + payload_offsets_[k],
+                                   value + payload_offsets_[k]);
+        }
+        return true;
+      }
+    }
+    // Out of space: the shared lock has been released, so grow and retry.
+    resize(0, variable_size);
+  }
+}
+
+// Find or create the entry for 'key' and invoke '*functor' on a pointer to
+// the aggregation state at position 'index' within its payload. Unlike the
+// merging overload, no per-entry SpinMutex is taken here, so any needed
+// synchronization is presumably the functor's responsibility. Resizes and
+// retries when the table is out of space. Always returns true.
+template <typename FunctorT>
+inline bool PackedPayloadHashTable::upsertCompositeKey(
+    const std::vector<TypedValue> &key,
+    FunctorT *functor,
+    const std::size_t index) {
+  const std::size_t variable_size =
+      calculateVariableLengthCompositeKeyCopySize(key);
+  while (true) {
+    {
+      SpinSharedMutexSharedLock<true> resize_lock(resize_shared_mutex_);
+      std::uint8_t *const payload =
+          upsertCompositeKeyInternal(key, variable_size);
+      if (payload != nullptr) {
+        (*functor)(payload + payload_offsets_[index]);
+        return true;
+      }
+    }
+    // Ran out of buckets or variable-length storage; grow outside the lock.
+    resize(0, variable_size);
+  }
+}
+
+
+// Find the bucket holding 'key', or claim and initialize a new one, and return
+// a pointer to its payload. A new bucket's payload is initialized by copying
+// init_payload_. Returns nullptr if the table is out of buckets or
+// variable-length key storage; callers then resize and retry. All callers in
+// this file hold resize_shared_mutex_ in shared mode while calling this.
+inline std::uint8_t* PackedPayloadHashTable::upsertCompositeKeyInternal(
+    const std::vector<TypedValue> &key,
+    const std::size_t variable_key_size) {
+  if (variable_key_size > 0) {
+    // Don't allocate yet, since the key may already be present. However, we
+    // do check if either the allocated variable storage space OR the free
+    // space is big enough to hold the key (at least one must be true: either
+    // the key is already present and allocated, or we need to be able to
+    // allocate enough space for it).
+    const std::size_t allocated_bytes =
+        header_->variable_length_bytes_allocated.load(
+            std::memory_order_relaxed);
+    if ((allocated_bytes < variable_key_size) &&
+        (allocated_bytes + variable_key_size >
+         key_manager_.getVariableLengthKeyStorageSize())) {
+      return nullptr;
+    }
+  }
+
+  const std::size_t hash_code = this->hashCompositeKey(key);
+  void *bucket = nullptr;
+  std::atomic<std::size_t> *pending_chain_ptr;
+  std::size_t pending_chain_ptr_finish_value;
+  for (;;) {
+    if (locateBucketForInsertion(hash_code,
+                                 variable_key_size,
+                                 &bucket,
+                                 &pending_chain_ptr,
+                                 &pending_chain_ptr_finish_value)) {
+      // Found an empty bucket.
+      break;
+    } else if (bucket == nullptr) {
+      // Ran out of buckets or variable-key space.
+      return nullptr;
+    } else if (key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+      // Found an already-existing entry for this key.
+      return static_cast<std::uint8_t *>(bucket) + kValueOffset;
+    }
+    // Spurious hash collision: keep searching the chain after 'bucket'.
+  }
+
+  // We are now writing to an empty bucket that is not yet linked into the
+  // chain (its link is still marked pending). Write the key and hash.
+  writeCompositeKeyToBucket(key, hash_code, bucket);
+
+  // Initialize the payload from the template payload.
+  std::uint8_t *value = static_cast<std::uint8_t *>(bucket) + kValueOffset;
+  std::memcpy(value, init_payload_, this->total_payload_size_);
+
+  // Update the previous chain pointer to point to the new bucket. The release
+  // store publishes the key and payload written above to threads that
+  // acquire-load this link in locateBucketForInsertion().
+  pending_chain_ptr->store(pending_chain_ptr_finish_value,
+                           std::memory_order_release);
+
+  // Return the value.
+  return value;
+}
+
+// Upsert every tuple produced by 'base_accessor' (and, in lock-step, by
+// 'derived_accessor' when 'use_two_accessors' is true). The composite key is
+// read from 'key_ids'; tuples with any NULL key component are skipped. Each
+// aggregation state k is then updated while holding the entry's SpinMutex:
+// via updateStateNullary() when argument_ids[k] is empty, otherwise via
+// updateStateUnary() on the first attribute in argument_ids[k]. When the table
+// runs out of space, the shared resize lock is released, the table is
+// resized, both accessors are stepped back one tuple, and processing resumes
+// with the tuple that failed. Always returns true.
+template <bool use_two_accessors>
+inline bool PackedPayloadHashTable::upsertValueAccessorCompositeKeyInternal(
+    const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
+    const std::vector<MultiSourceAttributeId> &key_ids,
+    ValueAccessor *base_accessor,
+    ColumnVectorsValueAccessor *derived_accessor) {
+  // Variable-length key bytes needed by the most recently processed key;
+  // initialized so it is never read indeterminate.
+  std::size_t variable_size = 0;
+  std::vector<TypedValue> key_vector;
+  key_vector.resize(key_ids.size());
+
+  return InvokeOnAnyValueAccessor(
+      base_accessor,
+      [&](auto *accessor) -> bool {  // NOLINT(build/c++11)
+    bool continuing = true;
+    while (continuing) {
+      {
+        continuing = false;
+        SpinSharedMutexSharedLock<true> resize_lock(resize_shared_mutex_);
+        while (accessor->next()) {
+          if (use_two_accessors) {
+            derived_accessor->next();
+          }
+          if (this->GetCompositeKeyFromValueAccessor<use_two_accessors, true>(
+                  key_ids,
+                  accessor,
+                  derived_accessor,
+                  &key_vector)) {
+            // Skip tuples with a NULL key component.
+            continue;
+          }
+          variable_size =
+              this->calculateVariableLengthCompositeKeyCopySize(key_vector);
+          std::uint8_t *value =
+              this->upsertCompositeKeyInternal(key_vector, variable_size);
+          if (value == nullptr) {
+            // Out of space: leave the lock scope and resize below.
+            continuing = true;
+            break;
+          } else {
+            // Distinct name so it does not shadow 'resize_lock'.
+            SpinMutexLock payload_lock(*(reinterpret_cast<SpinMutex *>(value)));
+            for (std::size_t k = 0; k < num_handles_; ++k) {
+              const auto &ids = argument_ids[k];
+              if (ids.empty()) {
+                handles_[k]->updateStateNullary(value + payload_offsets_[k]);
+              } else {
+                const MultiSourceAttributeId &arg_id = ids.front();
+                if (use_two_accessors &&
+                    arg_id.source == ValueAccessorSource::kDerived) {
+                  DCHECK_NE(arg_id.attr_id, kInvalidAttributeID);
+                  handles_[k]->updateStateUnary(
+                      derived_accessor->getTypedValue(arg_id.attr_id),
+                      value + payload_offsets_[k]);
+                } else {
+                  handles_[k]->updateStateUnary(
+                      accessor->getTypedValue(arg_id.attr_id),
+                      value + payload_offsets_[k]);
+                }
+              }
+            }
+          }
+        }
+      }
+      if (continuing) {
+        this->resize(0, variable_size);
+        // Step back so the tuple that failed is retried after the resize.
+        accessor->previous();
+        if (use_two_accessors) {
+          derived_accessor->previous();
+        }
+      }
+    }
+    return true;
+  });
+}
+
+// Store 'hash_code' in the bucket's hash field (immediately after the atomic
+// "next" pointer), then write the scalar 'key' as key component 0.
+inline void PackedPayloadHashTable::writeScalarKeyToBucket(
+    const TypedValue &key,
+    const std::size_t hash_code,
+    void *bucket) {
+  char *const bucket_bytes = static_cast<char *>(bucket);
+  std::size_t *const hash_field = reinterpret_cast<std::size_t *>(
+      bucket_bytes + sizeof(std::atomic<std::size_t>));
+  *hash_field = hash_code;
+  key_manager_.writeKeyComponentToBucket(key, 0, bucket, nullptr);
+}
+
+// Store 'hash_code' in the bucket's hash field (immediately after the atomic
+// "next" pointer), then write every component of the composite 'key'.
+inline void PackedPayloadHashTable::writeCompositeKeyToBucket(
+    const std::vector<TypedValue> &key,
+    const std::size_t hash_code,
+    void *bucket) {
+  DEBUG_ASSERT(key.size() == this->key_types_.size());
+  char *const bucket_bytes = static_cast<char *>(bucket);
+  std::size_t *const hash_field = reinterpret_cast<std::size_t *>(
+      bucket_bytes + sizeof(std::atomic<std::size_t>));
+  *hash_field = hash_code;
+  const std::size_t num_components = this->key_types_.size();
+  for (std::size_t component = 0; component < num_components; ++component) {
+    key_manager_.writeKeyComponentToBucket(
+        key[component], component, bucket, nullptr);
+  }
+}
+
+// Returns true if a resize is needed: every bucket is already allocated, or
+// (when 'extra_variable_storage' is nonzero) the variable-length key storage
+// cannot fit 'extra_variable_storage' more bytes.
+inline bool PackedPayloadHashTable::isFull(
+    const std::size_t extra_variable_storage) const {
+  const bool no_free_buckets =
+      header_->buckets_allocated.load(std::memory_order_relaxed) >=
+      header_->num_buckets;
+  if (no_free_buckets) {
+    return true;
+  }
+  if (extra_variable_storage == 0) {
+    return false;
+  }
+  const std::size_t bytes_in_use =
+      header_->variable_length_bytes_allocated.load(std::memory_order_relaxed);
+  return extra_variable_storage + bytes_in_use >
+         key_manager_.getVariableLengthKeyStorageSize();
+}
+
+// Invoke '*functor' with (scalar key, payload pointer) for every allocated
+// bucket, in bucket order. Returns the number of entries visited.
+template <typename FunctorT>
+inline std::size_t PackedPayloadHashTable::forEach(FunctorT *functor) const {
+  TypedValue key;
+  const std::uint8_t *payload;
+  std::size_t next_entry = 0;
+  std::size_t visited = 0;
+  for (; getNextEntry(&key, &payload, &next_entry); ++visited) {
+    (*functor)(key, payload);
+  }
+  return visited;
+}
+
+// Invoke '*functor' with (scalar key, pointer to aggregation state 'index')
+// for every allocated bucket, in bucket order. Returns the number of entries
+// visited.
+template <typename FunctorT>
+inline std::size_t PackedPayloadHashTable::forEach(
+    FunctorT *functor, const int index) const {
+  TypedValue key;
+  const std::uint8_t *payload;
+  std::size_t next_entry = 0;
+  std::size_t visited = 0;
+  while (getNextEntry(&key, &payload, &next_entry)) {
+    ++visited;
+    (*functor)(key, payload + payload_offsets_[index]);
+    key.clear();
+  }
+  return visited;
+}
+
+// Invoke '*functor' with (composite key, payload pointer) for every allocated
+// bucket, in bucket order. Returns the number of entries visited.
+template <typename FunctorT>
+inline std::size_t PackedPayloadHashTable::forEachCompositeKey(
+    FunctorT *functor) const {
+  std::vector<TypedValue> key;
+  const std::uint8_t *payload;
+  std::size_t next_entry = 0;
+  std::size_t visited = 0;
+  for (; getNextEntryCompositeKey(&key, &payload, &next_entry); ++visited) {
+    (*functor)(key, payload);
+    // getNextEntryCompositeKey() appends components, so reset for the next.
+    key.clear();
+  }
+  return visited;
+}
+
+// Invoke '*functor' with (composite key, pointer to aggregation state
+// 'index') for every allocated bucket, in bucket order. Returns the number of
+// entries visited.
+template <typename FunctorT>
+inline std::size_t PackedPayloadHashTable::forEachCompositeKey(
+    FunctorT *functor,
+    const std::size_t index) const {
+  std::vector<TypedValue> key;
+  const std::uint8_t *payload;
+  std::size_t next_entry = 0;
+  std::size_t visited = 0;
+  for (; getNextEntryCompositeKey(&key, &payload, &next_entry); ++visited) {
+    (*functor)(key, payload + payload_offsets_[index]);
+    // getNextEntryCompositeKey() appends components, so reset for the next.
+    key.clear();
+  }
+  return visited;
+}
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_STORAGE_PACKED_PAYLOAD_HASH_TABLE_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/27380a69/storage/PartitionedHashTablePool.hpp
----------------------------------------------------------------------
diff --git a/storage/PartitionedHashTablePool.hpp b/storage/PartitionedHashTablePool.hpp
index 95d1810..0e62511 100644
--- a/storage/PartitionedHashTablePool.hpp
+++ b/storage/PartitionedHashTablePool.hpp
@@ -21,22 +21,19 @@
#define QUICKSTEP_STORAGE_PARTITIONED_HASH_TABLE_POOL_HPP_
#include <algorithm>
-#include <chrono>
+#include <cstddef>
#include <memory>
-#include <utility>
#include <vector>
-#include "expressions/aggregation/AggregationHandle.hpp"
#include "storage/HashTableBase.hpp"
-#include "storage/FastHashTable.hpp"
-#include "storage/FastHashTableFactory.hpp"
+#include "storage/HashTableFactory.hpp"
#include "utility/Macros.hpp"
-#include "utility/StringUtil.hpp"
#include "glog/logging.h"
namespace quickstep {
+class AggregationHandle;
class StorageManager;
class Type;
@@ -54,33 +51,6 @@ class PartitionedHashTablePool {
/**
* @brief Constructor.
*
- * @param estimated_num_entries The maximum number of entries in a hash table.
- * @param num_partitions The number of partitions (i.e. number of HashTables)
- * @param hash_table_impl_type The type of hash table implementation.
- * @param group_by_types A vector of pointer of types which form the group by
- * key.
- * @param agg_handle The aggregation handle.
- * @param storage_manager A pointer to the storage manager.
- **/
- PartitionedHashTablePool(const std::size_t estimated_num_entries,
- const std::size_t num_partitions,
- const HashTableImplType hash_table_impl_type,
- const std::vector<const Type *> &group_by_types,
- AggregationHandle *agg_handle,
- StorageManager *storage_manager)
- : estimated_num_entries_(
- setHashTableSize(estimated_num_entries, num_partitions)),
- num_partitions_(num_partitions),
- hash_table_impl_type_(hash_table_impl_type),
- group_by_types_(group_by_types),
- agg_handle_(DCHECK_NOTNULL(agg_handle)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)) {
- initializeAllHashTables();
- }
-
- /**
- * @brief Constructor.
- *
* @note This constructor is relevant for the HashTable specialized for
* aggregation.
*
@@ -89,8 +59,6 @@ class PartitionedHashTablePool {
* @param hash_table_impl_type The type of hash table implementation.
* @param group_by_types A vector of pointer of types which form the group by
* key.
- * @param payload_sizes The sizes of the payload elements (i.e.
- * AggregationStates).
* @param handles The aggregation handles.
* @param storage_manager A pointer to the storage manager.
**/
@@ -98,7 +66,6 @@ class PartitionedHashTablePool {
const std::size_t num_partitions,
const HashTableImplType hash_table_impl_type,
const std::vector<const Type *> &group_by_types,
- const std::vector<std::size_t> &payload_sizes,
const std::vector<AggregationHandle *> &handles,
StorageManager *storage_manager)
: estimated_num_entries_(
@@ -106,7 +73,6 @@ class PartitionedHashTablePool {
num_partitions_(num_partitions),
hash_table_impl_type_(hash_table_impl_type),
group_by_types_(group_by_types),
- payload_sizes_(payload_sizes),
handles_(handles),
storage_manager_(DCHECK_NOTNULL(storage_manager)) {
initializeAllHashTables();
@@ -150,25 +116,17 @@ class PartitionedHashTablePool {
private:
void initializeAllHashTables() {
for (std::size_t part_num = 0; part_num < num_partitions_; ++part_num) {
- AggregationStateHashTableBase *part_hash_table = createNewHashTableFast();
+ AggregationStateHashTableBase *part_hash_table = createNewHashTable();
hash_tables_.push_back(
std::unique_ptr<AggregationStateHashTableBase>(part_hash_table));
}
}
AggregationStateHashTableBase* createNewHashTable() {
- return agg_handle_->createGroupByHashTable(hash_table_impl_type_,
- group_by_types_,
- estimated_num_entries_,
- storage_manager_);
- }
-
- AggregationStateHashTableBase* createNewHashTableFast() {
- return AggregationStateFastHashTableFactory::CreateResizable(
+ return AggregationStateHashTableFactory::CreateResizable(
hash_table_impl_type_,
group_by_types_,
estimated_num_entries_,
- payload_sizes_,
handles_,
storage_manager_);
}
@@ -189,10 +147,6 @@ class PartitionedHashTablePool {
const HashTableImplType hash_table_impl_type_;
const std::vector<const Type *> group_by_types_;
-
- std::vector<std::size_t> payload_sizes_;
-
- AggregationHandle *agg_handle_;
const std::vector<AggregationHandle *> handles_;
StorageManager *storage_manager_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/27380a69/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index de2d25b..0cc7735 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -19,8 +19,8 @@
#include "storage/StorageBlock.hpp"
-#include <climits>
#include <memory>
+#include <random>
#include <type_traits>
#include <unordered_map>
#include <utility>
@@ -28,7 +28,6 @@
#include "catalog/CatalogRelationSchema.hpp"
#include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregationHandle.hpp"
#include "expressions/predicate/Predicate.hpp"
#include "expressions/scalar/Scalar.hpp"
#include "storage/BasicColumnStoreTupleStorageSubBlock.hpp"
@@ -37,7 +36,6 @@
#include "storage/CompressedColumnStoreTupleStorageSubBlock.hpp"
#include "storage/CompressedPackedRowStoreTupleStorageSubBlock.hpp"
#include "storage/CountedReference.hpp"
-#include "storage/HashTableBase.hpp"
#include "storage/IndexSubBlock.hpp"
#include "storage/InsertDestinationInterface.hpp"
#include "storage/SMAIndexSubBlock.hpp"
@@ -396,166 +394,6 @@ void StorageBlock::selectSimple(const std::vector<attribute_id> &selection,
accessor.get());
}
-AggregationState* StorageBlock::aggregate(
- const AggregationHandle &handle,
- const std::vector<std::unique_ptr<const Scalar>> &arguments,
- const std::vector<attribute_id> *arguments_as_attributes,
- const TupleIdSequence *filter) const {
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- // If all the arguments to this aggregate are plain relation attributes,
- // aggregate directly on a ValueAccessor from this block to avoid a copy.
- if ((arguments_as_attributes != nullptr) && (!arguments_as_attributes->empty())) {
- DCHECK_EQ(arguments.size(), arguments_as_attributes->size())
- << "Mismatch between number of arguments and number of attribute_ids";
- return aggregateHelperValueAccessor(handle, *arguments_as_attributes, filter);
- }
- // TODO(shoban): We may want to optimize for ScalarLiteral here.
-#endif
-
- // Call aggregateHelperColumnVector() to materialize each argument as a
- // ColumnVector, then aggregate over those.
- return aggregateHelperColumnVector(handle, arguments, filter);
-}
-
-void StorageBlock::aggregateGroupBy(
- const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
- const std::vector<std::unique_ptr<const Scalar>> &group_by,
- const TupleIdSequence *filter,
- AggregationStateHashTableBase *hash_table,
- std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
- DCHECK_GT(group_by.size(), 0u)
- << "Called aggregateGroupBy() with zero GROUP BY expressions";
-
- SubBlocksReference sub_blocks_ref(*tuple_store_,
- indices_,
- indices_consistent_);
-
- // IDs of 'arguments' as attributes in the ValueAccessor we create below.
- std::vector<attribute_id> argument_ids;
-
- // IDs of GROUP BY key element(s) in the ValueAccessor we create below.
- std::vector<attribute_id> key_ids;
-
- // An intermediate ValueAccessor that stores the materialized 'arguments' for
- // this aggregate, as well as the GROUP BY expression values.
- ColumnVectorsValueAccessor temp_result;
- {
- std::unique_ptr<ValueAccessor> accessor(tuple_store_->createValueAccessor(filter));
- attribute_id attr_id = 0;
-
- // First, put GROUP BY keys into 'temp_result'.
- if (reuse_group_by_vectors->empty()) {
- // Compute GROUP BY values from group_by Scalars, and store them in
- // reuse_group_by_vectors for reuse by other aggregates on this same
- // block.
- reuse_group_by_vectors->reserve(group_by.size());
- for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
- reuse_group_by_vectors->emplace_back(
- group_by_element->getAllValues(accessor.get(), &sub_blocks_ref));
- temp_result.addColumn(reuse_group_by_vectors->back().get(), false);
- key_ids.push_back(attr_id++);
- }
- } else {
- // Reuse precomputed GROUP BY values from reuse_group_by_vectors.
- DCHECK_EQ(group_by.size(), reuse_group_by_vectors->size())
- << "Wrong number of reuse_group_by_vectors";
- for (const std::unique_ptr<ColumnVector> &reuse_cv : *reuse_group_by_vectors) {
- temp_result.addColumn(reuse_cv.get(), false);
- key_ids.push_back(attr_id++);
- }
- }
-
- // Compute argument vectors and add them to 'temp_result'.
- for (const std::vector<std::unique_ptr<const Scalar>> &argument : arguments) {
- for (const std::unique_ptr<const Scalar> &args : argument) {
- temp_result.addColumn(args->getAllValues(accessor.get(), &sub_blocks_ref));
- argument_ids.push_back(attr_id++);
- }
- if (argument.empty()) {
- argument_ids.push_back(kInvalidAttributeID);
- }
- }
- }
-
- hash_table->upsertValueAccessorCompositeKeyFast(argument_ids,
- &temp_result,
- key_ids,
- true);
-}
-
-
-void StorageBlock::aggregateDistinct(
- const AggregationHandle &handle,
- const std::vector<std::unique_ptr<const Scalar>> &arguments,
- const std::vector<attribute_id> *arguments_as_attributes,
- const std::vector<std::unique_ptr<const Scalar>> &group_by,
- const TupleIdSequence *filter,
- AggregationStateHashTableBase *distinctify_hash_table,
- std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
- DCHECK_GT(arguments.size(), 0u)
- << "Called aggregateDistinct() with zero argument expressions";
- DCHECK((group_by.size() == 0 || reuse_group_by_vectors != nullptr));
-
- std::vector<attribute_id> key_ids;
-
- // An intermediate ValueAccessor that stores the materialized 'arguments' for
- // this aggregate, as well as the GROUP BY expression values.
- ColumnVectorsValueAccessor temp_result;
- {
- std::unique_ptr<ValueAccessor> accessor(tuple_store_->createValueAccessor(filter));
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- // If all the arguments to this aggregate are plain relation attributes,
- // aggregate directly on a ValueAccessor from this block to avoid a copy.
- if ((arguments_as_attributes != nullptr) && (!arguments_as_attributes->empty())) {
- DCHECK_EQ(arguments.size(), arguments_as_attributes->size())
- << "Mismatch between number of arguments and number of attribute_ids";
- DCHECK_EQ(group_by.size(), 0u);
- handle.insertValueAccessorIntoDistinctifyHashTable(
- accessor.get(), *arguments_as_attributes, distinctify_hash_table);
- return;
- }
-#endif
-
- SubBlocksReference sub_blocks_ref(*tuple_store_,
- indices_,
- indices_consistent_);
- attribute_id attr_id = 0;
-
- if (!group_by.empty()) {
- // Put GROUP BY keys into 'temp_result'.
- if (reuse_group_by_vectors->empty()) {
- // Compute GROUP BY values from group_by Scalars, and store them in
- // reuse_group_by_vectors for reuse by other aggregates on this same
- // block.
- reuse_group_by_vectors->reserve(group_by.size());
- for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
- reuse_group_by_vectors->emplace_back(
- group_by_element->getAllValues(accessor.get(), &sub_blocks_ref));
- temp_result.addColumn(reuse_group_by_vectors->back().get(), false);
- key_ids.push_back(attr_id++);
- }
- } else {
- // Reuse precomputed GROUP BY values from reuse_group_by_vectors.
- DCHECK_EQ(group_by.size(), reuse_group_by_vectors->size())
- << "Wrong number of reuse_group_by_vectors";
- for (const std::unique_ptr<ColumnVector> &reuse_cv : *reuse_group_by_vectors) {
- temp_result.addColumn(reuse_cv.get(), false);
- key_ids.push_back(attr_id++);
- }
- }
- }
- // Compute argument vectors and add them to 'temp_result'.
- for (const std::unique_ptr<const Scalar> &argument : arguments) {
- temp_result.addColumn(argument->getAllValues(accessor.get(), &sub_blocks_ref));
- key_ids.push_back(attr_id++);
- }
- }
-
- handle.insertValueAccessorIntoDistinctifyHashTable(
- &temp_result, key_ids, distinctify_hash_table);
-}
-
// TODO(chasseur): Vectorization for updates.
StorageBlock::UpdateResult StorageBlock::update(
const unordered_map<attribute_id, unique_ptr<const Scalar>> &assignments,
@@ -1262,61 +1100,6 @@ std::unordered_map<attribute_id, TypedValue>* StorageBlock::generateUpdatedValue
return update_map;
}
-AggregationState* StorageBlock::aggregateHelperColumnVector(
- const AggregationHandle &handle,
- const std::vector<std::unique_ptr<const Scalar>> &arguments,
- const TupleIdSequence *matches) const {
- if (arguments.empty()) {
- // Special case. This is a nullary aggregate (i.e. COUNT(*)).
- return handle.accumulateNullary(matches == nullptr ? tuple_store_->numTuples()
- : matches->size());
- } else {
- // Set up a ValueAccessor that will be used when materializing argument
- // values below (possibly filtered based on the '*matches' to a filter
- // predicate).
- std::unique_ptr<ValueAccessor> accessor;
- if (matches == nullptr) {
- accessor.reset(tuple_store_->createValueAccessor());
- } else {
- accessor.reset(tuple_store_->createValueAccessor(matches));
- }
-
- SubBlocksReference sub_blocks_ref(*tuple_store_,
- indices_,
- indices_consistent_);
-
- // Materialize each argument's values for this block as a ColumnVector.
- std::vector<std::unique_ptr<ColumnVector>> column_vectors;
- for (const std::unique_ptr<const Scalar> &argument : arguments) {
- column_vectors.emplace_back(argument->getAllValues(accessor.get(), &sub_blocks_ref));
- }
-
- // Have the AggregationHandle actually do the aggregation.
- return handle.accumulateColumnVectors(column_vectors);
- }
-}
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-AggregationState* StorageBlock::aggregateHelperValueAccessor(
- const AggregationHandle &handle,
- const std::vector<attribute_id> &argument_ids,
- const TupleIdSequence *matches) const {
- // Set up a ValueAccessor to aggregate over (possibly filtered based on the
- // '*matches' to a filter predicate).
- std::unique_ptr<ValueAccessor> accessor;
- if (matches == nullptr) {
- accessor.reset(tuple_store_->createValueAccessor());
- } else {
- accessor.reset(tuple_store_->createValueAccessor(matches));
- }
-
- // Have the AggregationHandle actually do the aggregation.
- return handle.accumulateValueAccessor(
- accessor.get(),
- argument_ids);
-}
-#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-
void StorageBlock::updateHeader() {
DEBUG_ASSERT(*static_cast<const int*>(block_memory_) == block_header_.ByteSize());
@@ -1346,59 +1129,4 @@ const std::size_t StorageBlock::getNumTuples() const {
return tuple_store_->numTuples();
}
-void StorageBlock::aggregateGroupByPartitioned(
- const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
- const std::vector<std::unique_ptr<const Scalar>> &group_by,
- const TupleIdSequence *filter,
- const std::size_t num_partitions,
- ColumnVectorsValueAccessor *temp_result,
- std::vector<attribute_id> *argument_ids,
- std::vector<attribute_id> *key_ids,
- std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
- DCHECK(!group_by.empty())
- << "Called aggregateGroupByPartitioned() with zero GROUP BY expressions";
-
- SubBlocksReference sub_blocks_ref(*tuple_store_,
- indices_,
- indices_consistent_);
-
- std::unique_ptr<ValueAccessor> accessor(
- tuple_store_->createValueAccessor(filter));
-
- attribute_id attr_id = 0;
-
- // First, put GROUP BY keys into 'temp_result'.
- if (reuse_group_by_vectors->empty()) {
- // Compute GROUP BY values from group_by Scalars, and store them in
- // reuse_group_by_vectors for reuse by other aggregates on this same
- // block.
- reuse_group_by_vectors->reserve(group_by.size());
- for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
- reuse_group_by_vectors->emplace_back(
- group_by_element->getAllValues(accessor.get(), &sub_blocks_ref));
- temp_result->addColumn(reuse_group_by_vectors->back().get(), false);
- key_ids->push_back(attr_id++);
- }
- } else {
- // Reuse precomputed GROUP BY values from reuse_group_by_vectors.
- DCHECK_EQ(group_by.size(), reuse_group_by_vectors->size())
- << "Wrong number of reuse_group_by_vectors";
- for (const std::unique_ptr<ColumnVector> &reuse_cv : *reuse_group_by_vectors) {
- temp_result->addColumn(reuse_cv.get(), false);
- key_ids->push_back(attr_id++);
- }
- }
-
- // Compute argument vectors and add them to 'temp_result'.
- for (const std::vector<std::unique_ptr<const Scalar>> &argument : arguments) {
- for (const std::unique_ptr<const Scalar> &args : argument) {
- temp_result->addColumn(args->getAllValues(accessor.get(), &sub_blocks_ref));
- argument_ids->push_back(attr_id++);
- }
- if (argument.empty()) {
- argument_ids->push_back(kInvalidAttributeID);
- }
- }
-}
-
} // namespace quickstep