You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/09/20 20:30:35 UTC

[7/8] incubator-quickstep git commit: Initial commit for QUICKSTEP-28 and QUICKSTEP-29.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/AggregationHandleMax.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.hpp b/expressions/aggregation/AggregationHandleMax.hpp
index 7e38473..5fb9f44 100644
--- a/expressions/aggregation/AggregationHandleMax.hpp
+++ b/expressions/aggregation/AggregationHandleMax.hpp
@@ -28,6 +28,7 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/aggregation/AggregationConcreteHandle.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
+#include "storage/FastHashTable.hpp"
 #include "storage/HashTableBase.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
@@ -55,25 +56,24 @@ class AggregationStateMax : public AggregationState {
   /**
    * @brief Copy constructor (ignores mutex).
    */
-  AggregationStateMax(const AggregationStateMax &orig)
-      : max_(orig.max_) {
-  }
+  AggregationStateMax(const AggregationStateMax &orig) : max_(orig.max_) {}
 
   /**
    * @brief Destructor.
    */
-  ~AggregationStateMax() override {};
+  ~AggregationStateMax() override{};
+
+  const std::uint8_t* getPayloadAddress() const {
+    return reinterpret_cast<const uint8_t *>(&max_);
+  }
 
  private:
   friend class AggregationHandleMax;
 
   explicit AggregationStateMax(const Type &type)
-      : max_(type.getNullableVersion().makeNullValue()) {
-  }
+      : max_(type.getNullableVersion().makeNullValue()) {}
 
-  explicit AggregationStateMax(TypedValue &&value)
-      : max_(std::move(value)) {
-  }
+  explicit AggregationStateMax(TypedValue &&value) : max_(std::move(value)) {}
 
   TypedValue max_;
   SpinMutex mutex_;
@@ -84,8 +84,7 @@ class AggregationStateMax : public AggregationState {
  **/
 class AggregationHandleMax : public AggregationConcreteHandle {
  public:
-  ~AggregationHandleMax() override {
-  }
+  ~AggregationHandleMax() override {}
 
   AggregationState* createInitialState() const override {
     return new AggregationStateMax(type_);
@@ -93,20 +92,46 @@ class AggregationHandleMax : public AggregationConcreteHandle {
 
   AggregationStateHashTableBase* createGroupByHashTable(
       const HashTableImplType hash_table_impl,
-      const std::vector<const Type*> &group_by_types,
+      const std::vector<const Type *> &group_by_types,
       const std::size_t estimated_num_groups,
       StorageManager *storage_manager) const override;
 
   /**
    * @brief Iterate with max aggregation state.
    */
-  inline void iterateUnaryInl(AggregationStateMax *state, const TypedValue &value) const {
+  inline void iterateUnaryInl(AggregationStateMax *state,
+                              const TypedValue &value) const {
     DCHECK(value.isPlausibleInstanceOf(type_.getSignature()));
-    compareAndUpdate(static_cast<AggregationStateMax*>(state), value);
+    compareAndUpdate(static_cast<AggregationStateMax *>(state), value);
+  }
+
+  inline void iterateUnaryInlFast(const TypedValue &value,
+                                  std::uint8_t *byte_ptr) const {
+    DCHECK(value.isPlausibleInstanceOf(type_.getSignature()));
+    TypedValue *max_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
+    compareAndUpdateFast(max_ptr, value);
+  }
+
+  inline void updateStateUnary(const TypedValue &argument,
+                               std::uint8_t *byte_ptr) const override {
+    if (!block_update_) {
+      iterateUnaryInlFast(argument, byte_ptr);
+    }
+  }
+
+  void blockUpdate() override { block_update_ = true; }
+
+  void allowUpdate() override { block_update_ = false; }
+
+  void initPayload(std::uint8_t *byte_ptr) const override {
+    TypedValue *max_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
+    TypedValue t1 = (type_.getNullableVersion().makeNullValue());
+    *max_ptr = t1;
   }
 
   AggregationState* accumulateColumnVectors(
-      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const override;
+      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
+      const override;
 
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
   AggregationState* accumulateValueAccessor(
@@ -123,37 +148,49 @@ class AggregationHandleMax : public AggregationConcreteHandle {
   void mergeStates(const AggregationState &source,
                    AggregationState *destination) const override;
 
+  void mergeStatesFast(const std::uint8_t *source,
+                       std::uint8_t *destination) const override;
+
   TypedValue finalize(const AggregationState &state) const override {
-    return TypedValue(static_cast<const AggregationStateMax&>(state).max_);
+    return TypedValue(static_cast<const AggregationStateMax &>(state).max_);
+  }
+
+  inline TypedValue finalizeHashTableEntry(
+      const AggregationState &state) const {
+    return TypedValue(static_cast<const AggregationStateMax &>(state).max_);
   }
 
-  inline TypedValue finalizeHashTableEntry(const AggregationState &state) const {
-    return TypedValue(static_cast<const AggregationStateMax&>(state).max_);
+  inline TypedValue finalizeHashTableEntryFast(
+      const std::uint8_t *byte_ptr) const {
+    const TypedValue *max_ptr = reinterpret_cast<const TypedValue *>(byte_ptr);
+    return TypedValue(*max_ptr);
   }
 
   ColumnVector* finalizeHashTable(
       const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys) const override;
+      std::vector<std::vector<TypedValue>> *group_by_keys,
+      int index) const override;
 
   /**
-   * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
+   * @brief Implementation of
+   *        AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
    *        for MAX aggregation.
    */
   AggregationState* aggregateOnDistinctifyHashTableForSingle(
-      const AggregationStateHashTableBase &distinctify_hash_table) const override;
-
+      const AggregationStateHashTableBase &distinctify_hash_table)
+      const override;
 
   /**
-   * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
+   * @brief Implementation of
+   *        AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
    *        for MAX aggregation.
    */
   void aggregateOnDistinctifyHashTableForGroupBy(
       const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table) const override;
+      AggregationStateHashTableBase *aggregation_hash_table,
+      std::size_t index) const override;
 
-  void mergeGroupByHashTables(
-      const AggregationStateHashTableBase &source_hash_table,
-      AggregationStateHashTableBase *destination_hash_table) const override;
+  std::size_t getPayloadSize() const override { return sizeof(TypedValue); }
 
  private:
   friend class AggregateFunctionMax;
@@ -166,24 +203,37 @@ class AggregationHandleMax : public AggregationConcreteHandle {
   explicit AggregationHandleMax(const Type &type);
 
   /**
-   * @brief compare the value with max_ and update it if the value is larger than
-   *        current maximum. NULLs are ignored.
+   * @brief Compare the value with max_ and update it if the value is larger
+   *        than the current maximum. NULLs are ignored.
    *
    * @param value A TypedValue to compare
    **/
-  inline void compareAndUpdate(AggregationStateMax *state, const TypedValue &value) const {
+  inline void compareAndUpdate(AggregationStateMax *state,
+                               const TypedValue &value) const {
     // TODO(chasseur): Avoid null-checks when aggregating a non-nullable Type.
     if (value.isNull()) return;
 
     SpinMutexLock lock(state->mutex_);
-    if (state->max_.isNull() || fast_comparator_->compareTypedValues(value, state->max_)) {
+    if (state->max_.isNull() ||
+        fast_comparator_->compareTypedValues(value, state->max_)) {
       state->max_ = value;
     }
   }
 
+  inline void compareAndUpdateFast(TypedValue *max_ptr,
+                                   const TypedValue &value) const {
+    if (value.isNull()) return;
+    if (max_ptr->isNull() ||
+        fast_comparator_->compareTypedValues(value, *max_ptr)) {
+      *max_ptr = value;
+    }
+  }
+
   const Type &type_;
   std::unique_ptr<UncheckedComparator> fast_comparator_;
 
+  bool block_update_;
+
   DISALLOW_COPY_AND_ASSIGN(AggregationHandleMax);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/AggregationHandleMin.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.cpp b/expressions/aggregation/AggregationHandleMin.cpp
index e860d8d..a07f299 100644
--- a/expressions/aggregation/AggregationHandleMin.cpp
+++ b/expressions/aggregation/AggregationHandleMin.cpp
@@ -39,22 +39,19 @@ namespace quickstep {
 class StorageManager;
 
 AggregationHandleMin::AggregationHandleMin(const Type &type)
-    : type_(type) {
-  fast_comparator_.reset(ComparisonFactory::GetComparison(ComparisonID::kLess)
-                         .makeUncheckedComparatorForTypes(type,
-                                                          type.getNonNullableVersion()));
+    : type_(type), block_update_(false) {
+  fast_comparator_.reset(
+      ComparisonFactory::GetComparison(ComparisonID::kLess)
+          .makeUncheckedComparatorForTypes(type, type.getNonNullableVersion()));
 }
 
 AggregationStateHashTableBase* AggregationHandleMin::createGroupByHashTable(
     const HashTableImplType hash_table_impl,
-    const std::vector<const Type*> &group_by_types,
+    const std::vector<const Type *> &group_by_types,
     const std::size_t estimated_num_groups,
     StorageManager *storage_manager) const {
   return AggregationStateHashTableFactory<AggregationStateMin>::CreateResizable(
-      hash_table_impl,
-      group_by_types,
-      estimated_num_groups,
-      storage_manager);
+      hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
 }
 
 AggregationState* AggregationHandleMin::accumulateColumnVectors(
@@ -62,9 +59,8 @@ AggregationState* AggregationHandleMin::accumulateColumnVectors(
   DCHECK_EQ(1u, column_vectors.size())
       << "Got wrong number of ColumnVectors for MIN: " << column_vectors.size();
 
-  return new AggregationStateMin(
-      fast_comparator_->accumulateColumnVector(type_.getNullableVersion().makeNullValue(),
-                                               *column_vectors.front()));
+  return new AggregationStateMin(fast_comparator_->accumulateColumnVector(
+      type_.getNullableVersion().makeNullValue(), *column_vectors.front()));
 }
 
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
@@ -74,10 +70,10 @@ AggregationState* AggregationHandleMin::accumulateValueAccessor(
   DCHECK_EQ(1u, accessor_ids.size())
       << "Got wrong number of attributes for MIN: " << accessor_ids.size();
 
-  return new AggregationStateMin(
-      fast_comparator_->accumulateValueAccessor(type_.getNullableVersion().makeNullValue(),
-                                                accessor,
-                                                accessor_ids.front()));
+  return new AggregationStateMin(fast_comparator_->accumulateValueAccessor(
+      type_.getNullableVersion().makeNullValue(),
+      accessor,
+      accessor_ids.front()));
 }
 #endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 
@@ -88,66 +84,55 @@ void AggregationHandleMin::aggregateValueAccessorIntoHashTable(
     AggregationStateHashTableBase *hash_table) const {
   DCHECK_EQ(1u, argument_ids.size())
       << "Got wrong number of arguments for MIN: " << argument_ids.size();
-
-  aggregateValueAccessorIntoHashTableUnaryHelper<
-      AggregationHandleMin,
-      AggregationStateMin,
-      AggregationStateHashTable<AggregationStateMin>>(
-          accessor,
-          argument_ids.front(),
-          group_by_key_ids,
-          AggregationStateMin(type_),
-          hash_table);
 }
 
-void AggregationHandleMin::mergeStates(
-    const AggregationState &source,
-    AggregationState *destination) const {
-  const AggregationStateMin &min_source = static_cast<const AggregationStateMin&>(source);
-  AggregationStateMin *min_destination = static_cast<AggregationStateMin*>(destination);
+void AggregationHandleMin::mergeStates(const AggregationState &source,
+                                       AggregationState *destination) const {
+  const AggregationStateMin &min_source =
+      static_cast<const AggregationStateMin &>(source);
+  AggregationStateMin *min_destination =
+      static_cast<AggregationStateMin *>(destination);
 
   if (!min_source.min_.isNull()) {
     compareAndUpdate(min_destination, min_source.min_);
   }
 }
 
+void AggregationHandleMin::mergeStatesFast(const std::uint8_t *source,
+                                           std::uint8_t *destination) const {
+  const TypedValue *src_min_ptr = reinterpret_cast<const TypedValue *>(source);
+  TypedValue *dst_min_ptr = reinterpret_cast<TypedValue *>(destination);
+
+  if (!(src_min_ptr->isNull())) {
+    compareAndUpdateFast(dst_min_ptr, *src_min_ptr);
+  }
+}
+
 ColumnVector* AggregationHandleMin::finalizeHashTable(
     const AggregationStateHashTableBase &hash_table,
-    std::vector<std::vector<TypedValue>> *group_by_keys) const {
-  return finalizeHashTableHelper<AggregationHandleMin,
-                                 AggregationStateHashTable<AggregationStateMin>>(
-      type_.getNonNullableVersion(),
-      hash_table,
-      group_by_keys);
+    std::vector<std::vector<TypedValue>> *group_by_keys,
+    int index) const {
+  return finalizeHashTableHelperFast<AggregationHandleMin,
+                                     AggregationStateFastHashTable>(
+      type_.getNonNullableVersion(), hash_table, group_by_keys, index);
 }
 
-AggregationState* AggregationHandleMin::aggregateOnDistinctifyHashTableForSingle(
+AggregationState*
+AggregationHandleMin::aggregateOnDistinctifyHashTableForSingle(
     const AggregationStateHashTableBase &distinctify_hash_table) const {
-  return aggregateOnDistinctifyHashTableForSingleUnaryHelper<
+  return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
       AggregationHandleMin,
-      AggregationStateMin>(
-          distinctify_hash_table);
+      AggregationStateMin>(distinctify_hash_table);
 }
 
 void AggregationHandleMin::aggregateOnDistinctifyHashTableForGroupBy(
     const AggregationStateHashTableBase &distinctify_hash_table,
-    AggregationStateHashTableBase *aggregation_hash_table) const {
-  aggregateOnDistinctifyHashTableForGroupByUnaryHelper<
+    AggregationStateHashTableBase *aggregation_hash_table,
+    std::size_t index) const {
+  aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
       AggregationHandleMin,
-      AggregationStateMin,
-      AggregationStateHashTable<AggregationStateMin>>(
-          distinctify_hash_table,
-          AggregationStateMin(type_),
-          aggregation_hash_table);
-}
-
-void AggregationHandleMin::mergeGroupByHashTables(
-    const AggregationStateHashTableBase &source_hash_table,
-    AggregationStateHashTableBase *destination_hash_table) const {
-  mergeGroupByHashTablesHelper<AggregationHandleMin,
-                               AggregationStateMin,
-                               AggregationStateHashTable<AggregationStateMin>>(
-      source_hash_table, destination_hash_table);
+      AggregationStateFastHashTable>(
+      distinctify_hash_table, aggregation_hash_table, index);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/AggregationHandleMin.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.hpp b/expressions/aggregation/AggregationHandleMin.hpp
index 924698c..173911d 100644
--- a/expressions/aggregation/AggregationHandleMin.hpp
+++ b/expressions/aggregation/AggregationHandleMin.hpp
@@ -28,6 +28,7 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/aggregation/AggregationConcreteHandle.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
+#include "storage/FastHashTable.hpp"
 #include "storage/HashTableBase.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
@@ -55,24 +56,26 @@ class AggregationStateMin : public AggregationState {
   /**
    * @brief Copy constructor (ignores mutex).
    */
-  AggregationStateMin(const AggregationStateMin &orig)
-      : min_(orig.min_) {
-  }
+  AggregationStateMin(const AggregationStateMin &orig) : min_(orig.min_) {}
 
   /**
    * @brief Destructor.
    */
   ~AggregationStateMin() override {}
 
+  std::size_t getPayloadSize() const { return sizeof(TypedValue); }
+
+  const std::uint8_t *getPayloadAddress() const {
+    return reinterpret_cast<const uint8_t *>(&min_);
+  }
+
  private:
   friend class AggregationHandleMin;
 
   explicit AggregationStateMin(const Type &type)
       : min_(type.getNullableVersion().makeNullValue()) {}
 
-  explicit AggregationStateMin(TypedValue &&value)
-      : min_(std::move(value)) {
-  }
+  explicit AggregationStateMin(TypedValue &&value) : min_(std::move(value)) {}
 
   TypedValue min_;
   SpinMutex mutex_;
@@ -83,8 +86,7 @@ class AggregationStateMin : public AggregationState {
  **/
 class AggregationHandleMin : public AggregationConcreteHandle {
  public:
-  ~AggregationHandleMin() override {
-  }
+  ~AggregationHandleMin() override {}
 
   AggregationState* createInitialState() const override {
     return new AggregationStateMin(type_);
@@ -92,20 +94,46 @@ class AggregationHandleMin : public AggregationConcreteHandle {
 
   AggregationStateHashTableBase* createGroupByHashTable(
       const HashTableImplType hash_table_impl,
-      const std::vector<const Type*> &group_by_types,
+      const std::vector<const Type *> &group_by_types,
       const std::size_t estimated_num_groups,
       StorageManager *storage_manager) const override;
 
   /**
    * @brief Iterate with min aggregation state.
    */
-  inline void iterateUnaryInl(AggregationStateMin *state, const TypedValue &value) const {
+  inline void iterateUnaryInl(AggregationStateMin *state,
+                              const TypedValue &value) const {
     DCHECK(value.isPlausibleInstanceOf(type_.getSignature()));
     compareAndUpdate(state, value);
   }
 
+  inline void iterateUnaryInlFast(const TypedValue &value,
+                                  std::uint8_t *byte_ptr) const {
+    DCHECK(value.isPlausibleInstanceOf(type_.getSignature()));
+    TypedValue *min_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
+    compareAndUpdateFast(min_ptr, value);
+  }
+
+  inline void updateStateUnary(const TypedValue &argument,
+                               std::uint8_t *byte_ptr) const override {
+    if (!block_update_) {
+      iterateUnaryInlFast(argument, byte_ptr);
+    }
+  }
+
+  void blockUpdate() override { block_update_ = true; }
+
+  void allowUpdate() override { block_update_ = false; }
+
+  void initPayload(std::uint8_t *byte_ptr) const override {
+    TypedValue *min_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
+    TypedValue t1 = (type_.getNullableVersion().makeNullValue());
+    *min_ptr = t1;
+  }
+
   AggregationState* accumulateColumnVectors(
-      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const override;
+      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
+      const override;
 
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
   AggregationState* accumulateValueAccessor(
@@ -122,36 +150,49 @@ class AggregationHandleMin : public AggregationConcreteHandle {
   void mergeStates(const AggregationState &source,
                    AggregationState *destination) const override;
 
+  void mergeStatesFast(const std::uint8_t *source,
+                       std::uint8_t *destination) const override;
+
   TypedValue finalize(const AggregationState &state) const override {
-    return static_cast<const AggregationStateMin&>(state).min_;
+    return static_cast<const AggregationStateMin &>(state).min_;
+  }
+
+  inline TypedValue finalizeHashTableEntry(
+      const AggregationState &state) const {
+    return static_cast<const AggregationStateMin &>(state).min_;
   }
 
-  inline TypedValue finalizeHashTableEntry(const AggregationState &state) const {
-    return static_cast<const AggregationStateMin&>(state).min_;
+  inline TypedValue finalizeHashTableEntryFast(
+      const std::uint8_t *byte_ptr) const {
+    const TypedValue *min_ptr = reinterpret_cast<const TypedValue *>(byte_ptr);
+    return TypedValue(*min_ptr);
   }
 
   ColumnVector* finalizeHashTable(
       const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys) const override;
+      std::vector<std::vector<TypedValue>> *group_by_keys,
+      int index) const override;
 
   /**
-   * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
+   * @brief Implementation of
+   *        AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
    *        for MIN aggregation.
    */
   AggregationState* aggregateOnDistinctifyHashTableForSingle(
-      const AggregationStateHashTableBase &distinctify_hash_table) const override;
+      const AggregationStateHashTableBase &distinctify_hash_table)
+      const override;
 
   /**
-   * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
+   * @brief Implementation of
+   *        AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
    *        for MIN aggregation.
    */
   void aggregateOnDistinctifyHashTableForGroupBy(
       const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table) const override;
+      AggregationStateHashTableBase *aggregation_hash_table,
+      std::size_t index) const override;
 
-  void mergeGroupByHashTables(
-      const AggregationStateHashTableBase &source_hash_table,
-      AggregationStateHashTableBase *destination_hash_table) const override;
+  std::size_t getPayloadSize() const override { return sizeof(TypedValue); }
 
  private:
   friend class AggregateFunctionMin;
@@ -164,23 +205,36 @@ class AggregationHandleMin : public AggregationConcreteHandle {
   explicit AggregationHandleMin(const Type &type);
 
   /**
-   * @brief compare the value with min_ and update it if the value is smaller than
-   *        current minimum. NULLs are ignored.
+   * @brief Compare the value with min_ and update it if the value is smaller
+   *        than the current minimum. NULLs are ignored.
    *
    * @param value A TypedValue to compare.
    **/
-  inline void compareAndUpdate(AggregationStateMin *state, const TypedValue &value) const {
+  inline void compareAndUpdate(AggregationStateMin *state,
+                               const TypedValue &value) const {
     if (value.isNull()) return;
 
     SpinMutexLock lock(state->mutex_);
-    if (state->min_.isNull() || fast_comparator_->compareTypedValues(value, state->min_)) {
+    if (state->min_.isNull() ||
+        fast_comparator_->compareTypedValues(value, state->min_)) {
       state->min_ = value;
     }
   }
 
+  inline void compareAndUpdateFast(TypedValue *min_ptr,
+                                   const TypedValue &value) const {
+    if (value.isNull()) return;
+    if (min_ptr->isNull() ||
+        fast_comparator_->compareTypedValues(value, *min_ptr)) {
+      *min_ptr = value;
+    }
+  }
+
   const Type &type_;
   std::unique_ptr<UncheckedComparator> fast_comparator_;
 
+  bool block_update_;
+
   DISALLOW_COPY_AND_ASSIGN(AggregationHandleMin);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/AggregationHandleSum.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.cpp b/expressions/aggregation/AggregationHandleSum.cpp
index b5036a8..642d88d 100644
--- a/expressions/aggregation/AggregationHandleSum.cpp
+++ b/expressions/aggregation/AggregationHandleSum.cpp
@@ -43,7 +43,7 @@ namespace quickstep {
 class StorageManager;
 
 AggregationHandleSum::AggregationHandleSum(const Type &type)
-    : argument_type_(type) {
+    : argument_type_(type), block_update_(false) {
   // We sum Int as Long and Float as Double so that we have more headroom when
   // adding many values.
   TypeID type_precision_id;
@@ -66,11 +66,13 @@ AggregationHandleSum::AggregationHandleSum(const Type &type)
 
   // Make operators to do arithmetic:
   // Add operator for summing argument values.
-  fast_operator_.reset(BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
-                       .makeUncheckedBinaryOperatorForTypes(sum_type, argument_type_));
+  fast_operator_.reset(
+      BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
+          .makeUncheckedBinaryOperatorForTypes(sum_type, argument_type_));
   // Add operator for merging states.
-  merge_operator_.reset(BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
-                        .makeUncheckedBinaryOperatorForTypes(sum_type, sum_type));
+  merge_operator_.reset(
+      BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
+          .makeUncheckedBinaryOperatorForTypes(sum_type, sum_type));
 
   // Result is nullable, because SUM() over 0 values (or all NULL values) is
   // NULL.
@@ -79,26 +81,20 @@ AggregationHandleSum::AggregationHandleSum(const Type &type)
 
 AggregationStateHashTableBase* AggregationHandleSum::createGroupByHashTable(
     const HashTableImplType hash_table_impl,
-    const std::vector<const Type*> &group_by_types,
+    const std::vector<const Type *> &group_by_types,
     const std::size_t estimated_num_groups,
     StorageManager *storage_manager) const {
   return AggregationStateHashTableFactory<AggregationStateSum>::CreateResizable(
-      hash_table_impl,
-      group_by_types,
-      estimated_num_groups,
-      storage_manager);
+      hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
 }
 
 AggregationState* AggregationHandleSum::accumulateColumnVectors(
     const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
   DCHECK_EQ(1u, column_vectors.size())
       << "Got wrong number of ColumnVectors for SUM: " << column_vectors.size();
-
   std::size_t num_tuples = 0;
   TypedValue cv_sum = fast_operator_->accumulateColumnVector(
-      blank_state_.sum_,
-      *column_vectors.front(),
-      &num_tuples);
+      blank_state_.sum_, *column_vectors.front(), &num_tuples);
   return new AggregationStateSum(std::move(cv_sum), num_tuples == 0);
 }
 
@@ -111,10 +107,7 @@ AggregationState* AggregationHandleSum::accumulateValueAccessor(
 
   std::size_t num_tuples = 0;
   TypedValue va_sum = fast_operator_->accumulateValueAccessor(
-      blank_state_.sum_,
-      accessor,
-      accessor_ids.front(),
-      &num_tuples);
+      blank_state_.sum_, accessor, accessor_ids.front(), &num_tuples);
   return new AggregationStateSum(std::move(va_sum), num_tuples == 0);
 }
 #endif
@@ -126,32 +119,39 @@ void AggregationHandleSum::aggregateValueAccessorIntoHashTable(
     AggregationStateHashTableBase *hash_table) const {
   DCHECK_EQ(1u, argument_ids.size())
       << "Got wrong number of arguments for SUM: " << argument_ids.size();
-
-  aggregateValueAccessorIntoHashTableUnaryHelper<
-      AggregationHandleSum,
-      AggregationStateSum,
-      AggregationStateHashTable<AggregationStateSum>>(
-          accessor,
-          argument_ids.front(),
-          group_by_key_ids,
-          blank_state_,
-          hash_table);
 }
 
-void AggregationHandleSum::mergeStates(
-    const AggregationState &source,
-    AggregationState *destination) const {
-  const AggregationStateSum &sum_source = static_cast<const AggregationStateSum&>(source);
-  AggregationStateSum *sum_destination = static_cast<AggregationStateSum*>(destination);
+void AggregationHandleSum::mergeStates(const AggregationState &source,
+                                       AggregationState *destination) const {
+  const AggregationStateSum &sum_source =
+      static_cast<const AggregationStateSum &>(source);
+  AggregationStateSum *sum_destination =
+      static_cast<AggregationStateSum *>(destination);
 
   SpinMutexLock lock(sum_destination->mutex_);
-  sum_destination->sum_ = merge_operator_->applyToTypedValues(sum_destination->sum_,
-                                                              sum_source.sum_);
+  sum_destination->sum_ = merge_operator_->applyToTypedValues(
+      sum_destination->sum_, sum_source.sum_);
   sum_destination->null_ = sum_destination->null_ && sum_source.null_;
 }
 
+void AggregationHandleSum::mergeStatesFast(const std::uint8_t *source,
+                                           std::uint8_t *destination) const {
+  const TypedValue *src_sum_ptr =
+      reinterpret_cast<const TypedValue *>(source + blank_state_.sum_offset_);
+  const bool *src_null_ptr =
+      reinterpret_cast<const bool *>(source + blank_state_.null_offset_);
+  TypedValue *dst_sum_ptr =
+      reinterpret_cast<TypedValue *>(destination + blank_state_.sum_offset_);
+  bool *dst_null_ptr =
+      reinterpret_cast<bool *>(destination + blank_state_.null_offset_);
+  *dst_sum_ptr =
+      merge_operator_->applyToTypedValues(*dst_sum_ptr, *src_sum_ptr);
+  *dst_null_ptr = (*dst_null_ptr) && (*src_null_ptr);
+}
+
 TypedValue AggregationHandleSum::finalize(const AggregationState &state) const {
-  const AggregationStateSum &agg_state = static_cast<const AggregationStateSum&>(state);
+  const AggregationStateSum &agg_state =
+      static_cast<const AggregationStateSum &>(state);
   if (agg_state.null_) {
     // SUM() over no values is NULL.
     return result_type_->makeNullValue();
@@ -162,41 +162,29 @@ TypedValue AggregationHandleSum::finalize(const AggregationState &state) const {
 
 ColumnVector* AggregationHandleSum::finalizeHashTable(
     const AggregationStateHashTableBase &hash_table,
-    std::vector<std::vector<TypedValue>> *group_by_keys) const {
-  return finalizeHashTableHelper<AggregationHandleSum,
-                                 AggregationStateHashTable<AggregationStateSum>>(
-      *result_type_,
-      hash_table,
-      group_by_keys);
+    std::vector<std::vector<TypedValue>> *group_by_keys,
+    int index) const {
+  return finalizeHashTableHelperFast<AggregationHandleSum,
+                                     AggregationStateFastHashTable>(
+      *result_type_, hash_table, group_by_keys, index);
 }
 
-AggregationState* AggregationHandleSum::aggregateOnDistinctifyHashTableForSingle(
+AggregationState*
+AggregationHandleSum::aggregateOnDistinctifyHashTableForSingle(
     const AggregationStateHashTableBase &distinctify_hash_table) const {
-  return aggregateOnDistinctifyHashTableForSingleUnaryHelper<
+  return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
       AggregationHandleSum,
-      AggregationStateSum>(
-          distinctify_hash_table);
+      AggregationStateSum>(distinctify_hash_table);
 }
 
 void AggregationHandleSum::aggregateOnDistinctifyHashTableForGroupBy(
     const AggregationStateHashTableBase &distinctify_hash_table,
-    AggregationStateHashTableBase *aggregation_hash_table) const {
-  aggregateOnDistinctifyHashTableForGroupByUnaryHelper<
+    AggregationStateHashTableBase *aggregation_hash_table,
+    std::size_t index) const {
+  aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
       AggregationHandleSum,
-      AggregationStateSum,
-      AggregationStateHashTable<AggregationStateSum>>(
-          distinctify_hash_table,
-          blank_state_,
-          aggregation_hash_table);
-}
-
-void AggregationHandleSum::mergeGroupByHashTables(
-    const AggregationStateHashTableBase &source_hash_table,
-    AggregationStateHashTableBase *destination_hash_table) const {
-  mergeGroupByHashTablesHelper<AggregationHandleSum,
-                               AggregationStateSum,
-                               AggregationStateHashTable<AggregationStateSum>>(
-      source_hash_table, destination_hash_table);
+      AggregationStateFastHashTable>(
+      distinctify_hash_table, aggregation_hash_table, index);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/AggregationHandleSum.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.hpp b/expressions/aggregation/AggregationHandleSum.hpp
index 3382646..6c334a6 100644
--- a/expressions/aggregation/AggregationHandleSum.hpp
+++ b/expressions/aggregation/AggregationHandleSum.hpp
@@ -28,6 +28,7 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/aggregation/AggregationConcreteHandle.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
+#include "storage/FastHashTable.hpp"
 #include "storage/HashTableBase.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
@@ -57,25 +58,40 @@ class AggregationStateSum : public AggregationState {
    */
   AggregationStateSum(const AggregationStateSum &orig)
       : sum_(orig.sum_),
-        null_(orig.null_) {
+        null_(orig.null_),
+        sum_offset_(orig.sum_offset_),
+        null_offset_(orig.null_offset_) {}
+
+  std::size_t getPayloadSize() const {
+    std::size_t p1 = reinterpret_cast<std::size_t>(&sum_);
+    std::size_t p2 = reinterpret_cast<std::size_t>(&mutex_);
+    return (p2 - p1);
+  }
+
+  const std::uint8_t* getPayloadAddress() const {
+    return reinterpret_cast<const uint8_t *>(&sum_);
   }
 
  private:
   friend class AggregationHandleSum;
 
   AggregationStateSum()
-      : sum_(0), null_(true) {
-  }
+      : sum_(0),
+        null_(true),
+        sum_offset_(0),
+        null_offset_(reinterpret_cast<std::uint8_t *>(&null_) -
+                     reinterpret_cast<std::uint8_t *>(&sum_)) {}
 
   AggregationStateSum(TypedValue &&sum, const bool is_null)
-      : sum_(std::move(sum)), null_(is_null) {
-  }
+      : sum_(std::move(sum)), null_(is_null) {}
 
   // TODO(shoban): We might want to specialize sum_ to use atomics for int types
   // similar to in AggregationStateCount.
   TypedValue sum_;
   bool null_;
   SpinMutex mutex_;
+
+  int sum_offset_, null_offset_;
 };
 
 /**
@@ -83,8 +99,7 @@ class AggregationStateSum : public AggregationState {
  **/
 class AggregationHandleSum : public AggregationConcreteHandle {
  public:
-  ~AggregationHandleSum() override {
-  }
+  ~AggregationHandleSum() override {}
 
   AggregationState* createInitialState() const override {
     return new AggregationStateSum(blank_state_);
@@ -92,11 +107,12 @@ class AggregationHandleSum : public AggregationConcreteHandle {
 
   AggregationStateHashTableBase* createGroupByHashTable(
       const HashTableImplType hash_table_impl,
-      const std::vector<const Type*> &group_by_types,
+      const std::vector<const Type *> &group_by_types,
       const std::size_t estimated_num_groups,
       StorageManager *storage_manager) const override;
 
-  inline void iterateUnaryInl(AggregationStateSum *state, const TypedValue &value) const {
+  inline void iterateUnaryInl(AggregationStateSum *state,
+                              const TypedValue &value) const {
     DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
     if (value.isNull()) return;
 
@@ -105,8 +121,41 @@ class AggregationHandleSum : public AggregationConcreteHandle {
     state->null_ = false;
   }
 
+  inline void iterateUnaryInlFast(const TypedValue &value,
+                                  std::uint8_t *byte_ptr) const {
+    DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
+    if (value.isNull()) return;
+    TypedValue *sum_ptr =
+        reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset_);
+    bool *null_ptr =
+        reinterpret_cast<bool *>(byte_ptr + blank_state_.null_offset_);
+    *sum_ptr = fast_operator_->applyToTypedValues(*sum_ptr, value);
+    *null_ptr = false;
+  }
+
+  inline void updateStateUnary(const TypedValue &argument,
+                               std::uint8_t *byte_ptr) const override {
+    if (!block_update_) {
+      iterateUnaryInlFast(argument, byte_ptr);
+    }
+  }
+
+  void blockUpdate() override { block_update_ = true; }
+
+  void allowUpdate() override { block_update_ = false; }
+
+  void initPayload(std::uint8_t *byte_ptr) const override {
+    TypedValue *sum_ptr =
+        reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset_);
+    bool *null_ptr =
+        reinterpret_cast<bool *>(byte_ptr + blank_state_.null_offset_);
+    *sum_ptr = blank_state_.sum_;
+    *null_ptr = true;
+  }
+
   AggregationState* accumulateColumnVectors(
-      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const override;
+      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
+      const override;
 
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
   AggregationState* accumulateValueAccessor(
@@ -123,34 +172,51 @@ class AggregationHandleSum : public AggregationConcreteHandle {
   void mergeStates(const AggregationState &source,
                    AggregationState *destination) const override;
 
+  void mergeStatesFast(const std::uint8_t *source,
+                       std::uint8_t *destination) const override;
+
   TypedValue finalize(const AggregationState &state) const override;
 
-  inline TypedValue finalizeHashTableEntry(const AggregationState &state) const {
-    return static_cast<const AggregationStateSum&>(state).sum_;
+  inline TypedValue finalizeHashTableEntry(
+      const AggregationState &state) const {
+    return static_cast<const AggregationStateSum &>(state).sum_;
+  }
+
+  inline TypedValue finalizeHashTableEntryFast(
+      const std::uint8_t *byte_ptr) const {
+    std::uint8_t *value_ptr = const_cast<std::uint8_t *>(byte_ptr);
+    TypedValue *sum_ptr =
+        reinterpret_cast<TypedValue *>(value_ptr + blank_state_.sum_offset_);
+    return *sum_ptr;
   }
 
   ColumnVector* finalizeHashTable(
       const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys) const override;
+      std::vector<std::vector<TypedValue>> *group_by_keys,
+      int index) const override;
 
   /**
-   * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
+   * @brief Implementation of
+   *        AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
    *        for SUM aggregation.
    */
   AggregationState* aggregateOnDistinctifyHashTableForSingle(
-      const AggregationStateHashTableBase &distinctify_hash_table) const override;
+      const AggregationStateHashTableBase &distinctify_hash_table)
+      const override;
 
   /**
-   * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
+   * @brief Implementation of
+   *        AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
    *        for SUM aggregation.
    */
   void aggregateOnDistinctifyHashTableForGroupBy(
       const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table) const override;
+      AggregationStateHashTableBase *aggregation_hash_table,
+      std::size_t index) const override;
 
-  void mergeGroupByHashTables(
-      const AggregationStateHashTableBase &source_hash_table,
-      AggregationStateHashTableBase *destination_hash_table) const override;
+  std::size_t getPayloadSize() const override {
+    return blank_state_.getPayloadSize();
+  }
 
  private:
   friend class AggregateFunctionSum;
@@ -168,6 +234,8 @@ class AggregationHandleSum : public AggregationConcreteHandle {
   std::unique_ptr<UncheckedBinaryOperator> fast_operator_;
   std::unique_ptr<UncheckedBinaryOperator> merge_operator_;
 
+  bool block_update_;
+
   DISALLOW_COPY_AND_ASSIGN(AggregationHandleSum);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/aggregation/CMakeLists.txt b/expressions/aggregation/CMakeLists.txt
index 888d95c..e9503f7 100644
--- a/expressions/aggregation/CMakeLists.txt
+++ b/expressions/aggregation/CMakeLists.txt
@@ -146,9 +146,11 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationConcreteHandl
                       glog
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_storage_FastHashTable
                       quickstep_storage_HashTable
                       quickstep_storage_HashTableBase
                       quickstep_storage_HashTableFactory
+                      quickstep_threading_SpinMutex
                       quickstep_types_TypedValue
                       quickstep_types_containers_ColumnVector
                       quickstep_utility_Macros)
@@ -163,6 +165,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleAvg
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_aggregation_AggregationConcreteHandle
                       quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_storage_FastHashTable
                       quickstep_storage_HashTable
                       quickstep_storage_HashTableBase
                       quickstep_storage_HashTableFactory
@@ -180,6 +183,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleCount
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_aggregation_AggregationConcreteHandle
                       quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_storage_FastHashTable
                       quickstep_storage_HashTable
                       quickstep_storage_HashTableBase
                       quickstep_storage_HashTableFactory
@@ -204,6 +208,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMax
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_aggregation_AggregationConcreteHandle
                       quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_storage_FastHashTable
                       quickstep_storage_HashTable
                       quickstep_storage_HashTableBase
                       quickstep_storage_HashTableFactory
@@ -220,6 +225,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMin
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_aggregation_AggregationConcreteHandle
                       quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_storage_FastHashTable
                       quickstep_storage_HashTable
                       quickstep_storage_HashTableBase
                       quickstep_storage_HashTableFactory
@@ -236,6 +242,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleSum
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_aggregation_AggregationConcreteHandle
                       quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_storage_FastHashTable
                       quickstep_storage_HashTable
                       quickstep_storage_HashTableBase
                       quickstep_storage_HashTableFactory
@@ -292,6 +299,7 @@ target_link_libraries(AggregationHandle_tests
                       quickstep_expressions_aggregation_AggregationHandleMin
                       quickstep_expressions_aggregation_AggregationHandleSum
                       quickstep_expressions_aggregation_AggregationID
+                      quickstep_storage_AggregationOperationState
                       quickstep_storage_HashTableBase
                       quickstep_storage_StorageManager
                       quickstep_types_CharType

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp b/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp
index afc02ec..79d4448 100644
--- a/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp
+++ b/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp
@@ -28,6 +28,8 @@
 #include "expressions/aggregation/AggregationHandle.hpp"
 #include "expressions/aggregation/AggregationHandleAvg.hpp"
 #include "expressions/aggregation/AggregationID.hpp"
+#include "storage/AggregationOperationState.hpp"
+#include "storage/FastHashTableFactory.hpp"
 #include "storage/StorageManager.hpp"
 #include "types/CharType.hpp"
 #include "types/DateOperatorOverloads.hpp"
@@ -53,51 +55,56 @@
 
 namespace quickstep {
 
-class AggregationHandleAvgTest : public::testing::Test {
+class AggregationHandleAvgTest : public ::testing::Test {
  protected:
   static const int kNumSamples = 100;
 
   // Helper method that calls AggregationHandleAvg::iterateUnaryInl() to
   // aggregate 'value' into '*state'.
   void iterateHandle(AggregationState *state, const TypedValue &value) {
-    static_cast<const AggregationHandleAvg&>(*aggregation_handle_avg_).iterateUnaryInl(
-        static_cast<AggregationStateAvg*>(state),
-        value);
+    static_cast<const AggregationHandleAvg &>(*aggregation_handle_avg_)
+        .iterateUnaryInl(static_cast<AggregationStateAvg *>(state), value);
   }
 
   void initializeHandle(const Type &type) {
     aggregation_handle_avg_.reset(
-        AggregateFunctionFactory::Get(AggregationID::kAvg).createHandle(
-            std::vector<const Type*>(1, &type)));
+        AggregateFunctionFactory::Get(AggregationID::kAvg)
+            .createHandle(std::vector<const Type *>(1, &type)));
     aggregation_handle_avg_state_.reset(
         aggregation_handle_avg_->createInitialState());
   }
 
   static bool ApplyToTypesTest(TypeID typeID) {
-    const Type &type = (typeID == kChar || typeID == kVarChar) ?
-        TypeFactory::GetType(typeID, static_cast<std::size_t>(10)) :
-        TypeFactory::GetType(typeID);
+    const Type &type =
+        (typeID == kChar || typeID == kVarChar)
+            ? TypeFactory::GetType(typeID, static_cast<std::size_t>(10))
+            : TypeFactory::GetType(typeID);
 
-    return AggregateFunctionFactory::Get(AggregationID::kAvg).canApplyToTypes(
-        std::vector<const Type*>(1, &type));
+    return AggregateFunctionFactory::Get(AggregationID::kAvg)
+        .canApplyToTypes(std::vector<const Type *>(1, &type));
   }
 
   static bool ResultTypeForArgumentTypeTest(TypeID input_type_id,
                                             TypeID output_type_id) {
-    const Type *result_type
-        = AggregateFunctionFactory::Get(AggregationID::kAvg).resultTypeForArgumentTypes(
-            std::vector<const Type*>(1, &TypeFactory::GetType(input_type_id)));
+    const Type *result_type =
+        AggregateFunctionFactory::Get(AggregationID::kAvg)
+            .resultTypeForArgumentTypes(std::vector<const Type *>(
+                1, &TypeFactory::GetType(input_type_id)));
     return (result_type->getTypeID() == output_type_id);
   }
 
   template <typename CppType>
-  static void CheckAvgValue(
-      CppType expected,
-      const AggregationHandle &handle,
-      const AggregationState &state) {
+  static void CheckAvgValue(CppType expected,
+                            const AggregationHandle &handle,
+                            const AggregationState &state) {
     EXPECT_EQ(expected, handle.finalize(state).getLiteral<CppType>());
   }
 
+  template <typename CppType>
+  static void CheckAvgValue(CppType expected, const TypedValue &value) {
+    EXPECT_EQ(expected, value.getLiteral<CppType>());
+  }
+
   // Static templated method for set a meaningful value to data types.
   template <typename CppType>
   static void SetDataType(int value, CppType *data) {
@@ -108,7 +115,9 @@ class AggregationHandleAvgTest : public::testing::Test {
   void checkAggregationAvgGeneric() {
     const GenericType &type = GenericType::Instance(true);
     initializeHandle(type);
-    EXPECT_TRUE(aggregation_handle_avg_->finalize(*aggregation_handle_avg_state_).isNull());
+    EXPECT_TRUE(
+        aggregation_handle_avg_->finalize(*aggregation_handle_avg_state_)
+            .isNull());
 
     typename GenericType::cpptype val;
     typename GenericType::cpptype sum;
@@ -119,15 +128,16 @@ class AggregationHandleAvgTest : public::testing::Test {
       if (type.getTypeID() == kInt || type.getTypeID() == kLong) {
         SetDataType(i - 10, &val);
       } else {
-        SetDataType(static_cast<float>(i - 10)/10, &val);
+        SetDataType(static_cast<float>(i - 10) / 10, &val);
       }
       iterateHandle(aggregation_handle_avg_state_.get(), type.makeValue(&val));
       sum += val;
     }
     iterateHandle(aggregation_handle_avg_state_.get(), type.makeNullValue());
-    CheckAvgValue<typename OutputType::cpptype>(static_cast<typename OutputType::cpptype>(sum) / kNumSamples,
-                                                *aggregation_handle_avg_,
-                                                *aggregation_handle_avg_state_);
+    CheckAvgValue<typename OutputType::cpptype>(
+        static_cast<typename OutputType::cpptype>(sum) / kNumSamples,
+        *aggregation_handle_avg_,
+        *aggregation_handle_avg_state_);
 
     // Test mergeStates().
     std::unique_ptr<AggregationState> merge_state(
@@ -140,7 +150,7 @@ class AggregationHandleAvgTest : public::testing::Test {
       if (type.getTypeID() == kInt || type.getTypeID() == kLong) {
         SetDataType(i - 10, &val);
       } else {
-        SetDataType(static_cast<float>(i - 10)/10, &val);
+        SetDataType(static_cast<float>(i - 10) / 10, &val);
       }
       iterateHandle(merge_state.get(), type.makeValue(&val));
       sum += val;
@@ -155,7 +165,8 @@ class AggregationHandleAvgTest : public::testing::Test {
   }
 
   template <typename GenericType>
-  ColumnVector *createColumnVectorGeneric(const Type &type, typename GenericType::cpptype *sum) {
+  ColumnVector* createColumnVectorGeneric(const Type &type,
+                                          typename GenericType::cpptype *sum) {
     NativeColumnVector *column = new NativeColumnVector(type, kNumSamples + 3);
 
     typename GenericType::cpptype val;
@@ -166,12 +177,12 @@ class AggregationHandleAvgTest : public::testing::Test {
       if (type.getTypeID() == kInt || type.getTypeID() == kLong) {
         SetDataType(i - 10, &val);
       } else {
-        SetDataType(static_cast<float>(i - 10)/10, &val);
+        SetDataType(static_cast<float>(i - 10) / 10, &val);
       }
       column->appendTypedValue(type.makeValue(&val));
       *sum += val;
       // One NULL in the middle.
-      if (i == kNumSamples/2) {
+      if (i == kNumSamples / 2) {
         column->appendTypedValue(type.makeNullValue());
       }
     }
@@ -184,12 +195,15 @@ class AggregationHandleAvgTest : public::testing::Test {
   void checkAggregationAvgGenericColumnVector() {
     const GenericType &type = GenericType::Instance(true);
     initializeHandle(type);
-    EXPECT_TRUE(aggregation_handle_avg_->finalize(*aggregation_handle_avg_state_).isNull());
+    EXPECT_TRUE(
+        aggregation_handle_avg_->finalize(*aggregation_handle_avg_state_)
+            .isNull());
 
     typename GenericType::cpptype sum;
     SetDataType(0, &sum);
     std::vector<std::unique_ptr<ColumnVector>> column_vectors;
-    column_vectors.emplace_back(createColumnVectorGeneric<GenericType>(type, &sum));
+    column_vectors.emplace_back(
+        createColumnVectorGeneric<GenericType>(type, &sum));
 
     std::unique_ptr<AggregationState> cv_state(
         aggregation_handle_avg_->accumulateColumnVectors(column_vectors));
@@ -201,7 +215,8 @@ class AggregationHandleAvgTest : public::testing::Test {
         *aggregation_handle_avg_,
         *cv_state);
 
-    aggregation_handle_avg_->mergeStates(*cv_state, aggregation_handle_avg_state_.get());
+    aggregation_handle_avg_->mergeStates(*cv_state,
+                                         aggregation_handle_avg_state_.get());
     CheckAvgValue<typename OutputType::cpptype>(
         static_cast<typename OutputType::cpptype>(sum) / kNumSamples,
         *aggregation_handle_avg_,
@@ -213,16 +228,19 @@ class AggregationHandleAvgTest : public::testing::Test {
   void checkAggregationAvgGenericValueAccessor() {
     const GenericType &type = GenericType::Instance(true);
     initializeHandle(type);
-    EXPECT_TRUE(aggregation_handle_avg_->finalize(*aggregation_handle_avg_state_).isNull());
+    EXPECT_TRUE(
+        aggregation_handle_avg_->finalize(*aggregation_handle_avg_state_)
+            .isNull());
 
     typename GenericType::cpptype sum;
     SetDataType(0, &sum);
-    std::unique_ptr<ColumnVectorsValueAccessor> accessor(new ColumnVectorsValueAccessor());
+    std::unique_ptr<ColumnVectorsValueAccessor> accessor(
+        new ColumnVectorsValueAccessor());
     accessor->addColumn(createColumnVectorGeneric<GenericType>(type, &sum));
 
     std::unique_ptr<AggregationState> va_state(
-        aggregation_handle_avg_->accumulateValueAccessor(accessor.get(),
-                                                         std::vector<attribute_id>(1, 0)));
+        aggregation_handle_avg_->accumulateValueAccessor(
+            accessor.get(), std::vector<attribute_id>(1, 0)));
 
     // Test the state generated directly by accumulateValueAccessor(), and also
     // test after merging back.
@@ -231,7 +249,8 @@ class AggregationHandleAvgTest : public::testing::Test {
         *aggregation_handle_avg_,
         *va_state);
 
-    aggregation_handle_avg_->mergeStates(*va_state, aggregation_handle_avg_state_.get());
+    aggregation_handle_avg_->mergeStates(*va_state,
+                                         aggregation_handle_avg_state_.get());
     CheckAvgValue<typename OutputType::cpptype>(
         static_cast<typename OutputType::cpptype>(sum) / kNumSamples,
         *aggregation_handle_avg_,
@@ -255,12 +274,14 @@ void AggregationHandleAvgTest::CheckAvgValue<double>(
 }
 
 template <>
-void AggregationHandleAvgTest::SetDataType<DatetimeIntervalLit>(int value, DatetimeIntervalLit *data) {
+void AggregationHandleAvgTest::SetDataType<DatetimeIntervalLit>(
+    int value, DatetimeIntervalLit *data) {
   data->interval_ticks = value;
 }
 
 template <>
-void AggregationHandleAvgTest::SetDataType<YearMonthIntervalLit>(int value, YearMonthIntervalLit *data) {
+void AggregationHandleAvgTest::SetDataType<YearMonthIntervalLit>(
+    int value, YearMonthIntervalLit *data) {
   data->months = value;
 }
 
@@ -307,11 +328,13 @@ TEST_F(AggregationHandleAvgTest, DoubleTypeColumnVectorTest) {
 }
 
 TEST_F(AggregationHandleAvgTest, DatetimeIntervalTypeColumnVectorTest) {
-  checkAggregationAvgGenericColumnVector<DatetimeIntervalType, DatetimeIntervalType>();
+  checkAggregationAvgGenericColumnVector<DatetimeIntervalType,
+                                         DatetimeIntervalType>();
 }
 
 TEST_F(AggregationHandleAvgTest, YearMonthIntervalTypeColumnVectorTest) {
-  checkAggregationAvgGenericColumnVector<YearMonthIntervalType, YearMonthIntervalType>();
+  checkAggregationAvgGenericColumnVector<YearMonthIntervalType,
+                                         YearMonthIntervalType>();
 }
 
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
@@ -332,11 +355,13 @@ TEST_F(AggregationHandleAvgTest, DoubleTypeValueAccessorTest) {
 }
 
 TEST_F(AggregationHandleAvgTest, DatetimeIntervalTypeValueAccessorTest) {
-  checkAggregationAvgGenericValueAccessor<DatetimeIntervalType, DatetimeIntervalType>();
+  checkAggregationAvgGenericValueAccessor<DatetimeIntervalType,
+                                          DatetimeIntervalType>();
 }
 
 TEST_F(AggregationHandleAvgTest, YearMonthIntervalTypeValueAccessorTest) {
-  checkAggregationAvgGenericValueAccessor<YearMonthIntervalType, YearMonthIntervalType>();
+  checkAggregationAvgGenericValueAccessor<YearMonthIntervalType,
+                                          YearMonthIntervalType>();
 }
 #endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 
@@ -365,38 +390,53 @@ TEST_F(AggregationHandleAvgDeathTest, WrongTypeTest) {
   double double_val = 0;
   float float_val = 0;
 
-  iterateHandle(aggregation_handle_avg_state_.get(), int_non_null_type.makeValue(&int_val));
+  iterateHandle(aggregation_handle_avg_state_.get(),
+                int_non_null_type.makeValue(&int_val));
 
-  EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), long_type.makeValue(&long_val)), "");
-  EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), double_type.makeValue(&double_val)), "");
-  EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), float_type.makeValue(&float_val)), "");
-  EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), char_type.makeValue("asdf", 5)), "");
-  EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), varchar_type.makeValue("asdf", 5)), "");
+  EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(),
+                             long_type.makeValue(&long_val)),
+               "");
+  EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(),
+                             double_type.makeValue(&double_val)),
+               "");
+  EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(),
+                             float_type.makeValue(&float_val)),
+               "");
+  EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(),
+                             char_type.makeValue("asdf", 5)),
+               "");
+  EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(),
+                             varchar_type.makeValue("asdf", 5)),
+               "");
 
   // Test mergeStates() with incorrectly typed handles.
   std::unique_ptr<AggregationHandle> aggregation_handle_avg_double(
-      AggregateFunctionFactory::Get(AggregationID::kAvg).createHandle(
-          std::vector<const Type*>(1, &double_type)));
+      AggregateFunctionFactory::Get(AggregationID::kAvg)
+          .createHandle(std::vector<const Type *>(1, &double_type)));
   std::unique_ptr<AggregationState> aggregation_state_avg_merge_double(
       aggregation_handle_avg_double->createInitialState());
-  static_cast<const AggregationHandleAvg&>(*aggregation_handle_avg_double).iterateUnaryInl(
-      static_cast<AggregationStateAvg*>(aggregation_state_avg_merge_double.get()),
-      double_type.makeValue(&double_val));
-  EXPECT_DEATH(aggregation_handle_avg_->mergeStates(*aggregation_state_avg_merge_double,
-                                                    aggregation_handle_avg_state_.get()),
-               "");
+  static_cast<const AggregationHandleAvg &>(*aggregation_handle_avg_double)
+      .iterateUnaryInl(static_cast<AggregationStateAvg *>(
+                           aggregation_state_avg_merge_double.get()),
+                       double_type.makeValue(&double_val));
+  EXPECT_DEATH(
+      aggregation_handle_avg_->mergeStates(*aggregation_state_avg_merge_double,
+                                           aggregation_handle_avg_state_.get()),
+      "");
 
   std::unique_ptr<AggregationHandle> aggregation_handle_avg_float(
-      AggregateFunctionFactory::Get(AggregationID::kAvg).createHandle(
-          std::vector<const Type*>(1, &float_type)));
+      AggregateFunctionFactory::Get(AggregationID::kAvg)
+          .createHandle(std::vector<const Type *>(1, &float_type)));
   std::unique_ptr<AggregationState> aggregation_state_avg_merge_float(
       aggregation_handle_avg_float->createInitialState());
-  static_cast<const AggregationHandleAvg&>(*aggregation_handle_avg_float).iterateUnaryInl(
-      static_cast<AggregationStateAvg*>(aggregation_state_avg_merge_float.get()),
-      float_type.makeValue(&float_val));
-  EXPECT_DEATH(aggregation_handle_avg_->mergeStates(*aggregation_state_avg_merge_float,
-                                                    aggregation_handle_avg_state_.get()),
-               "");
+  static_cast<const AggregationHandleAvg &>(*aggregation_handle_avg_float)
+      .iterateUnaryInl(static_cast<AggregationStateAvg *>(
+                           aggregation_state_avg_merge_float.get()),
+                       float_type.makeValue(&float_val));
+  EXPECT_DEATH(
+      aggregation_handle_avg_->mergeStates(*aggregation_state_avg_merge_float,
+                                           aggregation_handle_avg_state_.get()),
+      "");
 }
 #endif
 
@@ -417,8 +457,10 @@ TEST_F(AggregationHandleAvgTest, ResultTypeForArgumentTypeTest) {
   EXPECT_TRUE(ResultTypeForArgumentTypeTest(kLong, kDouble));
   EXPECT_TRUE(ResultTypeForArgumentTypeTest(kFloat, kDouble));
   EXPECT_TRUE(ResultTypeForArgumentTypeTest(kDouble, kDouble));
-  EXPECT_TRUE(ResultTypeForArgumentTypeTest(kDatetimeInterval, kDatetimeInterval));
-  EXPECT_TRUE(ResultTypeForArgumentTypeTest(kYearMonthInterval, kYearMonthInterval));
+  EXPECT_TRUE(
+      ResultTypeForArgumentTypeTest(kDatetimeInterval, kDatetimeInterval));
+  EXPECT_TRUE(
+      ResultTypeForArgumentTypeTest(kYearMonthInterval, kYearMonthInterval));
 }
 
 TEST_F(AggregationHandleAvgTest, GroupByTableMergeTestAvg) {
@@ -426,25 +468,28 @@ TEST_F(AggregationHandleAvgTest, GroupByTableMergeTestAvg) {
   initializeHandle(long_non_null_type);
   storage_manager_.reset(new StorageManager("./test_avg_data"));
   std::unique_ptr<AggregationStateHashTableBase> source_hash_table(
-      aggregation_handle_avg_->createGroupByHashTable(
-          HashTableImplType::kSimpleScalarSeparateChaining,
+      AggregationStateFastHashTableFactory::CreateResizable(
+          HashTableImplType::kSeparateChaining,
           std::vector<const Type *>(1, &long_non_null_type),
           10,
+          {aggregation_handle_avg_.get()->getPayloadSize()},
+          {aggregation_handle_avg_.get()},
           storage_manager_.get()));
   std::unique_ptr<AggregationStateHashTableBase> destination_hash_table(
-      aggregation_handle_avg_->createGroupByHashTable(
-          HashTableImplType::kSimpleScalarSeparateChaining,
+      AggregationStateFastHashTableFactory::CreateResizable(
+          HashTableImplType::kSeparateChaining,
           std::vector<const Type *>(1, &long_non_null_type),
           10,
+          {aggregation_handle_avg_.get()->getPayloadSize()},
+          {aggregation_handle_avg_.get()},
           storage_manager_.get()));
 
-  AggregationStateHashTable<AggregationStateAvg> *destination_hash_table_derived =
-      static_cast<AggregationStateHashTable<AggregationStateAvg> *>(
+  AggregationStateFastHashTable *destination_hash_table_derived =
+      static_cast<AggregationStateFastHashTable *>(
           destination_hash_table.get());
 
-  AggregationStateHashTable<AggregationStateAvg> *source_hash_table_derived =
-      static_cast<AggregationStateHashTable<AggregationStateAvg> *>(
-          source_hash_table.get());
+  AggregationStateFastHashTable *source_hash_table_derived =
+      static_cast<AggregationStateFastHashTable *>(source_hash_table.get());
 
   AggregationHandleAvg *aggregation_handle_avg_derived =
       static_cast<AggregationHandleAvg *>(aggregation_handle_avg_.get());
@@ -496,36 +541,56 @@ TEST_F(AggregationHandleAvgTest, GroupByTableMergeTestAvg) {
       exclusive_key_source_state.get(), exclusive_key_source_avg_val);
 
   // Add the key-state pairs to the hash tables.
-  source_hash_table_derived->putCompositeKey(common_key,
-                                             *common_key_source_state);
-  destination_hash_table_derived->putCompositeKey(
-      common_key, *common_key_destination_state);
-  source_hash_table_derived->putCompositeKey(exclusive_source_key,
-                                             *exclusive_key_source_state);
-  destination_hash_table_derived->putCompositeKey(
-      exclusive_destination_key, *exclusive_key_destination_state);
+  unsigned char buffer[100];
+  buffer[0] = '\0';
+  memcpy(buffer + 1,
+         common_key_source_state.get()->getPayloadAddress(),
+         aggregation_handle_avg_.get()->getPayloadSize());
+  source_hash_table_derived->putCompositeKey(common_key, buffer);
+
+  memcpy(buffer + 1,
+         common_key_destination_state.get()->getPayloadAddress(),
+         aggregation_handle_avg_.get()->getPayloadSize());
+  destination_hash_table_derived->putCompositeKey(common_key, buffer);
+
+  memcpy(buffer + 1,
+         exclusive_key_source_state.get()->getPayloadAddress(),
+         aggregation_handle_avg_.get()->getPayloadSize());
+  source_hash_table_derived->putCompositeKey(exclusive_source_key, buffer);
+
+  memcpy(buffer + 1,
+         exclusive_key_destination_state.get()->getPayloadAddress(),
+         aggregation_handle_avg_.get()->getPayloadSize());
+  destination_hash_table_derived->putCompositeKey(exclusive_destination_key,
+                                                      buffer);
 
   EXPECT_EQ(2u, destination_hash_table_derived->numEntries());
   EXPECT_EQ(2u, source_hash_table_derived->numEntries());
 
-  aggregation_handle_avg_->mergeGroupByHashTables(*source_hash_table,
-                                                  destination_hash_table.get());
+  AggregationOperationState::mergeGroupByHashTables(
+      source_hash_table.get(), destination_hash_table.get());
 
   EXPECT_EQ(3u, destination_hash_table_derived->numEntries());
 
   CheckAvgValue<double>(
       (common_key_destination_avg_val.getLiteral<std::int64_t>() +
-          common_key_source_avg_val.getLiteral<std::int64_t>()) / static_cast<double>(2),
-      *aggregation_handle_avg_derived,
-      *(destination_hash_table_derived->getSingleCompositeKey(common_key)));
-  CheckAvgValue<double>(exclusive_key_destination_avg_val.getLiteral<std::int64_t>(),
-                  *aggregation_handle_avg_derived,
-                  *(destination_hash_table_derived->getSingleCompositeKey(
-                      exclusive_destination_key)));
-  CheckAvgValue<double>(exclusive_key_source_avg_val.getLiteral<std::int64_t>(),
-                  *aggregation_handle_avg_derived,
-                  *(source_hash_table_derived->getSingleCompositeKey(
-                      exclusive_source_key)));
+       common_key_source_avg_val.getLiteral<std::int64_t>()) /
+          static_cast<double>(2),
+      aggregation_handle_avg_derived->finalizeHashTableEntryFast(
+          destination_hash_table_derived->getSingleCompositeKey(common_key) +
+          1));
+  CheckAvgValue<double>(
+      exclusive_key_destination_avg_val.getLiteral<std::int64_t>(),
+      aggregation_handle_avg_derived->finalizeHashTableEntryFast(
+          destination_hash_table_derived->getSingleCompositeKey(
+              exclusive_destination_key) +
+          1));
+  CheckAvgValue<double>(
+      exclusive_key_source_avg_val.getLiteral<std::int64_t>(),
+      aggregation_handle_avg_derived->finalizeHashTableEntryFast(
+          source_hash_table_derived->getSingleCompositeKey(
+              exclusive_source_key) +
+          1));
 }
 
 }  // namespace quickstep