You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/04/09 01:48:54 UTC

[14/14] incubator-quickstep git commit: Thread private numeric aggregation

Thread private numeric aggregation


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/cc1a86fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/cc1a86fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/cc1a86fc

Branch: refs/heads/common-subexpression
Commit: cc1a86fc481230c6f61f710917b3a1194283b97b
Parents: ab3a667
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Sat Apr 8 19:04:12 2017 -0500
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Sat Apr 8 20:48:27 2017 -0500

----------------------------------------------------------------------
 query_optimizer/ExecutionGenerator.cpp          |  15 +-
 .../cost_model/StarSchemaSimpleCostModel.cpp    |  51 ++
 .../cost_model/StarSchemaSimpleCostModel.hpp    |   3 +
 storage/AggregationOperationState.cpp           |  57 ++-
 storage/AggregationOperationState.hpp           |   6 +-
 storage/CMakeLists.txt                          |  20 +
 storage/CollisionFreeVectorTable.hpp            |   4 +
 storage/HashTable.proto                         |   1 +
 storage/HashTableBase.hpp                       |   5 +-
 storage/HashTableFactory.hpp                    |  13 +-
 storage/HashTablePool.hpp                       |   4 +
 storage/PackedPayloadHashTable.hpp              |   4 +
 storage/ThreadPrivateNumericHashTable.cpp       |   0
 storage/ThreadPrivateNumericHashTable.hpp       | 483 +++++++++++++++++++
 14 files changed, 650 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cc1a86fc/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 6fec85b..387f26a 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1512,14 +1512,20 @@ void ExecutionGenerator::convertAggregate(
             ->canUseCollisionFreeAggregation(physical_plan,
                                              estimated_num_groups,
                                              &max_num_groups)) {
       aggr_state_proto->set_hash_table_impl_type(
           serialization::HashTableImplType::COLLISION_FREE_VECTOR);
       aggr_state_proto->set_estimated_num_entries(max_num_groups);
       use_parallel_initialization = true;
     } else {
-      // Otherwise, use SeparateChaining.
-      aggr_state_proto->set_hash_table_impl_type(
-          serialization::HashTableImplType::SEPARATE_CHAINING);
+      if (cost_model_for_aggregation_->canUseTwoPhaseNumericAggregation(
+              physical_plan, estimated_num_groups)) {
+        aggr_state_proto->set_hash_table_impl_type(
+            serialization::HashTableImplType::THREAD_PRIVATE_NUMERIC);
+      } else {
+        // Otherwise, use SeparateChaining.
+        aggr_state_proto->set_hash_table_impl_type(
+            serialization::HashTableImplType::SEPARATE_CHAINING);
+      }
       aggr_state_proto->set_estimated_num_entries(std::max(16uL, estimated_num_groups));
     }
   } else {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cc1a86fc/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
index fc775c7..d70e156 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
@@ -688,6 +688,75 @@ bool StarSchemaSimpleCostModel::canUseCollisionFreeAggregation(
   return true;
 }
 
+// Returns true if the aggregation qualifies for THREAD_PRIVATE_NUMERIC hash
+// tables: a small estimated number of groups, narrow fixed-width non-nullable
+// group-by keys, and only non-DISTINCT COUNT / SUM(numeric) aggregates.
+bool StarSchemaSimpleCostModel::canUseTwoPhaseNumericAggregation(
+    const physical::AggregatePtr &aggregate,
+    const std::size_t estimated_num_groups) {
+  // The numeric tables are sized from the estimate, so only consider them
+  // when few groups are expected.
+  if (estimated_num_groups >= 1000u) {
+    return false;
+  }
+
+  // All group-by keys are packed into a single 64-bit key code: each key must
+  // be 1, 2, 4 or 8 bytes wide and the widths must add up to at most 8 bytes.
+  // Nullable keys are rejected conservatively.
+  std::size_t total_key_size = 0;
+  for (const auto &key_expr : aggregate->grouping_expressions()) {
+    const Type &type = key_expr->getValueType();
+    if (type.isVariableLength() || type.isNullable()) {
+      return false;
+    }
+
+    const std::size_t key_size = type.maximumByteLength();
+    if (!QUICKSTEP_EQUALS_ANY_CONSTANT(key_size, 1u, 2u, 4u, 8u)) {
+      return false;
+    }
+
+    total_key_size += key_size;
+    if (total_key_size > 8u) {
+      return false;
+    }
+  }
+
+  for (const auto &agg_alias : aggregate->aggregate_expressions()) {
+    const E::AggregateFunctionPtr agg_expr =
+        std::static_pointer_cast<const E::AggregateFunction>(agg_alias->expression());
+    if (agg_expr->is_distinct()) {
+      return false;
+    }
+    const auto &arguments = agg_expr->getArguments();
+    switch (agg_expr->getAggregate().getAggregationID()) {
+      case AggregationID::kCount:
+        // The numeric table's COUNT update never sees the argument column, so
+        // COUNT(expr) is only exact when expr can never be NULL.
+        for (const auto &argument : arguments) {
+          if (argument->getValueType().isNullable()) {
+            return false;
+          }
+        }
+        break;
+      case AggregationID::kSum: {
+        DCHECK_EQ(1u, arguments.size());
+        const Type &argument_type = arguments.front()->getValueType();
+        // Nullable arguments are rejected conservatively.
+        if (argument_type.isNullable() ||
+            !QUICKSTEP_EQUALS_ANY_CONSTANT(argument_type.getTypeID(),
+                                           kInt, kLong, kFloat, kDouble)) {
+          return false;
+        }
+        break;
+      }
+      default:
+        return false;
+    }
+  }
+
+  return true;
+}
+
 }  // namespace cost
 }  // namespace optimizer
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cc1a86fc/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
index afb2ef9..aa09c31 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
@@ -183,6 +183,17 @@ class StarSchemaSimpleCostModel : public CostModel {
                                       const std::size_t estimated_num_groups,
                                       std::size_t *max_num_groups);
 
