You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/02/04 04:44:29 UTC

[1/3] incubator-quickstep git commit: Updates

Repository: incubator-quickstep
Updated Branches:
  refs/heads/collision-free-agg 3bcb5c892 -> 1e7a92a94


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/PackedPayloadHashTable.cpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadHashTable.cpp b/storage/PackedPayloadHashTable.cpp
new file mode 100644
index 0000000..adf4b5c
--- /dev/null
+++ b/storage/PackedPayloadHashTable.cpp
@@ -0,0 +1,436 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "storage/PackedPayloadHashTable.hpp"
+
+namespace quickstep {
+
+PackedPayloadHashTable::PackedPayloadHashTable(
+    const std::vector<const Type *> &key_types,
+    const std::size_t num_entries,
+    const std::vector<AggregationHandle *> &handles,
+    StorageManager *storage_manager)
+    : key_types_(key_types),
+      num_handles_(handles.size()),
+      handles_(handles),
+      total_payload_size_(ComputeTotalPayloadSize(handles)),
+      storage_manager_(storage_manager),
+      kBucketAlignment(alignof(std::atomic<std::size_t>)),
+      kValueOffset(sizeof(std::atomic<std::size_t>) + sizeof(std::size_t)),
+      key_manager_(key_types_, kValueOffset + total_payload_size_),
+      bucket_size_(ComputeBucketSize(key_manager_.getFixedKeySize())) {
+  std::size_t payload_offset_running_sum = sizeof(SpinMutex);
+  for (const auto *handle : handles) {
+    payload_offsets_.emplace_back(payload_offset_running_sum);
+    payload_offset_running_sum += handle->getPayloadSize();
+  }
+
+  // NOTE(jianqiao): Potential memory leak / double freeing by copying from
+  // init_payload to buckets if payload contains out of line data.
+  init_payload_ =
+      static_cast<std::uint8_t *>(calloc(this->total_payload_size_, 1));
+  DCHECK(init_payload_ != nullptr);
+
+  for (std::size_t i = 0; i < num_handles_; ++i) {
+    handles_[i]->initPayload(init_payload_ + payload_offsets_[i]);
+  }
+
+  // Bucket size always rounds up to the alignment requirement of the atomic
+  // size_t "next" pointer at the front or a ValueT, whichever is larger.
+  //
+  // Give base HashTable information about what key components are stored
+  // inline from 'key_manager_'.
+  setKeyInline(key_manager_.getKeyInline());
+
+  // Pick out a prime number of slots and calculate storage requirements.
+  std::size_t num_slots_tmp =
+      get_next_prime_number(num_entries * kHashTableLoadFactor);
+  std::size_t required_memory =
+      sizeof(Header) + num_slots_tmp * sizeof(std::atomic<std::size_t>) +
+      (num_slots_tmp / kHashTableLoadFactor) *
+          (bucket_size_ + key_manager_.getEstimatedVariableKeySize());
+  std::size_t num_storage_slots =
+      this->storage_manager_->SlotsNeededForBytes(required_memory);
+  if (num_storage_slots == 0) {
+    FATAL_ERROR(
+        "Storage requirement for SeparateChainingHashTable "
+        "exceeds maximum allocation size.");
+  }
+
+  // Get a StorageBlob to hold the hash table.
+  const block_id blob_id =
+      this->storage_manager_->createBlob(num_storage_slots);
+  this->blob_ = this->storage_manager_->getBlobMutable(blob_id);
+
+  void *aligned_memory_start = this->blob_->getMemoryMutable();
+  std::size_t available_memory = num_storage_slots * kSlotSizeBytes;
+  if (align(alignof(Header),
+            sizeof(Header),
+            aligned_memory_start,
+            available_memory) == nullptr) {
+    // With current values from StorageConstants.hpp, this should be
+    // impossible. A blob is at least 1 MB, while a Header has alignment
+    // requirement of just kCacheLineBytes (64 bytes).
+    FATAL_ERROR(
+        "StorageBlob used to hold resizable "
+        "SeparateChainingHashTable is too small to meet alignment "
+        "requirements of SeparateChainingHashTable::Header.");
+  } else if (aligned_memory_start != this->blob_->getMemoryMutable()) {
+    // This should also be impossible, since the StorageManager allocates slots
+    // aligned to kCacheLineBytes.
+    DEV_WARNING("StorageBlob memory adjusted by "
+                << (num_storage_slots * kSlotSizeBytes - available_memory)
+                << " bytes to meet alignment requirement for "
+                << "SeparateChainingHashTable::Header.");
+  }
+
+  // Locate the header.
+  header_ = static_cast<Header *>(aligned_memory_start);
+  aligned_memory_start =
+      static_cast<char *>(aligned_memory_start) + sizeof(Header);
+  available_memory -= sizeof(Header);
+
+  // Recompute the number of slots & buckets using the actual available memory.
+  // Most likely, we got some extra free bucket space due to "rounding up" to
+  // the storage blob's size. It's also possible (though very unlikely) that we
+  // will wind up with fewer buckets than we initially wanted because of screwy
+  // alignment requirements for ValueT.
+  std::size_t num_buckets_tmp =
+      available_memory /
+      (kHashTableLoadFactor * sizeof(std::atomic<std::size_t>) + bucket_size_ +
+       key_manager_.getEstimatedVariableKeySize());
+  num_slots_tmp =
+      get_previous_prime_number(num_buckets_tmp * kHashTableLoadFactor);
+  num_buckets_tmp = num_slots_tmp / kHashTableLoadFactor;
+  DEBUG_ASSERT(num_slots_tmp > 0);
+  DEBUG_ASSERT(num_buckets_tmp > 0);
+
+  // Locate the slot array.
+  slots_ = static_cast<std::atomic<std::size_t> *>(aligned_memory_start);
+  aligned_memory_start = static_cast<char *>(aligned_memory_start) +
+                         sizeof(std::atomic<std::size_t>) * num_slots_tmp;
+  available_memory -= sizeof(std::atomic<std::size_t>) * num_slots_tmp;
+
+  // Locate the buckets.
+  buckets_ = aligned_memory_start;
+  // Extra-paranoid: If ValueT has an alignment requirement greater than that
+  // of std::atomic<std::size_t>, we may need to adjust the start of the bucket
+  // array.
+  if (align(kBucketAlignment, bucket_size_, buckets_, available_memory) ==
+      nullptr) {
+    FATAL_ERROR(
+        "StorageBlob used to hold resizable "
+        "SeparateChainingHashTable is too small to meet "
+        "alignment requirements of buckets.");
+  } else if (buckets_ != aligned_memory_start) {
+    DEV_WARNING(
+        "Bucket array start position adjusted to meet alignment "
+        "requirement for SeparateChainingHashTable's value type.");
+    if (num_buckets_tmp * bucket_size_ > available_memory) {
+      --num_buckets_tmp;
+    }
+  }
+
+  // Fill in the header.
+  header_->num_slots = num_slots_tmp;
+  header_->num_buckets = num_buckets_tmp;
+  header_->buckets_allocated.store(0, std::memory_order_relaxed);
+  header_->variable_length_bytes_allocated.store(0, std::memory_order_relaxed);
+  available_memory -= bucket_size_ * (header_->num_buckets);
+
+  // Locate variable-length key storage region, and give it all the remaining
+  // bytes in the blob.
+  key_manager_.setVariableLengthStorageInfo(
+      static_cast<char *>(buckets_) + header_->num_buckets * bucket_size_,
+      available_memory,
+      &(header_->variable_length_bytes_allocated));
+}
+
+PackedPayloadHashTable::~PackedPayloadHashTable() {
+  if (blob_.valid()) {
+    const block_id blob_id = blob_->getID();
+    blob_.release();
+    storage_manager_->deleteBlockOrBlobFile(blob_id);
+  }
+  std::free(init_payload_);
+}
+
+void PackedPayloadHashTable::clear() {
+  const std::size_t used_buckets =
+      header_->buckets_allocated.load(std::memory_order_relaxed);
+  // Destroy existing values, if necessary.
+  destroyPayload();
+
+  // Zero-out slot array.
+  std::memset(
+      slots_, 0x0, sizeof(std::atomic<std::size_t>) * header_->num_slots);
+
+  // Zero-out used buckets.
+  std::memset(buckets_, 0x0, used_buckets * bucket_size_);
+
+  header_->buckets_allocated.store(0, std::memory_order_relaxed);
+  header_->variable_length_bytes_allocated.store(0, std::memory_order_relaxed);
+  key_manager_.zeroNextVariableLengthKeyOffset();
+}
+
+void PackedPayloadHashTable::destroyPayload() {
+  const std::size_t num_buckets =
+      header_->buckets_allocated.load(std::memory_order_relaxed);
+  void *bucket_ptr = static_cast<char *>(buckets_) + kValueOffset;
+  for (std::size_t bucket_num = 0; bucket_num < num_buckets; ++bucket_num) {
+    for (std::size_t handle_id = 0; handle_id < num_handles_; ++handle_id) {
+      void *value_internal_ptr =
+          static_cast<char *>(bucket_ptr) + this->payload_offsets_[handle_id];
+      handles_[handle_id]->destroyPayload(static_cast<std::uint8_t *>(value_internal_ptr));
+    }
+    bucket_ptr = static_cast<char *>(bucket_ptr) + bucket_size_;
+  }
+}
+
+bool PackedPayloadHashTable::upsertValueAccessor(
+    const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
+    const std::vector<MultiSourceAttributeId> &key_attr_ids,
+    ValueAccessorMultiplexer *accessor_mux) {
+  DCHECK(accessor_mux->getDerivedAccessor()->getImplementationType()
+             == ValueAccessor::Implementation::kColumnVectors);
+  ValueAccessor *base_accessor = accessor_mux->getBaseAccessor();
+  ColumnVectorsValueAccessor *derived_accessor =
+      static_cast<ColumnVectorsValueAccessor *>(accessor_mux->getDerivedAccessor());
+
+  if (derived_accessor == nullptr) {
+    return upsertValueAccessorCompositeKeyInternal<false>(argument_ids,
+                                                          key_attr_ids,
+                                                          base_accessor,
+                                                          derived_accessor);
+  } else {
+    return upsertValueAccessorCompositeKeyInternal<true>(argument_ids,
+                                                         key_attr_ids,
+                                                         base_accessor,
+                                                         derived_accessor);
+  }
+}
+
+void PackedPayloadHashTable::resize(const std::size_t extra_buckets,
+                                    const std::size_t extra_variable_storage,
+                                    const std::size_t retry_num) {
+  // A retry should never be necessary with this implementation of HashTable.
+  // Separate chaining ensures that any resized hash table with more buckets
+  // than the original table will be able to hold more entries than the
+  // original.
+  DEBUG_ASSERT(retry_num == 0);
+
+  SpinSharedMutexExclusiveLock<true> write_lock(this->resize_shared_mutex_);
+
+  // Recheck whether the hash table is still full. Note that multiple threads
+  // might wait to rebuild this hash table simultaneously. Only the first one
+  // should do the rebuild.
+  if (!isFull(extra_variable_storage)) {
+    return;
+  }
+
+  // Approximately double the number of buckets and slots.
+  //
+  // TODO(chasseur): It may be worth it to more than double the number of
+  // buckets here so that we can maintain a good, sparse fill factor for a
+  // longer time as more values are inserted. Such behavior should take into
+  // account kHashTableLoadFactor.
+  std::size_t resized_num_slots = get_next_prime_number(
+      (header_->num_buckets + extra_buckets / 2) * kHashTableLoadFactor * 2);
+  std::size_t variable_storage_required =
+      (resized_num_slots / kHashTableLoadFactor) *
+      key_manager_.getEstimatedVariableKeySize();
+  const std::size_t original_variable_storage_used =
+      header_->variable_length_bytes_allocated.load(std::memory_order_relaxed);
+  // If this resize was triggered by a too-large variable-length key, bump up
+  // the variable-length storage requirement.
+  if ((extra_variable_storage > 0) &&
+      (extra_variable_storage + original_variable_storage_used >
+       key_manager_.getVariableLengthKeyStorageSize())) {
+    variable_storage_required += extra_variable_storage;
+  }
+
+  const std::size_t resized_memory_required =
+      sizeof(Header) + resized_num_slots * sizeof(std::atomic<std::size_t>) +
+      (resized_num_slots / kHashTableLoadFactor) * bucket_size_ +
+      variable_storage_required;
+  const std::size_t resized_storage_slots =
+      this->storage_manager_->SlotsNeededForBytes(resized_memory_required);
+  if (resized_storage_slots == 0) {
+    FATAL_ERROR(
+        "Storage requirement for resized SeparateChainingHashTable "
+        "exceeds maximum allocation size.");
+  }
+
+  // Get a new StorageBlob to hold the resized hash table.
+  const block_id resized_blob_id =
+      this->storage_manager_->createBlob(resized_storage_slots);
+  MutableBlobReference resized_blob =
+      this->storage_manager_->getBlobMutable(resized_blob_id);
+
+  // Locate data structures inside the new StorageBlob.
+  void *aligned_memory_start = resized_blob->getMemoryMutable();
+  std::size_t available_memory = resized_storage_slots * kSlotSizeBytes;
+  if (align(alignof(Header),
+            sizeof(Header),
+            aligned_memory_start,
+            available_memory) == nullptr) {
+    // Should be impossible, as noted in constructor.
+    FATAL_ERROR(
+        "StorageBlob used to hold resized SeparateChainingHashTable "
+        "is too small to meet alignment requirements of "
+        "LinearOpenAddressingHashTable::Header.");
+  } else if (aligned_memory_start != resized_blob->getMemoryMutable()) {
+    // Again, should be impossible.
+    DEV_WARNING("In SeparateChainingHashTable::resize(), StorageBlob "
+                << "memory adjusted by "
+                << (resized_num_slots * kSlotSizeBytes - available_memory)
+                << " bytes to meet alignment requirement for "
+                << "LinearOpenAddressingHashTable::Header.");
+  }
+
+  Header *resized_header = static_cast<Header *>(aligned_memory_start);
+  aligned_memory_start =
+      static_cast<char *>(aligned_memory_start) + sizeof(Header);
+  available_memory -= sizeof(Header);
+
+  // As in constructor, recompute the number of slots and buckets using the
+  // actual available memory.
+  std::size_t resized_num_buckets =
+      (available_memory - extra_variable_storage) /
+      (kHashTableLoadFactor * sizeof(std::atomic<std::size_t>) + bucket_size_ +
+       key_manager_.getEstimatedVariableKeySize());
+  resized_num_slots =
+      get_previous_prime_number(resized_num_buckets * kHashTableLoadFactor);
+  resized_num_buckets = resized_num_slots / kHashTableLoadFactor;
+
+  // Locate slot array.
+  std::atomic<std::size_t> *resized_slots =
+      static_cast<std::atomic<std::size_t> *>(aligned_memory_start);
+  aligned_memory_start = static_cast<char *>(aligned_memory_start) +
+                         sizeof(std::atomic<std::size_t>) * resized_num_slots;
+  available_memory -= sizeof(std::atomic<std::size_t>) * resized_num_slots;
+
+  // As in constructor, we will be extra paranoid and use align() to locate the
+  // start of the array of buckets, as well.
+  void *resized_buckets = aligned_memory_start;
+  if (align(
+          kBucketAlignment, bucket_size_, resized_buckets, available_memory) ==
+      nullptr) {
+    FATAL_ERROR(
+        "StorageBlob used to hold resized SeparateChainingHashTable "
+        "is too small to meet alignment requirements of buckets.");
+  } else if (resized_buckets != aligned_memory_start) {
+    DEV_WARNING(
+        "Bucket array start position adjusted to meet alignment "
+        "requirement for SeparateChainingHashTable's value type.");
+    if (resized_num_buckets * bucket_size_ + variable_storage_required >
+        available_memory) {
+      --resized_num_buckets;
+    }
+  }
+  aligned_memory_start = static_cast<char *>(aligned_memory_start) +
+                         resized_num_buckets * bucket_size_;
+  available_memory -= resized_num_buckets * bucket_size_;
+
+  void *resized_variable_length_key_storage = aligned_memory_start;
+  const std::size_t resized_variable_length_key_storage_size = available_memory;
+
+  const std::size_t original_buckets_used =
+      header_->buckets_allocated.load(std::memory_order_relaxed);
+
+  // Initialize the header.
+  resized_header->num_slots = resized_num_slots;
+  resized_header->num_buckets = resized_num_buckets;
+  resized_header->buckets_allocated.store(original_buckets_used,
+                                          std::memory_order_relaxed);
+  resized_header->variable_length_bytes_allocated.store(
+      original_variable_storage_used, std::memory_order_relaxed);
+
+  // Bulk-copy buckets. This is safe because:
+  //     1. The "next" pointers will be adjusted when rebuilding chains below.
+  //     2. The hash codes will stay the same.
+  //     3. For key components:
+  //       a. Inline keys will stay exactly the same.
+  //       b. Offsets into variable-length storage will remain valid, because
+  //          we also do a byte-for-byte copy of variable-length storage below.
+  //       c. Absolute external pointers will still point to the same address.
+  //       d. Relative pointers are not used with resizable hash tables.
+  //     4. If values are not trivially copyable, then we invoke ValueT's copy
+  //        or move constructor with placement new.
+  // NOTE(harshad) - Regarding point 4 above, as this is a specialized
+  // hash table implemented for aggregation, the values are trivially copyable,
+  // therefore we don't need to invoke payload values' copy/move constructors.
+  std::memcpy(resized_buckets, buckets_, original_buckets_used * bucket_size_);
+
+  // Copy over variable-length key components, if any.
+  if (original_variable_storage_used > 0) {
+    DEBUG_ASSERT(original_variable_storage_used ==
+                 key_manager_.getNextVariableLengthKeyOffset());
+    DEBUG_ASSERT(original_variable_storage_used <=
+                 resized_variable_length_key_storage_size);
+    std::memcpy(resized_variable_length_key_storage,
+                key_manager_.getVariableLengthKeyStorage(),
+                original_variable_storage_used);
+  }
+
+  destroyPayload();
+
+  // Make resized structures active.
+  std::swap(this->blob_, resized_blob);
+  header_ = resized_header;
+  slots_ = resized_slots;
+  buckets_ = resized_buckets;
+  key_manager_.setVariableLengthStorageInfo(
+      resized_variable_length_key_storage,
+      resized_variable_length_key_storage_size,
+      &(resized_header->variable_length_bytes_allocated));
+
+  // Drop the old blob.
+  const block_id old_blob_id = resized_blob->getID();
+  resized_blob.release();
+  this->storage_manager_->deleteBlockOrBlobFile(old_blob_id);
+
+  // Rebuild chains.
+  void *current_bucket = buckets_;
+  for (std::size_t bucket_num = 0; bucket_num < original_buckets_used;
+       ++bucket_num) {
+    std::atomic<std::size_t> *next_ptr =
+        static_cast<std::atomic<std::size_t> *>(current_bucket);
+    const std::size_t hash_code = *reinterpret_cast<const std::size_t *>(
+        static_cast<const char *>(current_bucket) +
+        sizeof(std::atomic<std::size_t>));
+
+    const std::size_t slot_number = hash_code % header_->num_slots;
+    std::size_t slot_ptr_value = 0;
+    if (slots_[slot_number].compare_exchange_strong(
+            slot_ptr_value, bucket_num + 1, std::memory_order_relaxed)) {
+      // This bucket is the first in the chain for this block, so reset its
+      // next pointer to 0.
+      next_ptr->store(0, std::memory_order_relaxed);
+    } else {
+      // A chain already exists starting from this slot, so put this bucket at
+      // the head.
+      next_ptr->store(slot_ptr_value, std::memory_order_relaxed);
+      slots_[slot_number].store(bucket_num + 1, std::memory_order_relaxed);
+    }
+    current_bucket = static_cast<char *>(current_bucket) + bucket_size_;
+  }
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/PackedPayloadHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadHashTable.hpp b/storage/PackedPayloadHashTable.hpp
new file mode 100644
index 0000000..a871d29
--- /dev/null
+++ b/storage/PackedPayloadHashTable.hpp
@@ -0,0 +1,798 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_PACKED_PAYLOAD_HASH_TABLE_HPP_
+#define QUICKSTEP_STORAGE_PACKED_PAYLOAD_HASH_TABLE_HPP_
+
+#include <algorithm>
+#include <atomic>
+#include <cstddef>
+#include <cstdlib>
+#include <limits>
+#include <memory>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "storage/HashTableBase.hpp"
+#include "storage/HashTableKeyManager.hpp"
+#include "storage/StorageBlob.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageConstants.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/TupleReference.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "threading/SpinMutex.hpp"
+#include "threading/SpinSharedMutex.hpp"
+#include "types/Type.hpp"
+#include "types/TypedValue.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "utility/Alignment.hpp"
+#include "utility/HashPair.hpp"
+#include "utility/Macros.hpp"
+#include "utility/PrimeNumber.hpp"
+
+namespace quickstep {
+
+/** \addtogroup Storage
+ *  @{
+ */
+
+class PackedPayloadHashTable : public AggregationStateHashTableBase {
+ public:
+  PackedPayloadHashTable(
+      const std::vector<const Type *> &key_types,
+      const std::size_t num_entries,
+      const std::vector<AggregationHandle *> &handles,
+      StorageManager *storage_manager);
+
+  ~PackedPayloadHashTable() override;
+
+  void clear();
+
+  void destroyPayload() override;
+
+  bool upsertValueAccessor(
+      const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
+      const std::vector<MultiSourceAttributeId> &key_ids,
+      ValueAccessorMultiplexer *accessor_mux) override;
+
+  inline block_id getBlobId() const {
+    return blob_->getID();
+  }
+
+  inline std::size_t numEntries() const {
+    return header_->buckets_allocated.load(std::memory_order_relaxed);
+  }
+
+  inline bool upsertCompositeKey(const std::vector<TypedValue> &key,
+                                 const std::uint8_t *source_state);
+
+  template <typename FunctorT>
+  inline bool upsertCompositeKey(const std::vector<TypedValue> &key,
+                                 FunctorT *functor,
+                                 int index);
+
+  inline const std::uint8_t* getSingleCompositeKey(
+      const std::vector<TypedValue> &key) const;
+
+  inline const std::uint8_t* getSingleCompositeKey(
+      const std::vector<TypedValue> &key,
+      const int index) const;
+
+  template <typename FunctorT>
+  inline std::size_t forEach(FunctorT *functor) const;
+
+  template <typename FunctorT>
+  inline std::size_t forEach(FunctorT *functor, const int index) const;
+
+  template <typename FunctorT>
+  inline std::size_t forEachCompositeKey(FunctorT *functor) const;
+
+  template <typename FunctorT>
+  inline std::size_t forEachCompositeKey(FunctorT *functor,
+                                         const std::size_t index) const;
+
+ private:
+  void resize(const std::size_t extra_buckets,
+              const std::size_t extra_variable_storage,
+              const std::size_t retry_num = 0);
+
+  inline std::size_t calculateVariableLengthCompositeKeyCopySize(
+      const std::vector<TypedValue> &key) const {
+    std::size_t total = 0;
+    for (std::vector<TypedValue>::size_type idx = 0; idx < key.size(); ++idx) {
+      if (!(*key_inline_)[idx]) {
+        total += key[idx].getDataSize();
+      }
+    }
+    return total;
+  }
+
+  inline bool getNextEntry(TypedValue *key,
+                           const std::uint8_t **value,
+                           std::size_t *entry_num) const;
+
+  inline bool getNextEntryCompositeKey(std::vector<TypedValue> *key,
+                                       const std::uint8_t **value,
+                                       std::size_t *entry_num) const;
+
+  inline std::uint8_t* upsertCompositeKeyInternal(
+      const std::vector<TypedValue> &key,
+      const std::size_t variable_key_size);
+
+  template <bool use_two_accessors>
+  inline bool upsertValueAccessorCompositeKeyInternal(
+      const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
+      const std::vector<MultiSourceAttributeId> &key_ids,
+      ValueAccessor *base_accessor,
+      ColumnVectorsValueAccessor *derived_accessor);
+
+  // Generate a hash for a composite key by hashing each component of 'key' and
+  // mixing their bits with CombineHashes().
+  inline std::size_t hashCompositeKey(const std::vector<TypedValue> &key) const;
+
+  // Set information about which key components are stored inline. This usually
+  // comes from a HashTableKeyManager, and is set by the constructor of a
+  // subclass of HashTable.
+  inline void setKeyInline(const std::vector<bool> *key_inline) {
+    scalar_key_inline_ = key_inline->front();
+    key_inline_ = key_inline;
+  }
+
+  inline static std::size_t ComputeTotalPayloadSize(
+      const std::vector<AggregationHandle *> &handles) {
+    std::size_t total_payload_size = sizeof(SpinMutex);
+    for (const auto *handle : handles) {
+      total_payload_size += handle->getPayloadSize();
+    }
+    return total_payload_size;
+  }
+
+  // Assign '*key_vector' with the attribute values specified by 'key_ids' at
+  // the current position of 'accessor'. If 'check_for_null_keys' is true, stops
+  // and returns true if any of the values is null, otherwise returns false.
+  template <bool use_two_accessors,
+            bool check_for_null_keys,
+            typename ValueAccessorT>
+  inline static bool GetCompositeKeyFromValueAccessor(
+      const std::vector<MultiSourceAttributeId> &key_ids,
+      const ValueAccessorT *accessor,
+      const ColumnVectorsValueAccessor *derived_accessor,
+      std::vector<TypedValue> *key_vector) {
+    for (std::size_t key_idx = 0; key_idx < key_ids.size(); ++key_idx) {
+      const MultiSourceAttributeId &key_id = key_ids[key_idx];
+      if (use_two_accessors && key_id.source == ValueAccessorSource::kDerived) {
+        (*key_vector)[key_idx] = derived_accessor->getTypedValue(key_id.attr_id);
+      } else {
+        (*key_vector)[key_idx] = accessor->getTypedValue(key_id.attr_id);
+      }
+      if (check_for_null_keys && (*key_vector)[key_idx].isNull()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  struct Header {
+    std::size_t num_slots;
+    std::size_t num_buckets;
+    alignas(kCacheLineBytes) std::atomic<std::size_t> buckets_allocated;
+    alignas(kCacheLineBytes)
+        std::atomic<std::size_t> variable_length_bytes_allocated;
+  };
+
+  // Type(s) of keys.
+  const std::vector<const Type *> key_types_;
+
+  // Information about whether key components are stored inline or in a
+  // separate variable-length storage region. This is usually determined by a
+  // HashTableKeyManager and set by calling setKeyInline().
+  bool scalar_key_inline_;
+  const std::vector<bool> *key_inline_;
+
+  const std::size_t num_handles_;
+  const std::vector<AggregationHandle *> handles_;
+
+  std::size_t total_payload_size_;
+  std::vector<std::size_t> payload_offsets_;
+  std::uint8_t *init_payload_;
+
+  StorageManager *storage_manager_;
+  MutableBlobReference blob_;
+
+  // Locked in shared mode for most operations, exclusive mode during resize.
+  // Not locked at all for non-resizable HashTables.
+  alignas(kCacheLineBytes) SpinSharedMutex<true> resize_shared_mutex_;
+
+  std::size_t kBucketAlignment;
+
+  // Value's offset in a bucket is the first alignof(ValueT) boundary after the
+  // next pointer and hash code.
+  std::size_t kValueOffset;
+
+  // Round bucket size up to a multiple of kBucketAlignment.
+  constexpr std::size_t ComputeBucketSize(const std::size_t fixed_key_size) {
+    return (((kValueOffset + this->total_payload_size_ + fixed_key_size - 1) /
+             kBucketAlignment) +
+            1) *
+           kBucketAlignment;
+  }
+
+  // Attempt to find an empty bucket to insert 'hash_code' into, starting after
+  // '*bucket' in the chain (or, if '*bucket' is NULL, starting from the slot
+  // array). Returns true and stores SIZE_T_MAX in '*pending_chain_ptr' if an
+  // empty bucket is found. Returns false if 'allow_duplicate_keys' is false
+  // and a hash collision is found (caller should then check whether there is a
+  // genuine key collision or the hash collision is spurious). Returns false
+  // and sets '*bucket' to NULL if there are no more empty buckets in the hash
+  // table. If 'variable_key_allocation_required' is nonzero, this method will
+  // attempt to allocate storage for a variable-length key BEFORE allocating a
+  // bucket, so that no bucket number below 'header_->num_buckets' is ever
+  // deallocated after being allocated.
+  inline bool locateBucketForInsertion(
+      const std::size_t hash_code,
+      const std::size_t variable_key_allocation_required,
+      void **bucket,
+      std::atomic<std::size_t> **pending_chain_ptr,
+      std::size_t *pending_chain_ptr_finish_value);
+
+  // Write a scalar 'key' and its 'hash_code' into the '*bucket', which was
+  // found by locateBucketForInsertion(). Assumes that storage for a
+  // variable-length key copy (if any) was already allocated by a successful
+  // call to allocateVariableLengthKeyStorage().
+  inline void writeScalarKeyToBucket(
+      const TypedValue &key,
+      const std::size_t hash_code,
+      void *bucket);
+
+  // Write a composite 'key' and its 'hash_code' into the '*bucket', which was
+  // found by locateBucketForInsertion(). Assumes that storage for
+  // variable-length key copies (if any) was already allocated by a successful
+  // call to allocateVariableLengthKeyStorage().
+  inline void writeCompositeKeyToBucket(
+      const std::vector<TypedValue> &key,
+      const std::size_t hash_code,
+      void *bucket);
+
+  // Determine whether it is actually necessary to resize this hash table.
+  // Checks that there is at least one unallocated bucket, and that there is
+  // at least 'extra_variable_storage' bytes of variable-length storage free.
+  inline bool isFull(const std::size_t extra_variable_storage) const;
+
+  // Helper object to manage key storage.
+  HashTableKeyManager<false, true> key_manager_;
+
+  // In-memory structure is as follows:
+  //   - SeparateChainingHashTable::Header
+  //   - Array of slots, interpreted as follows:
+  //       - 0 = Points to nothing (empty)
+  //       - SIZE_T_MAX = Pending (some thread is starting a chain from this
+  //         slot and will overwrite it soon)
+  //       - Anything else = The number of the first bucket in the chain for
+  //         this slot PLUS ONE (i.e. subtract one to get the actual bucket
+  //         number).
+  //   - Array of buckets, each of which is:
+  //       - atomic size_t "next" pointer, interpreted the same as slots above.
+  //       - size_t hash value
+  //       - possibly some unused bytes as needed so that ValueT's alignment
+  //         requirement is met
+  //       - ValueT value slot
+  //       - fixed-length key storage (which may include pointers to external
+  //         memory or offsets of variable length keys stored within this hash
+  //         table)
+  //       - possibly some additional unused bytes so that bucket size is a
+  //         multiple of both alignof(std::atomic<std::size_t>) and
+  //         alignof(ValueT)
+  //   - Variable-length key storage region (referenced by offsets stored in
+  //     fixed-length keys).
+  Header *header_;
+
+  std::atomic<std::size_t> *slots_;
+  void *buckets_;
+  const std::size_t bucket_size_;
+
+  DISALLOW_COPY_AND_ASSIGN(PackedPayloadHashTable);
+};
+
+/** @} */
+
+// ----------------------------------------------------------------------------
+// Implementations of template class methods follow.
+
+class HashTableMerger {
+ public:
+  /**
+   * @brief Constructor
+   *
+   * @param handle The Aggregation handle being used.
+   * @param destination_hash_table The destination hash table to which other
+   *        hash tables will be merged.
+   **/
+  explicit HashTableMerger(PackedPayloadHashTable *destination_hash_table)
+      : destination_hash_table_(destination_hash_table) {}
+
+  /**
+   * @brief The operator for the functor.
+   *
+   * @param group_by_key The group by key being merged.
+   * @param source_state The aggregation state for the given key in the source
+   *        aggregation hash table.
+   **/
+  inline void operator()(const std::vector<TypedValue> &group_by_key,
+                         const std::uint8_t *source_state) {
+    destination_hash_table_->upsertCompositeKey(group_by_key, source_state);
+  }
+
+ private:
+  PackedPayloadHashTable *destination_hash_table_;
+
+  DISALLOW_COPY_AND_ASSIGN(HashTableMerger);
+};
+
+inline std::size_t PackedPayloadHashTable::hashCompositeKey(
+    const std::vector<TypedValue> &key) const {
+  DEBUG_ASSERT(!key.empty());
+  DEBUG_ASSERT(key.size() == key_types_.size());
+  std::size_t hash = key.front().getHash();
+  for (std::vector<TypedValue>::const_iterator key_it = key.begin() + 1;
+       key_it != key.end();
+       ++key_it) {
+    hash = CombineHashes(hash, key_it->getHash());
+  }
+  return hash;
+}
+
+inline bool PackedPayloadHashTable::getNextEntry(TypedValue *key,
+                                                 const std::uint8_t **value,
+                                                 std::size_t *entry_num) const {
+  if (*entry_num < header_->buckets_allocated.load(std::memory_order_relaxed)) {
+    const char *bucket =
+        static_cast<const char *>(buckets_) + (*entry_num) * bucket_size_;
+    *key = key_manager_.getKeyComponentTyped(bucket, 0);
+    *value = reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset);
+    ++(*entry_num);
+    return true;
+  } else {
+    return false;
+  }
+}
+
+
+inline bool PackedPayloadHashTable::getNextEntryCompositeKey(
+    std::vector<TypedValue> *key,
+    const std::uint8_t **value,
+    std::size_t *entry_num) const {
+  if (*entry_num < header_->buckets_allocated.load(std::memory_order_relaxed)) {
+    const char *bucket =
+        static_cast<const char *>(buckets_) + (*entry_num) * bucket_size_;
+    for (std::vector<const Type *>::size_type key_idx = 0;
+         key_idx < this->key_types_.size();
+         ++key_idx) {
+      key->emplace_back(key_manager_.getKeyComponentTyped(bucket, key_idx));
+    }
+    *value = reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset);
+    ++(*entry_num);
+    return true;
+  } else {
+    return false;
+  }
+}
+
+
+inline bool PackedPayloadHashTable::locateBucketForInsertion(
+    const std::size_t hash_code,
+    const std::size_t variable_key_allocation_required,
+    void **bucket,
+    std::atomic<std::size_t> **pending_chain_ptr,
+    std::size_t *pending_chain_ptr_finish_value) {
+  if (*bucket == nullptr) {
+    *pending_chain_ptr = &(slots_[hash_code % header_->num_slots]);
+  } else {
+    *pending_chain_ptr = static_cast<std::atomic<std::size_t> *>(*bucket);
+  }
+  for (;;) {
+    std::size_t existing_chain_ptr = 0;
+    if ((*pending_chain_ptr)
+            ->compare_exchange_strong(existing_chain_ptr,
+                                      std::numeric_limits<std::size_t>::max(),
+                                      std::memory_order_acq_rel)) {
+      // Got to the end of the chain. Allocate a new bucket.
+
+      // First, allocate variable-length key storage, if needed (i.e. if this
+      // is an upsert and we didn't allocate up-front).
+      if (!key_manager_.allocateVariableLengthKeyStorage(
+              variable_key_allocation_required)) {
+        // Ran out of variable-length storage.
+        (*pending_chain_ptr)->store(0, std::memory_order_release);
+        *bucket = nullptr;
+        return false;
+      }
+
+      const std::size_t allocated_bucket_num =
+          header_->buckets_allocated.fetch_add(1, std::memory_order_relaxed);
+      if (allocated_bucket_num >= header_->num_buckets) {
+        // Ran out of buckets.
+        header_->buckets_allocated.fetch_sub(1, std::memory_order_relaxed);
+        (*pending_chain_ptr)->store(0, std::memory_order_release);
+        *bucket = nullptr;
+        return false;
+      } else {
+        *bucket =
+            static_cast<char *>(buckets_) + allocated_bucket_num * bucket_size_;
+        *pending_chain_ptr_finish_value = allocated_bucket_num + 1;
+        return true;
+      }
+    }
+    // Spin until the real "next" pointer is available.
+    while (existing_chain_ptr == std::numeric_limits<std::size_t>::max()) {
+      existing_chain_ptr =
+          (*pending_chain_ptr)->load(std::memory_order_acquire);
+    }
+    if (existing_chain_ptr == 0) {
+      // Other thread had to roll back, so try again.
+      continue;
+    }
+    // Chase the next pointer.
+    *bucket =
+        static_cast<char *>(buckets_) + (existing_chain_ptr - 1) * bucket_size_;
+    *pending_chain_ptr = static_cast<std::atomic<std::size_t> *>(*bucket);
+    const std::size_t hash_in_bucket = *reinterpret_cast<const std::size_t *>(
+        static_cast<const char *>(*bucket) +
+        sizeof(std::atomic<std::size_t>));
+    if (hash_in_bucket == hash_code) {
+      return false;
+    }
+  }
+}
+
+inline const std::uint8_t* PackedPayloadHashTable::getSingleCompositeKey(
+    const std::vector<TypedValue> &key) const {
+  DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+  const std::size_t hash_code = this->hashCompositeKey(key);
+  std::size_t bucket_ref =
+      slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
+  while (bucket_ref != 0) {
+    DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
+    const char *bucket =
+        static_cast<const char *>(buckets_) + (bucket_ref - 1) * bucket_size_;
+    const std::size_t bucket_hash = *reinterpret_cast<const std::size_t *>(
+        bucket + sizeof(std::atomic<std::size_t>));
+    if ((bucket_hash == hash_code) &&
+        key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+      // Match located.
+      return reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset);
+    }
+    bucket_ref =
+        reinterpret_cast<const std::atomic<std::size_t> *>(bucket)->load(
+            std::memory_order_relaxed);
+  }
+
+  // Reached the end of the chain and didn't find a match.
+  return nullptr;
+}
+
+inline const std::uint8_t* PackedPayloadHashTable::getSingleCompositeKey(
+    const std::vector<TypedValue> &key,
+    const int index) const {
+  DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+  const std::size_t hash_code = this->hashCompositeKey(key);
+  std::size_t bucket_ref =
+      slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
+  while (bucket_ref != 0) {
+    DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
+    const char *bucket =
+        static_cast<const char *>(buckets_) + (bucket_ref - 1) * bucket_size_;
+    const std::size_t bucket_hash = *reinterpret_cast<const std::size_t *>(
+        bucket + sizeof(std::atomic<std::size_t>));
+    if ((bucket_hash == hash_code) &&
+        key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+      // Match located.
+      return reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset) +
+             this->payload_offsets_[index];
+    }
+    bucket_ref =
+        reinterpret_cast<const std::atomic<std::size_t> *>(bucket)->load(
+            std::memory_order_relaxed);
+  }
+
+  // Reached the end of the chain and didn't find a match.
+  return nullptr;
+}
+
+inline bool PackedPayloadHashTable::upsertCompositeKey(
+    const std::vector<TypedValue> &key,
+    const std::uint8_t *source_state) {
+  const std::size_t variable_size =
+      calculateVariableLengthCompositeKeyCopySize(key);
+  for (;;) {
+    {
+      SpinSharedMutexSharedLock<true> resize_lock(resize_shared_mutex_);
+      std::uint8_t *value =
+          upsertCompositeKeyInternal(key, variable_size);
+      if (value != nullptr) {
+        SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
+        for (unsigned int k = 0; k < num_handles_; ++k) {
+          handles_[k]->mergeStates(source_state + payload_offsets_[k],
+                                   value + payload_offsets_[k]);
+        }
+        return true;
+      }
+    }
+    resize(0, variable_size);
+  }
+}
+
+template <typename FunctorT>
+inline bool PackedPayloadHashTable::upsertCompositeKey(
+    const std::vector<TypedValue> &key,
+    FunctorT *functor,
+    int index) {
+  const std::size_t variable_size =
+      calculateVariableLengthCompositeKeyCopySize(key);
+  for (;;) {
+    {
+      SpinSharedMutexSharedLock<true> resize_lock(resize_shared_mutex_);
+      std::uint8_t *value =
+          upsertCompositeKeyInternal(key, variable_size);
+      if (value != nullptr) {
+        (*functor)(value + payload_offsets_[index]);
+        return true;
+      }
+    }
+    resize(0, variable_size);
+  }
+}
+
+
+inline std::uint8_t* PackedPayloadHashTable::upsertCompositeKeyInternal(
+    const std::vector<TypedValue> &key,
+    const std::size_t variable_key_size) {
+  if (variable_key_size > 0) {
+    // Don't allocate yet, since the key may already be present. However, we
+    // do check if either the allocated variable storage space OR the free
+    // space is big enough to hold the key (at least one must be true: either
+    // the key is already present and allocated, or we need to be able to
+    // allocate enough space for it).
+    std::size_t allocated_bytes = header_->variable_length_bytes_allocated.load(
+        std::memory_order_relaxed);
+    if ((allocated_bytes < variable_key_size) &&
+        (allocated_bytes + variable_key_size >
+         key_manager_.getVariableLengthKeyStorageSize())) {
+      return nullptr;
+    }
+  }
+
+  const std::size_t hash_code = this->hashCompositeKey(key);
+  void *bucket = nullptr;
+  std::atomic<std::size_t> *pending_chain_ptr;
+  std::size_t pending_chain_ptr_finish_value;
+  for (;;) {
+    if (locateBucketForInsertion(hash_code,
+                                 variable_key_size,
+                                 &bucket,
+                                 &pending_chain_ptr,
+                                 &pending_chain_ptr_finish_value)) {
+      // Found an empty bucket.
+      break;
+    } else if (bucket == nullptr) {
+      // Ran out of buckets or variable-key space.
+      return nullptr;
+    } else if (key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+      // Found an already-existing entry for this key.
+      return reinterpret_cast<std::uint8_t *>(static_cast<char *>(bucket) +
+                                              kValueOffset);
+    }
+  }
+
+  // We are now writing to an empty bucket.
+  // Write the key and hash.
+  writeCompositeKeyToBucket(key, hash_code, bucket);
+
+  std::uint8_t *value = static_cast<unsigned char *>(bucket) + kValueOffset;
+  std::memcpy(value, init_payload_, this->total_payload_size_);
+
+  // Update the previous chaing pointer to point to the new bucket.
+  pending_chain_ptr->store(pending_chain_ptr_finish_value,
+                           std::memory_order_release);
+
+  // Return the value.
+  return value;
+}
+
+template <bool use_two_accessors>
+inline bool PackedPayloadHashTable::upsertValueAccessorCompositeKeyInternal(
+    const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
+    const std::vector<MultiSourceAttributeId> &key_ids,
+    ValueAccessor *base_accessor,
+    ColumnVectorsValueAccessor *derived_accessor) {
+  std::size_t variable_size;
+  std::vector<TypedValue> key_vector;
+  key_vector.resize(key_ids.size());
+
+  return InvokeOnAnyValueAccessor(
+      base_accessor,
+      [&](auto *accessor) -> bool {  // NOLINT(build/c++11)
+    bool continuing = true;
+    while (continuing) {
+      {
+        continuing = false;
+        SpinSharedMutexSharedLock<true> lock(resize_shared_mutex_);
+        while (accessor->next()) {
+          if (use_two_accessors) {
+            derived_accessor->next();
+          }
+          if (this->GetCompositeKeyFromValueAccessor<use_two_accessors, true>(
+                  key_ids,
+                  accessor,
+                  derived_accessor,
+                  &key_vector)) {
+            continue;
+          }
+          variable_size = this->calculateVariableLengthCompositeKeyCopySize(key_vector);
+          std::uint8_t *value = this->upsertCompositeKeyInternal(
+              key_vector, variable_size);
+          if (value == nullptr) {
+            continuing = true;
+            break;
+          } else {
+            SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
+            for (unsigned int k = 0; k < num_handles_; ++k) {
+              const auto &ids = argument_ids[k];
+              if (ids.empty()) {
+                handles_[k]->updateStateNullary(value + payload_offsets_[k]);
+              } else {
+                const MultiSourceAttributeId &arg_id = ids.front();
+                if (use_two_accessors && arg_id.source == ValueAccessorSource::kDerived) {
+                  DCHECK_NE(arg_id.attr_id, kInvalidAttributeID);
+                  handles_[k]->updateStateUnary(derived_accessor->getTypedValue(arg_id.attr_id),
+                                                value + payload_offsets_[k]);
+                } else {
+                  handles_[k]->updateStateUnary(accessor->getTypedValue(arg_id.attr_id),
+                                                value + payload_offsets_[k]);
+                }
+              }
+            }
+          }
+        }
+      }
+      if (continuing) {
+        this->resize(0, variable_size);
+        accessor->previous();
+        if (use_two_accessors) {
+          derived_accessor->previous();
+        }
+      }
+    }
+    return true;
+  });
+}
+
+inline void PackedPayloadHashTable::writeScalarKeyToBucket(
+    const TypedValue &key,
+    const std::size_t hash_code,
+    void *bucket) {
+  *reinterpret_cast<std::size_t *>(static_cast<char *>(bucket) +
+                                   sizeof(std::atomic<std::size_t>)) =
+      hash_code;
+  key_manager_.writeKeyComponentToBucket(key, 0, bucket, nullptr);
+}
+
+inline void PackedPayloadHashTable::writeCompositeKeyToBucket(
+    const std::vector<TypedValue> &key,
+    const std::size_t hash_code,
+    void *bucket) {
+  DEBUG_ASSERT(key.size() == this->key_types_.size());
+  *reinterpret_cast<std::size_t *>(static_cast<char *>(bucket) +
+                                   sizeof(std::atomic<std::size_t>)) =
+      hash_code;
+  for (std::size_t idx = 0; idx < this->key_types_.size(); ++idx) {
+    key_manager_.writeKeyComponentToBucket(key[idx], idx, bucket, nullptr);
+  }
+}
+
+inline bool PackedPayloadHashTable::isFull(
+    const std::size_t extra_variable_storage) const {
+  if (header_->buckets_allocated.load(std::memory_order_relaxed) >=
+      header_->num_buckets) {
+    // All buckets are allocated.
+    return true;
+  }
+
+  if (extra_variable_storage > 0) {
+    if (extra_variable_storage +
+            header_->variable_length_bytes_allocated.load(
+                std::memory_order_relaxed) >
+        key_manager_.getVariableLengthKeyStorageSize()) {
+      // Not enough variable-length key storage space.
+      return true;
+    }
+  }
+
+  return false;
+}
+
+template <typename FunctorT>
+inline std::size_t PackedPayloadHashTable::forEach(FunctorT *functor) const {
+  std::size_t entries_visited = 0;
+  std::size_t entry_num = 0;
+  TypedValue key;
+  const std::uint8_t *value_ptr;
+  while (getNextEntry(&key, &value_ptr, &entry_num)) {
+    ++entries_visited;
+    (*functor)(key, value_ptr);
+  }
+  return entries_visited;
+}
+
+template <typename FunctorT>
+inline std::size_t PackedPayloadHashTable::forEach(
+    FunctorT *functor, const int index) const {
+  std::size_t entries_visited = 0;
+  std::size_t entry_num = 0;
+  TypedValue key;
+  const std::uint8_t *value_ptr;
+  while (getNextEntry(&key, &value_ptr, &entry_num)) {
+    ++entries_visited;
+    (*functor)(key, value_ptr + payload_offsets_[index]);
+    key.clear();
+  }
+  return entries_visited;
+}
+
+template <typename FunctorT>
+inline std::size_t PackedPayloadHashTable::forEachCompositeKey(
+    FunctorT *functor) const {
+  std::size_t entries_visited = 0;
+  std::size_t entry_num = 0;
+  std::vector<TypedValue> key;
+  const std::uint8_t *value_ptr;
+  while (getNextEntryCompositeKey(&key, &value_ptr, &entry_num)) {
+    ++entries_visited;
+    (*functor)(key, value_ptr);
+    key.clear();
+  }
+  return entries_visited;
+}
+
+template <typename FunctorT>
+inline std::size_t PackedPayloadHashTable::forEachCompositeKey(
+    FunctorT *functor,
+    const std::size_t index) const {
+  std::size_t entries_visited = 0;
+  std::size_t entry_num = 0;
+  std::vector<TypedValue> key;
+  const std::uint8_t *value_ptr;
+  while (getNextEntryCompositeKey(&key, &value_ptr, &entry_num)) {
+    ++entries_visited;
+    (*functor)(key, value_ptr + payload_offsets_[index]);
+    key.clear();
+  }
+  return entries_visited;
+}
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_STORAGE_PACKED_PAYLOAD_HASH_TABLE_HPP_


[3/3] incubator-quickstep git commit: Updates

Posted by ji...@apache.org.
Updates


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/1e7a92a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/1e7a92a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/1e7a92a9

Branch: refs/heads/collision-free-agg
Commit: 1e7a92a94e0076d89151ea4a2ab4f68caa0572c0
Parents: 3bcb5c8
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Fri Feb 3 22:44:37 2017 -0600
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Fri Feb 3 22:44:37 2017 -0600

----------------------------------------------------------------------
 .../aggregation/AggregationConcreteHandle.cpp   |   4 +-
 .../aggregation/AggregationConcreteHandle.hpp   |  21 +-
 expressions/aggregation/CMakeLists.txt          |   2 +-
 storage/AggregationOperationState.cpp           |  31 +-
 storage/CMakeLists.txt                          |  24 +-
 .../CollisionFreeAggregationStateHashTable.cpp  | 285 -------
 .../CollisionFreeAggregationStateHashTable.hpp  | 621 --------------
 storage/CollisionFreeVectorTable.cpp            | 283 +++++++
 storage/CollisionFreeVectorTable.hpp            | 621 ++++++++++++++
 storage/HashTableFactory.hpp                    |   8 +-
 .../PackedPayloadAggregationStateHashTable.cpp  | 439 ----------
 .../PackedPayloadAggregationStateHashTable.hpp  | 805 -------------------
 storage/PackedPayloadHashTable.cpp              | 436 ++++++++++
 storage/PackedPayloadHashTable.hpp              | 798 ++++++++++++++++++
 14 files changed, 2180 insertions(+), 2198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/expressions/aggregation/AggregationConcreteHandle.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.cpp b/expressions/aggregation/AggregationConcreteHandle.cpp
index 5fd7e0f..bbce29f 100644
--- a/expressions/aggregation/AggregationConcreteHandle.cpp
+++ b/expressions/aggregation/AggregationConcreteHandle.cpp
@@ -24,7 +24,7 @@
 
 #include "catalog/CatalogTypedefs.hpp"
 #include "storage/HashTableFactory.hpp"
-#include "storage/PackedPayloadAggregationStateHashTable.hpp"
+#include "storage/PackedPayloadHashTable.hpp"
 #include "storage/ValueAccessorMultiplexer.hpp"
 
 namespace quickstep {
@@ -57,7 +57,7 @@ void AggregationConcreteHandle::insertValueAccessorIntoDistinctifyHashTable(
     concatenated_ids.emplace_back(arg_id);
   }
 
-  static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(distinctify_hash_table)
+  static_cast<PackedPayloadHashTable *>(distinctify_hash_table)
       ->upsertValueAccessor({}, concatenated_ids, accessor_mux);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/expressions/aggregation/AggregationConcreteHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.hpp b/expressions/aggregation/AggregationConcreteHandle.hpp
index 5b49d0d..c8d61ff 100644
--- a/expressions/aggregation/AggregationConcreteHandle.hpp
+++ b/expressions/aggregation/AggregationConcreteHandle.hpp
@@ -29,7 +29,7 @@
 #include "expressions/aggregation/AggregationHandle.hpp"
 #include "expressions/aggregation/AggregationID.hpp"
 #include "storage/HashTableBase.hpp"
-#include "storage/PackedPayloadAggregationStateHashTable.hpp"
+#include "storage/PackedPayloadHashTable.hpp"
 #include "storage/ValueAccessorMultiplexer.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/TypedValue.hpp"
@@ -151,7 +151,7 @@ class AggregationConcreteHandle : public AggregationHandle {
       const std::size_t index,
       const std::vector<TypedValue> &group_key) const {
     const std::uint8_t *group_state =
-        static_cast<const PackedPayloadSeparateChainingAggregationStateHashTable &>(hash_table)
+        static_cast<const PackedPayloadHashTable &>(hash_table)
             .getSingleCompositeKey(group_key, index);
     DCHECK(group_state != nullptr)
         << "Could not find entry for specified group_key in HashTable";
@@ -217,8 +217,7 @@ StateT* AggregationConcreteHandle::
   };
 
   const auto &hash_table =
-      static_cast<const PackedPayloadSeparateChainingAggregationStateHashTable &>(
-          distinctify_hash_table);
+      static_cast<const PackedPayloadHashTable &>(distinctify_hash_table);
   // Invoke the lambda function "aggregate_functor" on each key from the
   // distinctify hash table.
   hash_table.forEach(&aggregate_functor);
@@ -233,9 +232,8 @@ void AggregationConcreteHandle::
         const std::size_t index,
         AggregationStateHashTableBase *aggregation_hash_table) const {
   const HandleT &handle = static_cast<const HandleT &>(*this);
-  PackedPayloadSeparateChainingAggregationStateHashTable *target_hash_table =
-      static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(
-          aggregation_hash_table);
+  PackedPayloadHashTable *target_hash_table =
+      static_cast<PackedPayloadHashTable *>(aggregation_hash_table);
 
   // A lambda function which will be called on each key-value pair from the
   // distinctify hash table.
@@ -256,9 +254,8 @@ void AggregationConcreteHandle::
     target_hash_table->upsertCompositeKey(key, &upserter, index);
   };
 
-  const PackedPayloadSeparateChainingAggregationStateHashTable &source_hash_table =
-      static_cast<const PackedPayloadSeparateChainingAggregationStateHashTable &>(
-          distinctify_hash_table);
+  const PackedPayloadHashTable &source_hash_table =
+      static_cast<const PackedPayloadHashTable &>(distinctify_hash_table);
   // Invoke the lambda function "aggregate_functor" on each composite key vector
   // from the distinctify hash table.
   source_hash_table.forEachCompositeKey(&aggregate_functor);
@@ -271,8 +268,8 @@ ColumnVector* AggregationConcreteHandle::finalizeHashTableHelper(
     const std::size_t index,
     std::vector<std::vector<TypedValue>> *group_by_keys) const {
   const HandleT &handle = static_cast<const HandleT &>(*this);
-  const PackedPayloadSeparateChainingAggregationStateHashTable &hash_table_concrete =
-      static_cast<const PackedPayloadSeparateChainingAggregationStateHashTable &>(hash_table);
+  const PackedPayloadHashTable &hash_table_concrete =
+      static_cast<const PackedPayloadHashTable &>(hash_table);
 
   if (group_by_keys->empty()) {
     if (NativeColumnVector::UsableForType(result_type)) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/expressions/aggregation/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/aggregation/CMakeLists.txt b/expressions/aggregation/CMakeLists.txt
index 0816db3..7203c8c 100644
--- a/expressions/aggregation/CMakeLists.txt
+++ b/expressions/aggregation/CMakeLists.txt
@@ -146,7 +146,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationConcreteHandl
                       quickstep_expressions_aggregation_AggregationID
                       quickstep_storage_HashTableBase
                       quickstep_storage_HashTableFactory
-                      quickstep_storage_PackedPayloadAggregationStateHashTable
+                      quickstep_storage_PackedPayloadHashTable
                       quickstep_storage_ValueAccessorMultiplexer
                       quickstep_threading_SpinMutex
                       quickstep_types_TypedValue

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index a393185..4ffd418 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -38,10 +38,11 @@
 #include "expressions/predicate/Predicate.hpp"
 #include "expressions/scalar/Scalar.hpp"
 #include "storage/AggregationOperationState.pb.h"
+#include "storage/CollisionFreeVectorTable.hpp"
 #include "storage/HashTableFactory.hpp"
 #include "storage/HashTableBase.hpp"
 #include "storage/InsertDestination.hpp"
-#include "storage/PackedPayloadAggregationStateHashTable.hpp"
+#include "storage/PackedPayloadHashTable.hpp"
 #include "storage/StorageBlock.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "storage/StorageManager.hpp"
@@ -353,7 +354,7 @@ bool AggregationOperationState::ProtoIsValid(
 
 std::size_t AggregationOperationState::getNumPartitions() const {
   if (is_aggregate_collision_free_) {
-    return static_cast<CollisionFreeAggregationStateHashTable *>(
+    return static_cast<CollisionFreeVectorTable *>(
         collision_free_hashtable_.get())->getNumFinalizationPartitions();
   } else if (is_aggregate_partitioned_) {
     return partitioned_group_by_hashtable_pool_->getNumPartitions();
@@ -364,7 +365,7 @@ std::size_t AggregationOperationState::getNumPartitions() const {
 
 std::size_t AggregationOperationState::getNumInitializationPartitions() const {
   if (is_aggregate_collision_free_) {
-    return static_cast<CollisionFreeAggregationStateHashTable *>(
+    return static_cast<CollisionFreeVectorTable *>(
         collision_free_hashtable_.get())->getNumInitializationPartitions();
   } else {
     return 0u;
@@ -373,7 +374,7 @@ std::size_t AggregationOperationState::getNumInitializationPartitions() const {
 
 void AggregationOperationState::initializeState(const std::size_t partition_id) {
   if (is_aggregate_collision_free_) {
-    static_cast<CollisionFreeAggregationStateHashTable *>(
+    static_cast<CollisionFreeVectorTable *>(
         collision_free_hashtable_.get())->initialize(partition_id);
   } else {
     LOG(FATAL) << "AggregationOperationState::initializeState() "
@@ -512,10 +513,10 @@ void AggregationOperationState::mergeSingleState(
 }
 
 void AggregationOperationState::mergeGroupByHashTables(
-    AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst) const {
-  HashTableMerger merger(dst);
-  static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(src)
-      ->forEachCompositeKey(&merger);
+    AggregationStateHashTableBase *src,
+    AggregationStateHashTableBase *dst) const {
+  HashTableMerger merger(static_cast<PackedPayloadHashTable *>(dst));
+  static_cast<PackedPayloadHashTable *>(src)->forEachCompositeKey(&merger);
 }
 
 void AggregationOperationState::aggregateBlockHashTable(
@@ -661,9 +662,8 @@ void AggregationOperationState::finalizeHashTableImplCollisionFree(
     const std::size_t partition_id,
     InsertDestination *output_destination) {
   std::vector<std::unique_ptr<ColumnVector>> final_values;
-  CollisionFreeAggregationStateHashTable *hash_table =
-      static_cast<CollisionFreeAggregationStateHashTable *>(
-          collision_free_hashtable_.get());
+  CollisionFreeVectorTable *hash_table =
+      static_cast<CollisionFreeVectorTable *>(collision_free_hashtable_.get());
 
   // TODO
   const std::size_t max_length =
@@ -696,8 +696,8 @@ void AggregationOperationState::finalizeHashTableImplCollisionFree(
 void AggregationOperationState::finalizeHashTableImplPartitioned(
     const std::size_t partition_id,
     InsertDestination *output_destination) {
-  PackedPayloadSeparateChainingAggregationStateHashTable *hash_table =
-      static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(
+  PackedPayloadHashTable *hash_table =
+      static_cast<PackedPayloadHashTable *>(
           partitioned_group_by_hashtable_pool_->getHashTable(partition_id));
 
   // Each element of 'group_by_keys' is a vector of values for a particular
@@ -790,9 +790,8 @@ void AggregationOperationState::finalizeHashTableImplThreadPrivate(
     hash_table->destroyPayload();
   }
 
-  PackedPayloadSeparateChainingAggregationStateHashTable *final_hash_table =
-      static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(
-          final_hash_table_ptr.get());
+  PackedPayloadHashTable *final_hash_table =
+      static_cast<PackedPayloadHashTable *>(final_hash_table_ptr.get());
 
   // Each element of 'group_by_keys' is a vector of values for a particular
   // group (which is also the prefix of the finalized Tuple for that group).

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index d43d7a2..8fbb4ea 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -165,9 +165,9 @@ if(QUICKSTEP_HAVE_BITWEAVING)
               bitweaving/BitWeavingVIndexSubBlock.hpp)
 endif()
 # CMAKE_VALIDATE_IGNORE_END
-add_library(quickstep_storage_CollisionFreeAggregationStateHashTable
-            CollisionFreeAggregationStateHashTable.cpp
-            CollisionFreeAggregationStateHashTable.hpp)
+add_library(quickstep_storage_CollisionFreeVectorTable
+            CollisionFreeVectorTable.cpp
+            CollisionFreeVectorTable.hpp)
 add_library(quickstep_storage_ColumnStoreUtil ColumnStoreUtil.cpp ColumnStoreUtil.hpp)
 add_library(quickstep_storage_CompressedBlockBuilder CompressedBlockBuilder.cpp CompressedBlockBuilder.hpp)
 add_library(quickstep_storage_CompressedColumnStoreTupleStorageSubBlock
@@ -225,9 +225,7 @@ add_library(quickstep_storage_InsertDestination_proto
 add_library(quickstep_storage_LinearOpenAddressingHashTable
             ../empty_src.cpp
             LinearOpenAddressingHashTable.hpp)
-add_library(quickstep_storage_PackedPayloadAggregationStateHashTable
-            PackedPayloadAggregationStateHashTable.cpp
-            PackedPayloadAggregationStateHashTable.hpp)
+add_library(quickstep_storage_PackedPayloadHashTable PackedPayloadHashTable.cpp PackedPayloadHashTable.hpp)
 add_library(quickstep_storage_PartitionedHashTablePool ../empty_src.cpp PartitionedHashTablePool.hpp)
 add_library(quickstep_storage_PreloaderThread PreloaderThread.cpp PreloaderThread.hpp)
 add_library(quickstep_storage_SMAIndexSubBlock SMAIndexSubBlock.cpp SMAIndexSubBlock.hpp)
@@ -284,7 +282,7 @@ target_link_libraries(quickstep_storage_AggregationOperationState
                       quickstep_storage_HashTablePool
                       quickstep_storage_InsertDestination
                       quickstep_storage_PartitionedHashTablePool
-                      quickstep_storage_PackedPayloadAggregationStateHashTable
+                      quickstep_storage_PackedPayloadHashTable
                       quickstep_storage_StorageBlock
                       quickstep_storage_StorageBlockInfo
                       quickstep_storage_StorageManager
@@ -435,7 +433,7 @@ if(QUICKSTEP_HAVE_BITWEAVING)
                         quickstep_utility_Macros)
 endif()
 # CMAKE_VALIDATE_IGNORE_END
-target_link_libraries(quickstep_storage_CollisionFreeAggregationStateHashTable
+target_link_libraries(quickstep_storage_CollisionFreeVectorTable
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_aggregation_AggregationHandle
                       quickstep_expressions_aggregation_AggregationID
@@ -714,12 +712,12 @@ target_link_libraries(quickstep_storage_HashTable_proto
                       ${PROTOBUF_LIBRARY})
 target_link_libraries(quickstep_storage_HashTableFactory
                       glog
-                      quickstep_storage_CollisionFreeAggregationStateHashTable
+                      quickstep_storage_CollisionFreeVectorTable
                       quickstep_storage_HashTable
                       quickstep_storage_HashTable_proto
                       quickstep_storage_HashTableBase
                       quickstep_storage_LinearOpenAddressingHashTable
-                      quickstep_storage_PackedPayloadAggregationStateHashTable
+                      quickstep_storage_PackedPayloadHashTable
                       quickstep_storage_SeparateChainingHashTable
                       quickstep_storage_SimpleScalarSeparateChainingHashTable
                       quickstep_storage_TupleReference
@@ -798,7 +796,7 @@ target_link_libraries(quickstep_storage_LinearOpenAddressingHashTable
                       quickstep_utility_Alignment
                       quickstep_utility_Macros
                       quickstep_utility_PrimeNumber)
-target_link_libraries(quickstep_storage_PackedPayloadAggregationStateHashTable
+target_link_libraries(quickstep_storage_PackedPayloadHashTable
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_aggregation_AggregationHandle
                       quickstep_storage_HashTableBase
@@ -1115,7 +1113,7 @@ target_link_libraries(quickstep_storage
                       quickstep_storage_BasicColumnStoreValueAccessor
                       quickstep_storage_BloomFilterIndexSubBlock
                       quickstep_storage_CSBTreeIndexSubBlock
-                      quickstep_storage_CollisionFreeAggregationStateHashTable
+                      quickstep_storage_CollisionFreeVectorTable
                       quickstep_storage_ColumnStoreUtil
                       quickstep_storage_CompressedBlockBuilder
                       quickstep_storage_CompressedColumnStoreTupleStorageSubBlock
@@ -1141,7 +1139,7 @@ target_link_libraries(quickstep_storage
                       quickstep_storage_InsertDestination_proto
                       quickstep_storage_LinearOpenAddressingHashTable
                       quickstep_storage_PartitionedHashTablePool
-                      quickstep_storage_PackedPayloadAggregationStateHashTable
+                      quickstep_storage_PackedPayloadHashTable
                       quickstep_storage_PreloaderThread
                       quickstep_storage_SMAIndexSubBlock
                       quickstep_storage_SeparateChainingHashTable

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/CollisionFreeAggregationStateHashTable.cpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeAggregationStateHashTable.cpp b/storage/CollisionFreeAggregationStateHashTable.cpp
deleted file mode 100644
index 2f3b336..0000000
--- a/storage/CollisionFreeAggregationStateHashTable.cpp
+++ /dev/null
@@ -1,285 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include "storage/CollisionFreeAggregationStateHashTable.hpp"
-
-#include <algorithm>
-#include <atomic>
-#include <cstddef>
-#include <cstdint>
-#include <cstdlib>
-#include <map>
-#include <memory>
-#include <vector>
-
-#include "storage/StorageBlockInfo.hpp"
-#include "storage/StorageManager.hpp"
-#include "storage/ValueAccessor.hpp"
-#include "storage/ValueAccessorMultiplexer.hpp"
-#include "storage/ValueAccessorUtil.hpp"
-#include "types/containers/ColumnVectorsValueAccessor.hpp"
-#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
-
-namespace quickstep {
-
-CollisionFreeAggregationStateHashTable::CollisionFreeAggregationStateHashTable(
-    const std::vector<const Type *> &key_types,
-    const std::size_t num_entries,
-    const std::vector<AggregationHandle *> &handles,
-    StorageManager *storage_manager)
-    : key_type_(key_types.front()),
-      num_entries_(num_entries),
-      num_handles_(handles.size()),
-      handles_(handles),
-      num_finalize_partitions_(std::min((num_entries_ >> 12u) + 1u, 80uL)),
-      storage_manager_(storage_manager) {
-  CHECK_EQ(1u, key_types.size());
-  DCHECK_GT(num_entries, 0u);
-
-  std::map<std::string, std::size_t> memory_offsets;
-  std::size_t required_memory = 0;
-
-  memory_offsets.emplace("existence_map", required_memory);
-  required_memory += CacheLineAlignedBytes(
-      BarrieredReadWriteConcurrentBitVector::BytesNeeded(num_entries));
-
-  for (std::size_t i = 0; i < num_handles_; ++i) {
-    const AggregationHandle *handle = handles_[i];
-    const std::vector<const Type *> argument_types = handle->getArgumentTypes();
-
-    std::size_t state_size = 0;
-    switch (handle->getAggregationID()) {
-      case AggregationID::kCount: {
-        state_size = sizeof(std::atomic<std::size_t>);
-        break;
-      }
-      case AggregationID::kSum: {
-        CHECK_EQ(1u, argument_types.size());
-        switch (argument_types.front()->getTypeID()) {
-          case TypeID::kInt:  // Fall through
-          case TypeID::kLong:
-            state_size = sizeof(std::atomic<std::int64_t>);
-            break;
-          case TypeID::kFloat:  // Fall through
-          case TypeID::kDouble:
-            state_size = sizeof(std::atomic<double>);
-            break;
-          default:
-            LOG(FATAL) << "Not implemented";
-        }
-        break;
-      }
-      default:
-        LOG(FATAL) << "Not implemented";
-    }
-
-    memory_offsets.emplace(std::string("state") + std::to_string(i),
-                           required_memory);
-    required_memory += CacheLineAlignedBytes(state_size * num_entries);
-  }
-
-  const std::size_t num_storage_slots =
-      storage_manager_->SlotsNeededForBytes(required_memory);
-
-  const block_id blob_id = storage_manager_->createBlob(num_storage_slots);
-  blob_ = storage_manager_->getBlobMutable(blob_id);
-
-  void *memory_start = blob_->getMemoryMutable();
-  existence_map_.reset(new BarrieredReadWriteConcurrentBitVector(
-      reinterpret_cast<char *>(memory_start) + memory_offsets.at("existence_map"),
-      num_entries,
-      false /* initialize */));
-
-  for (std::size_t i = 0; i < num_handles_; ++i) {
-    vec_tables_.emplace_back(
-        reinterpret_cast<char *>(memory_start) +
-            memory_offsets.at(std::string("state") + std::to_string(i)));
-  }
-
-  memory_size_ = required_memory;
-  num_init_partitions_ =
-      std::max(1uL, std::min(memory_size_ / (4uL * 1024u * 1024u), 80uL));
-}
-
-CollisionFreeAggregationStateHashTable::~CollisionFreeAggregationStateHashTable() {
-  const block_id blob_id = blob_->getID();
-  blob_.release();
-  storage_manager_->deleteBlockOrBlobFile(blob_id);
-}
-
-void CollisionFreeAggregationStateHashTable::destroyPayload() {
-}
-
-bool CollisionFreeAggregationStateHashTable::upsertValueAccessor(
-    const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
-    const std::vector<MultiSourceAttributeId> &key_ids,
-    ValueAccessorMultiplexer *accessor_mux) {
-  DCHECK_EQ(1u, key_ids.size());
-
-  const ValueAccessorSource key_source = key_ids.front().source;
-  const attribute_id key_id = key_ids.front().attr_id;
-  const bool is_key_nullable = key_type_->isNullable();
-
-  if (handles_.empty()) {
-    InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
-        accessor_mux->getValueAccessorBySource(key_source),
-        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
-      upsertValueAccessorKeyOnlyHelper(is_key_nullable,
-                                       key_type_,
-                                       key_id,
-                                       accessor);
-    });
-    return true;
-  }
-
-  DCHECK(accessor_mux->getDerivedAccessor()->getImplementationType()
-             == ValueAccessor::Implementation::kColumnVectors);
-  ValueAccessor *base_accessor = accessor_mux->getBaseAccessor();
-  ColumnVectorsValueAccessor *derived_accesor =
-      static_cast<ColumnVectorsValueAccessor *>(accessor_mux->getDerivedAccessor());
-
-  for (std::size_t i = 0; i < num_handles_; ++i) {
-    DCHECK_LE(argument_ids[i].size(), 1u);
-
-    const AggregationHandle *handle = handles_[i];
-    const auto &argument_types = handle->getArgumentTypes();
-    const auto &argument_ids_i = argument_ids[i];
-
-    ValueAccessorSource argument_source;
-    attribute_id argument_id;
-    const Type *argument_type;
-    bool is_argument_nullable;
-
-    if (argument_ids_i.empty()) {
-      argument_source = ValueAccessorSource::kInvalid;
-      argument_id = kInvalidAttributeID;
-
-      DCHECK(argument_types.empty());
-      argument_type = nullptr;
-      is_argument_nullable = false;
-    } else {
-      DCHECK_EQ(1u, argument_ids_i.size());
-      argument_source = argument_ids_i.front().source;
-      argument_id = argument_ids_i.front().attr_id;
-
-      DCHECK_EQ(1u, argument_types.size());
-      argument_type = argument_types.front();
-      is_argument_nullable = argument_type->isNullable();
-    }
-
-    InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
-        base_accessor,
-        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
-      if (key_source == ValueAccessorSource::kBase) {
-        if (argument_source == ValueAccessorSource::kBase) {
-          upsertValueAccessorDispatchHelper<false>(is_key_nullable,
-                                                   is_argument_nullable,
-                                                   key_type_,
-                                                   argument_type,
-                                                   handle->getAggregationID(),
-                                                   key_id,
-                                                   argument_id,
-                                                   vec_tables_[i],
-                                                   accessor,
-                                                   accessor);
-        } else {
-          upsertValueAccessorDispatchHelper<true>(is_key_nullable,
-                                                  is_argument_nullable,
-                                                  key_type_,
-                                                  argument_type,
-                                                  handle->getAggregationID(),
-                                                  key_id,
-                                                  argument_id,
-                                                  vec_tables_[i],
-                                                  accessor,
-                                                  derived_accesor);
-        }
-      } else {
-        if (argument_source == ValueAccessorSource::kBase) {
-          upsertValueAccessorDispatchHelper<true>(is_key_nullable,
-                                                  is_argument_nullable,
-                                                  key_type_,
-                                                  argument_type,
-                                                  handle->getAggregationID(),
-                                                  key_id,
-                                                  argument_id,
-                                                  vec_tables_[i],
-                                                  derived_accesor,
-                                                  accessor);
-        } else {
-          upsertValueAccessorDispatchHelper<false>(is_key_nullable,
-                                                   is_argument_nullable,
-                                                   key_type_,
-                                                   argument_type,
-                                                   handle->getAggregationID(),
-                                                   key_id,
-                                                   argument_id,
-                                                   vec_tables_[i],
-                                                   derived_accesor,
-                                                   derived_accesor);
-        }
-      }
-    });
-  }
-  return true;
-}
-
-void CollisionFreeAggregationStateHashTable::finalizeKey(
-    const std::size_t partition_id,
-    NativeColumnVector *output_cv) const {
-  const std::size_t start_position =
-      calculatePartitionStartPosition(partition_id);
-  const std::size_t end_position =
-      calculatePartitionEndPosition(partition_id);
-
-  switch (key_type_->getTypeID()) {
-    case TypeID::kInt:
-      finalizeKeyInternal<int>(start_position, end_position, output_cv);
-      return;
-    case TypeID::kLong:
-      finalizeKeyInternal<std::int64_t>(start_position, end_position, output_cv);
-      return;
-    default:
-      LOG(FATAL) << "Not supported";
-  }
-}
-
-void CollisionFreeAggregationStateHashTable::finalizeState(
-    const std::size_t partition_id,
-    std::size_t handle_id,
-    NativeColumnVector *output_cv) const {
-  const std::size_t start_position =
-      calculatePartitionStartPosition(partition_id);
-  const std::size_t end_position =
-      calculatePartitionEndPosition(partition_id);
-
-  const AggregationHandle *handle = handles_[handle_id];
-  const auto &argument_types = handle->getArgumentTypes();
-  const Type *argument_type =
-      argument_types.empty() ? nullptr : argument_types.front();
-
-  finalizeStateDispatchHelper(handle->getAggregationID(),
-                              argument_type,
-                              vec_tables_[handle_id],
-                              start_position,
-                              end_position,
-                              output_cv);
-}
-
-}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/CollisionFreeAggregationStateHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeAggregationStateHashTable.hpp b/storage/CollisionFreeAggregationStateHashTable.hpp
deleted file mode 100644
index d738e4e..0000000
--- a/storage/CollisionFreeAggregationStateHashTable.hpp
+++ /dev/null
@@ -1,621 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_STORAGE_COLLISION_FREE_AGGREGATION_STATE_HASH_TABLE_HPP_
-#define QUICKSTEP_STORAGE_COLLISION_FREE_AGGREGATION_STATE_HASH_TABLE_HPP_
-
-#include <atomic>
-#include <cstddef>
-#include <cstdint>
-#include <cstring>
-#include <memory>
-#include <type_traits>
-#include <utility>
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregationHandle.hpp"
-#include "expressions/aggregation/AggregationID.hpp"
-#include "storage/HashTableBase.hpp"
-#include "storage/StorageBlob.hpp"
-#include "storage/StorageConstants.hpp"
-#include "storage/ValueAccessor.hpp"
-#include "storage/ValueAccessorMultiplexer.hpp"
-#include "types/Type.hpp"
-#include "types/TypeID.hpp"
-#include "types/TypedValue.hpp"
-#include "types/containers/ColumnVector.hpp"
-#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-namespace quickstep {
-
-class ColumnVectorsValueAccessor;
-class StorageMnager;
-
-/** \addtogroup Storage
- *  @{
- */
-
-class CollisionFreeAggregationStateHashTable : public AggregationStateHashTableBase {
- public:
-  CollisionFreeAggregationStateHashTable(
-      const std::vector<const Type *> &key_types,
-      const std::size_t num_entries,
-      const std::vector<AggregationHandle *> &handles,
-      StorageManager *storage_manager);
-
-  ~CollisionFreeAggregationStateHashTable() override;
-
-  void destroyPayload() override;
-
-  inline std::size_t getNumInitializationPartitions() const {
-    return num_init_partitions_;
-  }
-
-  inline std::size_t getNumFinalizationPartitions() const {
-    return num_finalize_partitions_;
-  }
-
-  inline std::size_t getNumTuplesInPartition(
-      const std::size_t partition_id) const {
-    const std::size_t start_position =
-        calculatePartitionStartPosition(partition_id);
-    const std::size_t end_position =
-        calculatePartitionEndPosition(partition_id);
-    return existence_map_->onesCountInRange(start_position, end_position);
-  }
-
-  inline void initialize(const std::size_t partition_id) {
-    const std::size_t memory_segment_size =
-        (memory_size_ + num_init_partitions_ - 1) / num_init_partitions_;
-    const std::size_t memory_start = memory_segment_size * partition_id;
-    std::memset(reinterpret_cast<char *>(blob_->getMemoryMutable()) + memory_start,
-                0,
-                std::min(memory_segment_size, memory_size_ - memory_start));
-  }
-
-  bool upsertValueAccessor(
-      const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
-      const std::vector<MultiSourceAttributeId> &key_ids,
-      ValueAccessorMultiplexer *accessor_mux) override;
-
-  void finalizeKey(const std::size_t partition_id,
-                   NativeColumnVector *output_cv) const;
-
-  void finalizeState(const std::size_t partition_id,
-                     std::size_t handle_id,
-                     NativeColumnVector *output_cv) const;
-
- private:
-  inline static std::size_t CacheLineAlignedBytes(const std::size_t actual_bytes) {
-    return (actual_bytes + kCacheLineBytes - 1) / kCacheLineBytes * kCacheLineBytes;
-  }
-
-  inline std::size_t calculatePartitionLength() const {
-    const std::size_t partition_length =
-        (num_entries_ + num_finalize_partitions_ - 1) / num_finalize_partitions_;
-    DCHECK_GE(partition_length, 0u);
-    return partition_length;
-  }
-
-  inline std::size_t calculatePartitionStartPosition(
-      const std::size_t partition_id) const {
-    return calculatePartitionLength() * partition_id;
-  }
-
-  inline std::size_t calculatePartitionEndPosition(
-      const std::size_t partition_id) const {
-    return std::min(calculatePartitionLength() * (partition_id + 1),
-                    num_entries_);
-  }
-
-  template <bool use_two_accessors, typename ...ArgTypes>
-  inline void upsertValueAccessorDispatchHelper(
-      const bool is_key_nullable,
-      const bool is_argument_nullable,
-      ArgTypes &&...args);
-
-  template <bool ...bool_values, typename ...ArgTypes>
-  inline void upsertValueAccessorDispatchHelper(
-      const Type *key_type,
-      ArgTypes &&...args);
-
-  template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
-            typename KeyT, typename ...ArgTypes>
-  inline void upsertValueAccessorDispatchHelper(
-      const Type *argument_type,
-      const AggregationID agg_id,
-      ArgTypes &&...args);
-
-  template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
-            typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT>
-  inline void upsertValueAccessorCountHelper(
-      const attribute_id key_attr_id,
-      const attribute_id argument_id,
-      void *vec_table,
-      KeyValueAccessorT *key_accessor,
-      ArgumentValueAccessorT *argument_accessor);
-
-  template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
-            typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT>
-  inline void upsertValueAccessorSumHelper(
-      const Type *argument_type,
-      const attribute_id key_attr_id,
-      const attribute_id argument_id,
-      void *vec_table,
-      KeyValueAccessorT *key_accessor,
-      ArgumentValueAccessorT *argument_accessor);
-
-  template <typename ...ArgTypes>
-  inline void upsertValueAccessorKeyOnlyHelper(
-      const bool is_key_nullable,
-      const Type *key_type,
-      ArgTypes &&...args);
-
-  template <bool is_key_nullable, typename KeyT, typename KeyValueAccessorT>
-  inline void upsertValueAccessorKeyOnly(
-      const attribute_id key_attr_id,
-      KeyValueAccessorT *key_accessor);
-
-  template <bool is_key_nullable, typename KeyT, typename KeyValueAccessorT>
-  inline void upsertValueAccessorCountNullary(
-      const attribute_id key_attr_id,
-      std::atomic<std::size_t> *vec_table,
-      KeyValueAccessorT *key_accessor);
-
-  template <bool use_two_accessors, bool is_key_nullable, typename KeyT,
-            typename KeyValueAccessorT, typename ArgumentValueAccessorT>
-  inline void upsertValueAccessorCountUnary(
-      const attribute_id key_attr_id,
-      const attribute_id argument_id,
-      std::atomic<std::size_t> *vec_table,
-      KeyValueAccessorT *key_accessor,
-      ArgumentValueAccessorT *argument_accessor);
-
-  template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
-            typename KeyT, typename ArgumentT, typename StateT,
-            typename KeyValueAccessorT, typename ArgumentValueAccessorT>
-  inline void upsertValueAccessorIntegerSum(
-      const attribute_id key_attr_id,
-      const attribute_id argument_id,
-      std::atomic<StateT> *vec_table,
-      KeyValueAccessorT *key_accessor,
-      ArgumentValueAccessorT *argument_accessor);
-
-  template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
-            typename KeyT, typename ArgumentT, typename StateT,
-            typename KeyValueAccessorT, typename ArgumentValueAccessorT>
-  inline void upsertValueAccessorGenericSum(
-      const attribute_id key_attr_id,
-      const attribute_id argument_id,
-      std::atomic<StateT> *vec_table,
-      KeyValueAccessorT *key_accessor,
-      ArgumentValueAccessorT *argument_accessor);
-
-  template <typename KeyT>
-  inline void finalizeKeyInternal(const std::size_t start_position,
-                                  const std::size_t end_position,
-                                  NativeColumnVector *output_cv) const {
-    std::size_t loc = start_position - 1;
-    while ((loc = existence_map_->nextOne(loc)) < end_position) {
-      *static_cast<KeyT *>(output_cv->getPtrForDirectWrite()) = loc;
-    }
-  }
-
-  template <typename ...ArgTypes>
-  inline void finalizeStateDispatchHelper(const AggregationID agg_id,
-                                          const Type *argument_type,
-                                          const void *vec_table,
-                                          ArgTypes &&...args) const {
-    switch (agg_id) {
-       case AggregationID::kCount:
-         finalizeStateCount(static_cast<const std::atomic<std::size_t> *>(vec_table),
-                            std::forward<ArgTypes>(args)...);
-         return;
-       case AggregationID::kSum:
-         finalizeStateSumHelper(argument_type,
-                                vec_table,
-                                std::forward<ArgTypes>(args)...);
-         return;
-       default:
-         LOG(FATAL) << "Not supported";
-    }
-  }
-
-  template <typename ...ArgTypes>
-  inline void finalizeStateSumHelper(const Type *argument_type,
-                                     const void *vec_table,
-                                     ArgTypes &&...args) const {
-    DCHECK(argument_type != nullptr);
-
-    switch (argument_type->getTypeID()) {
-      case TypeID::kInt:    // Fall through
-      case TypeID::kLong:
-        finalizeStateSum<std::int64_t>(
-            static_cast<const std::atomic<std::int64_t> *>(vec_table),
-            std::forward<ArgTypes>(args)...);
-        return;
-      case TypeID::kFloat:  // Fall through
-      case TypeID::kDouble:
-        finalizeStateSum<double>(
-            static_cast<const std::atomic<double> *>(vec_table),
-            std::forward<ArgTypes>(args)...);
-        return;
-      default:
-        LOG(FATAL) << "Not supported";
-    }
-  }
-
-  inline void finalizeStateCount(const std::atomic<std::size_t> *vec_table,
-                                 const std::size_t start_position,
-                                 const std::size_t end_position,
-                                 NativeColumnVector *output_cv) const {
-    std::size_t loc = start_position - 1;
-    while ((loc = existence_map_->nextOne(loc)) < end_position) {
-      *static_cast<std::int64_t *>(output_cv->getPtrForDirectWrite()) =
-          vec_table[loc].load(std::memory_order_relaxed);
-    }
-  }
-
-  template <typename ResultT, typename StateT>
-  inline void finalizeStateSum(const std::atomic<StateT> *vec_table,
-                               const std::size_t start_position,
-                               const std::size_t end_position,
-                               NativeColumnVector *output_cv) const {
-    std::size_t loc = start_position - 1;
-    while ((loc = existence_map_->nextOne(loc)) < end_position) {
-      *static_cast<ResultT *>(output_cv->getPtrForDirectWrite()) =
-          vec_table[loc].load(std::memory_order_relaxed);
-    }
-  }
-
-  const Type *key_type_;
-  const std::size_t num_entries_;
-
-  const std::size_t num_handles_;
-  const std::vector<AggregationHandle *> handles_;
-
-  std::unique_ptr<BarrieredReadWriteConcurrentBitVector> existence_map_;
-  std::vector<void *> vec_tables_;
-
-  const std::size_t num_finalize_partitions_;
-
-  StorageManager *storage_manager_;
-  MutableBlobReference blob_;
-
-  std::size_t memory_size_;
-  std::size_t num_init_partitions_;
-
-  DISALLOW_COPY_AND_ASSIGN(CollisionFreeAggregationStateHashTable);
-};
-
-// ----------------------------------------------------------------------------
-// Implementations of template methods follow.
-
-template <bool use_two_accessors, typename ...ArgTypes>
-inline void CollisionFreeAggregationStateHashTable
-    ::upsertValueAccessorDispatchHelper(const bool is_key_nullable,
-                                        const bool is_argument_nullable,
-                                        ArgTypes &&...args) {
-  if (is_key_nullable) {
-    if (is_argument_nullable) {
-      upsertValueAccessorDispatchHelper<use_two_accessors, true, true>(
-          std::forward<ArgTypes>(args)...);
-    } else {
-      upsertValueAccessorDispatchHelper<use_two_accessors, true, false>(
-          std::forward<ArgTypes>(args)...);
-    }
-  } else {
-    if (is_argument_nullable) {
-      upsertValueAccessorDispatchHelper<use_two_accessors, false, true>(
-          std::forward<ArgTypes>(args)...);
-    } else {
-      upsertValueAccessorDispatchHelper<use_two_accessors, false, false>(
-          std::forward<ArgTypes>(args)...);
-    }
-  }
-}
-
-template <bool ...bool_values, typename ...ArgTypes>
-inline void CollisionFreeAggregationStateHashTable
-    ::upsertValueAccessorDispatchHelper(const Type *key_type,
-                                        ArgTypes &&...args) {
-  switch (key_type->getTypeID()) {
-    case TypeID::kInt:
-      upsertValueAccessorDispatchHelper<bool_values..., int>(
-          std::forward<ArgTypes>(args)...);
-      return;
-    case TypeID::kLong:
-      upsertValueAccessorDispatchHelper<bool_values..., std::int64_t>(
-          std::forward<ArgTypes>(args)...);
-      return;
-    default:
-      LOG(FATAL) << "Not supported";
-  }
-}
-
-template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
-          typename KeyT, typename ...ArgTypes>
-inline void CollisionFreeAggregationStateHashTable
-    ::upsertValueAccessorDispatchHelper(const Type *argument_type,
-                                        const AggregationID agg_id,
-                                        ArgTypes &&...args) {
-  switch (agg_id) {
-     case AggregationID::kCount:
-       upsertValueAccessorCountHelper<
-           use_two_accessors, is_key_nullable, is_argument_nullable, KeyT>(
-               std::forward<ArgTypes>(args)...);
-       return;
-     case AggregationID::kSum:
-       upsertValueAccessorSumHelper<
-           use_two_accessors, is_key_nullable, is_argument_nullable, KeyT>(
-               argument_type, std::forward<ArgTypes>(args)...);
-       return;
-     default:
-       LOG(FATAL) << "Not supported";
-  }
-}
-
-template <typename ...ArgTypes>
-inline void CollisionFreeAggregationStateHashTable
-    ::upsertValueAccessorKeyOnlyHelper(const bool is_key_nullable,
-                                       const Type *key_type,
-                                       ArgTypes &&...args) {
-  switch (key_type->getTypeID()) {
-    case TypeID::kInt: {
-      if (is_key_nullable) {
-        upsertValueAccessorKeyOnly<true, int>(std::forward<ArgTypes>(args)...);
-      } else {
-        upsertValueAccessorKeyOnly<false, int>(std::forward<ArgTypes>(args)...);
-      }
-      return;
-    }
-    case TypeID::kLong: {
-      if (is_key_nullable) {
-        upsertValueAccessorKeyOnly<true, std::int64_t>(std::forward<ArgTypes>(args)...);
-      } else {
-        upsertValueAccessorKeyOnly<false, std::int64_t>(std::forward<ArgTypes>(args)...);
-      }
-      return;
-    }
-    default:
-      LOG(FATAL) << "Not supported";
-  }
-}
-
-template <bool is_key_nullable, typename KeyT, typename ValueAccessorT>
-inline void CollisionFreeAggregationStateHashTable
-    ::upsertValueAccessorKeyOnly(const attribute_id key_attr_id,
-                                 ValueAccessorT *accessor) {
-  accessor->beginIteration();
-  while (accessor->next()) {
-    const KeyT *key = static_cast<const KeyT *>(
-        accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
-    if (is_key_nullable && key == nullptr) {
-      continue;
-    }
-    existence_map_->setBit(*key);
-  }
-}
-
-template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
-          typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT>
-inline void CollisionFreeAggregationStateHashTable
-    ::upsertValueAccessorCountHelper(const attribute_id key_attr_id,
-                                     const attribute_id argument_id,
-                                     void *vec_table,
-                                     KeyValueAccessorT *key_accessor,
-                                     ArgumentValueAccessorT *argument_accessor) {
-  DCHECK_GE(key_attr_id, 0u);
-
-  if (is_argument_nullable && argument_id != kInvalidAttributeID) {
-    upsertValueAccessorCountUnary<use_two_accessors, is_key_nullable, KeyT>(
-        key_attr_id,
-        argument_id,
-        static_cast<std::atomic<std::size_t> *>(vec_table),
-        key_accessor,
-        argument_accessor);
-    return;
-  } else {
-    upsertValueAccessorCountNullary<is_key_nullable, KeyT>(
-        key_attr_id,
-        static_cast<std::atomic<std::size_t> *>(vec_table),
-        key_accessor);
-    return;
-  }
-}
-
-template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
-          typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT>
-inline void CollisionFreeAggregationStateHashTable
-    ::upsertValueAccessorSumHelper(const Type *argument_type,
-                                   const attribute_id key_attr_id,
-                                   const attribute_id argument_id,
-                                   void *vec_table,
-                                   KeyValueAccessorT *key_accessor,
-                                   ArgumentValueAccessorT *argument_accessor) {
-  DCHECK_GE(key_attr_id, 0u);
-  DCHECK_GE(argument_id, 0u);
-  DCHECK(argument_type != nullptr);
-
-  switch (argument_type->getTypeID()) {
-    case TypeID::kInt:
-      upsertValueAccessorIntegerSum<
-          use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, int>(
-              key_attr_id,
-              argument_id,
-              static_cast<std::atomic<std::int64_t> *>(vec_table),
-              key_accessor,
-              argument_accessor);
-      return;
-    case TypeID::kLong:
-      upsertValueAccessorIntegerSum<
-          use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, std::int64_t>(
-              key_attr_id,
-              argument_id,
-              static_cast<std::atomic<std::int64_t> *>(vec_table),
-              key_accessor,
-              argument_accessor);
-      return;
-    case TypeID::kFloat:
-      upsertValueAccessorGenericSum<
-          use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, float>(
-              key_attr_id,
-              argument_id,
-              static_cast<std::atomic<double> *>(vec_table),
-              key_accessor,
-              argument_accessor);
-      return;
-    case TypeID::kDouble:
-      upsertValueAccessorGenericSum<
-          use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, double>(
-              key_attr_id,
-              argument_id,
-              static_cast<std::atomic<double> *>(vec_table),
-              key_accessor,
-              argument_accessor);
-      return;
-    default:
-      LOG(FATAL) << "Not supported";
-  }
-}
-
-template <bool is_key_nullable, typename KeyT, typename ValueAccessorT>
-inline void CollisionFreeAggregationStateHashTable
-    ::upsertValueAccessorCountNullary(const attribute_id key_attr_id,
-                                      std::atomic<std::size_t> *vec_table,
-                                      ValueAccessorT *accessor) {
-  accessor->beginIteration();
-  while (accessor->next()) {
-    const KeyT *key = static_cast<const KeyT *>(
-        accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
-    if (is_key_nullable && key == nullptr) {
-      continue;
-    }
-    const std::size_t loc = *key;
-    vec_table[loc].fetch_add(1u, std::memory_order_relaxed);
-    existence_map_->setBit(loc);
-  }
-}
-
-template <bool use_two_accessors, bool is_key_nullable, typename KeyT,
-          typename KeyValueAccessorT, typename ArgumentValueAccessorT>
-inline void CollisionFreeAggregationStateHashTable
-    ::upsertValueAccessorCountUnary(const attribute_id key_attr_id,
-                                    const attribute_id argument_id,
-                                    std::atomic<std::size_t> *vec_table,
-                                    KeyValueAccessorT *key_accessor,
-                                    ArgumentValueAccessorT *argument_accessor) {
-  key_accessor->beginIteration();
-  if (use_two_accessors) {
-    argument_accessor->beginIteration();
-  }
-  while (key_accessor->next()) {
-    if (use_two_accessors) {
-      argument_accessor->next();
-    }
-    const KeyT *key = static_cast<const KeyT *>(
-        key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
-    if (is_key_nullable && key == nullptr) {
-      continue;
-    }
-    const std::size_t loc = *key;
-    existence_map_->setBit(loc);
-    if (argument_accessor->getUntypedValue(argument_id) == nullptr) {
-      continue;
-    }
-    vec_table[loc].fetch_add(1u, std::memory_order_relaxed);
-  }
-}
-
-template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
-          typename KeyT, typename ArgumentT, typename StateT,
-          typename KeyValueAccessorT, typename ArgumentValueAccessorT>
-inline void CollisionFreeAggregationStateHashTable
-    ::upsertValueAccessorIntegerSum(const attribute_id key_attr_id,
-                                    const attribute_id argument_id,
-                                    std::atomic<StateT> *vec_table,
-                                    KeyValueAccessorT *key_accessor,
-                                    ArgumentValueAccessorT *argument_accessor) {
-  key_accessor->beginIteration();
-  if (use_two_accessors) {
-    argument_accessor->beginIteration();
-  }
-  while (key_accessor->next()) {
-    if (use_two_accessors) {
-      argument_accessor->next();
-    }
-    const KeyT *key = static_cast<const KeyT *>(
-        key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
-    if (is_key_nullable && key == nullptr) {
-      continue;
-    }
-    const std::size_t loc = *key;
-    existence_map_->setBit(loc);
-    const ArgumentT *argument = static_cast<const ArgumentT *>(
-        argument_accessor->template getUntypedValue<is_argument_nullable>(argument_id));
-    if (is_argument_nullable && argument == nullptr) {
-      continue;
-    }
-    vec_table[loc].fetch_add(*argument, std::memory_order_relaxed);
-  }
-}
-
-template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
-          typename KeyT, typename ArgumentT, typename StateT,
-          typename KeyValueAccessorT, typename ArgumentValueAccessorT>
-inline void CollisionFreeAggregationStateHashTable
-    ::upsertValueAccessorGenericSum(const attribute_id key_attr_id,
-                                    const attribute_id argument_id,
-                                    std::atomic<StateT> *vec_table,
-                                    KeyValueAccessorT *key_accessor,
-                                    ArgumentValueAccessorT *argument_accessor) {
-  key_accessor->beginIteration();
-  if (use_two_accessors) {
-    argument_accessor->beginIteration();
-  }
-  while (key_accessor->next()) {
-    if (use_two_accessors) {
-      argument_accessor->next();
-    }
-    const KeyT *key = static_cast<const KeyT *>(
-        key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
-    if (is_key_nullable && key == nullptr) {
-      continue;
-    }
-    const std::size_t loc = *key;
-    existence_map_->setBit(loc);
-    const ArgumentT *argument = static_cast<const ArgumentT *>(
-        argument_accessor->template getUntypedValue<is_argument_nullable>(argument_id));
-    if (is_argument_nullable && argument == nullptr) {
-      continue;
-    }
-    const ArgumentT arg_val = *argument;
-    std::atomic<StateT> &state = vec_table[loc];
-    StateT state_val = state.load(std::memory_order_relaxed);
-    while(!state.compare_exchange_weak(state_val, state_val + arg_val)) {}
-  }
-}
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_STORAGE_COLLISION_FREE_AGGREGATION_STATE_HASH_TABLE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/CollisionFreeVectorTable.cpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeVectorTable.cpp b/storage/CollisionFreeVectorTable.cpp
new file mode 100644
index 0000000..8065cd9
--- /dev/null
+++ b/storage/CollisionFreeVectorTable.cpp
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "storage/CollisionFreeVectorTable.hpp"
+
+#include <algorithm>
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <cstdlib>
+#include <map>
+#include <memory>
+#include <vector>
+
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
+
+namespace quickstep {
+
+CollisionFreeVectorTable::CollisionFreeVectorTable(
+    const std::vector<const Type *> &key_types,
+    const std::size_t num_entries,
+    const std::vector<AggregationHandle *> &handles,
+    StorageManager *storage_manager)
+    : key_type_(key_types.front()),
+      num_entries_(num_entries),
+      num_handles_(handles.size()),
+      handles_(handles),
+      num_finalize_partitions_(std::min((num_entries_ >> 12u) + 1u, 80uL)),
+      storage_manager_(storage_manager) {
+  CHECK_EQ(1u, key_types.size());
+  DCHECK_GT(num_entries, 0u);
+
+  std::map<std::string, std::size_t> memory_offsets;
+  std::size_t required_memory = 0;
+
+  memory_offsets.emplace("existence_map", required_memory);
+  required_memory += CacheLineAlignedBytes(
+      BarrieredReadWriteConcurrentBitVector::BytesNeeded(num_entries));
+
+  for (std::size_t i = 0; i < num_handles_; ++i) {
+    const AggregationHandle *handle = handles_[i];
+    const std::vector<const Type *> argument_types = handle->getArgumentTypes();
+
+    std::size_t state_size = 0;
+    switch (handle->getAggregationID()) {
+      case AggregationID::kCount: {
+        state_size = sizeof(std::atomic<std::size_t>);
+        break;
+      }
+      case AggregationID::kSum: {
+        CHECK_EQ(1u, argument_types.size());
+        switch (argument_types.front()->getTypeID()) {
+          case TypeID::kInt:  // Fall through
+          case TypeID::kLong:
+            state_size = sizeof(std::atomic<std::int64_t>);
+            break;
+          case TypeID::kFloat:  // Fall through
+          case TypeID::kDouble:
+            state_size = sizeof(std::atomic<double>);
+            break;
+          default:
+            LOG(FATAL) << "Not implemented";
+        }
+        break;
+      }
+      default:
+        LOG(FATAL) << "Not implemented";
+    }
+
+    memory_offsets.emplace(std::string("state") + std::to_string(i),
+                           required_memory);
+    required_memory += CacheLineAlignedBytes(state_size * num_entries);
+  }
+
+  const std::size_t num_storage_slots =
+      storage_manager_->SlotsNeededForBytes(required_memory);
+
+  const block_id blob_id = storage_manager_->createBlob(num_storage_slots);
+  blob_ = storage_manager_->getBlobMutable(blob_id);
+
+  void *memory_start = blob_->getMemoryMutable();
+  existence_map_.reset(new BarrieredReadWriteConcurrentBitVector(
+      reinterpret_cast<char *>(memory_start) + memory_offsets.at("existence_map"),
+      num_entries,
+      false /* initialize */));
+
+  for (std::size_t i = 0; i < num_handles_; ++i) {
+    vec_tables_.emplace_back(
+        reinterpret_cast<char *>(memory_start) +
+            memory_offsets.at(std::string("state") + std::to_string(i)));
+  }
+
+  memory_size_ = required_memory;
+  num_init_partitions_ =
+      std::max(1uL, std::min(memory_size_ / (4uL * 1024u * 1024u), 80uL));
+}
+
+CollisionFreeVectorTable::~CollisionFreeVectorTable() {
+  const block_id blob_id = blob_->getID();
+  blob_.release();
+  storage_manager_->deleteBlockOrBlobFile(blob_id);
+}
+
+void CollisionFreeVectorTable::destroyPayload() {
+}
+
+bool CollisionFreeVectorTable::upsertValueAccessor(
+    const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
+    const std::vector<MultiSourceAttributeId> &key_ids,
+    ValueAccessorMultiplexer *accessor_mux) {
+  DCHECK_EQ(1u, key_ids.size());
+
+  const ValueAccessorSource key_source = key_ids.front().source;
+  const attribute_id key_id = key_ids.front().attr_id;
+  const bool is_key_nullable = key_type_->isNullable();
+
+  if (handles_.empty()) {
+    InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
+        accessor_mux->getValueAccessorBySource(key_source),
+        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+      upsertValueAccessorKeyOnlyHelper(is_key_nullable,
+                                       key_type_,
+                                       key_id,
+                                       accessor);
+    });
+    return true;
+  }
+
+  DCHECK(accessor_mux->getDerivedAccessor()->getImplementationType()
+             == ValueAccessor::Implementation::kColumnVectors);
+  ValueAccessor *base_accessor = accessor_mux->getBaseAccessor();
+  ColumnVectorsValueAccessor *derived_accesor =
+      static_cast<ColumnVectorsValueAccessor *>(accessor_mux->getDerivedAccessor());
+
+  for (std::size_t i = 0; i < num_handles_; ++i) {
+    DCHECK_LE(argument_ids[i].size(), 1u);
+
+    const AggregationHandle *handle = handles_[i];
+    const auto &argument_types = handle->getArgumentTypes();
+    const auto &argument_ids_i = argument_ids[i];
+
+    ValueAccessorSource argument_source;
+    attribute_id argument_id;
+    const Type *argument_type;
+    bool is_argument_nullable;
+
+    if (argument_ids_i.empty()) {
+      argument_source = ValueAccessorSource::kInvalid;
+      argument_id = kInvalidAttributeID;
+
+      DCHECK(argument_types.empty());
+      argument_type = nullptr;
+      is_argument_nullable = false;
+    } else {
+      DCHECK_EQ(1u, argument_ids_i.size());
+      argument_source = argument_ids_i.front().source;
+      argument_id = argument_ids_i.front().attr_id;
+
+      DCHECK_EQ(1u, argument_types.size());
+      argument_type = argument_types.front();
+      is_argument_nullable = argument_type->isNullable();
+    }
+
+    InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
+        base_accessor,
+        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+      if (key_source == ValueAccessorSource::kBase) {
+        if (argument_source == ValueAccessorSource::kBase) {
+          upsertValueAccessorDispatchHelper<false>(is_key_nullable,
+                                                   is_argument_nullable,
+                                                   key_type_,
+                                                   argument_type,
+                                                   handle->getAggregationID(),
+                                                   key_id,
+                                                   argument_id,
+                                                   vec_tables_[i],
+                                                   accessor,
+                                                   accessor);
+        } else {
+          upsertValueAccessorDispatchHelper<true>(is_key_nullable,
+                                                  is_argument_nullable,
+                                                  key_type_,
+                                                  argument_type,
+                                                  handle->getAggregationID(),
+                                                  key_id,
+                                                  argument_id,
+                                                  vec_tables_[i],
+                                                  accessor,
+                                                  derived_accesor);
+        }
+      } else {
+        if (argument_source == ValueAccessorSource::kBase) {
+          upsertValueAccessorDispatchHelper<true>(is_key_nullable,
+                                                  is_argument_nullable,
+                                                  key_type_,
+                                                  argument_type,
+                                                  handle->getAggregationID(),
+                                                  key_id,
+                                                  argument_id,
+                                                  vec_tables_[i],
+                                                  derived_accesor,
+                                                  accessor);
+        } else {
+          upsertValueAccessorDispatchHelper<false>(is_key_nullable,
+                                                   is_argument_nullable,
+                                                   key_type_,
+                                                   argument_type,
+                                                   handle->getAggregationID(),
+                                                   key_id,
+                                                   argument_id,
+                                                   vec_tables_[i],
+                                                   derived_accesor,
+                                                   derived_accesor);
+        }
+      }
+    });
+  }
+  return true;
+}
+
+void CollisionFreeVectorTable::finalizeKey(const std::size_t partition_id,
+                                           NativeColumnVector *output_cv) const {
+  const std::size_t start_position =
+      calculatePartitionStartPosition(partition_id);
+  const std::size_t end_position =
+      calculatePartitionEndPosition(partition_id);
+
+  switch (key_type_->getTypeID()) {
+    case TypeID::kInt:
+      finalizeKeyInternal<int>(start_position, end_position, output_cv);
+      return;
+    case TypeID::kLong:
+      finalizeKeyInternal<std::int64_t>(start_position, end_position, output_cv);
+      return;
+    default:
+      LOG(FATAL) << "Not supported";
+  }
+}
+
+void CollisionFreeVectorTable::finalizeState(const std::size_t partition_id,
+                                             std::size_t handle_id,
+                                             NativeColumnVector *output_cv) const {
+  const std::size_t start_position =
+      calculatePartitionStartPosition(partition_id);
+  const std::size_t end_position =
+      calculatePartitionEndPosition(partition_id);
+
+  const AggregationHandle *handle = handles_[handle_id];
+  const auto &argument_types = handle->getArgumentTypes();
+  const Type *argument_type =
+      argument_types.empty() ? nullptr : argument_types.front();
+
+  finalizeStateDispatchHelper(handle->getAggregationID(),
+                              argument_type,
+                              vec_tables_[handle_id],
+                              start_position,
+                              end_position,
+                              output_cv);
+}
+
+}  // namespace quickstep


[2/3] incubator-quickstep git commit: Updates

Posted by ji...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/CollisionFreeVectorTable.hpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeVectorTable.hpp b/storage/CollisionFreeVectorTable.hpp
new file mode 100644
index 0000000..25f7786
--- /dev/null
+++ b/storage/CollisionFreeVectorTable.hpp
@@ -0,0 +1,621 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_COLLISION_FREE_AGGREGATION_STATE_HASH_TABLE_HPP_
+#define QUICKSTEP_STORAGE_COLLISION_FREE_AGGREGATION_STATE_HASH_TABLE_HPP_
+
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
+#include "storage/HashTableBase.hpp"
+#include "storage/StorageBlob.hpp"
+#include "storage/StorageConstants.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
+#include "types/Type.hpp"
+#include "types/TypeID.hpp"
+#include "types/TypedValue.hpp"
+#include "types/containers/ColumnVector.hpp"
+#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class ColumnVectorsValueAccessor;
+class StorageMnager;
+
+/** \addtogroup Storage
+ *  @{
+ */
+
+class CollisionFreeVectorTable : public AggregationStateHashTableBase {
+ public:
+  CollisionFreeVectorTable(
+      const std::vector<const Type *> &key_types,
+      const std::size_t num_entries,
+      const std::vector<AggregationHandle *> &handles,
+      StorageManager *storage_manager);
+
+  ~CollisionFreeVectorTable() override;
+
+  void destroyPayload() override;
+
+  inline std::size_t getNumInitializationPartitions() const {
+    return num_init_partitions_;
+  }
+
+  inline std::size_t getNumFinalizationPartitions() const {
+    return num_finalize_partitions_;
+  }
+
+  inline std::size_t getNumTuplesInPartition(
+      const std::size_t partition_id) const {
+    const std::size_t start_position =
+        calculatePartitionStartPosition(partition_id);
+    const std::size_t end_position =
+        calculatePartitionEndPosition(partition_id);
+    return existence_map_->onesCountInRange(start_position, end_position);
+  }
+
+  inline void initialize(const std::size_t partition_id) {
+    const std::size_t memory_segment_size =
+        (memory_size_ + num_init_partitions_ - 1) / num_init_partitions_;
+    const std::size_t memory_start = memory_segment_size * partition_id;
+    std::memset(reinterpret_cast<char *>(blob_->getMemoryMutable()) + memory_start,
+                0,
+                std::min(memory_segment_size, memory_size_ - memory_start));
+  }
+
+  bool upsertValueAccessor(
+      const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
+      const std::vector<MultiSourceAttributeId> &key_ids,
+      ValueAccessorMultiplexer *accessor_mux) override;
+
+  void finalizeKey(const std::size_t partition_id,
+                   NativeColumnVector *output_cv) const;
+
+  void finalizeState(const std::size_t partition_id,
+                     std::size_t handle_id,
+                     NativeColumnVector *output_cv) const;
+
+ private:
+  inline static std::size_t CacheLineAlignedBytes(const std::size_t actual_bytes) {
+    return (actual_bytes + kCacheLineBytes - 1) / kCacheLineBytes * kCacheLineBytes;
+  }
+
+  inline std::size_t calculatePartitionLength() const {
+    const std::size_t partition_length =
+        (num_entries_ + num_finalize_partitions_ - 1) / num_finalize_partitions_;
+    DCHECK_GE(partition_length, 0u);
+    return partition_length;
+  }
+
+  inline std::size_t calculatePartitionStartPosition(
+      const std::size_t partition_id) const {
+    return calculatePartitionLength() * partition_id;
+  }
+
+  inline std::size_t calculatePartitionEndPosition(
+      const std::size_t partition_id) const {
+    return std::min(calculatePartitionLength() * (partition_id + 1),
+                    num_entries_);
+  }
+
+  template <bool use_two_accessors, typename ...ArgTypes>
+  inline void upsertValueAccessorDispatchHelper(
+      const bool is_key_nullable,
+      const bool is_argument_nullable,
+      ArgTypes &&...args);
+
+  template <bool ...bool_values, typename ...ArgTypes>
+  inline void upsertValueAccessorDispatchHelper(
+      const Type *key_type,
+      ArgTypes &&...args);
+
+  template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
+            typename KeyT, typename ...ArgTypes>
+  inline void upsertValueAccessorDispatchHelper(
+      const Type *argument_type,
+      const AggregationID agg_id,
+      ArgTypes &&...args);
+
+  template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
+            typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT>
+  inline void upsertValueAccessorCountHelper(
+      const attribute_id key_attr_id,
+      const attribute_id argument_id,
+      void *vec_table,
+      KeyValueAccessorT *key_accessor,
+      ArgumentValueAccessorT *argument_accessor);
+
+  template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
+            typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT>
+  inline void upsertValueAccessorSumHelper(
+      const Type *argument_type,
+      const attribute_id key_attr_id,
+      const attribute_id argument_id,
+      void *vec_table,
+      KeyValueAccessorT *key_accessor,
+      ArgumentValueAccessorT *argument_accessor);
+
+  template <typename ...ArgTypes>
+  inline void upsertValueAccessorKeyOnlyHelper(
+      const bool is_key_nullable,
+      const Type *key_type,
+      ArgTypes &&...args);
+
+  template <bool is_key_nullable, typename KeyT, typename KeyValueAccessorT>
+  inline void upsertValueAccessorKeyOnly(
+      const attribute_id key_attr_id,
+      KeyValueAccessorT *key_accessor);
+
+  template <bool is_key_nullable, typename KeyT, typename KeyValueAccessorT>
+  inline void upsertValueAccessorCountNullary(
+      const attribute_id key_attr_id,
+      std::atomic<std::size_t> *vec_table,
+      KeyValueAccessorT *key_accessor);
+
+  template <bool use_two_accessors, bool is_key_nullable, typename KeyT,
+            typename KeyValueAccessorT, typename ArgumentValueAccessorT>
+  inline void upsertValueAccessorCountUnary(
+      const attribute_id key_attr_id,
+      const attribute_id argument_id,
+      std::atomic<std::size_t> *vec_table,
+      KeyValueAccessorT *key_accessor,
+      ArgumentValueAccessorT *argument_accessor);
+
+  template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
+            typename KeyT, typename ArgumentT, typename StateT,
+            typename KeyValueAccessorT, typename ArgumentValueAccessorT>
+  inline void upsertValueAccessorIntegerSum(
+      const attribute_id key_attr_id,
+      const attribute_id argument_id,
+      std::atomic<StateT> *vec_table,
+      KeyValueAccessorT *key_accessor,
+      ArgumentValueAccessorT *argument_accessor);
+
+  template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
+            typename KeyT, typename ArgumentT, typename StateT,
+            typename KeyValueAccessorT, typename ArgumentValueAccessorT>
+  inline void upsertValueAccessorGenericSum(
+      const attribute_id key_attr_id,
+      const attribute_id argument_id,
+      std::atomic<StateT> *vec_table,
+      KeyValueAccessorT *key_accessor,
+      ArgumentValueAccessorT *argument_accessor);
+
+  template <typename KeyT>
+  inline void finalizeKeyInternal(const std::size_t start_position,
+                                  const std::size_t end_position,
+                                  NativeColumnVector *output_cv) const {
+    std::size_t loc = start_position - 1;
+    while ((loc = existence_map_->nextOne(loc)) < end_position) {
+      *static_cast<KeyT *>(output_cv->getPtrForDirectWrite()) = loc;
+    }
+  }
+
+  template <typename ...ArgTypes>
+  inline void finalizeStateDispatchHelper(const AggregationID agg_id,
+                                          const Type *argument_type,
+                                          const void *vec_table,
+                                          ArgTypes &&...args) const {
+    switch (agg_id) {
+       case AggregationID::kCount:
+         finalizeStateCount(static_cast<const std::atomic<std::size_t> *>(vec_table),
+                            std::forward<ArgTypes>(args)...);
+         return;
+       case AggregationID::kSum:
+         finalizeStateSumHelper(argument_type,
+                                vec_table,
+                                std::forward<ArgTypes>(args)...);
+         return;
+       default:
+         LOG(FATAL) << "Not supported";
+    }
+  }
+
+  template <typename ...ArgTypes>
+  inline void finalizeStateSumHelper(const Type *argument_type,
+                                     const void *vec_table,
+                                     ArgTypes &&...args) const {
+    DCHECK(argument_type != nullptr);
+
+    switch (argument_type->getTypeID()) {
+      case TypeID::kInt:    // Fall through
+      case TypeID::kLong:
+        finalizeStateSum<std::int64_t>(
+            static_cast<const std::atomic<std::int64_t> *>(vec_table),
+            std::forward<ArgTypes>(args)...);
+        return;
+      case TypeID::kFloat:  // Fall through
+      case TypeID::kDouble:
+        finalizeStateSum<double>(
+            static_cast<const std::atomic<double> *>(vec_table),
+            std::forward<ArgTypes>(args)...);
+        return;
+      default:
+        LOG(FATAL) << "Not supported";
+    }
+  }
+
+  inline void finalizeStateCount(const std::atomic<std::size_t> *vec_table,
+                                 const std::size_t start_position,
+                                 const std::size_t end_position,
+                                 NativeColumnVector *output_cv) const {
+    std::size_t loc = start_position - 1;
+    while ((loc = existence_map_->nextOne(loc)) < end_position) {
+      *static_cast<std::int64_t *>(output_cv->getPtrForDirectWrite()) =
+          vec_table[loc].load(std::memory_order_relaxed);
+    }
+  }
+
+  template <typename ResultT, typename StateT>
+  inline void finalizeStateSum(const std::atomic<StateT> *vec_table,
+                               const std::size_t start_position,
+                               const std::size_t end_position,
+                               NativeColumnVector *output_cv) const {
+    std::size_t loc = start_position - 1;
+    while ((loc = existence_map_->nextOne(loc)) < end_position) {
+      *static_cast<ResultT *>(output_cv->getPtrForDirectWrite()) =
+          vec_table[loc].load(std::memory_order_relaxed);
+    }
+  }
+
+  const Type *key_type_;
+  const std::size_t num_entries_;
+
+  const std::size_t num_handles_;
+  const std::vector<AggregationHandle *> handles_;
+
+  std::unique_ptr<BarrieredReadWriteConcurrentBitVector> existence_map_;
+  std::vector<void *> vec_tables_;
+
+  const std::size_t num_finalize_partitions_;
+
+  StorageManager *storage_manager_;
+  MutableBlobReference blob_;
+
+  std::size_t memory_size_;
+  std::size_t num_init_partitions_;
+
+  DISALLOW_COPY_AND_ASSIGN(CollisionFreeVectorTable);
+};
+
+// ----------------------------------------------------------------------------
+// Implementations of template methods follow.
+
+template <bool use_two_accessors, typename ...ArgTypes>
+inline void CollisionFreeVectorTable
+    ::upsertValueAccessorDispatchHelper(const bool is_key_nullable,
+                                        const bool is_argument_nullable,
+                                        ArgTypes &&...args) {
+  if (is_key_nullable) {
+    if (is_argument_nullable) {
+      upsertValueAccessorDispatchHelper<use_two_accessors, true, true>(
+          std::forward<ArgTypes>(args)...);
+    } else {
+      upsertValueAccessorDispatchHelper<use_two_accessors, true, false>(
+          std::forward<ArgTypes>(args)...);
+    }
+  } else {
+    if (is_argument_nullable) {
+      upsertValueAccessorDispatchHelper<use_two_accessors, false, true>(
+          std::forward<ArgTypes>(args)...);
+    } else {
+      upsertValueAccessorDispatchHelper<use_two_accessors, false, false>(
+          std::forward<ArgTypes>(args)...);
+    }
+  }
+}
+
+template <bool ...bool_values, typename ...ArgTypes>
+inline void CollisionFreeVectorTable
+    ::upsertValueAccessorDispatchHelper(const Type *key_type,
+                                        ArgTypes &&...args) {
+  switch (key_type->getTypeID()) {
+    case TypeID::kInt:
+      upsertValueAccessorDispatchHelper<bool_values..., int>(
+          std::forward<ArgTypes>(args)...);
+      return;
+    case TypeID::kLong:
+      upsertValueAccessorDispatchHelper<bool_values..., std::int64_t>(
+          std::forward<ArgTypes>(args)...);
+      return;
+    default:
+      LOG(FATAL) << "Not supported";
+  }
+}
+
+template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
+          typename KeyT, typename ...ArgTypes>
+inline void CollisionFreeVectorTable
+    ::upsertValueAccessorDispatchHelper(const Type *argument_type,
+                                        const AggregationID agg_id,
+                                        ArgTypes &&...args) {
+  switch (agg_id) {
+     case AggregationID::kCount:
+       upsertValueAccessorCountHelper<
+           use_two_accessors, is_key_nullable, is_argument_nullable, KeyT>(
+               std::forward<ArgTypes>(args)...);
+       return;
+     case AggregationID::kSum:
+       upsertValueAccessorSumHelper<
+           use_two_accessors, is_key_nullable, is_argument_nullable, KeyT>(
+               argument_type, std::forward<ArgTypes>(args)...);
+       return;
+     default:
+       LOG(FATAL) << "Not supported";
+  }
+}
+
+template <typename ...ArgTypes>
+inline void CollisionFreeVectorTable
+    ::upsertValueAccessorKeyOnlyHelper(const bool is_key_nullable,
+                                       const Type *key_type,
+                                       ArgTypes &&...args) {
+  switch (key_type->getTypeID()) {
+    case TypeID::kInt: {
+      if (is_key_nullable) {
+        upsertValueAccessorKeyOnly<true, int>(std::forward<ArgTypes>(args)...);
+      } else {
+        upsertValueAccessorKeyOnly<false, int>(std::forward<ArgTypes>(args)...);
+      }
+      return;
+    }
+    case TypeID::kLong: {
+      if (is_key_nullable) {
+        upsertValueAccessorKeyOnly<true, std::int64_t>(std::forward<ArgTypes>(args)...);
+      } else {
+        upsertValueAccessorKeyOnly<false, std::int64_t>(std::forward<ArgTypes>(args)...);
+      }
+      return;
+    }
+    default:
+      LOG(FATAL) << "Not supported";
+  }
+}
+
+template <bool is_key_nullable, typename KeyT, typename ValueAccessorT>
+inline void CollisionFreeVectorTable
+    ::upsertValueAccessorKeyOnly(const attribute_id key_attr_id,
+                                 ValueAccessorT *accessor) {
+  accessor->beginIteration();
+  while (accessor->next()) {
+    const KeyT *key = static_cast<const KeyT *>(
+        accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
+    if (is_key_nullable && key == nullptr) {
+      continue;
+    }
+    existence_map_->setBit(*key);
+  }
+}
+
+template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
+          typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT>
+inline void CollisionFreeVectorTable
+    ::upsertValueAccessorCountHelper(const attribute_id key_attr_id,
+                                     const attribute_id argument_id,
+                                     void *vec_table,
+                                     KeyValueAccessorT *key_accessor,
+                                     ArgumentValueAccessorT *argument_accessor) {
+  DCHECK_GE(key_attr_id, 0u);
+
+  if (is_argument_nullable && argument_id != kInvalidAttributeID) {
+    upsertValueAccessorCountUnary<use_two_accessors, is_key_nullable, KeyT>(
+        key_attr_id,
+        argument_id,
+        static_cast<std::atomic<std::size_t> *>(vec_table),
+        key_accessor,
+        argument_accessor);
+    return;
+  } else {
+    upsertValueAccessorCountNullary<is_key_nullable, KeyT>(
+        key_attr_id,
+        static_cast<std::atomic<std::size_t> *>(vec_table),
+        key_accessor);
+    return;
+  }
+}
+
+template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
+          typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT>
+inline void CollisionFreeVectorTable
+    ::upsertValueAccessorSumHelper(const Type *argument_type,
+                                   const attribute_id key_attr_id,
+                                   const attribute_id argument_id,
+                                   void *vec_table,
+                                   KeyValueAccessorT *key_accessor,
+                                   ArgumentValueAccessorT *argument_accessor) {
+  DCHECK_GE(key_attr_id, 0u);
+  DCHECK_GE(argument_id, 0u);
+  DCHECK(argument_type != nullptr);
+
+  switch (argument_type->getTypeID()) {
+    case TypeID::kInt:
+      upsertValueAccessorIntegerSum<
+          use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, int>(
+              key_attr_id,
+              argument_id,
+              static_cast<std::atomic<std::int64_t> *>(vec_table),
+              key_accessor,
+              argument_accessor);
+      return;
+    case TypeID::kLong:
+      upsertValueAccessorIntegerSum<
+          use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, std::int64_t>(
+              key_attr_id,
+              argument_id,
+              static_cast<std::atomic<std::int64_t> *>(vec_table),
+              key_accessor,
+              argument_accessor);
+      return;
+    case TypeID::kFloat:
+      upsertValueAccessorGenericSum<
+          use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, float>(
+              key_attr_id,
+              argument_id,
+              static_cast<std::atomic<double> *>(vec_table),
+              key_accessor,
+              argument_accessor);
+      return;
+    case TypeID::kDouble:
+      upsertValueAccessorGenericSum<
+          use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, double>(
+              key_attr_id,
+              argument_id,
+              static_cast<std::atomic<double> *>(vec_table),
+              key_accessor,
+              argument_accessor);
+      return;
+    default:
+      LOG(FATAL) << "Not supported";
+  }
+}
+
+template <bool is_key_nullable, typename KeyT, typename ValueAccessorT>
+inline void CollisionFreeVectorTable
+    ::upsertValueAccessorCountNullary(const attribute_id key_attr_id,
+                                      std::atomic<std::size_t> *vec_table,
+                                      ValueAccessorT *accessor) {
+  accessor->beginIteration();
+  while (accessor->next()) {
+    const KeyT *key = static_cast<const KeyT *>(
+        accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
+    if (is_key_nullable && key == nullptr) {
+      continue;
+    }
+    const std::size_t loc = *key;
+    vec_table[loc].fetch_add(1u, std::memory_order_relaxed);
+    existence_map_->setBit(loc);
+  }
+}
+
+template <bool use_two_accessors, bool is_key_nullable, typename KeyT,
+          typename KeyValueAccessorT, typename ArgumentValueAccessorT>
+inline void CollisionFreeVectorTable
+    ::upsertValueAccessorCountUnary(const attribute_id key_attr_id,
+                                    const attribute_id argument_id,
+                                    std::atomic<std::size_t> *vec_table,
+                                    KeyValueAccessorT *key_accessor,
+                                    ArgumentValueAccessorT *argument_accessor) {
+  key_accessor->beginIteration();
+  if (use_two_accessors) {
+    argument_accessor->beginIteration();
+  }
+  while (key_accessor->next()) {
+    if (use_two_accessors) {
+      argument_accessor->next();
+    }
+    const KeyT *key = static_cast<const KeyT *>(
+        key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
+    if (is_key_nullable && key == nullptr) {
+      continue;
+    }
+    const std::size_t loc = *key;
+    existence_map_->setBit(loc);
+    if (argument_accessor->getUntypedValue(argument_id) == nullptr) {
+      continue;
+    }
+    vec_table[loc].fetch_add(1u, std::memory_order_relaxed);
+  }
+}
+
+template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
+          typename KeyT, typename ArgumentT, typename StateT,
+          typename KeyValueAccessorT, typename ArgumentValueAccessorT>
+inline void CollisionFreeVectorTable
+    ::upsertValueAccessorIntegerSum(const attribute_id key_attr_id,
+                                    const attribute_id argument_id,
+                                    std::atomic<StateT> *vec_table,
+                                    KeyValueAccessorT *key_accessor,
+                                    ArgumentValueAccessorT *argument_accessor) {
+  key_accessor->beginIteration();
+  if (use_two_accessors) {
+    argument_accessor->beginIteration();
+  }
+  while (key_accessor->next()) {
+    if (use_two_accessors) {
+      argument_accessor->next();
+    }
+    const KeyT *key = static_cast<const KeyT *>(
+        key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
+    if (is_key_nullable && key == nullptr) {
+      continue;
+    }
+    const std::size_t loc = *key;
+    existence_map_->setBit(loc);
+    const ArgumentT *argument = static_cast<const ArgumentT *>(
+        argument_accessor->template getUntypedValue<is_argument_nullable>(argument_id));
+    if (is_argument_nullable && argument == nullptr) {
+      continue;
+    }
+    vec_table[loc].fetch_add(*argument, std::memory_order_relaxed);
+  }
+}
+
+template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
+          typename KeyT, typename ArgumentT, typename StateT,
+          typename KeyValueAccessorT, typename ArgumentValueAccessorT>
+inline void CollisionFreeVectorTable
+    ::upsertValueAccessorGenericSum(const attribute_id key_attr_id,
+                                    const attribute_id argument_id,
+                                    std::atomic<StateT> *vec_table,
+                                    KeyValueAccessorT *key_accessor,
+                                    ArgumentValueAccessorT *argument_accessor) {
+  key_accessor->beginIteration();
+  if (use_two_accessors) {
+    argument_accessor->beginIteration();
+  }
+  while (key_accessor->next()) {
+    if (use_two_accessors) {
+      argument_accessor->next();
+    }
+    const KeyT *key = static_cast<const KeyT *>(
+        key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
+    if (is_key_nullable && key == nullptr) {
+      continue;
+    }
+    const std::size_t loc = *key;
+    existence_map_->setBit(loc);
+    const ArgumentT *argument = static_cast<const ArgumentT *>(
+        argument_accessor->template getUntypedValue<is_argument_nullable>(argument_id));
+    if (is_argument_nullable && argument == nullptr) {
+      continue;
+    }
+    const ArgumentT arg_val = *argument;
+    std::atomic<StateT> &state = vec_table[loc];
+    StateT state_val = state.load(std::memory_order_relaxed);
+    while(!state.compare_exchange_weak(state_val, state_val + arg_val)) {}
+  }
+}
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_STORAGE_COLLISION_FREE_AGGREGATION_STATE_HASH_TABLE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/HashTableFactory.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableFactory.hpp b/storage/HashTableFactory.hpp
index d95362c..b88bf87 100644
--- a/storage/HashTableFactory.hpp
+++ b/storage/HashTableFactory.hpp
@@ -24,12 +24,12 @@
 #include <string>
 #include <vector>
 
-#include "storage/CollisionFreeAggregationStateHashTable.hpp"
+#include "storage/CollisionFreeVectorTable.hpp"
 #include "storage/HashTable.hpp"
 #include "storage/HashTableBase.hpp"
 #include "storage/HashTable.pb.h"
 #include "storage/LinearOpenAddressingHashTable.hpp"
-#include "storage/PackedPayloadAggregationStateHashTable.hpp"
+#include "storage/PackedPayloadHashTable.hpp"
 #include "storage/SeparateChainingHashTable.hpp"
 #include "storage/SimpleScalarSeparateChainingHashTable.hpp"
 #include "storage/TupleReference.hpp"
@@ -346,10 +346,10 @@ class AggregationStateHashTableFactory {
       StorageManager *storage_manager) {
     switch (hash_table_type) {
       case HashTableImplType::kSeparateChaining:
-        return new PackedPayloadSeparateChainingAggregationStateHashTable(
+        return new PackedPayloadHashTable(
             key_types, num_entries, handles, storage_manager);
       case HashTableImplType::kCollisionFreeVector:
-        return new CollisionFreeAggregationStateHashTable(
+        return new CollisionFreeVectorTable(
             key_types, num_entries, handles, storage_manager);
       default: {
         LOG(FATAL) << "Unrecognized HashTableImplType in "

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/PackedPayloadAggregationStateHashTable.cpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadAggregationStateHashTable.cpp b/storage/PackedPayloadAggregationStateHashTable.cpp
deleted file mode 100644
index 0292092..0000000
--- a/storage/PackedPayloadAggregationStateHashTable.cpp
+++ /dev/null
@@ -1,439 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include "storage/PackedPayloadAggregationStateHashTable.hpp"
-
-namespace quickstep {
-
-PackedPayloadSeparateChainingAggregationStateHashTable
-    ::PackedPayloadSeparateChainingAggregationStateHashTable(
-        const std::vector<const Type *> &key_types,
-        const std::size_t num_entries,
-        const std::vector<AggregationHandle *> &handles,
-        StorageManager *storage_manager)
-        : key_types_(key_types),
-          num_handles_(handles.size()),
-          handles_(handles),
-          total_payload_size_(ComputeTotalPayloadSize(handles)),
-          storage_manager_(storage_manager),
-          kBucketAlignment(alignof(std::atomic<std::size_t>)),
-          kValueOffset(sizeof(std::atomic<std::size_t>) + sizeof(std::size_t)),
-          key_manager_(key_types_, kValueOffset + total_payload_size_),
-          bucket_size_(ComputeBucketSize(key_manager_.getFixedKeySize())) {
-  std::size_t payload_offset_running_sum = sizeof(SpinMutex);
-  for (const auto *handle : handles) {
-    payload_offsets_.emplace_back(payload_offset_running_sum);
-    payload_offset_running_sum += handle->getPayloadSize();
-  }
-
-  // NOTE(jianqiao): Potential memory leak / double freeing by copying from
-  // init_payload to buckets if payload contains out of line data.
-  init_payload_ =
-      static_cast<std::uint8_t *>(calloc(this->total_payload_size_, 1));
-  DCHECK(init_payload_ != nullptr);
-
-  for (std::size_t i = 0; i < num_handles_; ++i) {
-    handles_[i]->initPayload(init_payload_ + payload_offsets_[i]);
-  }
-
-  // Bucket size always rounds up to the alignment requirement of the atomic
-  // size_t "next" pointer at the front or a ValueT, whichever is larger.
-  //
-  // Give base HashTable information about what key components are stored
-  // inline from 'key_manager_'.
-  setKeyInline(key_manager_.getKeyInline());
-
-  // Pick out a prime number of slots and calculate storage requirements.
-  std::size_t num_slots_tmp =
-      get_next_prime_number(num_entries * kHashTableLoadFactor);
-  std::size_t required_memory =
-      sizeof(Header) + num_slots_tmp * sizeof(std::atomic<std::size_t>) +
-      (num_slots_tmp / kHashTableLoadFactor) *
-          (bucket_size_ + key_manager_.getEstimatedVariableKeySize());
-  std::size_t num_storage_slots =
-      this->storage_manager_->SlotsNeededForBytes(required_memory);
-  if (num_storage_slots == 0) {
-    FATAL_ERROR(
-        "Storage requirement for SeparateChainingHashTable "
-        "exceeds maximum allocation size.");
-  }
-
-  // Get a StorageBlob to hold the hash table.
-  const block_id blob_id =
-      this->storage_manager_->createBlob(num_storage_slots);
-  this->blob_ = this->storage_manager_->getBlobMutable(blob_id);
-
-  void *aligned_memory_start = this->blob_->getMemoryMutable();
-  std::size_t available_memory = num_storage_slots * kSlotSizeBytes;
-  if (align(alignof(Header),
-            sizeof(Header),
-            aligned_memory_start,
-            available_memory) == nullptr) {
-    // With current values from StorageConstants.hpp, this should be
-    // impossible. A blob is at least 1 MB, while a Header has alignment
-    // requirement of just kCacheLineBytes (64 bytes).
-    FATAL_ERROR(
-        "StorageBlob used to hold resizable "
-        "SeparateChainingHashTable is too small to meet alignment "
-        "requirements of SeparateChainingHashTable::Header.");
-  } else if (aligned_memory_start != this->blob_->getMemoryMutable()) {
-    // This should also be impossible, since the StorageManager allocates slots
-    // aligned to kCacheLineBytes.
-    DEV_WARNING("StorageBlob memory adjusted by "
-                << (num_storage_slots * kSlotSizeBytes - available_memory)
-                << " bytes to meet alignment requirement for "
-                << "SeparateChainingHashTable::Header.");
-  }
-
-  // Locate the header.
-  header_ = static_cast<Header *>(aligned_memory_start);
-  aligned_memory_start =
-      static_cast<char *>(aligned_memory_start) + sizeof(Header);
-  available_memory -= sizeof(Header);
-
-  // Recompute the number of slots & buckets using the actual available memory.
-  // Most likely, we got some extra free bucket space due to "rounding up" to
-  // the storage blob's size. It's also possible (though very unlikely) that we
-  // will wind up with fewer buckets than we initially wanted because of screwy
-  // alignment requirements for ValueT.
-  std::size_t num_buckets_tmp =
-      available_memory /
-      (kHashTableLoadFactor * sizeof(std::atomic<std::size_t>) + bucket_size_ +
-       key_manager_.getEstimatedVariableKeySize());
-  num_slots_tmp =
-      get_previous_prime_number(num_buckets_tmp * kHashTableLoadFactor);
-  num_buckets_tmp = num_slots_tmp / kHashTableLoadFactor;
-  DEBUG_ASSERT(num_slots_tmp > 0);
-  DEBUG_ASSERT(num_buckets_tmp > 0);
-
-  // Locate the slot array.
-  slots_ = static_cast<std::atomic<std::size_t> *>(aligned_memory_start);
-  aligned_memory_start = static_cast<char *>(aligned_memory_start) +
-                         sizeof(std::atomic<std::size_t>) * num_slots_tmp;
-  available_memory -= sizeof(std::atomic<std::size_t>) * num_slots_tmp;
-
-  // Locate the buckets.
-  buckets_ = aligned_memory_start;
-  // Extra-paranoid: If ValueT has an alignment requirement greater than that
-  // of std::atomic<std::size_t>, we may need to adjust the start of the bucket
-  // array.
-  if (align(kBucketAlignment, bucket_size_, buckets_, available_memory) ==
-      nullptr) {
-    FATAL_ERROR(
-        "StorageBlob used to hold resizable "
-        "SeparateChainingHashTable is too small to meet "
-        "alignment requirements of buckets.");
-  } else if (buckets_ != aligned_memory_start) {
-    DEV_WARNING(
-        "Bucket array start position adjusted to meet alignment "
-        "requirement for SeparateChainingHashTable's value type.");
-    if (num_buckets_tmp * bucket_size_ > available_memory) {
-      --num_buckets_tmp;
-    }
-  }
-
-  // Fill in the header.
-  header_->num_slots = num_slots_tmp;
-  header_->num_buckets = num_buckets_tmp;
-  header_->buckets_allocated.store(0, std::memory_order_relaxed);
-  header_->variable_length_bytes_allocated.store(0, std::memory_order_relaxed);
-  available_memory -= bucket_size_ * (header_->num_buckets);
-
-  // Locate variable-length key storage region, and give it all the remaining
-  // bytes in the blob.
-  key_manager_.setVariableLengthStorageInfo(
-      static_cast<char *>(buckets_) + header_->num_buckets * bucket_size_,
-      available_memory,
-      &(header_->variable_length_bytes_allocated));
-}
-
-PackedPayloadSeparateChainingAggregationStateHashTable
-    ::~PackedPayloadSeparateChainingAggregationStateHashTable() {
-  if (blob_.valid()) {
-    const block_id blob_id = blob_->getID();
-    blob_.release();
-    storage_manager_->deleteBlockOrBlobFile(blob_id);
-  }
-  std::free(init_payload_);
-}
-
-void PackedPayloadSeparateChainingAggregationStateHashTable::clear() {
-  const std::size_t used_buckets =
-      header_->buckets_allocated.load(std::memory_order_relaxed);
-  // Destroy existing values, if necessary.
-  destroyPayload();
-
-  // Zero-out slot array.
-  std::memset(
-      slots_, 0x0, sizeof(std::atomic<std::size_t>) * header_->num_slots);
-
-  // Zero-out used buckets.
-  std::memset(buckets_, 0x0, used_buckets * bucket_size_);
-
-  header_->buckets_allocated.store(0, std::memory_order_relaxed);
-  header_->variable_length_bytes_allocated.store(0, std::memory_order_relaxed);
-  key_manager_.zeroNextVariableLengthKeyOffset();
-}
-
-void PackedPayloadSeparateChainingAggregationStateHashTable::destroyPayload() {
-  const std::size_t num_buckets =
-      header_->buckets_allocated.load(std::memory_order_relaxed);
-  void *bucket_ptr = static_cast<char *>(buckets_) + kValueOffset;
-  for (std::size_t bucket_num = 0; bucket_num < num_buckets; ++bucket_num) {
-    for (std::size_t handle_id = 0; handle_id < num_handles_; ++handle_id) {
-      void *value_internal_ptr =
-          static_cast<char *>(bucket_ptr) + this->payload_offsets_[handle_id];
-      handles_[handle_id]->destroyPayload(static_cast<std::uint8_t *>(value_internal_ptr));
-    }
-    bucket_ptr = static_cast<char *>(bucket_ptr) + bucket_size_;
-  }
-}
-
-bool PackedPayloadSeparateChainingAggregationStateHashTable::upsertValueAccessor(
-    const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
-    const std::vector<MultiSourceAttributeId> &key_attr_ids,
-    ValueAccessorMultiplexer *accessor_mux) {
-  DCHECK(accessor_mux->getDerivedAccessor()->getImplementationType()
-             == ValueAccessor::Implementation::kColumnVectors);
-  ValueAccessor *base_accessor = accessor_mux->getBaseAccessor();
-  ColumnVectorsValueAccessor *derived_accessor =
-      static_cast<ColumnVectorsValueAccessor *>(accessor_mux->getDerivedAccessor());
-
-  if (derived_accessor == nullptr) {
-    return upsertValueAccessorCompositeKeyInternal<false>(argument_ids,
-                                                          key_attr_ids,
-                                                          base_accessor,
-                                                          derived_accessor);
-  } else {
-    return upsertValueAccessorCompositeKeyInternal<true>(argument_ids,
-                                                         key_attr_ids,
-                                                         base_accessor,
-                                                         derived_accessor);
-  }
-}
-
-void PackedPayloadSeparateChainingAggregationStateHashTable
-    ::resize(const std::size_t extra_buckets,
-             const std::size_t extra_variable_storage,
-             const std::size_t retry_num) {
-  // A retry should never be necessary with this implementation of HashTable.
-  // Separate chaining ensures that any resized hash table with more buckets
-  // than the original table will be able to hold more entries than the
-  // original.
-  DEBUG_ASSERT(retry_num == 0);
-
-  SpinSharedMutexExclusiveLock<true> write_lock(this->resize_shared_mutex_);
-
-  // Recheck whether the hash table is still full. Note that multiple threads
-  // might wait to rebuild this hash table simultaneously. Only the first one
-  // should do the rebuild.
-  if (!isFull(extra_variable_storage)) {
-    return;
-  }
-
-  // Approximately double the number of buckets and slots.
-  //
-  // TODO(chasseur): It may be worth it to more than double the number of
-  // buckets here so that we can maintain a good, sparse fill factor for a
-  // longer time as more values are inserted. Such behavior should take into
-  // account kHashTableLoadFactor.
-  std::size_t resized_num_slots = get_next_prime_number(
-      (header_->num_buckets + extra_buckets / 2) * kHashTableLoadFactor * 2);
-  std::size_t variable_storage_required =
-      (resized_num_slots / kHashTableLoadFactor) *
-      key_manager_.getEstimatedVariableKeySize();
-  const std::size_t original_variable_storage_used =
-      header_->variable_length_bytes_allocated.load(std::memory_order_relaxed);
-  // If this resize was triggered by a too-large variable-length key, bump up
-  // the variable-length storage requirement.
-  if ((extra_variable_storage > 0) &&
-      (extra_variable_storage + original_variable_storage_used >
-       key_manager_.getVariableLengthKeyStorageSize())) {
-    variable_storage_required += extra_variable_storage;
-  }
-
-  const std::size_t resized_memory_required =
-      sizeof(Header) + resized_num_slots * sizeof(std::atomic<std::size_t>) +
-      (resized_num_slots / kHashTableLoadFactor) * bucket_size_ +
-      variable_storage_required;
-  const std::size_t resized_storage_slots =
-      this->storage_manager_->SlotsNeededForBytes(resized_memory_required);
-  if (resized_storage_slots == 0) {
-    FATAL_ERROR(
-        "Storage requirement for resized SeparateChainingHashTable "
-        "exceeds maximum allocation size.");
-  }
-
-  // Get a new StorageBlob to hold the resized hash table.
-  const block_id resized_blob_id =
-      this->storage_manager_->createBlob(resized_storage_slots);
-  MutableBlobReference resized_blob =
-      this->storage_manager_->getBlobMutable(resized_blob_id);
-
-  // Locate data structures inside the new StorageBlob.
-  void *aligned_memory_start = resized_blob->getMemoryMutable();
-  std::size_t available_memory = resized_storage_slots * kSlotSizeBytes;
-  if (align(alignof(Header),
-            sizeof(Header),
-            aligned_memory_start,
-            available_memory) == nullptr) {
-    // Should be impossible, as noted in constructor.
-    FATAL_ERROR(
-        "StorageBlob used to hold resized SeparateChainingHashTable "
-        "is too small to meet alignment requirements of "
-        "LinearOpenAddressingHashTable::Header.");
-  } else if (aligned_memory_start != resized_blob->getMemoryMutable()) {
-    // Again, should be impossible.
-    DEV_WARNING("In SeparateChainingHashTable::resize(), StorageBlob "
-                << "memory adjusted by "
-                << (resized_num_slots * kSlotSizeBytes - available_memory)
-                << " bytes to meet alignment requirement for "
-                << "LinearOpenAddressingHashTable::Header.");
-  }
-
-  Header *resized_header = static_cast<Header *>(aligned_memory_start);
-  aligned_memory_start =
-      static_cast<char *>(aligned_memory_start) + sizeof(Header);
-  available_memory -= sizeof(Header);
-
-  // As in constructor, recompute the number of slots and buckets using the
-  // actual available memory.
-  std::size_t resized_num_buckets =
-      (available_memory - extra_variable_storage) /
-      (kHashTableLoadFactor * sizeof(std::atomic<std::size_t>) + bucket_size_ +
-       key_manager_.getEstimatedVariableKeySize());
-  resized_num_slots =
-      get_previous_prime_number(resized_num_buckets * kHashTableLoadFactor);
-  resized_num_buckets = resized_num_slots / kHashTableLoadFactor;
-
-  // Locate slot array.
-  std::atomic<std::size_t> *resized_slots =
-      static_cast<std::atomic<std::size_t> *>(aligned_memory_start);
-  aligned_memory_start = static_cast<char *>(aligned_memory_start) +
-                         sizeof(std::atomic<std::size_t>) * resized_num_slots;
-  available_memory -= sizeof(std::atomic<std::size_t>) * resized_num_slots;
-
-  // As in constructor, we will be extra paranoid and use align() to locate the
-  // start of the array of buckets, as well.
-  void *resized_buckets = aligned_memory_start;
-  if (align(
-          kBucketAlignment, bucket_size_, resized_buckets, available_memory) ==
-      nullptr) {
-    FATAL_ERROR(
-        "StorageBlob used to hold resized SeparateChainingHashTable "
-        "is too small to meet alignment requirements of buckets.");
-  } else if (resized_buckets != aligned_memory_start) {
-    DEV_WARNING(
-        "Bucket array start position adjusted to meet alignment "
-        "requirement for SeparateChainingHashTable's value type.");
-    if (resized_num_buckets * bucket_size_ + variable_storage_required >
-        available_memory) {
-      --resized_num_buckets;
-    }
-  }
-  aligned_memory_start = static_cast<char *>(aligned_memory_start) +
-                         resized_num_buckets * bucket_size_;
-  available_memory -= resized_num_buckets * bucket_size_;
-
-  void *resized_variable_length_key_storage = aligned_memory_start;
-  const std::size_t resized_variable_length_key_storage_size = available_memory;
-
-  const std::size_t original_buckets_used =
-      header_->buckets_allocated.load(std::memory_order_relaxed);
-
-  // Initialize the header.
-  resized_header->num_slots = resized_num_slots;
-  resized_header->num_buckets = resized_num_buckets;
-  resized_header->buckets_allocated.store(original_buckets_used,
-                                          std::memory_order_relaxed);
-  resized_header->variable_length_bytes_allocated.store(
-      original_variable_storage_used, std::memory_order_relaxed);
-
-  // Bulk-copy buckets. This is safe because:
-  //     1. The "next" pointers will be adjusted when rebuilding chains below.
-  //     2. The hash codes will stay the same.
-  //     3. For key components:
-  //       a. Inline keys will stay exactly the same.
-  //       b. Offsets into variable-length storage will remain valid, because
-  //          we also do a byte-for-byte copy of variable-length storage below.
-  //       c. Absolute external pointers will still point to the same address.
-  //       d. Relative pointers are not used with resizable hash tables.
-  //     4. If values are not trivially copyable, then we invoke ValueT's copy
-  //        or move constructor with placement new.
-  // NOTE(harshad) - Regarding point 4 above, as this is a specialized
-  // hash table implemented for aggregation, the values are trivially copyable,
-  // therefore we don't need to invoke payload values' copy/move constructors.
-  std::memcpy(resized_buckets, buckets_, original_buckets_used * bucket_size_);
-
-  // Copy over variable-length key components, if any.
-  if (original_variable_storage_used > 0) {
-    DEBUG_ASSERT(original_variable_storage_used ==
-                 key_manager_.getNextVariableLengthKeyOffset());
-    DEBUG_ASSERT(original_variable_storage_used <=
-                 resized_variable_length_key_storage_size);
-    std::memcpy(resized_variable_length_key_storage,
-                key_manager_.getVariableLengthKeyStorage(),
-                original_variable_storage_used);
-  }
-
-  destroyPayload();
-
-  // Make resized structures active.
-  std::swap(this->blob_, resized_blob);
-  header_ = resized_header;
-  slots_ = resized_slots;
-  buckets_ = resized_buckets;
-  key_manager_.setVariableLengthStorageInfo(
-      resized_variable_length_key_storage,
-      resized_variable_length_key_storage_size,
-      &(resized_header->variable_length_bytes_allocated));
-
-  // Drop the old blob.
-  const block_id old_blob_id = resized_blob->getID();
-  resized_blob.release();
-  this->storage_manager_->deleteBlockOrBlobFile(old_blob_id);
-
-  // Rebuild chains.
-  void *current_bucket = buckets_;
-  for (std::size_t bucket_num = 0; bucket_num < original_buckets_used;
-       ++bucket_num) {
-    std::atomic<std::size_t> *next_ptr =
-        static_cast<std::atomic<std::size_t> *>(current_bucket);
-    const std::size_t hash_code = *reinterpret_cast<const std::size_t *>(
-        static_cast<const char *>(current_bucket) +
-        sizeof(std::atomic<std::size_t>));
-
-    const std::size_t slot_number = hash_code % header_->num_slots;
-    std::size_t slot_ptr_value = 0;
-    if (slots_[slot_number].compare_exchange_strong(
-            slot_ptr_value, bucket_num + 1, std::memory_order_relaxed)) {
-      // This bucket is the first in the chain for this block, so reset its
-      // next pointer to 0.
-      next_ptr->store(0, std::memory_order_relaxed);
-    } else {
-      // A chain already exists starting from this slot, so put this bucket at
-      // the head.
-      next_ptr->store(slot_ptr_value, std::memory_order_relaxed);
-      slots_[slot_number].store(bucket_num + 1, std::memory_order_relaxed);
-    }
-    current_bucket = static_cast<char *>(current_bucket) + bucket_size_;
-  }
-}
-
-}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/PackedPayloadAggregationStateHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadAggregationStateHashTable.hpp b/storage/PackedPayloadAggregationStateHashTable.hpp
deleted file mode 100644
index 85d4f8a..0000000
--- a/storage/PackedPayloadAggregationStateHashTable.hpp
+++ /dev/null
@@ -1,805 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_STORAGE_PACKED_PAYLOAD_AGGREGATION_STATE_HASH_TABLE_HPP_
-#define QUICKSTEP_STORAGE_PACKED_PAYLOAD_AGGREGATION_STATE_HASH_TABLE_HPP_
-
-#include <algorithm>
-#include <atomic>
-#include <cstddef>
-#include <cstdlib>
-#include <limits>
-#include <memory>
-#include <type_traits>
-#include <utility>
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregationHandle.hpp"
-#include "storage/HashTableBase.hpp"
-#include "storage/HashTableKeyManager.hpp"
-#include "storage/StorageBlob.hpp"
-#include "storage/StorageBlockInfo.hpp"
-#include "storage/StorageConstants.hpp"
-#include "storage/StorageManager.hpp"
-#include "storage/TupleReference.hpp"
-#include "storage/ValueAccessor.hpp"
-#include "storage/ValueAccessorMultiplexer.hpp"
-#include "storage/ValueAccessorUtil.hpp"
-#include "threading/SpinMutex.hpp"
-#include "threading/SpinSharedMutex.hpp"
-#include "types/Type.hpp"
-#include "types/TypedValue.hpp"
-#include "types/containers/ColumnVectorsValueAccessor.hpp"
-#include "utility/Alignment.hpp"
-#include "utility/HashPair.hpp"
-#include "utility/Macros.hpp"
-#include "utility/PrimeNumber.hpp"
-
-namespace quickstep {
-
-/** \addtogroup Storage
- *  @{
- */
-
-class PackedPayloadSeparateChainingAggregationStateHashTable
-    : public AggregationStateHashTableBase {
- public:
-  PackedPayloadSeparateChainingAggregationStateHashTable(
-      const std::vector<const Type *> &key_types,
-      const std::size_t num_entries,
-      const std::vector<AggregationHandle *> &handles,
-      StorageManager *storage_manager);
-
-  ~PackedPayloadSeparateChainingAggregationStateHashTable() override;
-
-  void clear();
-
-  void destroyPayload() override;
-
-  bool upsertValueAccessor(
-      const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
-      const std::vector<MultiSourceAttributeId> &key_ids,
-      ValueAccessorMultiplexer *accessor_mux) override;
-
-  inline block_id getBlobId() const {
-    return blob_->getID();
-  }
-
-  inline std::size_t numEntries() const {
-    return header_->buckets_allocated.load(std::memory_order_relaxed);
-  }
-
-  inline bool upsertCompositeKey(const std::vector<TypedValue> &key,
-                                 const std::uint8_t *source_state);
-
-  template <typename FunctorT>
-  inline bool upsertCompositeKey(const std::vector<TypedValue> &key,
-                                 FunctorT *functor,
-                                 int index);
-
-  inline const std::uint8_t* getSingleCompositeKey(
-      const std::vector<TypedValue> &key) const;
-
-  inline const std::uint8_t* getSingleCompositeKey(
-      const std::vector<TypedValue> &key,
-      const int index) const;
-
-  template <typename FunctorT>
-  inline std::size_t forEach(FunctorT *functor) const;
-
-  template <typename FunctorT>
-  inline std::size_t forEach(FunctorT *functor, const int index) const;
-
-  template <typename FunctorT>
-  inline std::size_t forEachCompositeKey(FunctorT *functor) const;
-
-  template <typename FunctorT>
-  inline std::size_t forEachCompositeKey(FunctorT *functor,
-                                         const int index) const;
-
- private:
-  void resize(const std::size_t extra_buckets,
-              const std::size_t extra_variable_storage,
-              const std::size_t retry_num = 0);
-
-  inline std::size_t calculateVariableLengthCompositeKeyCopySize(
-      const std::vector<TypedValue> &key) const {
-    std::size_t total = 0;
-    for (std::vector<TypedValue>::size_type idx = 0; idx < key.size(); ++idx) {
-      if (!(*key_inline_)[idx]) {
-        total += key[idx].getDataSize();
-      }
-    }
-    return total;
-  }
-
-  inline bool getNextEntry(TypedValue *key,
-                           const std::uint8_t **value,
-                           std::size_t *entry_num) const;
-
-  inline bool getNextEntryCompositeKey(std::vector<TypedValue> *key,
-                                       const std::uint8_t **value,
-                                       std::size_t *entry_num) const;
-
-  inline std::uint8_t* upsertCompositeKeyInternal(
-      const std::vector<TypedValue> &key,
-      const std::size_t variable_key_size);
-
-  template <bool use_two_accessors>
-  inline bool upsertValueAccessorCompositeKeyInternal(
-      const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
-      const std::vector<MultiSourceAttributeId> &key_ids,
-      ValueAccessor *base_accessor,
-      ColumnVectorsValueAccessor *derived_accessor);
-
-  // Generate a hash for a composite key by hashing each component of 'key' and
-  // mixing their bits with CombineHashes().
-  inline std::size_t hashCompositeKey(const std::vector<TypedValue> &key) const;
-
-  // Set information about which key components are stored inline. This usually
-  // comes from a HashTableKeyManager, and is set by the constructor of a
-  // subclass of HashTable.
-  inline void setKeyInline(const std::vector<bool> *key_inline) {
-    scalar_key_inline_ = key_inline->front();
-    key_inline_ = key_inline;
-  }
-
-  inline static std::size_t ComputeTotalPayloadSize(
-      const std::vector<AggregationHandle *> &handles) {
-    std::size_t total_payload_size = sizeof(SpinMutex);
-    for (const auto *handle : handles) {
-      total_payload_size += handle->getPayloadSize();
-    }
-    return total_payload_size;
-  }
-
-  // Assign '*key_vector' with the attribute values specified by 'key_ids' at
-  // the current position of 'accessor'. If 'check_for_null_keys' is true, stops
-  // and returns true if any of the values is null, otherwise returns false.
-  template <bool use_two_accessors,
-            bool check_for_null_keys,
-            typename ValueAccessorT>
-  inline static bool GetCompositeKeyFromValueAccessor(
-      const std::vector<MultiSourceAttributeId> &key_ids,
-      const ValueAccessorT *accessor,
-      const ColumnVectorsValueAccessor *derived_accessor,
-      std::vector<TypedValue> *key_vector) {
-    for (std::size_t key_idx = 0; key_idx < key_ids.size(); ++key_idx) {
-      const MultiSourceAttributeId &key_id = key_ids[key_idx];
-      if (use_two_accessors && key_id.source == ValueAccessorSource::kDerived) {
-        (*key_vector)[key_idx] = derived_accessor->getTypedValue(key_id.attr_id);
-      } else {
-        (*key_vector)[key_idx] = accessor->getTypedValue(key_id.attr_id);
-      }
-      if (check_for_null_keys && (*key_vector)[key_idx].isNull()) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  struct Header {
-    std::size_t num_slots;
-    std::size_t num_buckets;
-    alignas(kCacheLineBytes) std::atomic<std::size_t> buckets_allocated;
-    alignas(kCacheLineBytes)
-        std::atomic<std::size_t> variable_length_bytes_allocated;
-  };
-
-  // Type(s) of keys.
-  const std::vector<const Type *> key_types_;
-
-  // Information about whether key components are stored inline or in a
-  // separate variable-length storage region. This is usually determined by a
-  // HashTableKeyManager and set by calling setKeyInline().
-  bool scalar_key_inline_;
-  const std::vector<bool> *key_inline_;
-
-  const std::size_t num_handles_;
-  const std::vector<AggregationHandle *> handles_;
-
-  std::size_t total_payload_size_;
-  std::vector<std::size_t> payload_offsets_;
-  std::uint8_t *init_payload_;
-
-  StorageManager *storage_manager_;
-  MutableBlobReference blob_;
-
-  // Locked in shared mode for most operations, exclusive mode during resize.
-  // Not locked at all for non-resizable HashTables.
-  alignas(kCacheLineBytes) SpinSharedMutex<true> resize_shared_mutex_;
-
-  std::size_t kBucketAlignment;
-
-  // Value's offset in a bucket is the first alignof(ValueT) boundary after the
-  // next pointer and hash code.
-  std::size_t kValueOffset;
-
-  // Round bucket size up to a multiple of kBucketAlignment.
-  constexpr std::size_t ComputeBucketSize(const std::size_t fixed_key_size) {
-    return (((kValueOffset + this->total_payload_size_ + fixed_key_size - 1) /
-             kBucketAlignment) +
-            1) *
-           kBucketAlignment;
-  }
-
-  // Attempt to find an empty bucket to insert 'hash_code' into, starting after
-  // '*bucket' in the chain (or, if '*bucket' is NULL, starting from the slot
-  // array). Returns true and stores SIZE_T_MAX in '*pending_chain_ptr' if an
-  // empty bucket is found. Returns false if 'allow_duplicate_keys' is false
-  // and a hash collision is found (caller should then check whether there is a
-  // genuine key collision or the hash collision is spurious). Returns false
-  // and sets '*bucket' to NULL if there are no more empty buckets in the hash
-  // table. If 'variable_key_allocation_required' is nonzero, this method will
-  // attempt to allocate storage for a variable-length key BEFORE allocating a
-  // bucket, so that no bucket number below 'header_->num_buckets' is ever
-  // deallocated after being allocated.
-  inline bool locateBucketForInsertion(
-      const std::size_t hash_code,
-      const std::size_t variable_key_allocation_required,
-      void **bucket,
-      std::atomic<std::size_t> **pending_chain_ptr,
-      std::size_t *pending_chain_ptr_finish_value);
-
-  // Write a scalar 'key' and its 'hash_code' into the '*bucket', which was
-  // found by locateBucketForInsertion(). Assumes that storage for a
-  // variable-length key copy (if any) was already allocated by a successful
-  // call to allocateVariableLengthKeyStorage().
-  inline void writeScalarKeyToBucket(
-      const TypedValue &key,
-      const std::size_t hash_code,
-      void *bucket);
-
-  // Write a composite 'key' and its 'hash_code' into the '*bucket', which was
-  // found by locateBucketForInsertion(). Assumes that storage for
-  // variable-length key copies (if any) was already allocated by a successful
-  // call to allocateVariableLengthKeyStorage().
-  inline void writeCompositeKeyToBucket(
-      const std::vector<TypedValue> &key,
-      const std::size_t hash_code,
-      void *bucket);
-
-  // Determine whether it is actually necessary to resize this hash table.
-  // Checks that there is at least one unallocated bucket, and that there is
-  // at least 'extra_variable_storage' bytes of variable-length storage free.
-  inline bool isFull(const std::size_t extra_variable_storage) const;
-
-  // Helper object to manage key storage.
-  HashTableKeyManager<false, true> key_manager_;
-
-  // In-memory structure is as follows:
-  //   - SeparateChainingHashTable::Header
-  //   - Array of slots, interpreted as follows:
-  //       - 0 = Points to nothing (empty)
-  //       - SIZE_T_MAX = Pending (some thread is starting a chain from this
-  //         slot and will overwrite it soon)
-  //       - Anything else = The number of the first bucket in the chain for
-  //         this slot PLUS ONE (i.e. subtract one to get the actual bucket
-  //         number).
-  //   - Array of buckets, each of which is:
-  //       - atomic size_t "next" pointer, interpreted the same as slots above.
-  //       - size_t hash value
-  //       - possibly some unused bytes as needed so that ValueT's alignment
-  //         requirement is met
-  //       - ValueT value slot
-  //       - fixed-length key storage (which may include pointers to external
-  //         memory or offsets of variable length keys stored within this hash
-  //         table)
-  //       - possibly some additional unused bytes so that bucket size is a
-  //         multiple of both alignof(std::atomic<std::size_t>) and
-  //         alignof(ValueT)
-  //   - Variable-length key storage region (referenced by offsets stored in
-  //     fixed-length keys).
-  Header *header_;
-
-  std::atomic<std::size_t> *slots_;
-  void *buckets_;
-  const std::size_t bucket_size_;
-
-  DISALLOW_COPY_AND_ASSIGN(PackedPayloadSeparateChainingAggregationStateHashTable);
-};
-
-/** @} */
-
-// ----------------------------------------------------------------------------
-// Implementations of template class methods follow.
-
-class HashTableMerger {
- public:
-  /**
-   * @brief Constructor
-   *
-   * @param handle The Aggregation handle being used.
-   * @param destination_hash_table The destination hash table to which other
-   *        hash tables will be merged.
-   **/
-  explicit HashTableMerger(
-      AggregationStateHashTableBase *destination_hash_table)
-      : destination_hash_table_(
-            static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(
-                destination_hash_table)) {}
-
-  /**
-   * @brief The operator for the functor.
-   *
-   * @param group_by_key The group by key being merged.
-   * @param source_state The aggregation state for the given key in the source
-   *        aggregation hash table.
-   **/
-  inline void operator()(const std::vector<TypedValue> &group_by_key,
-                         const std::uint8_t *source_state) {
-    destination_hash_table_->upsertCompositeKey(group_by_key, source_state);
-  }
-
- private:
-  PackedPayloadSeparateChainingAggregationStateHashTable *destination_hash_table_;
-
-  DISALLOW_COPY_AND_ASSIGN(HashTableMerger);
-};
-
-inline std::size_t PackedPayloadSeparateChainingAggregationStateHashTable
-    ::hashCompositeKey(const std::vector<TypedValue> &key) const {
-  DEBUG_ASSERT(!key.empty());
-  DEBUG_ASSERT(key.size() == key_types_.size());
-  std::size_t hash = key.front().getHash();
-  for (std::vector<TypedValue>::const_iterator key_it = key.begin() + 1;
-       key_it != key.end();
-       ++key_it) {
-    hash = CombineHashes(hash, key_it->getHash());
-  }
-  return hash;
-}
-
-inline bool PackedPayloadSeparateChainingAggregationStateHashTable
-    ::getNextEntry(TypedValue *key,
-                   const std::uint8_t **value,
-                   std::size_t *entry_num) const {
-  if (*entry_num < header_->buckets_allocated.load(std::memory_order_relaxed)) {
-    const char *bucket =
-        static_cast<const char *>(buckets_) + (*entry_num) * bucket_size_;
-    *key = key_manager_.getKeyComponentTyped(bucket, 0);
-    *value = reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset);
-    ++(*entry_num);
-    return true;
-  } else {
-    return false;
-  }
-}
-
-
-inline bool PackedPayloadSeparateChainingAggregationStateHashTable
-    ::getNextEntryCompositeKey(std::vector<TypedValue> *key,
-                               const std::uint8_t **value,
-                               std::size_t *entry_num) const {
-  if (*entry_num < header_->buckets_allocated.load(std::memory_order_relaxed)) {
-    const char *bucket =
-        static_cast<const char *>(buckets_) + (*entry_num) * bucket_size_;
-    for (std::vector<const Type *>::size_type key_idx = 0;
-         key_idx < this->key_types_.size();
-         ++key_idx) {
-      key->emplace_back(key_manager_.getKeyComponentTyped(bucket, key_idx));
-    }
-    *value = reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset);
-    ++(*entry_num);
-    return true;
-  } else {
-    return false;
-  }
-}
-
-
-inline bool PackedPayloadSeparateChainingAggregationStateHashTable
-    ::locateBucketForInsertion(const std::size_t hash_code,
-                               const std::size_t variable_key_allocation_required,
-                               void **bucket,
-                               std::atomic<std::size_t> **pending_chain_ptr,
-                               std::size_t *pending_chain_ptr_finish_value) {
-  if (*bucket == nullptr) {
-    *pending_chain_ptr = &(slots_[hash_code % header_->num_slots]);
-  } else {
-    *pending_chain_ptr = static_cast<std::atomic<std::size_t> *>(*bucket);
-  }
-  for (;;) {
-    std::size_t existing_chain_ptr = 0;
-    if ((*pending_chain_ptr)
-            ->compare_exchange_strong(existing_chain_ptr,
-                                      std::numeric_limits<std::size_t>::max(),
-                                      std::memory_order_acq_rel)) {
-      // Got to the end of the chain. Allocate a new bucket.
-
-      // First, allocate variable-length key storage, if needed (i.e. if this
-      // is an upsert and we didn't allocate up-front).
-      if (!key_manager_.allocateVariableLengthKeyStorage(
-              variable_key_allocation_required)) {
-        // Ran out of variable-length storage.
-        (*pending_chain_ptr)->store(0, std::memory_order_release);
-        *bucket = nullptr;
-        return false;
-      }
-
-      const std::size_t allocated_bucket_num =
-          header_->buckets_allocated.fetch_add(1, std::memory_order_relaxed);
-      if (allocated_bucket_num >= header_->num_buckets) {
-        // Ran out of buckets.
-        header_->buckets_allocated.fetch_sub(1, std::memory_order_relaxed);
-        (*pending_chain_ptr)->store(0, std::memory_order_release);
-        *bucket = nullptr;
-        return false;
-      } else {
-        *bucket =
-            static_cast<char *>(buckets_) + allocated_bucket_num * bucket_size_;
-        *pending_chain_ptr_finish_value = allocated_bucket_num + 1;
-        return true;
-      }
-    }
-    // Spin until the real "next" pointer is available.
-    while (existing_chain_ptr == std::numeric_limits<std::size_t>::max()) {
-      existing_chain_ptr =
-          (*pending_chain_ptr)->load(std::memory_order_acquire);
-    }
-    if (existing_chain_ptr == 0) {
-      // Other thread had to roll back, so try again.
-      continue;
-    }
-    // Chase the next pointer.
-    *bucket =
-        static_cast<char *>(buckets_) + (existing_chain_ptr - 1) * bucket_size_;
-    *pending_chain_ptr = static_cast<std::atomic<std::size_t> *>(*bucket);
-    const std::size_t hash_in_bucket = *reinterpret_cast<const std::size_t *>(
-        static_cast<const char *>(*bucket) +
-        sizeof(std::atomic<std::size_t>));
-    if (hash_in_bucket == hash_code) {
-      return false;
-    }
-  }
-}
-
-inline const std::uint8_t* PackedPayloadSeparateChainingAggregationStateHashTable
-    ::getSingleCompositeKey(const std::vector<TypedValue> &key) const {
-  DEBUG_ASSERT(this->key_types_.size() == key.size());
-
-  const std::size_t hash_code = this->hashCompositeKey(key);
-  std::size_t bucket_ref =
-      slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
-  while (bucket_ref != 0) {
-    DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
-    const char *bucket =
-        static_cast<const char *>(buckets_) + (bucket_ref - 1) * bucket_size_;
-    const std::size_t bucket_hash = *reinterpret_cast<const std::size_t *>(
-        bucket + sizeof(std::atomic<std::size_t>));
-    if ((bucket_hash == hash_code) &&
-        key_manager_.compositeKeyCollisionCheck(key, bucket)) {
-      // Match located.
-      return reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset);
-    }
-    bucket_ref =
-        reinterpret_cast<const std::atomic<std::size_t> *>(bucket)->load(
-            std::memory_order_relaxed);
-  }
-
-  // Reached the end of the chain and didn't find a match.
-  return nullptr;
-}
-
-inline const std::uint8_t* PackedPayloadSeparateChainingAggregationStateHashTable
-    ::getSingleCompositeKey(const std::vector<TypedValue> &key,
-                            const int index) const {
-  DEBUG_ASSERT(this->key_types_.size() == key.size());
-
-  const std::size_t hash_code = this->hashCompositeKey(key);
-  std::size_t bucket_ref =
-      slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
-  while (bucket_ref != 0) {
-    DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
-    const char *bucket =
-        static_cast<const char *>(buckets_) + (bucket_ref - 1) * bucket_size_;
-    const std::size_t bucket_hash = *reinterpret_cast<const std::size_t *>(
-        bucket + sizeof(std::atomic<std::size_t>));
-    if ((bucket_hash == hash_code) &&
-        key_manager_.compositeKeyCollisionCheck(key, bucket)) {
-      // Match located.
-      return reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset) +
-             this->payload_offsets_[index];
-    }
-    bucket_ref =
-        reinterpret_cast<const std::atomic<std::size_t> *>(bucket)->load(
-            std::memory_order_relaxed);
-  }
-
-  // Reached the end of the chain and didn't find a match.
-  return nullptr;
-}
-
-inline bool PackedPayloadSeparateChainingAggregationStateHashTable
-    ::upsertCompositeKey(const std::vector<TypedValue> &key,
-                         const std::uint8_t *source_state) {
-  const std::size_t variable_size =
-      calculateVariableLengthCompositeKeyCopySize(key);
-  for (;;) {
-    {
-      SpinSharedMutexSharedLock<true> resize_lock(resize_shared_mutex_);
-      std::uint8_t *value =
-          upsertCompositeKeyInternal(key, variable_size);
-      if (value != nullptr) {
-        SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
-        for (unsigned int k = 0; k < num_handles_; ++k) {
-          handles_[k]->mergeStates(source_state + payload_offsets_[k],
-                                   value + payload_offsets_[k]);
-        }
-        return true;
-      }
-    }
-    resize(0, variable_size);
-  }
-}
-
-template <typename FunctorT>
-inline bool PackedPayloadSeparateChainingAggregationStateHashTable
-    ::upsertCompositeKey(const std::vector<TypedValue> &key,
-                         FunctorT *functor,
-                         int index) {
-  const std::size_t variable_size =
-      calculateVariableLengthCompositeKeyCopySize(key);
-  for (;;) {
-    {
-      SpinSharedMutexSharedLock<true> resize_lock(resize_shared_mutex_);
-      std::uint8_t *value =
-          upsertCompositeKeyInternal(key, variable_size);
-      if (value != nullptr) {
-        (*functor)(value + payload_offsets_[index]);
-        return true;
-      }
-    }
-    resize(0, variable_size);
-  }
-}
-
-
-inline std::uint8_t* PackedPayloadSeparateChainingAggregationStateHashTable
-    ::upsertCompositeKeyInternal(const std::vector<TypedValue> &key,
-                                 const std::size_t variable_key_size) {
-  if (variable_key_size > 0) {
-    // Don't allocate yet, since the key may already be present. However, we
-    // do check if either the allocated variable storage space OR the free
-    // space is big enough to hold the key (at least one must be true: either
-    // the key is already present and allocated, or we need to be able to
-    // allocate enough space for it).
-    std::size_t allocated_bytes = header_->variable_length_bytes_allocated.load(
-        std::memory_order_relaxed);
-    if ((allocated_bytes < variable_key_size) &&
-        (allocated_bytes + variable_key_size >
-         key_manager_.getVariableLengthKeyStorageSize())) {
-      return nullptr;
-    }
-  }
-
-  const std::size_t hash_code = this->hashCompositeKey(key);
-  void *bucket = nullptr;
-  std::atomic<std::size_t> *pending_chain_ptr;
-  std::size_t pending_chain_ptr_finish_value;
-  for (;;) {
-    if (locateBucketForInsertion(hash_code,
-                                 variable_key_size,
-                                 &bucket,
-                                 &pending_chain_ptr,
-                                 &pending_chain_ptr_finish_value)) {
-      // Found an empty bucket.
-      break;
-    } else if (bucket == nullptr) {
-      // Ran out of buckets or variable-key space.
-      return nullptr;
-    } else if (key_manager_.compositeKeyCollisionCheck(key, bucket)) {
-      // Found an already-existing entry for this key.
-      return reinterpret_cast<std::uint8_t *>(static_cast<char *>(bucket) +
-                                              kValueOffset);
-    }
-  }
-
-  // We are now writing to an empty bucket.
-  // Write the key and hash.
-  writeCompositeKeyToBucket(key, hash_code, bucket);
-
-  std::uint8_t *value = static_cast<unsigned char *>(bucket) + kValueOffset;
-  std::memcpy(value, init_payload_, this->total_payload_size_);
-
-  // Update the previous chaing pointer to point to the new bucket.
-  pending_chain_ptr->store(pending_chain_ptr_finish_value,
-                           std::memory_order_release);
-
-  // Return the value.
-  return value;
-}
-
-template <bool use_two_accessors>
-inline bool PackedPayloadSeparateChainingAggregationStateHashTable
-    ::upsertValueAccessorCompositeKeyInternal(
-        const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
-        const std::vector<MultiSourceAttributeId> &key_ids,
-        ValueAccessor *base_accessor,
-        ColumnVectorsValueAccessor *derived_accessor) {
-  std::size_t variable_size;
-  std::vector<TypedValue> key_vector;
-  key_vector.resize(key_ids.size());
-
-  return InvokeOnAnyValueAccessor(
-      base_accessor,
-      [&](auto *accessor) -> bool {  // NOLINT(build/c++11)
-    bool continuing = true;
-    while (continuing) {
-      {
-        continuing = false;
-        SpinSharedMutexSharedLock<true> lock(resize_shared_mutex_);
-        while (accessor->next()) {
-          if (use_two_accessors) {
-            derived_accessor->next();
-          }
-          if (this->GetCompositeKeyFromValueAccessor<use_two_accessors, true>(
-                  key_ids,
-                  accessor,
-                  derived_accessor,
-                  &key_vector)) {
-            continue;
-          }
-          variable_size = this->calculateVariableLengthCompositeKeyCopySize(key_vector);
-          std::uint8_t *value = this->upsertCompositeKeyInternal(
-              key_vector, variable_size);
-          if (value == nullptr) {
-            continuing = true;
-            break;
-          } else {
-            SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
-            for (unsigned int k = 0; k < num_handles_; ++k) {
-              const auto &ids = argument_ids[k];
-              if (ids.empty()) {
-                handles_[k]->updateStateNullary(value + payload_offsets_[k]);
-              } else {
-                const MultiSourceAttributeId &arg_id = ids.front();
-                if (use_two_accessors && arg_id.source == ValueAccessorSource::kDerived) {
-                  DCHECK_NE(arg_id.attr_id, kInvalidAttributeID);
-                  handles_[k]->updateStateUnary(derived_accessor->getTypedValue(arg_id.attr_id),
-                                                value + payload_offsets_[k]);
-                } else {
-                  handles_[k]->updateStateUnary(accessor->getTypedValue(arg_id.attr_id),
-                                                value + payload_offsets_[k]);
-                }
-              }
-            }
-          }
-        }
-      }
-      if (continuing) {
-        this->resize(0, variable_size);
-        accessor->previous();
-        if (use_two_accessors) {
-          derived_accessor->previous();
-        }
-      }
-    }
-    return true;
-  });
-}
-
-inline void PackedPayloadSeparateChainingAggregationStateHashTable
-    ::writeScalarKeyToBucket(const TypedValue &key,
-                             const std::size_t hash_code,
-                             void *bucket) {
-  *reinterpret_cast<std::size_t *>(static_cast<char *>(bucket) +
-                                   sizeof(std::atomic<std::size_t>)) =
-      hash_code;
-  key_manager_.writeKeyComponentToBucket(key, 0, bucket, nullptr);
-}
-
-inline void PackedPayloadSeparateChainingAggregationStateHashTable
-    ::writeCompositeKeyToBucket(const std::vector<TypedValue> &key,
-                                const std::size_t hash_code,
-                                void *bucket) {
-  DEBUG_ASSERT(key.size() == this->key_types_.size());
-  *reinterpret_cast<std::size_t *>(static_cast<char *>(bucket) +
-                                   sizeof(std::atomic<std::size_t>)) =
-      hash_code;
-  for (std::size_t idx = 0; idx < this->key_types_.size(); ++idx) {
-    key_manager_.writeKeyComponentToBucket(key[idx], idx, bucket, nullptr);
-  }
-}
-
-inline bool PackedPayloadSeparateChainingAggregationStateHashTable::isFull(
-    const std::size_t extra_variable_storage) const {
-  if (header_->buckets_allocated.load(std::memory_order_relaxed) >=
-      header_->num_buckets) {
-    // All buckets are allocated.
-    return true;
-  }
-
-  if (extra_variable_storage > 0) {
-    if (extra_variable_storage +
-            header_->variable_length_bytes_allocated.load(
-                std::memory_order_relaxed) >
-        key_manager_.getVariableLengthKeyStorageSize()) {
-      // Not enough variable-length key storage space.
-      return true;
-    }
-  }
-
-  return false;
-}
-
-template <typename FunctorT>
-inline std::size_t PackedPayloadSeparateChainingAggregationStateHashTable
-    ::forEach(FunctorT *functor) const {
-  std::size_t entries_visited = 0;
-  std::size_t entry_num = 0;
-  TypedValue key;
-  const std::uint8_t *value_ptr;
-  while (getNextEntry(&key, &value_ptr, &entry_num)) {
-    ++entries_visited;
-    (*functor)(key, value_ptr);
-  }
-  return entries_visited;
-}
-
-template <typename FunctorT>
-inline std::size_t PackedPayloadSeparateChainingAggregationStateHashTable
-    ::forEach(FunctorT *functor, const int index) const {
-  std::size_t entries_visited = 0;
-  std::size_t entry_num = 0;
-  TypedValue key;
-  const std::uint8_t *value_ptr;
-  while (getNextEntry(&key, &value_ptr, &entry_num)) {
-    ++entries_visited;
-    (*functor)(key, value_ptr + payload_offsets_[index]);
-    key.clear();
-  }
-  return entries_visited;
-}
-
-template <typename FunctorT>
-inline std::size_t PackedPayloadSeparateChainingAggregationStateHashTable
-    ::forEachCompositeKey(FunctorT *functor) const {
-  std::size_t entries_visited = 0;
-  std::size_t entry_num = 0;
-  std::vector<TypedValue> key;
-  const std::uint8_t *value_ptr;
-  while (getNextEntryCompositeKey(&key, &value_ptr, &entry_num)) {
-    ++entries_visited;
-    (*functor)(key, value_ptr);
-    key.clear();
-  }
-  return entries_visited;
-}
-
-template <typename FunctorT>
-inline std::size_t PackedPayloadSeparateChainingAggregationStateHashTable
-    ::forEachCompositeKey(FunctorT *functor,
-                          const int index) const {
-  std::size_t entries_visited = 0;
-  std::size_t entry_num = 0;
-  std::vector<TypedValue> key;
-  const std::uint8_t *value_ptr;
-  while (getNextEntryCompositeKey(&key, &value_ptr, &entry_num)) {
-    ++entries_visited;
-    (*functor)(key, value_ptr + payload_offsets_[index]);
-    key.clear();
-  }
-  return entries_visited;
-}
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_STORAGE_PACKED_PAYLOAD_AGGREGATION_STATE_HASH_TABLE_HPP_