Posted to commits@quickstep.apache.org by ra...@apache.org on 2016/07/04 07:57:01 UTC
[2/5] incubator-quickstep git commit: Initial commit for QUICKSTEP-28 and
QUICKSTEP-29. Code refactoring and cleanup; some further optimizations are pending.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5e7d09f/storage/FastSeparateChainingHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/FastSeparateChainingHashTable.hpp b/storage/FastSeparateChainingHashTable.hpp
new file mode 100644
index 0000000..64c4979
--- /dev/null
+++ b/storage/FastSeparateChainingHashTable.hpp
@@ -0,0 +1,1761 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_FAST_SEPARATE_CHAINING_HASH_TABLE_HPP_
+#define QUICKSTEP_STORAGE_FAST_SEPARATE_CHAINING_HASH_TABLE_HPP_
+
+#include <algorithm>
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
+#include <limits>
+#include <memory>
+#include <new>
+#include <utility>
+#include <vector>
+
+#include "storage/HashTable.hpp"
+#include "storage/FastHashTable.hpp"
+#include "storage/HashTableBase.hpp"
+#include "storage/HashTableKeyManager.hpp"
+#include "storage/StorageBlob.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageConstants.hpp"
+#include "storage/StorageManager.hpp"
+#include "threading/SpinSharedMutex.hpp"
+#include "types/Type.hpp"
+#include "types/TypedValue.hpp"
+#include "utility/Alignment.hpp"
+#include "utility/Macros.hpp"
+#include "utility/PrimeNumber.hpp"
+
+namespace quickstep {
+
+/** \addtogroup Storage
+ * @{
+ */
+
+/**
+ * @brief A hash table implementation which uses separate chaining for buckets.
+ * @brief A hash table implementation that uses separate chaining for buckets
+ *        and stores each entry's aggregation payload contiguously within its
+ *        bucket.
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+class FastSeparateChainingHashTable : public FastHashTable<resizable,
+ serializable,
+ force_key_copy,
+ allow_duplicate_keys> {
+ public:
+ FastSeparateChainingHashTable(const std::vector<const Type*> &key_types,
+ const std::size_t num_entries,
+ const std::vector<std::size_t> &payload_sizes,
+ const std::vector<AggregationHandle *> &handles,
+ StorageManager *storage_manager);
+
+ FastSeparateChainingHashTable(const std::vector<const Type*> &key_types,
+ void *hash_table_memory,
+ const std::size_t hash_table_memory_size,
+ const bool new_hash_table,
+ const bool hash_table_memory_zeroed);
+
+ // Delegating constructors for single scalar keys.
+ FastSeparateChainingHashTable(const Type &key_type,
+ const std::size_t num_entries,
+ StorageManager *storage_manager)
+ : FastSeparateChainingHashTable(std::vector<const Type*>(1, &key_type),
+ num_entries,
+ storage_manager) {
+ }
+
+ FastSeparateChainingHashTable(const Type &key_type,
+ void *hash_table_memory,
+ const std::size_t hash_table_memory_size,
+ const bool new_hash_table,
+ const bool hash_table_memory_zeroed)
+ : FastSeparateChainingHashTable(std::vector<const Type*>(1, &key_type),
+ hash_table_memory,
+ hash_table_memory_size,
+ new_hash_table,
+ hash_table_memory_zeroed) {
+ }
+
+ ~FastSeparateChainingHashTable() override {
+ DestroyValues(buckets_,
+ header_->buckets_allocated.load(std::memory_order_relaxed),
+ bucket_size_);
+ std::free(init_payload_);
+ }
+
+ void clear() override;
+
+ std::size_t numEntries() const override {
+ return header_->buckets_allocated.load(std::memory_order_relaxed);
+ }
+
+ const uint8_t* getSingle(const TypedValue &key) const override;
+ const uint8_t* getSingleCompositeKey(const std::vector<TypedValue> &key) const override;
+ const uint8_t* getSingleCompositeKey(const std::vector<TypedValue> &key, int index) const override;
+
+ void getAll(const TypedValue &key,
+ std::vector<const uint8_t*> *values) const override;
+ void getAllCompositeKey(const std::vector<TypedValue> &key,
+ std::vector<const uint8_t*> *values) const override;
+
+ protected:
+ HashTablePutResult putInternal(const TypedValue &key,
+ const std::size_t variable_key_size,
+ const uint8_t &value,
+ HashTablePreallocationState *prealloc_state) override;
+ HashTablePutResult putCompositeKeyInternal(const std::vector<TypedValue> &key,
+ const std::size_t variable_key_size,
+ const uint8_t &value,
+ HashTablePreallocationState *prealloc_state) override;
+ HashTablePutResult putCompositeKeyInternalFast(const std::vector<TypedValue> &key,
+ const std::size_t variable_key_size,
+ const std::uint8_t *init_value_ptr,
+ HashTablePreallocationState *prealloc_state) override;
+
+ uint8_t* upsertInternal(const TypedValue &key,
+ const std::size_t variable_key_size,
+ const uint8_t &initial_value) override;
+ uint8_t* upsertInternalFast(const TypedValue &key,
+ const std::uint8_t *init_value_ptr,
+ const std::size_t variable_key_size) override;
+
+ uint8_t* upsertCompositeKeyInternal(const std::vector<TypedValue> &key,
+ const std::size_t variable_key_size,
+ const uint8_t &initial_value) override;
+
+ uint8_t* upsertCompositeKeyInternalFast(const std::vector<TypedValue> &key,
+ const std::uint8_t *init_value_ptr,
+ const std::size_t variable_key_size) override;
+
+ bool getNextEntry(TypedValue *key,
+ const uint8_t **value,
+ std::size_t *entry_num) const override;
+ bool getNextEntryCompositeKey(std::vector<TypedValue> *key,
+ const uint8_t **value,
+ std::size_t *entry_num) const override;
+
+ bool getNextEntryForKey(const TypedValue &key,
+ const std::size_t hash_code,
+ const uint8_t **value,
+ std::size_t *entry_num) const override;
+ bool getNextEntryForCompositeKey(const std::vector<TypedValue> &key,
+ const std::size_t hash_code,
+ const uint8_t **value,
+ std::size_t *entry_num) const override;
+
+ bool hasKey(const TypedValue &key) const override;
+ bool hasCompositeKey(const std::vector<TypedValue> &key) const override;
+
+ void resize(const std::size_t extra_buckets,
+ const std::size_t extra_variable_storage,
+ const std::size_t retry_num = 0) override;
+
+ bool preallocateForBulkInsert(const std::size_t total_entries,
+ const std::size_t total_variable_key_size,
+ HashTablePreallocationState *prealloc_state) override;
+
+  std::size_t get_buckets_allocated() const override {
+    return header_->buckets_allocated.load(std::memory_order_relaxed);
+  }
+
+ private:
+ struct Header {
+ std::size_t num_slots;
+ std::size_t num_buckets;
+ alignas(kCacheLineBytes)
+ std::atomic<std::size_t> buckets_allocated;
+ alignas(kCacheLineBytes)
+ std::atomic<std::size_t> variable_length_bytes_allocated;
+ };
+
+ std::uint8_t *init_payload_;
+ std::size_t kBucketAlignment;
+
+ // Value's offset in a bucket is the first alignof(ValueT) boundary after the
+ // next pointer and hash code.
+ std::size_t kValueOffset;
+
+ // Round bucket size up to a multiple of kBucketAlignment.
+ constexpr std::size_t ComputeBucketSize(const std::size_t fixed_key_size) {
+ return (((kValueOffset + this->total_payload_size_ + fixed_key_size - 1) / kBucketAlignment) + 1)
+ * kBucketAlignment;
+ }
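+  // Worked example (a sketch with assumed sizes, not values taken from any
+  // particular build): if kValueOffset == 16, total_payload_size_ == 24,
+  // fixed_key_size == 8, and kBucketAlignment == 8, the raw size is
+  // 16 + 24 + 8 = 48, and (((48 - 1) / 8) + 1) * 8 == 48, so the bucket
+  // stays at 48 bytes; a raw size of 50 would instead round up to 56.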
+ // If ValueT is not trivially destructible, invoke its destructor for all
+ // values held in the specified buckets (including those in "empty" buckets
+ // that were default constructed). If ValueT is trivially destructible, this
+ // is a no-op.
+ void DestroyValues(void *buckets,
+ const std::size_t num_buckets,
+ const std::size_t bucket_size);
+
+ // Attempt to find an empty bucket to insert 'hash_code' into, starting after
+ // '*bucket' in the chain (or, if '*bucket' is NULL, starting from the slot
+ // array). Returns true and stores SIZE_T_MAX in '*pending_chain_ptr' if an
+ // empty bucket is found. Returns false if 'allow_duplicate_keys' is false
+ // and a hash collision is found (caller should then check whether there is a
+ // genuine key collision or the hash collision is spurious). Returns false
+ // and sets '*bucket' to NULL if there are no more empty buckets in the hash
+ // table. If 'variable_key_allocation_required' is nonzero, this method will
+ // attempt to allocate storage for a variable-length key BEFORE allocating a
+ // bucket, so that no bucket number below 'header_->num_buckets' is ever
+ // deallocated after being allocated.
+ inline bool locateBucketForInsertion(const std::size_t hash_code,
+ const std::size_t variable_key_allocation_required,
+ void **bucket,
+ std::atomic<std::size_t> **pending_chain_ptr,
+ std::size_t *pending_chain_ptr_finish_value,
+ HashTablePreallocationState *prealloc_state);
+
+  // Write a scalar 'key' and its 'hash_code' into '*bucket', which was
+ // found by locateBucketForInsertion(). Assumes that storage for a
+ // variable-length key copy (if any) was already allocated by a successful
+ // call to allocateVariableLengthKeyStorage().
+ inline void writeScalarKeyToBucket(const TypedValue &key,
+ const std::size_t hash_code,
+ void *bucket,
+ HashTablePreallocationState *prealloc_state);
+
+  // Write a composite 'key' and its 'hash_code' into '*bucket', which was
+ // found by locateBucketForInsertion(). Assumes that storage for
+ // variable-length key copies (if any) was already allocated by a successful
+ // call to allocateVariableLengthKeyStorage().
+ inline void writeCompositeKeyToBucket(const std::vector<TypedValue> &key,
+ const std::size_t hash_code,
+ void *bucket,
+ HashTablePreallocationState *prealloc_state);
+
+ // Determine whether it is actually necessary to resize this hash table.
+ // Checks that there is at least one unallocated bucket, and that there is
+ // at least 'extra_variable_storage' bytes of variable-length storage free.
+ bool isFull(const std::size_t extra_variable_storage) const;
+
+ // Helper object to manage key storage.
+ HashTableKeyManager<serializable, force_key_copy> key_manager_;
+
+ // In-memory structure is as follows:
+ // - SeparateChainingHashTable::Header
+ // - Array of slots, interpreted as follows:
+ // - 0 = Points to nothing (empty)
+ // - SIZE_T_MAX = Pending (some thread is starting a chain from this
+ // slot and will overwrite it soon)
+ // - Anything else = The number of the first bucket in the chain for
+ // this slot PLUS ONE (i.e. subtract one to get the actual bucket
+ // number).
+ // - Array of buckets, each of which is:
+ // - atomic size_t "next" pointer, interpreted the same as slots above.
+ // - size_t hash value
+ // - possibly some unused bytes as needed so that ValueT's alignment
+ // requirement is met
+ // - ValueT value slot
+ // - fixed-length key storage (which may include pointers to external
+ // memory or offsets of variable length keys stored within this hash
+ // table)
+ // - possibly some additional unused bytes so that bucket size is a
+ // multiple of both alignof(std::atomic<std::size_t>) and
+ // alignof(ValueT)
+ // - Variable-length key storage region (referenced by offsets stored in
+ // fixed-length keys).
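+  //
+  // For example (illustrative numbers only): a slot holding the value 3
+  // means the chain for that slot begins at bucket number 2; if that
+  // bucket's atomic "next" field holds 5, the chain continues at bucket
+  // number 4, and a "next" value of 0 terminates the chain.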
+ Header *header_;
+
+ std::atomic<std::size_t> *slots_;
+ void *buckets_;
+ const std::size_t bucket_size_;
+
+ DISALLOW_COPY_AND_ASSIGN(FastSeparateChainingHashTable);
+};
+
+/** @} */
+
+// ----------------------------------------------------------------------------
+// Implementations of template class methods follow.
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+ ::FastSeparateChainingHashTable(const std::vector<const Type*> &key_types,
+ const std::size_t num_entries,
+ const std::vector<std::size_t> &payload_sizes,
+ const std::vector<AggregationHandle *> &handles,
+ StorageManager *storage_manager)
+ : FastHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>(
+ key_types,
+ num_entries,
+ handles,
+ payload_sizes,
+ storage_manager,
+ false,
+ false,
+ true),
+ kBucketAlignment(alignof(std::atomic<std::size_t>)),
+ kValueOffset(sizeof(std::atomic<std::size_t>) + sizeof(std::size_t)),
+ key_manager_(this->key_types_, kValueOffset + this->total_payload_size_),
+ bucket_size_(ComputeBucketSize(key_manager_.getFixedKeySize())) {
+  init_payload_ = static_cast<std::uint8_t *>(std::calloc(this->total_payload_size_, 1));
+  for (AggregationHandle *handle : handles) {
+    handle->initPayload(init_payload_);
+  }
+ // Bucket size always rounds up to the alignment requirement of the atomic
+ // size_t "next" pointer at the front or a ValueT, whichever is larger.
+ //
+ // Give base HashTable information about what key components are stored
+ // inline from 'key_manager_'.
+ this->setKeyInline(key_manager_.getKeyInline());
+
+ // Pick out a prime number of slots and calculate storage requirements.
+ std::size_t num_slots_tmp = get_next_prime_number(num_entries * kHashTableLoadFactor);
+ std::size_t required_memory = sizeof(Header)
+ + num_slots_tmp * sizeof(std::atomic<std::size_t>)
+ + (num_slots_tmp / kHashTableLoadFactor)
+ * (bucket_size_ + key_manager_.getEstimatedVariableKeySize());
+ std::size_t num_storage_slots = this->storage_manager_->SlotsNeededForBytes(required_memory);
+ if (num_storage_slots == 0) {
+ FATAL_ERROR("Storage requirement for SeparateChainingHashTable "
+ "exceeds maximum allocation size.");
+ }
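+  // Sizing sketch (illustrative numbers only, assuming a load factor of 10,
+  // num_entries == 1000, 8-byte slots, 48-byte buckets, and no
+  // variable-length keys): num_slots_tmp becomes a prime of at least 10000,
+  // and required_memory is sizeof(Header) plus roughly 10000 * 8 bytes of
+  // slots and 1000 * 48 bytes of buckets (about 128,000 bytes in total),
+  // which SlotsNeededForBytes() rounds up to a whole number of storage slots.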
+
+ // Get a StorageBlob to hold the hash table.
+ const block_id blob_id = this->storage_manager_->createBlob(num_storage_slots);
+ this->blob_ = this->storage_manager_->getBlobMutable(blob_id);
+
+ void *aligned_memory_start = this->blob_->getMemoryMutable();
+ std::size_t available_memory = num_storage_slots * kSlotSizeBytes;
+ if (align(alignof(Header),
+ sizeof(Header),
+ aligned_memory_start,
+ available_memory)
+ == nullptr) {
+ // With current values from StorageConstants.hpp, this should be
+ // impossible. A blob is at least 1 MB, while a Header has alignment
+ // requirement of just kCacheLineBytes (64 bytes).
+ FATAL_ERROR("StorageBlob used to hold resizable "
+ "SeparateChainingHashTable is too small to meet alignment "
+ "requirements of SeparateChainingHashTable::Header.");
+ } else if (aligned_memory_start != this->blob_->getMemoryMutable()) {
+ // This should also be impossible, since the StorageManager allocates slots
+ // aligned to kCacheLineBytes.
+ DEV_WARNING("StorageBlob memory adjusted by "
+ << (num_storage_slots * kSlotSizeBytes - available_memory)
+ << " bytes to meet alignment requirement for "
+ << "SeparateChainingHashTable::Header.");
+ }
+
+ // Locate the header.
+ header_ = static_cast<Header*>(aligned_memory_start);
+ aligned_memory_start = static_cast<char*>(aligned_memory_start) + sizeof(Header);
+ available_memory -= sizeof(Header);
+
+ // Recompute the number of slots & buckets using the actual available memory.
+ // Most likely, we got some extra free bucket space due to "rounding up" to
+ // the storage blob's size. It's also possible (though very unlikely) that we
+ // will wind up with fewer buckets than we initially wanted because of screwy
+ // alignment requirements for ValueT.
+ std::size_t num_buckets_tmp
+ = available_memory / (kHashTableLoadFactor * sizeof(std::atomic<std::size_t>)
+ + bucket_size_
+ + key_manager_.getEstimatedVariableKeySize());
+ num_slots_tmp = get_previous_prime_number(num_buckets_tmp * kHashTableLoadFactor);
+ num_buckets_tmp = num_slots_tmp / kHashTableLoadFactor;
+ DEBUG_ASSERT(num_slots_tmp > 0);
+ DEBUG_ASSERT(num_buckets_tmp > 0);
+
+ // Locate the slot array.
+ slots_ = static_cast<std::atomic<std::size_t>*>(aligned_memory_start);
+ aligned_memory_start = static_cast<char*>(aligned_memory_start)
+ + sizeof(std::atomic<std::size_t>) * num_slots_tmp;
+ available_memory -= sizeof(std::atomic<std::size_t>) * num_slots_tmp;
+
+ // Locate the buckets.
+ buckets_ = aligned_memory_start;
+ // Extra-paranoid: If ValueT has an alignment requirement greater than that
+ // of std::atomic<std::size_t>, we may need to adjust the start of the bucket
+ // array.
+ if (align(kBucketAlignment,
+ bucket_size_,
+ buckets_,
+ available_memory)
+ == nullptr) {
+ FATAL_ERROR("StorageBlob used to hold resizable "
+ "SeparateChainingHashTable is too small to meet "
+ "alignment requirements of buckets.");
+ } else if (buckets_ != aligned_memory_start) {
+ DEV_WARNING("Bucket array start position adjusted to meet alignment "
+ "requirement for SeparateChainingHashTable's value type.");
+ if (num_buckets_tmp * bucket_size_ > available_memory) {
+ --num_buckets_tmp;
+ }
+ }
+
+ // Fill in the header.
+ header_->num_slots = num_slots_tmp;
+ header_->num_buckets = num_buckets_tmp;
+ header_->buckets_allocated.store(0, std::memory_order_relaxed);
+ header_->variable_length_bytes_allocated.store(0, std::memory_order_relaxed);
+ available_memory -= bucket_size_ * (header_->num_buckets);
+
+ // Locate variable-length key storage region, and give it all the remaining
+ // bytes in the blob.
+ key_manager_.setVariableLengthStorageInfo(
+ static_cast<char*>(buckets_) + header_->num_buckets * bucket_size_,
+ available_memory,
+ &(header_->variable_length_bytes_allocated));
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+ ::FastSeparateChainingHashTable(const std::vector<const Type*> &key_types,
+ void *hash_table_memory,
+ const std::size_t hash_table_memory_size,
+ const bool new_hash_table,
+ const bool hash_table_memory_zeroed)
+ : FastHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>(
+ key_types,
+ hash_table_memory,
+ hash_table_memory_size,
+ new_hash_table,
+ hash_table_memory_zeroed,
+ false,
+ false,
+ true),
+ kBucketAlignment(alignof(std::atomic<std::size_t>) < alignof(uint8_t) ? alignof(uint8_t)
+ : alignof(std::atomic<std::size_t>)),
+ kValueOffset((((sizeof(std::atomic<std::size_t>) + sizeof(std::size_t) - 1) /
+ alignof(uint8_t)) + 1) * alignof(uint8_t)),
+ key_manager_(this->key_types_, kValueOffset + sizeof(uint8_t)),
+ bucket_size_(ComputeBucketSize(key_manager_.getFixedKeySize())) {
+ // Bucket size always rounds up to the alignment requirement of the atomic
+ // size_t "next" pointer at the front or a ValueT, whichever is larger.
+ //
+ // Make sure that the larger of the two alignment requirements also satisfies
+ // the smaller.
+ static_assert(alignof(std::atomic<std::size_t>) < alignof(uint8_t)
+ ? alignof(uint8_t) % alignof(std::atomic<std::size_t>) == 0
+ : alignof(std::atomic<std::size_t>) % alignof(uint8_t) == 0,
+                "Alignment requirements of std::atomic<std::size_t> and "
+                "ValueT are incompatible: neither evenly divides the other.");
+
+ // Give base HashTable information about what key components are stored
+ // inline from 'key_manager_'.
+ this->setKeyInline(key_manager_.getKeyInline());
+
+ // FIXME(chasseur): If we are reconstituting a HashTable using a block of
+ // memory whose start was aligned differently than the memory block that was
+ // originally used (modulo alignof(Header)), we could wind up with all of our
+ // data structures misaligned. If memory is inside a
+ // StorageBlock/StorageBlob, this will never occur, since the StorageManager
+ // always allocates slots aligned to kCacheLineBytes. Similarly, this isn't
+ // a problem for memory inside any other allocation aligned to at least
+ // alignof(Header) == kCacheLineBytes.
+
+ void *aligned_memory_start = this->hash_table_memory_;
+ std::size_t available_memory = this->hash_table_memory_size_;
+
+ if (align(alignof(Header),
+ sizeof(Header),
+ aligned_memory_start,
+ available_memory)
+ == nullptr) {
+ FATAL_ERROR("Attempted to create a non-resizable "
+ << "SeparateChainingHashTable with "
+ << available_memory << " bytes of memory at "
+ << aligned_memory_start << " which either can not fit a "
+                << "SeparateChainingHashTable::Header or meet its alignment "
+ << "requirement.");
+ } else if (aligned_memory_start != this->hash_table_memory_) {
+ // In general, we could get memory of any alignment, although at least
+ // cache-line aligned would be nice.
+ DEV_WARNING("StorageBlob memory adjusted by "
+ << (this->hash_table_memory_size_ - available_memory)
+ << " bytes to meet alignment requirement for "
+ << "SeparateChainingHashTable::Header.");
+ }
+
+ header_ = static_cast<Header*>(aligned_memory_start);
+ aligned_memory_start = static_cast<char*>(aligned_memory_start) + sizeof(Header);
+ available_memory -= sizeof(Header);
+
+ if (new_hash_table) {
+ std::size_t estimated_bucket_capacity
+ = available_memory / (kHashTableLoadFactor * sizeof(std::atomic<std::size_t>)
+ + bucket_size_
+ + key_manager_.getEstimatedVariableKeySize());
+ std::size_t num_slots = get_previous_prime_number(estimated_bucket_capacity * kHashTableLoadFactor);
+
+ // Fill in the header.
+ header_->num_slots = num_slots;
+ header_->num_buckets = num_slots / kHashTableLoadFactor;
+ header_->buckets_allocated.store(0, std::memory_order_relaxed);
+ header_->variable_length_bytes_allocated.store(0, std::memory_order_relaxed);
+ }
+
+ // Locate the slot array.
+ slots_ = static_cast<std::atomic<std::size_t>*>(aligned_memory_start);
+ aligned_memory_start = static_cast<char*>(aligned_memory_start)
+ + sizeof(std::atomic<std::size_t>) * header_->num_slots;
+ available_memory -= sizeof(std::atomic<std::size_t>) * header_->num_slots;
+
+ if (new_hash_table && !hash_table_memory_zeroed) {
+ std::memset(slots_, 0x0, sizeof(std::atomic<std::size_t>) * header_->num_slots);
+ }
+
+ // Locate the buckets.
+ buckets_ = aligned_memory_start;
+ // Extra-paranoid: sizeof(Header) should almost certainly be a multiple of
+ // kBucketAlignment, unless ValueT has some members with seriously big
+ // (> kCacheLineBytes) alignment requirements specified using alignas().
+ if (align(kBucketAlignment,
+ bucket_size_,
+ buckets_,
+ available_memory)
+ == nullptr) {
+ FATAL_ERROR("Attempted to create a non-resizable "
+ << "SeparateChainingHashTable with "
+ << this->hash_table_memory_size_ << " bytes of memory at "
+ << this->hash_table_memory_ << ", which can hold an aligned "
+ << "SeparateChainingHashTable::Header but does not have "
+ << "enough remaining space for even a single hash bucket.");
+ } else if (buckets_ != aligned_memory_start) {
+ DEV_WARNING("Bucket array start position adjusted to meet alignment "
+ "requirement for SeparateChainingHashTable's value type.");
+ if (header_->num_buckets * bucket_size_ > available_memory) {
+ DEBUG_ASSERT(new_hash_table);
+ --(header_->num_buckets);
+ }
+ }
+ available_memory -= bucket_size_ * header_->num_buckets;
+
+ // Make sure "next" pointers in buckets are zeroed-out.
+ if (new_hash_table && !hash_table_memory_zeroed) {
+ std::memset(buckets_, 0x0, header_->num_buckets * bucket_size_);
+ }
+
+ // Locate variable-length key storage region.
+ key_manager_.setVariableLengthStorageInfo(
+ static_cast<char*>(buckets_) + header_->num_buckets * bucket_size_,
+ available_memory,
+ &(header_->variable_length_bytes_allocated));
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+void FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+ ::clear() {
+ const std::size_t used_buckets = header_->buckets_allocated.load(std::memory_order_relaxed);
+ // Destroy existing values, if necessary.
+ DestroyValues(buckets_,
+ used_buckets,
+ bucket_size_);
+
+ // Zero-out slot array.
+ std::memset(slots_, 0x0, sizeof(std::atomic<std::size_t>) * header_->num_slots);
+
+ // Zero-out used buckets.
+ std::memset(buckets_, 0x0, used_buckets * bucket_size_);
+
+ header_->buckets_allocated.store(0, std::memory_order_relaxed);
+ header_->variable_length_bytes_allocated.store(0, std::memory_order_relaxed);
+ key_manager_.zeroNextVariableLengthKeyOffset();
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+const uint8_t* FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+ ::getSingle(const TypedValue &key) const {
+ DEBUG_ASSERT(!allow_duplicate_keys);
+ DEBUG_ASSERT(this->key_types_.size() == 1);
+ DEBUG_ASSERT(key.isPlausibleInstanceOf(this->key_types_.front()->getSignature()));
+
+ const std::size_t hash_code = key.getHash();
+ std::size_t bucket_ref = slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
+ while (bucket_ref != 0) {
+ DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
+ const char *bucket = static_cast<const char*>(buckets_) + (bucket_ref - 1) * bucket_size_;
+ const std::size_t bucket_hash = *reinterpret_cast<const std::size_t*>(
+ bucket + sizeof(std::atomic<std::size_t>));
+ if ((bucket_hash == hash_code) && key_manager_.scalarKeyCollisionCheck(key, bucket)) {
+ // Match located.
+ return reinterpret_cast<const uint8_t*>(bucket + kValueOffset);
+ }
+ bucket_ref = reinterpret_cast<const std::atomic<std::size_t>*>(bucket)->load(std::memory_order_relaxed);
+ }
+
+ // Reached the end of the chain and didn't find a match.
+ return nullptr;
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+const uint8_t* FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+ ::getSingleCompositeKey(const std::vector<TypedValue> &key) const {
+ DEBUG_ASSERT(!allow_duplicate_keys);
+ DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+ const std::size_t hash_code = this->hashCompositeKey(key);
+ std::size_t bucket_ref = slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
+ while (bucket_ref != 0) {
+ DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
+ const char *bucket = static_cast<const char*>(buckets_) + (bucket_ref - 1) * bucket_size_;
+ const std::size_t bucket_hash = *reinterpret_cast<const std::size_t*>(
+ bucket + sizeof(std::atomic<std::size_t>));
+ if ((bucket_hash == hash_code) && key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+ // Match located.
+ return reinterpret_cast<const uint8_t*>(bucket + kValueOffset);
+ }
+ bucket_ref = reinterpret_cast<const std::atomic<std::size_t>*>(bucket)->load(std::memory_order_relaxed);
+ }
+
+ // Reached the end of the chain and didn't find a match.
+ return nullptr;
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+const uint8_t* FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+ ::getSingleCompositeKey(const std::vector<TypedValue> &key, int index) const {
+ DEBUG_ASSERT(!allow_duplicate_keys);
+ DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+ const std::size_t hash_code = this->hashCompositeKey(key);
+ std::size_t bucket_ref = slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
+ while (bucket_ref != 0) {
+ DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
+ const char *bucket = static_cast<const char*>(buckets_) + (bucket_ref - 1) * bucket_size_;
+ const std::size_t bucket_hash = *reinterpret_cast<const std::size_t*>(
+ bucket + sizeof(std::atomic<std::size_t>));
+ if ((bucket_hash == hash_code) && key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+ // Match located.
+      return reinterpret_cast<const uint8_t*>(bucket + kValueOffset)
+          + this->payload_offsets_[index];
+ }
+ bucket_ref = reinterpret_cast<const std::atomic<std::size_t>*>(bucket)->load(std::memory_order_relaxed);
+ }
+
+ // Reached the end of the chain and didn't find a match.
+ return nullptr;
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+void FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+ ::getAll(const TypedValue &key, std::vector<const uint8_t*> *values) const {
+ DEBUG_ASSERT(this->key_types_.size() == 1);
+ DEBUG_ASSERT(key.isPlausibleInstanceOf(this->key_types_.front()->getSignature()));
+
+ const std::size_t hash_code = key.getHash();
+ std::size_t bucket_ref = slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
+ while (bucket_ref != 0) {
+ DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
+ const char *bucket = static_cast<const char*>(buckets_) + (bucket_ref - 1) * bucket_size_;
+ const std::size_t bucket_hash = *reinterpret_cast<const std::size_t*>(
+ bucket + sizeof(std::atomic<std::size_t>));
+ if ((bucket_hash == hash_code) && key_manager_.scalarKeyCollisionCheck(key, bucket)) {
+ // Match located.
+ values->push_back(reinterpret_cast<const uint8_t*>(bucket + kValueOffset));
+ if (!allow_duplicate_keys) {
+ return;
+ }
+ }
+ bucket_ref = reinterpret_cast<const std::atomic<std::size_t>*>(bucket)->load(std::memory_order_relaxed);
+ }
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+void FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+ ::getAllCompositeKey(const std::vector<TypedValue> &key, std::vector<const uint8_t*> *values) const {
+ DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+ const std::size_t hash_code = this->hashCompositeKey(key);
+ std::size_t bucket_ref = slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
+ while (bucket_ref != 0) {
+ DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
+ const char *bucket = static_cast<const char*>(buckets_) + (bucket_ref - 1) * bucket_size_;
+ const std::size_t bucket_hash = *reinterpret_cast<const std::size_t*>(
+ bucket + sizeof(std::atomic<std::size_t>));
+ if ((bucket_hash == hash_code) && key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+ // Match located.
+ values->push_back(reinterpret_cast<const uint8_t*>(bucket + kValueOffset));
+ if (!allow_duplicate_keys) {
+ return;
+ }
+ }
+ bucket_ref = reinterpret_cast<const std::atomic<std::size_t>*>(bucket)->load(std::memory_order_relaxed);
+ }
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+HashTablePutResult
+ FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+ ::putInternal(const TypedValue &key,
+ const std::size_t variable_key_size,
+ const uint8_t &value,
+ HashTablePreallocationState *prealloc_state) {
+ DEBUG_ASSERT(this->key_types_.size() == 1);
+ DEBUG_ASSERT(key.isPlausibleInstanceOf(this->key_types_.front()->getSignature()));
+
+ if (prealloc_state == nullptr) {
+ // Early check for a free bucket.
+ if (header_->buckets_allocated.load(std::memory_order_relaxed) >= header_->num_buckets) {
+ return HashTablePutResult::kOutOfSpace;
+ }
+
+ // TODO(chasseur): If allow_duplicate_keys is true, avoid storing more than
+ // one copy of the same variable-length key.
+ if (!key_manager_.allocateVariableLengthKeyStorage(variable_key_size)) {
+ // Ran out of variable-length key storage space.
+ return HashTablePutResult::kOutOfSpace;
+ }
+ }
+
+ const std::size_t hash_code = key.getHash();
+ void *bucket = nullptr;
+ std::atomic<std::size_t> *pending_chain_ptr;
+ std::size_t pending_chain_ptr_finish_value;
+ for (;;) {
+ if (locateBucketForInsertion(hash_code,
+ 0,
+ &bucket,
+ &pending_chain_ptr,
+ &pending_chain_ptr_finish_value,
+ prealloc_state)) {
+ // Found an empty bucket.
+ break;
+ } else if (bucket == nullptr) {
+ // Ran out of buckets. Deallocate any variable space that we were unable
+ // to use.
+ DEBUG_ASSERT(prealloc_state == nullptr);
+ key_manager_.deallocateVariableLengthKeyStorage(variable_key_size);
+ return HashTablePutResult::kOutOfSpace;
+ } else {
+ // Hash collision found, and duplicates aren't allowed.
+ DEBUG_ASSERT(!allow_duplicate_keys);
+ DEBUG_ASSERT(prealloc_state == nullptr);
+ if (key_manager_.scalarKeyCollisionCheck(key, bucket)) {
+ // Duplicate key. Deallocate any variable storage space and return.
+ key_manager_.deallocateVariableLengthKeyStorage(variable_key_size);
+ return HashTablePutResult::kDuplicateKey;
+ }
+ }
+ }
+
+ // Write the key and hash.
+ writeScalarKeyToBucket(key, hash_code, bucket, prealloc_state);
+
+ // Store the value by using placement new with ValueT's copy constructor.
+ new(static_cast<char*>(bucket) + kValueOffset) uint8_t(value);
+
+ // Update the previous chain pointer to point to the new bucket.
+ pending_chain_ptr->store(pending_chain_ptr_finish_value, std::memory_order_release);
+
+ // We're all done.
+ return HashTablePutResult::kOK;
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+HashTablePutResult
+ FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+ ::putCompositeKeyInternal(const std::vector<TypedValue> &key,
+ const std::size_t variable_key_size,
+ const uint8_t &value,
+ HashTablePreallocationState *prealloc_state) {
+ DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+ if (prealloc_state == nullptr) {
+ // Early check for a free bucket.
+ if (header_->buckets_allocated.load(std::memory_order_relaxed) >= header_->num_buckets) {
+ return HashTablePutResult::kOutOfSpace;
+ }
+
+ // TODO(chasseur): If allow_duplicate_keys is true, avoid storing more than
+ // one copy of the same variable-length key.
+ if (!key_manager_.allocateVariableLengthKeyStorage(variable_key_size)) {
+ // Ran out of variable-length key storage space.
+ return HashTablePutResult::kOutOfSpace;
+ }
+ }
+
+ const std::size_t hash_code = this->hashCompositeKey(key);
+ void *bucket = nullptr;
+ std::atomic<std::size_t> *pending_chain_ptr;
+ std::size_t pending_chain_ptr_finish_value;
+ for (;;) {
+ if (locateBucketForInsertion(hash_code,
+ 0,
+ &bucket,
+ &pending_chain_ptr,
+ &pending_chain_ptr_finish_value,
+ prealloc_state)) {
+ // Found an empty bucket.
+ break;
+ } else if (bucket == nullptr) {
+ // Ran out of buckets. Deallocate any variable space that we were unable
+ // to use.
+ DEBUG_ASSERT(prealloc_state == nullptr);
+ key_manager_.deallocateVariableLengthKeyStorage(variable_key_size);
+ return HashTablePutResult::kOutOfSpace;
+ } else {
+ // Hash collision found, and duplicates aren't allowed.
+ DEBUG_ASSERT(!allow_duplicate_keys);
+ DEBUG_ASSERT(prealloc_state == nullptr);
+ if (key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+ // Duplicate key. Deallocate any variable storage space and return.
+ key_manager_.deallocateVariableLengthKeyStorage(variable_key_size);
+ return HashTablePutResult::kDuplicateKey;
+ }
+ }
+ }
+
+ // Write the key and hash.
+ writeCompositeKeyToBucket(key, hash_code, bucket, prealloc_state);
+
+ // Store the value by using placement new with ValueT's copy constructor.
+ new(static_cast<char*>(bucket) + kValueOffset) uint8_t(value);
+
+ // Update the previous chain pointer to point to the new bucket.
+ pending_chain_ptr->store(pending_chain_ptr_finish_value, std::memory_order_release);
+
+ // We're all done.
+ return HashTablePutResult::kOK;
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+HashTablePutResult
+ FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+ ::putCompositeKeyInternalFast(const std::vector<TypedValue> &key,
+ const std::size_t variable_key_size,
+ const uint8_t *init_value_ptr,
+ HashTablePreallocationState *prealloc_state) {
+ DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+ if (prealloc_state == nullptr) {
+ // Early check for a free bucket.
+ if (header_->buckets_allocated.load(std::memory_order_relaxed) >= header_->num_buckets) {
+ return HashTablePutResult::kOutOfSpace;
+ }
+
+ // TODO(chasseur): If allow_duplicate_keys is true, avoid storing more than
+ // one copy of the same variable-length key.
+ if (!key_manager_.allocateVariableLengthKeyStorage(variable_key_size)) {
+ // Ran out of variable-length key storage space.
+ return HashTablePutResult::kOutOfSpace;
+ }
+ }
+
+ const std::size_t hash_code = this->hashCompositeKey(key);
+ void *bucket = nullptr;
+ std::atomic<std::size_t> *pending_chain_ptr;
+ std::size_t pending_chain_ptr_finish_value;
+ for (;;) {
+ if (locateBucketForInsertion(hash_code,
+ 0,
+ &bucket,
+ &pending_chain_ptr,
+ &pending_chain_ptr_finish_value,
+ prealloc_state)) {
+ // Found an empty bucket.
+ break;
+ } else if (bucket == nullptr) {
+ // Ran out of buckets. Deallocate any variable space that we were unable
+ // to use.
+ DEBUG_ASSERT(prealloc_state == nullptr);
+ key_manager_.deallocateVariableLengthKeyStorage(variable_key_size);
+ return HashTablePutResult::kOutOfSpace;
+ } else {
+ // Hash collision found, and duplicates aren't allowed.
+ DEBUG_ASSERT(!allow_duplicate_keys);
+ DEBUG_ASSERT(prealloc_state == nullptr);
+ if (key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+ // Duplicate key. Deallocate any variable storage space and return.
+ key_manager_.deallocateVariableLengthKeyStorage(variable_key_size);
+ return HashTablePutResult::kDuplicateKey;
+ }
+ }
+ }
+
+ // Write the key and hash.
+ writeCompositeKeyToBucket(key, hash_code, bucket, prealloc_state);
+
+  // Copy the supplied initial payload into place.
+  uint8_t *value = static_cast<uint8_t*>(bucket) + kValueOffset;
+  std::memcpy(value, init_value_ptr, this->total_payload_size_);
+ // Update the previous chain pointer to point to the new bucket.
+ pending_chain_ptr->store(pending_chain_ptr_finish_value, std::memory_order_release);
+
+ // We're all done.
+ return HashTablePutResult::kOK;
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+uint8_t* FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+ ::upsertInternal(const TypedValue &key,
+ const std::size_t variable_key_size,
+ const uint8_t &initial_value) {
+ DEBUG_ASSERT(!allow_duplicate_keys);
+ DEBUG_ASSERT(this->key_types_.size() == 1);
+ DEBUG_ASSERT(key.isPlausibleInstanceOf(this->key_types_.front()->getSignature()));
+
+ if (variable_key_size > 0) {
+ // Don't allocate yet, since the key may already be present. However, we
+ // do check if either the allocated variable storage space OR the free
+ // space is big enough to hold the key (at least one must be true: either
+ // the key is already present and allocated, or we need to be able to
+ // allocate enough space for it).
+ std::size_t allocated_bytes = header_->variable_length_bytes_allocated.load(std::memory_order_relaxed);
+ if ((allocated_bytes < variable_key_size)
+ && (allocated_bytes + variable_key_size > key_manager_.getVariableLengthKeyStorageSize())) {
+ return nullptr;
+ }
+ }
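+  // The check above, for illustration only (hypothetical numbers): with 100
+  // bytes of variable-length storage already allocated out of a 150-byte
+  // region, a 40-byte key passes because allocated_bytes >= variable_key_size
+  // (the key may already be stored), while a 120-byte key fails because it is
+  // neither already allocated nor guaranteed to fit in the remaining space.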
+
+ const std::size_t hash_code = key.getHash();
+ void *bucket = nullptr;
+ std::atomic<std::size_t> *pending_chain_ptr;
+ std::size_t pending_chain_ptr_finish_value;
+ for (;;) {
+ if (locateBucketForInsertion(hash_code,
+ variable_key_size,
+ &bucket,
+ &pending_chain_ptr,
+ &pending_chain_ptr_finish_value,
+ nullptr)) {
+ // Found an empty bucket.
+ break;
+ } else if (bucket == nullptr) {
+ // Ran out of buckets or variable-key space.
+ return nullptr;
+ } else if (key_manager_.scalarKeyCollisionCheck(key, bucket)) {
+ // Found an already-existing entry for this key.
+ return reinterpret_cast<uint8_t*>(static_cast<char*>(bucket) + kValueOffset);
+ }
+ }
+
+ // We are now writing to an empty bucket.
+ // Write the key and hash.
+ writeScalarKeyToBucket(key, hash_code, bucket, nullptr);
+
+ // Copy the supplied 'initial_value' into place.
+ uint8_t *value = new(static_cast<char*>(bucket) + kValueOffset) uint8_t(initial_value);
+
+ // Update the previous chain pointer to point to the new bucket.
+ pending_chain_ptr->store(pending_chain_ptr_finish_value, std::memory_order_release);
+
+ // Return the value.
+ return value;
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+uint8_t* FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+ ::upsertInternalFast(const TypedValue &key,
+ const std::uint8_t *init_value_ptr,
+ const std::size_t variable_key_size) {
+ DEBUG_ASSERT(!allow_duplicate_keys);
+ DEBUG_ASSERT(this->key_types_.size() == 1);
+ DEBUG_ASSERT(key.isPlausibleInstanceOf(this->key_types_.front()->getSignature()));
+
+ if (variable_key_size > 0) {
+ // Don't allocate yet, since the key may already be present. However, we
+ // do check if either the allocated variable storage space OR the free
+ // space is big enough to hold the key (at least one must be true: either
+ // the key is already present and allocated, or we need to be able to
+ // allocate enough space for it).
+ std::size_t allocated_bytes = header_->variable_length_bytes_allocated.load(std::memory_order_relaxed);
+ if ((allocated_bytes < variable_key_size)
+ && (allocated_bytes + variable_key_size > key_manager_.getVariableLengthKeyStorageSize())) {
+ return nullptr;
+ }
+ }
+
+ const std::size_t hash_code = key.getHash();
+ void *bucket = nullptr;
+ std::atomic<std::size_t> *pending_chain_ptr;
+ std::size_t pending_chain_ptr_finish_value;
+ for (;;) {
+ if (locateBucketForInsertion(hash_code,
+ variable_key_size,
+ &bucket,
+ &pending_chain_ptr,
+ &pending_chain_ptr_finish_value,
+ nullptr)) {
+ // Found an empty bucket.
+ break;
+ } else if (bucket == nullptr) {
+ // Ran out of buckets or variable-key space.
+ return nullptr;
+ } else if (key_manager_.scalarKeyCollisionCheck(key, bucket)) {
+ // Found an already-existing entry for this key.
+ return reinterpret_cast<uint8_t*>(static_cast<char*>(bucket) + kValueOffset);
+ }
+ }
+
+ // We are now writing to an empty bucket.
+ // Write the key and hash.
+ writeScalarKeyToBucket(key, hash_code, bucket, nullptr);
+
+  // Copy the supplied initial payload (or the default-initialized payload if
+  // 'init_value_ptr' is null) into place.
+  uint8_t *value = static_cast<uint8_t*>(bucket) + kValueOffset;
+  if (init_value_ptr == nullptr) {
+    std::memcpy(value, init_payload_, this->total_payload_size_);
+  } else {
+    std::memcpy(value, init_value_ptr, this->total_payload_size_);
+  }
+
+ // Update the previous chain pointer to point to the new bucket.
+ pending_chain_ptr->store(pending_chain_ptr_finish_value, std::memory_order_release);
+
+ // Return the value.
+ return value;
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+uint8_t* FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+ ::upsertCompositeKeyInternal(const std::vector<TypedValue> &key,
+ const std::size_t variable_key_size,
+ const uint8_t &initial_value) {
+ DEBUG_ASSERT(!allow_duplicate_keys);
+ DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+ if (variable_key_size > 0) {
+ // Don't allocate yet, since the key may already be present. However, we
+ // do check if either the allocated variable storage space OR the free
+ // space is big enough to hold the key (at least one must be true: either
+ // the key is already present and allocated, or we need to be able to
+ // allocate enough space for it).
+ std::size_t allocated_bytes = header_->variable_length_bytes_allocated.load(std::memory_order_relaxed);
+ if ((allocated_bytes < variable_key_size)
+ && (allocated_bytes + variable_key_size > key_manager_.getVariableLengthKeyStorageSize())) {
+ return nullptr;
+ }
+ }
+
+ const std::size_t hash_code = this->hashCompositeKey(key);
+ void *bucket = nullptr;
+ std::atomic<std::size_t> *pending_chain_ptr;
+ std::size_t pending_chain_ptr_finish_value;
+ for (;;) {
+ if (locateBucketForInsertion(hash_code,
+ variable_key_size,
+ &bucket,
+ &pending_chain_ptr,
+ &pending_chain_ptr_finish_value,
+ nullptr)) {
+ // Found an empty bucket.
+ break;
+ } else if (bucket == nullptr) {
+ // Ran out of buckets or variable-key space.
+ return nullptr;
+ } else if (key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+ // Found an already-existing entry for this key.
+ return reinterpret_cast<uint8_t*>(static_cast<char*>(bucket) + kValueOffset);
+ }
+ }
+
+ // We are now writing to an empty bucket.
+ // Write the key and hash.
+ writeCompositeKeyToBucket(key, hash_code, bucket, nullptr);
+
+ // Copy the supplied 'initial_value' into place.
+ uint8_t *value = new(static_cast<char*>(bucket) + kValueOffset) uint8_t(initial_value);
+
+  // Update the previous chain pointer to point to the new bucket.
+ pending_chain_ptr->store(pending_chain_ptr_finish_value, std::memory_order_release);
+
+ // Return the value.
+ return value;
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+uint8_t* FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+ ::upsertCompositeKeyInternalFast(const std::vector<TypedValue> &key,
+ const std::uint8_t *init_value_ptr,
+ const std::size_t variable_key_size) {
+ DEBUG_ASSERT(!allow_duplicate_keys);
+ DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+ if (variable_key_size > 0) {
+ // Don't allocate yet, since the key may already be present. However, we
+ // do check if either the allocated variable storage space OR the free
+ // space is big enough to hold the key (at least one must be true: either
+ // the key is already present and allocated, or we need to be able to
+ // allocate enough space for it).
+ std::size_t allocated_bytes = header_->variable_length_bytes_allocated.load(std::memory_order_relaxed);
+ if ((allocated_bytes < variable_key_size)
+ && (allocated_bytes + variable_key_size > key_manager_.getVariableLengthKeyStorageSize())) {
+ return nullptr;
+ }
+ }
+
+ const std::size_t hash_code = this->hashCompositeKey(key);
+ void *bucket = nullptr;
+ std::atomic<std::size_t> *pending_chain_ptr;
+ std::size_t pending_chain_ptr_finish_value;
+ for (;;) {
+ if (locateBucketForInsertion(hash_code,
+ variable_key_size,
+ &bucket,
+ &pending_chain_ptr,
+ &pending_chain_ptr_finish_value,
+ nullptr)) {
+ // Found an empty bucket.
+ break;
+ } else if (bucket == nullptr) {
+ // Ran out of buckets or variable-key space.
+ return nullptr;
+ } else if (key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+ // Found an already-existing entry for this key.
+ return reinterpret_cast<uint8_t*>(static_cast<char*>(bucket) + kValueOffset);
+ }
+ }
+
+ // We are now writing to an empty bucket.
+ // Write the key and hash.
+ writeCompositeKeyToBucket(key, hash_code, bucket, nullptr);
+
+  // Copy the supplied initial payload (or the default-initialized payload if
+  // 'init_value_ptr' is null) into place.
+  uint8_t *value = static_cast<uint8_t*>(bucket) + kValueOffset;
+  if (init_value_ptr == nullptr) {
+    std::memcpy(value, init_payload_, this->total_payload_size_);
+  } else {
+    std::memcpy(value, init_value_ptr, this->total_payload_size_);
+  }
+
+  // Update the previous chain pointer to point to the new bucket.
+ pending_chain_ptr->store(pending_chain_ptr_finish_value, std::memory_order_release);
+
+ // Return the value.
+ return value;
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+bool FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+ ::getNextEntry(TypedValue *key, const uint8_t **value, std::size_t *entry_num) const {
+ DEBUG_ASSERT(this->key_types_.size() == 1);
+ if (*entry_num < header_->buckets_allocated.load(std::memory_order_relaxed)) {
+ const char *bucket = static_cast<const char*>(buckets_) + (*entry_num) * bucket_size_;
+ *key = key_manager_.getKeyComponentTyped(bucket, 0);
+ *value = reinterpret_cast<const uint8_t*>(bucket + kValueOffset);
+ ++(*entry_num);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+bool FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+ ::getNextEntryCompositeKey(std::vector<TypedValue> *key,
+ const uint8_t **value,
+ std::size_t *entry_num) const {
+ if (*entry_num < header_->buckets_allocated.load(std::memory_order_relaxed)) {
+ const char *bucket = static_cast<const char*>(buckets_) + (*entry_num) * bucket_size_;
+ for (std::vector<const Type*>::size_type key_idx = 0;
+ key_idx < this->key_types_.size();
+ ++key_idx) {
+ key->emplace_back(key_manager_.getKeyComponentTyped(bucket, key_idx));
+ }
+ *value = reinterpret_cast<const uint8_t*>(bucket + kValueOffset);
+ ++(*entry_num);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+bool FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+ ::getNextEntryForKey(const TypedValue &key,
+ const std::size_t hash_code,
+ const uint8_t **value,
+ std::size_t *entry_num) const {
+ DEBUG_ASSERT(this->key_types_.size() == 1);
+ DEBUG_ASSERT(key.isPlausibleInstanceOf(this->key_types_.front()->getSignature()));
+
+ if (*entry_num == 0) {
+ *entry_num = slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
+ } else if (*entry_num == std::numeric_limits<std::size_t>::max()) {
+ return false;
+ }
+
+ while (*entry_num != 0) {
+ DEBUG_ASSERT(*entry_num != std::numeric_limits<std::size_t>::max());
+ const char *bucket = static_cast<const char*>(buckets_) + (*entry_num - 1) * bucket_size_;
+ *entry_num = reinterpret_cast<const std::atomic<std::size_t>*>(bucket)->load(std::memory_order_relaxed);
+ const std::size_t bucket_hash = *reinterpret_cast<const std::size_t*>(
+ bucket + sizeof(std::atomic<std::size_t>));
+ if ((bucket_hash == hash_code) && key_manager_.scalarKeyCollisionCheck(key, bucket)) {
+ // Match located.
+ *value = reinterpret_cast<const uint8_t*>(bucket + kValueOffset);
+ if (*entry_num == 0) {
+ // If this is the last bucket in the chain, prevent the next call from
+ // starting over again.
+ *entry_num = std::numeric_limits<std::size_t>::max();
+ }
+ return true;
+ }
+ }
+
+ // Reached the end of the chain.
+ return false;
+}
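+// Illustrative caller sketch (not part of the class; this method is
+// protected, and real callers go through the public HashTable interface): to
+// visit every value stored under a single scalar key, start with
+// entry_num == 0 and call repeatedly until false is returned:
+//
+//   std::size_t entry_num = 0;
+//   const std::uint8_t *value;
+//   while (table.getNextEntryForKey(key, key.getHash(), &value, &entry_num)) {
+//     // ... consume 'value' ...
+//   }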
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+bool FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+ ::getNextEntryForCompositeKey(const std::vector<TypedValue> &key,
+ const std::size_t hash_code,
+ const uint8_t **value,
+ std::size_t *entry_num) const {
+ DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+ if (*entry_num == 0) {
+ *entry_num = slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
+ } else if (*entry_num == std::numeric_limits<std::size_t>::max()) {
+ return false;
+ }
+
+ while (*entry_num != 0) {
+ DEBUG_ASSERT(*entry_num != std::numeric_limits<std::size_t>::max());
+ const char *bucket = static_cast<const char*>(buckets_) + (*entry_num - 1) * bucket_size_;
+ *entry_num = reinterpret_cast<const std::atomic<std::size_t>*>(bucket)->load(std::memory_order_relaxed);
+ const std::size_t bucket_hash = *reinterpret_cast<const std::size_t*>(
+ bucket + sizeof(std::atomic<std::size_t>));
+ if ((bucket_hash == hash_code) && key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+ // Match located.
+ *value = reinterpret_cast<const uint8_t*>(bucket + kValueOffset);
+ if (*entry_num == 0) {
+ // If this is the last bucket in the chain, prevent the next call from
+ // starting over again.
+ *entry_num = std::numeric_limits<std::size_t>::max();
+ }
+ return true;
+ }
+ }
+
+ // Reached the end of the chain.
+ return false;
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+bool FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+ ::hasKey(const TypedValue &key) const {
+ DEBUG_ASSERT(this->key_types_.size() == 1);
+ DEBUG_ASSERT(key.isPlausibleInstanceOf(this->key_types_.front()->getSignature()));
+
+ const std::size_t hash_code = key.getHash();
+ std::size_t bucket_ref = slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
+ while (bucket_ref != 0) {
+ DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
+ const char *bucket = static_cast<const char*>(buckets_) + (bucket_ref - 1) * bucket_size_;
+ const std::size_t bucket_hash = *reinterpret_cast<const std::size_t*>(
+ bucket + sizeof(std::atomic<std::size_t>));
+ if ((bucket_hash == hash_code) && key_manager_.scalarKeyCollisionCheck(key, bucket)) {
+      // Found a match.
+ return true;
+ }
+ bucket_ref = reinterpret_cast<const std::atomic<std::size_t>*>(bucket)->load(std::memory_order_relaxed);
+ }
+ return false;
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+bool FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+ ::hasCompositeKey(const std::vector<TypedValue> &key) const {
+ DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+ const std::size_t hash_code = this->hashCompositeKey(key);
+ std::size_t bucket_ref = slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
+ while (bucket_ref != 0) {
+ DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
+ const char *bucket = static_cast<const char*>(buckets_) + (bucket_ref - 1) * bucket_size_;
+ const std::size_t bucket_hash = *reinterpret_cast<const std::size_t*>(
+ bucket + sizeof(std::atomic<std::size_t>));
+ if ((bucket_hash == hash_code) && key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+      // Found a match.
+ return true;
+ }
+ bucket_ref = reinterpret_cast<const std::atomic<std::size_t>*>(bucket)->load(std::memory_order_relaxed);
+ }
+ return false;
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+void FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+ ::resize(const std::size_t extra_buckets,
+ const std::size_t extra_variable_storage,
+ const std::size_t retry_num) {
+ DEBUG_ASSERT(resizable);
+
+ // A retry should never be necessary with this implementation of HashTable.
+ // Separate chaining ensures that any resized hash table with more buckets
+ // than the original table will be able to hold more entries than the
+ // original.
+ DEBUG_ASSERT(retry_num == 0);
+
+ SpinSharedMutexExclusiveLock<true> write_lock(this->resize_shared_mutex_);
+
+ // Recheck whether the hash table is still full. Note that multiple threads
+ // might wait to rebuild this hash table simultaneously. Only the first one
+ // should do the rebuild.
+ if (!isFull(extra_variable_storage)) {
+ return;
+ }
+
+ // Approximately double the number of buckets and slots.
+ //
+ // TODO(chasseur): It may be worth it to more than double the number of
+ // buckets here so that we can maintain a good, sparse fill factor for a
+ // longer time as more values are inserted. Such behavior should take into
+ // account kHashTableLoadFactor.
+ std::size_t resized_num_slots = get_next_prime_number(
+ (header_->num_buckets + extra_buckets / 2) * kHashTableLoadFactor * 2);
+ std::size_t variable_storage_required
+ = (resized_num_slots / kHashTableLoadFactor) * key_manager_.getEstimatedVariableKeySize();
+ const std::size_t original_variable_storage_used
+ = header_->variable_length_bytes_allocated.load(std::memory_order_relaxed);
+ // If this resize was triggered by a too-large variable-length key, bump up
+ // the variable-length storage requirement.
+ if ((extra_variable_storage > 0)
+ && (extra_variable_storage + original_variable_storage_used
+ > key_manager_.getVariableLengthKeyStorageSize())) {
+ variable_storage_required += extra_variable_storage;
+ }
+
+ const std::size_t resized_memory_required
+ = sizeof(Header)
+ + resized_num_slots * sizeof(std::atomic<std::size_t>)
+ + (resized_num_slots / kHashTableLoadFactor) * bucket_size_
+ + variable_storage_required;
+ const std::size_t resized_storage_slots
+ = this->storage_manager_->SlotsNeededForBytes(resized_memory_required);
+ if (resized_storage_slots == 0) {
+ FATAL_ERROR("Storage requirement for resized SeparateChainingHashTable "
+ "exceeds maximum allocation size.");
+ }
+
+ // Get a new StorageBlob to hold the resized hash table.
+ const block_id resized_blob_id = this->storage_manager_->createBlob(resized_storage_slots);
+ MutableBlobReference resized_blob = this->storage_manager_->getBlobMutable(resized_blob_id);
+
+ // Locate data structures inside the new StorageBlob.
+ void *aligned_memory_start = resized_blob->getMemoryMutable();
+ std::size_t available_memory = resized_storage_slots * kSlotSizeBytes;
+ if (align(alignof(Header),
+ sizeof(Header),
+ aligned_memory_start,
+ available_memory)
+ == nullptr) {
+ // Should be impossible, as noted in constructor.
+ FATAL_ERROR("StorageBlob used to hold resized SeparateChainingHashTable "
+ "is too small to meet alignment requirements of "
+ "LinearOpenAddressingHashTable::Header.");
+ } else if (aligned_memory_start != resized_blob->getMemoryMutable()) {
+ // Again, should be impossible.
+ DEV_WARNING("In SeparateChainingHashTable::resize(), StorageBlob "
+ << "memory adjusted by "
+ << (resized_num_slots * kSlotSizeBytes - available_memory)
+ << " bytes to meet alignment requirement for "
+ << "LinearOpenAddressingHashTable::Header.");
+ }
+
+ Header *resized_header = static_cast<Header*>(aligned_memory_start);
+ aligned_memory_start = static_cast<char*>(aligned_memory_start) + sizeof(Header);
+ available_memory -= sizeof(Header);
+
+ // As in constructor, recompute the number of slots and buckets using the
+ // actual available memory.
+ std::size_t resized_num_buckets
+ = (available_memory - extra_variable_storage)
+ / (kHashTableLoadFactor * sizeof(std::atomic<std::size_t>)
+ + bucket_size_
+ + key_manager_.getEstimatedVariableKeySize());
+ resized_num_slots = get_previous_prime_number(resized_num_buckets * kHashTableLoadFactor);
+ resized_num_buckets = resized_num_slots / kHashTableLoadFactor;
+
+ // Locate slot array.
+ std::atomic<std::size_t> *resized_slots = static_cast<std::atomic<std::size_t>*>(aligned_memory_start);
+ aligned_memory_start = static_cast<char*>(aligned_memory_start)
+ + sizeof(std::atomic<std::size_t>) * resized_num_slots;
+ available_memory -= sizeof(std::atomic<std::size_t>) * resized_num_slots;
+
+ // As in constructor, we will be extra paranoid and use align() to locate the
+ // start of the array of buckets, as well.
+ void *resized_buckets = aligned_memory_start;
+ if (align(kBucketAlignment,
+ bucket_size_,
+ resized_buckets,
+ available_memory)
+ == nullptr) {
+ FATAL_ERROR("StorageBlob used to hold resized SeparateChainingHashTable "
+ "is too small to meet alignment requirements of buckets.");
+ } else if (resized_buckets != aligned_memory_start) {
+ DEV_WARNING("Bucket array start position adjusted to meet alignment "
+ "requirement for SeparateChainingHashTable's value type.");
+ if (resized_num_buckets * bucket_size_ + variable_storage_required > available_memory) {
+ --resized_num_buckets;
+ }
+ }
+ aligned_memory_start = static_cast<char*>(aligned_memory_start)
+ + resized_num_buckets * bucket_size_;
+ available_memory -= resized_num_buckets * bucket_size_;
+
+ void *resized_variable_length_key_storage = aligned_memory_start;
+ const std::size_t resized_variable_length_key_storage_size = available_memory;
+
+ const std::size_t original_buckets_used = header_->buckets_allocated.load(std::memory_order_relaxed);
+
+ // Initialize the header.
+ resized_header->num_slots = resized_num_slots;
+ resized_header->num_buckets = resized_num_buckets;
+ resized_header->buckets_allocated.store(original_buckets_used, std::memory_order_relaxed);
+ resized_header->variable_length_bytes_allocated.store(
+ original_variable_storage_used,
+ std::memory_order_relaxed);
+
+ // Bulk-copy buckets. This is safe because:
+ // 1. The "next" pointers will be adjusted when rebuilding chains below.
+ // 2. The hash codes will stay the same.
+ // 3. For key components:
+ // a. Inline keys will stay exactly the same.
+ // b. Offsets into variable-length storage will remain valid, because
+ // we also do a byte-for-byte copy of variable-length storage below.
+ // c. Absolute external pointers will still point to the same address.
+ // d. Relative pointers are not used with resizable hash tables.
+ // 4. If values are not trivially copyable, then we invoke ValueT's copy
+ // or move constructor with placement new.
+ std::memcpy(resized_buckets, buckets_, original_buckets_used * bucket_size_);
+
+ // TODO(chasseur): std::is_trivially_copyable is not yet implemented in
+ // GCC 4.8.3, so we assume we need to invoke ValueT's copy or move
+ // constructor, even though the plain memcpy above could suffice for many
+ // possible ValueTs.
+ void *current_value_original = static_cast<char*>(buckets_) + kValueOffset;
+ void *current_value_resized = static_cast<char*>(resized_buckets) + kValueOffset;
+ for (std::size_t bucket_num = 0; bucket_num < original_buckets_used; ++bucket_num) {
+ // Use a move constructor if available to avoid a deep-copy, since resizes
+ // always succeed.
+ new (current_value_resized) uint8_t(std::move(*static_cast<uint8_t*>(current_value_original)));
+ current_value_original = static_cast<char*>(current_value_original) + bucket_size_;
+ current_value_resized = static_cast<char*>(current_value_resized) + bucket_size_;
+ }
+
+ // Copy over variable-length key components, if any.
+ if (original_variable_storage_used > 0) {
+ DEBUG_ASSERT(original_variable_storage_used
+ == key_manager_.getNextVariableLengthKeyOffset());
+ DEBUG_ASSERT(original_variable_storage_used <= resized_variable_length_key_storage_size);
+ std::memcpy(resized_variable_length_key_storage,
+ key_manager_.getVariableLengthKeyStorage(),
+ original_variable_storage_used);
+ }
+
+ // Destroy values in the original hash table, if necessary.
+ DestroyValues(buckets_,
+ original_buckets_used,
+ bucket_size_);
+
+ // Make resized structures active.
+ std::swap(this->blob_, resized_blob);
+ header_ = resized_header;
+ slots_ = resized_slots;
+ buckets_ = resized_buckets;
+ key_manager_.setVariableLengthStorageInfo(
+ resized_variable_length_key_storage,
+ resized_variable_length_key_storage_size,
+ &(resized_header->variable_length_bytes_allocated));
+
+ // Drop the old blob.
+ const block_id old_blob_id = resized_blob->getID();
+ resized_blob.release();
+ this->storage_manager_->deleteBlockOrBlobFile(old_blob_id);
+
+ // Rebuild chains.
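+ // The exclusive resize lock is still held here, so no other thread can be
+ // probing or inserting while the chains are rebuilt; each copied bucket is
+ // simply prepended at the head of its slot's chain in the resized table.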
+ void *current_bucket = buckets_;
+ for (std::size_t bucket_num = 0; bucket_num < original_buckets_used; ++bucket_num) {
+ std::atomic<std::size_t> *next_ptr
+ = static_cast<std::atomic<std::size_t>*>(current_bucket);
+ const std::size_t hash_code = *reinterpret_cast<const std::size_t*>(
+ static_cast<const char*>(current_bucket) + sizeof(std::atomic<std::size_t>));
+
+ const std::size_t slot_number = hash_code % header_->num_slots;
+ std::size_t slot_ptr_value = 0;
+ if (slots_[slot_number].compare_exchange_strong(slot_ptr_value,
+ bucket_num + 1,
+ std::memory_order_relaxed)) {
+ // This bucket is the first in the chain for this slot, so reset its
+ // next pointer to 0.
+ next_ptr->store(0, std::memory_order_relaxed);
+ } else {
+ // A chain already exists starting from this slot, so put this bucket at
+ // the head.
+ next_ptr->store(slot_ptr_value, std::memory_order_relaxed);
+ slots_[slot_number].store(bucket_num + 1, std::memory_order_relaxed);
+ }
+ current_bucket = static_cast<char*>(current_bucket) + bucket_size_;
+ }
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+bool FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+ ::preallocateForBulkInsert(const std::size_t total_entries,
+ const std::size_t total_variable_key_size,
+ HashTablePreallocationState *prealloc_state) {
+ DEBUG_ASSERT(allow_duplicate_keys);
+ if (!key_manager_.allocateVariableLengthKeyStorage(total_variable_key_size)) {
+ return false;
+ }
+
+ // We use load then compare-exchange here instead of simply fetch-add,
+ // because if multiple threads are simultaneously trying to allocate more
+ // than one bucket and exceed 'header_->num_buckets', their respective
+ // rollbacks might happen in such an order that some bucket ranges get
+ // skipped, while others might get double-allocated later.
+ std::size_t original_buckets_allocated = header_->buckets_allocated.load(std::memory_order_relaxed);
+ std::size_t buckets_post_allocation = original_buckets_allocated + total_entries;
+ while ((buckets_post_allocation <= header_->num_buckets)
+ && !header_->buckets_allocated.compare_exchange_weak(original_buckets_allocated,
+ buckets_post_allocation,
+ std::memory_order_relaxed)) {
+ buckets_post_allocation = original_buckets_allocated + total_entries;
+ }
+
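+ // The loop above exits either because the compare-exchange succeeded
+ // (reserving buckets [original_buckets_allocated, buckets_post_allocation))
+ // or because even the refreshed request would exceed 'header_->num_buckets';
+ // the check below distinguishes the two cases.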
+ if (buckets_post_allocation > header_->num_buckets) {
+ key_manager_.deallocateVariableLengthKeyStorage(total_variable_key_size);
+ return false;
+ }
+
+ prealloc_state->bucket_position = original_buckets_allocated;
+ if (total_variable_key_size != 0) {
+ prealloc_state->variable_length_key_position
+ = key_manager_.incrementNextVariableLengthKeyOffset(total_variable_key_size);
+ }
+ return true;
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+void FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+ ::DestroyValues(void *hash_buckets,
+ const std::size_t num_buckets,
+ const std::size_t bucket_size) {
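+ // The payload stored in each bucket is a plain byte array, so this check is
+ // always false and the function is effectively a no-op; it mirrors the
+ // templated HashTable, where ValueT may have a non-trivial destructor.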
+ if (!std::is_trivially_destructible<uint8_t>::value) {
+ void *value_ptr = static_cast<char*>(hash_buckets) + kValueOffset;
+ for (std::size_t bucket_num = 0;
+ bucket_num < num_buckets;
+ ++bucket_num) {
+ static_cast<uint8_t*>(value_ptr)->~uint8_t();
+ value_ptr = static_cast<char*>(value_ptr) + bucket_size;
+ }
+ }
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+inline bool FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+ ::locateBucketForInsertion(const std::size_t hash_code,
+ const std::size_t variable_key_allocation_required,
+ void **bucket,
+ std::atomic<std::size_t> **pending_chain_ptr,
+ std::size_t *pending_chain_ptr_finish_value,
+ HashTablePreallocationState *prealloc_state) {
+ DEBUG_ASSERT((prealloc_state == nullptr) || allow_duplicate_keys);
+ if (*bucket == nullptr) {
+ *pending_chain_ptr = &(slots_[hash_code % header_->num_slots]);
+ } else {
+ *pending_chain_ptr = static_cast<std::atomic<std::size_t>*>(*bucket);
+ }
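+ // Walk the chain: a "next" value of 0 marks the end of the chain, while
+ // std::numeric_limits<std::size_t>::max() is a temporary marker written by a
+ // thread that is still in the middle of linking in a new bucket.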
+ for (;;) {
+ std::size_t existing_chain_ptr = 0;
+ if ((*pending_chain_ptr)->compare_exchange_strong(existing_chain_ptr,
+ std::numeric_limits<std::size_t>::max(),
+ std::memory_order_acq_rel)) {
+ // Got to the end of the chain. Allocate a new bucket.
+
+ // First, allocate variable-length key storage, if needed (i.e. if this
+ // is an upsert and we didn't allocate up-front).
+ if ((prealloc_state == nullptr)
+ && !key_manager_.allocateVariableLengthKeyStorage(variable_key_allocation_required)) {
+ // Ran out of variable-length storage.
+ (*pending_chain_ptr)->store(0, std::memory_order_release);
+ *bucket = nullptr;
+ return false;
+ }
+
+ const std::size_t allocated_bucket_num
+ = (prealloc_state == nullptr)
+ ? header_->buckets_allocated.fetch_add(1, std::memory_order_relaxed)
+ : (prealloc_state->bucket_position)++;
+ if (allocated_bucket_num >= header_->num_buckets) {
+ // Ran out of buckets.
+ DEBUG_ASSERT(prealloc_state == nullptr);
+ header_->buckets_allocated.fetch_sub(1, std::memory_order_relaxed);
+ (*pending_chain_ptr)->store(0, std::memory_order_release);
+ *bucket = nullptr;
+ return false;
+ } else {
+ *bucket = static_cast<char*>(buckets_) + allocated_bucket_num * bucket_size_;
+ *pending_chain_ptr_finish_value = allocated_bucket_num + 1;
+ return true;
+ }
+ }
+ // Spin until the real "next" pointer is available.
+ while (existing_chain_ptr == std::numeric_limits<std::size_t>::max()) {
+ existing_chain_ptr = (*pending_chain_ptr)->load(std::memory_order_acquire);
+ }
+ if (existing_chain_ptr == 0) {
+ // Other thread had to roll back, so try again.
+ continue;
+ }
+ // Chase the next pointer.
+ *bucket = static_cast<char*>(buckets_) + (existing_chain_ptr - 1) * bucket_size_;
+ *pending_chain_ptr = static_cast<std::atomic<std::size_t>*>(*bucket);
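+ // With duplicate keys disallowed, a matching hash code ends the search:
+ // return false with '*bucket' left pointing at the colliding bucket, which
+ // lets the caller tell a key collision apart from running out of storage
+ // (where '*bucket' is set to nullptr).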
+ if (!allow_duplicate_keys) {
+ const std::size_t hash_in_bucket
+ = *reinterpret_cast<const std::size_t*>(static_cast<const char*>(*bucket)
+ + sizeof(std::atomic<std::size_t>));
+ if (hash_in_bucket == hash_code) {
+ return false;
+ }
+ }
+ }
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+inline void FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+ ::writeScalarKeyToBucket(const TypedValue &key,
+ const std::size_t hash_code,
+ void *bucket,
+ HashTablePreallocationState *prealloc_state) {
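+ // Record the key's hash code immediately after the bucket's leading atomic
+ // "next" reference; the key itself is written by key_manager_ at its
+ // configured offsets.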
+ *reinterpret_cast<std::size_t*>(static_cast<char*>(bucket) + sizeof(std::atomic<std::size_t>))
+ = hash_code;
+ key_manager_.writeKeyComponentToBucket(key, 0, bucket, prealloc_state);
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+inline void FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+ ::writeCompositeKeyToBucket(const std::vector<TypedValue> &key,
+ const std::size_t hash_code,
+ void *bucket,
+ HashTablePreallocationState *prealloc_state) {
+ DEBUG_ASSERT(key.size() == this->key_types_.size());
+ *reinterpret_cast<std::size_t*>(static_cast<char*>(bucket) + sizeof(std::atomic<std::size_t>))
+ = hash_code;
+ for (std::size_t idx = 0;
+ idx < this->key_types_.size();
+ ++idx) {
+ key_manager_.writeKeyComponentToBucket(key[idx], idx, bucket, prealloc_state);
+ }
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+bool FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>
+ ::isFull(const std::size_t extra_variable_storage) const {
+ if (header_->buckets_allocated.load(std::memory_order_relaxed) >= header_->num_buckets) {
+ // All buckets are allocated.
+ return true;
+ }
+
+ if (extra_variable_storage > 0) {
+ if (extra_variable_storage
+ + header_->variable_length_bytes_allocated.load(std::memory_order_relaxed)
+ > key_manager_.getVariableLengthKeyStorageSize()) {
+ // Not enough variable-length key storage space.
+ return true;
+ }
+ }
+
+ return false;
+}
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_STORAGE_FAST_SEPARATE_CHAINING_HASH_TABLE_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5e7d09f/storage/HashTableBase.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableBase.hpp b/storage/HashTableBase.hpp
index c37c1ec..ebdf464 100644
--- a/storage/HashTableBase.hpp
+++ b/storage/HashTableBase.hpp
@@ -63,7 +63,7 @@ class HashTableBase {
public:
virtual ~HashTableBase() {
}
-
+  virtual std::size_t get_buckets_allocated() const { return 0; }
protected:
HashTableBase() {
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5e7d09f/storage/HashTablePool.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTablePool.hpp b/storage/HashTablePool.hpp
index c16d0f1..909b233 100644
--- a/storage/HashTablePool.hpp
+++ b/storage/HashTablePool.hpp
@@ -25,6 +25,8 @@
#include "expressions/aggregation/AggregationHandle.hpp"
#include "storage/HashTableBase.hpp"
+#include "storage/FastHashTable.hpp"
+#include "storage/FastHashTableFactory.hpp"
#include "threading/SpinMutex.hpp"
#include "utility/Macros.hpp"
#include "utility/StringUtil.hpp"
@@ -79,6 +81,19 @@ class HashTablePool {
agg_handle_(DCHECK_NOTNULL(agg_handle)),
storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+ HashTablePool(const std::size_t estimated_num_entries,
+ const HashTableImplType hash_table_impl_type,
+ const std::vector<const Type *> &group_by_types,
+ const std::vector<std::size_t> &payload_sizes,
+ const std::vector<AggregationHandle *> &handles,
+ StorageManager *storage_manager)
+ : estimated_num_entries_(reduceEstimatedCardinality(estimated_num_entries)),
+ hash_table_impl_type_(hash_table_impl_type),
+ group_by_types_(group_by_types),
+ payload_sizes_(payload_sizes),
+ handles_(handles),
+ storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+
/**
* @brief Check out a hash table for insertion.
*
@@ -98,6 +113,20 @@ class HashTablePool {
return createNewHashTable();
}
+ AggregationStateHashTableBase* getHashTableFast() {
+ {
+ SpinMutexLock lock(mutex_);
+ if (!hash_tables_.empty()) {
+ std::unique_ptr<AggregationStateHashTableBase> ret_hash_table(
+ std::move(hash_tables_.back()));
+ hash_tables_.pop_back();
+ DCHECK(ret_hash_table != nullptr);
+ return ret_hash_table.release();
+ }
+ }
+ return createNewHashTableFast();
+ }
+
/**
* @brief Return a previously checked out hash table.
*
@@ -132,6 +161,16 @@ class HashTablePool {
storage_manager_);
}
+ AggregationStateHashTableBase* createNewHashTableFast() {
+ return AggregationStateFastHashTableFactory::CreateResizable(
+ hash_table_impl_type_,
+ group_by_types_,
+ estimated_num_entries_,
+ payload_sizes_,
+ handles_,
+ storage_manager_);
+ }
+
inline std::size_t reduceEstimatedCardinality(
const std::size_t original_estimate) const {
if (original_estimate < kEstimateReductionFactor) {
@@ -151,7 +190,10 @@ class HashTablePool {
const std::vector<const Type *> group_by_types_;
+ std::vector<std::size_t> payload_sizes_;
+
AggregationHandle *agg_handle_;
+ const std::vector<AggregationHandle *> handles_;
StorageManager *storage_manager_;
SpinMutex mutex_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5e7d09f/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index fdd438d..b9faa2a 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -38,6 +38,7 @@
#include "storage/CompressedPackedRowStoreTupleStorageSubBlock.hpp"
#include "storage/CountedReference.hpp"
#include "storage/HashTableBase.hpp"
+#include "storage/FastHashTable.hpp"
#include "storage/IndexSubBlock.hpp"
#include "storage/InsertDestinationInterface.hpp"
#include "storage/PackedRowStoreTupleStorageSubBlock.hpp"
@@ -494,6 +495,92 @@ void StorageBlock::aggregateGroupBy(
hash_table);
}
+
+void StorageBlock::aggregateGroupByFast(
+ const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
+ const std::vector<std::unique_ptr<const Scalar>> &group_by,
+ const Predicate *predicate,
+ AggregationStateHashTableBase *hash_table,
+ std::unique_ptr<TupleIdSequence> *reuse_matches,
+ std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
+ DCHECK_GT(group_by.size(), 0u)
+ << "Called aggregateGroupBy() with zero GROUP BY expressions";
+
+ SubBlocksReference sub_blocks_ref(*tuple_store_,
+ indices_,
+ indices_consistent_);
+
+ // IDs of 'arguments' as attributes in the ValueAccessor we create below.
+ std::vector<attribute_id> arg_ids;
+ std::vector<std::vector<attribute_id>> argument_ids;
+
+ // IDs of GROUP BY key element(s) in the ValueAccessor we create below.
+ std::vector<attribute_id> key_ids;
+
+ // An intermediate ValueAccessor that stores the materialized 'arguments' for
+ // this aggregate, as well as the GROUP BY expression values.
+ ColumnVectorsValueAccessor temp_result;
+ {
+ std::unique_ptr<ValueAccessor> accessor;
+ if (predicate) {
+ if (!*reuse_matches) {
+ // If there is a filter predicate that hasn't already been evaluated,
+ // evaluate it now and save the results for other aggregates on this
+ // same block.
+ reuse_matches->reset(getMatchesForPredicate(predicate));
+ }
+
+ // Create a filtered ValueAccessor that only iterates over predicate
+ // matches.
+ accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get()));
+ } else {
+ // Create a ValueAccessor that iterates over all tuples in this block.
+ accessor.reset(tuple_store_->createValueAccessor());
+ }
+
+ attribute_id attr_id = 0;
+
+ // First, put GROUP BY keys into 'temp_result'.
+ if (reuse_group_by_vectors->empty()) {
+ // Compute GROUP BY values from group_by Scalars, and store them in
+ // reuse_group_by_vectors for reuse by other aggregates on this same
+ // block.
+ reuse_group_by_vectors->reserve(group_by.size());
+ for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
+ reuse_group_by_vectors->emplace_back(
+ group_by_element->getAllValues(accessor.get(), &sub_blocks_ref));
+ temp_result.addColumn(reuse_group_by_vectors->back().get(), false);
+ key_ids.push_back(attr_id++);
+ }
+ } else {
+ // Reuse precomputed GROUP BY values from reuse_group_by_vectors.
+ DCHECK_EQ(group_by.size(), reuse_group_by_vectors->size())
+ << "Wrong number of reuse_group_by_vectors";
+ for (const std::unique_ptr<ColumnVector> &reuse_cv : *reuse_group_by_vectors) {
+ temp_result.addColumn(reuse_cv.get(), false);
+ key_ids.push_back(attr_id++);
+ }
+ }
+
+ // Compute argument vectors and add them to 'temp_result'.
+ for (const std::vector<std::unique_ptr<const Scalar>> &argument : arguments) {
+ arg_ids.clear();
+ for (const std::unique_ptr<const Scalar> &args : argument) {
+ temp_result.addColumn(args->getAllValues(accessor.get(), &sub_blocks_ref));
+ arg_ids.push_back(attr_id++);
+ }
+ argument_ids.push_back(arg_ids);
+ }
+ }
+
+ static_cast<AggregationStateFastHashTable *>(hash_table)->upsertValueAccessorCompositeKeyFast(
+ argument_ids,
+ &temp_result,
+ key_ids,
+ true);
+}
+
+
void StorageBlock::aggregateDistinct(
const AggregationHandle &handle,
const std::vector<std::unique_ptr<const Scalar>> &arguments,
@@ -582,7 +669,6 @@ void StorageBlock::aggregateDistinct(
&temp_result, key_ids, distinctify_hash_table);
}
-
// TODO(chasseur): Vectorization for updates.
StorageBlock::UpdateResult StorageBlock::update(
const unordered_map<attribute_id, unique_ptr<const Scalar>> &assignments,