Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/01/31 23:38:09 UTC

[3/8] incubator-quickstep git commit: Initial commit.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/storage/PackedPayloadAggregationStateHashTable.cpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadAggregationStateHashTable.cpp b/storage/PackedPayloadAggregationStateHashTable.cpp
new file mode 100644
index 0000000..34c4177
--- /dev/null
+++ b/storage/PackedPayloadAggregationStateHashTable.cpp
@@ -0,0 +1,434 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "storage/PackedPayloadAggregationStateHashTable.hpp"
+
+namespace quickstep {
+
+PackedPayloadSeparateChainingAggregationStateHashTable
+    ::PackedPayloadSeparateChainingAggregationStateHashTable(
+        const std::vector<const Type *> &key_types,
+        const std::size_t num_entries,
+        const std::vector<AggregationHandle *> &handles,
+        StorageManager *storage_manager)
+        : key_types_(key_types),
+          num_handles_(handles.size()),
+          handles_(handles),
+          total_payload_size_(ComputeTotalPayloadSize(handles)),
+          storage_manager_(storage_manager),
+          kBucketAlignment(alignof(std::atomic<std::size_t>)),
+          kValueOffset(sizeof(std::atomic<std::size_t>) + sizeof(std::size_t)),
+          key_manager_(key_types_, kValueOffset + total_payload_size_),
+          bucket_size_(ComputeBucketSize(key_manager_.getFixedKeySize())) {
+  std::size_t payload_offset_running_sum = sizeof(SpinMutex);
+  for (const auto *handle : handles) {
+    payload_offsets_.emplace_back(payload_offset_running_sum);
+    payload_offset_running_sum += handle->getPayloadSize();
+  }
+
+  // NOTE(jianqiao): Copying from init_payload to buckets may cause a memory
+  // leak or a double free if the payload contains out-of-line data.
+  init_payload_ =
+      static_cast<std::uint8_t *>(calloc(this->total_payload_size_, 1));
+  DCHECK(init_payload_ != nullptr);
+
+  for (std::size_t i = 0; i < num_handles_; ++i) {
+    handles_[i]->initPayload(init_payload_ + payload_offsets_[i]);
+  }
+
+  // Note: bucket_size_ (computed in the initializer list) always rounds up to
+  // the alignment requirement of the atomic size_t "next" pointer at the front
+  // of each bucket or of a ValueT, whichever is larger.
+
+  // Record which key components are stored inline, as determined by
+  // 'key_manager_'.
+  setKeyInline(key_manager_.getKeyInline());
+
+  // Pick out a prime number of slots and calculate storage requirements.
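+  // Required memory = header + slot array + (num_slots / load factor) buckets,
+  // where each bucket also reserves an estimated amount of variable-length
+  // key storage.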
+  std::size_t num_slots_tmp =
+      get_next_prime_number(num_entries * kHashTableLoadFactor);
+  std::size_t required_memory =
+      sizeof(Header) + num_slots_tmp * sizeof(std::atomic<std::size_t>) +
+      (num_slots_tmp / kHashTableLoadFactor) *
+          (bucket_size_ + key_manager_.getEstimatedVariableKeySize());
+  std::size_t num_storage_slots =
+      this->storage_manager_->SlotsNeededForBytes(required_memory);
+  if (num_storage_slots == 0) {
+    FATAL_ERROR(
+        "Storage requirement for SeparateChainingHashTable "
+        "exceeds maximum allocation size.");
+  }
+
+  // Get a StorageBlob to hold the hash table.
+  const block_id blob_id =
+      this->storage_manager_->createBlob(num_storage_slots);
+  this->blob_ = this->storage_manager_->getBlobMutable(blob_id);
+
+  void *aligned_memory_start = this->blob_->getMemoryMutable();
+  std::size_t available_memory = num_storage_slots * kSlotSizeBytes;
+  if (align(alignof(Header),
+            sizeof(Header),
+            aligned_memory_start,
+            available_memory) == nullptr) {
+    // With current values from StorageConstants.hpp, this should be
+    // impossible. A blob is at least 1 MB, while a Header has alignment
+    // requirement of just kCacheLineBytes (64 bytes).
+    FATAL_ERROR(
+        "StorageBlob used to hold resizable "
+        "SeparateChainingHashTable is too small to meet alignment "
+        "requirements of SeparateChainingHashTable::Header.");
+  } else if (aligned_memory_start != this->blob_->getMemoryMutable()) {
+    // This should also be impossible, since the StorageManager allocates slots
+    // aligned to kCacheLineBytes.
+    DEV_WARNING("StorageBlob memory adjusted by "
+                << (num_storage_slots * kSlotSizeBytes - available_memory)
+                << " bytes to meet alignment requirement for "
+                << "SeparateChainingHashTable::Header.");
+  }
+
+  // Locate the header.
+  header_ = static_cast<Header *>(aligned_memory_start);
+  aligned_memory_start =
+      static_cast<char *>(aligned_memory_start) + sizeof(Header);
+  available_memory -= sizeof(Header);
+
+  // Recompute the number of slots & buckets using the actual available memory.
+  // Most likely, we got some extra free bucket space due to "rounding up" to
+  // the storage blob's size. It's also possible (though very unlikely) that we
+  // will wind up with fewer buckets than we initially wanted because of screwy
+  // alignment requirements for ValueT.
+  std::size_t num_buckets_tmp =
+      available_memory /
+      (kHashTableLoadFactor * sizeof(std::atomic<std::size_t>) + bucket_size_ +
+       key_manager_.getEstimatedVariableKeySize());
+  num_slots_tmp =
+      get_previous_prime_number(num_buckets_tmp * kHashTableLoadFactor);
+  num_buckets_tmp = num_slots_tmp / kHashTableLoadFactor;
+  DEBUG_ASSERT(num_slots_tmp > 0);
+  DEBUG_ASSERT(num_buckets_tmp > 0);
+
+  // Locate the slot array.
+  slots_ = static_cast<std::atomic<std::size_t> *>(aligned_memory_start);
+  aligned_memory_start = static_cast<char *>(aligned_memory_start) +
+                         sizeof(std::atomic<std::size_t>) * num_slots_tmp;
+  available_memory -= sizeof(std::atomic<std::size_t>) * num_slots_tmp;
+
+  // Locate the buckets.
+  buckets_ = aligned_memory_start;
+  // Extra-paranoid: If ValueT has an alignment requirement greater than that
+  // of std::atomic<std::size_t>, we may need to adjust the start of the bucket
+  // array.
+  if (align(kBucketAlignment, bucket_size_, buckets_, available_memory) ==
+      nullptr) {
+    FATAL_ERROR(
+        "StorageBlob used to hold resizable "
+        "SeparateChainingHashTable is too small to meet "
+        "alignment requirements of buckets.");
+  } else if (buckets_ != aligned_memory_start) {
+    DEV_WARNING(
+        "Bucket array start position adjusted to meet alignment "
+        "requirement for SeparateChainingHashTable's value type.");
+    if (num_buckets_tmp * bucket_size_ > available_memory) {
+      --num_buckets_tmp;
+    }
+  }
+
+  // Fill in the header.
+  header_->num_slots = num_slots_tmp;
+  header_->num_buckets = num_buckets_tmp;
+  header_->buckets_allocated.store(0, std::memory_order_relaxed);
+  header_->variable_length_bytes_allocated.store(0, std::memory_order_relaxed);
+  available_memory -= bucket_size_ * (header_->num_buckets);
+
+  // Locate variable-length key storage region, and give it all the remaining
+  // bytes in the blob.
+  key_manager_.setVariableLengthStorageInfo(
+      static_cast<char *>(buckets_) + header_->num_buckets * bucket_size_,
+      available_memory,
+      &(header_->variable_length_bytes_allocated));
+}
+
+PackedPayloadSeparateChainingAggregationStateHashTable
+    ::~PackedPayloadSeparateChainingAggregationStateHashTable() {
+  if (blob_.valid()) {
+    const block_id blob_id = blob_->getID();
+    blob_.release();
+    storage_manager_->deleteBlockOrBlobFile(blob_id);
+  }
+  std::free(init_payload_);
+}
+
+void PackedPayloadSeparateChainingAggregationStateHashTable::clear() {
+  const std::size_t used_buckets =
+      header_->buckets_allocated.load(std::memory_order_relaxed);
+  // Destroy existing values, if necessary.
+  destroyPayload();
+
+  // Zero-out slot array.
+  std::memset(
+      slots_, 0x0, sizeof(std::atomic<std::size_t>) * header_->num_slots);
+
+  // Zero-out used buckets.
+  std::memset(buckets_, 0x0, used_buckets * bucket_size_);
+
+  header_->buckets_allocated.store(0, std::memory_order_relaxed);
+  header_->variable_length_bytes_allocated.store(0, std::memory_order_relaxed);
+  key_manager_.zeroNextVariableLengthKeyOffset();
+}
+
+void PackedPayloadSeparateChainingAggregationStateHashTable::destroyPayload() {
+  const std::size_t num_buckets =
+      header_->buckets_allocated.load(std::memory_order_relaxed);
+  void *bucket_ptr = static_cast<char *>(buckets_) + kValueOffset;
+  for (std::size_t bucket_num = 0; bucket_num < num_buckets; ++bucket_num) {
+    for (std::size_t handle_id = 0; handle_id < num_handles_; ++handle_id) {
+      void *value_internal_ptr =
+          static_cast<char *>(bucket_ptr) + this->payload_offsets_[handle_id];
+      handles_[handle_id]->destroyPayload(static_cast<std::uint8_t *>(value_internal_ptr));
+    }
+    bucket_ptr = static_cast<char *>(bucket_ptr) + bucket_size_;
+  }
+}
+
+bool PackedPayloadSeparateChainingAggregationStateHashTable::upsertValueAccessor(
+    const std::vector<std::vector<attribute_id>> &argument_ids,
+    const std::vector<attribute_id> &key_attr_ids,
+    ValueAccessor *accessor,
+    ColumnVectorsValueAccessor *aux_accessor) {
+  if (aux_accessor == nullptr) {
+    return upsertValueAccessorCompositeKeyInternal<false>(argument_ids,
+                                                          key_attr_ids,
+                                                          accessor,
+                                                          aux_accessor);
+  } else {
+    return upsertValueAccessorCompositeKeyInternal<true>(argument_ids,
+                                                         key_attr_ids,
+                                                         accessor,
+                                                         aux_accessor);
+  }
+}
+
+void PackedPayloadSeparateChainingAggregationStateHashTable
+    ::resize(const std::size_t extra_buckets,
+             const std::size_t extra_variable_storage,
+             const std::size_t retry_num) {
+  // A retry should never be necessary with this implementation of HashTable.
+  // Separate chaining ensures that any resized hash table with more buckets
+  // than the original table will be able to hold more entries than the
+  // original.
+  DEBUG_ASSERT(retry_num == 0);
+
+  SpinSharedMutexExclusiveLock<true> write_lock(this->resize_shared_mutex_);
+
+  // Recheck whether the hash table is still full. Note that multiple threads
+  // might wait to rebuild this hash table simultaneously. Only the first one
+  // should do the rebuild.
+  if (!isFull(extra_variable_storage)) {
+    return;
+  }
+
+  // Approximately double the number of buckets and slots.
+  //
+  // TODO(chasseur): It may be worth it to more than double the number of
+  // buckets here so that we can maintain a good, sparse fill factor for a
+  // longer time as more values are inserted. Such behavior should take into
+  // account kHashTableLoadFactor.
+  std::size_t resized_num_slots = get_next_prime_number(
+      (header_->num_buckets + extra_buckets / 2) * kHashTableLoadFactor * 2);
+  std::size_t variable_storage_required =
+      (resized_num_slots / kHashTableLoadFactor) *
+      key_manager_.getEstimatedVariableKeySize();
+  const std::size_t original_variable_storage_used =
+      header_->variable_length_bytes_allocated.load(std::memory_order_relaxed);
+  // If this resize was triggered by a too-large variable-length key, bump up
+  // the variable-length storage requirement.
+  if ((extra_variable_storage > 0) &&
+      (extra_variable_storage + original_variable_storage_used >
+       key_manager_.getVariableLengthKeyStorageSize())) {
+    variable_storage_required += extra_variable_storage;
+  }
+
+  const std::size_t resized_memory_required =
+      sizeof(Header) + resized_num_slots * sizeof(std::atomic<std::size_t>) +
+      (resized_num_slots / kHashTableLoadFactor) * bucket_size_ +
+      variable_storage_required;
+  const std::size_t resized_storage_slots =
+      this->storage_manager_->SlotsNeededForBytes(resized_memory_required);
+  if (resized_storage_slots == 0) {
+    FATAL_ERROR(
+        "Storage requirement for resized SeparateChainingHashTable "
+        "exceeds maximum allocation size.");
+  }
+
+  // Get a new StorageBlob to hold the resized hash table.
+  const block_id resized_blob_id =
+      this->storage_manager_->createBlob(resized_storage_slots);
+  MutableBlobReference resized_blob =
+      this->storage_manager_->getBlobMutable(resized_blob_id);
+
+  // Locate data structures inside the new StorageBlob.
+  void *aligned_memory_start = resized_blob->getMemoryMutable();
+  std::size_t available_memory = resized_storage_slots * kSlotSizeBytes;
+  if (align(alignof(Header),
+            sizeof(Header),
+            aligned_memory_start,
+            available_memory) == nullptr) {
+    // Should be impossible, as noted in constructor.
+    FATAL_ERROR(
+        "StorageBlob used to hold resized SeparateChainingHashTable "
+        "is too small to meet alignment requirements of "
+        "LinearOpenAddressingHashTable::Header.");
+  } else if (aligned_memory_start != resized_blob->getMemoryMutable()) {
+    // Again, should be impossible.
+    DEV_WARNING("In SeparateChainingHashTable::resize(), StorageBlob "
+                << "memory adjusted by "
+                << (resized_storage_slots * kSlotSizeBytes - available_memory)
+                << " bytes to meet alignment requirement for "
+                << "SeparateChainingHashTable::Header.");
+  }
+
+  Header *resized_header = static_cast<Header *>(aligned_memory_start);
+  aligned_memory_start =
+      static_cast<char *>(aligned_memory_start) + sizeof(Header);
+  available_memory -= sizeof(Header);
+
+  // As in constructor, recompute the number of slots and buckets using the
+  // actual available memory.
+  std::size_t resized_num_buckets =
+      (available_memory - extra_variable_storage) /
+      (kHashTableLoadFactor * sizeof(std::atomic<std::size_t>) + bucket_size_ +
+       key_manager_.getEstimatedVariableKeySize());
+  resized_num_slots =
+      get_previous_prime_number(resized_num_buckets * kHashTableLoadFactor);
+  resized_num_buckets = resized_num_slots / kHashTableLoadFactor;
+
+  // Locate slot array.
+  std::atomic<std::size_t> *resized_slots =
+      static_cast<std::atomic<std::size_t> *>(aligned_memory_start);
+  aligned_memory_start = static_cast<char *>(aligned_memory_start) +
+                         sizeof(std::atomic<std::size_t>) * resized_num_slots;
+  available_memory -= sizeof(std::atomic<std::size_t>) * resized_num_slots;
+
+  // As in constructor, we will be extra paranoid and use align() to locate the
+  // start of the array of buckets, as well.
+  void *resized_buckets = aligned_memory_start;
+  if (align(
+          kBucketAlignment, bucket_size_, resized_buckets, available_memory) ==
+      nullptr) {
+    FATAL_ERROR(
+        "StorageBlob used to hold resized SeparateChainingHashTable "
+        "is too small to meet alignment requirements of buckets.");
+  } else if (resized_buckets != aligned_memory_start) {
+    DEV_WARNING(
+        "Bucket array start position adjusted to meet alignment "
+        "requirement for SeparateChainingHashTable's value type.");
+    if (resized_num_buckets * bucket_size_ + variable_storage_required >
+        available_memory) {
+      --resized_num_buckets;
+    }
+  }
+  aligned_memory_start = static_cast<char *>(aligned_memory_start) +
+                         resized_num_buckets * bucket_size_;
+  available_memory -= resized_num_buckets * bucket_size_;
+
+  void *resized_variable_length_key_storage = aligned_memory_start;
+  const std::size_t resized_variable_length_key_storage_size = available_memory;
+
+  const std::size_t original_buckets_used =
+      header_->buckets_allocated.load(std::memory_order_relaxed);
+
+  // Initialize the header.
+  resized_header->num_slots = resized_num_slots;
+  resized_header->num_buckets = resized_num_buckets;
+  resized_header->buckets_allocated.store(original_buckets_used,
+                                          std::memory_order_relaxed);
+  resized_header->variable_length_bytes_allocated.store(
+      original_variable_storage_used, std::memory_order_relaxed);
+
+  // Bulk-copy buckets. This is safe because:
+  //     1. The "next" pointers will be adjusted when rebuilding chains below.
+  //     2. The hash codes will stay the same.
+  //     3. For key components:
+  //       a. Inline keys will stay exactly the same.
+  //       b. Offsets into variable-length storage will remain valid, because
+  //          we also do a byte-for-byte copy of variable-length storage below.
+  //       c. Absolute external pointers will still point to the same address.
+  //       d. Relative pointers are not used with resizable hash tables.
+  //     4. If values are not trivially copyable, then we invoke ValueT's copy
+  //        or move constructor with placement new.
+  // NOTE(harshad) - Regarding point 4 above, as this is a specialized
+  // hash table implemented for aggregation, the values are trivially copyable,
+  // therefore we don't need to invoke payload values' copy/move constructors.
+  std::memcpy(resized_buckets, buckets_, original_buckets_used * bucket_size_);
+
+  // Copy over variable-length key components, if any.
+  if (original_variable_storage_used > 0) {
+    DEBUG_ASSERT(original_variable_storage_used ==
+                 key_manager_.getNextVariableLengthKeyOffset());
+    DEBUG_ASSERT(original_variable_storage_used <=
+                 resized_variable_length_key_storage_size);
+    std::memcpy(resized_variable_length_key_storage,
+                key_manager_.getVariableLengthKeyStorage(),
+                original_variable_storage_used);
+  }
+
+  destroyPayload();
+
+  // Make resized structures active.
+  std::swap(this->blob_, resized_blob);
+  header_ = resized_header;
+  slots_ = resized_slots;
+  buckets_ = resized_buckets;
+  key_manager_.setVariableLengthStorageInfo(
+      resized_variable_length_key_storage,
+      resized_variable_length_key_storage_size,
+      &(resized_header->variable_length_bytes_allocated));
+
+  // Drop the old blob.
+  const block_id old_blob_id = resized_blob->getID();
+  resized_blob.release();
+  this->storage_manager_->deleteBlockOrBlobFile(old_blob_id);
+
+  // Rebuild chains.
+  void *current_bucket = buckets_;
+  for (std::size_t bucket_num = 0; bucket_num < original_buckets_used;
+       ++bucket_num) {
+    std::atomic<std::size_t> *next_ptr =
+        static_cast<std::atomic<std::size_t> *>(current_bucket);
+    const std::size_t hash_code = *reinterpret_cast<const std::size_t *>(
+        static_cast<const char *>(current_bucket) +
+        sizeof(std::atomic<std::size_t>));
+
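+    // Slots store (bucket number + 1): 0 means an empty slot and SIZE_T_MAX
+    // marks a pending insertion, as described by the in-memory layout comment
+    // in the header file.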
+    const std::size_t slot_number = hash_code % header_->num_slots;
+    std::size_t slot_ptr_value = 0;
+    if (slots_[slot_number].compare_exchange_strong(
+            slot_ptr_value, bucket_num + 1, std::memory_order_relaxed)) {
+      // This bucket is the first in the chain for this slot, so reset its
+      // next pointer to 0.
+      next_ptr->store(0, std::memory_order_relaxed);
+    } else {
+      // A chain already exists starting from this slot, so put this bucket at
+      // the head.
+      next_ptr->store(slot_ptr_value, std::memory_order_relaxed);
+      slots_[slot_number].store(bucket_num + 1, std::memory_order_relaxed);
+    }
+    current_bucket = static_cast<char *>(current_bucket) + bucket_size_;
+  }
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/storage/PackedPayloadAggregationStateHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadAggregationStateHashTable.hpp b/storage/PackedPayloadAggregationStateHashTable.hpp
new file mode 100644
index 0000000..70152e7
--- /dev/null
+++ b/storage/PackedPayloadAggregationStateHashTable.hpp
@@ -0,0 +1,721 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_PACKED_PAYLOAD_AGGREGATION_STATE_HASH_TABLE_HPP_
+#define QUICKSTEP_STORAGE_PACKED_PAYLOAD_AGGREGATION_STATE_HASH_TABLE_HPP_
+
+#include <algorithm>
+#include <atomic>
+#include <cstddef>
+#include <cstdlib>
+#include <limits>
+#include <memory>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "storage/HashTableBase.hpp"
+#include "storage/HashTableKeyManager.hpp"
+#include "storage/StorageBlob.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageConstants.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/TupleReference.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "threading/SpinMutex.hpp"
+#include "threading/SpinSharedMutex.hpp"
+#include "types/Type.hpp"
+#include "types/TypedValue.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "utility/Alignment.hpp"
+#include "utility/HashPair.hpp"
+#include "utility/Macros.hpp"
+#include "utility/PrimeNumber.hpp"
+
+namespace quickstep {
+
+/** \addtogroup Storage
+ *  @{
+ */
+
+class PackedPayloadSeparateChainingAggregationStateHashTable
+    : public AggregationStateHashTableBase {
+ public:
+  PackedPayloadSeparateChainingAggregationStateHashTable(
+      const std::vector<const Type *> &key_types,
+      const std::size_t num_entries,
+      const std::vector<AggregationHandle *> &handles,
+      StorageManager *storage_manager);
+
+  ~PackedPayloadSeparateChainingAggregationStateHashTable() override;
+
+  void clear();
+
+  void destroyPayload() override;
+
+  bool upsertValueAccessor(
+      const std::vector<std::vector<attribute_id>> &argument_ids,
+      const std::vector<attribute_id> &key_attr_ids,
+      ValueAccessor *accessor,
+      ColumnVectorsValueAccessor *aux_accessor = nullptr) override;
+
+  inline block_id getBlobId() const {
+    return blob_->getID();
+  }
+
+  inline std::size_t numEntries() const {
+    return header_->buckets_allocated.load(std::memory_order_relaxed);
+  }
+
+  inline bool upsertCompositeKey(const std::vector<TypedValue> &key,
+                                 const std::uint8_t *source_state);
+
+  inline const std::uint8_t* getSingleCompositeKey(
+      const std::vector<TypedValue> &key) const;
+
+  inline const std::uint8_t* getSingleCompositeKey(
+      const std::vector<TypedValue> &key,
+      const int index) const;
+
+  template <typename FunctorT>
+  inline std::size_t forEach(FunctorT *functor) const;
+
+  template <typename FunctorT>
+  inline std::size_t forEach(FunctorT *functor, const int index) const;
+
+ private:
+  void resize(const std::size_t extra_buckets,
+              const std::size_t extra_variable_storage,
+              const std::size_t retry_num = 0);
+
+  inline std::size_t calculateVariableLengthCompositeKeyCopySize(
+      const std::vector<TypedValue> &key) const {
+    std::size_t total = 0;
+    for (std::vector<TypedValue>::size_type idx = 0; idx < key.size(); ++idx) {
+      if (!(*key_inline_)[idx]) {
+        total += key[idx].getDataSize();
+      }
+    }
+    return total;
+  }
+
+  inline bool getNextEntryCompositeKey(std::vector<TypedValue> *key,
+                                       const std::uint8_t **value,
+                                       std::size_t *entry_num) const;
+
+  inline std::uint8_t* upsertCompositeKeyInternal(
+      const std::vector<TypedValue> &key,
+      const std::size_t variable_key_size);
+
+  template <bool has_aux_accessor>
+  inline bool upsertValueAccessorCompositeKeyInternal(
+      const std::vector<std::vector<attribute_id>> &argument_ids,
+      const std::vector<attribute_id> &key_attr_ids,
+      ValueAccessor *accessor,
+      ColumnVectorsValueAccessor *aux_accessor);
+
+  // Generate a hash for a composite key by hashing each component of 'key' and
+  // mixing their bits with CombineHashes().
+  inline std::size_t hashCompositeKey(const std::vector<TypedValue> &key) const;
+
+  // Set information about which key components are stored inline. This usually
+  // comes from a HashTableKeyManager, and is set by the constructor of a
+  // subclass of HashTable.
+  inline void setKeyInline(const std::vector<bool> *key_inline) {
+    scalar_key_inline_ = key_inline->front();
+    key_inline_ = key_inline;
+  }
+
+  inline static std::size_t ComputeTotalPayloadSize(
+      const std::vector<AggregationHandle *> &handles) {
+    std::size_t total_payload_size = sizeof(SpinMutex);
+    for (const auto *handle : handles) {
+      total_payload_size += handle->getPayloadSize();
+    }
+    return total_payload_size;
+  }
+
+  // Assigns to '*key_vector' the attribute values specified by 'key_attr_ids'
+  // at the current position of 'accessor'. If 'check_for_null_keys' is true,
+  // stops and returns true as soon as a null value is encountered; otherwise
+  // returns false.
+  template <typename ValueAccessorT>
+  inline static bool GetCompositeKeyFromValueAccessor(
+      const ValueAccessorT &accessor,
+      const std::vector<attribute_id> &key_attr_ids,
+      const bool check_for_null_keys,
+      std::vector<TypedValue> *key_vector) {
+    for (std::vector<attribute_id>::size_type key_idx = 0;
+         key_idx < key_attr_ids.size();
+         ++key_idx) {
+      (*key_vector)[key_idx] = accessor.getTypedValue(key_attr_ids[key_idx]);
+      if (check_for_null_keys && (*key_vector)[key_idx].isNull()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
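+  // Note: the two atomic counters below are each aligned to a cache line to
+  // reduce false sharing between threads that concurrently allocate buckets
+  // and variable-length key storage.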
+  struct Header {
+    std::size_t num_slots;
+    std::size_t num_buckets;
+    alignas(kCacheLineBytes) std::atomic<std::size_t> buckets_allocated;
+    alignas(kCacheLineBytes)
+        std::atomic<std::size_t> variable_length_bytes_allocated;
+  };
+
+  // Type(s) of keys.
+  const std::vector<const Type *> key_types_;
+
+  // Information about whether key components are stored inline or in a
+  // separate variable-length storage region. This is usually determined by a
+  // HashTableKeyManager and set by calling setKeyInline().
+  bool scalar_key_inline_;
+  const std::vector<bool> *key_inline_;
+
+  const std::size_t num_handles_;
+  const std::vector<AggregationHandle *> handles_;
+
+  std::size_t total_payload_size_;
+  std::vector<std::size_t> payload_offsets_;
+  std::uint8_t *init_payload_;
+
+  StorageManager *storage_manager_;
+  MutableBlobReference blob_;
+
+  // Locked in shared mode for most operations, exclusive mode during resize.
+  // Not locked at all for non-resizable HashTables.
+  alignas(kCacheLineBytes) SpinSharedMutex<true> resize_shared_mutex_;
+
+  std::size_t kBucketAlignment;
+
+  // Value's offset in a bucket is the first alignof(ValueT) boundary after the
+  // next pointer and hash code.
+  std::size_t kValueOffset;
+
+  // Round bucket size up to a multiple of kBucketAlignment.
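+  // For example, with kBucketAlignment == 8, a combined size of 20 bytes
+  // rounds up to 24, while exactly 24 bytes stays 24.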
+  constexpr std::size_t ComputeBucketSize(const std::size_t fixed_key_size) {
+    return (((kValueOffset + this->total_payload_size_ + fixed_key_size - 1) /
+             kBucketAlignment) +
+            1) *
+           kBucketAlignment;
+  }
+
+  // Attempt to find an empty bucket to insert 'hash_code' into, starting after
+  // '*bucket' in the chain (or, if '*bucket' is NULL, starting from the slot
+  // array). Returns true and stores SIZE_T_MAX in '*pending_chain_ptr' if an
+  // empty bucket is found. Returns false if a hash collision is found (the
+  // caller should then check whether it is a genuine key collision or the
+  // hash collision is spurious). Returns false and sets '*bucket' to NULL if
+  // there are no more empty buckets in the hash
+  // table. If 'variable_key_allocation_required' is nonzero, this method will
+  // attempt to allocate storage for a variable-length key BEFORE allocating a
+  // bucket, so that no bucket number below 'header_->num_buckets' is ever
+  // deallocated after being allocated.
+  inline bool locateBucketForInsertion(
+      const std::size_t hash_code,
+      const std::size_t variable_key_allocation_required,
+      void **bucket,
+      std::atomic<std::size_t> **pending_chain_ptr,
+      std::size_t *pending_chain_ptr_finish_value);
+
+  // Write a scalar 'key' and its 'hash_code' into the '*bucket', which was
+  // found by locateBucketForInsertion(). Assumes that storage for a
+  // variable-length key copy (if any) was already allocated by a successful
+  // call to allocateVariableLengthKeyStorage().
+  inline void writeScalarKeyToBucket(
+      const TypedValue &key,
+      const std::size_t hash_code,
+      void *bucket);
+
+  // Write a composite 'key' and its 'hash_code' into the '*bucket', which was
+  // found by locateBucketForInsertion(). Assumes that storage for
+  // variable-length key copies (if any) was already allocated by a successful
+  // call to allocateVariableLengthKeyStorage().
+  inline void writeCompositeKeyToBucket(
+      const std::vector<TypedValue> &key,
+      const std::size_t hash_code,
+      void *bucket);
+
+  // Determine whether it is actually necessary to resize this hash table.
+  // Checks that there is at least one unallocated bucket, and that there is
+  // at least 'extra_variable_storage' bytes of variable-length storage free.
+  inline bool isFull(const std::size_t extra_variable_storage) const;
+
+  // Helper object to manage key storage.
+  HashTableKeyManager<false, true> key_manager_;
+
+  // In-memory structure is as follows:
+  //   - SeparateChainingHashTable::Header
+  //   - Array of slots, interpreted as follows:
+  //       - 0 = Points to nothing (empty)
+  //       - SIZE_T_MAX = Pending (some thread is starting a chain from this
+  //         slot and will overwrite it soon)
+  //       - Anything else = The number of the first bucket in the chain for
+  //         this slot PLUS ONE (i.e. subtract one to get the actual bucket
+  //         number).
+  //   - Array of buckets, each of which is:
+  //       - atomic size_t "next" pointer, interpreted the same as slots above.
+  //       - size_t hash value
+  //       - possibly some unused bytes as needed so that ValueT's alignment
+  //         requirement is met
+  //       - ValueT value slot
+  //       - fixed-length key storage (which may include pointers to external
+  //         memory or offsets of variable length keys stored within this hash
+  //         table)
+  //       - possibly some additional unused bytes so that bucket size is a
+  //         multiple of both alignof(std::atomic<std::size_t>) and
+  //         alignof(ValueT)
+  //   - Variable-length key storage region (referenced by offsets stored in
+  //     fixed-length keys).
+  Header *header_;
+
+  std::atomic<std::size_t> *slots_;
+  void *buckets_;
+  const std::size_t bucket_size_;
+
+  DISALLOW_COPY_AND_ASSIGN(PackedPayloadSeparateChainingAggregationStateHashTable);
+};
+
+/** @} */
+
+// ----------------------------------------------------------------------------
+// Implementations of template class methods follow.
+
+class HashTableMergerFast {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param destination_hash_table The destination hash table into which other
+   *        hash tables will be merged.
+   **/
+  explicit HashTableMergerFast(
+      AggregationStateHashTableBase *destination_hash_table)
+      : destination_hash_table_(
+            static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(
+                destination_hash_table)) {}
+
+  /**
+   * @brief The operator for the functor.
+   *
+   * @param group_by_key The group by key being merged.
+   * @param source_state The aggregation state for the given key in the source
+   *        aggregation hash table.
+   **/
+  inline void operator()(const std::vector<TypedValue> &group_by_key,
+                         const std::uint8_t *source_state) {
+    destination_hash_table_->upsertCompositeKey(group_by_key, source_state);
+  }
+
+ private:
+  PackedPayloadSeparateChainingAggregationStateHashTable *destination_hash_table_;
+
+  DISALLOW_COPY_AND_ASSIGN(HashTableMergerFast);
+};
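+
+// Sketch of typical usage (with hypothetical variable names): merge a source
+// aggregation hash table into the destination by iterating its entries with
+// forEach(), e.g.
+//
+//   HashTableMergerFast merger(destination_table);
+//   source_table->forEach(&merger);
+//
+// where 'source_table' points to a
+// PackedPayloadSeparateChainingAggregationStateHashTable.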
+
+inline std::size_t PackedPayloadSeparateChainingAggregationStateHashTable
+    ::hashCompositeKey(const std::vector<TypedValue> &key) const {
+  DEBUG_ASSERT(!key.empty());
+  DEBUG_ASSERT(key.size() == key_types_.size());
+  std::size_t hash = key.front().getHash();
+  for (std::vector<TypedValue>::const_iterator key_it = key.begin() + 1;
+       key_it != key.end();
+       ++key_it) {
+    hash = CombineHashes(hash, key_it->getHash());
+  }
+  return hash;
+}
+
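+// Buckets are allocated densely from index 0 via an atomic counter, so
+// visiting entry numbers in [0, buckets_allocated) enumerates every entry.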
+inline bool PackedPayloadSeparateChainingAggregationStateHashTable
+    ::getNextEntryCompositeKey(std::vector<TypedValue> *key,
+                               const std::uint8_t **value,
+                               std::size_t *entry_num) const {
+  if (*entry_num < header_->buckets_allocated.load(std::memory_order_relaxed)) {
+    const char *bucket =
+        static_cast<const char *>(buckets_) + (*entry_num) * bucket_size_;
+    for (std::vector<const Type *>::size_type key_idx = 0;
+         key_idx < this->key_types_.size();
+         ++key_idx) {
+      key->emplace_back(key_manager_.getKeyComponentTyped(bucket, key_idx));
+    }
+    *value = reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset);
+    ++(*entry_num);
+    return true;
+  } else {
+    return false;
+  }
+}
+
+
+inline bool PackedPayloadSeparateChainingAggregationStateHashTable
+    ::locateBucketForInsertion(const std::size_t hash_code,
+                               const std::size_t variable_key_allocation_required,
+                               void **bucket,
+                               std::atomic<std::size_t> **pending_chain_ptr,
+                               std::size_t *pending_chain_ptr_finish_value) {
+  if (*bucket == nullptr) {
+    *pending_chain_ptr = &(slots_[hash_code % header_->num_slots]);
+  } else {
+    *pending_chain_ptr = static_cast<std::atomic<std::size_t> *>(*bucket);
+  }
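+  // Walk the chain. A successful compare-exchange from 0 to SIZE_T_MAX claims
+  // the end of the chain for this thread; the caller later publishes the real
+  // bucket number by storing '*pending_chain_ptr_finish_value' into the
+  // claimed pointer.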
+  for (;;) {
+    std::size_t existing_chain_ptr = 0;
+    if ((*pending_chain_ptr)
+            ->compare_exchange_strong(existing_chain_ptr,
+                                      std::numeric_limits<std::size_t>::max(),
+                                      std::memory_order_acq_rel)) {
+      // Got to the end of the chain. Allocate a new bucket.
+
+      // First, allocate variable-length key storage, if needed (i.e. if this
+      // is an upsert and we didn't allocate up-front).
+      if (!key_manager_.allocateVariableLengthKeyStorage(
+              variable_key_allocation_required)) {
+        // Ran out of variable-length storage.
+        (*pending_chain_ptr)->store(0, std::memory_order_release);
+        *bucket = nullptr;
+        return false;
+      }
+
+      const std::size_t allocated_bucket_num =
+          header_->buckets_allocated.fetch_add(1, std::memory_order_relaxed);
+      if (allocated_bucket_num >= header_->num_buckets) {
+        // Ran out of buckets.
+        header_->buckets_allocated.fetch_sub(1, std::memory_order_relaxed);
+        (*pending_chain_ptr)->store(0, std::memory_order_release);
+        *bucket = nullptr;
+        return false;
+      } else {
+        *bucket =
+            static_cast<char *>(buckets_) + allocated_bucket_num * bucket_size_;
+        *pending_chain_ptr_finish_value = allocated_bucket_num + 1;
+        return true;
+      }
+    }
+    // Spin until the real "next" pointer is available.
+    while (existing_chain_ptr == std::numeric_limits<std::size_t>::max()) {
+      existing_chain_ptr =
+          (*pending_chain_ptr)->load(std::memory_order_acquire);
+    }
+    if (existing_chain_ptr == 0) {
+      // Other thread had to roll back, so try again.
+      continue;
+    }
+    // Chase the next pointer.
+    *bucket =
+        static_cast<char *>(buckets_) + (existing_chain_ptr - 1) * bucket_size_;
+    *pending_chain_ptr = static_cast<std::atomic<std::size_t> *>(*bucket);
+    const std::size_t hash_in_bucket = *reinterpret_cast<const std::size_t *>(
+        static_cast<const char *>(*bucket) +
+        sizeof(std::atomic<std::size_t>));
+    if (hash_in_bucket == hash_code) {
+      return false;
+    }
+  }
+}
+
+inline const std::uint8_t* PackedPayloadSeparateChainingAggregationStateHashTable
+    ::getSingleCompositeKey(const std::vector<TypedValue> &key) const {
+  DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+  const std::size_t hash_code = this->hashCompositeKey(key);
+  std::size_t bucket_ref =
+      slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
+  while (bucket_ref != 0) {
+    DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
+    const char *bucket =
+        static_cast<const char *>(buckets_) + (bucket_ref - 1) * bucket_size_;
+    const std::size_t bucket_hash = *reinterpret_cast<const std::size_t *>(
+        bucket + sizeof(std::atomic<std::size_t>));
+    if ((bucket_hash == hash_code) &&
+        key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+      // Match located.
+      return reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset);
+    }
+    bucket_ref =
+        reinterpret_cast<const std::atomic<std::size_t> *>(bucket)->load(
+            std::memory_order_relaxed);
+  }
+
+  // Reached the end of the chain and didn't find a match.
+  return nullptr;
+}
+
+inline const std::uint8_t* PackedPayloadSeparateChainingAggregationStateHashTable
+    ::getSingleCompositeKey(const std::vector<TypedValue> &key,
+                            const int index) const {
+  DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+  const std::size_t hash_code = this->hashCompositeKey(key);
+  std::size_t bucket_ref =
+      slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
+  while (bucket_ref != 0) {
+    DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
+    const char *bucket =
+        static_cast<const char *>(buckets_) + (bucket_ref - 1) * bucket_size_;
+    const std::size_t bucket_hash = *reinterpret_cast<const std::size_t *>(
+        bucket + sizeof(std::atomic<std::size_t>));
+    if ((bucket_hash == hash_code) &&
+        key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+      // Match located.
+      return reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset) +
+             this->payload_offsets_[index];
+    }
+    bucket_ref =
+        reinterpret_cast<const std::atomic<std::size_t> *>(bucket)->load(
+            std::memory_order_relaxed);
+  }
+
+  // Reached the end of the chain and didn't find a match.
+  return nullptr;
+}
+
+inline bool PackedPayloadSeparateChainingAggregationStateHashTable
+    ::upsertCompositeKey(const std::vector<TypedValue> &key,
+                         const std::uint8_t *source_state) {
+  const std::size_t variable_size =
+      calculateVariableLengthCompositeKeyCopySize(key);
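+  // Retry loop: take the resize lock in shared mode and attempt the upsert.
+  // If the table has run out of buckets or variable-length key storage,
+  // release the lock, grow the table via resize(), and try again.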
+  for (;;) {
+    {
+      SpinSharedMutexSharedLock<true> resize_lock(resize_shared_mutex_);
+      std::uint8_t *value =
+          upsertCompositeKeyInternal(key, variable_size);
+      if (value != nullptr) {
+        SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
+        for (unsigned int k = 0; k < num_handles_; ++k) {
+          handles_[k]->mergeStates(source_state + payload_offsets_[k],
+                                   value + payload_offsets_[k]);
+        }
+        return true;
+      }
+    }
+    resize(0, variable_size);
+  }
+}
+
+inline std::uint8_t* PackedPayloadSeparateChainingAggregationStateHashTable
+    ::upsertCompositeKeyInternal(const std::vector<TypedValue> &key,
+                                 const std::size_t variable_key_size) {
+  if (variable_key_size > 0) {
+    // Don't allocate yet, since the key may already be present. However, we
+    // do check if either the allocated variable storage space OR the free
+    // space is big enough to hold the key (at least one must be true: either
+    // the key is already present and allocated, or we need to be able to
+    // allocate enough space for it).
+    std::size_t allocated_bytes = header_->variable_length_bytes_allocated.load(
+        std::memory_order_relaxed);
+    if ((allocated_bytes < variable_key_size) &&
+        (allocated_bytes + variable_key_size >
+         key_manager_.getVariableLengthKeyStorageSize())) {
+      return nullptr;
+    }
+  }
+
+  const std::size_t hash_code = this->hashCompositeKey(key);
+  void *bucket = nullptr;
+  std::atomic<std::size_t> *pending_chain_ptr;
+  std::size_t pending_chain_ptr_finish_value;
+  for (;;) {
+    if (locateBucketForInsertion(hash_code,
+                                 variable_key_size,
+                                 &bucket,
+                                 &pending_chain_ptr,
+                                 &pending_chain_ptr_finish_value)) {
+      // Found an empty bucket.
+      break;
+    } else if (bucket == nullptr) {
+      // Ran out of buckets or variable-key space.
+      return nullptr;
+    } else if (key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+      // Found an already-existing entry for this key.
+      return reinterpret_cast<std::uint8_t *>(static_cast<char *>(bucket) +
+                                              kValueOffset);
+    }
+  }
+
+  // We are now writing to an empty bucket.
+  // Write the key and hash.
+  writeCompositeKeyToBucket(key, hash_code, bucket);
+
+  std::uint8_t *value = static_cast<unsigned char *>(bucket) + kValueOffset;
+  std::memcpy(value, init_payload_, this->total_payload_size_);
+
+  // Update the previous chain pointer to point to the new bucket.
+  pending_chain_ptr->store(pending_chain_ptr_finish_value,
+                           std::memory_order_release);
+
+  // Return the value.
+  return value;
+}
+
+template <bool has_aux_accessor>
+inline bool PackedPayloadSeparateChainingAggregationStateHashTable
+    ::upsertValueAccessorCompositeKeyInternal(
+        const std::vector<std::vector<attribute_id>> &argument_ids,
+        const std::vector<attribute_id> &key_attr_ids,
+        ValueAccessor *accessor,
+        ColumnVectorsValueAccessor *aux_accessor) {
+  std::size_t variable_size;
+  std::vector<TypedValue> key_vector;
+  key_vector.resize(key_attr_ids.size());
+
+  // TODO(jianqiao): determine this bool value
+  const bool check_for_null_keys = true;
+
+  return InvokeOnAnyValueAccessor(
+      accessor,
+      [&](auto *accessor) -> bool {  // NOLINT(build/c++11)
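+    // Outer retry loop: if any tuple fails to find a bucket (the table is
+    // full), break out of the inner scan, release the shared lock, resize,
+    // step the accessor(s) back one position, and resume from that tuple.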
+    bool continuing = true;
+    while (continuing) {
+      {
+        continuing = false;
+        SpinSharedMutexSharedLock<true> lock(resize_shared_mutex_);
+        while (accessor->next()) {
+          if (has_aux_accessor) {
+            aux_accessor->next();
+          }
+          // TODO(jianqiao): templatize to involve aux_accessor
+          if (this->GetCompositeKeyFromValueAccessor(*accessor,
+                                                     key_attr_ids,
+                                                     check_for_null_keys,
+                                                     &key_vector)) {
+            continue;
+          }
+          variable_size = this->calculateVariableLengthCompositeKeyCopySize(key_vector);
+          std::uint8_t *value = this->upsertCompositeKeyInternal(
+              key_vector, variable_size);
+          if (value == nullptr) {
+            continuing = true;
+            break;
+          } else {
+            SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
+            for (unsigned int k = 0; k < num_handles_; ++k) {
+              const auto &ids = argument_ids[k];
+              if (ids.empty()) {
+                handles_[k]->updateStateNullary(
+                    value + payload_offsets_[k]);
+              } else {
+                const attribute_id argument_id = ids.front();
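+                // Negative argument ids (other than kInvalidAttributeID)
+                // refer to columns of 'aux_accessor'; -(argument_id + 2)
+                // recovers the column index (-2 -> 0, -3 -> 1, ...).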
+                if (has_aux_accessor && argument_id < 0) {
+                  DCHECK_NE(argument_id, kInvalidAttributeID);
+                  handles_[k]->updateStateUnary(aux_accessor->getTypedValue(-(argument_id+2)),
+                                                value + payload_offsets_[k]);
+                } else {
+                  handles_[k]->updateStateUnary(accessor->getTypedValue(argument_id),
+                                                value + payload_offsets_[k]);
+                }
+              }
+            }
+          }
+        }
+      }
+      if (continuing) {
+        this->resize(0, variable_size);
+        accessor->previous();
+        if (has_aux_accessor) {
+          aux_accessor->previous();
+        }
+      }
+    }
+    return true;
+  });
+}
+
+inline void PackedPayloadSeparateChainingAggregationStateHashTable
+    ::writeScalarKeyToBucket(const TypedValue &key,
+                             const std::size_t hash_code,
+                             void *bucket) {
+  *reinterpret_cast<std::size_t *>(static_cast<char *>(bucket) +
+                                   sizeof(std::atomic<std::size_t>)) =
+      hash_code;
+  key_manager_.writeKeyComponentToBucket(key, 0, bucket, nullptr);
+}
+
+inline void PackedPayloadSeparateChainingAggregationStateHashTable
+    ::writeCompositeKeyToBucket(const std::vector<TypedValue> &key,
+                                const std::size_t hash_code,
+                                void *bucket) {
+  DEBUG_ASSERT(key.size() == this->key_types_.size());
+  *reinterpret_cast<std::size_t *>(static_cast<char *>(bucket) +
+                                   sizeof(std::atomic<std::size_t>)) =
+      hash_code;
+  for (std::size_t idx = 0; idx < this->key_types_.size(); ++idx) {
+    key_manager_.writeKeyComponentToBucket(key[idx], idx, bucket, nullptr);
+  }
+}
+
+inline bool PackedPayloadSeparateChainingAggregationStateHashTable::isFull(
+    const std::size_t extra_variable_storage) const {
+  if (header_->buckets_allocated.load(std::memory_order_relaxed) >=
+      header_->num_buckets) {
+    // All buckets are allocated.
+    return true;
+  }
+
+  if (extra_variable_storage > 0) {
+    if (extra_variable_storage +
+            header_->variable_length_bytes_allocated.load(
+                std::memory_order_relaxed) >
+        key_manager_.getVariableLengthKeyStorageSize()) {
+      // Not enough variable-length key storage space.
+      return true;
+    }
+  }
+
+  return false;
+}
+
+template <typename FunctorT>
+inline std::size_t PackedPayloadSeparateChainingAggregationStateHashTable
+    ::forEach(FunctorT *functor) const {
+  std::size_t entries_visited = 0;
+  std::size_t entry_num = 0;
+  std::vector<TypedValue> key;
+  const std::uint8_t *value_ptr;
+  while (getNextEntryCompositeKey(&key, &value_ptr, &entry_num)) {
+    ++entries_visited;
+    (*functor)(key, value_ptr);
+    key.clear();
+  }
+  return entries_visited;
+}
+
+template <typename FunctorT>
+inline std::size_t PackedPayloadSeparateChainingAggregationStateHashTable
+    ::forEach(FunctorT *functor, const int index) const {
+  std::size_t entries_visited = 0;
+  std::size_t entry_num = 0;
+  std::vector<TypedValue> key;
+  const std::uint8_t *value_ptr;
+  while (getNextEntryCompositeKey(&key, &value_ptr, &entry_num)) {
+    ++entries_visited;
+    (*functor)(key, value_ptr + payload_offsets_[index]);
+    key.clear();
+  }
+  return entries_visited;
+}
+
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_STORAGE_PACKED_PAYLOAD_AGGREGATION_STATE_HASH_TABLE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/storage/PartitionedHashTablePool.hpp
----------------------------------------------------------------------
diff --git a/storage/PartitionedHashTablePool.hpp b/storage/PartitionedHashTablePool.hpp
index 95d1810..e9ca022 100644
--- a/storage/PartitionedHashTablePool.hpp
+++ b/storage/PartitionedHashTablePool.hpp
@@ -28,8 +28,7 @@
 
 #include "expressions/aggregation/AggregationHandle.hpp"
 #include "storage/HashTableBase.hpp"
-#include "storage/FastHashTable.hpp"
-#include "storage/FastHashTableFactory.hpp"
+#include "storage/HashTableFactory.hpp"
 #include "utility/Macros.hpp"
 #include "utility/StringUtil.hpp"
 
@@ -54,33 +53,6 @@ class PartitionedHashTablePool {
   /**
    * @brief Constructor.
    *
-   * @param estimated_num_entries The maximum number of entries in a hash table.
-   * @param num_partitions The number of partitions (i.e. number of HashTables)
-   * @param hash_table_impl_type The type of hash table implementation.
-   * @param group_by_types A vector of pointer of types which form the group by
-   *        key.
-   * @param agg_handle The aggregation handle.
-   * @param storage_manager A pointer to the storage manager.
-   **/
-  PartitionedHashTablePool(const std::size_t estimated_num_entries,
-                           const std::size_t num_partitions,
-                           const HashTableImplType hash_table_impl_type,
-                           const std::vector<const Type *> &group_by_types,
-                           AggregationHandle *agg_handle,
-                           StorageManager *storage_manager)
-      : estimated_num_entries_(
-            setHashTableSize(estimated_num_entries, num_partitions)),
-        num_partitions_(num_partitions),
-        hash_table_impl_type_(hash_table_impl_type),
-        group_by_types_(group_by_types),
-        agg_handle_(DCHECK_NOTNULL(agg_handle)),
-        storage_manager_(DCHECK_NOTNULL(storage_manager)) {
-    initializeAllHashTables();
-  }
-
-  /**
-   * @brief Constructor.
-   *
    * @note This constructor is relevant for the HashTable specialized for
    *       aggregation.
    *
@@ -89,8 +61,6 @@ class PartitionedHashTablePool {
    * @param hash_table_impl_type The type of hash table implementation.
    * @param group_by_types A vector of pointer of types which form the group by
    *        key.
-   * @param payload_sizes The sizes of the payload elements (i.e.
-   *        AggregationStates).
    * @param handles The aggregation handles.
    * @param storage_manager A pointer to the storage manager.
    **/
@@ -98,7 +68,6 @@ class PartitionedHashTablePool {
                            const std::size_t num_partitions,
                            const HashTableImplType hash_table_impl_type,
                            const std::vector<const Type *> &group_by_types,
-                           const std::vector<std::size_t> &payload_sizes,
                            const std::vector<AggregationHandle *> &handles,
                            StorageManager *storage_manager)
       : estimated_num_entries_(
@@ -106,7 +75,6 @@ class PartitionedHashTablePool {
         num_partitions_(num_partitions),
         hash_table_impl_type_(hash_table_impl_type),
         group_by_types_(group_by_types),
-        payload_sizes_(payload_sizes),
         handles_(handles),
         storage_manager_(DCHECK_NOTNULL(storage_manager)) {
     initializeAllHashTables();
@@ -150,25 +118,17 @@ class PartitionedHashTablePool {
  private:
   void initializeAllHashTables() {
     for (std::size_t part_num = 0; part_num < num_partitions_; ++part_num) {
-      AggregationStateHashTableBase *part_hash_table = createNewHashTableFast();
+      AggregationStateHashTableBase *part_hash_table = createNewHashTable();
       hash_tables_.push_back(
           std::unique_ptr<AggregationStateHashTableBase>(part_hash_table));
     }
   }
 
   AggregationStateHashTableBase* createNewHashTable() {
-    return agg_handle_->createGroupByHashTable(hash_table_impl_type_,
-                                               group_by_types_,
-                                               estimated_num_entries_,
-                                               storage_manager_);
-  }
-
-  AggregationStateHashTableBase* createNewHashTableFast() {
-    return AggregationStateFastHashTableFactory::CreateResizable(
+    return AggregationStateHashTableFactory::CreateResizable(
                 hash_table_impl_type_,
                 group_by_types_,
                 estimated_num_entries_,
-                payload_sizes_,
                 handles_,
                 storage_manager_);
   }
@@ -189,10 +149,6 @@ class PartitionedHashTablePool {
   const HashTableImplType hash_table_impl_type_;
 
   const std::vector<const Type *> group_by_types_;
-
-  std::vector<std::size_t> payload_sizes_;
-
-  AggregationHandle *agg_handle_;
   const std::vector<AggregationHandle *> handles_;
   StorageManager *storage_manager_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index de2d25b..ba9ccb8 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -28,7 +28,6 @@
 
 #include "catalog/CatalogRelationSchema.hpp"
 #include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregationHandle.hpp"
 #include "expressions/predicate/Predicate.hpp"
 #include "expressions/scalar/Scalar.hpp"
 #include "storage/BasicColumnStoreTupleStorageSubBlock.hpp"
@@ -37,7 +36,6 @@
 #include "storage/CompressedColumnStoreTupleStorageSubBlock.hpp"
 #include "storage/CompressedPackedRowStoreTupleStorageSubBlock.hpp"
 #include "storage/CountedReference.hpp"
-#include "storage/HashTableBase.hpp"
 #include "storage/IndexSubBlock.hpp"
 #include "storage/InsertDestinationInterface.hpp"
 #include "storage/SMAIndexSubBlock.hpp"
@@ -396,166 +394,6 @@ void StorageBlock::selectSimple(const std::vector<attribute_id> &selection,
                                                       accessor.get());
 }
 
-AggregationState* StorageBlock::aggregate(
-    const AggregationHandle &handle,
-    const std::vector<std::unique_ptr<const Scalar>> &arguments,
-    const std::vector<attribute_id> *arguments_as_attributes,
-    const TupleIdSequence *filter) const {
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  // If all the arguments to this aggregate are plain relation attributes,
-  // aggregate directly on a ValueAccessor from this block to avoid a copy.
-  if ((arguments_as_attributes != nullptr) && (!arguments_as_attributes->empty())) {
-    DCHECK_EQ(arguments.size(), arguments_as_attributes->size())
-        << "Mismatch between number of arguments and number of attribute_ids";
-    return aggregateHelperValueAccessor(handle, *arguments_as_attributes, filter);
-  }
-  // TODO(shoban): We may want to optimize for ScalarLiteral here.
-#endif
-
-  // Call aggregateHelperColumnVector() to materialize each argument as a
-  // ColumnVector, then aggregate over those.
-  return aggregateHelperColumnVector(handle, arguments, filter);
-}
-
-void StorageBlock::aggregateGroupBy(
-    const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
-    const std::vector<std::unique_ptr<const Scalar>> &group_by,
-    const TupleIdSequence *filter,
-    AggregationStateHashTableBase *hash_table,
-    std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
-  DCHECK_GT(group_by.size(), 0u)
-      << "Called aggregateGroupBy() with zero GROUP BY expressions";
-
-  SubBlocksReference sub_blocks_ref(*tuple_store_,
-                                    indices_,
-                                    indices_consistent_);
-
-  // IDs of 'arguments' as attributes in the ValueAccessor we create below.
-  std::vector<attribute_id> argument_ids;
-
-  // IDs of GROUP BY key element(s) in the ValueAccessor we create below.
-  std::vector<attribute_id> key_ids;
-
-  // An intermediate ValueAccessor that stores the materialized 'arguments' for
-  // this aggregate, as well as the GROUP BY expression values.
-  ColumnVectorsValueAccessor temp_result;
-  {
-    std::unique_ptr<ValueAccessor> accessor(tuple_store_->createValueAccessor(filter));
-    attribute_id attr_id = 0;
-
-    // First, put GROUP BY keys into 'temp_result'.
-    if (reuse_group_by_vectors->empty()) {
-      // Compute GROUP BY values from group_by Scalars, and store them in
-      // reuse_group_by_vectors for reuse by other aggregates on this same
-      // block.
-      reuse_group_by_vectors->reserve(group_by.size());
-      for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
-        reuse_group_by_vectors->emplace_back(
-            group_by_element->getAllValues(accessor.get(), &sub_blocks_ref));
-        temp_result.addColumn(reuse_group_by_vectors->back().get(), false);
-        key_ids.push_back(attr_id++);
-      }
-    } else {
-      // Reuse precomputed GROUP BY values from reuse_group_by_vectors.
-      DCHECK_EQ(group_by.size(), reuse_group_by_vectors->size())
-          << "Wrong number of reuse_group_by_vectors";
-      for (const std::unique_ptr<ColumnVector> &reuse_cv : *reuse_group_by_vectors) {
-        temp_result.addColumn(reuse_cv.get(), false);
-        key_ids.push_back(attr_id++);
-      }
-    }
-
-    // Compute argument vectors and add them to 'temp_result'.
-    for (const std::vector<std::unique_ptr<const Scalar>> &argument : arguments) {
-        for (const std::unique_ptr<const Scalar> &args : argument) {
-          temp_result.addColumn(args->getAllValues(accessor.get(), &sub_blocks_ref));
-          argument_ids.push_back(attr_id++);
-        }
-        if (argument.empty()) {
-          argument_ids.push_back(kInvalidAttributeID);
-        }
-     }
-  }
-
-  hash_table->upsertValueAccessorCompositeKeyFast(argument_ids,
-                                                  &temp_result,
-                                                  key_ids,
-                                                  true);
-}
-
-
-void StorageBlock::aggregateDistinct(
-    const AggregationHandle &handle,
-    const std::vector<std::unique_ptr<const Scalar>> &arguments,
-    const std::vector<attribute_id> *arguments_as_attributes,
-    const std::vector<std::unique_ptr<const Scalar>> &group_by,
-    const TupleIdSequence *filter,
-    AggregationStateHashTableBase *distinctify_hash_table,
-    std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
-  DCHECK_GT(arguments.size(), 0u)
-      << "Called aggregateDistinct() with zero argument expressions";
-  DCHECK((group_by.size() == 0 || reuse_group_by_vectors != nullptr));
-
-  std::vector<attribute_id> key_ids;
-
-  // An intermediate ValueAccessor that stores the materialized 'arguments' for
-  // this aggregate, as well as the GROUP BY expression values.
-  ColumnVectorsValueAccessor temp_result;
-  {
-    std::unique_ptr<ValueAccessor> accessor(tuple_store_->createValueAccessor(filter));
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-    // If all the arguments to this aggregate are plain relation attributes,
-    // aggregate directly on a ValueAccessor from this block to avoid a copy.
-    if ((arguments_as_attributes != nullptr) && (!arguments_as_attributes->empty())) {
-      DCHECK_EQ(arguments.size(), arguments_as_attributes->size())
-          << "Mismatch between number of arguments and number of attribute_ids";
-      DCHECK_EQ(group_by.size(), 0u);
-      handle.insertValueAccessorIntoDistinctifyHashTable(
-          accessor.get(), *arguments_as_attributes, distinctify_hash_table);
-      return;
-    }
-#endif
-
-    SubBlocksReference sub_blocks_ref(*tuple_store_,
-                                      indices_,
-                                      indices_consistent_);
-    attribute_id attr_id = 0;
-
-    if (!group_by.empty()) {
-      // Put GROUP BY keys into 'temp_result'.
-      if (reuse_group_by_vectors->empty()) {
-        // Compute GROUP BY values from group_by Scalars, and store them in
-        // reuse_group_by_vectors for reuse by other aggregates on this same
-        // block.
-        reuse_group_by_vectors->reserve(group_by.size());
-        for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
-          reuse_group_by_vectors->emplace_back(
-              group_by_element->getAllValues(accessor.get(), &sub_blocks_ref));
-          temp_result.addColumn(reuse_group_by_vectors->back().get(), false);
-          key_ids.push_back(attr_id++);
-        }
-      } else {
-        // Reuse precomputed GROUP BY values from reuse_group_by_vectors.
-        DCHECK_EQ(group_by.size(), reuse_group_by_vectors->size())
-            << "Wrong number of reuse_group_by_vectors";
-        for (const std::unique_ptr<ColumnVector> &reuse_cv : *reuse_group_by_vectors) {
-          temp_result.addColumn(reuse_cv.get(), false);
-          key_ids.push_back(attr_id++);
-        }
-      }
-    }
-    // Compute argument vectors and add them to 'temp_result'.
-    for (const std::unique_ptr<const Scalar> &argument : arguments) {
-      temp_result.addColumn(argument->getAllValues(accessor.get(), &sub_blocks_ref));
-      key_ids.push_back(attr_id++);
-    }
-  }
-
-  handle.insertValueAccessorIntoDistinctifyHashTable(
-      &temp_result, key_ids, distinctify_hash_table);
-}
-
 // TODO(chasseur): Vectorization for updates.
 StorageBlock::UpdateResult StorageBlock::update(
     const unordered_map<attribute_id, unique_ptr<const Scalar>> &assignments,
@@ -1262,61 +1100,6 @@ std::unordered_map<attribute_id, TypedValue>* StorageBlock::generateUpdatedValue
   return update_map;
 }
 
-AggregationState* StorageBlock::aggregateHelperColumnVector(
-    const AggregationHandle &handle,
-    const std::vector<std::unique_ptr<const Scalar>> &arguments,
-    const TupleIdSequence *matches) const {
-  if (arguments.empty()) {
-    // Special case. This is a nullary aggregate (i.e. COUNT(*)).
-    return handle.accumulateNullary(matches == nullptr ? tuple_store_->numTuples()
-                                                       : matches->size());
-  } else {
-    // Set up a ValueAccessor that will be used when materializing argument
-    // values below (possibly filtered based on the '*matches' to a filter
-    // predicate).
-    std::unique_ptr<ValueAccessor> accessor;
-    if (matches == nullptr) {
-      accessor.reset(tuple_store_->createValueAccessor());
-    } else {
-      accessor.reset(tuple_store_->createValueAccessor(matches));
-    }
-
-    SubBlocksReference sub_blocks_ref(*tuple_store_,
-                                      indices_,
-                                      indices_consistent_);
-
-    // Materialize each argument's values for this block as a ColumnVector.
-    std::vector<std::unique_ptr<ColumnVector>> column_vectors;
-    for (const std::unique_ptr<const Scalar> &argument : arguments) {
-      column_vectors.emplace_back(argument->getAllValues(accessor.get(), &sub_blocks_ref));
-    }
-
-    // Have the AggregationHandle actually do the aggregation.
-    return handle.accumulateColumnVectors(column_vectors);
-  }
-}
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-AggregationState* StorageBlock::aggregateHelperValueAccessor(
-    const AggregationHandle &handle,
-    const std::vector<attribute_id> &argument_ids,
-    const TupleIdSequence *matches) const {
-  // Set up a ValueAccessor to aggregate over (possibly filtered based on the
-  // '*matches' to a filter predicate).
-  std::unique_ptr<ValueAccessor> accessor;
-  if (matches == nullptr) {
-    accessor.reset(tuple_store_->createValueAccessor());
-  } else {
-    accessor.reset(tuple_store_->createValueAccessor(matches));
-  }
-
-  // Have the AggregationHandle actually do the aggregation.
-  return handle.accumulateValueAccessor(
-      accessor.get(),
-      argument_ids);
-}
-#endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-
 void StorageBlock::updateHeader() {
   DEBUG_ASSERT(*static_cast<const int*>(block_memory_) == block_header_.ByteSize());
 
@@ -1346,59 +1129,4 @@ const std::size_t StorageBlock::getNumTuples() const {
   return tuple_store_->numTuples();
 }
 
-void StorageBlock::aggregateGroupByPartitioned(
-    const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
-    const std::vector<std::unique_ptr<const Scalar>> &group_by,
-    const TupleIdSequence *filter,
-    const std::size_t num_partitions,
-    ColumnVectorsValueAccessor *temp_result,
-    std::vector<attribute_id> *argument_ids,
-    std::vector<attribute_id> *key_ids,
-    std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
-  DCHECK(!group_by.empty())
-      << "Called aggregateGroupByPartitioned() with zero GROUP BY expressions";
-
-  SubBlocksReference sub_blocks_ref(*tuple_store_,
-                                    indices_,
-                                    indices_consistent_);
-
-  std::unique_ptr<ValueAccessor> accessor(
-      tuple_store_->createValueAccessor(filter));
-
-  attribute_id attr_id = 0;
-
-  // First, put GROUP BY keys into 'temp_result'.
-  if (reuse_group_by_vectors->empty()) {
-    // Compute GROUP BY values from group_by Scalars, and store them in
-    // reuse_group_by_vectors for reuse by other aggregates on this same
-    // block.
-    reuse_group_by_vectors->reserve(group_by.size());
-    for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
-      reuse_group_by_vectors->emplace_back(
-          group_by_element->getAllValues(accessor.get(), &sub_blocks_ref));
-      temp_result->addColumn(reuse_group_by_vectors->back().get(), false);
-      key_ids->push_back(attr_id++);
-    }
-  } else {
-    // Reuse precomputed GROUP BY values from reuse_group_by_vectors.
-    DCHECK_EQ(group_by.size(), reuse_group_by_vectors->size())
-        << "Wrong number of reuse_group_by_vectors";
-    for (const std::unique_ptr<ColumnVector> &reuse_cv : *reuse_group_by_vectors) {
-      temp_result->addColumn(reuse_cv.get(), false);
-      key_ids->push_back(attr_id++);
-    }
-  }
-
-  // Compute argument vectors and add them to 'temp_result'.
-  for (const std::vector<std::unique_ptr<const Scalar>> &argument : arguments) {
-    for (const std::unique_ptr<const Scalar> &args : argument) {
-      temp_result->addColumn(args->getAllValues(accessor.get(), &sub_blocks_ref));
-      argument_ids->push_back(attr_id++);
-    }
-    if (argument.empty()) {
-      argument_ids->push_back(kInvalidAttributeID);
-    }
-  }
-}
-
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/storage/StorageBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp
index 16ea50f..d09ed3c 100644
--- a/storage/StorageBlock.hpp
+++ b/storage/StorageBlock.hpp
@@ -27,7 +27,6 @@
 
 #include "catalog/CatalogTypedefs.hpp"
 #include "storage/CountedReference.hpp"
-#include "storage/HashTableBase.hpp"
 #include "storage/IndexSubBlock.hpp"
 #include "storage/StorageBlockBase.hpp"
 #include "storage/StorageBlockInfo.hpp"
@@ -39,8 +38,6 @@
 
 namespace quickstep {
 
-class AggregationHandle;
-class AggregationState;
 class CatalogRelationSchema;
 class ColumnVector;
 class ColumnVectorsValueAccessor;
@@ -431,156 +428,6 @@ class StorageBlock : public StorageBlockBase {
                     InsertDestinationInterface *destination) const;
 
   /**
-   * @brief Perform non GROUP BY aggregation on the tuples in the this storage
-   *        block, returning the aggregated result (for this block) in an
-   *        AggregationState.
-   *
-   * @param handle Aggregation handle that will be used to compute aggregate.
-   * @param arguments The arguments of the aggregate function as expressions.
-   * @param arguments_as_attributes If non-NULL, indicates a valid attribute_id
-   *        for each of the elements in arguments, and is used to elide a copy.
-   *        Has no effect if NULL, or if VECTOR_COPY_ELISION_LEVEL is NONE.
-   * @param filter If non-NULL, then only tuple IDs which are set in the
-   *        filter will be checked (all others will be assumed to be false).
-   *
-   * @return Aggregated state for this block in the form of an
-   *         AggregationState. AggregationHandle::mergeStates() can be called
-   *         to merge with states from other blocks, and
-   *         AggregationHandle::finalize() can be used to generate a final
-   *         result.
-   **/
-  AggregationState* aggregate(
-      const AggregationHandle &handle,
-      const std::vector<std::unique_ptr<const Scalar>> &arguments,
-      const std::vector<attribute_id> *arguments_as_attributes,
-      const TupleIdSequence *filter) const;
-
-  /**
-   * @brief Perform GROUP BY aggregation on the tuples in the this storage
-   *        block.
-   *
-   * @param arguments The arguments to the aggregation function as Scalars.
-   * @param group_by The list of GROUP BY attributes/expressions. The tuples in
-   *        this storage block are grouped by these attributes before
-   *        aggregation.
-   * @param filter If non-NULL, then only tuple IDs which are set in the
-   *        filter will be checked (all others will be assumed to be false).
-   * @param hash_table Hash table to store aggregation state mapped based on
-   *        GROUP BY value list (defined by \c group_by).
-   * @param reuse_group_by_vectors This parameter is used to store and reuse
-   *        GROUP BY attribute vectors pre-computed in an earlier invocation of
-   *        aggregateGroupBy(). \c reuse_group_by_vectors is never \c nullptr
-   *        for ease of use. Current invocation of aggregateGroupBy() will reuse
-   *        ColumnVectors if non-empty, otherwise computes ColumnVectors based
-   *        on \c group_by and stores them in \c reuse_group_by_vectors.
-   *
-   * For sample usage of aggregateGroupBy, see this relevant pseudo-C++ code:
-   * \code
-   * std::vector<std::unique_ptr<ColumnVector>> group_by_vectors;
-   * for each aggregate {
-   *   block.aggregateGroupBy(..., &group_by_vectors);
-   * }
-   * \endcode
-   **/
-  /*
-   * TODO(shoban): Currently, we use ColumnVectorsValueAccessor to compute
-   * temporary result for Scalars of aggregation attributes and GROUP BY
-   * attributes.  We will have to support specifying aggregation and GROUP BY
-   * attributes as std::vector<attribute_id> (like in selectSimple()) for fast
-   * path when there are no expressions specified in the query.
-   */
-  void aggregateGroupBy(
-      const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
-      const std::vector<std::unique_ptr<const Scalar>> &group_by,
-      const TupleIdSequence *filter,
-      AggregationStateHashTableBase *hash_table,
-      std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const;
-
-
-  /**
-   * @brief Perform the GROUP BY aggregation for the case when aggregation is
-   *        partitioned.
-   *
-   * TODO(harshad) - Refactor this class to use only one function
-   *       aggregateGroupBy.
-   * @note The difference between this method and the aggregateGroupBy method
-   *       is that in this method, the tuples are routed to different HashTables
-   *       based on the partition to which they belong to. The partition is
-   *       determined by the GROUP BY attributes. Right now hash based
-   *       partitioning is performed.
-   *
-   * @note This function only creates the ColumnVectorsValueAccessor needed for
-   *       the insertion in the hash table. The actual insertion in respective
-   *       hash tables should be handled by the caller. See
-   *       AggregationOperationState::aggregateHashTable() for one such
-   *       implementation.
-   *
-   * @param arguments The arguments to the aggregation function as Scalars.
-   * @param group_by The list of GROUP BY attributes/expressions. The tuples in
-   *        this storage block are grouped by these attributes before
-   *        aggregation.
-   * @param filter If non-NULL, then only tuple IDs which are set in the
-   *        filter will be checked (all others will be assumed to be false).
-   * @param num_partitions The number of partitions used for the aggregation.
-   * @param temp_result The ColumnVectorsValueAccessor used for collecting
-   *        the attribute values from this StorageBlock.
-   * @param arguments_ids The attribute IDs used for the aggregation, which
-   *        come from the arguments vector. If arguments is empty, this vector
-   *        is filled with invalid attribute IDs.
-   * @param key_ids The attribute IDs of the group by attributes.
-   * @param reuse_group_by_vectors This parameter is used to store and reuse
-   *        GROUP BY attribute vectors pre-computed in an earlier invocation of
-   *        aggregateGroupBy(). \c reuse_group_by_vectors is never \c nullptr
-   *        for ease of use. Current invocation of aggregateGroupBy() will reuse
-   *        ColumnVectors if non-empty, otherwise computes ColumnVectors based
-   *        on \c group_by and stores them in \c reuse_group_by_vectors.
-   **/
-  void aggregateGroupByPartitioned(
-      const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
-      const std::vector<std::unique_ptr<const Scalar>> &group_by,
-      const TupleIdSequence *filter,
-      const std::size_t num_partitions,
-      ColumnVectorsValueAccessor *temp_result,
-      std::vector<attribute_id> *argument_ids,
-      std::vector<attribute_id> *key_ids,
-      std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const;
-
-  /**
-   * @brief Inserts the GROUP BY expressions and aggregation arguments together
-   *        as keys into the distinctify hash table.
-   *
-   * This is the first step for DISTINCT aggregation. It populates the distinctify
-   * hash table so that arguments are distinctified within each GROUP BY group.
-   * Later, a second-round aggregation on the distinctify hash table will be
-   * performed to actually compute the aggregated result for each GROUP BY group.
-   *
-   * @param handle Aggregation handle to compute aggregates with.
-   * @param arguments The arguments to the aggregation function as Scalars.
-   * @param arguments_as_attributes If non-NULL, indicates a valid attribute_id
-   *        for each of the elements in arguments, and is used to elide a copy.
-   *        Has no effect if NULL, or if VECTOR_COPY_ELISION_LEVEL is NONE.
-   * @param group_by The list of GROUP BY attributes/expressions.
-   * @param filter If non-NULL, then only tuple IDs which are set in the
-   *        filter will be checked (all others will be assumed to be false).
-   * @param distinctify_hash_table Hash table to store the arguments and GROUP
-   *        BY expressions together as hash table key and a bool constant \c true
-   *        as hash table value. (So the hash table actually serves as a hash set.)
-   * @param reuse_group_by_vectors This parameter is used to store and reuse
-   *        GROUP BY attribute vectors pre-computed in an earlier invocation of
-   *        aggregateGroupBy(). \c reuse_group_by_vectors is never \c nullptr
-   *        for ease of use. Current invocation of aggregateGroupBy() will reuse
-   *        ColumnVectors if non-empty, otherwise computes ColumnVectors based
-   *        on \c group_by and stores them in \c reuse_group_by_vectors.
-   */
-  void aggregateDistinct(const AggregationHandle &handle,
-                         const std::vector<std::unique_ptr<const Scalar>> &arguments,
-                         const std::vector<attribute_id> *arguments_as_attributes,
-                         const std::vector<std::unique_ptr<const Scalar>> &group_by,
-                         const TupleIdSequence *filter,
-                         AggregationStateHashTableBase *distinctify_hash_table,
-                         std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const;
-
-  /**
    * @brief Perform an UPDATE query over the tuples in this StorageBlock.
    * @warning In some edge cases, calling this method may cause IndexSubBlocks
    *          in this block to become inconsistent (the TupleStorageSubBlock
@@ -702,18 +549,6 @@ class StorageBlock : public StorageBlockBase {
       const tuple_id tuple,
       const std::unordered_map<attribute_id, std::unique_ptr<const Scalar>> &assignments) const;
 
-  AggregationState* aggregateHelperColumnVector(
-      const AggregationHandle &handle,
-      const std::vector<std::unique_ptr<const Scalar>> &arguments,
-      const TupleIdSequence *matches) const;
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  AggregationState* aggregateHelperValueAccessor(
-      const AggregationHandle &handle,
-      const std::vector<attribute_id> &argument_ids,
-      const TupleIdSequence *matches) const;
-#endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-
   // Sort the tuples in storage block based on `sort_attribute'. If
   // `use_input_sequence' is set, we assume a pre-existing order of tuple-id
   // sequence specified by `sorted_sequence' and use stable sort to maintain

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/utility/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/CMakeLists.txt b/utility/CMakeLists.txt
index aeff388..3f31f09 100644
--- a/utility/CMakeLists.txt
+++ b/utility/CMakeLists.txt
@@ -172,6 +172,7 @@ add_library(quickstep_utility_CalculateInstalledMemory CalculateInstalledMemory.
 add_library(quickstep_utility_Cast ../empty_src.cpp Cast.hpp)
 add_library(quickstep_utility_CheckSnprintf ../empty_src.cpp CheckSnprintf.hpp)
 add_library(quickstep_utility_CompositeHash ../empty_src.cpp CompositeHash.hpp)
+add_library(quickstep_utility_ConcurrentBitVector ../empty_src.cpp ConcurrentBitVector.hpp)
 add_library(quickstep_utility_DAG ../empty_src.cpp DAG.hpp)
 add_library(quickstep_utility_DisjointTreeForest ../empty_src.cpp DisjointTreeForest.hpp)
 add_library(quickstep_utility_EqualsAnyConstant ../empty_src.cpp EqualsAnyConstant.hpp)
@@ -238,6 +239,9 @@ target_link_libraries(quickstep_utility_CompositeHash
                       quickstep_types_TypedValue
                       quickstep_utility_HashPair
                       glog)
+target_link_libraries(quickstep_utility_ConcurrentBitVector
+                      quickstep_utility_BitManipulation
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_utility_DAG
                       glog
                       quickstep_utility_Macros)
@@ -338,6 +342,7 @@ target_link_libraries(quickstep_utility
                       quickstep_utility_Cast
                       quickstep_utility_CheckSnprintf
                       quickstep_utility_CompositeHash
+                      quickstep_utility_ConcurrentBitVector
                       quickstep_utility_DAG
                       quickstep_utility_DisjointTreeForest
                       quickstep_utility_EqualsAnyConstant

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/utility/ConcurrentBitVector.hpp
----------------------------------------------------------------------
diff --git a/utility/ConcurrentBitVector.hpp b/utility/ConcurrentBitVector.hpp
new file mode 100644
index 0000000..e36f03c
--- /dev/null
+++ b/utility/ConcurrentBitVector.hpp
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_CONCURRENT_BIT_VECTOR_HPP_
+#define QUICKSTEP_UTILITY_CONCURRENT_BIT_VECTOR_HPP_
+
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
+#include <limits>
+
+#include "utility/BitManipulation.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/** \addtogroup Utility
+ *  @{
+ */
+
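+/**
+ * @brief A bit vector that supports concurrent reads and writes, backed by an
+ *        array of std::atomic<std::size_t> words accessed with relaxed memory
+ *        ordering. Within each word, bit offset 0 maps to the most significant
+ *        bit of that word.
+ */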
+class ConcurrentBitVector {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param memory_location The location of memory to use for the ConcurrentBitVector.
+   * @param num_bits The length of the ConcurrentBitVector in bits.
+   * @param initialize If true, initialize all the bytes of the memory to 0.
+   */
+  ConcurrentBitVector(void *memory_location,
+                      const std::size_t num_bits,
+                      const bool initialize)
+      : owned_(false),
+        num_bits_(num_bits),
+        data_array_(static_cast<DataType *>(memory_location)),
+        data_array_size_((num_bits >> kHigherOrderShift) + (num_bits & kLowerOrderMask ? 1 : 0)) {
+    DCHECK_GT(num_bits, 0);
+    DCHECK(data_array_ != nullptr);
+
+    if (initialize) {
+      clear();
+    }
+  }
+
+  explicit ConcurrentBitVector(const std::size_t num_bits)
+      : owned_(true),
+        num_bits_(num_bits),
+        data_array_(static_cast<DataType *>(std::malloc(BytesNeeded(num_bits)))),
+        data_array_size_((num_bits >> kHigherOrderShift) + (num_bits & kLowerOrderMask ? 1 : 0)) {
+    DCHECK_GT(num_bits, 0);
+    clear();
+  }
+
+  ~ConcurrentBitVector() {
+    if (owned_ && (num_bits_ != 0)) {
+      std::free(data_array_);
+    }
+  }
+
+  inline static std::size_t BytesNeeded(const std::size_t num_bits) {
+    if (num_bits & kLowerOrderMask) {
+      return ((num_bits >> kHigherOrderShift) + 1) * kDataSize;
+    } else {
+      return (num_bits >> kHigherOrderShift) * kDataSize;
+    }
+  }
+
+  inline std::size_t size() const {
+    return num_bits_;
+  }
+
+  inline void clear() {
+    std::memset(data_array_, 0, BytesNeeded(num_bits_));
+  }
+
+  inline bool getBit(const std::size_t bit_num) const {
+    const std::size_t data_value =
+        data_array_[bit_num >> kHigherOrderShift].load(std::memory_order_relaxed);
+    return (data_value << (bit_num & kLowerOrderMask)) & kTopBit;
+  }
+
+  inline void setBit(const std::size_t bit_num) const {
+    data_array_[bit_num >> kHigherOrderShift].fetch_or(
+        kTopBit >> (bit_num & kLowerOrderMask), std::memory_order_relaxed);
+  }
+
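+  /**
+   * @brief Find the position of the first set bit at or after \p position,
+   *        returning size() if no bit at or after \p position is set.
+   */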
+  inline std::size_t firstOne(std::size_t position = 0) const {
+    DCHECK_LT(position, num_bits_);
+
+    const std::size_t position_index = position >> kHigherOrderShift;
+    const std::size_t data_value =
+        data_array_[position_index].load(std::memory_order_relaxed)
+            & (std::numeric_limits<std::size_t>::max() >> (position & kLowerOrderMask));
+    if (data_value) {
+      return (position & ~kLowerOrderMask) | leading_zero_count<std::size_t>(data_value);
+    }
+
+    for (std::size_t array_idx = position_index + 1;
+         array_idx < data_array_size_;
+         ++array_idx) {
+      const std::size_t data_value =
+          data_array_[array_idx].load(std::memory_order_relaxed);
+      if (data_value) {
+        return (array_idx << kHigherOrderShift) | leading_zero_count<std::size_t>(data_value);
+      }
+    }
+
+    return num_bits_;
+  }
+
+  inline std::size_t nextOne(const std::size_t position) const {
+    const std::size_t search_pos = position + 1;
+    return search_pos >= num_bits_ ? num_bits_ : firstOne(search_pos);
+  }
+
+  inline std::size_t onesCount() const {
+    std::size_t count = 0;
+    for (std::size_t array_idx = 0;
+         array_idx < data_array_size_;
+         ++array_idx) {
+      count += population_count<std::size_t>(
+          data_array_[array_idx].load(std::memory_order_relaxed));
+    }
+    return count;
+  }
+
+  inline std::size_t onesCount(const std::size_t start_position,
+                               const std::size_t end_position) const {
+    DCHECK_LE(start_position, end_position);
+    DCHECK_LT(start_position, num_bits_);
+    DCHECK_LE(end_position, num_bits_);
+
+    const std::size_t start_index = start_position >> kHigherOrderShift;
+    const std::size_t end_index = end_position >> kHigherOrderShift;
+    if (start_index == end_index) {
+      const std::size_t data_value =
+          data_array_[start_index].load(std::memory_order_relaxed)
+              & (std::numeric_limits<std::size_t>::max() >> (start_position & kLowerOrderMask))
+              &  ~(std::numeric_limits<std::size_t>::max() >> (end_position & kLowerOrderMask));
+      return population_count<std::size_t>(data_value);
+    } else {
+      const std::size_t first_data =
+          data_array_[start_index].load(std::memory_order_relaxed)
+              & (std::numeric_limits<std::size_t>::max() >> (start_position & kLowerOrderMask));
+      std::size_t count = population_count<std::size_t>(first_data);
+
+      for (std::size_t array_idx = start_index + 1;
+           array_idx < end_index;
+           ++array_idx) {
+        count += population_count<std::size_t>(
+            data_array_[array_idx].load(std::memory_order_relaxed));
+      }
+
+      const std::size_t last_offset = end_position & kLowerOrderMask;
+      if (last_offset != 0) {
+        const std::size_t last_data =
+            data_array_[end_index].load(std::memory_order_relaxed)
+                &  ~(std::numeric_limits<std::size_t>::max() >> last_offset);
+        count += population_count<std::size_t>(last_data);
+      }
+
+      return count;
+    }
+  }
+
+ private:
+  typedef std::atomic<std::size_t> DataType;
+  static constexpr std::size_t kDataSize = sizeof(DataType);
+
+  // This works as long as the bit-width of size_t is a power of 2:
+  static constexpr std::size_t kLowerOrderMask = (sizeof(std::size_t) << 3) - 1;
+  // This works for 32-bit or 64-bit size_t:
+  static constexpr std::size_t kHigherOrderShift = sizeof(std::size_t) == 4 ? 5 : 6;
+
+  static constexpr std::size_t kOne = static_cast<std::size_t>(1);
+  static constexpr std::size_t kTopBit = kOne << kLowerOrderMask;
+
+  const bool owned_;
+  const std::size_t num_bits_;
+  DataType *data_array_;
+  const std::size_t data_array_size_;
+
+  DISALLOW_COPY_AND_ASSIGN(ConcurrentBitVector);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_CONCURRENT_BIT_VECTOR_HPP_
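
For reference, a minimal sketch of how the new ConcurrentBitVector might be
exercised (the MarkRange helper, the thread split, and the bit counts below are
illustrative and not part of this commit):

#include <cstddef>
#include <thread>

#include "utility/ConcurrentBitVector.hpp"

// Each thread marks a disjoint range of bits; setBit() is a relaxed atomic
// fetch_or, so concurrent writers never lose updates.
void MarkRange(quickstep::ConcurrentBitVector *bits,
               const std::size_t begin,
               const std::size_t end) {
  for (std::size_t i = begin; i < end; ++i) {
    bits->setBit(i);
  }
}

int main() {
  constexpr std::size_t kNumBits = 1000;
  quickstep::ConcurrentBitVector bits(kNumBits);  // owned and zero-initialized

  std::thread t1(MarkRange, &bits, std::size_t(0), std::size_t(250));
  std::thread t2(MarkRange, &bits, std::size_t(500), std::size_t(750));
  t1.join();
  t2.join();

  // Internally, bit i lives in word (i >> kHigherOrderShift) at offset
  // (i & kLowerOrderMask), counted from the most significant bit; e.g. with a
  // 64-bit size_t, bit 200 is offset 8 of word 3.
  const std::size_t total_ones = bits.onesCount();   // 500
  const std::size_t next_set = bits.firstOne(250);   // 500: first set bit at or after 250
  return (total_ones == 500 && next_set == 500) ? 0 : 1;
}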