Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/10/05 04:25:37 UTC
[03/17] incubator-quickstep git commit: Initial commit for QUICKSTEP-28 and QUICKSTEP-29.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/storage/FastHashTableFactory.hpp
----------------------------------------------------------------------
diff --git a/storage/FastHashTableFactory.hpp b/storage/FastHashTableFactory.hpp
new file mode 100644
index 0000000..6d0b693
--- /dev/null
+++ b/storage/FastHashTableFactory.hpp
@@ -0,0 +1,257 @@
+/**
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_FAST_HASH_TABLE_FACTORY_HPP_
+#define QUICKSTEP_STORAGE_FAST_HASH_TABLE_FACTORY_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "storage/HashTable.hpp"
+#include "storage/FastHashTable.hpp"
+#include "storage/HashTableBase.hpp"
+#include "storage/HashTableFactory.hpp"
+#include "storage/HashTable.pb.h"
+#include "storage/LinearOpenAddressingHashTable.hpp"
+#include "storage/SeparateChainingHashTable.hpp"
+#include "storage/FastSeparateChainingHashTable.hpp"
+#include "storage/SimpleScalarSeparateChainingHashTable.hpp"
+#include "storage/TupleReference.hpp"
+#include "types/TypeFactory.hpp"
+#include "utility/BloomFilter.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class StorageManager;
+class Type;
+
+/** \addtogroup Storage
+ * @{
+ */
+
+/**
+ * @brief Templated all-static factory class that makes it easier to
+ * instantiate HashTables with the particular HashTable implementation
+ * chosen at runtime. All template parameters are exactly the same as
+ * those of HashTable.
+ **/
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+class FastHashTableFactory {
+ public:
+ /**
+ * @brief Create a new resizable HashTable, with the type selected by
+ * hash_table_type. Other parameters are forwarded to the HashTable's
+ * constructor.
+ *
+ * @param hash_table_type The specific HashTable implementation that should
+ * be used.
+ * @param key_types A vector of one or more types (>1 indicates a composite
+ * key). Forwarded as-is to the HashTable's constructor.
+ * @param num_entries The estimated number of entries the HashTable will
+ * hold. Forwarded as-is to the HashTable's constructor.
+   * @param payload_sizes The sizes in bytes of the AggregationStates for the
+   *        respective AggregationHandles.
+ * @param handles The AggregationHandles used in this HashTable.
+ * @param storage_manager The StorageManager to use (a StorageBlob will be
+ * allocated to hold the HashTable's contents). Forwarded as-is to the
+ * HashTable's constructor.
+ * @return A new resizable HashTable.
+ **/
+ static FastHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>*
+ CreateResizable(const HashTableImplType hash_table_type,
+ const std::vector<const Type*> &key_types,
+ const std::size_t num_entries,
+ const std::vector<std::size_t> &payload_sizes,
+ const std::vector<AggregationHandle *> &handles,
+ StorageManager *storage_manager) {
+ DCHECK(resizable);
+
+ switch (hash_table_type) {
+ case HashTableImplType::kSeparateChaining:
+ return new FastSeparateChainingHashTable<
+ resizable,
+ serializable,
+ force_key_copy,
+ allow_duplicate_keys>(key_types, num_entries, payload_sizes, handles, storage_manager);
+ default: {
+        LOG(FATAL) << "Unrecognized HashTableImplType in FastHashTableFactory::CreateResizable()\n";
+ }
+ }
+ }
+
+ /**
+   * @brief Create a new fixed-size HashTable, with the type selected by
+   *        hash_table_type. Other parameters are forwarded to the HashTable's
+   *        constructor.
+ *
+ * @param hash_table_type The specific HashTable implementation that should
+ * be used.
+ * @param key_types A vector of one or more types (>1 indicates a composite
+ * key). Forwarded as-is to the HashTable's constructor.
+ * @param hash_table_memory A pointer to memory to use for the HashTable.
+ * Forwarded as-is to the HashTable's constructor.
+ * @param hash_table_memory_size The size of hash_table_memory in bytes.
+ * Forwarded as-is to the HashTable's constructor.
+ * @param new_hash_table If true, the HashTable is being constructed for the
+ * first time and hash_table_memory will be cleared. If false, reload
+ * a pre-existing HashTable. Forwarded as-is to the HashTable's
+ * constructor.
+ * @param hash_table_memory_zeroed If new_hash_table is true, setting this to
+ * true means that the HashTable will assume that hash_table_memory
+ * has already been zeroed-out (any newly-allocated block or blob
+ * memory from StorageManager is zeroed-out). If false, the HashTable
+   *        will explicitly zero-fill its memory as necessary. This parameter
+ * has no effect when new_hash_table is false. Forwarded as-is to the
+ * HashTable's constructor.
+ * @return A new (or reloaded) fixed-size HashTable.
+ **/
+ static FastHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>*
+ CreateFixedSize(const HashTableImplType hash_table_type,
+ const std::vector<const Type*> &key_types,
+ void *hash_table_memory,
+ const std::size_t hash_table_memory_size,
+ const bool new_hash_table,
+ const bool hash_table_memory_zeroed) {
+ DCHECK(!resizable);
+
+ switch (hash_table_type) {
+ case HashTableImplType::kSeparateChaining:
+        return new FastSeparateChainingHashTable<
+            resizable,
+            serializable,
+            force_key_copy,
+            allow_duplicate_keys>(key_types,
+                                  hash_table_memory,
+                                  hash_table_memory_size,
+                                  new_hash_table,
+                                  hash_table_memory_zeroed);
+ default: {
+ LOG(FATAL) << "Unrecognized HashTableImplType\n";
+ }
+ }
+ }
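+
+  // A minimal usage sketch (illustrative only; 'key_types', 'memory', and
+  // 'memory_size' are caller-supplied, with 'memory' suitably aligned):
+  //
+  //   auto *table = FastHashTableFactory<false, false, true, false>::
+  //       CreateFixedSize(HashTableImplType::kSeparateChaining,
+  //                       key_types,
+  //                       memory,
+  //                       memory_size,
+  //                       true,    // new_hash_table
+  //                       false);  // memory not pre-zeroed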
+
+ /**
+ * @brief Check whether a serialization::HashTable describing a resizable
+ * HashTable is fully-formed and all parts are valid.
+ *
+ * @param proto A serialized Protocol Buffer description of a HashTable,
+ * originally generated by the optimizer.
+ * @return Whether proto is fully-formed and valid.
+ **/
+ static bool ProtoIsValid(const serialization::HashTable &proto) {
+ if (!proto.IsInitialized() ||
+ !serialization::HashTableImplType_IsValid(
+ proto.hash_table_impl_type())) {
+ return false;
+ }
+
+ for (int i = 0; i < proto.key_types_size(); ++i) {
+ if (!TypeFactory::ProtoIsValid(proto.key_types(i))) {
+ return false;
+ }
+ }
+
+ return true;
+ }
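+
+  // Typical guard before reconstructing from a proto (sketch; mirrors the
+  // DCHECK in CreateResizableFromProto() below):
+  //
+  //   if (!FastHashTableFactory<...>::ProtoIsValid(proto)) {
+  //     LOG(FATAL) << "Invalid HashTable proto:\n" << proto.DebugString();
+  //   }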
+
+ /**
+ * @brief Create a new resizable HashTable according to a protobuf
+ * description.
+ *
+ * @param proto A protobuf description of a resizable HashTable.
+ * @param storage_manager The StorageManager to use (a StorageBlob will be
+ * allocated to hold the HashTable's contents).
+   * @param bloom_filters A vector of pointers to bloom filters that may be
+   *        used during hash table construction in the build/probe phases.
+ * @return A new resizable HashTable with parameters specified by proto.
+ **/
+ static FastHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>*
+ CreateResizableFromProto(const serialization::HashTable &proto,
+ StorageManager *storage_manager,
+ const std::vector<std::unique_ptr<BloomFilter>> &bloom_filters) {
+ DCHECK(ProtoIsValid(proto))
+ << "Attempted to create HashTable from invalid proto description:\n"
+ << proto.DebugString();
+
+ std::vector<const Type*> key_types;
+ for (int i = 0; i < proto.key_types_size(); ++i) {
+ key_types.emplace_back(&TypeFactory::ReconstructFromProto(proto.key_types(i)));
+ }
+
+    // Note: the serialized description carries no aggregation payload sizes
+    // or handles, so empty vectors are passed through here.
+    auto hash_table = CreateResizable(HashTableImplTypeFromProto(proto.hash_table_impl_type()),
+                                      key_types,
+                                      proto.estimated_num_entries(),
+                                      {},
+                                      {},
+                                      storage_manager);
+
+ // TODO(ssaurabh): These lazy initializations can be moved from here and pushed to the
+ // individual implementations of the hash table constructors.
+
+    // Check if there is a build-side bloom filter defined on the hash table.
+ if (proto.build_side_bloom_filter_id_size() > 0) {
+ hash_table->enableBuildSideBloomFilter();
+ hash_table->setBuildSideBloomFilter(bloom_filters[proto.build_side_bloom_filter_id(0)].get());
+ }
+
+ // Check if there are any probe side bloom filters defined on the hash table.
+ if (proto.probe_side_bloom_filters_size() > 0) {
+ hash_table->enableProbeSideBloomFilter();
+ // Add as many probe bloom filters as defined by the proto.
+ for (int j = 0; j < proto.probe_side_bloom_filters_size(); ++j) {
+ // Add the pointer to the probe bloom filter within the list of probe bloom filters to use.
+ const auto probe_side_bloom_filter = proto.probe_side_bloom_filters(j);
+ hash_table->addProbeSideBloomFilter(bloom_filters[probe_side_bloom_filter.probe_side_bloom_filter_id()].get());
+
+ // Add the attribute ids corresponding to this probe bloom filter.
+ std::vector<attribute_id> probe_attribute_ids;
+ for (int k = 0; k < probe_side_bloom_filter.probe_side_attr_ids_size(); ++k) {
+ const attribute_id probe_attribute_id = probe_side_bloom_filter.probe_side_attr_ids(k);
+ probe_attribute_ids.push_back(probe_attribute_id);
+ }
+ hash_table->addProbeSideAttributeIds(std::move(probe_attribute_ids));
+ }
+ }
+
+ return hash_table;
+ }
+
+ private:
+ // Class is all-static and should not be instantiated.
+ FastHashTableFactory();
+
+ DISALLOW_COPY_AND_ASSIGN(FastHashTableFactory);
+};
+
+/**
+ * @brief Convenient alias for the FastHashTableFactory instantiation used for
+ *        aggregation state hash tables (resizable, non-serializable, forced
+ *        key copying, no duplicate keys).
+ **/
+using AggregationStateFastHashTableFactory
+ = FastHashTableFactory<true, false, true, false>;
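+
+// A minimal usage sketch (illustrative only; 'int_type' stands in for a key
+// Type obtained from TypeFactory, 'handle' for a real AggregationHandle, and
+// 'payload_size' for the byte size of its AggregationState):
+//
+//   std::vector<const Type *> key_types = {&int_type};
+//   std::vector<std::size_t> payload_sizes = {payload_size};
+//   std::vector<AggregationHandle *> handles = {handle};
+//   auto *table = AggregationStateFastHashTableFactory::CreateResizable(
+//       HashTableImplType::kSeparateChaining,
+//       key_types,
+//       1000,  // estimated number of entries
+//       payload_sizes,
+//       handles,
+//       storage_manager);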
+
+/** @} */
+
+} // namespace quickstep
+
+#endif  // QUICKSTEP_STORAGE_FAST_HASH_TABLE_FACTORY_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/storage/FastSeparateChainingHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/FastSeparateChainingHashTable.hpp b/storage/FastSeparateChainingHashTable.hpp
new file mode 100644
index 0000000..886a8ca
--- /dev/null
+++ b/storage/FastSeparateChainingHashTable.hpp
@@ -0,0 +1,1750 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_FAST_SEPARATE_CHAINING_HASH_TABLE_HPP_
+#define QUICKSTEP_STORAGE_FAST_SEPARATE_CHAINING_HASH_TABLE_HPP_
+
+#include <algorithm>
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
+#include <limits>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "storage/FastHashTable.hpp"
+#include "storage/HashTable.hpp"
+#include "storage/HashTableBase.hpp"
+#include "storage/HashTableKeyManager.hpp"
+#include "storage/StorageBlob.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageConstants.hpp"
+#include "storage/StorageManager.hpp"
+#include "threading/SpinSharedMutex.hpp"
+#include "types/Type.hpp"
+#include "types/TypedValue.hpp"
+#include "utility/Alignment.hpp"
+#include "utility/Macros.hpp"
+#include "utility/PrimeNumber.hpp"
+
+namespace quickstep {
+
+/** \addtogroup Storage
+ * @{
+ */
+
+/**
+ * @brief A hash table implementation which uses separate chaining for buckets.
+ **/
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+class FastSeparateChainingHashTable
+ : public FastHashTable<resizable,
+ serializable,
+ force_key_copy,
+ allow_duplicate_keys> {
+ public:
+ FastSeparateChainingHashTable(const std::vector<const Type *> &key_types,
+ const std::size_t num_entries,
+ const std::vector<std::size_t> &payload_sizes,
+ const std::vector<AggregationHandle *> &handles,
+ StorageManager *storage_manager);
+
+ FastSeparateChainingHashTable(const std::vector<const Type *> &key_types,
+ void *hash_table_memory,
+ const std::size_t hash_table_memory_size,
+ const bool new_hash_table,
+ const bool hash_table_memory_zeroed);
+
+ // Delegating constructors for single scalar keys.
+ FastSeparateChainingHashTable(const Type &key_type,
+ const std::size_t num_entries,
+ StorageManager *storage_manager)
+ : FastSeparateChainingHashTable(std::vector<const Type *>(1, &key_type),
+ num_entries,
+ storage_manager) {}
+
+ FastSeparateChainingHashTable(const Type &key_type,
+ void *hash_table_memory,
+ const std::size_t hash_table_memory_size,
+ const bool new_hash_table,
+ const bool hash_table_memory_zeroed)
+ : FastSeparateChainingHashTable(std::vector<const Type *>(1, &key_type),
+ hash_table_memory,
+ hash_table_memory_size,
+ new_hash_table,
+ hash_table_memory_zeroed) {}
+
+ ~FastSeparateChainingHashTable() override {
+ DestroyValues(buckets_,
+ header_->buckets_allocated.load(std::memory_order_relaxed),
+ bucket_size_);
+ std::free(init_payload_);
+ }
+
+ void clear() override;
+
+ std::size_t numEntries() const override {
+ return header_->buckets_allocated.load(std::memory_order_relaxed);
+ }
+
+ const std::uint8_t* getSingle(const TypedValue &key) const override;
+ const std::uint8_t* getSingleCompositeKey(
+ const std::vector<TypedValue> &key) const override;
+ const std::uint8_t* getSingleCompositeKey(const std::vector<TypedValue> &key,
+ int index) const override;
+
+ void getAll(const TypedValue &key,
+ std::vector<const std::uint8_t *> *values) const override;
+ void getAllCompositeKey(
+ const std::vector<TypedValue> &key,
+ std::vector<const std::uint8_t *> *values) const override;
+
+ protected:
+ HashTablePutResult putInternal(
+ const TypedValue &key,
+ const std::size_t variable_key_size,
+ const std::uint8_t &value,
+ HashTablePreallocationState *prealloc_state) override;
+
+ HashTablePutResult putCompositeKeyInternalFast(
+ const std::vector<TypedValue> &key,
+ const std::size_t variable_key_size,
+ const std::uint8_t *init_value_ptr,
+ HashTablePreallocationState *prealloc_state) override;
+
+ std::uint8_t* upsertInternalFast(const TypedValue &key,
+ const std::size_t variable_key_size,
+ const std::uint8_t *init_value_ptr) override;
+
+ std::uint8_t* upsertCompositeKeyInternalFast(
+ const std::vector<TypedValue> &key,
+ const std::uint8_t *init_value_ptr,
+ const std::size_t variable_key_size) override;
+
+ bool getNextEntry(TypedValue *key,
+ const std::uint8_t **value,
+ std::size_t *entry_num) const override;
+ bool getNextEntryCompositeKey(std::vector<TypedValue> *key,
+ const std::uint8_t **value,
+ std::size_t *entry_num) const override;
+
+ bool getNextEntryForKey(const TypedValue &key,
+ const std::size_t hash_code,
+ const std::uint8_t **value,
+ std::size_t *entry_num) const override;
+ bool getNextEntryForCompositeKey(const std::vector<TypedValue> &key,
+ const std::size_t hash_code,
+ const std::uint8_t **value,
+ std::size_t *entry_num) const override;
+
+ bool hasKey(const TypedValue &key) const override;
+ bool hasCompositeKey(const std::vector<TypedValue> &key) const override;
+
+ void resize(const std::size_t extra_buckets,
+ const std::size_t extra_variable_storage,
+ const std::size_t retry_num = 0) override;
+
+ bool preallocateForBulkInsert(
+ const std::size_t total_entries,
+ const std::size_t total_variable_key_size,
+ HashTablePreallocationState *prealloc_state) override;
+
+ private:
+ struct Header {
+ std::size_t num_slots;
+ std::size_t num_buckets;
+ alignas(kCacheLineBytes) std::atomic<std::size_t> buckets_allocated;
+ alignas(kCacheLineBytes)
+ std::atomic<std::size_t> variable_length_bytes_allocated;
+ };
+
+ std::uint8_t *init_payload_;
+ std::size_t kBucketAlignment;
+
+ // Value's offset in a bucket is the first alignof(ValueT) boundary after the
+ // next pointer and hash code.
+ std::size_t kValueOffset;
+
+ // Round bucket size up to a multiple of kBucketAlignment.
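+  // For example, with hypothetical sizes kValueOffset = 16,
+  // total_payload_size_ = 24, fixed_key_size = 8, and kBucketAlignment = 8,
+  // this yields ((16 + 24 + 8 - 1) / 8 + 1) * 8 = 48 bytes per bucket.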
+ constexpr std::size_t ComputeBucketSize(const std::size_t fixed_key_size) {
+ return (((kValueOffset + this->total_payload_size_ + fixed_key_size - 1) /
+ kBucketAlignment) +
+ 1) *
+ kBucketAlignment;
+ }
+ // If ValueT is not trivially destructible, invoke its destructor for all
+ // values held in the specified buckets (including those in "empty" buckets
+ // that were default constructed). If ValueT is trivially destructible, this
+ // is a no-op.
+ void DestroyValues(void *buckets,
+ const std::size_t num_buckets,
+ const std::size_t bucket_size);
+
+ // Attempt to find an empty bucket to insert 'hash_code' into, starting after
+ // '*bucket' in the chain (or, if '*bucket' is NULL, starting from the slot
+ // array). Returns true and stores SIZE_T_MAX in '*pending_chain_ptr' if an
+ // empty bucket is found. Returns false if 'allow_duplicate_keys' is false
+ // and a hash collision is found (caller should then check whether there is a
+ // genuine key collision or the hash collision is spurious). Returns false
+ // and sets '*bucket' to NULL if there are no more empty buckets in the hash
+ // table. If 'variable_key_allocation_required' is nonzero, this method will
+ // attempt to allocate storage for a variable-length key BEFORE allocating a
+ // bucket, so that no bucket number below 'header_->num_buckets' is ever
+ // deallocated after being allocated.
+ inline bool locateBucketForInsertion(
+ const std::size_t hash_code,
+ const std::size_t variable_key_allocation_required,
+ void **bucket,
+ std::atomic<std::size_t> **pending_chain_ptr,
+ std::size_t *pending_chain_ptr_finish_value,
+ HashTablePreallocationState *prealloc_state);
+
+ // Write a scalar 'key' and its 'hash_code' into the '*bucket', which was
+ // found by locateBucketForInsertion(). Assumes that storage for a
+ // variable-length key copy (if any) was already allocated by a successful
+ // call to allocateVariableLengthKeyStorage().
+ inline void writeScalarKeyToBucket(
+ const TypedValue &key,
+ const std::size_t hash_code,
+ void *bucket,
+ HashTablePreallocationState *prealloc_state);
+
+ // Write a composite 'key' and its 'hash_code' into the '*bucket', which was
+ // found by locateBucketForInsertion(). Assumes that storage for
+ // variable-length key copies (if any) was already allocated by a successful
+ // call to allocateVariableLengthKeyStorage().
+ inline void writeCompositeKeyToBucket(
+ const std::vector<TypedValue> &key,
+ const std::size_t hash_code,
+ void *bucket,
+ HashTablePreallocationState *prealloc_state);
+
+ // Determine whether it is actually necessary to resize this hash table.
+ // Checks that there is at least one unallocated bucket, and that there is
+ // at least 'extra_variable_storage' bytes of variable-length storage free.
+ bool isFull(const std::size_t extra_variable_storage) const;
+
+ // Helper object to manage key storage.
+ HashTableKeyManager<serializable, force_key_copy> key_manager_;
+
+ // In-memory structure is as follows:
+ // - SeparateChainingHashTable::Header
+ // - Array of slots, interpreted as follows:
+ // - 0 = Points to nothing (empty)
+ // - SIZE_T_MAX = Pending (some thread is starting a chain from this
+ // slot and will overwrite it soon)
+ // - Anything else = The number of the first bucket in the chain for
+ // this slot PLUS ONE (i.e. subtract one to get the actual bucket
+ // number).
+ // - Array of buckets, each of which is:
+ // - atomic size_t "next" pointer, interpreted the same as slots above.
+ // - size_t hash value
+ // - possibly some unused bytes as needed so that ValueT's alignment
+ // requirement is met
+ // - ValueT value slot
+ // - fixed-length key storage (which may include pointers to external
+ // memory or offsets of variable length keys stored within this hash
+ // table)
+ // - possibly some additional unused bytes so that bucket size is a
+ // multiple of both alignof(std::atomic<std::size_t>) and
+ // alignof(ValueT)
+ // - Variable-length key storage region (referenced by offsets stored in
+ // fixed-length keys).
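+  //
+  // Decoding a slot (or a bucket's "next" field) thus looks like this sketch:
+  //
+  //   std::size_t ref = slots_[hash % header_->num_slots].load(...);
+  //   if (ref == 0) {
+  //     // Empty chain.
+  //   } else if (ref == std::numeric_limits<std::size_t>::max()) {
+  //     // Another thread is mid-insertion on this chain head.
+  //   } else {
+  //     char *bucket = static_cast<char *>(buckets_)
+  //                    + (ref - 1) * bucket_size_;
+  //   }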
+ Header *header_;
+
+ std::atomic<std::size_t> *slots_;
+ void *buckets_;
+ const std::size_t bucket_size_;
+
+ DISALLOW_COPY_AND_ASSIGN(FastSeparateChainingHashTable);
+};
+
+/** @} */
+
+// ----------------------------------------------------------------------------
+// Implementations of template class methods follow.
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+FastSeparateChainingHashTable<resizable,
+ serializable,
+ force_key_copy,
+ allow_duplicate_keys>::
+ FastSeparateChainingHashTable(
+ const std::vector<const Type *> &key_types,
+ const std::size_t num_entries,
+ const std::vector<std::size_t> &payload_sizes,
+ const std::vector<AggregationHandle *> &handles,
+ StorageManager *storage_manager)
+ : FastHashTable<resizable,
+ serializable,
+ force_key_copy,
+ allow_duplicate_keys>(key_types,
+ num_entries,
+ handles,
+ payload_sizes,
+ storage_manager,
+ false,
+ false,
+ true),
+ kBucketAlignment(alignof(std::atomic<std::size_t>)),
+ kValueOffset(sizeof(std::atomic<std::size_t>) + sizeof(std::size_t)),
+ key_manager_(this->key_types_, kValueOffset + this->total_payload_size_),
+ bucket_size_(ComputeBucketSize(key_manager_.getFixedKeySize())) {
+  init_payload_ =
+      static_cast<std::uint8_t *>(std::calloc(this->total_payload_size_, 1));
+  std::size_t k = 0;
+  for (auto *handle : handles) {
+    handle->initPayload(init_payload_ + this->payload_offsets_[k]);
+    ++k;
+  }
+ // Bucket size always rounds up to the alignment requirement of the atomic
+ // size_t "next" pointer at the front or a ValueT, whichever is larger.
+
+ // Give base HashTable information about what key components are stored
+ // inline from 'key_manager_'.
+ this->setKeyInline(key_manager_.getKeyInline());
+
+ // Pick out a prime number of slots and calculate storage requirements.
+ std::size_t num_slots_tmp =
+ get_next_prime_number(num_entries * kHashTableLoadFactor);
+ std::size_t required_memory =
+ sizeof(Header) + num_slots_tmp * sizeof(std::atomic<std::size_t>) +
+ (num_slots_tmp / kHashTableLoadFactor) *
+ (bucket_size_ + key_manager_.getEstimatedVariableKeySize());
+ std::size_t num_storage_slots =
+ this->storage_manager_->SlotsNeededForBytes(required_memory);
+ if (num_storage_slots == 0) {
+ FATAL_ERROR(
+ "Storage requirement for SeparateChainingHashTable "
+ "exceeds maximum allocation size.");
+ }
+
+ // Get a StorageBlob to hold the hash table.
+ const block_id blob_id =
+ this->storage_manager_->createBlob(num_storage_slots);
+ this->blob_ = this->storage_manager_->getBlobMutable(blob_id);
+
+ void *aligned_memory_start = this->blob_->getMemoryMutable();
+ std::size_t available_memory = num_storage_slots * kSlotSizeBytes;
+ if (align(alignof(Header),
+ sizeof(Header),
+ aligned_memory_start,
+ available_memory) == nullptr) {
+ // With current values from StorageConstants.hpp, this should be
+ // impossible. A blob is at least 1 MB, while a Header has alignment
+ // requirement of just kCacheLineBytes (64 bytes).
+ FATAL_ERROR(
+ "StorageBlob used to hold resizable "
+ "SeparateChainingHashTable is too small to meet alignment "
+ "requirements of SeparateChainingHashTable::Header.");
+ } else if (aligned_memory_start != this->blob_->getMemoryMutable()) {
+ // This should also be impossible, since the StorageManager allocates slots
+ // aligned to kCacheLineBytes.
+ DEV_WARNING("StorageBlob memory adjusted by "
+ << (num_storage_slots * kSlotSizeBytes - available_memory)
+ << " bytes to meet alignment requirement for "
+ << "SeparateChainingHashTable::Header.");
+ }
+
+ // Locate the header.
+ header_ = static_cast<Header *>(aligned_memory_start);
+ aligned_memory_start =
+ static_cast<char *>(aligned_memory_start) + sizeof(Header);
+ available_memory -= sizeof(Header);
+
+ // Recompute the number of slots & buckets using the actual available memory.
+ // Most likely, we got some extra free bucket space due to "rounding up" to
+ // the storage blob's size. It's also possible (though very unlikely) that we
+ // will wind up with fewer buckets than we initially wanted because of screwy
+ // alignment requirements for ValueT.
+ std::size_t num_buckets_tmp =
+ available_memory /
+ (kHashTableLoadFactor * sizeof(std::atomic<std::size_t>) + bucket_size_ +
+ key_manager_.getEstimatedVariableKeySize());
+ num_slots_tmp =
+ get_previous_prime_number(num_buckets_tmp * kHashTableLoadFactor);
+ num_buckets_tmp = num_slots_tmp / kHashTableLoadFactor;
+ DEBUG_ASSERT(num_slots_tmp > 0);
+ DEBUG_ASSERT(num_buckets_tmp > 0);
+
+ // Locate the slot array.
+ slots_ = static_cast<std::atomic<std::size_t> *>(aligned_memory_start);
+ aligned_memory_start = static_cast<char *>(aligned_memory_start) +
+ sizeof(std::atomic<std::size_t>) * num_slots_tmp;
+ available_memory -= sizeof(std::atomic<std::size_t>) * num_slots_tmp;
+
+ // Locate the buckets.
+ buckets_ = aligned_memory_start;
+ // Extra-paranoid: If ValueT has an alignment requirement greater than that
+ // of std::atomic<std::size_t>, we may need to adjust the start of the bucket
+ // array.
+ if (align(kBucketAlignment, bucket_size_, buckets_, available_memory) ==
+ nullptr) {
+ FATAL_ERROR(
+ "StorageBlob used to hold resizable "
+ "SeparateChainingHashTable is too small to meet "
+ "alignment requirements of buckets.");
+ } else if (buckets_ != aligned_memory_start) {
+ DEV_WARNING(
+ "Bucket array start position adjusted to meet alignment "
+ "requirement for SeparateChainingHashTable's value type.");
+ if (num_buckets_tmp * bucket_size_ > available_memory) {
+ --num_buckets_tmp;
+ }
+ }
+
+ // Fill in the header.
+ header_->num_slots = num_slots_tmp;
+ header_->num_buckets = num_buckets_tmp;
+ header_->buckets_allocated.store(0, std::memory_order_relaxed);
+ header_->variable_length_bytes_allocated.store(0, std::memory_order_relaxed);
+ available_memory -= bucket_size_ * (header_->num_buckets);
+
+ // Locate variable-length key storage region, and give it all the remaining
+ // bytes in the blob.
+ key_manager_.setVariableLengthStorageInfo(
+ static_cast<char *>(buckets_) + header_->num_buckets * bucket_size_,
+ available_memory,
+ &(header_->variable_length_bytes_allocated));
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+FastSeparateChainingHashTable<resizable,
+ serializable,
+ force_key_copy,
+ allow_duplicate_keys>::
+ FastSeparateChainingHashTable(const std::vector<const Type *> &key_types,
+ void *hash_table_memory,
+ const std::size_t hash_table_memory_size,
+ const bool new_hash_table,
+ const bool hash_table_memory_zeroed)
+ : FastHashTable<resizable,
+ serializable,
+ force_key_copy,
+ allow_duplicate_keys>(key_types,
+ hash_table_memory,
+ hash_table_memory_size,
+ new_hash_table,
+ hash_table_memory_zeroed,
+ false,
+ false,
+ true),
+ kBucketAlignment(alignof(std::atomic<std::size_t>) < alignof(std::uint8_t)
+ ? alignof(std::uint8_t)
+ : alignof(std::atomic<std::size_t>)),
+ kValueOffset(sizeof(std::atomic<std::size_t>) + sizeof(std::size_t)),
+ key_manager_(this->key_types_, kValueOffset + sizeof(std::uint8_t)),
+ bucket_size_(ComputeBucketSize(key_manager_.getFixedKeySize())) {
+ // Bucket size always rounds up to the alignment requirement of the atomic
+ // size_t "next" pointer at the front or a ValueT, whichever is larger.
+ //
+ // Make sure that the larger of the two alignment requirements also satisfies
+ // the smaller.
+ static_assert(
+ alignof(std::atomic<std::size_t>) < alignof(std::uint8_t)
+ ? alignof(std::uint8_t) % alignof(std::atomic<std::size_t>) == 0
+ : alignof(std::atomic<std::size_t>) % alignof(std::uint8_t) == 0,
+ "Alignment requirement of std::atomic<std::size_t> does not "
+ "evenly divide with alignment requirement of ValueT.");
+
+ // Give base HashTable information about what key components are stored
+ // inline from 'key_manager_'.
+ this->setKeyInline(key_manager_.getKeyInline());
+
+ // FIXME(chasseur): If we are reconstituting a HashTable using a block of
+ // memory whose start was aligned differently than the memory block that was
+ // originally used (modulo alignof(Header)), we could wind up with all of our
+ // data structures misaligned. If memory is inside a
+ // StorageBlock/StorageBlob, this will never occur, since the StorageManager
+ // always allocates slots aligned to kCacheLineBytes. Similarly, this isn't
+ // a problem for memory inside any other allocation aligned to at least
+ // alignof(Header) == kCacheLineBytes.
+
+ void *aligned_memory_start = this->hash_table_memory_;
+ std::size_t available_memory = this->hash_table_memory_size_;
+
+ if (align(alignof(Header),
+ sizeof(Header),
+ aligned_memory_start,
+ available_memory) == nullptr) {
+ FATAL_ERROR("Attempted to create a non-resizable "
+ << "SeparateChainingHashTable with "
+ << available_memory
+ << " bytes of memory at "
+ << aligned_memory_start
+ << " which either can not fit a "
+ << "SeparateChainingHashTable::Header or meet its alignement "
+ << "requirement.");
+ } else if (aligned_memory_start != this->hash_table_memory_) {
+ // In general, we could get memory of any alignment, although at least
+ // cache-line aligned would be nice.
+ DEV_WARNING("StorageBlob memory adjusted by "
+ << (this->hash_table_memory_size_ - available_memory)
+ << " bytes to meet alignment requirement for "
+ << "SeparateChainingHashTable::Header.");
+ }
+
+ header_ = static_cast<Header *>(aligned_memory_start);
+ aligned_memory_start =
+ static_cast<char *>(aligned_memory_start) + sizeof(Header);
+ available_memory -= sizeof(Header);
+
+ if (new_hash_table) {
+ std::size_t estimated_bucket_capacity =
+ available_memory /
+ (kHashTableLoadFactor * sizeof(std::atomic<std::size_t>) +
+ bucket_size_ + key_manager_.getEstimatedVariableKeySize());
+ std::size_t num_slots = get_previous_prime_number(
+ estimated_bucket_capacity * kHashTableLoadFactor);
+
+ // Fill in the header.
+ header_->num_slots = num_slots;
+ header_->num_buckets = num_slots / kHashTableLoadFactor;
+ header_->buckets_allocated.store(0, std::memory_order_relaxed);
+ header_->variable_length_bytes_allocated.store(0,
+ std::memory_order_relaxed);
+ }
+
+ // Locate the slot array.
+ slots_ = static_cast<std::atomic<std::size_t> *>(aligned_memory_start);
+ aligned_memory_start = static_cast<char *>(aligned_memory_start) +
+ sizeof(std::atomic<std::size_t>) * header_->num_slots;
+ available_memory -= sizeof(std::atomic<std::size_t>) * header_->num_slots;
+
+ if (new_hash_table && !hash_table_memory_zeroed) {
+ std::memset(
+ slots_, 0x0, sizeof(std::atomic<std::size_t>) * header_->num_slots);
+ }
+
+ // Locate the buckets.
+ buckets_ = aligned_memory_start;
+ // Extra-paranoid: sizeof(Header) should almost certainly be a multiple of
+ // kBucketAlignment, unless ValueT has some members with seriously big
+ // (> kCacheLineBytes) alignment requirements specified using alignas().
+ if (align(kBucketAlignment, bucket_size_, buckets_, available_memory) ==
+ nullptr) {
+ FATAL_ERROR("Attempted to create a non-resizable "
+ << "SeparateChainingHashTable with "
+ << this->hash_table_memory_size_
+ << " bytes of memory at "
+ << this->hash_table_memory_
+ << ", which can hold an aligned "
+ << "SeparateChainingHashTable::Header but does not have "
+ << "enough remaining space for even a single hash bucket.");
+ } else if (buckets_ != aligned_memory_start) {
+ DEV_WARNING(
+ "Bucket array start position adjusted to meet alignment "
+ "requirement for SeparateChainingHashTable's value type.");
+ if (header_->num_buckets * bucket_size_ > available_memory) {
+ DEBUG_ASSERT(new_hash_table);
+ --(header_->num_buckets);
+ }
+ }
+ available_memory -= bucket_size_ * header_->num_buckets;
+
+ // Make sure "next" pointers in buckets are zeroed-out.
+ if (new_hash_table && !hash_table_memory_zeroed) {
+ std::memset(buckets_, 0x0, header_->num_buckets * bucket_size_);
+ }
+
+ // Locate variable-length key storage region.
+ key_manager_.setVariableLengthStorageInfo(
+ static_cast<char *>(buckets_) + header_->num_buckets * bucket_size_,
+ available_memory,
+ &(header_->variable_length_bytes_allocated));
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+void FastSeparateChainingHashTable<resizable,
+ serializable,
+ force_key_copy,
+ allow_duplicate_keys>::clear() {
+ const std::size_t used_buckets =
+ header_->buckets_allocated.load(std::memory_order_relaxed);
+ // Destroy existing values, if necessary.
+ DestroyValues(buckets_, used_buckets, bucket_size_);
+
+ // Zero-out slot array.
+ std::memset(
+ slots_, 0x0, sizeof(std::atomic<std::size_t>) * header_->num_slots);
+
+ // Zero-out used buckets.
+ std::memset(buckets_, 0x0, used_buckets * bucket_size_);
+
+ header_->buckets_allocated.store(0, std::memory_order_relaxed);
+ header_->variable_length_bytes_allocated.store(0, std::memory_order_relaxed);
+ key_manager_.zeroNextVariableLengthKeyOffset();
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+const std::uint8_t* FastSeparateChainingHashTable<
+ resizable,
+ serializable,
+ force_key_copy,
+ allow_duplicate_keys>::getSingle(const TypedValue &key) const {
+ DEBUG_ASSERT(!allow_duplicate_keys);
+ DEBUG_ASSERT(this->key_types_.size() == 1);
+ DEBUG_ASSERT(
+ key.isPlausibleInstanceOf(this->key_types_.front()->getSignature()));
+
+ const std::size_t hash_code = key.getHash();
+ std::size_t bucket_ref =
+ slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
+ while (bucket_ref != 0) {
+ DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
+ const char *bucket =
+ static_cast<const char *>(buckets_) + (bucket_ref - 1) * bucket_size_;
+ const std::size_t bucket_hash = *reinterpret_cast<const std::size_t *>(
+ bucket + sizeof(std::atomic<std::size_t>));
+ if ((bucket_hash == hash_code) &&
+ key_manager_.scalarKeyCollisionCheck(key, bucket)) {
+ // Match located.
+ return reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset);
+ }
+ bucket_ref =
+ reinterpret_cast<const std::atomic<std::size_t> *>(bucket)->load(
+ std::memory_order_relaxed);
+ }
+
+ // Reached the end of the chain and didn't find a match.
+ return nullptr;
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+const std::uint8_t* FastSeparateChainingHashTable<resizable,
+ serializable,
+ force_key_copy,
+ allow_duplicate_keys>::
+ getSingleCompositeKey(const std::vector<TypedValue> &key) const {
+ DEBUG_ASSERT(!allow_duplicate_keys);
+ DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+ const std::size_t hash_code = this->hashCompositeKey(key);
+ std::size_t bucket_ref =
+ slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
+ while (bucket_ref != 0) {
+ DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
+ const char *bucket =
+ static_cast<const char *>(buckets_) + (bucket_ref - 1) * bucket_size_;
+ const std::size_t bucket_hash = *reinterpret_cast<const std::size_t *>(
+ bucket + sizeof(std::atomic<std::size_t>));
+ if ((bucket_hash == hash_code) &&
+ key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+ // Match located.
+ return reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset);
+ }
+ bucket_ref =
+ reinterpret_cast<const std::atomic<std::size_t> *>(bucket)->load(
+ std::memory_order_relaxed);
+ }
+
+ // Reached the end of the chain and didn't find a match.
+ return nullptr;
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+const std::uint8_t* FastSeparateChainingHashTable<resizable,
+ serializable,
+ force_key_copy,
+ allow_duplicate_keys>::
+ getSingleCompositeKey(const std::vector<TypedValue> &key, int index) const {
+ DEBUG_ASSERT(!allow_duplicate_keys);
+ DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+ const std::size_t hash_code = this->hashCompositeKey(key);
+ std::size_t bucket_ref =
+ slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
+ while (bucket_ref != 0) {
+ DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
+ const char *bucket =
+ static_cast<const char *>(buckets_) + (bucket_ref - 1) * bucket_size_;
+ const std::size_t bucket_hash = *reinterpret_cast<const std::size_t *>(
+ bucket + sizeof(std::atomic<std::size_t>));
+ if ((bucket_hash == hash_code) &&
+ key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+ // Match located.
+ return reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset) +
+ this->payload_offsets_[index];
+ }
+ bucket_ref =
+ reinterpret_cast<const std::atomic<std::size_t> *>(bucket)->load(
+ std::memory_order_relaxed);
+ }
+
+ // Reached the end of the chain and didn't find a match.
+ return nullptr;
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+void FastSeparateChainingHashTable<
+ resizable,
+ serializable,
+ force_key_copy,
+ allow_duplicate_keys>::getAll(const TypedValue &key,
+ std::vector<const std::uint8_t *> *values)
+ const {
+ DEBUG_ASSERT(this->key_types_.size() == 1);
+ DEBUG_ASSERT(
+ key.isPlausibleInstanceOf(this->key_types_.front()->getSignature()));
+
+ const std::size_t hash_code = key.getHash();
+ std::size_t bucket_ref =
+ slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
+ while (bucket_ref != 0) {
+ DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
+ const char *bucket =
+ static_cast<const char *>(buckets_) + (bucket_ref - 1) * bucket_size_;
+ const std::size_t bucket_hash = *reinterpret_cast<const std::size_t *>(
+ bucket + sizeof(std::atomic<std::size_t>));
+ if ((bucket_hash == hash_code) &&
+ key_manager_.scalarKeyCollisionCheck(key, bucket)) {
+ // Match located.
+ values->push_back(
+ reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset));
+ if (!allow_duplicate_keys) {
+ return;
+ }
+ }
+ bucket_ref =
+ reinterpret_cast<const std::atomic<std::size_t> *>(bucket)->load(
+ std::memory_order_relaxed);
+ }
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+void FastSeparateChainingHashTable<resizable,
+ serializable,
+ force_key_copy,
+ allow_duplicate_keys>::
+ getAllCompositeKey(const std::vector<TypedValue> &key,
+ std::vector<const std::uint8_t *> *values) const {
+ DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+ const std::size_t hash_code = this->hashCompositeKey(key);
+ std::size_t bucket_ref =
+ slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
+ while (bucket_ref != 0) {
+ DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
+ const char *bucket =
+ static_cast<const char *>(buckets_) + (bucket_ref - 1) * bucket_size_;
+ const std::size_t bucket_hash = *reinterpret_cast<const std::size_t *>(
+ bucket + sizeof(std::atomic<std::size_t>));
+ if ((bucket_hash == hash_code) &&
+ key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+ // Match located.
+ values->push_back(
+ reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset));
+ if (!allow_duplicate_keys) {
+ return;
+ }
+ }
+ bucket_ref =
+ reinterpret_cast<const std::atomic<std::size_t> *>(bucket)->load(
+ std::memory_order_relaxed);
+ }
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+HashTablePutResult FastSeparateChainingHashTable<resizable,
+ serializable,
+ force_key_copy,
+ allow_duplicate_keys>::
+ putInternal(const TypedValue &key,
+ const std::size_t variable_key_size,
+ const std::uint8_t &value,
+ HashTablePreallocationState *prealloc_state) {
+ DEBUG_ASSERT(this->key_types_.size() == 1);
+ DEBUG_ASSERT(
+ key.isPlausibleInstanceOf(this->key_types_.front()->getSignature()));
+
+ if (prealloc_state == nullptr) {
+ // Early check for a free bucket.
+ if (header_->buckets_allocated.load(std::memory_order_relaxed) >=
+ header_->num_buckets) {
+ return HashTablePutResult::kOutOfSpace;
+ }
+
+ // TODO(chasseur): If allow_duplicate_keys is true, avoid storing more than
+ // one copy of the same variable-length key.
+ if (!key_manager_.allocateVariableLengthKeyStorage(variable_key_size)) {
+ // Ran out of variable-length key storage space.
+ return HashTablePutResult::kOutOfSpace;
+ }
+ }
+
+ const std::size_t hash_code = key.getHash();
+ void *bucket = nullptr;
+ std::atomic<std::size_t> *pending_chain_ptr;
+ std::size_t pending_chain_ptr_finish_value;
+ for (;;) {
+ if (locateBucketForInsertion(hash_code,
+ 0,
+ &bucket,
+ &pending_chain_ptr,
+ &pending_chain_ptr_finish_value,
+ prealloc_state)) {
+ // Found an empty bucket.
+ break;
+ } else if (bucket == nullptr) {
+ // Ran out of buckets. Deallocate any variable space that we were unable
+ // to use.
+ DEBUG_ASSERT(prealloc_state == nullptr);
+ key_manager_.deallocateVariableLengthKeyStorage(variable_key_size);
+ return HashTablePutResult::kOutOfSpace;
+ } else {
+ // Hash collision found, and duplicates aren't allowed.
+ DEBUG_ASSERT(!allow_duplicate_keys);
+ DEBUG_ASSERT(prealloc_state == nullptr);
+ if (key_manager_.scalarKeyCollisionCheck(key, bucket)) {
+ // Duplicate key. Deallocate any variable storage space and return.
+ key_manager_.deallocateVariableLengthKeyStorage(variable_key_size);
+ return HashTablePutResult::kDuplicateKey;
+ }
+ }
+ }
+
+ // Write the key and hash.
+ writeScalarKeyToBucket(key, hash_code, bucket, prealloc_state);
+
+ // Store the value by using placement new with ValueT's copy constructor.
+ new (static_cast<char *>(bucket) + kValueOffset) std::uint8_t(value);
+
+ // Update the previous chain pointer to point to the new bucket.
+ pending_chain_ptr->store(pending_chain_ptr_finish_value,
+ std::memory_order_release);
+
+ // We're all done.
+ return HashTablePutResult::kOK;
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+HashTablePutResult FastSeparateChainingHashTable<resizable,
+ serializable,
+ force_key_copy,
+ allow_duplicate_keys>::
+ putCompositeKeyInternalFast(const std::vector<TypedValue> &key,
+ const std::size_t variable_key_size,
+ const std::uint8_t *init_value_ptr,
+ HashTablePreallocationState *prealloc_state) {
+ DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+ if (prealloc_state == nullptr) {
+ // Early check for a free bucket.
+ if (header_->buckets_allocated.load(std::memory_order_relaxed) >=
+ header_->num_buckets) {
+ return HashTablePutResult::kOutOfSpace;
+ }
+
+ // TODO(chasseur): If allow_duplicate_keys is true, avoid storing more than
+ // one copy of the same variable-length key.
+ if (!key_manager_.allocateVariableLengthKeyStorage(variable_key_size)) {
+ // Ran out of variable-length key storage space.
+ return HashTablePutResult::kOutOfSpace;
+ }
+ }
+
+ const std::size_t hash_code = this->hashCompositeKey(key);
+ void *bucket = nullptr;
+ std::atomic<std::size_t> *pending_chain_ptr;
+ std::size_t pending_chain_ptr_finish_value;
+ for (;;) {
+ if (locateBucketForInsertion(hash_code,
+ 0,
+ &bucket,
+ &pending_chain_ptr,
+ &pending_chain_ptr_finish_value,
+ prealloc_state)) {
+ // Found an empty bucket.
+ break;
+ } else if (bucket == nullptr) {
+ // Ran out of buckets. Deallocate any variable space that we were unable
+ // to use.
+ DEBUG_ASSERT(prealloc_state == nullptr);
+ key_manager_.deallocateVariableLengthKeyStorage(variable_key_size);
+ return HashTablePutResult::kOutOfSpace;
+ } else {
+ // Hash collision found, and duplicates aren't allowed.
+ DEBUG_ASSERT(!allow_duplicate_keys);
+ DEBUG_ASSERT(prealloc_state == nullptr);
+ if (key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+ // Duplicate key. Deallocate any variable storage space and return.
+ key_manager_.deallocateVariableLengthKeyStorage(variable_key_size);
+ return HashTablePutResult::kDuplicateKey;
+ }
+ }
+ }
+
+ // Write the key and hash.
+ writeCompositeKeyToBucket(key, hash_code, bucket, prealloc_state);
+
+ std::uint8_t *value = static_cast<std::uint8_t *>(bucket) + kValueOffset;
+ memcpy(value, init_value_ptr, this->total_payload_size_);
+ // Update the previous chain pointer to point to the new bucket.
+ pending_chain_ptr->store(pending_chain_ptr_finish_value,
+ std::memory_order_release);
+
+ // We're all done.
+ return HashTablePutResult::kOK;
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+std::uint8_t* FastSeparateChainingHashTable<resizable,
+ serializable,
+ force_key_copy,
+ allow_duplicate_keys>::
+ upsertInternalFast(const TypedValue &key,
+ const std::size_t variable_key_size,
+ const std::uint8_t *init_value_ptr) {
+ DEBUG_ASSERT(!allow_duplicate_keys);
+ DEBUG_ASSERT(this->key_types_.size() == 1);
+ DEBUG_ASSERT(
+ key.isPlausibleInstanceOf(this->key_types_.front()->getSignature()));
+
+ if (variable_key_size > 0) {
+ // Don't allocate yet, since the key may already be present. However, we
+ // do check if either the allocated variable storage space OR the free
+ // space is big enough to hold the key (at least one must be true: either
+ // the key is already present and allocated, or we need to be able to
+ // allocate enough space for it).
+ std::size_t allocated_bytes = header_->variable_length_bytes_allocated.load(
+ std::memory_order_relaxed);
+ if ((allocated_bytes < variable_key_size) &&
+ (allocated_bytes + variable_key_size >
+ key_manager_.getVariableLengthKeyStorageSize())) {
+ return nullptr;
+ }
+ }
+
+ const std::size_t hash_code = key.getHash();
+ void *bucket = nullptr;
+ std::atomic<std::size_t> *pending_chain_ptr;
+ std::size_t pending_chain_ptr_finish_value;
+ for (;;) {
+ if (locateBucketForInsertion(hash_code,
+ variable_key_size,
+ &bucket,
+ &pending_chain_ptr,
+ &pending_chain_ptr_finish_value,
+ nullptr)) {
+ // Found an empty bucket.
+ break;
+ } else if (bucket == nullptr) {
+ // Ran out of buckets or variable-key space.
+ return nullptr;
+ } else if (key_manager_.scalarKeyCollisionCheck(key, bucket)) {
+ // Found an already-existing entry for this key.
+ return reinterpret_cast<std::uint8_t *>(static_cast<char *>(bucket) +
+ kValueOffset);
+ }
+ }
+
+ // We are now writing to an empty bucket.
+ // Write the key and hash.
+ writeScalarKeyToBucket(key, hash_code, bucket, nullptr);
+
+  // Copy the supplied initial value (or the default payload) into place.
+  std::uint8_t *value = static_cast<std::uint8_t *>(bucket) + kValueOffset;
+  if (init_value_ptr == nullptr) {
+    memcpy(value, init_payload_, this->total_payload_size_);
+  } else {
+    memcpy(value, init_value_ptr, this->total_payload_size_);
+  }
+
+ // Update the previous chain pointer to point to the new bucket.
+ pending_chain_ptr->store(pending_chain_ptr_finish_value,
+ std::memory_order_release);
+
+ // Return the value.
+ return value;
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+std::uint8_t* FastSeparateChainingHashTable<resizable,
+ serializable,
+ force_key_copy,
+ allow_duplicate_keys>::
+ upsertCompositeKeyInternalFast(const std::vector<TypedValue> &key,
+ const std::uint8_t *init_value_ptr,
+ const std::size_t variable_key_size) {
+ DEBUG_ASSERT(!allow_duplicate_keys);
+ DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+ if (variable_key_size > 0) {
+ // Don't allocate yet, since the key may already be present. However, we
+ // do check if either the allocated variable storage space OR the free
+ // space is big enough to hold the key (at least one must be true: either
+ // the key is already present and allocated, or we need to be able to
+ // allocate enough space for it).
+ std::size_t allocated_bytes = header_->variable_length_bytes_allocated.load(
+ std::memory_order_relaxed);
+ if ((allocated_bytes < variable_key_size) &&
+ (allocated_bytes + variable_key_size >
+ key_manager_.getVariableLengthKeyStorageSize())) {
+ return nullptr;
+ }
+ }
+
+ const std::size_t hash_code = this->hashCompositeKey(key);
+ void *bucket = nullptr;
+ std::atomic<std::size_t> *pending_chain_ptr;
+ std::size_t pending_chain_ptr_finish_value;
+ for (;;) {
+ if (locateBucketForInsertion(hash_code,
+ variable_key_size,
+ &bucket,
+ &pending_chain_ptr,
+ &pending_chain_ptr_finish_value,
+ nullptr)) {
+ // Found an empty bucket.
+ break;
+ } else if (bucket == nullptr) {
+ // Ran out of buckets or variable-key space.
+ return nullptr;
+ } else if (key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+ // Found an already-existing entry for this key.
+ return reinterpret_cast<std::uint8_t *>(static_cast<char *>(bucket) +
+ kValueOffset);
+ }
+ }
+
+ // We are now writing to an empty bucket.
+ // Write the key and hash.
+ writeCompositeKeyToBucket(key, hash_code, bucket, nullptr);
+
+  std::uint8_t *value = static_cast<std::uint8_t *>(bucket) + kValueOffset;
+ if (init_value_ptr == nullptr) {
+ memcpy(value, init_payload_, this->total_payload_size_);
+ } else {
+ memcpy(value, init_value_ptr, this->total_payload_size_);
+ }
+
+  // Update the previous chain pointer to point to the new bucket.
+ pending_chain_ptr->store(pending_chain_ptr_finish_value,
+ std::memory_order_release);
+
+ // Return the value.
+ return value;
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+bool FastSeparateChainingHashTable<
+ resizable,
+ serializable,
+ force_key_copy,
+ allow_duplicate_keys>::getNextEntry(TypedValue *key,
+ const std::uint8_t **value,
+ std::size_t *entry_num) const {
+ DEBUG_ASSERT(this->key_types_.size() == 1);
+ if (*entry_num < header_->buckets_allocated.load(std::memory_order_relaxed)) {
+ const char *bucket =
+ static_cast<const char *>(buckets_) + (*entry_num) * bucket_size_;
+ *key = key_manager_.getKeyComponentTyped(bucket, 0);
+ *value = reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset);
+ ++(*entry_num);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+bool FastSeparateChainingHashTable<resizable,
+ serializable,
+ force_key_copy,
+ allow_duplicate_keys>::
+ getNextEntryCompositeKey(std::vector<TypedValue> *key,
+ const std::uint8_t **value,
+ std::size_t *entry_num) const {
+ if (*entry_num < header_->buckets_allocated.load(std::memory_order_relaxed)) {
+ const char *bucket =
+ static_cast<const char *>(buckets_) + (*entry_num) * bucket_size_;
+ for (std::vector<const Type *>::size_type key_idx = 0;
+ key_idx < this->key_types_.size();
+ ++key_idx) {
+ key->emplace_back(key_manager_.getKeyComponentTyped(bucket, key_idx));
+ }
+ *value = reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset);
+ ++(*entry_num);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+bool FastSeparateChainingHashTable<
+ resizable,
+ serializable,
+ force_key_copy,
+ allow_duplicate_keys>::getNextEntryForKey(const TypedValue &key,
+ const std::size_t hash_code,
+ const std::uint8_t **value,
+ std::size_t *entry_num) const {
+ DEBUG_ASSERT(this->key_types_.size() == 1);
+ DEBUG_ASSERT(
+ key.isPlausibleInstanceOf(this->key_types_.front()->getSignature()));
+
+ if (*entry_num == 0) {
+ *entry_num =
+ slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
+ } else if (*entry_num == std::numeric_limits<std::size_t>::max()) {
+ return false;
+ }
+
+ while (*entry_num != 0) {
+ DEBUG_ASSERT(*entry_num != std::numeric_limits<std::size_t>::max());
+ const char *bucket =
+ static_cast<const char *>(buckets_) + (*entry_num - 1) * bucket_size_;
+ *entry_num =
+ reinterpret_cast<const std::atomic<std::size_t> *>(bucket)->load(
+ std::memory_order_relaxed);
+ const std::size_t bucket_hash = *reinterpret_cast<const std::size_t *>(
+ bucket + sizeof(std::atomic<std::size_t>));
+ if ((bucket_hash == hash_code) &&
+ key_manager_.scalarKeyCollisionCheck(key, bucket)) {
+ // Match located.
+ *value = reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset);
+ if (*entry_num == 0) {
+ // If this is the last bucket in the chain, prevent the next call from
+ // starting over again.
+ *entry_num = std::numeric_limits<std::size_t>::max();
+ }
+ return true;
+ }
+ }
+
+ // Reached the end of the chain.
+ return false;
+}
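+
+// Iteration sketch (illustrative; getNextEntryForKey() is protected, so the
+// caller is the class itself or a subclass). Reuse one 'entry_num' variable
+// across calls, starting from 0; exhaustion is signaled by returning false:
+//
+//   std::size_t entry_num = 0;
+//   const std::uint8_t *value;
+//   while (getNextEntryForKey(key, key.getHash(), &value, &entry_num)) {
+//     // ... consume 'value' ...
+//   }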
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+bool FastSeparateChainingHashTable<resizable,
+ serializable,
+ force_key_copy,
+ allow_duplicate_keys>::
+ getNextEntryForCompositeKey(const std::vector<TypedValue> &key,
+ const std::size_t hash_code,
+ const std::uint8_t **value,
+ std::size_t *entry_num) const {
+ DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+ if (*entry_num == 0) {
+ *entry_num =
+ slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
+ } else if (*entry_num == std::numeric_limits<std::size_t>::max()) {
+ return false;
+ }
+
+ while (*entry_num != 0) {
+ DEBUG_ASSERT(*entry_num != std::numeric_limits<std::size_t>::max());
+ const char *bucket =
+ static_cast<const char *>(buckets_) + (*entry_num - 1) * bucket_size_;
+ *entry_num =
+ reinterpret_cast<const std::atomic<std::size_t> *>(bucket)->load(
+ std::memory_order_relaxed);
+ const std::size_t bucket_hash = *reinterpret_cast<const std::size_t *>(
+ bucket + sizeof(std::atomic<std::size_t>));
+ if ((bucket_hash == hash_code) &&
+ key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+ // Match located.
+ *value = reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset);
+ if (*entry_num == 0) {
+ // If this is the last bucket in the chain, prevent the next call from
+ // starting over again.
+ *entry_num = std::numeric_limits<std::size_t>::max();
+ }
+ return true;
+ }
+ }
+
+ // Reached the end of the chain.
+ return false;
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+bool FastSeparateChainingHashTable<
+ resizable,
+ serializable,
+ force_key_copy,
+ allow_duplicate_keys>::hasKey(const TypedValue &key) const {
+ DEBUG_ASSERT(this->key_types_.size() == 1);
+ DEBUG_ASSERT(
+ key.isPlausibleInstanceOf(this->key_types_.front()->getSignature()));
+
+ const std::size_t hash_code = key.getHash();
+ std::size_t bucket_ref =
+ slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
+ while (bucket_ref != 0) {
+ DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
+ const char *bucket =
+ static_cast<const char *>(buckets_) + (bucket_ref - 1) * bucket_size_;
+ const std::size_t bucket_hash = *reinterpret_cast<const std::size_t *>(
+ bucket + sizeof(std::atomic<std::size_t>));
+ if ((bucket_hash == hash_code) &&
+ key_manager_.scalarKeyCollisionCheck(key, bucket)) {
+      // Match located.
+ return true;
+ }
+ bucket_ref =
+ reinterpret_cast<const std::atomic<std::size_t> *>(bucket)->load(
+ std::memory_order_relaxed);
+ }
+ return false;
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+bool FastSeparateChainingHashTable<
+ resizable,
+ serializable,
+ force_key_copy,
+ allow_duplicate_keys>::hasCompositeKey(const std::vector<TypedValue> &key)
+ const {
+ DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+ const std::size_t hash_code = this->hashCompositeKey(key);
+ std::size_t bucket_ref =
+ slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
+ while (bucket_ref != 0) {
+ DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
+ const char *bucket =
+ static_cast<const char *>(buckets_) + (bucket_ref - 1) * bucket_size_;
+ const std::size_t bucket_hash = *reinterpret_cast<const std::size_t *>(
+ bucket + sizeof(std::atomic<std::size_t>));
+ if ((bucket_hash == hash_code) &&
+ key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+      // Match located.
+ return true;
+ }
+ bucket_ref =
+ reinterpret_cast<const std::atomic<std::size_t> *>(bucket)->load(
+ std::memory_order_relaxed);
+ }
+ return false;
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+void FastSeparateChainingHashTable<
+ resizable,
+ serializable,
+ force_key_copy,
+ allow_duplicate_keys>::resize(const std::size_t extra_buckets,
+ const std::size_t extra_variable_storage,
+ const std::size_t retry_num) {
+ DEBUG_ASSERT(resizable);
+
+ // A retry should never be necessary with this implementation of HashTable.
+ // Separate chaining ensures that any resized hash table with more buckets
+ // than the original table will be able to hold more entries than the
+ // original.
+ DEBUG_ASSERT(retry_num == 0);
+
+ SpinSharedMutexExclusiveLock<true> write_lock(this->resize_shared_mutex_);
+
+ // Recheck whether the hash table is still full. Multiple threads may have
+ // been waiting on 'resize_shared_mutex_' to rebuild this hash table
+ // simultaneously; only the first one to acquire the lock should actually
+ // do the rebuild.
+ if (!isFull(extra_variable_storage)) {
+ return;
+ }
+
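+ // The resize proceeds in phases: size and allocate a new StorageBlob;
+ // carve the header, slot array, buckets, and variable-length key storage
+ // out of it; bulk-copy the old buckets and variable-length keys; swap the
+ // new structures in and drop the old blob; and finally rebuild the slot
+ // chains for the new number of slots.
+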
+ // Approximately double the number of buckets and slots.
+ //
+ // TODO(chasseur): It may be worth it to more than double the number of
+ // buckets here so that we can maintain a good, sparse fill factor for a
+ // longer time as more values are inserted. Such behavior should take into
+ // account kHashTableLoadFactor.
+ std::size_t resized_num_slots = get_next_prime_number(
+ (header_->num_buckets + extra_buckets / 2) * kHashTableLoadFactor * 2);
+ std::size_t variable_storage_required =
+ (resized_num_slots / kHashTableLoadFactor) *
+ key_manager_.getEstimatedVariableKeySize();
+ const std::size_t original_variable_storage_used =
+ header_->variable_length_bytes_allocated.load(std::memory_order_relaxed);
+ // If this resize was triggered by a too-large variable-length key, bump up
+ // the variable-length storage requirement.
+ if ((extra_variable_storage > 0) &&
+ (extra_variable_storage + original_variable_storage_used >
+ key_manager_.getVariableLengthKeyStorageSize())) {
+ variable_storage_required += extra_variable_storage;
+ }
+
+ const std::size_t resized_memory_required =
+ sizeof(Header) + resized_num_slots * sizeof(std::atomic<std::size_t>) +
+ (resized_num_slots / kHashTableLoadFactor) * bucket_size_ +
+ variable_storage_required;
+ const std::size_t resized_storage_slots =
+ this->storage_manager_->SlotsNeededForBytes(resized_memory_required);
+ if (resized_storage_slots == 0) {
+ FATAL_ERROR(
+ "Storage requirement for resized SeparateChainingHashTable "
+ "exceeds maximum allocation size.");
+ }
+
+ // Get a new StorageBlob to hold the resized hash table.
+ const block_id resized_blob_id =
+ this->storage_manager_->createBlob(resized_storage_slots);
+ MutableBlobReference resized_blob =
+ this->storage_manager_->getBlobMutable(resized_blob_id);
+
+ // Locate data structures inside the new StorageBlob.
+ void *aligned_memory_start = resized_blob->getMemoryMutable();
+ std::size_t available_memory = resized_storage_slots * kSlotSizeBytes;
+ if (align(alignof(Header),
+ sizeof(Header),
+ aligned_memory_start,
+ available_memory) == nullptr) {
+ // Should be impossible, as noted in constructor.
+ FATAL_ERROR(
+ "StorageBlob used to hold resized SeparateChainingHashTable "
+ "is too small to meet alignment requirements of "
+ "LinearOpenAddressingHashTable::Header.");
+ } else if (aligned_memory_start != resized_blob->getMemoryMutable()) {
+ // Again, should be impossible.
+ DEV_WARNING("In SeparateChainingHashTable::resize(), StorageBlob "
+ << "memory adjusted by "
+ << (resized_num_slots * kSlotSizeBytes - available_memory)
+ << " bytes to meet alignment requirement for "
+ << "LinearOpenAddressingHashTable::Header.");
+ }
+
+ Header *resized_header = static_cast<Header *>(aligned_memory_start);
+ aligned_memory_start =
+ static_cast<char *>(aligned_memory_start) + sizeof(Header);
+ available_memory -= sizeof(Header);
+
+ // As in constructor, recompute the number of slots and buckets using the
+ // actual available memory.
+ std::size_t resized_num_buckets =
+ (available_memory - extra_variable_storage) /
+ (kHashTableLoadFactor * sizeof(std::atomic<std::size_t>) + bucket_size_ +
+ key_manager_.getEstimatedVariableKeySize());
+ resized_num_slots =
+ get_previous_prime_number(resized_num_buckets * kHashTableLoadFactor);
+ resized_num_buckets = resized_num_slots / kHashTableLoadFactor;
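+ // Slot counts are kept prime (via get_next_prime_number() and
+ // get_previous_prime_number()) so that 'hash_code % num_slots' spreads
+ // keys evenly even when hash codes share common factors with the slot
+ // count.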
+
+ // Locate slot array.
+ std::atomic<std::size_t> *resized_slots =
+ static_cast<std::atomic<std::size_t> *>(aligned_memory_start);
+ aligned_memory_start = static_cast<char *>(aligned_memory_start) +
+ sizeof(std::atomic<std::size_t>) * resized_num_slots;
+ available_memory -= sizeof(std::atomic<std::size_t>) * resized_num_slots;
+
+ // As in constructor, we will be extra paranoid and use align() to locate the
+ // start of the array of buckets, as well.
+ void *resized_buckets = aligned_memory_start;
+ if (align(
+ kBucketAlignment, bucket_size_, resized_buckets, available_memory) ==
+ nullptr) {
+ FATAL_ERROR(
+ "StorageBlob used to hold resized SeparateChainingHashTable "
+ "is too small to meet alignment requirements of buckets.");
+ } else if (resized_buckets != aligned_memory_start) {
+ DEV_WARNING(
+ "Bucket array start position adjusted to meet alignment "
+ "requirement for SeparateChainingHashTable's value type.");
+ if (resized_num_buckets * bucket_size_ + variable_storage_required >
+ available_memory) {
+ --resized_num_buckets;
+ }
+ }
+ aligned_memory_start = static_cast<char *>(aligned_memory_start) +
+ resized_num_buckets * bucket_size_;
+ available_memory -= resized_num_buckets * bucket_size_;
+
+ void *resized_variable_length_key_storage = aligned_memory_start;
+ const std::size_t resized_variable_length_key_storage_size = available_memory;
+
+ const std::size_t original_buckets_used =
+ header_->buckets_allocated.load(std::memory_order_relaxed);
+
+ // Initialize the header.
+ resized_header->num_slots = resized_num_slots;
+ resized_header->num_buckets = resized_num_buckets;
+ resized_header->buckets_allocated.store(original_buckets_used,
+ std::memory_order_relaxed);
+ resized_header->variable_length_bytes_allocated.store(
+ original_variable_storage_used, std::memory_order_relaxed);
+
+ // Bulk-copy buckets. This is safe because:
+ // 1. The "next" pointers will be adjusted when rebuilding chains below.
+ // 2. The hash codes will stay the same.
+ // 3. For key components:
+ // a. Inline keys will stay exactly the same.
+ // b. Offsets into variable-length storage will remain valid, because
+ // we also do a byte-for-byte copy of variable-length storage below.
+ // c. Absolute external pointers will still point to the same address.
+ // d. Relative pointers are not used with resizable hash tables.
+ // 4. If values are not trivially copyable, then we invoke ValueT's copy
+ // or move constructor with placement new.
+ std::memcpy(resized_buckets, buckets_, original_buckets_used * bucket_size_);
+
+ // TODO(chasseur): std::is_trivially_copyable is not yet implemented in
+ // GCC 4.8.3, so we assume we need to invoke ValueT's copy or move
+ // constructor, even though the plain memcpy above could suffice for many
+ // possible ValueTs.
+ void *current_value_original = static_cast<char *>(buckets_) + kValueOffset;
+ void *current_value_resized =
+ static_cast<char *>(resized_buckets) + kValueOffset;
+ for (std::size_t bucket_num = 0; bucket_num < original_buckets_used;
+ ++bucket_num) {
+ // Use a move constructor if available to avoid a deep copy; the original
+ // table is always discarded after a resize, so moving out of it is safe.
+ new (current_value_resized) std::uint8_t(
+ std::move(*static_cast<std::uint8_t *>(current_value_original)));
+ current_value_original =
+ static_cast<char *>(current_value_original) + bucket_size_;
+ current_value_resized =
+ static_cast<char *>(current_value_resized) + bucket_size_;
+ }
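+ // Note: since this fast variant fixes the value type to std::uint8_t, the
+ // placement new above moves only the first byte of each value region,
+ // which the bulk memcpy has already copied; the loop is retained for
+ // parity with the templated ValueT implementation it was adapted from.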
+
+ // Copy over variable-length key components, if any.
+ if (original_variable_storage_used > 0) {
+ DEBUG_ASSERT(original_variable_storage_used ==
+ key_manager_.getNextVariableLengthKeyOffset());
+ DEBUG_ASSERT(original_variable_storage_used <=
+ resized_variable_length_key_storage_size);
+ std::memcpy(resized_variable_length_key_storage,
+ key_manager_.getVariableLengthKeyStorage(),
+ original_variable_storage_used);
+ }
+
+ // Destroy values in the original hash table, if necessary.
+ DestroyValues(buckets_, original_buckets_used, bucket_size_);
+
+ // Make resized structures active.
+ std::swap(this->blob_, resized_blob);
+ header_ = resized_header;
+ slots_ = resized_slots;
+ buckets_ = resized_buckets;
+ key_manager_.setVariableLengthStorageInfo(
+ resized_variable_length_key_storage,
+ resized_variable_length_key_storage_size,
+ &(resized_header->variable_length_bytes_allocated));
+
+ // Drop the old blob.
+ const block_id old_blob_id = resized_blob->getID();
+ resized_blob.release();
+ this->storage_manager_->deleteBlockOrBlobFile(old_blob_id);
+
+ // Rebuild chains.
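+ // (Hypothetical example: if buckets 3 and 7 both hash to slot 5, bucket 3
+ // is processed first, CASes slots_[5] from 0 to 4, and zeroes its own next
+ // pointer; bucket 7 then fails the CAS, points its next pointer at the old
+ // head value 4, and stores 8 into slots_[5], yielding the chain 7 -> 3.)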
+ void *current_bucket = buckets_;
+ for (std::size_t bucket_num = 0; bucket_num < original_buckets_used;
+ ++bucket_num) {
+ std::atomic<std::size_t> *next_ptr =
+ static_cast<std::atomic<std::size_t> *>(current_bucket);
+ const std::size_t hash_code = *reinterpret_cast<const std::size_t *>(
+ static_cast<const char *>(current_bucket) +
+ sizeof(std::atomic<std::size_t>));
+
+ const std::size_t slot_number = hash_code % header_->num_slots;
+ std::size_t slot_ptr_value = 0;
+ if (slots_[slot_number].compare_exchange_strong(
+ slot_ptr_value, bucket_num + 1, std::memory_order_relaxed)) {
+ // This bucket is the first in the chain for this block, so reset its
+ // next pointer to 0.
+ next_ptr->store(0, std::memory_order_relaxed);
+ } else {
+ // A chain already exists starting from this slot, so put this bucket at
+ // the head.
+ next_ptr->store(slot_ptr_value, std::memory_order_relaxed);
+ slots_[slot_number].store(bucket_num + 1, std::memory_order_relaxed);
+ }
+ current_bucket = static_cast<char *>(current_bucket) + bucket_size_;
+ }
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+bool FastSeparateChainingHashTable<resizable,
+ serializable,
+ force_key_copy,
+ allow_duplicate_keys>::
+ preallocateForBulkInsert(const std::size_t total_entries,
+ const std::size_t total_variable_key_size,
+ HashTablePreallocationState *prealloc_state) {
+ DEBUG_ASSERT(allow_duplicate_keys);
+ if (!key_manager_.allocateVariableLengthKeyStorage(total_variable_key_size)) {
+ return false;
+ }
+
+ // We use load then compare-exchange here instead of simply fetch-add,
+ // because if multiple threads are simultaneously trying to allocate more
+ // than one bucket and exceed 'header_->num_buckets', their respective
+ // rollbacks might happen in such an order that some bucket ranges get
+ // skipped, while others might get double-allocated later.
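+ // (Hypothetical illustration: with num_buckets = 10 and the counter at 8,
+ // one thread fetch_add-ing 4 and another fetch_add-ing 3 would both
+ // overshoot and have to fetch_sub their requests back; any thread reading
+ // the counter between those rollbacks sees a transient value that matches
+ // no consistent set of allocated bucket ranges. The CAS loop below never
+ // publishes an over-limit counter in the first place.)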
+ std::size_t original_buckets_allocated =
+ header_->buckets_allocated.load(std::memory_order_relaxed);
+ std::size_t buckets_post_allocation =
+ original_buckets_allocated + total_entries;
+ while ((buckets_post_allocation <= header_->num_buckets) &&
+ !header_->buckets_allocated.compare_exchange_weak(
+ original_buckets_allocated,
+ buckets_post_allocation,
+ std::memory_order_relaxed)) {
+ buckets_post_allocation = original_buckets_allocated + total_entries;
+ }
+
+ if (buckets_post_allocation > header_->num_buckets) {
+ key_manager_.deallocateVariableLengthKeyStorage(total_variable_key_size);
+ return false;
+ }
+
+ prealloc_state->bucket_position = original_buckets_allocated;
+ if (total_variable_key_size != 0) {
+ prealloc_state->variable_length_key_position =
+ key_manager_.incrementNextVariableLengthKeyOffset(
+ total_variable_key_size);
+ }
+ return true;
+}
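+
+// A minimal sketch of the bulk-insert protocol the method above supports
+// (everything other than the method and HashTablePreallocationState is
+// hypothetical):
+//
+//   HashTablePreallocationState prealloc_state;
+//   if (table.preallocateForBulkInsert(num_tuples, total_var_key_bytes,
+//                                      &prealloc_state)) {
+//     // Per-tuple insertions now draw buckets and variable-length key
+//     // storage from 'prealloc_state' instead of contending on the shared
+//     // atomic counters in the header.
+//   }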
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+void FastSeparateChainingHashTable<
+ resizable,
+ serializable,
+ force_key_copy,
+ allow_duplicate_keys>::DestroyValues(void *hash_buckets,
+ const std::size_t num_buckets,
+ const std::size_t bucket_size) {
+ if (!std::is_trivially_destructible<std::uint8_t>::value) {
+ void *value_ptr = static_cast<char *>(hash_buckets) + kValueOffset;
+ for (std::size_t bucket_num = 0; bucket_num < num_buckets; ++bucket_num) {
+ static_cast<std::uint8_t *>(value_ptr)->~uint8_t();
+ value_ptr = static_cast<char *>(value_ptr) + bucket_size;
+ }
+ }
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+inline bool FastSeparateChainingHashTable<resizable,
+ serializable,
+ force_key_copy,
+ allow_duplicate_keys>::
+ locateBucketForInsertion(const std::size_t hash_code,
+ const std::size_t variable_key_allocation_required,
+ void **bucket,
+ std::atomic<std::size_t> **pending_chain_ptr,
+ std::size_t *pending_chain_ptr_finish_value,
+ HashTablePreallocationState *prealloc_state) {
+ DEBUG_ASSERT((prealloc_state == nullptr) || allow_duplicate_keys);
+ if (*bucket == nullptr) {
+ *pending_chain_ptr = &(slots_[hash_code % header_->num_slots]);
+ } else {
+ *pending_chain_ptr = static_cast<std::atomic<std::size_t> *>(*bucket);
+ }
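+ // Chain-pointer protocol: a CAS from 0 to
+ // std::numeric_limits<std::size_t>::max() claims the tail of a chain; the
+ // claimant later publishes the real bucket number plus one through
+ // '*pending_chain_ptr_finish_value', or stores 0 again to roll back.
+ // Readers that observe the max() sentinel spin until one of those happens.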
+ for (;;) {
+ std::size_t existing_chain_ptr = 0;
+ if ((*pending_chain_ptr)
+ ->compare_exchange_strong(existing_chain_ptr,
+ std::numeric_limits<std::size_t>::max(),
+ std::memory_order_acq_rel)) {
+ // Got to the end of the chain. Allocate a new bucket.
+
+ // First, allocate variable-length key storage, if needed (i.e. if this
+ // is an upsert and we didn't allocate up-front).
+ if ((prealloc_state == nullptr) &&
+ !key_manager_.allocateVariableLengthKeyStorage(
+ variable_key_allocation_required)) {
+ // Ran out of variable-length storage.
+ (*pending_chain_ptr)->store(0, std::memory_order_release);
+ *bucket = nullptr;
+ return false;
+ }
+
+ const std::size_t allocated_bucket_num =
+ (prealloc_state == nullptr)
+ ? header_->buckets_allocated.fetch_add(1,
+ std::memory_order_relaxed)
+ : (prealloc_state->bucket_position)++;
+ if (allocated_bucket_num >= header_->num_buckets) {
+ // Ran out of buckets.
+ DEBUG_ASSERT(prealloc_state == nullptr);
+ header_->buckets_allocated.fetch_sub(1, std::memory_order_relaxed);
+ (*pending_chain_ptr)->store(0, std::memory_order_release);
+ *bucket = nullptr;
+ return false;
+ } else {
+ *bucket =
+ static_cast<char *>(buckets_) + allocated_bucket_num * bucket_size_;
+ *pending_chain_ptr_finish_value = allocated_bucket_num + 1;
+ return true;
+ }
+ }
+ // Spin until the real "next" pointer is available.
+ while (existing_chain_ptr == std::numeric_limits<std::size_t>::max()) {
+ existing_chain_ptr =
+ (*pending_chain_ptr)->load(std::memory_order_acquire);
+ }
+ if (existing_chain_ptr == 0) {
+ // Other thread had to roll back, so try again.
+ continue;
+ }
+ // Chase the next pointer.
+ *bucket =
+ static_cast<char *>(buckets_) + (existing_chain_ptr - 1) * bucket_size_;
+ *pending_chain_ptr = static_cast<std::atomic<std::size_t> *>(*bucket);
+ if (!allow_duplicate_keys) {
+ const std::size_t hash_in_bucket = *reinterpret_cast<const std::size_t *>(
+ static_cast<const char *>(*bucket) +
+ sizeof(std::atomic<std::size_t>));
+ if (hash_in_bucket == hash_code) {
+ return false;
+ }
+ }
+ }
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+inline void FastSeparateChainingHashTable<resizable,
+ serializable,
+ force_key_copy,
+ allow_duplicate_keys>::
+ writeScalarKeyToBucket(const TypedValue &key,
+ const std::size_t hash_code,
+ void *bucket,
+ HashTablePreallocationState *prealloc_state) {
+ *reinterpret_cast<std::size_t *>(static_cast<char *>(bucket) +
+ sizeof(std::atomic<std::size_t>)) =
+ hash_code;
+ key_manager_.writeKeyComponentToBucket(key, 0, bucket, prealloc_state);
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+inline void FastSeparateChainingHashTable<resizable,
+ serializable,
+ force_key_copy,
+ allow_duplicate_keys>::
+ writeCompositeKeyToBucket(const std::vector<TypedValue> &key,
+ const std::size_t hash_code,
+ void *bucket,
+ HashTablePreallocationState *prealloc_state) {
+ DEBUG_ASSERT(key.size() == this->key_types_.size());
+ *reinterpret_cast<std::size_t *>(static_cast<char *>(bucket) +
+ sizeof(std::atomic<std::size_t>)) =
+ hash_code;
+ for (std::size_t idx = 0; idx < this->key_types_.size(); ++idx) {
+ key_manager_.writeKeyComponentToBucket(
+ key[idx], idx, bucket, prealloc_state);
+ }
+}
+
+template <bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+bool FastSeparateChainingHashTable<
+ resizable,
+ serializable,
+ force_key_copy,
+ allow_duplicate_keys>::isFull(const std::size_t extra_variable_storage)
+ const {
+ if (header_->buckets_allocated.load(std::memory_order_relaxed) >=
+ header_->num_buckets) {
+ // All buckets are allocated.
+ return true;
+ }
+
+ if (extra_variable_storage > 0) {
+ if (extra_variable_storage +
+ header_->variable_length_bytes_allocated.load(
+ std::memory_order_relaxed) >
+ key_manager_.getVariableLengthKeyStorageSize()) {
+ // Not enough variable-length key storage space.
+ return true;
+ }
+ }
+
+ return false;
+}
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_STORAGE_FAST_SEPARATE_CHAINING_HASH_TABLE_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/storage/HashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTable.hpp b/storage/HashTable.hpp
index 9fa41a2..f2dcb03 100644
--- a/storage/HashTable.hpp
+++ b/storage/HashTable.hpp
@@ -49,16 +49,6 @@ namespace quickstep {
*/
/**
- * @brief Codes which indicate the result of a call to put() or
- * putCompositeKey().
- **/
-enum class HashTablePutResult {
- kOK = 0,
- kDuplicateKey,
- kOutOfSpace
-};
-
-/**
* @brief Base class for hash table.
*
* This class is templated so that the core hash-table logic can be reused in
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/storage/HashTableBase.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableBase.hpp b/storage/HashTableBase.hpp
index f1594e3..cd0a141 100644
--- a/storage/HashTableBase.hpp
+++ b/storage/HashTableBase.hpp
@@ -21,7 +21,9 @@
#define QUICKSTEP_STORAGE_HASH_TABLE_BASE_HPP_
#include <cstddef>
+#include <vector>
+#include "ValueAccessor.hpp"
#include "utility/Macros.hpp"
namespace quickstep {
@@ -52,6 +54,12 @@ struct HashTablePreallocationState {
};
/**
+ * @brief Codes which indicate the result of a call to put() or
+ * putCompositeKey().
+ **/
+enum class HashTablePutResult { kOK = 0, kDuplicateKey, kOutOfSpace };
+
+/**
* @brief An ultra-minimal base class that HashTables with different ValueT
* parameters inherit from. This allows for a bit more type-safety than
* just passing around void* pointers (although casting will still be
@@ -64,12 +72,35 @@ template <bool resizable,
bool allow_duplicate_keys>
class HashTableBase {
public:
- virtual ~HashTableBase() {
+ virtual ~HashTableBase() {}
+
+ /**
+ * TODO(harshad) We should remove this function from here. We are postponing
+ * that because the amount of work involved is significant. The steps are as
+ * follows:
+ * 1. Replace the AggregationStateHashTableBase occurrence in HashTablePool
+ * with the FastHashTable implementation (i.e. an implementation specialized
+ * for aggregation).
+ * 2. Remove createGroupByHashTable from the AggregationHandle* classes.
+ * 3. Replace the AggregationStateHashTableBase occurrences in the
+ * AggregationHandle* classes with the FastHashTable implementation.
+ * 4. Move this method from here to the FastHashTable class, so that it can
+ * be called from the AggregationHandle* classes.
+ *
+ * Optionally, we can also remove the AggregationStateHashTableBase
+ * specialization from this file.
+ **/
+ virtual bool upsertValueAccessorCompositeKeyFast(
+ const std::vector<attribute_id> &argument,
+ ValueAccessor *accessor,
+ const std::vector<attribute_id> &key_attr_ids,
+ const bool check_for_null_keys) {
+ return false;
}
protected:
- HashTableBase() {
- }
+ HashTableBase() {}
private:
DISALLOW_COPY_AND_ASSIGN(HashTableBase);