Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/02/04 04:44:29 UTC
[1/3] incubator-quickstep git commit: Updates
Repository: incubator-quickstep
Updated Branches:
refs/heads/collision-free-agg 3bcb5c892 -> 1e7a92a94
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/PackedPayloadHashTable.cpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadHashTable.cpp b/storage/PackedPayloadHashTable.cpp
new file mode 100644
index 0000000..adf4b5c
--- /dev/null
+++ b/storage/PackedPayloadHashTable.cpp
@@ -0,0 +1,436 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "storage/PackedPayloadHashTable.hpp"
+
+namespace quickstep {
+
+PackedPayloadHashTable::PackedPayloadHashTable(
+ const std::vector<const Type *> &key_types,
+ const std::size_t num_entries,
+ const std::vector<AggregationHandle *> &handles,
+ StorageManager *storage_manager)
+ : key_types_(key_types),
+ num_handles_(handles.size()),
+ handles_(handles),
+ total_payload_size_(ComputeTotalPayloadSize(handles)),
+ storage_manager_(storage_manager),
+ kBucketAlignment(alignof(std::atomic<std::size_t>)),
+ kValueOffset(sizeof(std::atomic<std::size_t>) + sizeof(std::size_t)),
+ key_manager_(key_types_, kValueOffset + total_payload_size_),
+ bucket_size_(ComputeBucketSize(key_manager_.getFixedKeySize())) {
+ std::size_t payload_offset_running_sum = sizeof(SpinMutex);
+ for (const auto *handle : handles) {
+ payload_offsets_.emplace_back(payload_offset_running_sum);
+ payload_offset_running_sum += handle->getPayloadSize();
+ }
+
+ // NOTE(jianqiao): Copying init_payload into the buckets may cause a memory
+ // leak or a double free if the payload contains out-of-line data.
+ init_payload_ =
+ static_cast<std::uint8_t *>(calloc(this->total_payload_size_, 1));
+ DCHECK(init_payload_ != nullptr);
+
+ for (std::size_t i = 0; i < num_handles_; ++i) {
+ handles_[i]->initPayload(init_payload_ + payload_offsets_[i]);
+ }
+
+ // Bucket size always rounds up to the alignment requirement of the atomic
+ // size_t "next" pointer at the front or a ValueT, whichever is larger.
+ //
+ // Give base HashTable information about what key components are stored
+ // inline from 'key_manager_'.
+ setKeyInline(key_manager_.getKeyInline());
+
+ // Pick out a prime number of slots and calculate storage requirements.
+ std::size_t num_slots_tmp =
+ get_next_prime_number(num_entries * kHashTableLoadFactor);
+ std::size_t required_memory =
+ sizeof(Header) + num_slots_tmp * sizeof(std::atomic<std::size_t>) +
+ (num_slots_tmp / kHashTableLoadFactor) *
+ (bucket_size_ + key_manager_.getEstimatedVariableKeySize());
+ std::size_t num_storage_slots =
+ this->storage_manager_->SlotsNeededForBytes(required_memory);
+ if (num_storage_slots == 0) {
+ FATAL_ERROR(
+ "Storage requirement for SeparateChainingHashTable "
+ "exceeds maximum allocation size.");
+ }
+
+ // Get a StorageBlob to hold the hash table.
+ const block_id blob_id =
+ this->storage_manager_->createBlob(num_storage_slots);
+ this->blob_ = this->storage_manager_->getBlobMutable(blob_id);
+
+ void *aligned_memory_start = this->blob_->getMemoryMutable();
+ std::size_t available_memory = num_storage_slots * kSlotSizeBytes;
+ if (align(alignof(Header),
+ sizeof(Header),
+ aligned_memory_start,
+ available_memory) == nullptr) {
+ // With current values from StorageConstants.hpp, this should be
+ // impossible. A blob is at least 1 MB, while a Header has alignment
+ // requirement of just kCacheLineBytes (64 bytes).
+ FATAL_ERROR(
+ "StorageBlob used to hold resizable "
+ "SeparateChainingHashTable is too small to meet alignment "
+ "requirements of SeparateChainingHashTable::Header.");
+ } else if (aligned_memory_start != this->blob_->getMemoryMutable()) {
+ // This should also be impossible, since the StorageManager allocates slots
+ // aligned to kCacheLineBytes.
+ DEV_WARNING("StorageBlob memory adjusted by "
+ << (num_storage_slots * kSlotSizeBytes - available_memory)
+ << " bytes to meet alignment requirement for "
+ << "SeparateChainingHashTable::Header.");
+ }
+
+ // Locate the header.
+ header_ = static_cast<Header *>(aligned_memory_start);
+ aligned_memory_start =
+ static_cast<char *>(aligned_memory_start) + sizeof(Header);
+ available_memory -= sizeof(Header);
+
+ // Recompute the number of slots & buckets using the actual available memory.
+ // Most likely, we got some extra free bucket space due to "rounding up" to
+ // the storage blob's size. It's also possible (though very unlikely) that we
+ // will wind up with fewer buckets than we initially wanted because of screwy
+ // alignment requirements for ValueT.
+ std::size_t num_buckets_tmp =
+ available_memory /
+ (kHashTableLoadFactor * sizeof(std::atomic<std::size_t>) + bucket_size_ +
+ key_manager_.getEstimatedVariableKeySize());
+ num_slots_tmp =
+ get_previous_prime_number(num_buckets_tmp * kHashTableLoadFactor);
+ num_buckets_tmp = num_slots_tmp / kHashTableLoadFactor;
+ DEBUG_ASSERT(num_slots_tmp > 0);
+ DEBUG_ASSERT(num_buckets_tmp > 0);
+
+ // Locate the slot array.
+ slots_ = static_cast<std::atomic<std::size_t> *>(aligned_memory_start);
+ aligned_memory_start = static_cast<char *>(aligned_memory_start) +
+ sizeof(std::atomic<std::size_t>) * num_slots_tmp;
+ available_memory -= sizeof(std::atomic<std::size_t>) * num_slots_tmp;
+
+ // Locate the buckets.
+ buckets_ = aligned_memory_start;
+ // Extra-paranoid: If ValueT has an alignment requirement greater than that
+ // of std::atomic<std::size_t>, we may need to adjust the start of the bucket
+ // array.
+ if (align(kBucketAlignment, bucket_size_, buckets_, available_memory) ==
+ nullptr) {
+ FATAL_ERROR(
+ "StorageBlob used to hold resizable "
+ "SeparateChainingHashTable is too small to meet "
+ "alignment requirements of buckets.");
+ } else if (buckets_ != aligned_memory_start) {
+ DEV_WARNING(
+ "Bucket array start position adjusted to meet alignment "
+ "requirement for SeparateChainingHashTable's value type.");
+ if (num_buckets_tmp * bucket_size_ > available_memory) {
+ --num_buckets_tmp;
+ }
+ }
+
+ // Fill in the header.
+ header_->num_slots = num_slots_tmp;
+ header_->num_buckets = num_buckets_tmp;
+ header_->buckets_allocated.store(0, std::memory_order_relaxed);
+ header_->variable_length_bytes_allocated.store(0, std::memory_order_relaxed);
+ available_memory -= bucket_size_ * (header_->num_buckets);
+
+ // Locate variable-length key storage region, and give it all the remaining
+ // bytes in the blob.
+ key_manager_.setVariableLengthStorageInfo(
+ static_cast<char *>(buckets_) + header_->num_buckets * bucket_size_,
+ available_memory,
+ &(header_->variable_length_bytes_allocated));
+}
+
+PackedPayloadHashTable::~PackedPayloadHashTable() {
+ if (blob_.valid()) {
+ const block_id blob_id = blob_->getID();
+ blob_.release();
+ storage_manager_->deleteBlockOrBlobFile(blob_id);
+ }
+ std::free(init_payload_);
+}
+
+void PackedPayloadHashTable::clear() {
+ const std::size_t used_buckets =
+ header_->buckets_allocated.load(std::memory_order_relaxed);
+ // Destroy existing values, if necessary.
+ destroyPayload();
+
+ // Zero-out slot array.
+ std::memset(
+ slots_, 0x0, sizeof(std::atomic<std::size_t>) * header_->num_slots);
+
+ // Zero-out used buckets.
+ std::memset(buckets_, 0x0, used_buckets * bucket_size_);
+
+ header_->buckets_allocated.store(0, std::memory_order_relaxed);
+ header_->variable_length_bytes_allocated.store(0, std::memory_order_relaxed);
+ key_manager_.zeroNextVariableLengthKeyOffset();
+}
+
+void PackedPayloadHashTable::destroyPayload() {
+ const std::size_t num_buckets =
+ header_->buckets_allocated.load(std::memory_order_relaxed);
+ void *bucket_ptr = static_cast<char *>(buckets_) + kValueOffset;
+ for (std::size_t bucket_num = 0; bucket_num < num_buckets; ++bucket_num) {
+ for (std::size_t handle_id = 0; handle_id < num_handles_; ++handle_id) {
+ void *value_internal_ptr =
+ static_cast<char *>(bucket_ptr) + this->payload_offsets_[handle_id];
+ handles_[handle_id]->destroyPayload(static_cast<std::uint8_t *>(value_internal_ptr));
+ }
+ bucket_ptr = static_cast<char *>(bucket_ptr) + bucket_size_;
+ }
+}
+
+bool PackedPayloadHashTable::upsertValueAccessor(
+ const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
+ const std::vector<MultiSourceAttributeId> &key_attr_ids,
+ ValueAccessorMultiplexer *accessor_mux) {
+ ValueAccessor *base_accessor = accessor_mux->getBaseAccessor();
+ ColumnVectorsValueAccessor *derived_accessor =
+ static_cast<ColumnVectorsValueAccessor *>(accessor_mux->getDerivedAccessor());
+
+ if (derived_accessor == nullptr) {
+ return upsertValueAccessorCompositeKeyInternal<false>(argument_ids,
+ key_attr_ids,
+ base_accessor,
+ derived_accessor);
+ } else {
+ DCHECK(derived_accessor->getImplementationType()
+ == ValueAccessor::Implementation::kColumnVectors);
+ return upsertValueAccessorCompositeKeyInternal<true>(argument_ids,
+ key_attr_ids,
+ base_accessor,
+ derived_accessor);
+ }
+}
+
+void PackedPayloadHashTable::resize(const std::size_t extra_buckets,
+ const std::size_t extra_variable_storage,
+ const std::size_t retry_num) {
+ // A retry should never be necessary with this implementation of HashTable.
+ // Separate chaining ensures that any resized hash table with more buckets
+ // than the original table will be able to hold more entries than the
+ // original.
+ DEBUG_ASSERT(retry_num == 0);
+
+ SpinSharedMutexExclusiveLock<true> write_lock(this->resize_shared_mutex_);
+
+ // Recheck whether the hash table is still full. Note that multiple threads
+ // might wait to rebuild this hash table simultaneously. Only the first one
+ // should do the rebuild.
+ if (!isFull(extra_variable_storage)) {
+ return;
+ }
+
+ // Approximately double the number of buckets and slots.
+ //
+ // TODO(chasseur): It may be worth it to more than double the number of
+ // buckets here so that we can maintain a good, sparse fill factor for a
+ // longer time as more values are inserted. Such behavior should take into
+ // account kHashTableLoadFactor.
+ std::size_t resized_num_slots = get_next_prime_number(
+ (header_->num_buckets + extra_buckets / 2) * kHashTableLoadFactor * 2);
+ std::size_t variable_storage_required =
+ (resized_num_slots / kHashTableLoadFactor) *
+ key_manager_.getEstimatedVariableKeySize();
+ const std::size_t original_variable_storage_used =
+ header_->variable_length_bytes_allocated.load(std::memory_order_relaxed);
+ // If this resize was triggered by a too-large variable-length key, bump up
+ // the variable-length storage requirement.
+ if ((extra_variable_storage > 0) &&
+ (extra_variable_storage + original_variable_storage_used >
+ key_manager_.getVariableLengthKeyStorageSize())) {
+ variable_storage_required += extra_variable_storage;
+ }
+
+ const std::size_t resized_memory_required =
+ sizeof(Header) + resized_num_slots * sizeof(std::atomic<std::size_t>) +
+ (resized_num_slots / kHashTableLoadFactor) * bucket_size_ +
+ variable_storage_required;
+ const std::size_t resized_storage_slots =
+ this->storage_manager_->SlotsNeededForBytes(resized_memory_required);
+ if (resized_storage_slots == 0) {
+ FATAL_ERROR(
+ "Storage requirement for resized SeparateChainingHashTable "
+ "exceeds maximum allocation size.");
+ }
+
+ // Get a new StorageBlob to hold the resized hash table.
+ const block_id resized_blob_id =
+ this->storage_manager_->createBlob(resized_storage_slots);
+ MutableBlobReference resized_blob =
+ this->storage_manager_->getBlobMutable(resized_blob_id);
+
+ // Locate data structures inside the new StorageBlob.
+ void *aligned_memory_start = resized_blob->getMemoryMutable();
+ std::size_t available_memory = resized_storage_slots * kSlotSizeBytes;
+ if (align(alignof(Header),
+ sizeof(Header),
+ aligned_memory_start,
+ available_memory) == nullptr) {
+ // Should be impossible, as noted in constructor.
+ FATAL_ERROR(
+ "StorageBlob used to hold resized SeparateChainingHashTable "
+ "is too small to meet alignment requirements of "
+ "LinearOpenAddressingHashTable::Header.");
+ } else if (aligned_memory_start != resized_blob->getMemoryMutable()) {
+ // Again, should be impossible.
+ DEV_WARNING("In SeparateChainingHashTable::resize(), StorageBlob "
+ << "memory adjusted by "
+ << (resized_storage_slots * kSlotSizeBytes - available_memory)
+ << " bytes to meet alignment requirement for "
+ << "LinearOpenAddressingHashTable::Header.");
+ }
+
+ Header *resized_header = static_cast<Header *>(aligned_memory_start);
+ aligned_memory_start =
+ static_cast<char *>(aligned_memory_start) + sizeof(Header);
+ available_memory -= sizeof(Header);
+
+ // As in constructor, recompute the number of slots and buckets using the
+ // actual available memory.
+ std::size_t resized_num_buckets =
+ (available_memory - extra_variable_storage) /
+ (kHashTableLoadFactor * sizeof(std::atomic<std::size_t>) + bucket_size_ +
+ key_manager_.getEstimatedVariableKeySize());
+ resized_num_slots =
+ get_previous_prime_number(resized_num_buckets * kHashTableLoadFactor);
+ resized_num_buckets = resized_num_slots / kHashTableLoadFactor;
+
+ // Locate slot array.
+ std::atomic<std::size_t> *resized_slots =
+ static_cast<std::atomic<std::size_t> *>(aligned_memory_start);
+ aligned_memory_start = static_cast<char *>(aligned_memory_start) +
+ sizeof(std::atomic<std::size_t>) * resized_num_slots;
+ available_memory -= sizeof(std::atomic<std::size_t>) * resized_num_slots;
+
+ // As in constructor, we will be extra paranoid and use align() to locate the
+ // start of the array of buckets, as well.
+ void *resized_buckets = aligned_memory_start;
+ if (align(
+ kBucketAlignment, bucket_size_, resized_buckets, available_memory) ==
+ nullptr) {
+ FATAL_ERROR(
+ "StorageBlob used to hold resized SeparateChainingHashTable "
+ "is too small to meet alignment requirements of buckets.");
+ } else if (resized_buckets != aligned_memory_start) {
+ DEV_WARNING(
+ "Bucket array start position adjusted to meet alignment "
+ "requirement for SeparateChainingHashTable's value type.");
+ if (resized_num_buckets * bucket_size_ + variable_storage_required >
+ available_memory) {
+ --resized_num_buckets;
+ }
+ }
+ aligned_memory_start = static_cast<char *>(aligned_memory_start) +
+ resized_num_buckets * bucket_size_;
+ available_memory -= resized_num_buckets * bucket_size_;
+
+ void *resized_variable_length_key_storage = aligned_memory_start;
+ const std::size_t resized_variable_length_key_storage_size = available_memory;
+
+ const std::size_t original_buckets_used =
+ header_->buckets_allocated.load(std::memory_order_relaxed);
+
+ // Initialize the header.
+ resized_header->num_slots = resized_num_slots;
+ resized_header->num_buckets = resized_num_buckets;
+ resized_header->buckets_allocated.store(original_buckets_used,
+ std::memory_order_relaxed);
+ resized_header->variable_length_bytes_allocated.store(
+ original_variable_storage_used, std::memory_order_relaxed);
+
+ // Bulk-copy buckets. This is safe because:
+ // 1. The "next" pointers will be adjusted when rebuilding chains below.
+ // 2. The hash codes will stay the same.
+ // 3. For key components:
+ // a. Inline keys will stay exactly the same.
+ // b. Offsets into variable-length storage will remain valid, because
+ // we also do a byte-for-byte copy of variable-length storage below.
+ // c. Absolute external pointers will still point to the same address.
+ // d. Relative pointers are not used with resizable hash tables.
+ // 4. If values are not trivially copyable, then we invoke ValueT's copy
+ // or move constructor with placement new.
+ // NOTE(harshad) - Regarding point 4 above: since this is a specialized hash
+ // table implemented for aggregation, the values are trivially copyable, so
+ // we don't need to invoke the payload values' copy/move constructors.
+ std::memcpy(resized_buckets, buckets_, original_buckets_used * bucket_size_);
+
+ // Copy over variable-length key components, if any.
+ if (original_variable_storage_used > 0) {
+ DEBUG_ASSERT(original_variable_storage_used ==
+ key_manager_.getNextVariableLengthKeyOffset());
+ DEBUG_ASSERT(original_variable_storage_used <=
+ resized_variable_length_key_storage_size);
+ std::memcpy(resized_variable_length_key_storage,
+ key_manager_.getVariableLengthKeyStorage(),
+ original_variable_storage_used);
+ }
+
+ destroyPayload();
+
+ // Make resized structures active.
+ std::swap(this->blob_, resized_blob);
+ header_ = resized_header;
+ slots_ = resized_slots;
+ buckets_ = resized_buckets;
+ key_manager_.setVariableLengthStorageInfo(
+ resized_variable_length_key_storage,
+ resized_variable_length_key_storage_size,
+ &(resized_header->variable_length_bytes_allocated));
+
+ // Drop the old blob.
+ const block_id old_blob_id = resized_blob->getID();
+ resized_blob.release();
+ this->storage_manager_->deleteBlockOrBlobFile(old_blob_id);
+
+ // Rebuild chains.
+ void *current_bucket = buckets_;
+ for (std::size_t bucket_num = 0; bucket_num < original_buckets_used;
+ ++bucket_num) {
+ std::atomic<std::size_t> *next_ptr =
+ static_cast<std::atomic<std::size_t> *>(current_bucket);
+ const std::size_t hash_code = *reinterpret_cast<const std::size_t *>(
+ static_cast<const char *>(current_bucket) +
+ sizeof(std::atomic<std::size_t>));
+
+ const std::size_t slot_number = hash_code % header_->num_slots;
+ std::size_t slot_ptr_value = 0;
+ if (slots_[slot_number].compare_exchange_strong(
+ slot_ptr_value, bucket_num + 1, std::memory_order_relaxed)) {
+ // This bucket is the first in the chain for this block, so reset its
+ // next pointer to 0.
+ next_ptr->store(0, std::memory_order_relaxed);
+ } else {
+ // A chain already exists starting from this slot, so put this bucket at
+ // the head.
+ next_ptr->store(slot_ptr_value, std::memory_order_relaxed);
+ slots_[slot_number].store(bucket_num + 1, std::memory_order_relaxed);
+ }
+ current_bucket = static_cast<char *>(current_bucket) + bucket_size_;
+ }
+}
+
+} // namespace quickstep
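The chain-rebuilding loop at the end of resize() above relies on the slot encoding documented later in PackedPayloadHashTable.hpp: a slot (or a bucket's atomic "next" field) holds 0 when empty, SIZE_T_MAX while a thread is installing a chain head, and otherwise the index of the first bucket in the chain plus one. The following standalone sketch is not part of the commit; it only assumes that encoding and shows how a chain is walked once the pending state has cleared.

    #include <cstddef>
    #include <iostream>
    #include <limits>
    #include <vector>

    namespace {

    // Toy stand-in for the first two fields of a bucket: the "next" reference
    // (same encoding as the slot array) followed by the stored hash code.
    struct ToyBucket {
      std::size_t next;
      std::size_t hash_code;
    };

    // Walk the chain rooted at 'slot_value', printing each bucket visited.
    void FollowChain(const std::vector<ToyBucket> &buckets, std::size_t slot_value) {
      while (slot_value != 0) {
        if (slot_value == std::numeric_limits<std::size_t>::max()) {
          // Another thread is still installing this link; a real reader would spin.
          break;
        }
        // Subtract one to turn the encoded reference into a bucket index.
        const ToyBucket &bucket = buckets[slot_value - 1];
        std::cout << "bucket " << (slot_value - 1) << " holds hash "
                  << bucket.hash_code << '\n';
        slot_value = bucket.next;
      }
    }

    }  // namespace

    int main() {
      // Three buckets whose keys all hashed to the same slot. New buckets are
      // pushed at the head of the chain, as in resize()'s rebuild loop above.
      const std::vector<ToyBucket> buckets = {
          {0, 17},  // bucket 0: next == 0, end of the chain
          {1, 42},  // bucket 1: next refers to bucket 0 (index 0 + 1)
          {2, 99},  // bucket 2: next refers to bucket 1 (index 1 + 1)
      };
      const std::size_t slot_value = 3;  // the slot refers to bucket 2 (index 2 + 1)
      FollowChain(buckets, slot_value);
      return 0;
    }

The plus-one encoding lets 0 double as the "empty" marker, which is why the rebuild loop stores bucket_num + 1 into both the slot array and the next pointers.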
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/PackedPayloadHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadHashTable.hpp b/storage/PackedPayloadHashTable.hpp
new file mode 100644
index 0000000..a871d29
--- /dev/null
+++ b/storage/PackedPayloadHashTable.hpp
@@ -0,0 +1,798 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_PACKED_PAYLOAD_HASH_TABLE_HPP_
+#define QUICKSTEP_STORAGE_PACKED_PAYLOAD_HASH_TABLE_HPP_
+
+#include <algorithm>
+#include <atomic>
+#include <cstddef>
+#include <cstdlib>
+#include <limits>
+#include <memory>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "storage/HashTableBase.hpp"
+#include "storage/HashTableKeyManager.hpp"
+#include "storage/StorageBlob.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageConstants.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/TupleReference.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "threading/SpinMutex.hpp"
+#include "threading/SpinSharedMutex.hpp"
+#include "types/Type.hpp"
+#include "types/TypedValue.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "utility/Alignment.hpp"
+#include "utility/HashPair.hpp"
+#include "utility/Macros.hpp"
+#include "utility/PrimeNumber.hpp"
+
+namespace quickstep {
+
+/** \addtogroup Storage
+ * @{
+ */
+
+class PackedPayloadHashTable : public AggregationStateHashTableBase {
+ public:
+ PackedPayloadHashTable(
+ const std::vector<const Type *> &key_types,
+ const std::size_t num_entries,
+ const std::vector<AggregationHandle *> &handles,
+ StorageManager *storage_manager);
+
+ ~PackedPayloadHashTable() override;
+
+ void clear();
+
+ void destroyPayload() override;
+
+ bool upsertValueAccessor(
+ const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
+ const std::vector<MultiSourceAttributeId> &key_ids,
+ ValueAccessorMultiplexer *accessor_mux) override;
+
+ inline block_id getBlobId() const {
+ return blob_->getID();
+ }
+
+ inline std::size_t numEntries() const {
+ return header_->buckets_allocated.load(std::memory_order_relaxed);
+ }
+
+ inline bool upsertCompositeKey(const std::vector<TypedValue> &key,
+ const std::uint8_t *source_state);
+
+ template <typename FunctorT>
+ inline bool upsertCompositeKey(const std::vector<TypedValue> &key,
+ FunctorT *functor,
+ int index);
+
+ inline const std::uint8_t* getSingleCompositeKey(
+ const std::vector<TypedValue> &key) const;
+
+ inline const std::uint8_t* getSingleCompositeKey(
+ const std::vector<TypedValue> &key,
+ const int index) const;
+
+ template <typename FunctorT>
+ inline std::size_t forEach(FunctorT *functor) const;
+
+ template <typename FunctorT>
+ inline std::size_t forEach(FunctorT *functor, const int index) const;
+
+ template <typename FunctorT>
+ inline std::size_t forEachCompositeKey(FunctorT *functor) const;
+
+ template <typename FunctorT>
+ inline std::size_t forEachCompositeKey(FunctorT *functor,
+ const std::size_t index) const;
+
+ private:
+ void resize(const std::size_t extra_buckets,
+ const std::size_t extra_variable_storage,
+ const std::size_t retry_num = 0);
+
+ inline std::size_t calculateVariableLengthCompositeKeyCopySize(
+ const std::vector<TypedValue> &key) const {
+ std::size_t total = 0;
+ for (std::vector<TypedValue>::size_type idx = 0; idx < key.size(); ++idx) {
+ if (!(*key_inline_)[idx]) {
+ total += key[idx].getDataSize();
+ }
+ }
+ return total;
+ }
+
+ inline bool getNextEntry(TypedValue *key,
+ const std::uint8_t **value,
+ std::size_t *entry_num) const;
+
+ inline bool getNextEntryCompositeKey(std::vector<TypedValue> *key,
+ const std::uint8_t **value,
+ std::size_t *entry_num) const;
+
+ inline std::uint8_t* upsertCompositeKeyInternal(
+ const std::vector<TypedValue> &key,
+ const std::size_t variable_key_size);
+
+ template <bool use_two_accessors>
+ inline bool upsertValueAccessorCompositeKeyInternal(
+ const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
+ const std::vector<MultiSourceAttributeId> &key_ids,
+ ValueAccessor *base_accessor,
+ ColumnVectorsValueAccessor *derived_accessor);
+
+ // Generate a hash for a composite key by hashing each component of 'key' and
+ // mixing their bits with CombineHashes().
+ inline std::size_t hashCompositeKey(const std::vector<TypedValue> &key) const;
+
+ // Set information about which key components are stored inline. This usually
+ // comes from a HashTableKeyManager, and is set by the constructor of a
+ // subclass of HashTable.
+ inline void setKeyInline(const std::vector<bool> *key_inline) {
+ scalar_key_inline_ = key_inline->front();
+ key_inline_ = key_inline;
+ }
+
+ inline static std::size_t ComputeTotalPayloadSize(
+ const std::vector<AggregationHandle *> &handles) {
+ std::size_t total_payload_size = sizeof(SpinMutex);
+ for (const auto *handle : handles) {
+ total_payload_size += handle->getPayloadSize();
+ }
+ return total_payload_size;
+ }
+
+ // Fill '*key_vector' with the attribute values specified by 'key_ids' at the
+ // current position of 'accessor'. If 'check_for_null_keys' is true, stop and
+ // return true as soon as any of the values is null; otherwise return false.
+ template <bool use_two_accessors,
+ bool check_for_null_keys,
+ typename ValueAccessorT>
+ inline static bool GetCompositeKeyFromValueAccessor(
+ const std::vector<MultiSourceAttributeId> &key_ids,
+ const ValueAccessorT *accessor,
+ const ColumnVectorsValueAccessor *derived_accessor,
+ std::vector<TypedValue> *key_vector) {
+ for (std::size_t key_idx = 0; key_idx < key_ids.size(); ++key_idx) {
+ const MultiSourceAttributeId &key_id = key_ids[key_idx];
+ if (use_two_accessors && key_id.source == ValueAccessorSource::kDerived) {
+ (*key_vector)[key_idx] = derived_accessor->getTypedValue(key_id.attr_id);
+ } else {
+ (*key_vector)[key_idx] = accessor->getTypedValue(key_id.attr_id);
+ }
+ if (check_for_null_keys && (*key_vector)[key_idx].isNull()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ struct Header {
+ std::size_t num_slots;
+ std::size_t num_buckets;
+ alignas(kCacheLineBytes) std::atomic<std::size_t> buckets_allocated;
+ alignas(kCacheLineBytes)
+ std::atomic<std::size_t> variable_length_bytes_allocated;
+ };
+
+ // Type(s) of keys.
+ const std::vector<const Type *> key_types_;
+
+ // Information about whether key components are stored inline or in a
+ // separate variable-length storage region. This is usually determined by a
+ // HashTableKeyManager and set by calling setKeyInline().
+ bool scalar_key_inline_;
+ const std::vector<bool> *key_inline_;
+
+ const std::size_t num_handles_;
+ const std::vector<AggregationHandle *> handles_;
+
+ std::size_t total_payload_size_;
+ std::vector<std::size_t> payload_offsets_;
+ std::uint8_t *init_payload_;
+
+ StorageManager *storage_manager_;
+ MutableBlobReference blob_;
+
+ // Locked in shared mode for most operations, exclusive mode during resize.
+ // Not locked at all for non-resizable HashTables.
+ alignas(kCacheLineBytes) SpinSharedMutex<true> resize_shared_mutex_;
+
+ std::size_t kBucketAlignment;
+
+ // Value's offset in a bucket is the first alignof(ValueT) boundary after the
+ // next pointer and hash code.
+ std::size_t kValueOffset;
+
+ // Round bucket size up to a multiple of kBucketAlignment.
+ constexpr std::size_t ComputeBucketSize(const std::size_t fixed_key_size) {
+ return (((kValueOffset + this->total_payload_size_ + fixed_key_size - 1) /
+ kBucketAlignment) +
+ 1) *
+ kBucketAlignment;
+ }
+
+ // Attempt to find an empty bucket to insert 'hash_code' into, starting after
+ // '*bucket' in the chain (or, if '*bucket' is NULL, starting from the slot
+ // array). Returns true and stores SIZE_T_MAX in '*pending_chain_ptr' if an
+ // empty bucket is found. Returns false if 'allow_duplicate_keys' is false
+ // and a hash collision is found (caller should then check whether there is a
+ // genuine key collision or the hash collision is spurious). Returns false
+ // and sets '*bucket' to NULL if there are no more empty buckets in the hash
+ // table. If 'variable_key_allocation_required' is nonzero, this method will
+ // attempt to allocate storage for a variable-length key BEFORE allocating a
+ // bucket, so that no bucket number below 'header_->num_buckets' is ever
+ // deallocated after being allocated.
+ inline bool locateBucketForInsertion(
+ const std::size_t hash_code,
+ const std::size_t variable_key_allocation_required,
+ void **bucket,
+ std::atomic<std::size_t> **pending_chain_ptr,
+ std::size_t *pending_chain_ptr_finish_value);
+
+ // Write a scalar 'key' and its 'hash_code' into the '*bucket', which was
+ // found by locateBucketForInsertion(). Assumes that storage for a
+ // variable-length key copy (if any) was already allocated by a successful
+ // call to allocateVariableLengthKeyStorage().
+ inline void writeScalarKeyToBucket(
+ const TypedValue &key,
+ const std::size_t hash_code,
+ void *bucket);
+
+ // Write a composite 'key' and its 'hash_code' into the '*bucket', which was
+ // found by locateBucketForInsertion(). Assumes that storage for
+ // variable-length key copies (if any) was already allocated by a successful
+ // call to allocateVariableLengthKeyStorage().
+ inline void writeCompositeKeyToBucket(
+ const std::vector<TypedValue> &key,
+ const std::size_t hash_code,
+ void *bucket);
+
+ // Determine whether it is actually necessary to resize this hash table.
+ // Checks that there is at least one unallocated bucket and that there are
+ // at least 'extra_variable_storage' bytes of variable-length storage free.
+ inline bool isFull(const std::size_t extra_variable_storage) const;
+
+ // Helper object to manage key storage.
+ HashTableKeyManager<false, true> key_manager_;
+
+ // In-memory structure is as follows:
+ // - SeparateChainingHashTable::Header
+ // - Array of slots, interpreted as follows:
+ // - 0 = Points to nothing (empty)
+ // - SIZE_T_MAX = Pending (some thread is starting a chain from this
+ // slot and will overwrite it soon)
+ // - Anything else = The number of the first bucket in the chain for
+ // this slot PLUS ONE (i.e. subtract one to get the actual bucket
+ // number).
+ // - Array of buckets, each of which is:
+ // - atomic size_t "next" pointer, interpreted the same as slots above.
+ // - size_t hash value
+ // - possibly some unused bytes as needed so that ValueT's alignment
+ // requirement is met
+ // - ValueT value slot
+ // - fixed-length key storage (which may include pointers to external
+ // memory or offsets of variable length keys stored within this hash
+ // table)
+ // - possibly some additional unused bytes so that bucket size is a
+ // multiple of both alignof(std::atomic<std::size_t>) and
+ // alignof(ValueT)
+ // - Variable-length key storage region (referenced by offsets stored in
+ // fixed-length keys).
+ Header *header_;
+
+ std::atomic<std::size_t> *slots_;
+ void *buckets_;
+ const std::size_t bucket_size_;
+
+ DISALLOW_COPY_AND_ASSIGN(PackedPayloadHashTable);
+};
+
+/** @} */
+
+// ----------------------------------------------------------------------------
+// Implementations of template class methods follow.
+
+class HashTableMerger {
+ public:
+ /**
+ * @brief Constructor
+ *
+ * @param destination_hash_table The destination hash table into which other
+ * hash tables will be merged.
+ **/
+ explicit HashTableMerger(PackedPayloadHashTable *destination_hash_table)
+ : destination_hash_table_(destination_hash_table) {}
+
+ /**
+ * @brief The operator for the functor.
+ *
+ * @param group_by_key The group by key being merged.
+ * @param source_state The aggregation state for the given key in the source
+ * aggregation hash table.
+ **/
+ inline void operator()(const std::vector<TypedValue> &group_by_key,
+ const std::uint8_t *source_state) {
+ destination_hash_table_->upsertCompositeKey(group_by_key, source_state);
+ }
+
+ private:
+ PackedPayloadHashTable *destination_hash_table_;
+
+ DISALLOW_COPY_AND_ASSIGN(HashTableMerger);
+};
+
+inline std::size_t PackedPayloadHashTable::hashCompositeKey(
+ const std::vector<TypedValue> &key) const {
+ DEBUG_ASSERT(!key.empty());
+ DEBUG_ASSERT(key.size() == key_types_.size());
+ std::size_t hash = key.front().getHash();
+ for (std::vector<TypedValue>::const_iterator key_it = key.begin() + 1;
+ key_it != key.end();
+ ++key_it) {
+ hash = CombineHashes(hash, key_it->getHash());
+ }
+ return hash;
+}
+
+inline bool PackedPayloadHashTable::getNextEntry(TypedValue *key,
+ const std::uint8_t **value,
+ std::size_t *entry_num) const {
+ if (*entry_num < header_->buckets_allocated.load(std::memory_order_relaxed)) {
+ const char *bucket =
+ static_cast<const char *>(buckets_) + (*entry_num) * bucket_size_;
+ *key = key_manager_.getKeyComponentTyped(bucket, 0);
+ *value = reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset);
+ ++(*entry_num);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+
+inline bool PackedPayloadHashTable::getNextEntryCompositeKey(
+ std::vector<TypedValue> *key,
+ const std::uint8_t **value,
+ std::size_t *entry_num) const {
+ if (*entry_num < header_->buckets_allocated.load(std::memory_order_relaxed)) {
+ const char *bucket =
+ static_cast<const char *>(buckets_) + (*entry_num) * bucket_size_;
+ for (std::vector<const Type *>::size_type key_idx = 0;
+ key_idx < this->key_types_.size();
+ ++key_idx) {
+ key->emplace_back(key_manager_.getKeyComponentTyped(bucket, key_idx));
+ }
+ *value = reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset);
+ ++(*entry_num);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+
+inline bool PackedPayloadHashTable::locateBucketForInsertion(
+ const std::size_t hash_code,
+ const std::size_t variable_key_allocation_required,
+ void **bucket,
+ std::atomic<std::size_t> **pending_chain_ptr,
+ std::size_t *pending_chain_ptr_finish_value) {
+ if (*bucket == nullptr) {
+ *pending_chain_ptr = &(slots_[hash_code % header_->num_slots]);
+ } else {
+ *pending_chain_ptr = static_cast<std::atomic<std::size_t> *>(*bucket);
+ }
+ for (;;) {
+ std::size_t existing_chain_ptr = 0;
+ if ((*pending_chain_ptr)
+ ->compare_exchange_strong(existing_chain_ptr,
+ std::numeric_limits<std::size_t>::max(),
+ std::memory_order_acq_rel)) {
+ // Got to the end of the chain. Allocate a new bucket.
+
+ // First, allocate variable-length key storage, if needed (i.e. if this
+ // is an upsert and we didn't allocate up-front).
+ if (!key_manager_.allocateVariableLengthKeyStorage(
+ variable_key_allocation_required)) {
+ // Ran out of variable-length storage.
+ (*pending_chain_ptr)->store(0, std::memory_order_release);
+ *bucket = nullptr;
+ return false;
+ }
+
+ const std::size_t allocated_bucket_num =
+ header_->buckets_allocated.fetch_add(1, std::memory_order_relaxed);
+ if (allocated_bucket_num >= header_->num_buckets) {
+ // Ran out of buckets.
+ header_->buckets_allocated.fetch_sub(1, std::memory_order_relaxed);
+ (*pending_chain_ptr)->store(0, std::memory_order_release);
+ *bucket = nullptr;
+ return false;
+ } else {
+ *bucket =
+ static_cast<char *>(buckets_) + allocated_bucket_num * bucket_size_;
+ *pending_chain_ptr_finish_value = allocated_bucket_num + 1;
+ return true;
+ }
+ }
+ // Spin until the real "next" pointer is available.
+ while (existing_chain_ptr == std::numeric_limits<std::size_t>::max()) {
+ existing_chain_ptr =
+ (*pending_chain_ptr)->load(std::memory_order_acquire);
+ }
+ if (existing_chain_ptr == 0) {
+ // Other thread had to roll back, so try again.
+ continue;
+ }
+ // Chase the next pointer.
+ *bucket =
+ static_cast<char *>(buckets_) + (existing_chain_ptr - 1) * bucket_size_;
+ *pending_chain_ptr = static_cast<std::atomic<std::size_t> *>(*bucket);
+ const std::size_t hash_in_bucket = *reinterpret_cast<const std::size_t *>(
+ static_cast<const char *>(*bucket) +
+ sizeof(std::atomic<std::size_t>));
+ if (hash_in_bucket == hash_code) {
+ return false;
+ }
+ }
+}
+
+inline const std::uint8_t* PackedPayloadHashTable::getSingleCompositeKey(
+ const std::vector<TypedValue> &key) const {
+ DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+ const std::size_t hash_code = this->hashCompositeKey(key);
+ std::size_t bucket_ref =
+ slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
+ while (bucket_ref != 0) {
+ DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
+ const char *bucket =
+ static_cast<const char *>(buckets_) + (bucket_ref - 1) * bucket_size_;
+ const std::size_t bucket_hash = *reinterpret_cast<const std::size_t *>(
+ bucket + sizeof(std::atomic<std::size_t>));
+ if ((bucket_hash == hash_code) &&
+ key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+ // Match located.
+ return reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset);
+ }
+ bucket_ref =
+ reinterpret_cast<const std::atomic<std::size_t> *>(bucket)->load(
+ std::memory_order_relaxed);
+ }
+
+ // Reached the end of the chain and didn't find a match.
+ return nullptr;
+}
+
+inline const std::uint8_t* PackedPayloadHashTable::getSingleCompositeKey(
+ const std::vector<TypedValue> &key,
+ const int index) const {
+ DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+ const std::size_t hash_code = this->hashCompositeKey(key);
+ std::size_t bucket_ref =
+ slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
+ while (bucket_ref != 0) {
+ DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
+ const char *bucket =
+ static_cast<const char *>(buckets_) + (bucket_ref - 1) * bucket_size_;
+ const std::size_t bucket_hash = *reinterpret_cast<const std::size_t *>(
+ bucket + sizeof(std::atomic<std::size_t>));
+ if ((bucket_hash == hash_code) &&
+ key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+ // Match located.
+ return reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset) +
+ this->payload_offsets_[index];
+ }
+ bucket_ref =
+ reinterpret_cast<const std::atomic<std::size_t> *>(bucket)->load(
+ std::memory_order_relaxed);
+ }
+
+ // Reached the end of the chain and didn't find a match.
+ return nullptr;
+}
+
+inline bool PackedPayloadHashTable::upsertCompositeKey(
+ const std::vector<TypedValue> &key,
+ const std::uint8_t *source_state) {
+ const std::size_t variable_size =
+ calculateVariableLengthCompositeKeyCopySize(key);
+ for (;;) {
+ {
+ SpinSharedMutexSharedLock<true> resize_lock(resize_shared_mutex_);
+ std::uint8_t *value =
+ upsertCompositeKeyInternal(key, variable_size);
+ if (value != nullptr) {
+ SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
+ for (unsigned int k = 0; k < num_handles_; ++k) {
+ handles_[k]->mergeStates(source_state + payload_offsets_[k],
+ value + payload_offsets_[k]);
+ }
+ return true;
+ }
+ }
+ resize(0, variable_size);
+ }
+}
+
+template <typename FunctorT>
+inline bool PackedPayloadHashTable::upsertCompositeKey(
+ const std::vector<TypedValue> &key,
+ FunctorT *functor,
+ int index) {
+ const std::size_t variable_size =
+ calculateVariableLengthCompositeKeyCopySize(key);
+ for (;;) {
+ {
+ SpinSharedMutexSharedLock<true> resize_lock(resize_shared_mutex_);
+ std::uint8_t *value =
+ upsertCompositeKeyInternal(key, variable_size);
+ if (value != nullptr) {
+ (*functor)(value + payload_offsets_[index]);
+ return true;
+ }
+ }
+ resize(0, variable_size);
+ }
+}
+
+
+inline std::uint8_t* PackedPayloadHashTable::upsertCompositeKeyInternal(
+ const std::vector<TypedValue> &key,
+ const std::size_t variable_key_size) {
+ if (variable_key_size > 0) {
+ // Don't allocate yet, since the key may already be present. However, we
+ // do check if either the allocated variable storage space OR the free
+ // space is big enough to hold the key (at least one must be true: either
+ // the key is already present and allocated, or we need to be able to
+ // allocate enough space for it).
+ std::size_t allocated_bytes = header_->variable_length_bytes_allocated.load(
+ std::memory_order_relaxed);
+ if ((allocated_bytes < variable_key_size) &&
+ (allocated_bytes + variable_key_size >
+ key_manager_.getVariableLengthKeyStorageSize())) {
+ return nullptr;
+ }
+ }
+
+ const std::size_t hash_code = this->hashCompositeKey(key);
+ void *bucket = nullptr;
+ std::atomic<std::size_t> *pending_chain_ptr;
+ std::size_t pending_chain_ptr_finish_value;
+ for (;;) {
+ if (locateBucketForInsertion(hash_code,
+ variable_key_size,
+ &bucket,
+ &pending_chain_ptr,
+ &pending_chain_ptr_finish_value)) {
+ // Found an empty bucket.
+ break;
+ } else if (bucket == nullptr) {
+ // Ran out of buckets or variable-key space.
+ return nullptr;
+ } else if (key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+ // Found an already-existing entry for this key.
+ return reinterpret_cast<std::uint8_t *>(static_cast<char *>(bucket) +
+ kValueOffset);
+ }
+ }
+
+ // We are now writing to an empty bucket.
+ // Write the key and hash.
+ writeCompositeKeyToBucket(key, hash_code, bucket);
+
+ std::uint8_t *value = static_cast<unsigned char *>(bucket) + kValueOffset;
+ std::memcpy(value, init_payload_, this->total_payload_size_);
+
+ // Update the previous chain pointer to point to the new bucket.
+ pending_chain_ptr->store(pending_chain_ptr_finish_value,
+ std::memory_order_release);
+
+ // Return the value.
+ return value;
+}
+
+template <bool use_two_accessors>
+inline bool PackedPayloadHashTable::upsertValueAccessorCompositeKeyInternal(
+ const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
+ const std::vector<MultiSourceAttributeId> &key_ids,
+ ValueAccessor *base_accessor,
+ ColumnVectorsValueAccessor *derived_accessor) {
+ std::size_t variable_size;
+ std::vector<TypedValue> key_vector;
+ key_vector.resize(key_ids.size());
+
+ return InvokeOnAnyValueAccessor(
+ base_accessor,
+ [&](auto *accessor) -> bool { // NOLINT(build/c++11)
+ bool continuing = true;
+ while (continuing) {
+ {
+ continuing = false;
+ SpinSharedMutexSharedLock<true> lock(resize_shared_mutex_);
+ while (accessor->next()) {
+ if (use_two_accessors) {
+ derived_accessor->next();
+ }
+ if (this->GetCompositeKeyFromValueAccessor<use_two_accessors, true>(
+ key_ids,
+ accessor,
+ derived_accessor,
+ &key_vector)) {
+ continue;
+ }
+ variable_size = this->calculateVariableLengthCompositeKeyCopySize(key_vector);
+ std::uint8_t *value = this->upsertCompositeKeyInternal(
+ key_vector, variable_size);
+ if (value == nullptr) {
+ continuing = true;
+ break;
+ } else {
+ SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
+ for (unsigned int k = 0; k < num_handles_; ++k) {
+ const auto &ids = argument_ids[k];
+ if (ids.empty()) {
+ handles_[k]->updateStateNullary(value + payload_offsets_[k]);
+ } else {
+ const MultiSourceAttributeId &arg_id = ids.front();
+ if (use_two_accessors && arg_id.source == ValueAccessorSource::kDerived) {
+ DCHECK_NE(arg_id.attr_id, kInvalidAttributeID);
+ handles_[k]->updateStateUnary(derived_accessor->getTypedValue(arg_id.attr_id),
+ value + payload_offsets_[k]);
+ } else {
+ handles_[k]->updateStateUnary(accessor->getTypedValue(arg_id.attr_id),
+ value + payload_offsets_[k]);
+ }
+ }
+ }
+ }
+ }
+ }
+ if (continuing) {
+ this->resize(0, variable_size);
+ accessor->previous();
+ if (use_two_accessors) {
+ derived_accessor->previous();
+ }
+ }
+ }
+ return true;
+ });
+}
+
+inline void PackedPayloadHashTable::writeScalarKeyToBucket(
+ const TypedValue &key,
+ const std::size_t hash_code,
+ void *bucket) {
+ *reinterpret_cast<std::size_t *>(static_cast<char *>(bucket) +
+ sizeof(std::atomic<std::size_t>)) =
+ hash_code;
+ key_manager_.writeKeyComponentToBucket(key, 0, bucket, nullptr);
+}
+
+inline void PackedPayloadHashTable::writeCompositeKeyToBucket(
+ const std::vector<TypedValue> &key,
+ const std::size_t hash_code,
+ void *bucket) {
+ DEBUG_ASSERT(key.size() == this->key_types_.size());
+ *reinterpret_cast<std::size_t *>(static_cast<char *>(bucket) +
+ sizeof(std::atomic<std::size_t>)) =
+ hash_code;
+ for (std::size_t idx = 0; idx < this->key_types_.size(); ++idx) {
+ key_manager_.writeKeyComponentToBucket(key[idx], idx, bucket, nullptr);
+ }
+}
+
+inline bool PackedPayloadHashTable::isFull(
+ const std::size_t extra_variable_storage) const {
+ if (header_->buckets_allocated.load(std::memory_order_relaxed) >=
+ header_->num_buckets) {
+ // All buckets are allocated.
+ return true;
+ }
+
+ if (extra_variable_storage > 0) {
+ if (extra_variable_storage +
+ header_->variable_length_bytes_allocated.load(
+ std::memory_order_relaxed) >
+ key_manager_.getVariableLengthKeyStorageSize()) {
+ // Not enough variable-length key storage space.
+ return true;
+ }
+ }
+
+ return false;
+}
+
+template <typename FunctorT>
+inline std::size_t PackedPayloadHashTable::forEach(FunctorT *functor) const {
+ std::size_t entries_visited = 0;
+ std::size_t entry_num = 0;
+ TypedValue key;
+ const std::uint8_t *value_ptr;
+ while (getNextEntry(&key, &value_ptr, &entry_num)) {
+ ++entries_visited;
+ (*functor)(key, value_ptr);
+ }
+ return entries_visited;
+}
+
+template <typename FunctorT>
+inline std::size_t PackedPayloadHashTable::forEach(
+ FunctorT *functor, const int index) const {
+ std::size_t entries_visited = 0;
+ std::size_t entry_num = 0;
+ TypedValue key;
+ const std::uint8_t *value_ptr;
+ while (getNextEntry(&key, &value_ptr, &entry_num)) {
+ ++entries_visited;
+ (*functor)(key, value_ptr + payload_offsets_[index]);
+ key.clear();
+ }
+ return entries_visited;
+}
+
+template <typename FunctorT>
+inline std::size_t PackedPayloadHashTable::forEachCompositeKey(
+ FunctorT *functor) const {
+ std::size_t entries_visited = 0;
+ std::size_t entry_num = 0;
+ std::vector<TypedValue> key;
+ const std::uint8_t *value_ptr;
+ while (getNextEntryCompositeKey(&key, &value_ptr, &entry_num)) {
+ ++entries_visited;
+ (*functor)(key, value_ptr);
+ key.clear();
+ }
+ return entries_visited;
+}
+
+template <typename FunctorT>
+inline std::size_t PackedPayloadHashTable::forEachCompositeKey(
+ FunctorT *functor,
+ const std::size_t index) const {
+ std::size_t entries_visited = 0;
+ std::size_t entry_num = 0;
+ std::vector<TypedValue> key;
+ const std::uint8_t *value_ptr;
+ while (getNextEntryCompositeKey(&key, &value_ptr, &entry_num)) {
+ ++entries_visited;
+ (*functor)(key, value_ptr + payload_offsets_[index]);
+ key.clear();
+ }
+ return entries_visited;
+}
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_STORAGE_PACKED_PAYLOAD_HASH_TABLE_HPP_
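Later hunks in this commit (AggregationOperationState.cpp) drive this class through the AggregationStateHashTableBase interface: thread-private tables are merged by casting to PackedPayloadHashTable and walking one table with forEachCompositeKey() while upserting into another via HashTableMerger. A minimal sketch of that pattern, assuming 'src' and 'dst' were built with identical key types and aggregation handles (the function name below is illustrative, not part of the commit), would be:

    #include "storage/PackedPayloadHashTable.hpp"

    namespace quickstep {

    // Merge every (composite key, packed payload) pair of 'src' into 'dst',
    // mirroring the mergeGroupByHashTables() change further down in this diff.
    // Both tables must have been constructed with the same key types and the
    // same vector of AggregationHandles.
    void MergeGroupByTables(PackedPayloadHashTable *src,
                            PackedPayloadHashTable *dst) {
      HashTableMerger merger(dst);
      // For each entry in 'src', HashTableMerger::operator() calls
      // dst->upsertCompositeKey(key, source_state), which locks the bucket's
      // SpinMutex and merges the aggregation states handle by handle.
      src->forEachCompositeKey(&merger);
    }

    }  // namespace quickstep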
[3/3] incubator-quickstep git commit: Updates
Posted by ji...@apache.org.
Updates
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/1e7a92a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/1e7a92a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/1e7a92a9
Branch: refs/heads/collision-free-agg
Commit: 1e7a92a94e0076d89151ea4a2ab4f68caa0572c0
Parents: 3bcb5c8
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Fri Feb 3 22:44:37 2017 -0600
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Fri Feb 3 22:44:37 2017 -0600
----------------------------------------------------------------------
.../aggregation/AggregationConcreteHandle.cpp | 4 +-
.../aggregation/AggregationConcreteHandle.hpp | 21 +-
expressions/aggregation/CMakeLists.txt | 2 +-
storage/AggregationOperationState.cpp | 31 +-
storage/CMakeLists.txt | 24 +-
.../CollisionFreeAggregationStateHashTable.cpp | 285 -------
.../CollisionFreeAggregationStateHashTable.hpp | 621 --------------
storage/CollisionFreeVectorTable.cpp | 283 +++++++
storage/CollisionFreeVectorTable.hpp | 621 ++++++++++++++
storage/HashTableFactory.hpp | 8 +-
.../PackedPayloadAggregationStateHashTable.cpp | 439 ----------
.../PackedPayloadAggregationStateHashTable.hpp | 805 -------------------
storage/PackedPayloadHashTable.cpp | 436 ++++++++++
storage/PackedPayloadHashTable.hpp | 798 ++++++++++++++++++
14 files changed, 2180 insertions(+), 2198 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/expressions/aggregation/AggregationConcreteHandle.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.cpp b/expressions/aggregation/AggregationConcreteHandle.cpp
index 5fd7e0f..bbce29f 100644
--- a/expressions/aggregation/AggregationConcreteHandle.cpp
+++ b/expressions/aggregation/AggregationConcreteHandle.cpp
@@ -24,7 +24,7 @@
#include "catalog/CatalogTypedefs.hpp"
#include "storage/HashTableFactory.hpp"
-#include "storage/PackedPayloadAggregationStateHashTable.hpp"
+#include "storage/PackedPayloadHashTable.hpp"
#include "storage/ValueAccessorMultiplexer.hpp"
namespace quickstep {
@@ -57,7 +57,7 @@ void AggregationConcreteHandle::insertValueAccessorIntoDistinctifyHashTable(
concatenated_ids.emplace_back(arg_id);
}
- static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(distinctify_hash_table)
+ static_cast<PackedPayloadHashTable *>(distinctify_hash_table)
->upsertValueAccessor({}, concatenated_ids, accessor_mux);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/expressions/aggregation/AggregationConcreteHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.hpp b/expressions/aggregation/AggregationConcreteHandle.hpp
index 5b49d0d..c8d61ff 100644
--- a/expressions/aggregation/AggregationConcreteHandle.hpp
+++ b/expressions/aggregation/AggregationConcreteHandle.hpp
@@ -29,7 +29,7 @@
#include "expressions/aggregation/AggregationHandle.hpp"
#include "expressions/aggregation/AggregationID.hpp"
#include "storage/HashTableBase.hpp"
-#include "storage/PackedPayloadAggregationStateHashTable.hpp"
+#include "storage/PackedPayloadHashTable.hpp"
#include "storage/ValueAccessorMultiplexer.hpp"
#include "threading/SpinMutex.hpp"
#include "types/TypedValue.hpp"
@@ -151,7 +151,7 @@ class AggregationConcreteHandle : public AggregationHandle {
const std::size_t index,
const std::vector<TypedValue> &group_key) const {
const std::uint8_t *group_state =
- static_cast<const PackedPayloadSeparateChainingAggregationStateHashTable &>(hash_table)
+ static_cast<const PackedPayloadHashTable &>(hash_table)
.getSingleCompositeKey(group_key, index);
DCHECK(group_state != nullptr)
<< "Could not find entry for specified group_key in HashTable";
@@ -217,8 +217,7 @@ StateT* AggregationConcreteHandle::
};
const auto &hash_table =
- static_cast<const PackedPayloadSeparateChainingAggregationStateHashTable &>(
- distinctify_hash_table);
+ static_cast<const PackedPayloadHashTable &>(distinctify_hash_table);
// Invoke the lambda function "aggregate_functor" on each key from the
// distinctify hash table.
hash_table.forEach(&aggregate_functor);
@@ -233,9 +232,8 @@ void AggregationConcreteHandle::
const std::size_t index,
AggregationStateHashTableBase *aggregation_hash_table) const {
const HandleT &handle = static_cast<const HandleT &>(*this);
- PackedPayloadSeparateChainingAggregationStateHashTable *target_hash_table =
- static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(
- aggregation_hash_table);
+ PackedPayloadHashTable *target_hash_table =
+ static_cast<PackedPayloadHashTable *>(aggregation_hash_table);
// A lambda function which will be called on each key-value pair from the
// distinctify hash table.
@@ -256,9 +254,8 @@ void AggregationConcreteHandle::
target_hash_table->upsertCompositeKey(key, &upserter, index);
};
- const PackedPayloadSeparateChainingAggregationStateHashTable &source_hash_table =
- static_cast<const PackedPayloadSeparateChainingAggregationStateHashTable &>(
- distinctify_hash_table);
+ const PackedPayloadHashTable &source_hash_table =
+ static_cast<const PackedPayloadHashTable &>(distinctify_hash_table);
// Invoke the lambda function "aggregate_functor" on each composite key vector
// from the distinctify hash table.
source_hash_table.forEachCompositeKey(&aggregate_functor);
@@ -271,8 +268,8 @@ ColumnVector* AggregationConcreteHandle::finalizeHashTableHelper(
const std::size_t index,
std::vector<std::vector<TypedValue>> *group_by_keys) const {
const HandleT &handle = static_cast<const HandleT &>(*this);
- const PackedPayloadSeparateChainingAggregationStateHashTable &hash_table_concrete =
- static_cast<const PackedPayloadSeparateChainingAggregationStateHashTable &>(hash_table);
+ const PackedPayloadHashTable &hash_table_concrete =
+ static_cast<const PackedPayloadHashTable &>(hash_table);
if (group_by_keys->empty()) {
if (NativeColumnVector::UsableForType(result_type)) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/expressions/aggregation/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/aggregation/CMakeLists.txt b/expressions/aggregation/CMakeLists.txt
index 0816db3..7203c8c 100644
--- a/expressions/aggregation/CMakeLists.txt
+++ b/expressions/aggregation/CMakeLists.txt
@@ -146,7 +146,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationConcreteHandl
quickstep_expressions_aggregation_AggregationID
quickstep_storage_HashTableBase
quickstep_storage_HashTableFactory
- quickstep_storage_PackedPayloadAggregationStateHashTable
+ quickstep_storage_PackedPayloadHashTable
quickstep_storage_ValueAccessorMultiplexer
quickstep_threading_SpinMutex
quickstep_types_TypedValue
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index a393185..4ffd418 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -38,10 +38,11 @@
#include "expressions/predicate/Predicate.hpp"
#include "expressions/scalar/Scalar.hpp"
#include "storage/AggregationOperationState.pb.h"
+#include "storage/CollisionFreeVectorTable.hpp"
#include "storage/HashTableFactory.hpp"
#include "storage/HashTableBase.hpp"
#include "storage/InsertDestination.hpp"
-#include "storage/PackedPayloadAggregationStateHashTable.hpp"
+#include "storage/PackedPayloadHashTable.hpp"
#include "storage/StorageBlock.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "storage/StorageManager.hpp"
@@ -353,7 +354,7 @@ bool AggregationOperationState::ProtoIsValid(
std::size_t AggregationOperationState::getNumPartitions() const {
if (is_aggregate_collision_free_) {
- return static_cast<CollisionFreeAggregationStateHashTable *>(
+ return static_cast<CollisionFreeVectorTable *>(
collision_free_hashtable_.get())->getNumFinalizationPartitions();
} else if (is_aggregate_partitioned_) {
return partitioned_group_by_hashtable_pool_->getNumPartitions();
@@ -364,7 +365,7 @@ std::size_t AggregationOperationState::getNumPartitions() const {
std::size_t AggregationOperationState::getNumInitializationPartitions() const {
if (is_aggregate_collision_free_) {
- return static_cast<CollisionFreeAggregationStateHashTable *>(
+ return static_cast<CollisionFreeVectorTable *>(
collision_free_hashtable_.get())->getNumInitializationPartitions();
} else {
return 0u;
@@ -373,7 +374,7 @@ std::size_t AggregationOperationState::getNumInitializationPartitions() const {
void AggregationOperationState::initializeState(const std::size_t partition_id) {
if (is_aggregate_collision_free_) {
- static_cast<CollisionFreeAggregationStateHashTable *>(
+ static_cast<CollisionFreeVectorTable *>(
collision_free_hashtable_.get())->initialize(partition_id);
} else {
LOG(FATAL) << "AggregationOperationState::initializeState() "
@@ -512,10 +513,10 @@ void AggregationOperationState::mergeSingleState(
}
void AggregationOperationState::mergeGroupByHashTables(
- AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst) const {
- HashTableMerger merger(dst);
- static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(src)
- ->forEachCompositeKey(&merger);
+ AggregationStateHashTableBase *src,
+ AggregationStateHashTableBase *dst) const {
+ HashTableMerger merger(static_cast<PackedPayloadHashTable *>(dst));
+ static_cast<PackedPayloadHashTable *>(src)->forEachCompositeKey(&merger);
}
void AggregationOperationState::aggregateBlockHashTable(
@@ -661,9 +662,8 @@ void AggregationOperationState::finalizeHashTableImplCollisionFree(
const std::size_t partition_id,
InsertDestination *output_destination) {
std::vector<std::unique_ptr<ColumnVector>> final_values;
- CollisionFreeAggregationStateHashTable *hash_table =
- static_cast<CollisionFreeAggregationStateHashTable *>(
- collision_free_hashtable_.get());
+ CollisionFreeVectorTable *hash_table =
+ static_cast<CollisionFreeVectorTable *>(collision_free_hashtable_.get());
// TODO
const std::size_t max_length =
@@ -696,8 +696,8 @@ void AggregationOperationState::finalizeHashTableImplCollisionFree(
void AggregationOperationState::finalizeHashTableImplPartitioned(
const std::size_t partition_id,
InsertDestination *output_destination) {
- PackedPayloadSeparateChainingAggregationStateHashTable *hash_table =
- static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(
+ PackedPayloadHashTable *hash_table =
+ static_cast<PackedPayloadHashTable *>(
partitioned_group_by_hashtable_pool_->getHashTable(partition_id));
// Each element of 'group_by_keys' is a vector of values for a particular
@@ -790,9 +790,8 @@ void AggregationOperationState::finalizeHashTableImplThreadPrivate(
hash_table->destroyPayload();
}
- PackedPayloadSeparateChainingAggregationStateHashTable *final_hash_table =
- static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(
- final_hash_table_ptr.get());
+ PackedPayloadHashTable *final_hash_table =
+ static_cast<PackedPayloadHashTable *>(final_hash_table_ptr.get());
// Each element of 'group_by_keys' is a vector of values for a particular
// group (which is also the prefix of the finalized Tuple for that group).
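For context on the hunk above: mergeGroupByHashTables() folds one thread-private group-by table into another by constructing a HashTableMerger over the destination and letting the source's forEachCompositeKey() visit every (composite key, payload) entry. The merger itself is defined elsewhere in the tree, so the stand-alone sketch below only illustrates the visit-and-upsert pattern with ordinary standard-library containers; the CompositeKey/Payload types and the combine step are illustrative assumptions, not Quickstep's actual API.

    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <vector>

    // Hypothetical stand-ins: a composite key is a vector of key values and the
    // payload is the running aggregation state (here just COUNT and SUM).
    using CompositeKey = std::vector<std::int64_t>;
    struct Payload { std::int64_t count; std::int64_t sum; };
    using GroupByTable = std::map<CompositeKey, Payload>;

    // Visit every entry of 'src' and upsert it into 'dst' -- the same shape of
    // work a HashTableMerger functor does when passed to forEachCompositeKey().
    void MergeTables(const GroupByTable &src, GroupByTable *dst) {
      for (const auto &entry : src) {
        Payload &slot = (*dst)[entry.first];  // insert-or-find on the composite key
        slot.count += entry.second.count;     // combine the partial aggregation states
        slot.sum += entry.second.sum;
      }
    }

    int main() {
      GroupByTable dst, src;
      dst[{1}] = {2, 10};  // group {1}: count 2, sum 10
      src[{1}] = {1, 5};
      src[{2}] = {3, 7};
      MergeTables(src, &dst);
      for (const auto &entry : dst) {
        std::cout << entry.first.front() << ": count=" << entry.second.count
                  << " sum=" << entry.second.sum << "\n";
      }
      return 0;
    }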
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index d43d7a2..8fbb4ea 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -165,9 +165,9 @@ if(QUICKSTEP_HAVE_BITWEAVING)
bitweaving/BitWeavingVIndexSubBlock.hpp)
endif()
# CMAKE_VALIDATE_IGNORE_END
-add_library(quickstep_storage_CollisionFreeAggregationStateHashTable
- CollisionFreeAggregationStateHashTable.cpp
- CollisionFreeAggregationStateHashTable.hpp)
+add_library(quickstep_storage_CollisionFreeVectorTable
+ CollisionFreeVectorTable.cpp
+ CollisionFreeVectorTable.hpp)
add_library(quickstep_storage_ColumnStoreUtil ColumnStoreUtil.cpp ColumnStoreUtil.hpp)
add_library(quickstep_storage_CompressedBlockBuilder CompressedBlockBuilder.cpp CompressedBlockBuilder.hpp)
add_library(quickstep_storage_CompressedColumnStoreTupleStorageSubBlock
@@ -225,9 +225,7 @@ add_library(quickstep_storage_InsertDestination_proto
add_library(quickstep_storage_LinearOpenAddressingHashTable
../empty_src.cpp
LinearOpenAddressingHashTable.hpp)
-add_library(quickstep_storage_PackedPayloadAggregationStateHashTable
- PackedPayloadAggregationStateHashTable.cpp
- PackedPayloadAggregationStateHashTable.hpp)
+add_library(quickstep_storage_PackedPayloadHashTable PackedPayloadHashTable.cpp PackedPayloadHashTable.hpp)
add_library(quickstep_storage_PartitionedHashTablePool ../empty_src.cpp PartitionedHashTablePool.hpp)
add_library(quickstep_storage_PreloaderThread PreloaderThread.cpp PreloaderThread.hpp)
add_library(quickstep_storage_SMAIndexSubBlock SMAIndexSubBlock.cpp SMAIndexSubBlock.hpp)
@@ -284,7 +282,7 @@ target_link_libraries(quickstep_storage_AggregationOperationState
quickstep_storage_HashTablePool
quickstep_storage_InsertDestination
quickstep_storage_PartitionedHashTablePool
- quickstep_storage_PackedPayloadAggregationStateHashTable
+ quickstep_storage_PackedPayloadHashTable
quickstep_storage_StorageBlock
quickstep_storage_StorageBlockInfo
quickstep_storage_StorageManager
@@ -435,7 +433,7 @@ if(QUICKSTEP_HAVE_BITWEAVING)
quickstep_utility_Macros)
endif()
# CMAKE_VALIDATE_IGNORE_END
-target_link_libraries(quickstep_storage_CollisionFreeAggregationStateHashTable
+target_link_libraries(quickstep_storage_CollisionFreeVectorTable
quickstep_catalog_CatalogTypedefs
quickstep_expressions_aggregation_AggregationHandle
quickstep_expressions_aggregation_AggregationID
@@ -714,12 +712,12 @@ target_link_libraries(quickstep_storage_HashTable_proto
${PROTOBUF_LIBRARY})
target_link_libraries(quickstep_storage_HashTableFactory
glog
- quickstep_storage_CollisionFreeAggregationStateHashTable
+ quickstep_storage_CollisionFreeVectorTable
quickstep_storage_HashTable
quickstep_storage_HashTable_proto
quickstep_storage_HashTableBase
quickstep_storage_LinearOpenAddressingHashTable
- quickstep_storage_PackedPayloadAggregationStateHashTable
+ quickstep_storage_PackedPayloadHashTable
quickstep_storage_SeparateChainingHashTable
quickstep_storage_SimpleScalarSeparateChainingHashTable
quickstep_storage_TupleReference
@@ -798,7 +796,7 @@ target_link_libraries(quickstep_storage_LinearOpenAddressingHashTable
quickstep_utility_Alignment
quickstep_utility_Macros
quickstep_utility_PrimeNumber)
-target_link_libraries(quickstep_storage_PackedPayloadAggregationStateHashTable
+target_link_libraries(quickstep_storage_PackedPayloadHashTable
quickstep_catalog_CatalogTypedefs
quickstep_expressions_aggregation_AggregationHandle
quickstep_storage_HashTableBase
@@ -1115,7 +1113,7 @@ target_link_libraries(quickstep_storage
quickstep_storage_BasicColumnStoreValueAccessor
quickstep_storage_BloomFilterIndexSubBlock
quickstep_storage_CSBTreeIndexSubBlock
- quickstep_storage_CollisionFreeAggregationStateHashTable
+ quickstep_storage_CollisionFreeVectorTable
quickstep_storage_ColumnStoreUtil
quickstep_storage_CompressedBlockBuilder
quickstep_storage_CompressedColumnStoreTupleStorageSubBlock
@@ -1141,7 +1139,7 @@ target_link_libraries(quickstep_storage
quickstep_storage_InsertDestination_proto
quickstep_storage_LinearOpenAddressingHashTable
quickstep_storage_PartitionedHashTablePool
- quickstep_storage_PackedPayloadAggregationStateHashTable
+ quickstep_storage_PackedPayloadHashTable
quickstep_storage_PreloaderThread
quickstep_storage_SMAIndexSubBlock
quickstep_storage_SeparateChainingHashTable
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/CollisionFreeAggregationStateHashTable.cpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeAggregationStateHashTable.cpp b/storage/CollisionFreeAggregationStateHashTable.cpp
deleted file mode 100644
index 2f3b336..0000000
--- a/storage/CollisionFreeAggregationStateHashTable.cpp
+++ /dev/null
@@ -1,285 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include "storage/CollisionFreeAggregationStateHashTable.hpp"
-
-#include <algorithm>
-#include <atomic>
-#include <cstddef>
-#include <cstdint>
-#include <cstdlib>
-#include <map>
-#include <memory>
-#include <vector>
-
-#include "storage/StorageBlockInfo.hpp"
-#include "storage/StorageManager.hpp"
-#include "storage/ValueAccessor.hpp"
-#include "storage/ValueAccessorMultiplexer.hpp"
-#include "storage/ValueAccessorUtil.hpp"
-#include "types/containers/ColumnVectorsValueAccessor.hpp"
-#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
-
-namespace quickstep {
-
-CollisionFreeAggregationStateHashTable::CollisionFreeAggregationStateHashTable(
- const std::vector<const Type *> &key_types,
- const std::size_t num_entries,
- const std::vector<AggregationHandle *> &handles,
- StorageManager *storage_manager)
- : key_type_(key_types.front()),
- num_entries_(num_entries),
- num_handles_(handles.size()),
- handles_(handles),
- num_finalize_partitions_(std::min((num_entries_ >> 12u) + 1u, 80uL)),
- storage_manager_(storage_manager) {
- CHECK_EQ(1u, key_types.size());
- DCHECK_GT(num_entries, 0u);
-
- std::map<std::string, std::size_t> memory_offsets;
- std::size_t required_memory = 0;
-
- memory_offsets.emplace("existence_map", required_memory);
- required_memory += CacheLineAlignedBytes(
- BarrieredReadWriteConcurrentBitVector::BytesNeeded(num_entries));
-
- for (std::size_t i = 0; i < num_handles_; ++i) {
- const AggregationHandle *handle = handles_[i];
- const std::vector<const Type *> argument_types = handle->getArgumentTypes();
-
- std::size_t state_size = 0;
- switch (handle->getAggregationID()) {
- case AggregationID::kCount: {
- state_size = sizeof(std::atomic<std::size_t>);
- break;
- }
- case AggregationID::kSum: {
- CHECK_EQ(1u, argument_types.size());
- switch (argument_types.front()->getTypeID()) {
- case TypeID::kInt: // Fall through
- case TypeID::kLong:
- state_size = sizeof(std::atomic<std::int64_t>);
- break;
- case TypeID::kFloat: // Fall through
- case TypeID::kDouble:
- state_size = sizeof(std::atomic<double>);
- break;
- default:
- LOG(FATAL) << "Not implemented";
- }
- break;
- }
- default:
- LOG(FATAL) << "Not implemented";
- }
-
- memory_offsets.emplace(std::string("state") + std::to_string(i),
- required_memory);
- required_memory += CacheLineAlignedBytes(state_size * num_entries);
- }
-
- const std::size_t num_storage_slots =
- storage_manager_->SlotsNeededForBytes(required_memory);
-
- const block_id blob_id = storage_manager_->createBlob(num_storage_slots);
- blob_ = storage_manager_->getBlobMutable(blob_id);
-
- void *memory_start = blob_->getMemoryMutable();
- existence_map_.reset(new BarrieredReadWriteConcurrentBitVector(
- reinterpret_cast<char *>(memory_start) + memory_offsets.at("existence_map"),
- num_entries,
- false /* initialize */));
-
- for (std::size_t i = 0; i < num_handles_; ++i) {
- vec_tables_.emplace_back(
- reinterpret_cast<char *>(memory_start) +
- memory_offsets.at(std::string("state") + std::to_string(i)));
- }
-
- memory_size_ = required_memory;
- num_init_partitions_ =
- std::max(1uL, std::min(memory_size_ / (4uL * 1024u * 1024u), 80uL));
-}
-
-CollisionFreeAggregationStateHashTable::~CollisionFreeAggregationStateHashTable() {
- const block_id blob_id = blob_->getID();
- blob_.release();
- storage_manager_->deleteBlockOrBlobFile(blob_id);
-}
-
-void CollisionFreeAggregationStateHashTable::destroyPayload() {
-}
-
-bool CollisionFreeAggregationStateHashTable::upsertValueAccessor(
- const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
- const std::vector<MultiSourceAttributeId> &key_ids,
- ValueAccessorMultiplexer *accessor_mux) {
- DCHECK_EQ(1u, key_ids.size());
-
- const ValueAccessorSource key_source = key_ids.front().source;
- const attribute_id key_id = key_ids.front().attr_id;
- const bool is_key_nullable = key_type_->isNullable();
-
- if (handles_.empty()) {
- InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
- accessor_mux->getValueAccessorBySource(key_source),
- [&](auto *accessor) -> void { // NOLINT(build/c++11)
- upsertValueAccessorKeyOnlyHelper(is_key_nullable,
- key_type_,
- key_id,
- accessor);
- });
- return true;
- }
-
- DCHECK(accessor_mux->getDerivedAccessor()->getImplementationType()
- == ValueAccessor::Implementation::kColumnVectors);
- ValueAccessor *base_accessor = accessor_mux->getBaseAccessor();
- ColumnVectorsValueAccessor *derived_accesor =
- static_cast<ColumnVectorsValueAccessor *>(accessor_mux->getDerivedAccessor());
-
- for (std::size_t i = 0; i < num_handles_; ++i) {
- DCHECK_LE(argument_ids[i].size(), 1u);
-
- const AggregationHandle *handle = handles_[i];
- const auto &argument_types = handle->getArgumentTypes();
- const auto &argument_ids_i = argument_ids[i];
-
- ValueAccessorSource argument_source;
- attribute_id argument_id;
- const Type *argument_type;
- bool is_argument_nullable;
-
- if (argument_ids_i.empty()) {
- argument_source = ValueAccessorSource::kInvalid;
- argument_id = kInvalidAttributeID;
-
- DCHECK(argument_types.empty());
- argument_type = nullptr;
- is_argument_nullable = false;
- } else {
- DCHECK_EQ(1u, argument_ids_i.size());
- argument_source = argument_ids_i.front().source;
- argument_id = argument_ids_i.front().attr_id;
-
- DCHECK_EQ(1u, argument_types.size());
- argument_type = argument_types.front();
- is_argument_nullable = argument_type->isNullable();
- }
-
- InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
- base_accessor,
- [&](auto *accessor) -> void { // NOLINT(build/c++11)
- if (key_source == ValueAccessorSource::kBase) {
- if (argument_source == ValueAccessorSource::kBase) {
- upsertValueAccessorDispatchHelper<false>(is_key_nullable,
- is_argument_nullable,
- key_type_,
- argument_type,
- handle->getAggregationID(),
- key_id,
- argument_id,
- vec_tables_[i],
- accessor,
- accessor);
- } else {
- upsertValueAccessorDispatchHelper<true>(is_key_nullable,
- is_argument_nullable,
- key_type_,
- argument_type,
- handle->getAggregationID(),
- key_id,
- argument_id,
- vec_tables_[i],
- accessor,
- derived_accesor);
- }
- } else {
- if (argument_source == ValueAccessorSource::kBase) {
- upsertValueAccessorDispatchHelper<true>(is_key_nullable,
- is_argument_nullable,
- key_type_,
- argument_type,
- handle->getAggregationID(),
- key_id,
- argument_id,
- vec_tables_[i],
- derived_accesor,
- accessor);
- } else {
- upsertValueAccessorDispatchHelper<false>(is_key_nullable,
- is_argument_nullable,
- key_type_,
- argument_type,
- handle->getAggregationID(),
- key_id,
- argument_id,
- vec_tables_[i],
- derived_accesor,
- derived_accesor);
- }
- }
- });
- }
- return true;
-}
-
-void CollisionFreeAggregationStateHashTable::finalizeKey(
- const std::size_t partition_id,
- NativeColumnVector *output_cv) const {
- const std::size_t start_position =
- calculatePartitionStartPosition(partition_id);
- const std::size_t end_position =
- calculatePartitionEndPosition(partition_id);
-
- switch (key_type_->getTypeID()) {
- case TypeID::kInt:
- finalizeKeyInternal<int>(start_position, end_position, output_cv);
- return;
- case TypeID::kLong:
- finalizeKeyInternal<std::int64_t>(start_position, end_position, output_cv);
- return;
- default:
- LOG(FATAL) << "Not supported";
- }
-}
-
-void CollisionFreeAggregationStateHashTable::finalizeState(
- const std::size_t partition_id,
- std::size_t handle_id,
- NativeColumnVector *output_cv) const {
- const std::size_t start_position =
- calculatePartitionStartPosition(partition_id);
- const std::size_t end_position =
- calculatePartitionEndPosition(partition_id);
-
- const AggregationHandle *handle = handles_[handle_id];
- const auto &argument_types = handle->getArgumentTypes();
- const Type *argument_type =
- argument_types.empty() ? nullptr : argument_types.front();
-
- finalizeStateDispatchHelper(handle->getAggregationID(),
- argument_type,
- vec_tables_[handle_id],
- start_position,
- end_position,
- output_cv);
-}
-
-} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/CollisionFreeAggregationStateHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeAggregationStateHashTable.hpp b/storage/CollisionFreeAggregationStateHashTable.hpp
deleted file mode 100644
index d738e4e..0000000
--- a/storage/CollisionFreeAggregationStateHashTable.hpp
+++ /dev/null
@@ -1,621 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_STORAGE_COLLISION_FREE_AGGREGATION_STATE_HASH_TABLE_HPP_
-#define QUICKSTEP_STORAGE_COLLISION_FREE_AGGREGATION_STATE_HASH_TABLE_HPP_
-
-#include <atomic>
-#include <cstddef>
-#include <cstdint>
-#include <cstring>
-#include <memory>
-#include <type_traits>
-#include <utility>
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregationHandle.hpp"
-#include "expressions/aggregation/AggregationID.hpp"
-#include "storage/HashTableBase.hpp"
-#include "storage/StorageBlob.hpp"
-#include "storage/StorageConstants.hpp"
-#include "storage/ValueAccessor.hpp"
-#include "storage/ValueAccessorMultiplexer.hpp"
-#include "types/Type.hpp"
-#include "types/TypeID.hpp"
-#include "types/TypedValue.hpp"
-#include "types/containers/ColumnVector.hpp"
-#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-namespace quickstep {
-
-class ColumnVectorsValueAccessor;
-class StorageMnager;
-
-/** \addtogroup Storage
- * @{
- */
-
-class CollisionFreeAggregationStateHashTable : public AggregationStateHashTableBase {
- public:
- CollisionFreeAggregationStateHashTable(
- const std::vector<const Type *> &key_types,
- const std::size_t num_entries,
- const std::vector<AggregationHandle *> &handles,
- StorageManager *storage_manager);
-
- ~CollisionFreeAggregationStateHashTable() override;
-
- void destroyPayload() override;
-
- inline std::size_t getNumInitializationPartitions() const {
- return num_init_partitions_;
- }
-
- inline std::size_t getNumFinalizationPartitions() const {
- return num_finalize_partitions_;
- }
-
- inline std::size_t getNumTuplesInPartition(
- const std::size_t partition_id) const {
- const std::size_t start_position =
- calculatePartitionStartPosition(partition_id);
- const std::size_t end_position =
- calculatePartitionEndPosition(partition_id);
- return existence_map_->onesCountInRange(start_position, end_position);
- }
-
- inline void initialize(const std::size_t partition_id) {
- const std::size_t memory_segment_size =
- (memory_size_ + num_init_partitions_ - 1) / num_init_partitions_;
- const std::size_t memory_start = memory_segment_size * partition_id;
- std::memset(reinterpret_cast<char *>(blob_->getMemoryMutable()) + memory_start,
- 0,
- std::min(memory_segment_size, memory_size_ - memory_start));
- }
-
- bool upsertValueAccessor(
- const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
- const std::vector<MultiSourceAttributeId> &key_ids,
- ValueAccessorMultiplexer *accessor_mux) override;
-
- void finalizeKey(const std::size_t partition_id,
- NativeColumnVector *output_cv) const;
-
- void finalizeState(const std::size_t partition_id,
- std::size_t handle_id,
- NativeColumnVector *output_cv) const;
-
- private:
- inline static std::size_t CacheLineAlignedBytes(const std::size_t actual_bytes) {
- return (actual_bytes + kCacheLineBytes - 1) / kCacheLineBytes * kCacheLineBytes;
- }
-
- inline std::size_t calculatePartitionLength() const {
- const std::size_t partition_length =
- (num_entries_ + num_finalize_partitions_ - 1) / num_finalize_partitions_;
- DCHECK_GE(partition_length, 0u);
- return partition_length;
- }
-
- inline std::size_t calculatePartitionStartPosition(
- const std::size_t partition_id) const {
- return calculatePartitionLength() * partition_id;
- }
-
- inline std::size_t calculatePartitionEndPosition(
- const std::size_t partition_id) const {
- return std::min(calculatePartitionLength() * (partition_id + 1),
- num_entries_);
- }
-
- template <bool use_two_accessors, typename ...ArgTypes>
- inline void upsertValueAccessorDispatchHelper(
- const bool is_key_nullable,
- const bool is_argument_nullable,
- ArgTypes &&...args);
-
- template <bool ...bool_values, typename ...ArgTypes>
- inline void upsertValueAccessorDispatchHelper(
- const Type *key_type,
- ArgTypes &&...args);
-
- template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
- typename KeyT, typename ...ArgTypes>
- inline void upsertValueAccessorDispatchHelper(
- const Type *argument_type,
- const AggregationID agg_id,
- ArgTypes &&...args);
-
- template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
- typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT>
- inline void upsertValueAccessorCountHelper(
- const attribute_id key_attr_id,
- const attribute_id argument_id,
- void *vec_table,
- KeyValueAccessorT *key_accessor,
- ArgumentValueAccessorT *argument_accessor);
-
- template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
- typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT>
- inline void upsertValueAccessorSumHelper(
- const Type *argument_type,
- const attribute_id key_attr_id,
- const attribute_id argument_id,
- void *vec_table,
- KeyValueAccessorT *key_accessor,
- ArgumentValueAccessorT *argument_accessor);
-
- template <typename ...ArgTypes>
- inline void upsertValueAccessorKeyOnlyHelper(
- const bool is_key_nullable,
- const Type *key_type,
- ArgTypes &&...args);
-
- template <bool is_key_nullable, typename KeyT, typename KeyValueAccessorT>
- inline void upsertValueAccessorKeyOnly(
- const attribute_id key_attr_id,
- KeyValueAccessorT *key_accessor);
-
- template <bool is_key_nullable, typename KeyT, typename KeyValueAccessorT>
- inline void upsertValueAccessorCountNullary(
- const attribute_id key_attr_id,
- std::atomic<std::size_t> *vec_table,
- KeyValueAccessorT *key_accessor);
-
- template <bool use_two_accessors, bool is_key_nullable, typename KeyT,
- typename KeyValueAccessorT, typename ArgumentValueAccessorT>
- inline void upsertValueAccessorCountUnary(
- const attribute_id key_attr_id,
- const attribute_id argument_id,
- std::atomic<std::size_t> *vec_table,
- KeyValueAccessorT *key_accessor,
- ArgumentValueAccessorT *argument_accessor);
-
- template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
- typename KeyT, typename ArgumentT, typename StateT,
- typename KeyValueAccessorT, typename ArgumentValueAccessorT>
- inline void upsertValueAccessorIntegerSum(
- const attribute_id key_attr_id,
- const attribute_id argument_id,
- std::atomic<StateT> *vec_table,
- KeyValueAccessorT *key_accessor,
- ArgumentValueAccessorT *argument_accessor);
-
- template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
- typename KeyT, typename ArgumentT, typename StateT,
- typename KeyValueAccessorT, typename ArgumentValueAccessorT>
- inline void upsertValueAccessorGenericSum(
- const attribute_id key_attr_id,
- const attribute_id argument_id,
- std::atomic<StateT> *vec_table,
- KeyValueAccessorT *key_accessor,
- ArgumentValueAccessorT *argument_accessor);
-
- template <typename KeyT>
- inline void finalizeKeyInternal(const std::size_t start_position,
- const std::size_t end_position,
- NativeColumnVector *output_cv) const {
- std::size_t loc = start_position - 1;
- while ((loc = existence_map_->nextOne(loc)) < end_position) {
- *static_cast<KeyT *>(output_cv->getPtrForDirectWrite()) = loc;
- }
- }
-
- template <typename ...ArgTypes>
- inline void finalizeStateDispatchHelper(const AggregationID agg_id,
- const Type *argument_type,
- const void *vec_table,
- ArgTypes &&...args) const {
- switch (agg_id) {
- case AggregationID::kCount:
- finalizeStateCount(static_cast<const std::atomic<std::size_t> *>(vec_table),
- std::forward<ArgTypes>(args)...);
- return;
- case AggregationID::kSum:
- finalizeStateSumHelper(argument_type,
- vec_table,
- std::forward<ArgTypes>(args)...);
- return;
- default:
- LOG(FATAL) << "Not supported";
- }
- }
-
- template <typename ...ArgTypes>
- inline void finalizeStateSumHelper(const Type *argument_type,
- const void *vec_table,
- ArgTypes &&...args) const {
- DCHECK(argument_type != nullptr);
-
- switch (argument_type->getTypeID()) {
- case TypeID::kInt: // Fall through
- case TypeID::kLong:
- finalizeStateSum<std::int64_t>(
- static_cast<const std::atomic<std::int64_t> *>(vec_table),
- std::forward<ArgTypes>(args)...);
- return;
- case TypeID::kFloat: // Fall through
- case TypeID::kDouble:
- finalizeStateSum<double>(
- static_cast<const std::atomic<double> *>(vec_table),
- std::forward<ArgTypes>(args)...);
- return;
- default:
- LOG(FATAL) << "Not supported";
- }
- }
-
- inline void finalizeStateCount(const std::atomic<std::size_t> *vec_table,
- const std::size_t start_position,
- const std::size_t end_position,
- NativeColumnVector *output_cv) const {
- std::size_t loc = start_position - 1;
- while ((loc = existence_map_->nextOne(loc)) < end_position) {
- *static_cast<std::int64_t *>(output_cv->getPtrForDirectWrite()) =
- vec_table[loc].load(std::memory_order_relaxed);
- }
- }
-
- template <typename ResultT, typename StateT>
- inline void finalizeStateSum(const std::atomic<StateT> *vec_table,
- const std::size_t start_position,
- const std::size_t end_position,
- NativeColumnVector *output_cv) const {
- std::size_t loc = start_position - 1;
- while ((loc = existence_map_->nextOne(loc)) < end_position) {
- *static_cast<ResultT *>(output_cv->getPtrForDirectWrite()) =
- vec_table[loc].load(std::memory_order_relaxed);
- }
- }
-
- const Type *key_type_;
- const std::size_t num_entries_;
-
- const std::size_t num_handles_;
- const std::vector<AggregationHandle *> handles_;
-
- std::unique_ptr<BarrieredReadWriteConcurrentBitVector> existence_map_;
- std::vector<void *> vec_tables_;
-
- const std::size_t num_finalize_partitions_;
-
- StorageManager *storage_manager_;
- MutableBlobReference blob_;
-
- std::size_t memory_size_;
- std::size_t num_init_partitions_;
-
- DISALLOW_COPY_AND_ASSIGN(CollisionFreeAggregationStateHashTable);
-};
-
-// ----------------------------------------------------------------------------
-// Implementations of template methods follow.
-
-template <bool use_two_accessors, typename ...ArgTypes>
-inline void CollisionFreeAggregationStateHashTable
- ::upsertValueAccessorDispatchHelper(const bool is_key_nullable,
- const bool is_argument_nullable,
- ArgTypes &&...args) {
- if (is_key_nullable) {
- if (is_argument_nullable) {
- upsertValueAccessorDispatchHelper<use_two_accessors, true, true>(
- std::forward<ArgTypes>(args)...);
- } else {
- upsertValueAccessorDispatchHelper<use_two_accessors, true, false>(
- std::forward<ArgTypes>(args)...);
- }
- } else {
- if (is_argument_nullable) {
- upsertValueAccessorDispatchHelper<use_two_accessors, false, true>(
- std::forward<ArgTypes>(args)...);
- } else {
- upsertValueAccessorDispatchHelper<use_two_accessors, false, false>(
- std::forward<ArgTypes>(args)...);
- }
- }
-}
-
-template <bool ...bool_values, typename ...ArgTypes>
-inline void CollisionFreeAggregationStateHashTable
- ::upsertValueAccessorDispatchHelper(const Type *key_type,
- ArgTypes &&...args) {
- switch (key_type->getTypeID()) {
- case TypeID::kInt:
- upsertValueAccessorDispatchHelper<bool_values..., int>(
- std::forward<ArgTypes>(args)...);
- return;
- case TypeID::kLong:
- upsertValueAccessorDispatchHelper<bool_values..., std::int64_t>(
- std::forward<ArgTypes>(args)...);
- return;
- default:
- LOG(FATAL) << "Not supported";
- }
-}
-
-template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
- typename KeyT, typename ...ArgTypes>
-inline void CollisionFreeAggregationStateHashTable
- ::upsertValueAccessorDispatchHelper(const Type *argument_type,
- const AggregationID agg_id,
- ArgTypes &&...args) {
- switch (agg_id) {
- case AggregationID::kCount:
- upsertValueAccessorCountHelper<
- use_two_accessors, is_key_nullable, is_argument_nullable, KeyT>(
- std::forward<ArgTypes>(args)...);
- return;
- case AggregationID::kSum:
- upsertValueAccessorSumHelper<
- use_two_accessors, is_key_nullable, is_argument_nullable, KeyT>(
- argument_type, std::forward<ArgTypes>(args)...);
- return;
- default:
- LOG(FATAL) << "Not supported";
- }
-}
-
-template <typename ...ArgTypes>
-inline void CollisionFreeAggregationStateHashTable
- ::upsertValueAccessorKeyOnlyHelper(const bool is_key_nullable,
- const Type *key_type,
- ArgTypes &&...args) {
- switch (key_type->getTypeID()) {
- case TypeID::kInt: {
- if (is_key_nullable) {
- upsertValueAccessorKeyOnly<true, int>(std::forward<ArgTypes>(args)...);
- } else {
- upsertValueAccessorKeyOnly<false, int>(std::forward<ArgTypes>(args)...);
- }
- return;
- }
- case TypeID::kLong: {
- if (is_key_nullable) {
- upsertValueAccessorKeyOnly<true, std::int64_t>(std::forward<ArgTypes>(args)...);
- } else {
- upsertValueAccessorKeyOnly<false, std::int64_t>(std::forward<ArgTypes>(args)...);
- }
- return;
- }
- default:
- LOG(FATAL) << "Not supported";
- }
-}
-
-template <bool is_key_nullable, typename KeyT, typename ValueAccessorT>
-inline void CollisionFreeAggregationStateHashTable
- ::upsertValueAccessorKeyOnly(const attribute_id key_attr_id,
- ValueAccessorT *accessor) {
- accessor->beginIteration();
- while (accessor->next()) {
- const KeyT *key = static_cast<const KeyT *>(
- accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
- if (is_key_nullable && key == nullptr) {
- continue;
- }
- existence_map_->setBit(*key);
- }
-}
-
-template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
- typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT>
-inline void CollisionFreeAggregationStateHashTable
- ::upsertValueAccessorCountHelper(const attribute_id key_attr_id,
- const attribute_id argument_id,
- void *vec_table,
- KeyValueAccessorT *key_accessor,
- ArgumentValueAccessorT *argument_accessor) {
- DCHECK_GE(key_attr_id, 0u);
-
- if (is_argument_nullable && argument_id != kInvalidAttributeID) {
- upsertValueAccessorCountUnary<use_two_accessors, is_key_nullable, KeyT>(
- key_attr_id,
- argument_id,
- static_cast<std::atomic<std::size_t> *>(vec_table),
- key_accessor,
- argument_accessor);
- return;
- } else {
- upsertValueAccessorCountNullary<is_key_nullable, KeyT>(
- key_attr_id,
- static_cast<std::atomic<std::size_t> *>(vec_table),
- key_accessor);
- return;
- }
-}
-
-template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
- typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT>
-inline void CollisionFreeAggregationStateHashTable
- ::upsertValueAccessorSumHelper(const Type *argument_type,
- const attribute_id key_attr_id,
- const attribute_id argument_id,
- void *vec_table,
- KeyValueAccessorT *key_accessor,
- ArgumentValueAccessorT *argument_accessor) {
- DCHECK_GE(key_attr_id, 0u);
- DCHECK_GE(argument_id, 0u);
- DCHECK(argument_type != nullptr);
-
- switch (argument_type->getTypeID()) {
- case TypeID::kInt:
- upsertValueAccessorIntegerSum<
- use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, int>(
- key_attr_id,
- argument_id,
- static_cast<std::atomic<std::int64_t> *>(vec_table),
- key_accessor,
- argument_accessor);
- return;
- case TypeID::kLong:
- upsertValueAccessorIntegerSum<
- use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, std::int64_t>(
- key_attr_id,
- argument_id,
- static_cast<std::atomic<std::int64_t> *>(vec_table),
- key_accessor,
- argument_accessor);
- return;
- case TypeID::kFloat:
- upsertValueAccessorGenericSum<
- use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, float>(
- key_attr_id,
- argument_id,
- static_cast<std::atomic<double> *>(vec_table),
- key_accessor,
- argument_accessor);
- return;
- case TypeID::kDouble:
- upsertValueAccessorGenericSum<
- use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, double>(
- key_attr_id,
- argument_id,
- static_cast<std::atomic<double> *>(vec_table),
- key_accessor,
- argument_accessor);
- return;
- default:
- LOG(FATAL) << "Not supported";
- }
-}
-
-template <bool is_key_nullable, typename KeyT, typename ValueAccessorT>
-inline void CollisionFreeAggregationStateHashTable
- ::upsertValueAccessorCountNullary(const attribute_id key_attr_id,
- std::atomic<std::size_t> *vec_table,
- ValueAccessorT *accessor) {
- accessor->beginIteration();
- while (accessor->next()) {
- const KeyT *key = static_cast<const KeyT *>(
- accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
- if (is_key_nullable && key == nullptr) {
- continue;
- }
- const std::size_t loc = *key;
- vec_table[loc].fetch_add(1u, std::memory_order_relaxed);
- existence_map_->setBit(loc);
- }
-}
-
-template <bool use_two_accessors, bool is_key_nullable, typename KeyT,
- typename KeyValueAccessorT, typename ArgumentValueAccessorT>
-inline void CollisionFreeAggregationStateHashTable
- ::upsertValueAccessorCountUnary(const attribute_id key_attr_id,
- const attribute_id argument_id,
- std::atomic<std::size_t> *vec_table,
- KeyValueAccessorT *key_accessor,
- ArgumentValueAccessorT *argument_accessor) {
- key_accessor->beginIteration();
- if (use_two_accessors) {
- argument_accessor->beginIteration();
- }
- while (key_accessor->next()) {
- if (use_two_accessors) {
- argument_accessor->next();
- }
- const KeyT *key = static_cast<const KeyT *>(
- key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
- if (is_key_nullable && key == nullptr) {
- continue;
- }
- const std::size_t loc = *key;
- existence_map_->setBit(loc);
- if (argument_accessor->getUntypedValue(argument_id) == nullptr) {
- continue;
- }
- vec_table[loc].fetch_add(1u, std::memory_order_relaxed);
- }
-}
-
-template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
- typename KeyT, typename ArgumentT, typename StateT,
- typename KeyValueAccessorT, typename ArgumentValueAccessorT>
-inline void CollisionFreeAggregationStateHashTable
- ::upsertValueAccessorIntegerSum(const attribute_id key_attr_id,
- const attribute_id argument_id,
- std::atomic<StateT> *vec_table,
- KeyValueAccessorT *key_accessor,
- ArgumentValueAccessorT *argument_accessor) {
- key_accessor->beginIteration();
- if (use_two_accessors) {
- argument_accessor->beginIteration();
- }
- while (key_accessor->next()) {
- if (use_two_accessors) {
- argument_accessor->next();
- }
- const KeyT *key = static_cast<const KeyT *>(
- key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
- if (is_key_nullable && key == nullptr) {
- continue;
- }
- const std::size_t loc = *key;
- existence_map_->setBit(loc);
- const ArgumentT *argument = static_cast<const ArgumentT *>(
- argument_accessor->template getUntypedValue<is_argument_nullable>(argument_id));
- if (is_argument_nullable && argument == nullptr) {
- continue;
- }
- vec_table[loc].fetch_add(*argument, std::memory_order_relaxed);
- }
-}
-
-template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
- typename KeyT, typename ArgumentT, typename StateT,
- typename KeyValueAccessorT, typename ArgumentValueAccessorT>
-inline void CollisionFreeAggregationStateHashTable
- ::upsertValueAccessorGenericSum(const attribute_id key_attr_id,
- const attribute_id argument_id,
- std::atomic<StateT> *vec_table,
- KeyValueAccessorT *key_accessor,
- ArgumentValueAccessorT *argument_accessor) {
- key_accessor->beginIteration();
- if (use_two_accessors) {
- argument_accessor->beginIteration();
- }
- while (key_accessor->next()) {
- if (use_two_accessors) {
- argument_accessor->next();
- }
- const KeyT *key = static_cast<const KeyT *>(
- key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
- if (is_key_nullable && key == nullptr) {
- continue;
- }
- const std::size_t loc = *key;
- existence_map_->setBit(loc);
- const ArgumentT *argument = static_cast<const ArgumentT *>(
- argument_accessor->template getUntypedValue<is_argument_nullable>(argument_id));
- if (is_argument_nullable && argument == nullptr) {
- continue;
- }
- const ArgumentT arg_val = *argument;
- std::atomic<StateT> &state = vec_table[loc];
- StateT state_val = state.load(std::memory_order_relaxed);
- while(!state.compare_exchange_weak(state_val, state_val + arg_val)) {}
- }
-}
-
-} // namespace quickstep
-
-#endif // QUICKSTEP_STORAGE_COLLISION_FREE_AGGREGATION_STATE_HASH_TABLE_HPP_
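A detail of the header removed above (it carries over unchanged to the renamed CollisionFreeVectorTable below): integer SUM states accumulate with std::atomic<std::int64_t>::fetch_add, while float and double states go through the compare_exchange_weak retry loop in upsertValueAccessorGenericSum, because std::atomic has no fetch_add overload for floating-point types before C++20. A minimal stand-alone sketch of that retry loop, with the function name chosen here for illustration only:

    #include <atomic>
    #include <iostream>

    // Accumulate 'delta' into an atomic double with a CAS retry loop, the same
    // pattern used by upsertValueAccessorGenericSum. On failure,
    // compare_exchange_weak reloads 'expected' with the freshly observed value,
    // so the loop simply retries with the new sum.
    void AtomicDoubleAdd(std::atomic<double> *state, const double delta) {
      double expected = state->load(std::memory_order_relaxed);
      while (!state->compare_exchange_weak(expected, expected + delta)) {}
    }

    int main() {
      std::atomic<double> sum(0.0);
      AtomicDoubleAdd(&sum, 1.5);
      AtomicDoubleAdd(&sum, 2.25);
      std::cout << sum.load() << "\n";  // prints 3.75
      return 0;
    }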
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/CollisionFreeVectorTable.cpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeVectorTable.cpp b/storage/CollisionFreeVectorTable.cpp
new file mode 100644
index 0000000..8065cd9
--- /dev/null
+++ b/storage/CollisionFreeVectorTable.cpp
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "storage/CollisionFreeVectorTable.hpp"
+
+#include <algorithm>
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <cstdlib>
+#include <map>
+#include <memory>
+#include <vector>
+
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
+
+namespace quickstep {
+
+CollisionFreeVectorTable::CollisionFreeVectorTable(
+ const std::vector<const Type *> &key_types,
+ const std::size_t num_entries,
+ const std::vector<AggregationHandle *> &handles,
+ StorageManager *storage_manager)
+ : key_type_(key_types.front()),
+ num_entries_(num_entries),
+ num_handles_(handles.size()),
+ handles_(handles),
+ num_finalize_partitions_(std::min((num_entries_ >> 12u) + 1u, 80uL)),
+ storage_manager_(storage_manager) {
+ CHECK_EQ(1u, key_types.size());
+ DCHECK_GT(num_entries, 0u);
+
+ std::map<std::string, std::size_t> memory_offsets;
+ std::size_t required_memory = 0;
+
+ memory_offsets.emplace("existence_map", required_memory);
+ required_memory += CacheLineAlignedBytes(
+ BarrieredReadWriteConcurrentBitVector::BytesNeeded(num_entries));
+
+ for (std::size_t i = 0; i < num_handles_; ++i) {
+ const AggregationHandle *handle = handles_[i];
+ const std::vector<const Type *> argument_types = handle->getArgumentTypes();
+
+ std::size_t state_size = 0;
+ switch (handle->getAggregationID()) {
+ case AggregationID::kCount: {
+ state_size = sizeof(std::atomic<std::size_t>);
+ break;
+ }
+ case AggregationID::kSum: {
+ CHECK_EQ(1u, argument_types.size());
+ switch (argument_types.front()->getTypeID()) {
+ case TypeID::kInt: // Fall through
+ case TypeID::kLong:
+ state_size = sizeof(std::atomic<std::int64_t>);
+ break;
+ case TypeID::kFloat: // Fall through
+ case TypeID::kDouble:
+ state_size = sizeof(std::atomic<double>);
+ break;
+ default:
+ LOG(FATAL) << "Not implemented";
+ }
+ break;
+ }
+ default:
+ LOG(FATAL) << "Not implemented";
+ }
+
+ memory_offsets.emplace(std::string("state") + std::to_string(i),
+ required_memory);
+ required_memory += CacheLineAlignedBytes(state_size * num_entries);
+ }
+
+ const std::size_t num_storage_slots =
+ storage_manager_->SlotsNeededForBytes(required_memory);
+
+ const block_id blob_id = storage_manager_->createBlob(num_storage_slots);
+ blob_ = storage_manager_->getBlobMutable(blob_id);
+
+ void *memory_start = blob_->getMemoryMutable();
+ existence_map_.reset(new BarrieredReadWriteConcurrentBitVector(
+ reinterpret_cast<char *>(memory_start) + memory_offsets.at("existence_map"),
+ num_entries,
+ false /* initialize */));
+
+ for (std::size_t i = 0; i < num_handles_; ++i) {
+ vec_tables_.emplace_back(
+ reinterpret_cast<char *>(memory_start) +
+ memory_offsets.at(std::string("state") + std::to_string(i)));
+ }
+
+ memory_size_ = required_memory;
+ num_init_partitions_ =
+ std::max(1uL, std::min(memory_size_ / (4uL * 1024u * 1024u), 80uL));
+}
+
+CollisionFreeVectorTable::~CollisionFreeVectorTable() {
+ const block_id blob_id = blob_->getID();
+ blob_.release();
+ storage_manager_->deleteBlockOrBlobFile(blob_id);
+}
+
+void CollisionFreeVectorTable::destroyPayload() {
+}
+
+bool CollisionFreeVectorTable::upsertValueAccessor(
+ const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
+ const std::vector<MultiSourceAttributeId> &key_ids,
+ ValueAccessorMultiplexer *accessor_mux) {
+ DCHECK_EQ(1u, key_ids.size());
+
+ const ValueAccessorSource key_source = key_ids.front().source;
+ const attribute_id key_id = key_ids.front().attr_id;
+ const bool is_key_nullable = key_type_->isNullable();
+
+ if (handles_.empty()) {
+ InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
+ accessor_mux->getValueAccessorBySource(key_source),
+ [&](auto *accessor) -> void { // NOLINT(build/c++11)
+ upsertValueAccessorKeyOnlyHelper(is_key_nullable,
+ key_type_,
+ key_id,
+ accessor);
+ });
+ return true;
+ }
+
+ DCHECK(accessor_mux->getDerivedAccessor()->getImplementationType()
+ == ValueAccessor::Implementation::kColumnVectors);
+ ValueAccessor *base_accessor = accessor_mux->getBaseAccessor();
+ ColumnVectorsValueAccessor *derived_accesor =
+ static_cast<ColumnVectorsValueAccessor *>(accessor_mux->getDerivedAccessor());
+
+ for (std::size_t i = 0; i < num_handles_; ++i) {
+ DCHECK_LE(argument_ids[i].size(), 1u);
+
+ const AggregationHandle *handle = handles_[i];
+ const auto &argument_types = handle->getArgumentTypes();
+ const auto &argument_ids_i = argument_ids[i];
+
+ ValueAccessorSource argument_source;
+ attribute_id argument_id;
+ const Type *argument_type;
+ bool is_argument_nullable;
+
+ if (argument_ids_i.empty()) {
+ argument_source = ValueAccessorSource::kInvalid;
+ argument_id = kInvalidAttributeID;
+
+ DCHECK(argument_types.empty());
+ argument_type = nullptr;
+ is_argument_nullable = false;
+ } else {
+ DCHECK_EQ(1u, argument_ids_i.size());
+ argument_source = argument_ids_i.front().source;
+ argument_id = argument_ids_i.front().attr_id;
+
+ DCHECK_EQ(1u, argument_types.size());
+ argument_type = argument_types.front();
+ is_argument_nullable = argument_type->isNullable();
+ }
+
+ InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
+ base_accessor,
+ [&](auto *accessor) -> void { // NOLINT(build/c++11)
+ if (key_source == ValueAccessorSource::kBase) {
+ if (argument_source == ValueAccessorSource::kBase) {
+ upsertValueAccessorDispatchHelper<false>(is_key_nullable,
+ is_argument_nullable,
+ key_type_,
+ argument_type,
+ handle->getAggregationID(),
+ key_id,
+ argument_id,
+ vec_tables_[i],
+ accessor,
+ accessor);
+ } else {
+ upsertValueAccessorDispatchHelper<true>(is_key_nullable,
+ is_argument_nullable,
+ key_type_,
+ argument_type,
+ handle->getAggregationID(),
+ key_id,
+ argument_id,
+ vec_tables_[i],
+ accessor,
+ derived_accesor);
+ }
+ } else {
+ if (argument_source == ValueAccessorSource::kBase) {
+ upsertValueAccessorDispatchHelper<true>(is_key_nullable,
+ is_argument_nullable,
+ key_type_,
+ argument_type,
+ handle->getAggregationID(),
+ key_id,
+ argument_id,
+ vec_tables_[i],
+ derived_accesor,
+ accessor);
+ } else {
+ upsertValueAccessorDispatchHelper<false>(is_key_nullable,
+ is_argument_nullable,
+ key_type_,
+ argument_type,
+ handle->getAggregationID(),
+ key_id,
+ argument_id,
+ vec_tables_[i],
+ derived_accesor,
+ derived_accesor);
+ }
+ }
+ });
+ }
+ return true;
+}
+
+void CollisionFreeVectorTable::finalizeKey(const std::size_t partition_id,
+ NativeColumnVector *output_cv) const {
+ const std::size_t start_position =
+ calculatePartitionStartPosition(partition_id);
+ const std::size_t end_position =
+ calculatePartitionEndPosition(partition_id);
+
+ switch (key_type_->getTypeID()) {
+ case TypeID::kInt:
+ finalizeKeyInternal<int>(start_position, end_position, output_cv);
+ return;
+ case TypeID::kLong:
+ finalizeKeyInternal<std::int64_t>(start_position, end_position, output_cv);
+ return;
+ default:
+ LOG(FATAL) << "Not supported";
+ }
+}
+
+void CollisionFreeVectorTable::finalizeState(const std::size_t partition_id,
+ std::size_t handle_id,
+ NativeColumnVector *output_cv) const {
+ const std::size_t start_position =
+ calculatePartitionStartPosition(partition_id);
+ const std::size_t end_position =
+ calculatePartitionEndPosition(partition_id);
+
+ const AggregationHandle *handle = handles_[handle_id];
+ const auto &argument_types = handle->getArgumentTypes();
+ const Type *argument_type =
+ argument_types.empty() ? nullptr : argument_types.front();
+
+ finalizeStateDispatchHelper(handle->getAggregationID(),
+ argument_type,
+ vec_tables_[handle_id],
+ start_position,
+ end_position,
+ output_cv);
+}
+
+} // namespace quickstep
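The constructor above carves a single storage blob into an existence bitmap followed by one cache-line-aligned state array per aggregation handle, and the upsert paths index those arrays directly with the integer key value, which is why no hashing or collision handling is needed. The following stand-alone sketch is a simplified, single-threaded rendering of that layout (plain vectors in place of the blob, the atomics, and the concurrent bit vector) intended only to make the indexing scheme concrete; it is not the Quickstep implementation.

    #include <cstddef>
    #include <cstdint>
    #include <iostream>
    #include <vector>

    // Simplified collision-free aggregation: the key value itself is the slot
    // index, so each upsert is a direct write into per-handle state arrays.
    struct SimpleVectorTable {
      explicit SimpleVectorTable(const std::size_t num_entries)
          : existence(num_entries, false),
            count(num_entries, 0),
            sum(num_entries, 0) {}

      void UpsertCountSum(const std::int64_t key, const std::int64_t argument) {
        existence[key] = true;   // mark the group as present (existence bitmap)
        count[key] += 1;         // COUNT state array for this handle
        sum[key] += argument;    // SUM state array for this handle
      }

      std::vector<bool> existence;      // stands in for the concurrent bit vector
      std::vector<std::int64_t> count;  // one state array per aggregation handle
      std::vector<std::int64_t> sum;
    };

    int main() {
      SimpleVectorTable table(10);
      table.UpsertCountSum(3, 40);
      table.UpsertCountSum(3, 2);
      table.UpsertCountSum(7, 5);
      // Finalization just scans the existence bitmap and reads the state arrays.
      for (std::size_t key = 0; key < 10; ++key) {
        if (table.existence[key]) {
          std::cout << key << ": count=" << table.count[key]
                    << " sum=" << table.sum[key] << "\n";
        }
      }
      return 0;
    }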
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/CollisionFreeVectorTable.hpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeVectorTable.hpp b/storage/CollisionFreeVectorTable.hpp
new file mode 100644
index 0000000..25f7786
--- /dev/null
+++ b/storage/CollisionFreeVectorTable.hpp
@@ -0,0 +1,621 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_COLLISION_FREE_AGGREGATION_STATE_HASH_TABLE_HPP_
+#define QUICKSTEP_STORAGE_COLLISION_FREE_AGGREGATION_STATE_HASH_TABLE_HPP_
+
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
+#include "storage/HashTableBase.hpp"
+#include "storage/StorageBlob.hpp"
+#include "storage/StorageConstants.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
+#include "types/Type.hpp"
+#include "types/TypeID.hpp"
+#include "types/TypedValue.hpp"
+#include "types/containers/ColumnVector.hpp"
+#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class ColumnVectorsValueAccessor;
+class StorageManager;
+
+/** \addtogroup Storage
+ * @{
+ */
+
+class CollisionFreeVectorTable : public AggregationStateHashTableBase {
+ public:
+ CollisionFreeVectorTable(
+ const std::vector<const Type *> &key_types,
+ const std::size_t num_entries,
+ const std::vector<AggregationHandle *> &handles,
+ StorageManager *storage_manager);
+
+ ~CollisionFreeVectorTable() override;
+
+ void destroyPayload() override;
+
+ inline std::size_t getNumInitializationPartitions() const {
+ return num_init_partitions_;
+ }
+
+ inline std::size_t getNumFinalizationPartitions() const {
+ return num_finalize_partitions_;
+ }
+
+ inline std::size_t getNumTuplesInPartition(
+ const std::size_t partition_id) const {
+ const std::size_t start_position =
+ calculatePartitionStartPosition(partition_id);
+ const std::size_t end_position =
+ calculatePartitionEndPosition(partition_id);
+ return existence_map_->onesCountInRange(start_position, end_position);
+ }
+
+ inline void initialize(const std::size_t partition_id) {
+ const std::size_t memory_segment_size =
+ (memory_size_ + num_init_partitions_ - 1) / num_init_partitions_;
+ const std::size_t memory_start = memory_segment_size * partition_id;
+ std::memset(reinterpret_cast<char *>(blob_->getMemoryMutable()) + memory_start,
+ 0,
+ std::min(memory_segment_size, memory_size_ - memory_start));
+ }
+
+ bool upsertValueAccessor(
+ const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
+ const std::vector<MultiSourceAttributeId> &key_ids,
+ ValueAccessorMultiplexer *accessor_mux) override;
+
+ void finalizeKey(const std::size_t partition_id,
+ NativeColumnVector *output_cv) const;
+
+ void finalizeState(const std::size_t partition_id,
+ std::size_t handle_id,
+ NativeColumnVector *output_cv) const;
+
+ private:
+ inline static std::size_t CacheLineAlignedBytes(const std::size_t actual_bytes) {
+ return (actual_bytes + kCacheLineBytes - 1) / kCacheLineBytes * kCacheLineBytes;
+ }
+
+ inline std::size_t calculatePartitionLength() const {
+ const std::size_t partition_length =
+ (num_entries_ + num_finalize_partitions_ - 1) / num_finalize_partitions_;
+ DCHECK_GE(partition_length, 0u);
+ return partition_length;
+ }
+
+ inline std::size_t calculatePartitionStartPosition(
+ const std::size_t partition_id) const {
+ return calculatePartitionLength() * partition_id;
+ }
+
+ inline std::size_t calculatePartitionEndPosition(
+ const std::size_t partition_id) const {
+ return std::min(calculatePartitionLength() * (partition_id + 1),
+ num_entries_);
+ }
+
+ template <bool use_two_accessors, typename ...ArgTypes>
+ inline void upsertValueAccessorDispatchHelper(
+ const bool is_key_nullable,
+ const bool is_argument_nullable,
+ ArgTypes &&...args);
+
+ template <bool ...bool_values, typename ...ArgTypes>
+ inline void upsertValueAccessorDispatchHelper(
+ const Type *key_type,
+ ArgTypes &&...args);
+
+ template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
+ typename KeyT, typename ...ArgTypes>
+ inline void upsertValueAccessorDispatchHelper(
+ const Type *argument_type,
+ const AggregationID agg_id,
+ ArgTypes &&...args);
+
+ template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
+ typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT>
+ inline void upsertValueAccessorCountHelper(
+ const attribute_id key_attr_id,
+ const attribute_id argument_id,
+ void *vec_table,
+ KeyValueAccessorT *key_accessor,
+ ArgumentValueAccessorT *argument_accessor);
+
+ template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
+ typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT>
+ inline void upsertValueAccessorSumHelper(
+ const Type *argument_type,
+ const attribute_id key_attr_id,
+ const attribute_id argument_id,
+ void *vec_table,
+ KeyValueAccessorT *key_accessor,
+ ArgumentValueAccessorT *argument_accessor);
+
+ template <typename ...ArgTypes>
+ inline void upsertValueAccessorKeyOnlyHelper(
+ const bool is_key_nullable,
+ const Type *key_type,
+ ArgTypes &&...args);
+
+ template <bool is_key_nullable, typename KeyT, typename KeyValueAccessorT>
+ inline void upsertValueAccessorKeyOnly(
+ const attribute_id key_attr_id,
+ KeyValueAccessorT *key_accessor);
+
+ template <bool is_key_nullable, typename KeyT, typename KeyValueAccessorT>
+ inline void upsertValueAccessorCountNullary(
+ const attribute_id key_attr_id,
+ std::atomic<std::size_t> *vec_table,
+ KeyValueAccessorT *key_accessor);
+
+ template <bool use_two_accessors, bool is_key_nullable, typename KeyT,
+ typename KeyValueAccessorT, typename ArgumentValueAccessorT>
+ inline void upsertValueAccessorCountUnary(
+ const attribute_id key_attr_id,
+ const attribute_id argument_id,
+ std::atomic<std::size_t> *vec_table,
+ KeyValueAccessorT *key_accessor,
+ ArgumentValueAccessorT *argument_accessor);
+
+ template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
+ typename KeyT, typename ArgumentT, typename StateT,
+ typename KeyValueAccessorT, typename ArgumentValueAccessorT>
+ inline void upsertValueAccessorIntegerSum(
+ const attribute_id key_attr_id,
+ const attribute_id argument_id,
+ std::atomic<StateT> *vec_table,
+ KeyValueAccessorT *key_accessor,
+ ArgumentValueAccessorT *argument_accessor);
+
+ template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
+ typename KeyT, typename ArgumentT, typename StateT,
+ typename KeyValueAccessorT, typename ArgumentValueAccessorT>
+ inline void upsertValueAccessorGenericSum(
+ const attribute_id key_attr_id,
+ const attribute_id argument_id,
+ std::atomic<StateT> *vec_table,
+ KeyValueAccessorT *key_accessor,
+ ArgumentValueAccessorT *argument_accessor);
+
+ template <typename KeyT>
+ inline void finalizeKeyInternal(const std::size_t start_position,
+ const std::size_t end_position,
+ NativeColumnVector *output_cv) const {
+ std::size_t loc = start_position - 1;
+ while ((loc = existence_map_->nextOne(loc)) < end_position) {
+ *static_cast<KeyT *>(output_cv->getPtrForDirectWrite()) = loc;
+ }
+ }
+
+ template <typename ...ArgTypes>
+ inline void finalizeStateDispatchHelper(const AggregationID agg_id,
+ const Type *argument_type,
+ const void *vec_table,
+ ArgTypes &&...args) const {
+ switch (agg_id) {
+ case AggregationID::kCount:
+ finalizeStateCount(static_cast<const std::atomic<std::size_t> *>(vec_table),
+ std::forward<ArgTypes>(args)...);
+ return;
+ case AggregationID::kSum:
+ finalizeStateSumHelper(argument_type,
+ vec_table,
+ std::forward<ArgTypes>(args)...);
+ return;
+ default:
+ LOG(FATAL) << "Not supported";
+ }
+ }
+
+ template <typename ...ArgTypes>
+ inline void finalizeStateSumHelper(const Type *argument_type,
+ const void *vec_table,
+ ArgTypes &&...args) const {
+ DCHECK(argument_type != nullptr);
+
+ switch (argument_type->getTypeID()) {
+ case TypeID::kInt: // Fall through
+ case TypeID::kLong:
+ finalizeStateSum<std::int64_t>(
+ static_cast<const std::atomic<std::int64_t> *>(vec_table),
+ std::forward<ArgTypes>(args)...);
+ return;
+ case TypeID::kFloat: // Fall through
+ case TypeID::kDouble:
+ finalizeStateSum<double>(
+ static_cast<const std::atomic<double> *>(vec_table),
+ std::forward<ArgTypes>(args)...);
+ return;
+ default:
+ LOG(FATAL) << "Not supported";
+ }
+ }
+
+ inline void finalizeStateCount(const std::atomic<std::size_t> *vec_table,
+ const std::size_t start_position,
+ const std::size_t end_position,
+ NativeColumnVector *output_cv) const {
+ std::size_t loc = start_position - 1;
+ while ((loc = existence_map_->nextOne(loc)) < end_position) {
+ *static_cast<std::int64_t *>(output_cv->getPtrForDirectWrite()) =
+ vec_table[loc].load(std::memory_order_relaxed);
+ }
+ }
+
+ template <typename ResultT, typename StateT>
+ inline void finalizeStateSum(const std::atomic<StateT> *vec_table,
+ const std::size_t start_position,
+ const std::size_t end_position,
+ NativeColumnVector *output_cv) const {
+ std::size_t loc = start_position - 1;
+ while ((loc = existence_map_->nextOne(loc)) < end_position) {
+ *static_cast<ResultT *>(output_cv->getPtrForDirectWrite()) =
+ vec_table[loc].load(std::memory_order_relaxed);
+ }
+ }
+
+ const Type *key_type_;
+ const std::size_t num_entries_;
+
+ const std::size_t num_handles_;
+ const std::vector<AggregationHandle *> handles_;
+
+ std::unique_ptr<BarrieredReadWriteConcurrentBitVector> existence_map_;
+ std::vector<void *> vec_tables_;
+
+ const std::size_t num_finalize_partitions_;
+
+ StorageManager *storage_manager_;
+ MutableBlobReference blob_;
+
+ std::size_t memory_size_;
+ std::size_t num_init_partitions_;
+
+ DISALLOW_COPY_AND_ASSIGN(CollisionFreeVectorTable);
+};
+
+// ----------------------------------------------------------------------------
+// Implementations of template methods follow.
+
+template <bool use_two_accessors, typename ...ArgTypes>
+inline void CollisionFreeVectorTable
+ ::upsertValueAccessorDispatchHelper(const bool is_key_nullable,
+ const bool is_argument_nullable,
+ ArgTypes &&...args) {
+ if (is_key_nullable) {
+ if (is_argument_nullable) {
+ upsertValueAccessorDispatchHelper<use_two_accessors, true, true>(
+ std::forward<ArgTypes>(args)...);
+ } else {
+ upsertValueAccessorDispatchHelper<use_two_accessors, true, false>(
+ std::forward<ArgTypes>(args)...);
+ }
+ } else {
+ if (is_argument_nullable) {
+ upsertValueAccessorDispatchHelper<use_two_accessors, false, true>(
+ std::forward<ArgTypes>(args)...);
+ } else {
+ upsertValueAccessorDispatchHelper<use_two_accessors, false, false>(
+ std::forward<ArgTypes>(args)...);
+ }
+ }
+}
+
+template <bool ...bool_values, typename ...ArgTypes>
+inline void CollisionFreeVectorTable
+ ::upsertValueAccessorDispatchHelper(const Type *key_type,
+ ArgTypes &&...args) {
+ switch (key_type->getTypeID()) {
+ case TypeID::kInt:
+ upsertValueAccessorDispatchHelper<bool_values..., int>(
+ std::forward<ArgTypes>(args)...);
+ return;
+ case TypeID::kLong:
+ upsertValueAccessorDispatchHelper<bool_values..., std::int64_t>(
+ std::forward<ArgTypes>(args)...);
+ return;
+ default:
+ LOG(FATAL) << "Not supported";
+ }
+}
+
+template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
+ typename KeyT, typename ...ArgTypes>
+inline void CollisionFreeVectorTable
+ ::upsertValueAccessorDispatchHelper(const Type *argument_type,
+ const AggregationID agg_id,
+ ArgTypes &&...args) {
+ switch (agg_id) {
+ case AggregationID::kCount:
+ upsertValueAccessorCountHelper<
+ use_two_accessors, is_key_nullable, is_argument_nullable, KeyT>(
+ std::forward<ArgTypes>(args)...);
+ return;
+ case AggregationID::kSum:
+ upsertValueAccessorSumHelper<
+ use_two_accessors, is_key_nullable, is_argument_nullable, KeyT>(
+ argument_type, std::forward<ArgTypes>(args)...);
+ return;
+ default:
+ LOG(FATAL) << "Not supported";
+ }
+}
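
These overloads illustrate the dispatch idiom used throughout this file: runtime properties (nullability flags, the key's TypeID, the aggregation ID) are peeled off one at a time and re-emitted as template arguments, so the innermost per-tuple loops are instantiated without any of those branches. A minimal standalone sketch of the idiom follows; the names are illustrative and not part of the patch.

#include <cstdint>
#include <iostream>

// Innermost "hot" function: all decisions are already baked into template
// arguments, so there is no per-call branching on them.
template <bool is_nullable, typename KeyT>
void Process(const void *key_ptr) {
  if (is_nullable && key_ptr == nullptr) {
    return;  // Null check is compiled in only when nulls can occur.
  }
  std::cout << *static_cast<const KeyT *>(key_ptr) << '\n';
}

// Dispatch layer: branch on the runtime flags once, then call the matching
// instantiation.
inline void Dispatch(const bool is_nullable, const bool is_long_key,
                     const void *key_ptr) {
  if (is_nullable) {
    is_long_key ? Process<true, std::int64_t>(key_ptr)
                : Process<true, int>(key_ptr);
  } else {
    is_long_key ? Process<false, std::int64_t>(key_ptr)
                : Process<false, int>(key_ptr);
  }
}

int main() {
  const int small_key = 42;
  const std::int64_t big_key = 1234567890123LL;
  Dispatch(false, false, &small_key);  // prints 42
  Dispatch(true, true, &big_key);      // prints 1234567890123
}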
+
+template <typename ...ArgTypes>
+inline void CollisionFreeVectorTable
+ ::upsertValueAccessorKeyOnlyHelper(const bool is_key_nullable,
+ const Type *key_type,
+ ArgTypes &&...args) {
+ switch (key_type->getTypeID()) {
+ case TypeID::kInt: {
+ if (is_key_nullable) {
+ upsertValueAccessorKeyOnly<true, int>(std::forward<ArgTypes>(args)...);
+ } else {
+ upsertValueAccessorKeyOnly<false, int>(std::forward<ArgTypes>(args)...);
+ }
+ return;
+ }
+ case TypeID::kLong: {
+ if (is_key_nullable) {
+ upsertValueAccessorKeyOnly<true, std::int64_t>(std::forward<ArgTypes>(args)...);
+ } else {
+ upsertValueAccessorKeyOnly<false, std::int64_t>(std::forward<ArgTypes>(args)...);
+ }
+ return;
+ }
+ default:
+ LOG(FATAL) << "Not supported";
+ }
+}
+
+template <bool is_key_nullable, typename KeyT, typename ValueAccessorT>
+inline void CollisionFreeVectorTable
+ ::upsertValueAccessorKeyOnly(const attribute_id key_attr_id,
+ ValueAccessorT *accessor) {
+ accessor->beginIteration();
+ while (accessor->next()) {
+ const KeyT *key = static_cast<const KeyT *>(
+ accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
+ if (is_key_nullable && key == nullptr) {
+ continue;
+ }
+ existence_map_->setBit(*key);
+ }
+}
+
+template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
+ typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT>
+inline void CollisionFreeVectorTable
+ ::upsertValueAccessorCountHelper(const attribute_id key_attr_id,
+ const attribute_id argument_id,
+ void *vec_table,
+ KeyValueAccessorT *key_accessor,
+ ArgumentValueAccessorT *argument_accessor) {
+ DCHECK_GE(key_attr_id, 0u);
+
+ if (is_argument_nullable && argument_id != kInvalidAttributeID) {
+ upsertValueAccessorCountUnary<use_two_accessors, is_key_nullable, KeyT>(
+ key_attr_id,
+ argument_id,
+ static_cast<std::atomic<std::size_t> *>(vec_table),
+ key_accessor,
+ argument_accessor);
+ return;
+ } else {
+ upsertValueAccessorCountNullary<is_key_nullable, KeyT>(
+ key_attr_id,
+ static_cast<std::atomic<std::size_t> *>(vec_table),
+ key_accessor);
+ return;
+ }
+}
+
+template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
+ typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT>
+inline void CollisionFreeVectorTable
+ ::upsertValueAccessorSumHelper(const Type *argument_type,
+ const attribute_id key_attr_id,
+ const attribute_id argument_id,
+ void *vec_table,
+ KeyValueAccessorT *key_accessor,
+ ArgumentValueAccessorT *argument_accessor) {
+ DCHECK_GE(key_attr_id, 0u);
+ DCHECK_GE(argument_id, 0u);
+ DCHECK(argument_type != nullptr);
+
+ switch (argument_type->getTypeID()) {
+ case TypeID::kInt:
+ upsertValueAccessorIntegerSum<
+ use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, int>(
+ key_attr_id,
+ argument_id,
+ static_cast<std::atomic<std::int64_t> *>(vec_table),
+ key_accessor,
+ argument_accessor);
+ return;
+ case TypeID::kLong:
+ upsertValueAccessorIntegerSum<
+ use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, std::int64_t>(
+ key_attr_id,
+ argument_id,
+ static_cast<std::atomic<std::int64_t> *>(vec_table),
+ key_accessor,
+ argument_accessor);
+ return;
+ case TypeID::kFloat:
+ upsertValueAccessorGenericSum<
+ use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, float>(
+ key_attr_id,
+ argument_id,
+ static_cast<std::atomic<double> *>(vec_table),
+ key_accessor,
+ argument_accessor);
+ return;
+ case TypeID::kDouble:
+ upsertValueAccessorGenericSum<
+ use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, double>(
+ key_attr_id,
+ argument_id,
+ static_cast<std::atomic<double> *>(vec_table),
+ key_accessor,
+ argument_accessor);
+ return;
+ default:
+ LOG(FATAL) << "Not supported";
+ }
+}
+
+template <bool is_key_nullable, typename KeyT, typename ValueAccessorT>
+inline void CollisionFreeVectorTable
+ ::upsertValueAccessorCountNullary(const attribute_id key_attr_id,
+ std::atomic<std::size_t> *vec_table,
+ ValueAccessorT *accessor) {
+ accessor->beginIteration();
+ while (accessor->next()) {
+ const KeyT *key = static_cast<const KeyT *>(
+ accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
+ if (is_key_nullable && key == nullptr) {
+ continue;
+ }
+ const std::size_t loc = *key;
+ vec_table[loc].fetch_add(1u, std::memory_order_relaxed);
+ existence_map_->setBit(loc);
+ }
+}
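
The nullary COUNT path above captures the essence of the collision-free design: the integer group-by key is used directly as an index into a preallocated vector of atomic counters, so there is no hashing, probing, or locking, only a relaxed fetch_add plus a bit set in the existence map. The unary variant that follows additionally skips rows whose argument is null, giving COUNT(column) semantics. A minimal standalone sketch of the nullary case, under the assumption that keys fall in a small known range (illustrative names, not Quickstep APIs):

#include <atomic>
#include <cstddef>
#include <iostream>
#include <vector>

int main() {
  constexpr std::size_t kMaxKey = 8;  // assumed key range [0, kMaxKey)
  std::vector<std::atomic<std::size_t>> counts(kMaxKey);
  for (auto &c : counts) c.store(0, std::memory_order_relaxed);
  std::vector<bool> existence(kMaxKey, false);  // stand-in for the bit vector

  const int keys[] = {1, 3, 3, 7, 1, 1};
  for (const int key : keys) {
    existence[key] = true;                                  // mark group as live
    counts[key].fetch_add(1u, std::memory_order_relaxed);   // COUNT(*) update
  }

  for (std::size_t k = 0; k < kMaxKey; ++k) {
    if (existence[k]) {
      std::cout << "key " << k << " -> count "
                << counts[k].load(std::memory_order_relaxed) << '\n';
    }
  }
}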
+
+template <bool use_two_accessors, bool is_key_nullable, typename KeyT,
+ typename KeyValueAccessorT, typename ArgumentValueAccessorT>
+inline void CollisionFreeVectorTable
+ ::upsertValueAccessorCountUnary(const attribute_id key_attr_id,
+ const attribute_id argument_id,
+ std::atomic<std::size_t> *vec_table,
+ KeyValueAccessorT *key_accessor,
+ ArgumentValueAccessorT *argument_accessor) {
+ key_accessor->beginIteration();
+ if (use_two_accessors) {
+ argument_accessor->beginIteration();
+ }
+ while (key_accessor->next()) {
+ if (use_two_accessors) {
+ argument_accessor->next();
+ }
+ const KeyT *key = static_cast<const KeyT *>(
+ key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
+ if (is_key_nullable && key == nullptr) {
+ continue;
+ }
+ const std::size_t loc = *key;
+ existence_map_->setBit(loc);
+ if (argument_accessor->getUntypedValue(argument_id) == nullptr) {
+ continue;
+ }
+ vec_table[loc].fetch_add(1u, std::memory_order_relaxed);
+ }
+}
+
+template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
+ typename KeyT, typename ArgumentT, typename StateT,
+ typename KeyValueAccessorT, typename ArgumentValueAccessorT>
+inline void CollisionFreeVectorTable
+ ::upsertValueAccessorIntegerSum(const attribute_id key_attr_id,
+ const attribute_id argument_id,
+ std::atomic<StateT> *vec_table,
+ KeyValueAccessorT *key_accessor,
+ ArgumentValueAccessorT *argument_accessor) {
+ key_accessor->beginIteration();
+ if (use_two_accessors) {
+ argument_accessor->beginIteration();
+ }
+ while (key_accessor->next()) {
+ if (use_two_accessors) {
+ argument_accessor->next();
+ }
+ const KeyT *key = static_cast<const KeyT *>(
+ key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
+ if (is_key_nullable && key == nullptr) {
+ continue;
+ }
+ const std::size_t loc = *key;
+ existence_map_->setBit(loc);
+ const ArgumentT *argument = static_cast<const ArgumentT *>(
+ argument_accessor->template getUntypedValue<is_argument_nullable>(argument_id));
+ if (is_argument_nullable && argument == nullptr) {
+ continue;
+ }
+ vec_table[loc].fetch_add(*argument, std::memory_order_relaxed);
+ }
+}
+
+template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
+ typename KeyT, typename ArgumentT, typename StateT,
+ typename KeyValueAccessorT, typename ArgumentValueAccessorT>
+inline void CollisionFreeVectorTable
+ ::upsertValueAccessorGenericSum(const attribute_id key_attr_id,
+ const attribute_id argument_id,
+ std::atomic<StateT> *vec_table,
+ KeyValueAccessorT *key_accessor,
+ ArgumentValueAccessorT *argument_accessor) {
+ key_accessor->beginIteration();
+ if (use_two_accessors) {
+ argument_accessor->beginIteration();
+ }
+ while (key_accessor->next()) {
+ if (use_two_accessors) {
+ argument_accessor->next();
+ }
+ const KeyT *key = static_cast<const KeyT *>(
+ key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
+ if (is_key_nullable && key == nullptr) {
+ continue;
+ }
+ const std::size_t loc = *key;
+ existence_map_->setBit(loc);
+ const ArgumentT *argument = static_cast<const ArgumentT *>(
+ argument_accessor->template getUntypedValue<is_argument_nullable>(argument_id));
+ if (is_argument_nullable && argument == nullptr) {
+ continue;
+ }
+ const ArgumentT arg_val = *argument;
+ std::atomic<StateT> &state = vec_table[loc];
+ StateT state_val = state.load(std::memory_order_relaxed);
+ while (!state.compare_exchange_weak(state_val, state_val + arg_val)) {}
+ }
+}
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_STORAGE_COLLISION_FREE_AGGREGATION_STATE_HASH_TABLE_HPP_
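One detail worth noting in upsertValueAccessorGenericSum above: std::atomic<double> has no fetch_add before C++20, so floating-point sums are accumulated with a compare_exchange_weak retry loop, whereas integer sums can use a single relaxed fetch_add. A minimal standalone sketch of that technique (illustrative only, not part of the patch):

#include <atomic>
#include <iostream>

// Atomically add 'delta' to an atomic double with a CAS retry loop, since
// fetch_add is unavailable for std::atomic<double> prior to C++20.
inline void AtomicAddDouble(std::atomic<double> *state, const double delta) {
  double expected = state->load(std::memory_order_relaxed);
  while (!state->compare_exchange_weak(expected, expected + delta)) {
    // On failure, 'expected' has been reloaded with the current value; retry.
  }
}

int main() {
  std::atomic<double> sum(0.0);
  for (const double v : {1.5, 2.25, 0.25}) {
    AtomicAddDouble(&sum, v);
  }
  std::cout << sum.load() << '\n';  // prints 4
}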
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/HashTableFactory.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableFactory.hpp b/storage/HashTableFactory.hpp
index d95362c..b88bf87 100644
--- a/storage/HashTableFactory.hpp
+++ b/storage/HashTableFactory.hpp
@@ -24,12 +24,12 @@
#include <string>
#include <vector>
-#include "storage/CollisionFreeAggregationStateHashTable.hpp"
+#include "storage/CollisionFreeVectorTable.hpp"
#include "storage/HashTable.hpp"
#include "storage/HashTableBase.hpp"
#include "storage/HashTable.pb.h"
#include "storage/LinearOpenAddressingHashTable.hpp"
-#include "storage/PackedPayloadAggregationStateHashTable.hpp"
+#include "storage/PackedPayloadHashTable.hpp"
#include "storage/SeparateChainingHashTable.hpp"
#include "storage/SimpleScalarSeparateChainingHashTable.hpp"
#include "storage/TupleReference.hpp"
@@ -346,10 +346,10 @@ class AggregationStateHashTableFactory {
StorageManager *storage_manager) {
switch (hash_table_type) {
case HashTableImplType::kSeparateChaining:
- return new PackedPayloadSeparateChainingAggregationStateHashTable(
+ return new PackedPayloadHashTable(
key_types, num_entries, handles, storage_manager);
case HashTableImplType::kCollisionFreeVector:
- return new CollisionFreeAggregationStateHashTable(
+ return new CollisionFreeVectorTable(
key_types, num_entries, handles, storage_manager);
default: {
LOG(FATAL) << "Unrecognized HashTableImplType in "
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/PackedPayloadAggregationStateHashTable.cpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadAggregationStateHashTable.cpp b/storage/PackedPayloadAggregationStateHashTable.cpp
deleted file mode 100644
index 0292092..0000000
--- a/storage/PackedPayloadAggregationStateHashTable.cpp
+++ /dev/null
@@ -1,439 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include "storage/PackedPayloadAggregationStateHashTable.hpp"
-
-namespace quickstep {
-
-PackedPayloadSeparateChainingAggregationStateHashTable
- ::PackedPayloadSeparateChainingAggregationStateHashTable(
- const std::vector<const Type *> &key_types,
- const std::size_t num_entries,
- const std::vector<AggregationHandle *> &handles,
- StorageManager *storage_manager)
- : key_types_(key_types),
- num_handles_(handles.size()),
- handles_(handles),
- total_payload_size_(ComputeTotalPayloadSize(handles)),
- storage_manager_(storage_manager),
- kBucketAlignment(alignof(std::atomic<std::size_t>)),
- kValueOffset(sizeof(std::atomic<std::size_t>) + sizeof(std::size_t)),
- key_manager_(key_types_, kValueOffset + total_payload_size_),
- bucket_size_(ComputeBucketSize(key_manager_.getFixedKeySize())) {
- std::size_t payload_offset_running_sum = sizeof(SpinMutex);
- for (const auto *handle : handles) {
- payload_offsets_.emplace_back(payload_offset_running_sum);
- payload_offset_running_sum += handle->getPayloadSize();
- }
-
- // NOTE(jianqiao): Potential memory leak / double freeing by copying from
- // init_payload to buckets if payload contains out of line data.
- init_payload_ =
- static_cast<std::uint8_t *>(calloc(this->total_payload_size_, 1));
- DCHECK(init_payload_ != nullptr);
-
- for (std::size_t i = 0; i < num_handles_; ++i) {
- handles_[i]->initPayload(init_payload_ + payload_offsets_[i]);
- }
-
- // Bucket size always rounds up to the alignment requirement of the atomic
- // size_t "next" pointer at the front or a ValueT, whichever is larger.
- //
- // Give base HashTable information about what key components are stored
- // inline from 'key_manager_'.
- setKeyInline(key_manager_.getKeyInline());
-
- // Pick out a prime number of slots and calculate storage requirements.
- std::size_t num_slots_tmp =
- get_next_prime_number(num_entries * kHashTableLoadFactor);
- std::size_t required_memory =
- sizeof(Header) + num_slots_tmp * sizeof(std::atomic<std::size_t>) +
- (num_slots_tmp / kHashTableLoadFactor) *
- (bucket_size_ + key_manager_.getEstimatedVariableKeySize());
- std::size_t num_storage_slots =
- this->storage_manager_->SlotsNeededForBytes(required_memory);
- if (num_storage_slots == 0) {
- FATAL_ERROR(
- "Storage requirement for SeparateChainingHashTable "
- "exceeds maximum allocation size.");
- }
-
- // Get a StorageBlob to hold the hash table.
- const block_id blob_id =
- this->storage_manager_->createBlob(num_storage_slots);
- this->blob_ = this->storage_manager_->getBlobMutable(blob_id);
-
- void *aligned_memory_start = this->blob_->getMemoryMutable();
- std::size_t available_memory = num_storage_slots * kSlotSizeBytes;
- if (align(alignof(Header),
- sizeof(Header),
- aligned_memory_start,
- available_memory) == nullptr) {
- // With current values from StorageConstants.hpp, this should be
- // impossible. A blob is at least 1 MB, while a Header has alignment
- // requirement of just kCacheLineBytes (64 bytes).
- FATAL_ERROR(
- "StorageBlob used to hold resizable "
- "SeparateChainingHashTable is too small to meet alignment "
- "requirements of SeparateChainingHashTable::Header.");
- } else if (aligned_memory_start != this->blob_->getMemoryMutable()) {
- // This should also be impossible, since the StorageManager allocates slots
- // aligned to kCacheLineBytes.
- DEV_WARNING("StorageBlob memory adjusted by "
- << (num_storage_slots * kSlotSizeBytes - available_memory)
- << " bytes to meet alignment requirement for "
- << "SeparateChainingHashTable::Header.");
- }
-
- // Locate the header.
- header_ = static_cast<Header *>(aligned_memory_start);
- aligned_memory_start =
- static_cast<char *>(aligned_memory_start) + sizeof(Header);
- available_memory -= sizeof(Header);
-
- // Recompute the number of slots & buckets using the actual available memory.
- // Most likely, we got some extra free bucket space due to "rounding up" to
- // the storage blob's size. It's also possible (though very unlikely) that we
- // will wind up with fewer buckets than we initially wanted because of screwy
- // alignment requirements for ValueT.
- std::size_t num_buckets_tmp =
- available_memory /
- (kHashTableLoadFactor * sizeof(std::atomic<std::size_t>) + bucket_size_ +
- key_manager_.getEstimatedVariableKeySize());
- num_slots_tmp =
- get_previous_prime_number(num_buckets_tmp * kHashTableLoadFactor);
- num_buckets_tmp = num_slots_tmp / kHashTableLoadFactor;
- DEBUG_ASSERT(num_slots_tmp > 0);
- DEBUG_ASSERT(num_buckets_tmp > 0);
-
- // Locate the slot array.
- slots_ = static_cast<std::atomic<std::size_t> *>(aligned_memory_start);
- aligned_memory_start = static_cast<char *>(aligned_memory_start) +
- sizeof(std::atomic<std::size_t>) * num_slots_tmp;
- available_memory -= sizeof(std::atomic<std::size_t>) * num_slots_tmp;
-
- // Locate the buckets.
- buckets_ = aligned_memory_start;
- // Extra-paranoid: If ValueT has an alignment requirement greater than that
- // of std::atomic<std::size_t>, we may need to adjust the start of the bucket
- // array.
- if (align(kBucketAlignment, bucket_size_, buckets_, available_memory) ==
- nullptr) {
- FATAL_ERROR(
- "StorageBlob used to hold resizable "
- "SeparateChainingHashTable is too small to meet "
- "alignment requirements of buckets.");
- } else if (buckets_ != aligned_memory_start) {
- DEV_WARNING(
- "Bucket array start position adjusted to meet alignment "
- "requirement for SeparateChainingHashTable's value type.");
- if (num_buckets_tmp * bucket_size_ > available_memory) {
- --num_buckets_tmp;
- }
- }
-
- // Fill in the header.
- header_->num_slots = num_slots_tmp;
- header_->num_buckets = num_buckets_tmp;
- header_->buckets_allocated.store(0, std::memory_order_relaxed);
- header_->variable_length_bytes_allocated.store(0, std::memory_order_relaxed);
- available_memory -= bucket_size_ * (header_->num_buckets);
-
- // Locate variable-length key storage region, and give it all the remaining
- // bytes in the blob.
- key_manager_.setVariableLengthStorageInfo(
- static_cast<char *>(buckets_) + header_->num_buckets * bucket_size_,
- available_memory,
- &(header_->variable_length_bytes_allocated));
-}
-
-PackedPayloadSeparateChainingAggregationStateHashTable
- ::~PackedPayloadSeparateChainingAggregationStateHashTable() {
- if (blob_.valid()) {
- const block_id blob_id = blob_->getID();
- blob_.release();
- storage_manager_->deleteBlockOrBlobFile(blob_id);
- }
- std::free(init_payload_);
-}
-
-void PackedPayloadSeparateChainingAggregationStateHashTable::clear() {
- const std::size_t used_buckets =
- header_->buckets_allocated.load(std::memory_order_relaxed);
- // Destroy existing values, if necessary.
- destroyPayload();
-
- // Zero-out slot array.
- std::memset(
- slots_, 0x0, sizeof(std::atomic<std::size_t>) * header_->num_slots);
-
- // Zero-out used buckets.
- std::memset(buckets_, 0x0, used_buckets * bucket_size_);
-
- header_->buckets_allocated.store(0, std::memory_order_relaxed);
- header_->variable_length_bytes_allocated.store(0, std::memory_order_relaxed);
- key_manager_.zeroNextVariableLengthKeyOffset();
-}
-
-void PackedPayloadSeparateChainingAggregationStateHashTable::destroyPayload() {
- const std::size_t num_buckets =
- header_->buckets_allocated.load(std::memory_order_relaxed);
- void *bucket_ptr = static_cast<char *>(buckets_) + kValueOffset;
- for (std::size_t bucket_num = 0; bucket_num < num_buckets; ++bucket_num) {
- for (std::size_t handle_id = 0; handle_id < num_handles_; ++handle_id) {
- void *value_internal_ptr =
- static_cast<char *>(bucket_ptr) + this->payload_offsets_[handle_id];
- handles_[handle_id]->destroyPayload(static_cast<std::uint8_t *>(value_internal_ptr));
- }
- bucket_ptr = static_cast<char *>(bucket_ptr) + bucket_size_;
- }
-}
-
-bool PackedPayloadSeparateChainingAggregationStateHashTable::upsertValueAccessor(
- const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
- const std::vector<MultiSourceAttributeId> &key_attr_ids,
- ValueAccessorMultiplexer *accessor_mux) {
- DCHECK(accessor_mux->getDerivedAccessor()->getImplementationType()
- == ValueAccessor::Implementation::kColumnVectors);
- ValueAccessor *base_accessor = accessor_mux->getBaseAccessor();
- ColumnVectorsValueAccessor *derived_accessor =
- static_cast<ColumnVectorsValueAccessor *>(accessor_mux->getDerivedAccessor());
-
- if (derived_accessor == nullptr) {
- return upsertValueAccessorCompositeKeyInternal<false>(argument_ids,
- key_attr_ids,
- base_accessor,
- derived_accessor);
- } else {
- return upsertValueAccessorCompositeKeyInternal<true>(argument_ids,
- key_attr_ids,
- base_accessor,
- derived_accessor);
- }
-}
-
-void PackedPayloadSeparateChainingAggregationStateHashTable
- ::resize(const std::size_t extra_buckets,
- const std::size_t extra_variable_storage,
- const std::size_t retry_num) {
- // A retry should never be necessary with this implementation of HashTable.
- // Separate chaining ensures that any resized hash table with more buckets
- // than the original table will be able to hold more entries than the
- // original.
- DEBUG_ASSERT(retry_num == 0);
-
- SpinSharedMutexExclusiveLock<true> write_lock(this->resize_shared_mutex_);
-
- // Recheck whether the hash table is still full. Note that multiple threads
- // might wait to rebuild this hash table simultaneously. Only the first one
- // should do the rebuild.
- if (!isFull(extra_variable_storage)) {
- return;
- }
-
- // Approximately double the number of buckets and slots.
- //
- // TODO(chasseur): It may be worth it to more than double the number of
- // buckets here so that we can maintain a good, sparse fill factor for a
- // longer time as more values are inserted. Such behavior should take into
- // account kHashTableLoadFactor.
- std::size_t resized_num_slots = get_next_prime_number(
- (header_->num_buckets + extra_buckets / 2) * kHashTableLoadFactor * 2);
- std::size_t variable_storage_required =
- (resized_num_slots / kHashTableLoadFactor) *
- key_manager_.getEstimatedVariableKeySize();
- const std::size_t original_variable_storage_used =
- header_->variable_length_bytes_allocated.load(std::memory_order_relaxed);
- // If this resize was triggered by a too-large variable-length key, bump up
- // the variable-length storage requirement.
- if ((extra_variable_storage > 0) &&
- (extra_variable_storage + original_variable_storage_used >
- key_manager_.getVariableLengthKeyStorageSize())) {
- variable_storage_required += extra_variable_storage;
- }
-
- const std::size_t resized_memory_required =
- sizeof(Header) + resized_num_slots * sizeof(std::atomic<std::size_t>) +
- (resized_num_slots / kHashTableLoadFactor) * bucket_size_ +
- variable_storage_required;
- const std::size_t resized_storage_slots =
- this->storage_manager_->SlotsNeededForBytes(resized_memory_required);
- if (resized_storage_slots == 0) {
- FATAL_ERROR(
- "Storage requirement for resized SeparateChainingHashTable "
- "exceeds maximum allocation size.");
- }
-
- // Get a new StorageBlob to hold the resized hash table.
- const block_id resized_blob_id =
- this->storage_manager_->createBlob(resized_storage_slots);
- MutableBlobReference resized_blob =
- this->storage_manager_->getBlobMutable(resized_blob_id);
-
- // Locate data structures inside the new StorageBlob.
- void *aligned_memory_start = resized_blob->getMemoryMutable();
- std::size_t available_memory = resized_storage_slots * kSlotSizeBytes;
- if (align(alignof(Header),
- sizeof(Header),
- aligned_memory_start,
- available_memory) == nullptr) {
- // Should be impossible, as noted in constructor.
- FATAL_ERROR(
- "StorageBlob used to hold resized SeparateChainingHashTable "
- "is too small to meet alignment requirements of "
- "LinearOpenAddressingHashTable::Header.");
- } else if (aligned_memory_start != resized_blob->getMemoryMutable()) {
- // Again, should be impossible.
- DEV_WARNING("In SeparateChainingHashTable::resize(), StorageBlob "
- << "memory adjusted by "
- << (resized_num_slots * kSlotSizeBytes - available_memory)
- << " bytes to meet alignment requirement for "
- << "LinearOpenAddressingHashTable::Header.");
- }
-
- Header *resized_header = static_cast<Header *>(aligned_memory_start);
- aligned_memory_start =
- static_cast<char *>(aligned_memory_start) + sizeof(Header);
- available_memory -= sizeof(Header);
-
- // As in constructor, recompute the number of slots and buckets using the
- // actual available memory.
- std::size_t resized_num_buckets =
- (available_memory - extra_variable_storage) /
- (kHashTableLoadFactor * sizeof(std::atomic<std::size_t>) + bucket_size_ +
- key_manager_.getEstimatedVariableKeySize());
- resized_num_slots =
- get_previous_prime_number(resized_num_buckets * kHashTableLoadFactor);
- resized_num_buckets = resized_num_slots / kHashTableLoadFactor;
-
- // Locate slot array.
- std::atomic<std::size_t> *resized_slots =
- static_cast<std::atomic<std::size_t> *>(aligned_memory_start);
- aligned_memory_start = static_cast<char *>(aligned_memory_start) +
- sizeof(std::atomic<std::size_t>) * resized_num_slots;
- available_memory -= sizeof(std::atomic<std::size_t>) * resized_num_slots;
-
- // As in constructor, we will be extra paranoid and use align() to locate the
- // start of the array of buckets, as well.
- void *resized_buckets = aligned_memory_start;
- if (align(
- kBucketAlignment, bucket_size_, resized_buckets, available_memory) ==
- nullptr) {
- FATAL_ERROR(
- "StorageBlob used to hold resized SeparateChainingHashTable "
- "is too small to meet alignment requirements of buckets.");
- } else if (resized_buckets != aligned_memory_start) {
- DEV_WARNING(
- "Bucket array start position adjusted to meet alignment "
- "requirement for SeparateChainingHashTable's value type.");
- if (resized_num_buckets * bucket_size_ + variable_storage_required >
- available_memory) {
- --resized_num_buckets;
- }
- }
- aligned_memory_start = static_cast<char *>(aligned_memory_start) +
- resized_num_buckets * bucket_size_;
- available_memory -= resized_num_buckets * bucket_size_;
-
- void *resized_variable_length_key_storage = aligned_memory_start;
- const std::size_t resized_variable_length_key_storage_size = available_memory;
-
- const std::size_t original_buckets_used =
- header_->buckets_allocated.load(std::memory_order_relaxed);
-
- // Initialize the header.
- resized_header->num_slots = resized_num_slots;
- resized_header->num_buckets = resized_num_buckets;
- resized_header->buckets_allocated.store(original_buckets_used,
- std::memory_order_relaxed);
- resized_header->variable_length_bytes_allocated.store(
- original_variable_storage_used, std::memory_order_relaxed);
-
- // Bulk-copy buckets. This is safe because:
- // 1. The "next" pointers will be adjusted when rebuilding chains below.
- // 2. The hash codes will stay the same.
- // 3. For key components:
- // a. Inline keys will stay exactly the same.
- // b. Offsets into variable-length storage will remain valid, because
- // we also do a byte-for-byte copy of variable-length storage below.
- // c. Absolute external pointers will still point to the same address.
- // d. Relative pointers are not used with resizable hash tables.
- // 4. If values are not trivially copyable, then we invoke ValueT's copy
- // or move constructor with placement new.
- // NOTE(harshad) - Regarding point 4 above, as this is a specialized
- // hash table implemented for aggregation, the values are trivially copyable,
- // therefore we don't need to invoke payload values' copy/move constructors.
- std::memcpy(resized_buckets, buckets_, original_buckets_used * bucket_size_);
-
- // Copy over variable-length key components, if any.
- if (original_variable_storage_used > 0) {
- DEBUG_ASSERT(original_variable_storage_used ==
- key_manager_.getNextVariableLengthKeyOffset());
- DEBUG_ASSERT(original_variable_storage_used <=
- resized_variable_length_key_storage_size);
- std::memcpy(resized_variable_length_key_storage,
- key_manager_.getVariableLengthKeyStorage(),
- original_variable_storage_used);
- }
-
- destroyPayload();
-
- // Make resized structures active.
- std::swap(this->blob_, resized_blob);
- header_ = resized_header;
- slots_ = resized_slots;
- buckets_ = resized_buckets;
- key_manager_.setVariableLengthStorageInfo(
- resized_variable_length_key_storage,
- resized_variable_length_key_storage_size,
- &(resized_header->variable_length_bytes_allocated));
-
- // Drop the old blob.
- const block_id old_blob_id = resized_blob->getID();
- resized_blob.release();
- this->storage_manager_->deleteBlockOrBlobFile(old_blob_id);
-
- // Rebuild chains.
- void *current_bucket = buckets_;
- for (std::size_t bucket_num = 0; bucket_num < original_buckets_used;
- ++bucket_num) {
- std::atomic<std::size_t> *next_ptr =
- static_cast<std::atomic<std::size_t> *>(current_bucket);
- const std::size_t hash_code = *reinterpret_cast<const std::size_t *>(
- static_cast<const char *>(current_bucket) +
- sizeof(std::atomic<std::size_t>));
-
- const std::size_t slot_number = hash_code % header_->num_slots;
- std::size_t slot_ptr_value = 0;
- if (slots_[slot_number].compare_exchange_strong(
- slot_ptr_value, bucket_num + 1, std::memory_order_relaxed)) {
- // This bucket is the first in the chain for this block, so reset its
- // next pointer to 0.
- next_ptr->store(0, std::memory_order_relaxed);
- } else {
- // A chain already exists starting from this slot, so put this bucket at
- // the head.
- next_ptr->store(slot_ptr_value, std::memory_order_relaxed);
- slots_[slot_number].store(bucket_num + 1, std::memory_order_relaxed);
- }
- current_bucket = static_cast<char *>(current_bucket) + bucket_size_;
- }
-}
-
-} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/PackedPayloadAggregationStateHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadAggregationStateHashTable.hpp b/storage/PackedPayloadAggregationStateHashTable.hpp
deleted file mode 100644
index 85d4f8a..0000000
--- a/storage/PackedPayloadAggregationStateHashTable.hpp
+++ /dev/null
@@ -1,805 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_STORAGE_PACKED_PAYLOAD_AGGREGATION_STATE_HASH_TABLE_HPP_
-#define QUICKSTEP_STORAGE_PACKED_PAYLOAD_AGGREGATION_STATE_HASH_TABLE_HPP_
-
-#include <algorithm>
-#include <atomic>
-#include <cstddef>
-#include <cstdlib>
-#include <limits>
-#include <memory>
-#include <type_traits>
-#include <utility>
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregationHandle.hpp"
-#include "storage/HashTableBase.hpp"
-#include "storage/HashTableKeyManager.hpp"
-#include "storage/StorageBlob.hpp"
-#include "storage/StorageBlockInfo.hpp"
-#include "storage/StorageConstants.hpp"
-#include "storage/StorageManager.hpp"
-#include "storage/TupleReference.hpp"
-#include "storage/ValueAccessor.hpp"
-#include "storage/ValueAccessorMultiplexer.hpp"
-#include "storage/ValueAccessorUtil.hpp"
-#include "threading/SpinMutex.hpp"
-#include "threading/SpinSharedMutex.hpp"
-#include "types/Type.hpp"
-#include "types/TypedValue.hpp"
-#include "types/containers/ColumnVectorsValueAccessor.hpp"
-#include "utility/Alignment.hpp"
-#include "utility/HashPair.hpp"
-#include "utility/Macros.hpp"
-#include "utility/PrimeNumber.hpp"
-
-namespace quickstep {
-
-/** \addtogroup Storage
- * @{
- */
-
-class PackedPayloadSeparateChainingAggregationStateHashTable
- : public AggregationStateHashTableBase {
- public:
- PackedPayloadSeparateChainingAggregationStateHashTable(
- const std::vector<const Type *> &key_types,
- const std::size_t num_entries,
- const std::vector<AggregationHandle *> &handles,
- StorageManager *storage_manager);
-
- ~PackedPayloadSeparateChainingAggregationStateHashTable() override;
-
- void clear();
-
- void destroyPayload() override;
-
- bool upsertValueAccessor(
- const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
- const std::vector<MultiSourceAttributeId> &key_ids,
- ValueAccessorMultiplexer *accessor_mux) override;
-
- inline block_id getBlobId() const {
- return blob_->getID();
- }
-
- inline std::size_t numEntries() const {
- return header_->buckets_allocated.load(std::memory_order_relaxed);
- }
-
- inline bool upsertCompositeKey(const std::vector<TypedValue> &key,
- const std::uint8_t *source_state);
-
- template <typename FunctorT>
- inline bool upsertCompositeKey(const std::vector<TypedValue> &key,
- FunctorT *functor,
- int index);
-
- inline const std::uint8_t* getSingleCompositeKey(
- const std::vector<TypedValue> &key) const;
-
- inline const std::uint8_t* getSingleCompositeKey(
- const std::vector<TypedValue> &key,
- const int index) const;
-
- template <typename FunctorT>
- inline std::size_t forEach(FunctorT *functor) const;
-
- template <typename FunctorT>
- inline std::size_t forEach(FunctorT *functor, const int index) const;
-
- template <typename FunctorT>
- inline std::size_t forEachCompositeKey(FunctorT *functor) const;
-
- template <typename FunctorT>
- inline std::size_t forEachCompositeKey(FunctorT *functor,
- const int index) const;
-
- private:
- void resize(const std::size_t extra_buckets,
- const std::size_t extra_variable_storage,
- const std::size_t retry_num = 0);
-
- inline std::size_t calculateVariableLengthCompositeKeyCopySize(
- const std::vector<TypedValue> &key) const {
- std::size_t total = 0;
- for (std::vector<TypedValue>::size_type idx = 0; idx < key.size(); ++idx) {
- if (!(*key_inline_)[idx]) {
- total += key[idx].getDataSize();
- }
- }
- return total;
- }
-
- inline bool getNextEntry(TypedValue *key,
- const std::uint8_t **value,
- std::size_t *entry_num) const;
-
- inline bool getNextEntryCompositeKey(std::vector<TypedValue> *key,
- const std::uint8_t **value,
- std::size_t *entry_num) const;
-
- inline std::uint8_t* upsertCompositeKeyInternal(
- const std::vector<TypedValue> &key,
- const std::size_t variable_key_size);
-
- template <bool use_two_accessors>
- inline bool upsertValueAccessorCompositeKeyInternal(
- const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
- const std::vector<MultiSourceAttributeId> &key_ids,
- ValueAccessor *base_accessor,
- ColumnVectorsValueAccessor *derived_accessor);
-
- // Generate a hash for a composite key by hashing each component of 'key' and
- // mixing their bits with CombineHashes().
- inline std::size_t hashCompositeKey(const std::vector<TypedValue> &key) const;
-
- // Set information about which key components are stored inline. This usually
- // comes from a HashTableKeyManager, and is set by the constructor of a
- // subclass of HashTable.
- inline void setKeyInline(const std::vector<bool> *key_inline) {
- scalar_key_inline_ = key_inline->front();
- key_inline_ = key_inline;
- }
-
- inline static std::size_t ComputeTotalPayloadSize(
- const std::vector<AggregationHandle *> &handles) {
- std::size_t total_payload_size = sizeof(SpinMutex);
- for (const auto *handle : handles) {
- total_payload_size += handle->getPayloadSize();
- }
- return total_payload_size;
- }
-
- // Assign '*key_vector' with the attribute values specified by 'key_ids' at
- // the current position of 'accessor'. If 'check_for_null_keys' is true, stops
- // and returns true if any of the values is null, otherwise returns false.
- template <bool use_two_accessors,
- bool check_for_null_keys,
- typename ValueAccessorT>
- inline static bool GetCompositeKeyFromValueAccessor(
- const std::vector<MultiSourceAttributeId> &key_ids,
- const ValueAccessorT *accessor,
- const ColumnVectorsValueAccessor *derived_accessor,
- std::vector<TypedValue> *key_vector) {
- for (std::size_t key_idx = 0; key_idx < key_ids.size(); ++key_idx) {
- const MultiSourceAttributeId &key_id = key_ids[key_idx];
- if (use_two_accessors && key_id.source == ValueAccessorSource::kDerived) {
- (*key_vector)[key_idx] = derived_accessor->getTypedValue(key_id.attr_id);
- } else {
- (*key_vector)[key_idx] = accessor->getTypedValue(key_id.attr_id);
- }
- if (check_for_null_keys && (*key_vector)[key_idx].isNull()) {
- return true;
- }
- }
- return false;
- }
-
- struct Header {
- std::size_t num_slots;
- std::size_t num_buckets;
- alignas(kCacheLineBytes) std::atomic<std::size_t> buckets_allocated;
- alignas(kCacheLineBytes)
- std::atomic<std::size_t> variable_length_bytes_allocated;
- };
-
- // Type(s) of keys.
- const std::vector<const Type *> key_types_;
-
- // Information about whether key components are stored inline or in a
- // separate variable-length storage region. This is usually determined by a
- // HashTableKeyManager and set by calling setKeyInline().
- bool scalar_key_inline_;
- const std::vector<bool> *key_inline_;
-
- const std::size_t num_handles_;
- const std::vector<AggregationHandle *> handles_;
-
- std::size_t total_payload_size_;
- std::vector<std::size_t> payload_offsets_;
- std::uint8_t *init_payload_;
-
- StorageManager *storage_manager_;
- MutableBlobReference blob_;
-
- // Locked in shared mode for most operations, exclusive mode during resize.
- // Not locked at all for non-resizable HashTables.
- alignas(kCacheLineBytes) SpinSharedMutex<true> resize_shared_mutex_;
-
- std::size_t kBucketAlignment;
-
- // Value's offset in a bucket is the first alignof(ValueT) boundary after the
- // next pointer and hash code.
- std::size_t kValueOffset;
-
- // Round bucket size up to a multiple of kBucketAlignment.
- constexpr std::size_t ComputeBucketSize(const std::size_t fixed_key_size) {
- return (((kValueOffset + this->total_payload_size_ + fixed_key_size - 1) /
- kBucketAlignment) +
- 1) *
- kBucketAlignment;
- }
-
- // Attempt to find an empty bucket to insert 'hash_code' into, starting after
- // '*bucket' in the chain (or, if '*bucket' is NULL, starting from the slot
- // array). Returns true and stores SIZE_T_MAX in '*pending_chain_ptr' if an
- // empty bucket is found. Returns false if 'allow_duplicate_keys' is false
- // and a hash collision is found (caller should then check whether there is a
- // genuine key collision or the hash collision is spurious). Returns false
- // and sets '*bucket' to NULL if there are no more empty buckets in the hash
- // table. If 'variable_key_allocation_required' is nonzero, this method will
- // attempt to allocate storage for a variable-length key BEFORE allocating a
- // bucket, so that no bucket number below 'header_->num_buckets' is ever
- // deallocated after being allocated.
- inline bool locateBucketForInsertion(
- const std::size_t hash_code,
- const std::size_t variable_key_allocation_required,
- void **bucket,
- std::atomic<std::size_t> **pending_chain_ptr,
- std::size_t *pending_chain_ptr_finish_value);
-
- // Write a scalar 'key' and its 'hash_code' into the '*bucket', which was
- // found by locateBucketForInsertion(). Assumes that storage for a
- // variable-length key copy (if any) was already allocated by a successful
- // call to allocateVariableLengthKeyStorage().
- inline void writeScalarKeyToBucket(
- const TypedValue &key,
- const std::size_t hash_code,
- void *bucket);
-
- // Write a composite 'key' and its 'hash_code' into the '*bucket', which was
- // found by locateBucketForInsertion(). Assumes that storage for
- // variable-length key copies (if any) was already allocated by a successful
- // call to allocateVariableLengthKeyStorage().
- inline void writeCompositeKeyToBucket(
- const std::vector<TypedValue> &key,
- const std::size_t hash_code,
- void *bucket);
-
- // Determine whether it is actually necessary to resize this hash table.
- // Checks that there is at least one unallocated bucket, and that there is
- // at least 'extra_variable_storage' bytes of variable-length storage free.
- inline bool isFull(const std::size_t extra_variable_storage) const;
-
- // Helper object to manage key storage.
- HashTableKeyManager<false, true> key_manager_;
-
- // In-memory structure is as follows:
- // - SeparateChainingHashTable::Header
- // - Array of slots, interpreted as follows:
- // - 0 = Points to nothing (empty)
- // - SIZE_T_MAX = Pending (some thread is starting a chain from this
- // slot and will overwrite it soon)
- // - Anything else = The number of the first bucket in the chain for
- // this slot PLUS ONE (i.e. subtract one to get the actual bucket
- // number).
- // - Array of buckets, each of which is:
- // - atomic size_t "next" pointer, interpreted the same as slots above.
- // - size_t hash value
- // - possibly some unused bytes as needed so that ValueT's alignment
- // requirement is met
- // - ValueT value slot
- // - fixed-length key storage (which may include pointers to external
- // memory or offsets of variable length keys stored within this hash
- // table)
- // - possibly some additional unused bytes so that bucket size is a
- // multiple of both alignof(std::atomic<std::size_t>) and
- // alignof(ValueT)
- // - Variable-length key storage region (referenced by offsets stored in
- // fixed-length keys).
- Header *header_;
-
- std::atomic<std::size_t> *slots_;
- void *buckets_;
- const std::size_t bucket_size_;
-
- DISALLOW_COPY_AND_ASSIGN(PackedPayloadSeparateChainingAggregationStateHashTable);
-};
-
-/** @} */
-
-// ----------------------------------------------------------------------------
-// Implementations of template class methods follow.
-
-class HashTableMerger {
- public:
- /**
- * @brief Constructor
- *
- * @param handle The Aggregation handle being used.
- * @param destination_hash_table The destination hash table to which other
- * hash tables will be merged.
- **/
- explicit HashTableMerger(
- AggregationStateHashTableBase *destination_hash_table)
- : destination_hash_table_(
- static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(
- destination_hash_table)) {}
-
- /**
- * @brief The operator for the functor.
- *
- * @param group_by_key The group by key being merged.
- * @param source_state The aggregation state for the given key in the source
- * aggregation hash table.
- **/
- inline void operator()(const std::vector<TypedValue> &group_by_key,
- const std::uint8_t *source_state) {
- destination_hash_table_->upsertCompositeKey(group_by_key, source_state);
- }
-
- private:
- PackedPayloadSeparateChainingAggregationStateHashTable *destination_hash_table_;
-
- DISALLOW_COPY_AND_ASSIGN(HashTableMerger);
-};
-
-inline std::size_t PackedPayloadSeparateChainingAggregationStateHashTable
- ::hashCompositeKey(const std::vector<TypedValue> &key) const {
- DEBUG_ASSERT(!key.empty());
- DEBUG_ASSERT(key.size() == key_types_.size());
- std::size_t hash = key.front().getHash();
- for (std::vector<TypedValue>::const_iterator key_it = key.begin() + 1;
- key_it != key.end();
- ++key_it) {
- hash = CombineHashes(hash, key_it->getHash());
- }
- return hash;
-}
-
-inline bool PackedPayloadSeparateChainingAggregationStateHashTable
- ::getNextEntry(TypedValue *key,
- const std::uint8_t **value,
- std::size_t *entry_num) const {
- if (*entry_num < header_->buckets_allocated.load(std::memory_order_relaxed)) {
- const char *bucket =
- static_cast<const char *>(buckets_) + (*entry_num) * bucket_size_;
- *key = key_manager_.getKeyComponentTyped(bucket, 0);
- *value = reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset);
- ++(*entry_num);
- return true;
- } else {
- return false;
- }
-}
-
-
-inline bool PackedPayloadSeparateChainingAggregationStateHashTable
- ::getNextEntryCompositeKey(std::vector<TypedValue> *key,
- const std::uint8_t **value,
- std::size_t *entry_num) const {
- if (*entry_num < header_->buckets_allocated.load(std::memory_order_relaxed)) {
- const char *bucket =
- static_cast<const char *>(buckets_) + (*entry_num) * bucket_size_;
- for (std::vector<const Type *>::size_type key_idx = 0;
- key_idx < this->key_types_.size();
- ++key_idx) {
- key->emplace_back(key_manager_.getKeyComponentTyped(bucket, key_idx));
- }
- *value = reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset);
- ++(*entry_num);
- return true;
- } else {
- return false;
- }
-}
-
-
-inline bool PackedPayloadSeparateChainingAggregationStateHashTable
- ::locateBucketForInsertion(const std::size_t hash_code,
- const std::size_t variable_key_allocation_required,
- void **bucket,
- std::atomic<std::size_t> **pending_chain_ptr,
- std::size_t *pending_chain_ptr_finish_value) {
- if (*bucket == nullptr) {
- *pending_chain_ptr = &(slots_[hash_code % header_->num_slots]);
- } else {
- *pending_chain_ptr = static_cast<std::atomic<std::size_t> *>(*bucket);
- }
- for (;;) {
- std::size_t existing_chain_ptr = 0;
- if ((*pending_chain_ptr)
- ->compare_exchange_strong(existing_chain_ptr,
- std::numeric_limits<std::size_t>::max(),
- std::memory_order_acq_rel)) {
- // Got to the end of the chain. Allocate a new bucket.
-
- // First, allocate variable-length key storage, if needed (i.e. if this
- // is an upsert and we didn't allocate up-front).
- if (!key_manager_.allocateVariableLengthKeyStorage(
- variable_key_allocation_required)) {
- // Ran out of variable-length storage.
- (*pending_chain_ptr)->store(0, std::memory_order_release);
- *bucket = nullptr;
- return false;
- }
-
- const std::size_t allocated_bucket_num =
- header_->buckets_allocated.fetch_add(1, std::memory_order_relaxed);
- if (allocated_bucket_num >= header_->num_buckets) {
- // Ran out of buckets.
- header_->buckets_allocated.fetch_sub(1, std::memory_order_relaxed);
- (*pending_chain_ptr)->store(0, std::memory_order_release);
- *bucket = nullptr;
- return false;
- } else {
- *bucket =
- static_cast<char *>(buckets_) + allocated_bucket_num * bucket_size_;
- *pending_chain_ptr_finish_value = allocated_bucket_num + 1;
- return true;
- }
- }
- // Spin until the real "next" pointer is available.
- while (existing_chain_ptr == std::numeric_limits<std::size_t>::max()) {
- existing_chain_ptr =
- (*pending_chain_ptr)->load(std::memory_order_acquire);
- }
- if (existing_chain_ptr == 0) {
- // Other thread had to roll back, so try again.
- continue;
- }
- // Chase the next pointer.
- *bucket =
- static_cast<char *>(buckets_) + (existing_chain_ptr - 1) * bucket_size_;
- *pending_chain_ptr = static_cast<std::atomic<std::size_t> *>(*bucket);
- const std::size_t hash_in_bucket = *reinterpret_cast<const std::size_t *>(
- static_cast<const char *>(*bucket) +
- sizeof(std::atomic<std::size_t>));
- if (hash_in_bucket == hash_code) {
- return false;
- }
- }
-}
-
-inline const std::uint8_t* PackedPayloadSeparateChainingAggregationStateHashTable
- ::getSingleCompositeKey(const std::vector<TypedValue> &key) const {
- DEBUG_ASSERT(this->key_types_.size() == key.size());
-
- const std::size_t hash_code = this->hashCompositeKey(key);
- std::size_t bucket_ref =
- slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
- while (bucket_ref != 0) {
- DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
- const char *bucket =
- static_cast<const char *>(buckets_) + (bucket_ref - 1) * bucket_size_;
- const std::size_t bucket_hash = *reinterpret_cast<const std::size_t *>(
- bucket + sizeof(std::atomic<std::size_t>));
- if ((bucket_hash == hash_code) &&
- key_manager_.compositeKeyCollisionCheck(key, bucket)) {
- // Match located.
- return reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset);
- }
- bucket_ref =
- reinterpret_cast<const std::atomic<std::size_t> *>(bucket)->load(
- std::memory_order_relaxed);
- }
-
- // Reached the end of the chain and didn't find a match.
- return nullptr;
-}
-
-inline const std::uint8_t* PackedPayloadSeparateChainingAggregationStateHashTable
- ::getSingleCompositeKey(const std::vector<TypedValue> &key,
- const int index) const {
- DEBUG_ASSERT(this->key_types_.size() == key.size());
-
- const std::size_t hash_code = this->hashCompositeKey(key);
- std::size_t bucket_ref =
- slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
- while (bucket_ref != 0) {
- DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
- const char *bucket =
- static_cast<const char *>(buckets_) + (bucket_ref - 1) * bucket_size_;
- const std::size_t bucket_hash = *reinterpret_cast<const std::size_t *>(
- bucket + sizeof(std::atomic<std::size_t>));
- if ((bucket_hash == hash_code) &&
- key_manager_.compositeKeyCollisionCheck(key, bucket)) {
- // Match located.
- return reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset) +
- this->payload_offsets_[index];
- }
- bucket_ref =
- reinterpret_cast<const std::atomic<std::size_t> *>(bucket)->load(
- std::memory_order_relaxed);
- }
-
- // Reached the end of the chain and didn't find a match.
- return nullptr;
-}
-
-inline bool PackedPayloadSeparateChainingAggregationStateHashTable
- ::upsertCompositeKey(const std::vector<TypedValue> &key,
- const std::uint8_t *source_state) {
- const std::size_t variable_size =
- calculateVariableLengthCompositeKeyCopySize(key);
- for (;;) {
- {
- SpinSharedMutexSharedLock<true> resize_lock(resize_shared_mutex_);
- std::uint8_t *value =
- upsertCompositeKeyInternal(key, variable_size);
- if (value != nullptr) {
- SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
- for (unsigned int k = 0; k < num_handles_; ++k) {
- handles_[k]->mergeStates(source_state + payload_offsets_[k],
- value + payload_offsets_[k]);
- }
- return true;
- }
- }
- resize(0, variable_size);
- }
-}
-
-template <typename FunctorT>
-inline bool PackedPayloadSeparateChainingAggregationStateHashTable
- ::upsertCompositeKey(const std::vector<TypedValue> &key,
- FunctorT *functor,
- int index) {
- const std::size_t variable_size =
- calculateVariableLengthCompositeKeyCopySize(key);
- for (;;) {
- {
- SpinSharedMutexSharedLock<true> resize_lock(resize_shared_mutex_);
- std::uint8_t *value =
- upsertCompositeKeyInternal(key, variable_size);
- if (value != nullptr) {
- (*functor)(value + payload_offsets_[index]);
- return true;
- }
- }
- resize(0, variable_size);
- }
-}
-
-
-inline std::uint8_t* PackedPayloadSeparateChainingAggregationStateHashTable
- ::upsertCompositeKeyInternal(const std::vector<TypedValue> &key,
- const std::size_t variable_key_size) {
- if (variable_key_size > 0) {
- // Don't allocate yet, since the key may already be present. However, we
- // do check if either the allocated variable storage space OR the free
- // space is big enough to hold the key (at least one must be true: either
- // the key is already present and allocated, or we need to be able to
- // allocate enough space for it).
- std::size_t allocated_bytes = header_->variable_length_bytes_allocated.load(
- std::memory_order_relaxed);
- if ((allocated_bytes < variable_key_size) &&
- (allocated_bytes + variable_key_size >
- key_manager_.getVariableLengthKeyStorageSize())) {
- return nullptr;
- }
- }
-
- const std::size_t hash_code = this->hashCompositeKey(key);
- void *bucket = nullptr;
- std::atomic<std::size_t> *pending_chain_ptr;
- std::size_t pending_chain_ptr_finish_value;
- for (;;) {
- if (locateBucketForInsertion(hash_code,
- variable_key_size,
- &bucket,
- &pending_chain_ptr,
- &pending_chain_ptr_finish_value)) {
- // Found an empty bucket.
- break;
- } else if (bucket == nullptr) {
- // Ran out of buckets or variable-key space.
- return nullptr;
- } else if (key_manager_.compositeKeyCollisionCheck(key, bucket)) {
- // Found an already-existing entry for this key.
- return reinterpret_cast<std::uint8_t *>(static_cast<char *>(bucket) +
- kValueOffset);
- }
- }
-
- // We are now writing to an empty bucket.
- // Write the key and hash.
- writeCompositeKeyToBucket(key, hash_code, bucket);
-
- std::uint8_t *value = static_cast<unsigned char *>(bucket) + kValueOffset;
- std::memcpy(value, init_payload_, this->total_payload_size_);
-
- // Update the previous chain pointer to point to the new bucket.
- pending_chain_ptr->store(pending_chain_ptr_finish_value,
- std::memory_order_release);
-
- // Return the value.
- return value;
-}
-
-template <bool use_two_accessors>
-inline bool PackedPayloadSeparateChainingAggregationStateHashTable
- ::upsertValueAccessorCompositeKeyInternal(
- const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
- const std::vector<MultiSourceAttributeId> &key_ids,
- ValueAccessor *base_accessor,
- ColumnVectorsValueAccessor *derived_accessor) {
- std::size_t variable_size;
- std::vector<TypedValue> key_vector;
- key_vector.resize(key_ids.size());
-
- return InvokeOnAnyValueAccessor(
- base_accessor,
- [&](auto *accessor) -> bool { // NOLINT(build/c++11)
- bool continuing = true;
- while (continuing) {
- {
- continuing = false;
- SpinSharedMutexSharedLock<true> lock(resize_shared_mutex_);
- while (accessor->next()) {
- if (use_two_accessors) {
- derived_accessor->next();
- }
- if (this->GetCompositeKeyFromValueAccessor<use_two_accessors, true>(
- key_ids,
- accessor,
- derived_accessor,
- &key_vector)) {
- continue;
- }
- variable_size = this->calculateVariableLengthCompositeKeyCopySize(key_vector);
- std::uint8_t *value = this->upsertCompositeKeyInternal(
- key_vector, variable_size);
- if (value == nullptr) {
- continuing = true;
- break;
- } else {
- SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
- for (unsigned int k = 0; k < num_handles_; ++k) {
- const auto &ids = argument_ids[k];
- if (ids.empty()) {
- handles_[k]->updateStateNullary(value + payload_offsets_[k]);
- } else {
- const MultiSourceAttributeId &arg_id = ids.front();
- if (use_two_accessors && arg_id.source == ValueAccessorSource::kDerived) {
- DCHECK_NE(arg_id.attr_id, kInvalidAttributeID);
- handles_[k]->updateStateUnary(derived_accessor->getTypedValue(arg_id.attr_id),
- value + payload_offsets_[k]);
- } else {
- handles_[k]->updateStateUnary(accessor->getTypedValue(arg_id.attr_id),
- value + payload_offsets_[k]);
- }
- }
- }
- }
- }
- }
- if (continuing) {
- this->resize(0, variable_size);
- accessor->previous();
- if (use_two_accessors) {
- derived_accessor->previous();
- }
- }
- }
- return true;
- });
-}
-
-inline void PackedPayloadSeparateChainingAggregationStateHashTable
- ::writeScalarKeyToBucket(const TypedValue &key,
- const std::size_t hash_code,
- void *bucket) {
- *reinterpret_cast<std::size_t *>(static_cast<char *>(bucket) +
- sizeof(std::atomic<std::size_t>)) =
- hash_code;
- key_manager_.writeKeyComponentToBucket(key, 0, bucket, nullptr);
-}
-
-inline void PackedPayloadSeparateChainingAggregationStateHashTable
- ::writeCompositeKeyToBucket(const std::vector<TypedValue> &key,
- const std::size_t hash_code,
- void *bucket) {
- DEBUG_ASSERT(key.size() == this->key_types_.size());
- *reinterpret_cast<std::size_t *>(static_cast<char *>(bucket) +
- sizeof(std::atomic<std::size_t>)) =
- hash_code;
- for (std::size_t idx = 0; idx < this->key_types_.size(); ++idx) {
- key_manager_.writeKeyComponentToBucket(key[idx], idx, bucket, nullptr);
- }
-}
-
-inline bool PackedPayloadSeparateChainingAggregationStateHashTable::isFull(
- const std::size_t extra_variable_storage) const {
- if (header_->buckets_allocated.load(std::memory_order_relaxed) >=
- header_->num_buckets) {
- // All buckets are allocated.
- return true;
- }
-
- if (extra_variable_storage > 0) {
- if (extra_variable_storage +
- header_->variable_length_bytes_allocated.load(
- std::memory_order_relaxed) >
- key_manager_.getVariableLengthKeyStorageSize()) {
- // Not enough variable-length key storage space.
- return true;
- }
- }
-
- return false;
-}
-
-template <typename FunctorT>
-inline std::size_t PackedPayloadSeparateChainingAggregationStateHashTable
- ::forEach(FunctorT *functor) const {
- std::size_t entries_visited = 0;
- std::size_t entry_num = 0;
- TypedValue key;
- const std::uint8_t *value_ptr;
- while (getNextEntry(&key, &value_ptr, &entry_num)) {
- ++entries_visited;
- (*functor)(key, value_ptr);
- }
- return entries_visited;
-}
-
-template <typename FunctorT>
-inline std::size_t PackedPayloadSeparateChainingAggregationStateHashTable
- ::forEach(FunctorT *functor, const int index) const {
- std::size_t entries_visited = 0;
- std::size_t entry_num = 0;
- TypedValue key;
- const std::uint8_t *value_ptr;
- while (getNextEntry(&key, &value_ptr, &entry_num)) {
- ++entries_visited;
- (*functor)(key, value_ptr + payload_offsets_[index]);
- key.clear();
- }
- return entries_visited;
-}
-
-template <typename FunctorT>
-inline std::size_t PackedPayloadSeparateChainingAggregationStateHashTable
- ::forEachCompositeKey(FunctorT *functor) const {
- std::size_t entries_visited = 0;
- std::size_t entry_num = 0;
- std::vector<TypedValue> key;
- const std::uint8_t *value_ptr;
- while (getNextEntryCompositeKey(&key, &value_ptr, &entry_num)) {
- ++entries_visited;
- (*functor)(key, value_ptr);
- key.clear();
- }
- return entries_visited;
-}
-
-template <typename FunctorT>
-inline std::size_t PackedPayloadSeparateChainingAggregationStateHashTable
- ::forEachCompositeKey(FunctorT *functor,
- const int index) const {
- std::size_t entries_visited = 0;
- std::size_t entry_num = 0;
- std::vector<TypedValue> key;
- const std::uint8_t *value_ptr;
- while (getNextEntryCompositeKey(&key, &value_ptr, &entry_num)) {
- ++entries_visited;
- (*functor)(key, value_ptr + payload_offsets_[index]);
- key.clear();
- }
- return entries_visited;
-}
-
-} // namespace quickstep
-
-#endif // QUICKSTEP_STORAGE_PACKED_PAYLOAD_AGGREGATION_STATE_HASH_TABLE_HPP_