You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/01/31 23:38:12 UTC

[6/8] incubator-quickstep git commit: Initial commit.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 591e3a1..44803fc 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -33,7 +33,9 @@
 #include "storage/HashTableBase.hpp"
 #include "storage/HashTablePool.hpp"
 #include "storage/PartitionedHashTablePool.hpp"
+#include "storage/StorageBlock.hpp"
 #include "storage/StorageBlockInfo.hpp"
+#include "utility/ConcurrentBitVector.hpp"
 #include "utility/Macros.hpp"
 
 #include "gflags/gflags.h"
@@ -43,9 +45,11 @@ namespace quickstep {
 class AggregateFunction;
 class CatalogDatabaseLite;
 class CatalogRelationSchema;
+class ColumnVectorsValueAccessor;
 class InsertDestination;
 class LIPFilterAdaptiveProber;
 class StorageManager;
+class TupleIdSequence;
 
 DECLARE_int32(num_aggregation_partitions);
 DECLARE_int32(partition_aggregation_num_groups_threshold);
@@ -166,127 +170,99 @@ class AggregationOperationState {
    *        the block.
    **/
   void aggregateBlock(const block_id input_block,
-                      LIPFilterAdaptiveProber *lip_filter_adaptive_prober);
+                      LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr);
 
   /**
    * @brief Generate the final results for the aggregates managed by this
    *        AggregationOperationState and write them out to StorageBlock(s).
    *
+   * @param partition_id The partition id of this finalize operation.
    * @param output_destination An InsertDestination where the finalized output
    *        tuple(s) from this aggregate are to be written.
    **/
-  void finalizeAggregate(InsertDestination *output_destination);
-
-  /**
-   * @brief Destroy the payloads in the aggregation hash tables.
-   **/
-  void destroyAggregationHashTablePayload();
-
-  /**
-   * @brief Generate the final results for the aggregates managed by this
-   *        AggregationOperationState and write them out to StorageBlock(s).
-   *        In this implementation, each thread picks a hash table belonging to
-   *        a partition and writes its values to StorageBlock(s). There is no
-   *        need to merge multiple hash tables in one, because there is no
-   *        overlap in the keys across two hash tables.
-   *
-   * @param partition_id The ID of the partition for which finalize is being
-   *        performed.
-   * @param output_destination An InsertDestination where the finalized output
-   *        tuple(s) from this aggregate are to be written.
-   **/
-  void finalizeAggregatePartitioned(
-      const std::size_t partition_id, InsertDestination *output_destination);
-
-  static void mergeGroupByHashTables(AggregationStateHashTableBase *src,
-                                     AggregationStateHashTableBase *dst);
-
-  bool isAggregatePartitioned() const {
-    return is_aggregate_partitioned_;
-  }
+  void finalizeAggregate(const std::size_t partition_id,
+                         InsertDestination *output_destination);
 
   /**
    * @brief Get the number of partitions to be used for the aggregation.
    *        For non-partitioned aggregations, we return 1.
    **/
-  std::size_t getNumPartitions() const {
-    return is_aggregate_partitioned_
-               ? partitioned_group_by_hashtable_pool_->getNumPartitions()
-               : 1;
-  }
+  std::size_t getNumPartitions() const;
 
-  int dflag;
+  std::size_t getNumInitializationPartitions() const;
+
+  void initializeState(const std::size_t partition_id);
 
  private:
-  // Merge locally (per storage block) aggregated states with global aggregation
-  // states.
-  void mergeSingleState(
-      const std::vector<std::unique_ptr<AggregationState>> &local_state);
+  bool checkAggregatePartitioned(
+      const std::size_t estimated_num_groups,
+      const std::vector<bool> &is_distinct,
+      const std::vector<std::unique_ptr<const Scalar>> &group_by,
+      const std::vector<const AggregateFunction *> &aggregate_functions) const;
 
   // Aggregate on input block.
   void aggregateBlockSingleState(const block_id input_block);
   void aggregateBlockHashTable(const block_id input_block,
                                LIPFilterAdaptiveProber *lip_filter_adaptive_prober);
 
-  void finalizeSingleState(InsertDestination *output_destination);
-  void finalizeHashTable(InsertDestination *output_destination);
+  // Merge locally (per storage block) aggregated states with global aggregation
+  // states.
+  void mergeSingleState(
+      const std::vector<std::unique_ptr<AggregationState>> &local_state);
+  void mergeGroupByHashTables(AggregationStateHashTableBase *src,
+                              AggregationStateHashTableBase *dst) const;
 
-  bool checkAggregatePartitioned(
-      const std::size_t estimated_num_groups,
-      const std::vector<bool> &is_distinct,
-      const std::vector<std::unique_ptr<const Scalar>> &group_by,
-      const std::vector<const AggregateFunction *> &aggregate_functions) const {
-    // If there's no aggregation, return false.
-    if (aggregate_functions.empty()) {
-      return false;
-    }
-    // Check if there's a distinct operation involved in any aggregate, if so
-    // the aggregate can't be partitioned.
-    for (auto distinct : is_distinct) {
-      if (distinct) {
-        return false;
-      }
-    }
-    // There's no distinct aggregation involved, Check if there's at least one
-    // GROUP BY operation.
-    if (group_by.empty()) {
-      return false;
-    }
-    // There are GROUP BYs without DISTINCT. Check if the estimated number of
-    // groups is large enough to warrant a partitioned aggregation.
-    return estimated_num_groups >
-           static_cast<std::size_t>(
-               FLAGS_partition_aggregation_num_groups_threshold);
-  }
+  // Finalize the aggregation results into output_destination.
+  void finalizeSingleState(InsertDestination *output_destination);
+  void finalizeHashTable(const std::size_t partition_id,
+                         InsertDestination *output_destination);
+
+  // Specialized implementations for aggregateBlockHashTable.
+  void aggregateBlockHashTableImplCollisionFree(ValueAccessor *accessor,
+                                                ColumnVectorsValueAccessor *aux_accessor);
+  void aggregateBlockHashTableImplPartitioned(ValueAccessor *accessor,
+                                              ColumnVectorsValueAccessor *aux_accessor);
+  void aggregateBlockHashTableImplThreadPrivate(ValueAccessor *accessor,
+                                                ColumnVectorsValueAccessor *aux_accessor);
+
+  // Specialized implementations for finalizeHashTable.
+  void finalizeHashTableImplCollisionFree(const std::size_t partition_id,
+                                          InsertDestination *output_destination);
+  void finalizeHashTableImplPartitioned(const std::size_t partition_id,
+                                        InsertDestination *output_destination);
+  void finalizeHashTableImplThreadPrivate(InsertDestination *output_destination);
 
   // Common state for all aggregates in this operation: the input relation, the
   // filter predicate (if any), and the list of GROUP BY expressions (if any).
   const CatalogRelationSchema &input_relation_;
 
+  // Whether the aggregation is collision free or not.
+  bool is_aggregate_collision_free_;
+
   // Whether the aggregation is partitioned or not.
-  const bool is_aggregate_partitioned_;
+  bool is_aggregate_partitioned_;
 
   std::unique_ptr<const Predicate> predicate_;
-  std::vector<std::unique_ptr<const Scalar>> group_by_list_;
 
   // Each individual aggregate in this operation has an AggregationHandle and
-  // some number of Scalar arguments.
-  std::vector<AggregationHandle *> handles_;
-  std::vector<std::vector<std::unique_ptr<const Scalar>>> arguments_;
+  // zero (indicated by -1) or one argument.
+  std::vector<std::unique_ptr<AggregationHandle>> handles_;
 
   // For each aggregate, whether DISTINCT should be applied to the aggregate's
   // arguments.
   std::vector<bool> is_distinct_;
 
-  // Hash table for obtaining distinct (i.e. unique) arguments.
-  std::vector<std::unique_ptr<AggregationStateHashTableBase>>
-      distinctify_hashtables_;
+  // Non-trivial group-by/argument expressions that need to be evaluated.
+  std::vector<std::unique_ptr<const Scalar>> non_trivial_expressions_;
 
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  // If all an aggregate's argument expressions are simply attributes in
-  // 'input_relation_', then this caches the attribute IDs of those arguments.
-  std::vector<std::vector<attribute_id>> arguments_as_attributes_;
-#endif
+  std::vector<attribute_id> group_by_key_ids_;
+  std::vector<std::vector<attribute_id>> argument_ids_;
+
+  std::vector<const Type *> group_by_types_;
+
+  // Hash table for obtaining distinct (i.e. unique) arguments.
+//  std::vector<std::unique_ptr<AggregationStateHashTableBase>>
+//      distinctify_hashtables_;
 
   // Per-aggregate global states for aggregation without GROUP BY.
   std::vector<std::unique_ptr<AggregationState>> single_states_;
@@ -303,6 +279,8 @@ class AggregationOperationState {
 
   std::unique_ptr<PartitionedHashTablePool> partitioned_group_by_hashtable_pool_;
 
+  std::unique_ptr<AggregationStateHashTableBase> collision_free_hashtable_;
+
   StorageManager *storage_manager_;
 
   DISALLOW_COPY_AND_ASSIGN(AggregationOperationState);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index fddea1f..c7bc28f 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -165,6 +165,9 @@ if(QUICKSTEP_HAVE_BITWEAVING)
               bitweaving/BitWeavingVIndexSubBlock.hpp)
 endif()
 # CMAKE_VALIDATE_IGNORE_END
+add_library(quickstep_storage_CollisionFreeAggregationStateHashTable
+            CollisionFreeAggregationStateHashTable.cpp
+            CollisionFreeAggregationStateHashTable.hpp)
 add_library(quickstep_storage_ColumnStoreUtil ColumnStoreUtil.cpp ColumnStoreUtil.hpp)
 add_library(quickstep_storage_CompressedBlockBuilder CompressedBlockBuilder.cpp CompressedBlockBuilder.hpp)
 add_library(quickstep_storage_CompressedColumnStoreTupleStorageSubBlock
@@ -194,9 +197,6 @@ if (ENABLE_DISTRIBUTED)
 endif()
 
 add_library(quickstep_storage_EvictionPolicy EvictionPolicy.cpp EvictionPolicy.hpp)
-add_library(quickstep_storage_FastHashTable ../empty_src.cpp FastHashTable.hpp)
-add_library(quickstep_storage_FastHashTableFactory ../empty_src.cpp FastHashTableFactory.hpp)
-add_library(quickstep_storage_FastSeparateChainingHashTable ../empty_src.cpp FastSeparateChainingHashTable.hpp)
 add_library(quickstep_storage_FileManager ../empty_src.cpp FileManager.hpp)
 if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
   add_library(quickstep_storage_FileManagerHdfs FileManagerHdfs.cpp FileManagerHdfs.hpp)
@@ -225,6 +225,9 @@ add_library(quickstep_storage_InsertDestination_proto
 add_library(quickstep_storage_LinearOpenAddressingHashTable
             ../empty_src.cpp
             LinearOpenAddressingHashTable.hpp)
+add_library(quickstep_storage_PackedPayloadAggregationStateHashTable
+            PackedPayloadAggregationStateHashTable.cpp
+            PackedPayloadAggregationStateHashTable.hpp)
 add_library(quickstep_storage_PartitionedHashTablePool ../empty_src.cpp PartitionedHashTablePool.hpp)
 add_library(quickstep_storage_PreloaderThread PreloaderThread.cpp PreloaderThread.hpp)
 add_library(quickstep_storage_SMAIndexSubBlock SMAIndexSubBlock.cpp SMAIndexSubBlock.hpp)
@@ -276,22 +279,25 @@ target_link_libraries(quickstep_storage_AggregationOperationState
                       quickstep_expressions_predicate_Predicate
                       quickstep_expressions_scalar_Scalar
                       quickstep_storage_AggregationOperationState_proto
-                      quickstep_storage_HashTable
                       quickstep_storage_HashTableBase
                       quickstep_storage_HashTableFactory
                       quickstep_storage_HashTablePool
                       quickstep_storage_InsertDestination
                       quickstep_storage_PartitionedHashTablePool
+                      quickstep_storage_PackedPayloadAggregationStateHashTable
                       quickstep_storage_StorageBlock
                       quickstep_storage_StorageBlockInfo
                       quickstep_storage_StorageManager
+                      quickstep_storage_SubBlocksReference
                       quickstep_storage_TupleIdSequence
                       quickstep_storage_ValueAccessor
+                      quickstep_storage_ValueAccessorUtil
                       quickstep_types_TypedValue
                       quickstep_types_containers_ColumnVector
                       quickstep_types_containers_ColumnVectorsValueAccessor
                       quickstep_types_containers_Tuple
                       quickstep_utility_Macros
+                      quickstep_utility_ConcurrentBitVector
                       quickstep_utility_lipfilter_LIPFilterAdaptiveProber)
 target_link_libraries(quickstep_storage_AggregationOperationState_proto
                       quickstep_expressions_Expressions_proto
@@ -429,6 +435,24 @@ if(QUICKSTEP_HAVE_BITWEAVING)
                         quickstep_utility_Macros)
 endif()
 # CMAKE_VALIDATE_IGNORE_END
+target_link_libraries(quickstep_storage_CollisionFreeAggregationStateHashTable
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_expressions_aggregation_AggregationID
+                      quickstep_storage_HashTableBase
+                      quickstep_storage_StorageBlob
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_storage_StorageConstants
+                      quickstep_storage_StorageManager
+                      quickstep_storage_ValueAccessor
+                      quickstep_storage_ValueAccessorUtil
+                      quickstep_types_Type
+                      quickstep_types_TypeID
+                      quickstep_types_TypedValue
+                      quickstep_types_containers_ColumnVector
+                      quickstep_types_containers_ColumnVectorsValueAccessor
+                      quickstep_utility_ConcurrentBitVector
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_ColumnStoreUtil
                       quickstep_catalog_CatalogAttribute
                       quickstep_catalog_CatalogRelationSchema
@@ -626,52 +650,6 @@ target_link_libraries(quickstep_storage_EvictionPolicy
                       quickstep_threading_SpinMutex
                       quickstep_threading_SpinSharedMutex
                       quickstep_utility_Macros)
-target_link_libraries(quickstep_storage_FastHashTable
-                      quickstep_catalog_CatalogTypedefs
-                      quickstep_storage_HashTableBase
-                      quickstep_storage_StorageBlob
-                      quickstep_storage_StorageBlockInfo
-                      quickstep_storage_StorageConstants
-                      quickstep_storage_StorageManager
-                      quickstep_storage_TupleReference
-                      quickstep_storage_ValueAccessor
-                      quickstep_storage_ValueAccessorUtil
-                      quickstep_threading_SpinMutex
-                      quickstep_threading_SpinSharedMutex
-                      quickstep_types_Type
-                      quickstep_types_TypedValue
-                      quickstep_utility_HashPair
-                      quickstep_utility_Macros)
-target_link_libraries(quickstep_storage_FastHashTableFactory
-                      glog
-                      quickstep_storage_FastHashTable
-                      quickstep_storage_FastSeparateChainingHashTable
-                      quickstep_storage_HashTable
-                      quickstep_storage_HashTable_proto
-                      quickstep_storage_HashTableBase
-                      quickstep_storage_HashTableFactory
-                      quickstep_storage_LinearOpenAddressingHashTable
-                      quickstep_storage_SeparateChainingHashTable
-                      quickstep_storage_SimpleScalarSeparateChainingHashTable
-                      quickstep_storage_TupleReference
-                      quickstep_types_TypeFactory
-                      quickstep_utility_Macros)
-target_link_libraries(quickstep_storage_FastSeparateChainingHashTable
-                      quickstep_expressions_aggregation_AggregationHandle
-                      quickstep_storage_FastHashTable
-                      quickstep_storage_HashTable
-                      quickstep_storage_HashTableBase
-                      quickstep_storage_HashTableKeyManager
-                      quickstep_storage_StorageBlob
-                      quickstep_storage_StorageBlockInfo
-                      quickstep_storage_StorageConstants
-                      quickstep_storage_StorageManager
-                      quickstep_threading_SpinSharedMutex
-                      quickstep_types_Type
-                      quickstep_types_TypedValue
-                      quickstep_utility_Alignment
-                      quickstep_utility_Macros
-                      quickstep_utility_PrimeNumber)
 target_link_libraries(quickstep_storage_FileManager
                       quickstep_storage_StorageBlockInfo
                       quickstep_utility_Macros
@@ -734,10 +712,12 @@ target_link_libraries(quickstep_storage_HashTable_proto
                       ${PROTOBUF_LIBRARY})
 target_link_libraries(quickstep_storage_HashTableFactory
                       glog
+                      quickstep_storage_CollisionFreeAggregationStateHashTable
                       quickstep_storage_HashTable
                       quickstep_storage_HashTable_proto
                       quickstep_storage_HashTableBase
                       quickstep_storage_LinearOpenAddressingHashTable
+                      quickstep_storage_PackedPayloadAggregationStateHashTable
                       quickstep_storage_SeparateChainingHashTable
                       quickstep_storage_SimpleScalarSeparateChainingHashTable
                       quickstep_storage_TupleReference
@@ -757,9 +737,8 @@ target_link_libraries(quickstep_storage_HashTableKeyManager
 target_link_libraries(quickstep_storage_HashTablePool
                       glog
                       quickstep_expressions_aggregation_AggregationHandle
-                      quickstep_storage_FastHashTable
-                      quickstep_storage_FastHashTableFactory
                       quickstep_storage_HashTableBase
+                      quickstep_storage_HashTableFactory
                       quickstep_threading_SpinMutex
                       quickstep_utility_Macros
                       quickstep_utility_StringUtil)
@@ -817,12 +796,32 @@ target_link_libraries(quickstep_storage_LinearOpenAddressingHashTable
                       quickstep_utility_Alignment
                       quickstep_utility_Macros
                       quickstep_utility_PrimeNumber)
+target_link_libraries(quickstep_storage_PackedPayloadAggregationStateHashTable
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_storage_HashTableBase
+                      quickstep_storage_HashTableKeyManager
+                      quickstep_storage_StorageBlob
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_storage_StorageConstants
+                      quickstep_storage_StorageManager
+                      quickstep_storage_TupleReference
+                      quickstep_storage_ValueAccessor
+                      quickstep_storage_ValueAccessorUtil
+                      quickstep_threading_SpinMutex
+                      quickstep_threading_SpinSharedMutex
+                      quickstep_types_Type
+                      quickstep_types_TypedValue
+                      quickstep_types_containers_ColumnVectorsValueAccessor
+                      quickstep_utility_Alignment
+                      quickstep_utility_HashPair
+                      quickstep_utility_Macros
+                      quickstep_utility_PrimeNumber)
 target_link_libraries(quickstep_storage_PartitionedHashTablePool
                       glog
                       quickstep_expressions_aggregation_AggregationHandle
-                      quickstep_storage_FastHashTable
-                      quickstep_storage_FastHashTableFactory
                       quickstep_storage_HashTableBase
+                      quickstep_storage_HashTableFactory
                       quickstep_utility_Macros
                       quickstep_utility_StringUtil)
 target_link_libraries(quickstep_storage_PreloaderThread
@@ -933,7 +932,6 @@ target_link_libraries(quickstep_storage_StorageBlock
                       glog
                       quickstep_catalog_CatalogRelationSchema
                       quickstep_catalog_CatalogTypedefs
-                      quickstep_expressions_aggregation_AggregationHandle
                       quickstep_expressions_predicate_Predicate
                       quickstep_expressions_scalar_Scalar
                       quickstep_storage_BasicColumnStoreTupleStorageSubBlock
@@ -942,7 +940,6 @@ target_link_libraries(quickstep_storage_StorageBlock
                       quickstep_storage_CompressedColumnStoreTupleStorageSubBlock
                       quickstep_storage_CompressedPackedRowStoreTupleStorageSubBlock
                       quickstep_storage_CountedReference
-                      quickstep_storage_HashTableBase
                       quickstep_storage_IndexSubBlock
                       quickstep_storage_InsertDestinationInterface
                       quickstep_storage_SMAIndexSubBlock
@@ -1111,6 +1108,7 @@ target_link_libraries(quickstep_storage
                       quickstep_storage_BasicColumnStoreValueAccessor
                       quickstep_storage_BloomFilterIndexSubBlock
                       quickstep_storage_CSBTreeIndexSubBlock
+                      quickstep_storage_CollisionFreeAggregationStateHashTable
                       quickstep_storage_ColumnStoreUtil
                       quickstep_storage_CompressedBlockBuilder
                       quickstep_storage_CompressedColumnStoreTupleStorageSubBlock
@@ -1123,9 +1121,6 @@ target_link_libraries(quickstep_storage
                       quickstep_storage_EvictionPolicy
                       quickstep_storage_FileManager
                       quickstep_storage_FileManagerLocal
-                      quickstep_storage_FastHashTable
-                      quickstep_storage_FastHashTableFactory
-                      quickstep_storage_FastSeparateChainingHashTable
                       quickstep_storage_HashTable
                       quickstep_storage_HashTable_proto
                       quickstep_storage_HashTableBase
@@ -1139,6 +1134,7 @@ target_link_libraries(quickstep_storage
                       quickstep_storage_InsertDestination_proto
                       quickstep_storage_LinearOpenAddressingHashTable
                       quickstep_storage_PartitionedHashTablePool
+                      quickstep_storage_PackedPayloadAggregationStateHashTable
                       quickstep_storage_PreloaderThread
                       quickstep_storage_SMAIndexSubBlock
                       quickstep_storage_SeparateChainingHashTable

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/storage/CollisionFreeAggregationStateHashTable.cpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeAggregationStateHashTable.cpp b/storage/CollisionFreeAggregationStateHashTable.cpp
new file mode 100644
index 0000000..15d4dfe
--- /dev/null
+++ b/storage/CollisionFreeAggregationStateHashTable.cpp
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "storage/CollisionFreeAggregationStateHashTable.hpp"
+
+#include <algorithm>
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <cstdlib>
+#include <map>
+#include <memory>
+#include <vector>
+
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+
+namespace quickstep {
+
+CollisionFreeAggregationStateHashTable::CollisionFreeAggregationStateHashTable(
+    const std::vector<const Type *> &key_types,
+    const std::size_t num_entries,
+    const std::vector<AggregationHandle *> &handles,
+    StorageManager *storage_manager)
+    : key_type_(key_types.front()),
+      num_entries_(num_entries),
+      num_handles_(handles.size()),
+      handles_(handles),
+      num_finalize_partitions_(std::min((num_entries_ >> 12u) + 1u, 80uL)),
+      storage_manager_(storage_manager) {
+  CHECK_EQ(1u, key_types.size());
+  DCHECK_GT(num_entries, 0u);
+
+  std::map<std::string, std::size_t> memory_offsets;
+  std::size_t required_memory = 0;
+
+  memory_offsets.emplace("existence_map", required_memory);
+  required_memory +=
+      CacheLineAlignedBytes(ConcurrentBitVector::BytesNeeded(num_entries));
+
+  for (std::size_t i = 0; i < num_handles_; ++i) {
+    const AggregationHandle *handle = handles_[i];
+    const std::vector<const Type *> argument_types = handle->getArgumentTypes();
+
+    std::size_t state_size = 0;
+    switch (handle->getAggregationID()) {
+      case AggregationID::kCount: {
+        state_size = sizeof(std::atomic<std::size_t>);
+        break;
+      }
+      case AggregationID::kSum: {
+        CHECK_EQ(1u, argument_types.size());
+        switch (argument_types.front()->getTypeID()) {
+          case TypeID::kInt:  // Fall through
+          case TypeID::kLong:
+            state_size = sizeof(std::atomic<std::int64_t>);
+            break;
+          case TypeID::kFloat:  // Fall through
+          case TypeID::kDouble:
+            state_size = sizeof(std::atomic<double>);
+            break;
+          default:
+            LOG(FATAL) << "Not implemented";
+        }
+        break;
+      }
+      default:
+        LOG(FATAL) << "Not implemented";
+    }
+
+    memory_offsets.emplace(std::string("state") + std::to_string(i),
+                           required_memory);
+    required_memory += CacheLineAlignedBytes(state_size * num_entries);
+  }
+
+  const std::size_t num_storage_slots =
+      storage_manager_->SlotsNeededForBytes(required_memory);
+
+  const block_id blob_id = storage_manager_->createBlob(num_storage_slots);
+  blob_ = storage_manager_->getBlobMutable(blob_id);
+
+  void *memory_start = blob_->getMemoryMutable();
+  existence_map_.reset(new ConcurrentBitVector(
+      reinterpret_cast<char *>(memory_start) + memory_offsets.at("existence_map"),
+      num_entries,
+      false /* initialize */));
+
+  for (std::size_t i = 0; i < num_handles_; ++i) {
+    vec_tables_.emplace_back(
+        reinterpret_cast<char *>(memory_start) +
+            memory_offsets.at(std::string("state") + std::to_string(i)));
+  }
+
+  memory_size_ = required_memory;
+  num_init_partitions_ = std::min(memory_size_ / (4uL * 1024u * 1024u), 80uL);
+}
+
+CollisionFreeAggregationStateHashTable::~CollisionFreeAggregationStateHashTable() {
+  const block_id blob_id = blob_->getID();
+  blob_.release();
+  storage_manager_->deleteBlockOrBlobFile(blob_id);
+}
+
+void CollisionFreeAggregationStateHashTable::destroyPayload() {
+}
+
+bool CollisionFreeAggregationStateHashTable::upsertValueAccessor(
+    const std::vector<std::vector<attribute_id>> &argument_ids,
+    const std::vector<attribute_id> &key_attr_ids,
+    ValueAccessor *base_accessor,
+    ColumnVectorsValueAccessor *aux_accessor) {
+  DCHECK_EQ(1u, key_attr_ids.size());
+
+  const attribute_id key_attr_id = key_attr_ids.front();
+  const bool is_key_nullable = key_type_->isNullable();
+
+  for (std::size_t i = 0; i < num_handles_; ++i) {
+    DCHECK_LE(argument_ids[i].size(), 1u);
+
+    const attribute_id argument_id =
+        argument_ids[i].empty() ? kInvalidAttributeID : argument_ids[i].front();
+
+    const AggregationHandle *handle = handles_[i];
+    const auto &argument_types = handle->getArgumentTypes();
+
+    const Type *argument_type;
+    bool is_argument_nullable;
+    if (argument_types.empty()) {
+      argument_type = nullptr;
+      is_argument_nullable = false;
+    } else {
+      argument_type = argument_types.front();
+      is_argument_nullable = argument_type->isNullable();
+    }
+
+    InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
+        base_accessor,
+        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+      if (key_attr_id >= 0) {
+        if (argument_id >= 0) {
+          upsertValueAccessorDispatchHelper<false>(is_key_nullable,
+                                                   is_argument_nullable,
+                                                   key_type_,
+                                                   argument_type,
+                                                   handle->getAggregationID(),
+                                                   key_attr_id,
+                                                   argument_id,
+                                                   vec_tables_[i],
+                                                   accessor,
+                                                   accessor);
+        } else {
+          upsertValueAccessorDispatchHelper<true>(is_key_nullable,
+                                                  is_argument_nullable,
+                                                  key_type_,
+                                                  argument_type,
+                                                  handle->getAggregationID(),
+                                                  key_attr_id,
+                                                  -(argument_id+2),
+                                                  vec_tables_[i],
+                                                  accessor,
+                                                  aux_accessor);
+        }
+      } else {
+        if (argument_id >= 0) {
+          upsertValueAccessorDispatchHelper<true>(is_key_nullable,
+                                                  is_argument_nullable,
+                                                  key_type_,
+                                                  argument_type,
+                                                  handle->getAggregationID(),
+                                                  -(key_attr_id+2),
+                                                  argument_id,
+                                                  vec_tables_[i],
+                                                  aux_accessor,
+                                                  accessor);
+        } else {
+          upsertValueAccessorDispatchHelper<false>(is_key_nullable,
+                                                   is_argument_nullable,
+                                                   key_type_,
+                                                   argument_type,
+                                                   handle->getAggregationID(),
+                                                   -(key_attr_id+2),
+                                                   -(argument_id+2),
+                                                   vec_tables_[i],
+                                                   aux_accessor,
+                                                   aux_accessor);
+        }
+      }
+    });
+  }
+  return true;
+}
+
+void CollisionFreeAggregationStateHashTable::finalizeKey(
+    const std::size_t partition_id,
+    NativeColumnVector *output_cv) const {
+  const std::size_t start_position =
+      calculatePartitionStartPosition(partition_id);
+  const std::size_t end_position =
+      calculatePartitionEndPosition(partition_id);
+
+  switch (key_type_->getTypeID()) {
+    case TypeID::kInt:
+      finalizeKeyInternal<int>(start_position, end_position, output_cv);
+      return;
+    case TypeID::kLong:
+      finalizeKeyInternal<std::int64_t>(start_position, end_position, output_cv);
+      return;
+    default:
+      LOG(FATAL) << "Not supported";
+  }
+}
+
+void CollisionFreeAggregationStateHashTable::finalizeState(
+    const std::size_t partition_id,
+    std::size_t handle_id,
+    NativeColumnVector *output_cv) const {
+  const std::size_t start_position =
+      calculatePartitionStartPosition(partition_id);
+  const std::size_t end_position =
+      calculatePartitionEndPosition(partition_id);
+
+  const AggregationHandle *handle = handles_[handle_id];
+  const auto &argument_types = handle->getArgumentTypes();
+  const Type *argument_type =
+      argument_types.empty() ? nullptr : argument_types.front();
+
+  finalizeStateDispatchHelper(handle->getAggregationID(),
+                              argument_type,
+                              vec_tables_[handle_id],
+                              start_position,
+                              end_position,
+                              output_cv);
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/storage/CollisionFreeAggregationStateHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeAggregationStateHashTable.hpp b/storage/CollisionFreeAggregationStateHashTable.hpp
new file mode 100644
index 0000000..f3edfd8
--- /dev/null
+++ b/storage/CollisionFreeAggregationStateHashTable.hpp
@@ -0,0 +1,568 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_COLLISION_FREE_AGGREGATION_STATE_HASH_TABLE_HPP_
+#define QUICKSTEP_STORAGE_COLLISION_FREE_AGGREGATION_STATE_HASH_TABLE_HPP_
+
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
+#include "storage/HashTableBase.hpp"
+#include "storage/StorageBlob.hpp"
+#include "storage/StorageConstants.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "types/Type.hpp"
+#include "types/TypeID.hpp"
+#include "types/TypedValue.hpp"
+#include "types/containers/ColumnVector.hpp"
+#include "utility/ConcurrentBitVector.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class ColumnVectorsValueAccessor;
+class StorageMnager;
+
+/** \addtogroup Storage
+ *  @{
+ */
+
+class CollisionFreeAggregationStateHashTable : public AggregationStateHashTableBase {
+ public:
+  CollisionFreeAggregationStateHashTable(
+      const std::vector<const Type *> &key_types,
+      const std::size_t num_entries,
+      const std::vector<AggregationHandle *> &handles,
+      StorageManager *storage_manager);
+
+  ~CollisionFreeAggregationStateHashTable() override;
+
+  void destroyPayload() override;
+
+  inline std::size_t getNumInitializationPartitions() const {
+    return num_init_partitions_;
+  }
+
+  inline std::size_t getNumFinalizationPartitions() const {
+    return num_finalize_partitions_;
+  }
+
+  inline std::size_t getNumTuplesInPartition(
+      const std::size_t partition_id) const {
+    const std::size_t start_position =
+        calculatePartitionStartPosition(partition_id);
+    const std::size_t end_position =
+        calculatePartitionEndPosition(partition_id);
+    return existence_map_->onesCount(start_position, end_position);
+  }
+
+  inline void initialize(const std::size_t partition_id) {
+    const std::size_t memory_segment_size =
+        (memory_size_ + num_init_partitions_ - 1) / num_init_partitions_;
+    const std::size_t memory_start = memory_segment_size * partition_id;
+    std::memset(reinterpret_cast<char *>(blob_->getMemoryMutable()) + memory_start,
+                0,
+                std::min(memory_segment_size, memory_size_ - memory_start));
+  }
+
+  bool upsertValueAccessor(
+      const std::vector<std::vector<attribute_id>> &argument_ids,
+      const std::vector<attribute_id> &key_attr_ids,
+      ValueAccessor *base_accessor,
+      ColumnVectorsValueAccessor *aux_accessor = nullptr) override;
+
+  void finalizeKey(const std::size_t partition_id,
+                   NativeColumnVector *output_cv) const;
+
+  void finalizeState(const std::size_t partition_id,
+                     std::size_t handle_id,
+                     NativeColumnVector *output_cv) const;
+
+ private:
+  inline static std::size_t CacheLineAlignedBytes(const std::size_t actual_bytes) {
+    return (actual_bytes + kCacheLineBytes - 1) / kCacheLineBytes * kCacheLineBytes;
+  }
+
+  inline std::size_t calculatePartitionLength() const {
+    const std::size_t partition_length =
+        (num_entries_ + num_finalize_partitions_ - 1) / num_finalize_partitions_;
+    DCHECK_GE(partition_length, 0u);
+    return partition_length;
+  }
+
+  inline std::size_t calculatePartitionStartPosition(
+      const std::size_t partition_id) const {
+    return calculatePartitionLength() * partition_id;
+  }
+
+  inline std::size_t calculatePartitionEndPosition(
+      const std::size_t partition_id) const {
+    return std::min(calculatePartitionLength() * (partition_id + 1),
+                    num_entries_);
+  }
+
+  template <bool use_two_accessors, typename ...ArgTypes>
+  inline void upsertValueAccessorDispatchHelper(
+      const bool is_key_nullable,
+      const bool is_argument_nullable,
+      ArgTypes &&...args);
+
+  template <bool ...bool_values, typename ...ArgTypes>
+  inline void upsertValueAccessorDispatchHelper(
+      const Type *key_type,
+      ArgTypes &&...args);
+
+  template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
+            typename KeyT, typename ...ArgTypes>
+  inline void upsertValueAccessorDispatchHelper(
+      const Type *argument_type,
+      const AggregationID agg_id,
+      ArgTypes &&...args);
+
+  template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
+            typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT>
+  inline void upsertValueAccessorCountHelper(
+      const attribute_id key_attr_id,
+      const attribute_id argument_id,
+      void *vec_table,
+      KeyValueAccessorT *key_accessor,
+      ArgumentValueAccessorT *argument_accessor);
+
+  template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
+            typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT>
+  inline void upsertValueAccessorSumHelper(
+      const Type *argument_type,
+      const attribute_id key_attr_id,
+      const attribute_id argument_id,
+      void *vec_table,
+      KeyValueAccessorT *key_accessor,
+      ArgumentValueAccessorT *argument_accessor);
+
+  template <bool is_key_nullable, typename KeyT, typename KeyValueAccessorT>
+  inline void upsertValueAccessorCountNullary(
+      const attribute_id key_attr_id,
+      std::atomic<std::size_t> *vec_table,
+      KeyValueAccessorT *key_accessor);
+
+  template <bool use_two_accessors, bool is_key_nullable, typename KeyT,
+            typename KeyValueAccessorT, typename ArgumentValueAccessorT>
+  inline void upsertValueAccessorCountUnary(
+      const attribute_id key_attr_id,
+      const attribute_id argument_id,
+      std::atomic<std::size_t> *vec_table,
+      KeyValueAccessorT *key_accessor,
+      ArgumentValueAccessorT *argument_accessor);
+
+  template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
+            typename KeyT, typename ArgumentT, typename StateT,
+            typename KeyValueAccessorT, typename ArgumentValueAccessorT>
+  inline void upsertValueAccessorIntegerSum(
+      const attribute_id key_attr_id,
+      const attribute_id argument_id,
+      std::atomic<StateT> *vec_table,
+      KeyValueAccessorT *key_accessor,
+      ArgumentValueAccessorT *argument_accessor);
+
+  template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
+            typename KeyT, typename ArgumentT, typename StateT,
+            typename KeyValueAccessorT, typename ArgumentValueAccessorT>
+  inline void upsertValueAccessorGenericSum(
+      const attribute_id key_attr_id,
+      const attribute_id argument_id,
+      std::atomic<StateT> *vec_table,
+      KeyValueAccessorT *key_accessor,
+      ArgumentValueAccessorT *argument_accessor);
+
+  template <typename KeyT>
+  inline void finalizeKeyInternal(const std::size_t start_position,
+                                  const std::size_t end_position,
+                                  NativeColumnVector *output_cv) const {
+    std::size_t loc = start_position - 1;
+    while ((loc = existence_map_->nextOne(loc)) < end_position) {
+      *static_cast<KeyT *>(output_cv->getPtrForDirectWrite()) = loc;
+    }
+  }
+
+  template <typename ...ArgTypes>
+  inline void finalizeStateDispatchHelper(const AggregationID agg_id,
+                                          const Type *argument_type,
+                                          const void *vec_table,
+                                          ArgTypes &&...args) const {
+    switch (agg_id) {
+       case AggregationID::kCount:
+         finalizeStateCount(static_cast<const std::atomic<std::size_t> *>(vec_table),
+                            std::forward<ArgTypes>(args)...);
+         return;
+       case AggregationID::kSum:
+         finalizeStateSumHelper(argument_type,
+                                vec_table,
+                                std::forward<ArgTypes>(args)...);
+         return;
+       default:
+         LOG(FATAL) << "Not supported";
+    }
+  }
+
+  template <typename ...ArgTypes>
+  inline void finalizeStateSumHelper(const Type *argument_type,
+                                     const void *vec_table,
+                                     ArgTypes &&...args) const {
+    DCHECK(argument_type != nullptr);
+
+    switch (argument_type->getTypeID()) {
+      case TypeID::kInt:    // Fall through
+      case TypeID::kLong:
+        finalizeStateSum<std::int64_t>(
+            static_cast<const std::atomic<std::int64_t> *>(vec_table),
+            std::forward<ArgTypes>(args)...);
+        return;
+      case TypeID::kFloat:  // Fall through
+      case TypeID::kDouble:
+        finalizeStateSum<double>(
+            static_cast<const std::atomic<double> *>(vec_table),
+            std::forward<ArgTypes>(args)...);
+        return;
+      default:
+        LOG(FATAL) << "Not supported";
+    }
+  }
+
+  inline void finalizeStateCount(const std::atomic<std::size_t> *vec_table,
+                                 const std::size_t start_position,
+                                 const std::size_t end_position,
+                                 NativeColumnVector *output_cv) const {
+    std::size_t loc = start_position - 1;
+    while ((loc = existence_map_->nextOne(loc)) < end_position) {
+      *static_cast<std::int64_t *>(output_cv->getPtrForDirectWrite()) =
+          vec_table[loc].load(std::memory_order_relaxed);
+    }
+  }
+
+  template <typename ResultT, typename StateT>
+  inline void finalizeStateSum(const std::atomic<StateT> *vec_table,
+                               const std::size_t start_position,
+                               const std::size_t end_position,
+                               NativeColumnVector *output_cv) const {
+    std::size_t loc = start_position - 1;
+    while ((loc = existence_map_->nextOne(loc)) < end_position) {
+      *static_cast<ResultT *>(output_cv->getPtrForDirectWrite()) =
+          vec_table[loc].load(std::memory_order_relaxed);
+    }
+  }
+
+  const Type *key_type_;
+  const std::size_t num_entries_;
+
+  const std::size_t num_handles_;
+  const std::vector<AggregationHandle *> handles_;
+
+  std::unique_ptr<ConcurrentBitVector> existence_map_;
+  std::vector<void *> vec_tables_;
+
+  const std::size_t num_finalize_partitions_;
+
+  StorageManager *storage_manager_;
+  MutableBlobReference blob_;
+
+  std::size_t memory_size_;
+  std::size_t num_init_partitions_;
+
+  DISALLOW_COPY_AND_ASSIGN(CollisionFreeAggregationStateHashTable);
+};
+
+// ----------------------------------------------------------------------------
+// Implementations of template methods follow.
+
+template <bool use_two_accessors, typename ...ArgTypes>
+inline void CollisionFreeAggregationStateHashTable
+    ::upsertValueAccessorDispatchHelper(
+        const bool is_key_nullable,
+        const bool is_argument_nullable,
+        ArgTypes &&...args) {
+  if (is_key_nullable) {
+    if (is_argument_nullable) {
+      upsertValueAccessorDispatchHelper<use_two_accessors, true, true>(
+          std::forward<ArgTypes>(args)...);
+    } else {
+      upsertValueAccessorDispatchHelper<use_two_accessors, true, false>(
+          std::forward<ArgTypes>(args)...);
+    }
+  } else {
+    if (is_argument_nullable) {
+      upsertValueAccessorDispatchHelper<use_two_accessors, false, true>(
+          std::forward<ArgTypes>(args)...);
+    } else {
+      upsertValueAccessorDispatchHelper<use_two_accessors, false, false>(
+          std::forward<ArgTypes>(args)...);
+    }
+  }
+}
+
+template <bool ...bool_values, typename ...ArgTypes>
+inline void CollisionFreeAggregationStateHashTable
+    ::upsertValueAccessorDispatchHelper(
+        const Type *key_type,
+        ArgTypes &&...args) {
+  switch (key_type->getTypeID()) {
+    case TypeID::kInt:
+      upsertValueAccessorDispatchHelper<bool_values..., int>(
+          std::forward<ArgTypes>(args)...);
+      return;
+    case TypeID::kLong:
+      upsertValueAccessorDispatchHelper<bool_values..., std::int64_t>(
+          std::forward<ArgTypes>(args)...);
+      return;
+    default:
+      LOG(FATAL) << "Not supported";
+  }
+}
+
+template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
+          typename KeyT, typename ...ArgTypes>
+inline void CollisionFreeAggregationStateHashTable
+    ::upsertValueAccessorDispatchHelper(
+        const Type *argument_type,
+        const AggregationID agg_id,
+        ArgTypes &&...args) {
+  switch (agg_id) {
+     case AggregationID::kCount:
+       upsertValueAccessorCountHelper<
+           use_two_accessors, is_key_nullable, is_argument_nullable, KeyT>(
+               std::forward<ArgTypes>(args)...);
+       return;
+     case AggregationID::kSum:
+       upsertValueAccessorSumHelper<
+           use_two_accessors, is_key_nullable, is_argument_nullable, KeyT>(
+               argument_type, std::forward<ArgTypes>(args)...);
+       return;
+     default:
+       LOG(FATAL) << "Not supported";
+  }
+}
+
+template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
+          typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT>
+inline void CollisionFreeAggregationStateHashTable
+    ::upsertValueAccessorCountHelper(
+        const attribute_id key_attr_id,
+        const attribute_id argument_id,
+        void *vec_table,
+        KeyValueAccessorT *key_accessor,
+        ArgumentValueAccessorT *argument_accessor) {
+  DCHECK_GE(key_attr_id, 0u);
+
+  if (is_argument_nullable && argument_id != kInvalidAttributeID) {
+    upsertValueAccessorCountUnary<use_two_accessors, is_key_nullable, KeyT>(
+        key_attr_id,
+        argument_id,
+        static_cast<std::atomic<std::size_t> *>(vec_table),
+        key_accessor,
+        argument_accessor);
+    return;
+  } else {
+    upsertValueAccessorCountNullary<is_key_nullable, KeyT>(
+        key_attr_id,
+        static_cast<std::atomic<std::size_t> *>(vec_table),
+        key_accessor);
+    return;
+  }
+}
+
+template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
+          typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT>
+inline void CollisionFreeAggregationStateHashTable
+    ::upsertValueAccessorSumHelper(
+        const Type *argument_type,
+        const attribute_id key_attr_id,
+        const attribute_id argument_id,
+        void *vec_table,
+        KeyValueAccessorT *key_accessor,
+        ArgumentValueAccessorT *argument_accessor) {
+  DCHECK_GE(key_attr_id, 0u);
+  DCHECK_GE(argument_id, 0u);
+  DCHECK(argument_type != nullptr);
+
+  switch (argument_type->getTypeID()) {
+    case TypeID::kInt:
+      upsertValueAccessorIntegerSum<
+          use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, int>(
+              key_attr_id,
+              argument_id,
+              static_cast<std::atomic<std::int64_t> *>(vec_table),
+              key_accessor,
+              argument_accessor);
+      return;
+    case TypeID::kLong:
+      upsertValueAccessorIntegerSum<
+          use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, std::int64_t>(
+              key_attr_id,
+              argument_id,
+              static_cast<std::atomic<std::int64_t> *>(vec_table),
+              key_accessor,
+              argument_accessor);
+      return;
+    case TypeID::kFloat:
+      upsertValueAccessorGenericSum<
+          use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, float>(
+              key_attr_id,
+              argument_id,
+              static_cast<std::atomic<double> *>(vec_table),
+              key_accessor,
+              argument_accessor);
+      return;
+    case TypeID::kDouble:
+      upsertValueAccessorGenericSum<
+          use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, double>(
+              key_attr_id,
+              argument_id,
+              static_cast<std::atomic<double> *>(vec_table),
+              key_accessor,
+              argument_accessor);
+      return;
+    default:
+      LOG(FATAL) << "Not supported";
+  }
+}
+
+template <bool is_key_nullable, typename KeyT, typename ValueAccessorT>
+inline void CollisionFreeAggregationStateHashTable
+    ::upsertValueAccessorCountNullary(
+        const attribute_id key_attr_id,
+        std::atomic<std::size_t> *vec_table,
+        ValueAccessorT *accessor) {
+  accessor->beginIteration();
+  while (accessor->next()) {
+    const KeyT *key = static_cast<const KeyT *>(
+        accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
+    if (is_key_nullable && key == nullptr) {
+      continue;
+    }
+    const std::size_t loc = *key;
+    vec_table[loc].fetch_add(1u, std::memory_order_relaxed);
+    existence_map_->setBit(loc);
+  }
+}
+
+template <bool use_two_accessors, bool is_key_nullable, typename KeyT,
+          typename KeyValueAccessorT, typename ArgumentValueAccessorT>
+inline void CollisionFreeAggregationStateHashTable
+    ::upsertValueAccessorCountUnary(
+        const attribute_id key_attr_id,
+        const attribute_id argument_id,
+        std::atomic<std::size_t> *vec_table,
+        KeyValueAccessorT *key_accessor,
+        ArgumentValueAccessorT *argument_accessor) {
+  key_accessor->beginIteration();
+  while (key_accessor->next()) {
+    if (use_two_accessors) {
+      argument_accessor->next();
+    }
+    const KeyT *key = static_cast<const KeyT *>(
+        key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
+    if (is_key_nullable && key == nullptr) {
+      continue;
+    }
+    const std::size_t loc = *key;
+    existence_map_->setBit(loc);
+    if (argument_accessor->getUntypedValue(argument_id) == nullptr) {
+      continue;
+    }
+    vec_table[loc].fetch_add(1u, std::memory_order_relaxed);
+  }
+}
+
+template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
+          typename KeyT, typename ArgumentT, typename StateT,
+          typename KeyValueAccessorT, typename ArgumentValueAccessorT>
+inline void CollisionFreeAggregationStateHashTable
+    ::upsertValueAccessorIntegerSum(
+        const attribute_id key_attr_id,
+        const attribute_id argument_id,
+        std::atomic<StateT> *vec_table,
+        KeyValueAccessorT *key_accessor,
+        ArgumentValueAccessorT *argument_accessor) {
+  key_accessor->beginIteration();
+  while (key_accessor->next()) {
+    if (use_two_accessors) {
+      argument_accessor->next();
+    }
+    const KeyT *key = static_cast<const KeyT *>(
+        key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
+    if (is_key_nullable && key == nullptr) {
+      continue;
+    }
+    const std::size_t loc = *key;
+    existence_map_->setBit(loc);
+    const ArgumentT *argument = static_cast<const ArgumentT *>(
+        argument_accessor->template getUntypedValue<is_argument_nullable>(argument_id));
+    if (is_argument_nullable && argument == nullptr) {
+      continue;
+    }
+    vec_table[loc].fetch_add(*argument, std::memory_order_relaxed);
+  }
+}
+
+template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
+          typename KeyT, typename ArgumentT, typename StateT,
+          typename KeyValueAccessorT, typename ArgumentValueAccessorT>
+inline void CollisionFreeAggregationStateHashTable
+    ::upsertValueAccessorGenericSum(
+        const attribute_id key_attr_id,
+        const attribute_id argument_id,
+        std::atomic<StateT> *vec_table,
+        KeyValueAccessorT *key_accessor,
+        ArgumentValueAccessorT *argument_accessor) {
+  key_accessor->beginIteration();
+  while (key_accessor->next()) {
+    if (use_two_accessors) {
+      argument_accessor->next();
+    }
+    const KeyT *key = static_cast<const KeyT *>(
+        key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
+    if (is_key_nullable && key == nullptr) {
+      continue;
+    }
+    const std::size_t loc = *key;
+    existence_map_->setBit(loc);
+    const ArgumentT *argument = static_cast<const ArgumentT *>(
+        argument_accessor->template getUntypedValue<is_argument_nullable>(argument_id));
+    if (is_argument_nullable && argument == nullptr) {
+      continue;
+    }
+    const ArgumentT arg_val = *argument;
+    std::atomic<StateT> &state = vec_table[loc];
+    StateT state_val = state.load(std::memory_order_relaxed);
+    while(!state.compare_exchange_weak(state_val, state_val + arg_val)) {}
+  }
+}
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_STORAGE_COLLISION_FREE_AGGREGATION_STATE_HASH_TABLE_HPP_