+  /**
+   * @brief Check whether an aggregate can be evaluated with thread-private
+   *        numeric hash tables (HashTableImplType::kThreadPrivateNumeric).
+   *
+   * @param aggregate The physical aggregate to check.
+   * @param estimated_num_groups The estimated number of groups.
+   * @return True if thread-private numeric aggregation can be used.
+   **/
+  bool canUseTwoPhaseNumericAggregation(const physical::AggregatePtr &aggregate,
+                                        const std::size_t estimated_num_groups);
+
  private:
   std::size_t estimateCardinalityForAggregate(
       const physical::AggregatePtr &physical_plan);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cc1a86fc/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index facc7fa..d6f21bc 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -49,6 +49,7 @@
 #include "storage/StorageBlockInfo.hpp"
 #include "storage/StorageManager.hpp"
 #include "storage/SubBlocksReference.hpp"
+#include "storage/ThreadPrivateNumericHashTable.hpp"
 #include "storage/TupleIdSequence.hpp"
 #include "storage/TupleStorageSubBlock.hpp"
 #include "storage/ValueAccessor.hpp"
@@ -94,11 +95,15 @@ AggregationOperationState::AggregationOperationState(
                                     !is_distinct_.empty(), std::logical_and<bool>())),
       storage_manager_(storage_manager) {
   if (!group_by.empty()) {
-    if (hash_table_impl_type == HashTableImplType::kCollisionFreeVector) {
-      is_aggregate_collision_free_ = true;
-    } else {
-      is_aggregate_partitioned_ = checkAggregatePartitioned(
-          estimated_num_entries, is_distinct_, group_by, aggregate_functions);
+    switch (hash_table_impl_type) {
+      case HashTableImplType::kCollisionFreeVector:
+        is_aggregate_collision_free_ = true;
+        break;
+      case HashTableImplType::kThreadPrivateNumeric:
+        break;
+      default:
+        is_aggregate_partitioned_ = checkAggregatePartitioned(
+            estimated_num_entries, is_distinct_, group_by, aggregate_functions);
     }
   }
 
@@ -715,7 +720,17 @@ void AggregationOperationState::finalizeHashTable(
     finalizeHashTableImplPartitioned(partition_id, output_destination);
   } else {
     DCHECK_EQ(0u, partition_id);
-    finalizeHashTableImplThreadPrivate(output_destination);
+    DCHECK(group_by_hashtable_pool_ != nullptr);
+    switch (group_by_hashtable_pool_->getHashTableImplType()) {
+      case HashTableImplType::kSeparateChaining:
+        finalizeHashTableImplThreadPrivatePackedPayload(output_destination);
+        break;
+      case HashTableImplType::kThreadPrivateNumeric:
+        finalizeHashTableImplThreadPrivateNumeric(output_destination);
+        break;
+      default:
+        LOG(FATAL) << "Not supported";
+    }
   }
 }
 
@@ -840,7 +855,7 @@ void AggregationOperationState::finalizeHashTableImplPartitioned(
   output_destination->bulkInsertTuples(&complete_result);
 }
 
-void AggregationOperationState::finalizeHashTableImplThreadPrivate(
+void AggregationOperationState::finalizeHashTableImplThreadPrivatePackedPayload(
     InsertDestination *output_destination) {
   // TODO(harshad) - The merge phase may be slower when each hash table contains
   // large number of entries. We should find ways in which we can perform a
@@ -948,6 +963,40 @@ void AggregationOperationState::finalizeHashTableImplThreadPrivate(
   output_destination->bulkInsertTuples(&complete_result);
 }
 
+// Merges all thread-private numeric hash tables in the pool into the last one,
+// then finalizes that table and bulk-inserts the aggregated rows.
+void AggregationOperationState::finalizeHashTableImplThreadPrivateNumeric(
+    InsertDestination *output_destination) {
+  auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
+  DCHECK(hash_tables != nullptr);
+  if (hash_tables->empty()) {
+    return;
+  }
+
+  // Reuse the last table as the merge target; every other table is merged
+  // into it and then released.
+  DCHECK(hash_tables->back()->getImplType() ==
+         HashTableImplType::kThreadPrivateNumeric);
+  std::unique_ptr<ThreadPrivateNumericHashTable> final_hash_table(
+      static_cast<ThreadPrivateNumericHashTable*>(hash_tables->back().release()));
+  for (std::size_t i = 0; i < hash_tables->size() - 1; ++i) {
+    std::unique_ptr<AggregationStateHashTableBase> hash_table(
+        hash_tables->at(i).release());
+    DCHECK(hash_table->getImplType() ==
+           HashTableImplType::kThreadPrivateNumeric);
+    final_hash_table->merge(
+        static_cast<const ThreadPrivateNumericHashTable*>(hash_table.get()));
+    hash_table->destroyPayload();
+  }
+
+  ColumnVectorsValueAccessor complete_result;
+  final_hash_table->finalize(&complete_result);
+  final_hash_table->destroyPayload();
+
+  // Bulk-insert the complete result.
+  output_destination->bulkInsertTuples(&complete_result);
+}
+
 std::size_t AggregationOperationState::getMemoryConsumptionBytes() const {
   std::size_t memory = getMemoryConsumptionBytesHelper(distinctify_hashtables_);
   memory += getMemoryConsumptionBytesHelper(group_by_hashtables_);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cc1a86fc/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index e6af494..e666a68 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -256,7 +256,11 @@ class AggregationOperationState {
   void finalizeHashTableImplPartitioned(const std::size_t partition_id,
                                         InsertDestination *output_destination);
 
-  void finalizeHashTableImplThreadPrivate(InsertDestination *output_destination);
+  void finalizeHashTableImplThreadPrivatePackedPayload(
+      InsertDestination *output_destination);
+
+  void finalizeHashTableImplThreadPrivateNumeric(
+      InsertDestination *output_destination);
 
   std::size_t getMemoryConsumptionBytesHelper(
       const std::vector<std::unique_ptr<AggregationStateHashTableBase>>

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cc1a86fc/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 0a1d484..b971240 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -250,6 +250,9 @@ add_library(quickstep_storage_StorageManager StorageManager.cpp StorageManager.h
 add_library(quickstep_storage_SubBlockTypeRegistry SubBlockTypeRegistry.cpp SubBlockTypeRegistry.hpp)
 add_library(quickstep_storage_SubBlockTypeRegistryMacros ../empty_src.cpp SubBlockTypeRegistryMacros.hpp)
 add_library(quickstep_storage_SubBlocksReference ../empty_src.cpp SubBlocksReference.hpp)
+add_library(quickstep_storage_ThreadPrivateNumericHashTable
+            ../empty_src.cpp
+            ThreadPrivateNumericHashTable.hpp)
 add_library(quickstep_storage_TupleIdSequence ../empty_src.cpp TupleIdSequence.hpp)
 add_library(quickstep_storage_TupleReference ../empty_src.cpp TupleReference.hpp)
 add_library(quickstep_storage_TupleStorageSubBlock TupleStorageSubBlock.cpp TupleStorageSubBlock.hpp)
@@ -289,6 +292,7 @@ target_link_libraries(quickstep_storage_AggregationOperationState
                       quickstep_storage_StorageBlockInfo
                       quickstep_storage_StorageManager
                       quickstep_storage_SubBlocksReference
+                      quickstep_storage_ThreadPrivateNumericHashTable
                       quickstep_storage_TupleIdSequence
                       quickstep_storage_TupleStorageSubBlock
                       quickstep_storage_ValueAccessor
@@ -724,6 +728,7 @@ target_link_libraries(quickstep_storage_HashTableFactory
                       quickstep_storage_PackedPayloadHashTable
                       quickstep_storage_SeparateChainingHashTable
                       quickstep_storage_SimpleScalarSeparateChainingHashTable
+                      quickstep_storage_ThreadPrivateNumericHashTable
                       quickstep_storage_TupleReference
                       quickstep_types_Type
                       quickstep_types_TypeFactory
@@ -1039,6 +1044,20 @@ target_link_libraries(quickstep_storage_SubBlockTypeRegistry
 target_link_libraries(quickstep_storage_SubBlocksReference
                       glog
                       quickstep_utility_PtrVector)
+target_link_libraries(quickstep_storage_ThreadPrivateNumericHashTable
+                      glog
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_expressions_aggregation_AggregationID
+                      quickstep_storage_HashTableBase
+                      quickstep_storage_ValueAccessorMultiplexer
+                      quickstep_storage_ValueAccessorUtil
+                      quickstep_types_Type
+                      quickstep_types_TypeID
+                      quickstep_types_containers_ColumnVector
+                      quickstep_types_containers_ColumnVectorsValueAccessor
+                      quickstep_utility_Macros
+                      quickstep_utility_ScopedBuffer)
 target_link_libraries(quickstep_storage_TupleIdSequence
                       quickstep_storage_StorageBlockInfo
                       quickstep_utility_BitVector
@@ -1164,6 +1183,7 @@ target_link_libraries(quickstep_storage
                       quickstep_storage_SubBlockTypeRegistry
                       quickstep_storage_SubBlockTypeRegistryMacros
                       quickstep_storage_SubBlocksReference
+                      quickstep_storage_ThreadPrivateNumericHashTable
                       quickstep_storage_TupleIdSequence
                       quickstep_storage_TupleReference
                       quickstep_storage_TupleStorageSubBlock

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cc1a86fc/storage/CollisionFreeVectorTable.hpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeVectorTable.hpp b/storage/CollisionFreeVectorTable.hpp
index 490a5cc..221a221 100644
--- a/storage/CollisionFreeVectorTable.hpp
+++ b/storage/CollisionFreeVectorTable.hpp
@@ -70,6 +70,10 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase {
 
   ~CollisionFreeVectorTable() override;
 
+  HashTableImplType getImplType() const override {
+    return HashTableImplType::kCollisionFreeVector;
+  }
+
   void destroyPayload() override;
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cc1a86fc/storage/HashTable.proto
----------------------------------------------------------------------
diff --git a/storage/HashTable.proto b/storage/HashTable.proto
index 6839ebc..80e363c 100644
--- a/storage/HashTable.proto
+++ b/storage/HashTable.proto
@@ -26,6 +26,7 @@ enum HashTableImplType {
   LINEAR_OPEN_ADDRESSING = 1;
   SEPARATE_CHAINING = 2;
   SIMPLE_SCALAR_SEPARATE_CHAINING = 3;
+  THREAD_PRIVATE_NUMERIC = 4;
 }
 
 // NOTE(chasseur): This proto describes the run-time parameters for a resizable

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cc1a86fc/storage/HashTableBase.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableBase.hpp b/storage/HashTableBase.hpp
index 8be388a..c3cbddf 100644
--- a/storage/HashTableBase.hpp
+++ b/storage/HashTableBase.hpp
@@ -44,7 +44,8 @@ enum class HashTableImplType {
   kCollisionFreeVector,
   kLinearOpenAddressing,
   kSeparateChaining,
-  kSimpleScalarSeparateChaining
+  kSimpleScalarSeparateChaining,
+  kThreadPrivateNumeric
 };
 
 /**
@@ -117,6 +118,8 @@ class AggregationStateHashTableBase {
 
   virtual std::size_t getMemoryConsumptionBytes() const = 0;
 
+  virtual HashTableImplType getImplType() const = 0;
+
  protected:
   AggregationStateHashTableBase() {}
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cc1a86fc/storage/HashTableFactory.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableFactory.hpp b/storage/HashTableFactory.hpp
index 9686429..52f4d5f 100644
--- a/storage/HashTableFactory.hpp
+++ b/storage/HashTableFactory.hpp
@@ -32,6 +32,7 @@
 #include "storage/PackedPayloadHashTable.hpp"
 #include "storage/SeparateChainingHashTable.hpp"
 #include "storage/SimpleScalarSeparateChainingHashTable.hpp"
+#include "storage/ThreadPrivateNumericHashTable.hpp"
 #include "storage/TupleReference.hpp"
 #include "types/TypeFactory.hpp"
 #include "utility/BloomFilter.hpp"
@@ -123,6 +124,8 @@ inline HashTableImplType HashTableImplTypeFromProto(
       return HashTableImplType::kSeparateChaining;
     case serialization::HashTableImplType::SIMPLE_SCALAR_SEPARATE_CHAINING:
       return HashTableImplType::kSimpleScalarSeparateChaining;
+    case serialization::HashTableImplType::THREAD_PRIVATE_NUMERIC:
+      return HashTableImplType::kThreadPrivateNumeric;
     default: {
       LOG(FATAL) << "Unrecognized serialization::HashTableImplType\n";
     }
@@ -355,7 +358,6 @@ class AggregationStateHashTableFactory {
    *        hash table constructor.
    * @return A new aggregation state hash table.
    **/
-
   static AggregationStateHashTableBase* CreateResizable(
       const HashTableImplType hash_table_type,
       const std::vector<const Type*> &key_types,
@@ -363,13 +365,16 @@ class AggregationStateHashTableFactory {
       const std::vector<AggregationHandle *> &handles,
       StorageManager *storage_manager) {
     switch (hash_table_type) {
-      case HashTableImplType::kSeparateChaining:
-        return new PackedPayloadHashTable(
-            key_types, num_entries, handles, storage_manager);
       case HashTableImplType::kCollisionFreeVector:
         DCHECK_EQ(1u, key_types.size());
         return new CollisionFreeVectorTable(
             key_types.front(), num_entries, handles, storage_manager);
+      case HashTableImplType::kSeparateChaining:
+        return new PackedPayloadHashTable(
+            key_types, num_entries, handles, storage_manager);
+      case HashTableImplType::kThreadPrivateNumeric:
+        return new ThreadPrivateNumericHashTable(
+            key_types, num_entries, handles, storage_manager);
       default: {
         LOG(FATAL) << "Unrecognized HashTableImplType in "
                    << "AggregationStateHashTableFactory::createResizable()";

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cc1a86fc/storage/HashTablePool.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTablePool.hpp b/storage/HashTablePool.hpp
index 6dbd7f9..7257906 100644
--- a/storage/HashTablePool.hpp
+++ b/storage/HashTablePool.hpp
@@ -75,6 +75,10 @@ class HashTablePool {
         handles_(handles),
         storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
 
+  HashTableImplType getHashTableImplType() const {
+    return hash_table_impl_type_;
+  }
+
   /**
    * @brief Check out a hash table for insertion.
    *

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cc1a86fc/storage/PackedPayloadHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadHashTable.hpp b/storage/PackedPayloadHashTable.hpp
index 960d5a7..3e89aab 100644
--- a/storage/PackedPayloadHashTable.hpp
+++ b/storage/PackedPayloadHashTable.hpp
@@ -88,6 +88,10 @@ class PackedPayloadHashTable : public AggregationStateHashTableBase {
 
   ~PackedPayloadHashTable() override;
 
+  HashTableImplType getImplType() const override {
+    return HashTableImplType::kSeparateChaining;
+  }
+
   /**
    * @brief Erase all entries in this hash table.
    *

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cc1a86fc/storage/ThreadPrivateNumericHashTable.cpp
----------------------------------------------------------------------
diff --git a/storage/ThreadPrivateNumericHashTable.cpp b/storage/ThreadPrivateNumericHashTable.cpp
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cc1a86fc/storage/ThreadPrivateNumericHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/ThreadPrivateNumericHashTable.hpp b/storage/ThreadPrivateNumericHashTable.hpp
new file mode 100644
index 0000000..2991900
--- /dev/null
+++ b/storage/ThreadPrivateNumericHashTable.hpp
@@ -0,0 +1,483 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_THREAD_PRIVATE_NUMERIC_HASH_TABLE_HPP_
+#define QUICKSTEP_STORAGE_THREAD_PRIVATE_NUMERIC_HASH_TABLE_HPP_
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <unordered_map>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
+#include "storage/HashTableBase.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/Type.hpp"
+#include "types/TypeID.hpp"
+#include "types/containers/ColumnVector.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "utility/Macros.hpp"
+#include "utility/ScopedBuffer.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class ThreadPrivateNumericHashTable : public AggregationStateHashTableBase {
+ public:
+  ThreadPrivateNumericHashTable(
+      const std::vector<const Type *> &key_types,
+      const std::size_t num_entries,
+      const std::vector<AggregationHandle *> &handles,
+      StorageManager *storage_manager)  // Not used by this constructor.
+      : key_types_(key_types),
+        handles_(handles),
+        bucket_size_(0),
+        num_buckets_(num_entries),  // Initial capacity; see the resize TODOs in upsert/merge.
+        buckets_allocated_(0) {
+    for (const Type *key_type : key_types) {  // Record each key's byte width.
+      DCHECK(!key_type->isVariableLength());
+
+      const std::size_t key_size = key_type->maximumByteLength();
+      DCHECK(key_size == 1u || key_size == 2u || key_size == 4u || key_size == 8u);
+
+      key_sizes_.emplace_back(key_size);
+    }
+    // Give each handle a state slot; bucket_size_ accumulates the slot widths.
+    for (const AggregationHandle *handle : handles) {
+      state_offsets_.emplace_back(bucket_size_);
+
+      const std::vector<const Type*> arg_types = handle->getArgumentTypes();
+      DCHECK_LE(arg_types.size(), 1u);
+      // COUNT and SUM(INT/LONG) use int64 states; SUM(FLOAT/DOUBLE) uses double.
+      std::size_t state_size = 0;
+      switch (handle->getAggregationID()) {
+        case AggregationID::kCount: {
+          state_size = sizeof(std::int64_t);
+          break;
+        }
+        case AggregationID::kSum: {
+          DCHECK_EQ(1u, arg_types.size());
+          switch (arg_types.front()->getTypeID()) {
+            case TypeID::kInt:  // Fall through
+            case TypeID::kLong:
+              state_size = sizeof(std::int64_t);
+              break;
+            case TypeID::kFloat:  // Fall through
+            case TypeID::kDouble:
+              state_size = sizeof(double);
+              break;
+            default:
+              LOG(FATAL) << "Not implemented";
+          }
+          break;
+        }
+        default:
+          LOG(FATAL) << "Not implemented";
+      }
+      bucket_size_ += state_size;
+    }
+    // Per-entry storage: one 64-bit key code and bucket_size_ bytes of states.
+    keys_.reset(sizeof(std::uint64_t) * num_buckets_);
+    buckets_.reset(bucket_size_ * num_buckets_);
+  }
+
+  ~ThreadPrivateNumericHashTable() override {}
+
+  HashTableImplType getImplType() const override {
+    return HashTableImplType::kThreadPrivateNumeric;
+  }
+
+  void destroyPayload() override {}  // No-op for this table.
+  // Bytes held by the key-code and state buffers; index_ is not counted.
+  std::size_t getMemoryConsumptionBytes() const override {
+    return num_buckets_ * (bucket_size_ + sizeof(std::uint64_t));
+  }
+  // Number of distinct key codes inserted so far.
+  inline std::size_t numEntries() const {
+    return buckets_allocated_;
+  }
+
+  bool upsertValueAccessorCompositeKey(
+      const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
+      const std::vector<MultiSourceAttributeId> &key_attr_ids,
+      const ValueAccessorMultiplexer &accessor_mux) override {
+    ValueAccessor *base_accessor = accessor_mux.getBaseAccessor();
+    ValueAccessor *derived_accessor = accessor_mux.getDerivedAccessor();
+
+    DCHECK(base_accessor != nullptr);
+    const std::size_t num_tuples = base_accessor->getNumTuplesVirtual();
+    // Pack each tuple's group-by keys into one 64-bit code (unused bytes must be zero).
+    ScopedBuffer buffer(sizeof(std::uint64_t) * num_tuples);
+    std::uint64_t *key_codes = static_cast<std::uint64_t*>(buffer.get());
+    std::size_t key_code_offset = 0;
+    for (std::size_t i = 0; i < key_attr_ids.size(); ++i) {
+      const auto &key_attr_id = key_attr_ids[i];
+      ValueAccessor *accessor =
+          key_attr_id.source == ValueAccessorSource::kBase
+              ? base_accessor
+              : derived_accessor;
+      DCHECK(accessor != nullptr);
+
+      const std::size_t key_size = key_sizes_[i];
+      switch (key_size) {
+        case 1u:
+          ConstructKeyCode<std::uint8_t>(
+              key_code_offset, key_attr_id.attr_id, accessor, key_codes);
+          break;
+        case 2u:
+          ConstructKeyCode<std::uint16_t>(
+              key_code_offset, key_attr_id.attr_id, accessor, key_codes);
+          break;
+        case 4u:
+          ConstructKeyCode<std::uint32_t>(
+              key_code_offset, key_attr_id.attr_id, accessor, key_codes);
+          break;
+        case 8u:
+          ConstructKeyCode<std::uint64_t>(
+              key_code_offset, key_attr_id.attr_id, accessor, key_codes);
+          break;
+        default:
+          LOG(FATAL) << "Not implemented";
+      }
+
+      key_code_offset += key_size;
+    }
+    // Map each key code to a bucket, allocating a new bucket for unseen codes.
+    std::vector<BucketIndex> bucket_indices;
+    bucket_indices.reserve(num_tuples);
+    std::uint64_t *keys = static_cast<std::uint64_t*>(keys_.get());
+    for (std::size_t i = 0; i < num_tuples; ++i) {
+      const std::uint64_t code = key_codes[i];
+      const auto index_it = index_.find(code);
+      if (index_it == index_.end()) {
+        CHECK_LT(buckets_allocated_, num_buckets_) << "Hash table overflow; resize not implemented";
+        index_.emplace(code, buckets_allocated_);
+        bucket_indices.emplace_back(buckets_allocated_);
+        keys[buckets_allocated_] = code;
+        ++buckets_allocated_;
+      } else {
+        bucket_indices.emplace_back(index_it->second);
+      }
+    }
+
+    // Dispatch: update each aggregate's state in the assigned buckets.
+    for (std::size_t i = 0; i < handles_.size(); ++i) {
+      const AggregationHandle *handle = handles_[i];
+      switch (handle->getAggregationID()) {
+        case AggregationID::kCount: {
+          upsertValueAccessorCount(bucket_indices, state_offsets_[i]);
+          break;
+        }
+        case AggregationID::kSum: {
+          DCHECK_EQ(1u, argument_ids[i].size());
+          const auto &argument_id = argument_ids[i].front();
+          ValueAccessor *accessor =
+              argument_id.source == ValueAccessorSource::kBase
+                  ? base_accessor
+                  : derived_accessor;
+          DCHECK(accessor != nullptr);
+
+          DCHECK_EQ(1u, handle->getArgumentTypes().size());
+          const Type *argument_type = handle->getArgumentTypes().front();
+          switch (argument_type->getTypeID()) {
+            case kInt: {
+              upsertValueAccessorSum<int, std::int64_t>(
+                  bucket_indices, state_offsets_[i], argument_id.attr_id, accessor);
+              break;
+            }
+            case kLong: {
+              upsertValueAccessorSum<std::int64_t, std::int64_t>(
+                  bucket_indices, state_offsets_[i], argument_id.attr_id, accessor);
+              break;
+            }
+            case kFloat: {
+              upsertValueAccessorSum<float, double>(
+                  bucket_indices, state_offsets_[i], argument_id.attr_id, accessor);
+              break;
+            }
+            case kDouble: {
+              upsertValueAccessorSum<double, double>(
+                  bucket_indices, state_offsets_[i], argument_id.attr_id, accessor);
+              break;
+            }
+            default:
+              LOG(FATAL) << "Not implemented";
+          }
+          break;
+        }
+        default:
+          LOG(FATAL) << "Not implemented";
+      }
+    }
+
+    return true;
+  }
+
+  void merge(const ThreadPrivateNumericHashTable *other) {
+    std::vector<BucketIndex> dst_bucket_indices;
+    std::uint64_t *dst_keys = static_cast<std::uint64_t*>(keys_.get());
+    DCHECK_EQ(bucket_size_, other->bucket_size_);
+    const char *src_buckets_start =
+        static_cast<const char*>(other->buckets_.get());
+    const std::uint64_t *src_keys =
+        static_cast<const std::uint64_t*>(other->keys_.get());
+    // Map each of other's buckets to a bucket here, allocating unseen codes.
+    for (std::size_t i = 0; i < other->buckets_allocated_; ++i) {
+      const std::uint64_t code = src_keys[i];
+      const auto index_it = index_.find(code);
+
+      if (index_it == index_.end()) {
+        CHECK_LT(buckets_allocated_, num_buckets_) << "Hash table overflow; resize not implemented";
+        index_.emplace(code, buckets_allocated_);
+        dst_bucket_indices.emplace_back(buckets_allocated_);
+        dst_keys[buckets_allocated_] = code;
+        ++buckets_allocated_;
+      } else {
+        dst_bucket_indices.emplace_back(index_it->second);
+      }
+    }
+
+    // Dispatch: add other's states into the mapped buckets.
+    for (std::size_t i = 0; i < handles_.size(); ++i) {
+      const AggregationHandle *handle = handles_[i];
+      switch (handle->getAggregationID()) {
+        case AggregationID::kCount: {
+          mergeStateSum<std::int64_t>(
+              dst_bucket_indices, src_buckets_start, state_offsets_[i]);
+          break;
+        }
+        case AggregationID::kSum: {
+          const Type *argument_type = handle->getArgumentTypes().front();
+          switch (argument_type->getTypeID()) {
+            case kInt:  // Fall through
+            case kLong: {
+              mergeStateSum<std::int64_t>(
+                  dst_bucket_indices, src_buckets_start, state_offsets_[i]);
+              break;
+            }
+            case kFloat:  // Fall through
+            case kDouble: {
+              mergeStateSum<double>(
+                  dst_bucket_indices, src_buckets_start, state_offsets_[i]);
+              break;
+            }
+            default:
+              LOG(FATAL) << "Not implemented";
+          }
+          break;
+        }
+        default:
+          LOG(FATAL) << "Not implemented";
+      }
+    }
+  }
+
+  /**
+   * @brief Debugging helper that dumps the table to stdout.
+   *
+   * Prints the number of allocated buckets, then the first 8 bytes of each
+   * bucket interpreted as a double. The per-bucket value is only meaningful
+   * when those bytes hold a double state.
+   **/
+  void print() const {
+    std::cout << "num_entries = " << buckets_allocated_ << "\n";
+    // Buckets are bucket_size_ bytes apart, so step by bucket_size_ rather
+    // than by sizeof(double); the two coincide only when
+    // bucket_size_ == sizeof(double).
+    const char *bucket_ptr = static_cast<const char*>(buckets_.get());
+    for (std::size_t i = 0; i < buckets_allocated_; ++i) {
+      std::cout << *reinterpret_cast<const double*>(bucket_ptr) << "\n";
+      bucket_ptr += bucket_size_;
+    }
+  }
+
+  /**
+   * @brief Materialize the table's contents as output columns.
+   *
+   * Appends to \p output one NativeColumnVector per group-by key (in
+   * key_types_ order), followed by one per aggregation handle (in handles_
+   * order). Every column has buckets_allocated_ rows, and row r describes
+   * bucket r.
+   *
+   * @param output Receives (and takes ownership of) the new columns.
+   **/
+  void finalize(ColumnVectorsValueAccessor *output) const {
+    // Group-by key columns. Key k occupies key_sizes_[k] bytes starting at
+    // byte `key_byte_pos` of each bucket's packed key code.
+    std::size_t key_byte_pos = 0;
+    for (std::size_t k = 0; k < key_types_.size(); ++k) {
+      const std::size_t key_width = key_sizes_[k];
+      std::unique_ptr<NativeColumnVector> key_column(
+          new NativeColumnVector(*key_types_[k], buckets_allocated_));
+      NativeColumnVector *key_cv = key_column.get();
+
+      if (key_width == 1u) {
+        finalizeKey<std::uint8_t>(key_byte_pos, key_cv);
+      } else if (key_width == 2u) {
+        finalizeKey<std::uint16_t>(key_byte_pos, key_cv);
+      } else if (key_width == 4u) {
+        finalizeKey<std::uint32_t>(key_byte_pos, key_cv);
+      } else if (key_width == 8u) {
+        finalizeKey<std::uint64_t>(key_byte_pos, key_cv);
+      } else {
+        LOG(FATAL) << "Not implemented";
+      }
+
+      output->addColumn(key_column.release());
+      key_byte_pos += key_width;
+    }
+
+    // Aggregate columns: one per handle, decoded from that handle's state slot.
+    for (std::size_t h = 0; h < handles_.size(); ++h) {
+      const AggregationHandle &agg = *handles_[h];
+      const std::size_t state_offset = state_offsets_[h];
+      std::unique_ptr<NativeColumnVector> agg_column(
+          new NativeColumnVector(*agg.getResultType(), buckets_allocated_));
+      NativeColumnVector *agg_cv = agg_column.get();
+
+      switch (agg.getAggregationID()) {
+        case AggregationID::kCount:
+          finalizeStateSum<std::int64_t, std::int64_t>(state_offset, agg_cv);
+          break;
+        case AggregationID::kSum:
+          switch (agg.getArgumentTypes().front()->getTypeID()) {
+            case kInt:
+            case kLong:
+              finalizeStateSum<std::int64_t, std::int64_t>(state_offset, agg_cv);
+              break;
+            case kFloat:
+            case kDouble:
+              finalizeStateSum<double, double>(state_offset, agg_cv);
+              break;
+            default:
+              LOG(FATAL) << "Not implemented";
+          }
+          break;
+        default:
+          LOG(FATAL) << "Not implemented";
+      }
+
+      output->addColumn(agg_column.release());
+    }
+  }
+
+ private:
+  // Index of a bucket: bucket i's key code is the i-th std::uint64_t of keys_,
+  // and its states start at byte i * bucket_size_ of buckets_.
+  using BucketIndex = std::uint32_t;
+
+  /**
+   * @brief Copy one group-by key of every tuple in \p accessor into the
+   *        packed key-code buffer.
+   *
+   * The t-th tuple visited owns the t-th std::uint64_t slot starting at
+   * \p key_code_start. This key's value is written, as a KeyT, at byte
+   * \p offset inside that slot.
+   *
+   * NOTE(review): values are read with getUntypedValue<false>(), presumably
+   * without a null check, so the key attribute is assumed non-nullable --
+   * confirm.
+   *
+   * @param offset Byte offset of this key inside each 8-byte key code.
+   * @param attr_id Attribute holding the key values.
+   * @param accessor Tuples to read; iteration restarts from the beginning.
+   * @param key_code_start Start of the key-code buffer (one slot per tuple).
+   **/
+  template <typename KeyT>
+  inline static void ConstructKeyCode(const std::size_t offset,
+                                      const attribute_id attr_id,
+                                      ValueAccessor *accessor,
+                                      void *key_code_start) {
+    InvokeOnAnyValueAccessor(
+        accessor,
+        [&](auto *typed_accessor) -> void {  // NOLINT(build/c++11)
+      char *slot = static_cast<char*>(key_code_start) + offset;
+      for (typed_accessor->beginIteration();
+           typed_accessor->next();
+           slot += sizeof(std::uint64_t)) {
+        const void *raw_value =
+            typed_accessor->template getUntypedValue<false>(attr_id);
+        *reinterpret_cast<KeyT*>(slot) = *static_cast<const KeyT*>(raw_value);
+      }
+    });
+  }
+
+  /**
+   * @brief Increment a std::int64_t count state once for every entry of
+   *        \p bucket_indices.
+   *
+   * @param bucket_indices Bucket hit by each tuple (repeats allowed).
+   * @param state_offset Byte offset of the count state inside a bucket.
+   **/
+  inline void upsertValueAccessorCount(const std::vector<BucketIndex> &bucket_indices,
+                                       const std::size_t state_offset) {
+    char *const base = static_cast<char*>(buckets_.get()) + state_offset;
+    for (std::size_t pos = 0; pos < bucket_indices.size(); ++pos) {
+      std::int64_t *counter =
+          reinterpret_cast<std::int64_t*>(base + bucket_size_ * bucket_indices[pos]);
+      ++(*counter);
+    }
+  }
+
+  /**
+   * @brief Add each tuple's argument value into the SUM state of its bucket.
+   *
+   * The t-th tuple visited in \p accessor is added (ArgumentT promoted to
+   * StateT) into bucket bucket_indices[t].
+   *
+   * @param bucket_indices Bucket of each tuple, in iteration order; assumed
+   *        to have at least as many entries as \p accessor has tuples.
+   * @param state_offset Byte offset of the SUM state inside a bucket.
+   * @param attr_id Attribute holding the values to sum.
+   * @param accessor Tuples to read; iteration restarts from the beginning.
+   **/
+  template <typename ArgumentT, typename StateT>
+  inline void upsertValueAccessorSum(const std::vector<BucketIndex> &bucket_indices,
+                                     const std::size_t state_offset,
+                                     const attribute_id attr_id,
+                                     ValueAccessor *accessor) {
+    InvokeOnAnyValueAccessor(
+        accessor,
+        [&](auto *typed_accessor) -> void {  // NOLINT(build/c++11)
+      char *const base = static_cast<char*>(buckets_.get()) + state_offset;
+      std::size_t row = 0;
+      typed_accessor->beginIteration();
+      while (typed_accessor->next()) {
+        const ArgumentT value = *static_cast<const ArgumentT*>(
+            typed_accessor->template getUntypedValue<false>(attr_id));
+        StateT *sum =
+            reinterpret_cast<StateT*>(base + bucket_size_ * bucket_indices[row]);
+        *sum += value;
+        ++row;
+      }
+    });
+  }
+
+  /**
+   * @brief Add one SUM-style state from a source bucket array into this
+   *        table's buckets.
+   *
+   * The i-th source bucket (located using this table's bucket_size_) is
+   * added into destination bucket dst_bucket_indices[i].
+   *
+   * @param dst_bucket_indices Destination bucket for each source bucket.
+   * @param src_buckets_start Start of the source bucket array.
+   * @param state_offset Byte offset of the state inside a bucket, used for
+   *        both source and destination.
+   **/
+  template <typename StateT>
+  inline void mergeStateSum(const std::vector<BucketIndex> &dst_bucket_indices,
+                            const void *src_buckets_start,
+                            const std::size_t state_offset) {
+    char *const dst_base = static_cast<char*>(buckets_.get()) + state_offset;
+    const char *src_state =
+        static_cast<const char*>(src_buckets_start) + state_offset;
+    for (const BucketIndex dst_idx : dst_bucket_indices) {
+      StateT *dst_state =
+          reinterpret_cast<StateT*>(dst_base + bucket_size_ * dst_idx);
+      *dst_state += *reinterpret_cast<const StateT*>(src_state);
+      src_state += bucket_size_;
+    }
+  }
+
+  /**
+   * @brief Extract one group-by key from every allocated bucket into
+   *        \p output_cv.
+   *
+   * @param offset Byte offset of the key inside each 8-byte key code.
+   * @param output_cv Column receiving buckets_allocated_ KeyT values, one
+   *        per bucket in bucket order.
+   **/
+  template <typename KeyT>
+  inline void finalizeKey(const std::size_t offset,
+                          NativeColumnVector *output_cv) const {
+    const char *const key_codes = static_cast<const char*>(keys_.get());
+    for (std::size_t bucket = 0; bucket < buckets_allocated_; ++bucket) {
+      const char *key_bytes =
+          key_codes + bucket * sizeof(std::uint64_t) + offset;
+      void *slot = output_cv->getPtrForDirectWrite();
+      *static_cast<KeyT*>(slot) = *reinterpret_cast<const KeyT*>(key_bytes);
+    }
+  }
+
+  /**
+   * @brief Copy one aggregation state out of every allocated bucket into
+   *        \p output_cv, converting StateT to ResultT.
+   *
+   * @param state_offset Byte offset of the state inside each bucket.
+   * @param output_cv Column receiving buckets_allocated_ values, one per
+   *        bucket in bucket order.
+   **/
+  template <typename StateT, typename ResultT>
+  inline void finalizeStateSum(const std::size_t state_offset,
+                               NativeColumnVector *output_cv) const {
+    // Read-only pointer: this is a const member and never writes the states
+    // (same as finalizeKey()).
+    const char *state_ptr =
+        static_cast<const char*>(buckets_.get()) + state_offset;
+    for (std::size_t i = 0; i < buckets_allocated_; ++i) {
+      *static_cast<ResultT*>(output_cv->getPtrForDirectWrite()) =
+          *reinterpret_cast<const StateT*>(state_ptr);
+      state_ptr += bucket_size_;
+    }
+  }
+
+  // Types of the group-by keys, in the order finalize() emits their columns.
+  const std::vector<const Type*> key_types_;
+  // Aggregation handles, in the order finalize() emits their columns.
+  const std::vector<AggregationHandle *> handles_;
+
+  // Byte width of each group-by key inside a bucket's 8-byte key code.
+  std::vector<std::size_t> key_sizes_;
+  // Byte offset of each handle's state inside a bucket.
+  std::vector<std::size_t> state_offsets_;
+  // Distance in bytes between consecutive buckets in buckets_.
+  std::size_t bucket_size_;
+
+  // Maps a packed key code to the index of the bucket holding it.
+  std::unordered_map<std::uint64_t, BucketIndex> index_;
+
+  // NOTE(review): presumably the allocated capacity (in buckets) of
+  // keys_/buckets_; it is not read in this part of the file -- confirm.
+  std::size_t num_buckets_;
+  // Number of buckets in use; buckets [0, buckets_allocated_) are valid.
+  std::size_t buckets_allocated_;
+
+  // One std::uint64_t key code per bucket.
+  ScopedBuffer keys_;
+  // Aggregation states, bucket_size_ bytes per bucket.
+  ScopedBuffer buckets_;
+
+  DISALLOW_COPY_AND_ASSIGN(ThreadPrivateNumericHashTable);
+};
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_STORAGE_THREAD_PRIVATE_NUMERIC_HASH_TABLE_HPP_