You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/10/05 04:25:43 UTC

[09/17] incubator-quickstep git commit: Initial commit for QUICKSTEP-28 and QUICKSTEP-29.

Initial commit for QUICKSTEP-28 and QUICKSTEP-29.

- This PR creates a specialized hash table implementation for
  aggregation.
- A hash table's bucket can store multiple AggregationStates,
  corresponding to the various AggregationHandles in the query.
  Previously, we created one hash table for each AggregationHandle.
- All AggregationStates in a single hash table bucket are protected
  by a single mutex. Previously, each AggregationState provided its own
  internal concurrency protection (mutex).
- Single aggregationGroupBy method in StorageBlock.
- New methods for separating unary and nullary updates of AggregationState.
- Added TODO to move method from HashTableBase class.
- Added doxygen for the new AggregationHandle functions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/ac3512ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/ac3512ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/ac3512ce

Branch: refs/heads/lip-refactor
Commit: ac3512ceb16109702116aa2a51db382dc99ba23e
Parents: 43c7a42
Author: rathijit <ra...@node-2.hashtable.quickstep-pg0.wisc.cloudlab.us>
Authored: Mon Jul 4 02:44:48 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Sep 20 15:26:52 2016 -0500

----------------------------------------------------------------------
 catalog/CatalogTypedefs.hpp                     |    2 +
 .../aggregation/AggregationConcreteHandle.cpp   |   21 +-
 .../aggregation/AggregationConcreteHandle.hpp   |  372 +--
 expressions/aggregation/AggregationHandle.hpp   |  118 +-
 .../aggregation/AggregationHandleAvg.cpp        |  118 +-
 .../aggregation/AggregationHandleAvg.hpp        |  118 +-
 .../aggregation/AggregationHandleCount.cpp      |  180 +-
 .../aggregation/AggregationHandleCount.hpp      |  111 +-
 .../aggregation/AggregationHandleDistinct.cpp   |    5 +-
 .../aggregation/AggregationHandleDistinct.hpp   |   39 +-
 .../aggregation/AggregationHandleMax.cpp        |   98 +-
 .../aggregation/AggregationHandleMax.hpp        |  112 +-
 .../aggregation/AggregationHandleMin.cpp        |   99 +-
 .../aggregation/AggregationHandleMin.hpp        |  106 +-
 .../aggregation/AggregationHandleSum.cpp        |  112 +-
 .../aggregation/AggregationHandleSum.hpp        |  108 +-
 expressions/aggregation/CMakeLists.txt          |    8 +
 .../tests/AggregationHandleAvg_unittest.cpp     |  255 +-
 .../tests/AggregationHandleCount_unittest.cpp   |  311 ++-
 .../tests/AggregationHandleMax_unittest.cpp     |  382 +--
 .../tests/AggregationHandleMin_unittest.cpp     |  378 +--
 .../tests/AggregationHandleSum_unittest.cpp     |  291 +-
 query_optimizer/ExecutionGenerator.cpp          |   20 +-
 .../tests/AggregationOperator_unittest.cpp      |    3 +-
 storage/AggregationOperationState.cpp           |  314 ++-
 storage/AggregationOperationState.hpp           |   47 +-
 storage/CMakeLists.txt                          |   55 +
 storage/FastHashTable.hpp                       | 2515 ++++++++++++++++++
 storage/FastHashTableFactory.hpp                |  257 ++
 storage/FastSeparateChainingHashTable.hpp       | 1750 ++++++++++++
 storage/HashTable.hpp                           |   10 -
 storage/HashTableBase.hpp                       |   37 +-
 storage/HashTablePool.hpp                       |   65 +
 storage/StorageBlock.cpp                        |   27 +-
 storage/StorageBlock.hpp                        |   16 +-
 35 files changed, 6803 insertions(+), 1657 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/catalog/CatalogTypedefs.hpp
----------------------------------------------------------------------
diff --git a/catalog/CatalogTypedefs.hpp b/catalog/CatalogTypedefs.hpp
index f7a2d53..70bac84 100644
--- a/catalog/CatalogTypedefs.hpp
+++ b/catalog/CatalogTypedefs.hpp
@@ -49,6 +49,8 @@ constexpr int kInvalidCatalogId = -1;
 // Used to indicate no preference for a NUMA Node ID.
 constexpr numa_node_id kAnyNUMANodeID = -1;
 
+constexpr attribute_id kInvalidAttributeID = -1;
+
 /** @} */
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/AggregationConcreteHandle.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.cpp b/expressions/aggregation/AggregationConcreteHandle.cpp
index 719920f..e3fb520 100644
--- a/expressions/aggregation/AggregationConcreteHandle.cpp
+++ b/expressions/aggregation/AggregationConcreteHandle.cpp
@@ -23,6 +23,7 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
+#include "storage/FastHashTable.hpp"
 #include "storage/HashTable.hpp"
 #include "storage/HashTableFactory.hpp"
 
@@ -51,22 +52,16 @@ void AggregationConcreteHandle::insertValueAccessorIntoDistinctifyHashTable(
     AggregationStateHashTableBase *distinctify_hash_table) const {
   // If the key-value pair is already there, we don't need to update the value,
   // which should always be "true". I.e. the value is just a placeholder.
-  const auto noop_upserter = [](const auto &accessor, const bool *value) -> void {};
 
-  AggregationStateHashTable<bool> *hash_table =
-      static_cast<AggregationStateHashTable<bool>*>(distinctify_hash_table);
+  AggregationStateFastHashTable *hash_table =
+      static_cast<AggregationStateFastHashTable *>(distinctify_hash_table);
   if (key_ids.size() == 1) {
-    hash_table->upsertValueAccessor(accessor,
-                                    key_ids[0],
-                                    true /* check_for_null_keys */,
-                                    true /* initial_value */,
-                                    &noop_upserter);
+    hash_table->upsertValueAccessorFast(
+        key_ids, accessor, key_ids[0], true /* check_for_null_keys */);
   } else {
-    hash_table->upsertValueAccessorCompositeKey(accessor,
-                                                key_ids,
-                                                true /* check_for_null_keys */,
-                                                true /* initial_value */,
-                                                &noop_upserter);
+    std::vector<attribute_id> empty_args {kInvalidAttributeID};
+    hash_table->upsertValueAccessorCompositeKeyFast(
+        empty_args, accessor, key_ids, true /* check_for_null_keys */);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/AggregationConcreteHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.hpp b/expressions/aggregation/AggregationConcreteHandle.hpp
index c5ca061..398a032 100644
--- a/expressions/aggregation/AggregationConcreteHandle.hpp
+++ b/expressions/aggregation/AggregationConcreteHandle.hpp
@@ -21,13 +21,15 @@
 #define QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_CONCRETE_HANDLE_HPP_
 
 #include <cstddef>
-#include <vector>
 #include <utility>
+#include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
+#include "storage/FastHashTable.hpp"
 #include "storage/HashTable.hpp"
 #include "storage/HashTableBase.hpp"
+#include "threading/SpinMutex.hpp"
 #include "types/TypedValue.hpp"
 #include "types/containers/ColumnVector.hpp"
 #include "utility/Macros.hpp"
@@ -48,8 +50,8 @@ class ValueAccessor;
  * @brief An upserter class for modifying the destination hash table while
  *        merging two group by hash tables.
  **/
-template <typename HandleT, typename StateT>
-class HashTableStateUpserter {
+template <typename HandleT>
+class HashTableStateUpserterFast {
  public:
   /**
    * @brief Constructor.
@@ -59,7 +61,8 @@ class HashTableStateUpserter {
    *        table. The corresponding state (for the same key) in the destination
    *        hash table will be upserted.
    **/
-  HashTableStateUpserter(const HandleT &handle, const StateT &source_state)
+  HashTableStateUpserterFast(const HandleT &handle,
+                             const std::uint8_t *source_state)
       : handle_(handle), source_state_(source_state) {}
 
   /**
@@ -68,65 +71,15 @@ class HashTableStateUpserter {
    * @param destination_state The aggregation state in the aggregation hash
    *        table that is being upserted.
    **/
-  void operator()(StateT *destination_state) {
-    handle_.mergeStates(source_state_, destination_state);
+  void operator()(std::uint8_t *destination_state) {
+    handle_.mergeStatesFast(source_state_, destination_state);
   }
 
  private:
   const HandleT &handle_;
-  const StateT &source_state_;
+  const std::uint8_t *source_state_;
 
-  DISALLOW_COPY_AND_ASSIGN(HashTableStateUpserter);
-};
-
-/**
- * @brief A class to support the functor for merging group by hash tables.
- **/
-template <typename HandleT, typename StateT, typename HashTableT>
-class HashTableMerger {
- public:
-  /**
-   * @brief Constructor
-   *
-   * @param handle The Aggregation handle being used.
-   * @param destination_hash_table The destination hash table to which other
-   *        hash tables will be merged.
-   **/
-  HashTableMerger(const HandleT &handle,
-                  AggregationStateHashTableBase *destination_hash_table)
-      : handle_(handle),
-        destination_hash_table_(
-            static_cast<HashTableT *>(destination_hash_table)) {}
-
-  /**
-   * @brief The operator for the functor.
-   *
-   * @param group_by_key The group by key being merged.
-   * @param source_state The aggregation state for the given key in the source
-   *        aggregation hash table.
-   **/
-  inline void operator()(const std::vector<TypedValue> &group_by_key,
-                         const StateT &source_state) {
-    const StateT *original_state =
-        destination_hash_table_->getSingleCompositeKey(group_by_key);
-    if (original_state != nullptr) {
-      HashTableStateUpserter<HandleT, StateT> upserter(
-          handle_, source_state);
-      // The CHECK is required as upsertCompositeKey can return false if the
-      // hash table runs out of space during the upsert process. The ideal
-      // solution will be to retry again if the upsert fails.
-      CHECK(destination_hash_table_->upsertCompositeKey(
-          group_by_key, *original_state, &upserter));
-    } else {
-      destination_hash_table_->putCompositeKey(group_by_key, source_state);
-    }
-  }
-
- private:
-  const HandleT &handle_;
-  HashTableT *destination_hash_table_;
-
-  DISALLOW_COPY_AND_ASSIGN(HashTableMerger);
+  DISALLOW_COPY_AND_ASSIGN(HashTableStateUpserterFast);
 };
 
 /**
@@ -156,14 +109,15 @@ class AggregationConcreteHandle : public AggregationHandle {
    */
   AggregationStateHashTableBase* createDistinctifyHashTable(
       const HashTableImplType hash_table_impl,
-      const std::vector<const Type*> &key_types,
+      const std::vector<const Type *> &key_types,
       const std::size_t estimated_num_distinct_keys,
       StorageManager *storage_manager) const override;
 
   /**
-   * @brief Implementaion for AggregationHandle::insertValueAccessorIntoDistinctifyHashTable()
-   *        that inserts the GROUP BY expressions and aggregation arguments together
-   *        as keys into the distinctify hash table.
+   * @brief Implementaion for
+   * AggregationHandle::insertValueAccessorIntoDistinctifyHashTable()
+   * that inserts the GROUP BY expressions and aggregation arguments together
+   * as keys into the distinctify hash table.
    */
   void insertValueAccessorIntoDistinctifyHashTable(
       ValueAccessor *accessor,
@@ -171,61 +125,40 @@ class AggregationConcreteHandle : public AggregationHandle {
       AggregationStateHashTableBase *distinctify_hash_table) const override;
 
  protected:
-  AggregationConcreteHandle() {
-  }
+  AggregationConcreteHandle() {}
 
-  template <typename HandleT,
-            typename StateT,
-            typename HashTableT>
-  void aggregateValueAccessorIntoHashTableNullaryHelper(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &group_by_key_ids,
-      const StateT &default_state,
-      AggregationStateHashTableBase *hash_table) const;
-
-  template <typename HandleT,
-            typename StateT,
-            typename HashTableT>
-  void aggregateValueAccessorIntoHashTableUnaryHelper(
-      ValueAccessor *accessor,
-      const attribute_id argument_id,
-      const std::vector<attribute_id> &group_by_key_ids,
-      const StateT &default_state,
-      AggregationStateHashTableBase *hash_table) const;
-
-  template <typename HandleT,
-            typename StateT>
-  StateT* aggregateOnDistinctifyHashTableForSingleUnaryHelper(
+  template <typename HandleT, typename StateT>
+  StateT* aggregateOnDistinctifyHashTableForSingleUnaryHelperFast(
       const AggregationStateHashTableBase &distinctify_hash_table) const;
 
-  template <typename HandleT,
-            typename StateT,
-            typename HashTableT>
-  void aggregateOnDistinctifyHashTableForGroupByUnaryHelper(
+  template <typename HandleT, typename HashTableT>
+  void aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast(
       const AggregationStateHashTableBase &distinctify_hash_table,
-      const StateT &default_state,
-      AggregationStateHashTableBase *hash_table) const;
+      AggregationStateHashTableBase *hash_table,
+      std::size_t index) const;
 
-  template <typename HandleT,
-            typename HashTableT>
-  ColumnVector* finalizeHashTableHelper(
+  template <typename HandleT, typename HashTableT>
+  ColumnVector* finalizeHashTableHelperFast(
       const Type &result_type,
       const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys) const;
+      std::vector<std::vector<TypedValue>> *group_by_keys,
+      int index) const;
 
   template <typename HandleT, typename HashTableT>
-  inline TypedValue finalizeGroupInHashTable(
+  inline TypedValue finalizeGroupInHashTableFast(
       const AggregationStateHashTableBase &hash_table,
-      const std::vector<TypedValue> &group_key) const {
-    const AggregationState *group_state
-        = static_cast<const HashTableT&>(hash_table).getSingleCompositeKey(group_key);
+      const std::vector<TypedValue> &group_key,
+      int index) const {
+    const std::uint8_t *group_state =
+        static_cast<const HashTableT &>(hash_table).getSingleCompositeKey(group_key, index);
     DCHECK(group_state != nullptr)
         << "Could not find entry for specified group_key in HashTable";
-    return static_cast<const HandleT*>(this)->finalizeHashTableEntry(*group_state);
+    return static_cast<const HandleT *>(this)->finalizeHashTableEntryFast(
+        group_state);
   }
 
-  template <typename HandleT, typename StateT, typename HashTableT>
-  void mergeGroupByHashTablesHelper(
+  template <typename HandleT, typename HashTableT>
+  void mergeGroupByHashTablesHelperFast(
       const AggregationStateHashTableBase &source_hash_table,
       AggregationStateHashTableBase *destination_hash_table) const;
 
@@ -234,51 +167,6 @@ class AggregationConcreteHandle : public AggregationHandle {
 };
 
 /**
- * @brief Templated class to implement value-accessor-based upserter for each
- *        aggregation state payload type. This version is for nullary
- *        aggregates (those that take no arguments).
- **/
-template <typename HandleT, typename StateT>
-class NullaryAggregationStateValueAccessorUpserter {
- public:
-  explicit NullaryAggregationStateValueAccessorUpserter(const HandleT &handle)
-      : handle_(handle) {
-  }
-
-  template <typename ValueAccessorT>
-  inline void operator()(const ValueAccessorT &accessor, StateT *state) {
-    handle_.iterateNullaryInl(state);
-  }
-
- private:
-  const HandleT &handle_;
-};
-
-/**
- * @brief Templated class to implement value-accessor-based upserter for each
- *        aggregation state payload type. This version is for unary aggregates
- *        (those that take a single argument).
- **/
-template <typename HandleT, typename StateT>
-class UnaryAggregationStateValueAccessorUpserter {
- public:
-  UnaryAggregationStateValueAccessorUpserter(const HandleT &handle,
-                                             attribute_id value_id)
-    : handle_(handle),
-      value_id_(value_id) {
-  }
-
-  template <typename ValueAccessorT>
-  inline void operator()(const ValueAccessorT &accessor, StateT *state) {
-    handle_.iterateUnaryInl(state, accessor.getTypedValue(value_id_));
-  }
-
- private:
-  const HandleT &handle_;
-  const attribute_id value_id_;
-};
-
-/**
  * @brief Templated helper class used to implement
  *        AggregationHandle::finalizeHashTable() by visiting each entry (i.e.
  *        GROUP) in a HashTable, finalizing the aggregation for the GROUP, and
@@ -288,18 +176,26 @@ class UnaryAggregationStateValueAccessorUpserter {
 template <typename HandleT, typename ColumnVectorT>
 class HashTableAggregateFinalizer {
  public:
-  HashTableAggregateFinalizer(const HandleT &handle,
-                              std::vector<std::vector<TypedValue>> *group_by_keys,
-                              ColumnVectorT *output_column_vector)
+  HashTableAggregateFinalizer(
+      const HandleT &handle,
+      std::vector<std::vector<TypedValue>> *group_by_keys,
+      ColumnVectorT *output_column_vector)
       : handle_(handle),
         group_by_keys_(group_by_keys),
-        output_column_vector_(output_column_vector) {
-  }
+        output_column_vector_(output_column_vector) {}
 
   inline void operator()(const std::vector<TypedValue> &group_by_key,
                          const AggregationState &group_state) {
     group_by_keys_->emplace_back(group_by_key);
-    output_column_vector_->appendTypedValue(handle_.finalizeHashTableEntry(group_state));
+    output_column_vector_->appendTypedValue(
+        handle_.finalizeHashTableEntry(group_state));
+  }
+
+  inline void operator()(const std::vector<TypedValue> &group_by_key,
+                         const unsigned char *byte_ptr) {
+    group_by_keys_->emplace_back(group_by_key);
+    output_column_vector_->appendTypedValue(
+        handle_.finalizeHashTableEntryFast(byte_ptr));
   }
 
  private:
@@ -313,171 +209,117 @@ class HashTableAggregateFinalizer {
 // ----------------------------------------------------------------------------
 // Implementations of templated methods follow:
 
-template <typename HandleT,
-          typename StateT,
-          typename HashTableT>
-void AggregationConcreteHandle::aggregateValueAccessorIntoHashTableNullaryHelper(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &group_by_key_ids,
-    const StateT &default_state,
-    AggregationStateHashTableBase *hash_table) const {
-  NullaryAggregationStateValueAccessorUpserter<HandleT, StateT>
-      upserter(static_cast<const HandleT&>(*this));
-  static_cast<HashTableT*>(hash_table)->upsertValueAccessorCompositeKey(
-      accessor,
-      group_by_key_ids,
-      true,
-      default_state,
-      &upserter);
-}
-
-template <typename HandleT,
-          typename StateT,
-          typename HashTableT>
-void AggregationConcreteHandle::aggregateValueAccessorIntoHashTableUnaryHelper(
-    ValueAccessor *accessor,
-    const attribute_id argument_id,
-    const std::vector<attribute_id> &group_by_key_ids,
-    const StateT &default_state,
-    AggregationStateHashTableBase *hash_table) const {
-  UnaryAggregationStateValueAccessorUpserter<HandleT, StateT>
-      upserter(static_cast<const HandleT&>(*this), argument_id);
-  static_cast<HashTableT*>(hash_table)->upsertValueAccessorCompositeKey(
-      accessor,
-      group_by_key_ids,
-      true,
-      default_state,
-      &upserter);
-}
-
-template <typename HandleT,
-          typename StateT>
-StateT* AggregationConcreteHandle::aggregateOnDistinctifyHashTableForSingleUnaryHelper(
-    const AggregationStateHashTableBase &distinctify_hash_table) const {
-  const HandleT& handle = static_cast<const HandleT&>(*this);
-  StateT *state = static_cast<StateT*>(createInitialState());
+template <typename HandleT, typename StateT>
+StateT* AggregationConcreteHandle::
+    aggregateOnDistinctifyHashTableForSingleUnaryHelperFast(
+        const AggregationStateHashTableBase &distinctify_hash_table) const {
+  const HandleT &handle = static_cast<const HandleT &>(*this);
+  StateT *state = static_cast<StateT *>(createInitialState());
 
   // A lambda function which will be called on each key from the distinctify
   // hash table.
-  const auto aggregate_functor = [&handle, &state](const TypedValue &key,
-                                                   const bool &dumb_placeholder) {
+  const auto aggregate_functor = [&handle, &state](
+      const TypedValue &key, const std::uint8_t &dumb_placeholder) {
     // For each (unary) key in the distinctify hash table, aggregate the key
     // into "state".
     handle.iterateUnaryInl(state, key);
   };
 
-  const AggregationStateHashTable<bool> &hash_table =
-      static_cast<const AggregationStateHashTable<bool>&>(distinctify_hash_table);
-  // Invoke the lambda function "aggregate_functor" on each key from the distinctify
-  // hash table.
+  const AggregationStateFastHashTable &hash_table =
+      static_cast<const AggregationStateFastHashTable &>(
+          distinctify_hash_table);
+  // Invoke the lambda function "aggregate_functor" on each key from the
+  // distinctify hash table.
   hash_table.forEach(&aggregate_functor);
 
   return state;
 }
 
-template <typename HandleT,
-          typename StateT,
-          typename HashTableT>
-void AggregationConcreteHandle::aggregateOnDistinctifyHashTableForGroupByUnaryHelper(
-    const AggregationStateHashTableBase &distinctify_hash_table,
-    const StateT &default_state,
-    AggregationStateHashTableBase *aggregation_hash_table) const {
-  const HandleT& handle = static_cast<const HandleT&>(*this);
-  HashTableT *target_hash_table = static_cast<HashTableT*>(aggregation_hash_table);
+template <typename HandleT, typename HashTableT>
+void AggregationConcreteHandle::
+    aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast(
+        const AggregationStateHashTableBase &distinctify_hash_table,
+        AggregationStateHashTableBase *aggregation_hash_table,
+        std::size_t index) const {
+  const HandleT &handle = static_cast<const HandleT &>(*this);
+  HashTableT *target_hash_table =
+      static_cast<HashTableT *>(aggregation_hash_table);
 
   // A lambda function which will be called on each key-value pair from the
   // distinctify hash table.
-  const auto aggregate_functor = [&handle, &target_hash_table, &default_state](
-      std::vector<TypedValue> &key,
-      const bool &dumb_placeholder) {
+  const auto aggregate_functor = [&handle, &target_hash_table, &index](
+      std::vector<TypedValue> &key, const bool &dumb_placeholder) {
     // For each (composite) key vector in the distinctify hash table with size N.
-    // The first N-1 entries are GROUP BY columns and the last entry is the argument
-    // to be aggregated on.
+    // The first N-1 entries are GROUP BY columns and the last entry is the
+    // argument to be aggregated on.
     const TypedValue argument(std::move(key.back()));
     key.pop_back();
 
     // An upserter as lambda function for aggregating the argument into its
     // GROUP BY group's entry inside aggregation_hash_table.
-    const auto upserter = [&handle, &argument](StateT *state) {
-      handle.iterateUnaryInl(state, argument);
+    const auto upserter = [&handle, &argument](std::uint8_t *state) {
+      handle.iterateUnaryInlFast(argument, state);
     };
 
-    target_hash_table->upsertCompositeKey(key, default_state, &upserter);
+    target_hash_table->upsertCompositeKeyFast(key, nullptr, &upserter, index);
   };
 
-  const AggregationStateHashTable<bool> &source_hash_table =
-      static_cast<const AggregationStateHashTable<bool>&>(distinctify_hash_table);
+  const HashTableT &source_hash_table =
+      static_cast<const HashTableT &>(distinctify_hash_table);
   // Invoke the lambda function "aggregate_functor" on each composite key vector
   // from the distinctify hash table.
-  source_hash_table.forEachCompositeKey(&aggregate_functor);
+  source_hash_table.forEachCompositeKeyFast(&aggregate_functor);
 }
 
-template <typename HandleT,
-          typename HashTableT>
-ColumnVector* AggregationConcreteHandle::finalizeHashTableHelper(
+template <typename HandleT, typename HashTableT>
+ColumnVector* AggregationConcreteHandle::finalizeHashTableHelperFast(
     const Type &result_type,
     const AggregationStateHashTableBase &hash_table,
-    std::vector<std::vector<TypedValue>> *group_by_keys) const {
-  const HandleT &handle = static_cast<const HandleT&>(*this);
-  const HashTableT &hash_table_concrete = static_cast<const HashTableT&>(hash_table);
+    std::vector<std::vector<TypedValue>> *group_by_keys,
+    int index) const {
+  const HandleT &handle = static_cast<const HandleT &>(*this);
+  const HashTableT &hash_table_concrete =
+      static_cast<const HashTableT &>(hash_table);
 
   if (group_by_keys->empty()) {
     if (NativeColumnVector::UsableForType(result_type)) {
-      NativeColumnVector *result = new NativeColumnVector(result_type,
-                                                          hash_table_concrete.numEntries());
+      NativeColumnVector *result =
+          new NativeColumnVector(result_type, hash_table_concrete.numEntries());
       HashTableAggregateFinalizer<HandleT, NativeColumnVector> finalizer(
-          handle,
-          group_by_keys,
-          result);
-      hash_table_concrete.forEachCompositeKey(&finalizer);
+          handle, group_by_keys, result);
+      hash_table_concrete.forEachCompositeKeyFast(&finalizer, index);
       return result;
     } else {
-      IndirectColumnVector *result = new IndirectColumnVector(result_type,
-                                                              hash_table_concrete.numEntries());
+      IndirectColumnVector *result = new IndirectColumnVector(
+          result_type, hash_table_concrete.numEntries());
       HashTableAggregateFinalizer<HandleT, IndirectColumnVector> finalizer(
-          handle,
-          group_by_keys,
-          result);
-      hash_table_concrete.forEachCompositeKey(&finalizer);
+          handle, group_by_keys, result);
+      hash_table_concrete.forEachCompositeKeyFast(&finalizer, index);
       return result;
     }
   } else {
     if (NativeColumnVector::UsableForType(result_type)) {
-      NativeColumnVector *result = new NativeColumnVector(result_type,
-                                                          group_by_keys->size());
+      NativeColumnVector *result =
+          new NativeColumnVector(result_type, group_by_keys->size());
       for (const std::vector<TypedValue> &group_by_key : *group_by_keys) {
-        result->appendTypedValue(finalizeGroupInHashTable<HandleT, HashTableT>(hash_table,
-                                                                               group_by_key));
+        result->appendTypedValue(
+            finalizeGroupInHashTableFast<HandleT, HashTableT>(
+                hash_table, group_by_key, index));
       }
       return result;
     } else {
-      IndirectColumnVector *result = new IndirectColumnVector(result_type,
-                                                              hash_table_concrete.numEntries());
+      IndirectColumnVector *result = new IndirectColumnVector(
+          result_type, hash_table_concrete.numEntries());
       for (const std::vector<TypedValue> &group_by_key : *group_by_keys) {
-        result->appendTypedValue(finalizeGroupInHashTable<HandleT, HashTableT>(hash_table,
-                                                                               group_by_key));
+        result->appendTypedValue(
+            finalizeGroupInHashTableFast<HandleT, HashTableT>(
+                hash_table, group_by_key, index));
       }
       return result;
     }
   }
 }
 
-template <typename HandleT,
-          typename StateT,
-          typename HashTableT>
-void AggregationConcreteHandle::mergeGroupByHashTablesHelper(
-    const AggregationStateHashTableBase &source_hash_table,
-    AggregationStateHashTableBase *destination_hash_table) const {
-  const HandleT &handle = static_cast<const HandleT &>(*this);
-  const HashTableT &source_hash_table_concrete =
-      static_cast<const HashTableT &>(source_hash_table);
-
-  HashTableMerger<HandleT, StateT, HashTableT> merger(handle,
-                                                      destination_hash_table);
-
-  source_hash_table_concrete.forEachCompositeKey(&merger);
-}
-
 }  // namespace quickstep
 
 #endif  // QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_CONCRETE_HANDLE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/AggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandle.hpp b/expressions/aggregation/AggregationHandle.hpp
index 3d6e872..48ce7fe 100644
--- a/expressions/aggregation/AggregationHandle.hpp
+++ b/expressions/aggregation/AggregationHandle.hpp
@@ -40,7 +40,6 @@ class ValueAccessor;
  *  @{
  */
 
-
 /**
  * @brief Abstract base class for aggregation state.
  **/
@@ -107,8 +106,7 @@ class AggregationHandle {
    * @brief Virtual destructor.
    *
    **/
-  virtual ~AggregationHandle() {
-  }
+  virtual ~AggregationHandle() {}
 
   /**
    * @brief Create an initial "blank" state for this aggregation.
@@ -132,11 +130,11 @@ class AggregationHandle {
    *        A StorageBlob will be allocated to serve as the HashTable's
    *        in-memory storage.
    * @return A new HashTable instance with the appropriate state type for this
-   *         aggregate as the ValueT.
+   *         aggregate.
    **/
   virtual AggregationStateHashTableBase* createGroupByHashTable(
       const HashTableImplType hash_table_impl,
-      const std::vector<const Type*> &group_by_types,
+      const std::vector<const Type *> &group_by_types,
       const std::size_t estimated_num_groups,
       StorageManager *storage_manager) const = 0;
 
@@ -261,14 +259,18 @@ class AggregationHandle {
    *        values returned in the ColumnVector. If this is already filled in,
    *        then this method will visit the GROUP BY keys in the exact order
    *        specified.
+   * @param index The index of the AggregationHandle to be finalized.
+   *
    * @return A ColumnVector containing each group's finalized aggregate value.
    **/
   virtual ColumnVector* finalizeHashTable(
       const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys) const = 0;
+      std::vector<std::vector<TypedValue>> *group_by_keys,
+      int index) const = 0;
 
   /**
-   * @brief Create a new HashTable for the distinctify step for DISTINCT aggregation.
+   * @brief Create a new HashTable for the distinctify step for DISTINCT
+   * aggregation.
    *
    * Distinctify is the first step for DISTINCT aggregation. This step inserts
    * the GROUP BY expression values and aggregation arguments together as keys
@@ -281,8 +283,8 @@ class AggregationHandle {
    * we simply treat it as a special GROUP BY case that the GROUP BY expression
    * vector is empty.
    *
-   * @param hash_table_impl The choice of which concrete HashTable implementation
-   *        to use.
+   * @param hash_table_impl The choice of which concrete HashTable
+   *        implementation to use.
    * @param key_types The types of the GROUP BY expressions together with the
    *        types of the aggregation arguments.
    * @param estimated_num_distinct_keys The estimated number of distinct keys
@@ -291,14 +293,15 @@ class AggregationHandle {
    *        This is an estimate only, and the HashTable will be resized if it
    *        becomes over-full.
    * @param storage_manager The StorageManager to use to create the HashTable.
-   *        A StorageBlob will be allocated to serve as the HashTable's in-memory
-   *        storage.
+   *        A StorageBlob will be allocated to serve as the HashTable's
+   *        in-memory storage.
+   *
    * @return A new HashTable instance with the appropriate state type for this
-   *         aggregate as the ValueT.
+   *         aggregate.
    */
   virtual AggregationStateHashTableBase* createDistinctifyHashTable(
       const HashTableImplType hash_table_impl,
-      const std::vector<const Type*> &key_types,
+      const std::vector<const Type *> &key_types,
       const std::size_t estimated_num_distinct_keys,
       StorageManager *storage_manager) const = 0;
 
@@ -306,15 +309,16 @@ class AggregationHandle {
    * @brief Inserts the GROUP BY expressions and aggregation arguments together
    * as keys into the distinctify hash table.
    *
-   * @param accessor The ValueAccessor that will be iterated over to read tuples.
+   * @param accessor The ValueAccessor that will be iterated over to read
+   *        tuples.
    * @param key_ids The attribute_ids of the GROUP BY expressions in accessor
    *        together with the attribute_ids of the arguments to this aggregate
    *        in accessor, in order.
-   * @param distinctify_hash_table The HashTable to store the GROUP BY expressions
-   *        and the aggregation arguments together as hash table keys and a bool
-   *        constant \c true as hash table value (So the hash table actually
-   *        serves as a hash set). This should have been created by calling
-   *        createDistinctifyHashTable();
+   * @param distinctify_hash_table The HashTable to store the GROUP BY
+   *        expressions and the aggregation arguments together as hash table
+   *        keys and a bool constant \c true as hash table value (So the hash
+   *        table actually serves as a hash set). This should have been created
+   *        by calling createDistinctifyHashTable().
    */
   virtual void insertValueAccessorIntoDistinctifyHashTable(
       ValueAccessor *accessor,
@@ -339,32 +343,82 @@ class AggregationHandle {
    * @brief Perform GROUP BY aggregation on the keys from the distinctify hash
    * table and upserts states into the aggregation hash table.
    *
-   * @param distinctify_hash_table Hash table which stores the GROUP BY expression
-   *        values and aggregation arguments together as hash table keys.
+   * @param distinctify_hash_table Hash table which stores the GROUP BY
+   *        expression values and aggregation arguments together as hash table
+   *        keys.
    * @param aggregation_hash_table The HashTable to upsert AggregationStates in.
    *        This should have been created by calling createGroupByHashTable() on
    *        this same AggregationHandle.
+   * @param index The index of the distinctify hash table for which we perform
+   *        the DISTINCT aggregation.
    */
   virtual void aggregateOnDistinctifyHashTableForGroupBy(
       const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table) const = 0;
+      AggregationStateHashTableBase *aggregation_hash_table,
+      std::size_t index) const = 0;
+
+  /**
+   * @brief Get the number of bytes needed to store the aggregation handle's
+   *        state.
+   **/
+  virtual std::size_t getPayloadSize() const { return 1; }
 
   /**
-   * @brief Merge two GROUP BY hash tables in one.
+   * @brief Update the aggregation state for a nullary aggregation function,
+   *        e.g. COUNT(*).
    *
-   * @note Both the hash tables should have the same structure.
+   * @note This function should be overridden by those aggregation functions
+   *       which can perform nullary operations, e.g. COUNT.
+   *
+   * @param byte_ptr The pointer where the aggregation state is stored.
+   **/
+  virtual void updateStateNullary(std::uint8_t *byte_ptr) const {}
+
+  /**
+   * @brief Update the aggregation state for a unary aggregation function,
+   *        e.g. SUM(a).
    *
-   * @param source_hash_table The hash table which will get merged.
-   * @param destination_hash_table The hash table to which we will merge the
-   *        other hash table.
+   * @param argument The argument which will be used to update the state of the
+   *        aggregation function.
+   * @param byte_ptr The pointer where the aggregation state is stored.
+   **/
+  virtual void updateStateUnary(const TypedValue &argument,
+                                std::uint8_t *byte_ptr) const {}
+
+  /**
+   * @brief Merge two aggregation states for this aggregation handle.
+   *
+   * @note This function should be used with the hash table specifically meant
+   *       for aggregations only.
+   *
+   * @param src A pointer to the source aggregation state.
+   * @param dst A pointer to the destination aggregation state.
+   **/
+  virtual void mergeStatesFast(const std::uint8_t *src,
+                               std::uint8_t *dst) const {}
+
+  /**
+   * @brief Initialize the payload (in the aggregation hash table) for the given
+   *        aggregation handle.
+   *
+   * @param byte_ptr The pointer to the aggregation state in the hash table.
+   **/
+  virtual void initPayload(std::uint8_t *byte_ptr) const {}
+
+  /**
+   * @brief Inform the aggregation handle to block (prohibit) updates on the
+   *        aggregation state.
+   **/
+  virtual void blockUpdate() {}
+
+  /**
+   * @brief Inform the aggregation handle to allow updates on the
+   *        aggregation state.
    **/
-  virtual void mergeGroupByHashTables(
-      const AggregationStateHashTableBase &source_hash_table,
-      AggregationStateHashTableBase *destination_hash_table) const = 0;
+  virtual void allowUpdate() {}
 
  protected:
-  AggregationHandle() {
-  }
+  AggregationHandle() {}
 
  private:
   DISALLOW_COPY_AND_ASSIGN(AggregationHandle);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/AggregationHandleAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.cpp b/expressions/aggregation/AggregationHandleAvg.cpp
index 4bd43d6..2481092 100644
--- a/expressions/aggregation/AggregationHandleAvg.cpp
+++ b/expressions/aggregation/AggregationHandleAvg.cpp
@@ -42,7 +42,7 @@ namespace quickstep {
 class StorageManager;
 
 AggregationHandleAvg::AggregationHandleAvg(const Type &type)
-    : argument_type_(type) {
+    : argument_type_(type), block_update_(false) {
   // We sum Int as Long and Float as Double so that we have more headroom when
   // adding many values.
   TypeID type_precision_id;
@@ -76,26 +76,24 @@ AggregationHandleAvg::AggregationHandleAvg(const Type &type)
   // Divide operator for dividing sum by count to get final average.
   divide_operator_.reset(
       BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide)
-          .makeUncheckedBinaryOperatorForTypes(sum_type, TypeFactory::GetType(kDouble)));
+          .makeUncheckedBinaryOperatorForTypes(sum_type,
+                                               TypeFactory::GetType(kDouble)));
 
   // Result is nullable, because AVG() over 0 values (or all NULL values) is
   // NULL.
-  result_type_
-      = &(BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide)
-              .resultTypeForArgumentTypes(sum_type, TypeFactory::GetType(kDouble))
-                  ->getNullableVersion());
+  result_type_ =
+      &(BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide)
+            .resultTypeForArgumentTypes(sum_type, TypeFactory::GetType(kDouble))
+            ->getNullableVersion());
 }
 
 AggregationStateHashTableBase* AggregationHandleAvg::createGroupByHashTable(
     const HashTableImplType hash_table_impl,
-    const std::vector<const Type*> &group_by_types,
+    const std::vector<const Type *> &group_by_types,
     const std::size_t estimated_num_groups,
     StorageManager *storage_manager) const {
   return AggregationStateHashTableFactory<AggregationStateAvg>::CreateResizable(
-      hash_table_impl,
-      group_by_types,
-      estimated_num_groups,
-      storage_manager);
+      hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
 }
 
 AggregationState* AggregationHandleAvg::accumulateColumnVectors(
@@ -105,9 +103,8 @@ AggregationState* AggregationHandleAvg::accumulateColumnVectors(
 
   AggregationStateAvg *state = new AggregationStateAvg(blank_state_);
   std::size_t count = 0;
-  state->sum_ = fast_add_operator_->accumulateColumnVector(state->sum_,
-                                                           *column_vectors.front(),
-                                                           &count);
+  state->sum_ = fast_add_operator_->accumulateColumnVector(
+      state->sum_, *column_vectors.front(), &count);
   state->count_ = count;
   return state;
 }
@@ -121,10 +118,8 @@ AggregationState* AggregationHandleAvg::accumulateValueAccessor(
 
   AggregationStateAvg *state = new AggregationStateAvg(blank_state_);
   std::size_t count = 0;
-  state->sum_ = fast_add_operator_->accumulateValueAccessor(state->sum_,
-                                                            accessor,
-                                                            accessor_ids.front(),
-                                                            &count);
+  state->sum_ = fast_add_operator_->accumulateValueAccessor(
+      state->sum_, accessor, accessor_ids.front(), &count);
   state->count_ = count;
   return state;
 }
@@ -137,79 +132,74 @@ void AggregationHandleAvg::aggregateValueAccessorIntoHashTable(
     AggregationStateHashTableBase *hash_table) const {
   DCHECK_EQ(1u, argument_ids.size())
       << "Got wrong number of arguments for AVG: " << argument_ids.size();
-
-  aggregateValueAccessorIntoHashTableUnaryHelper<
-      AggregationHandleAvg,
-      AggregationStateAvg,
-      AggregationStateHashTable<AggregationStateAvg>>(
-          accessor,
-          argument_ids.front(),
-          group_by_key_ids,
-          blank_state_,
-          hash_table);
 }
 
-void AggregationHandleAvg::mergeStates(
-    const AggregationState &source,
-    AggregationState *destination) const {
-  const AggregationStateAvg &avg_source = static_cast<const AggregationStateAvg&>(source);
-  AggregationStateAvg *avg_destination = static_cast<AggregationStateAvg*>(destination);
+void AggregationHandleAvg::mergeStates(const AggregationState &source,
+                                       AggregationState *destination) const {
+  const AggregationStateAvg &avg_source =
+      static_cast<const AggregationStateAvg &>(source);
+  AggregationStateAvg *avg_destination =
+      static_cast<AggregationStateAvg *>(destination);
 
   SpinMutexLock lock(avg_destination->mutex_);
   avg_destination->count_ += avg_source.count_;
-  avg_destination->sum_ = merge_add_operator_->applyToTypedValues(avg_destination->sum_,
-                                                                  avg_source.sum_);
+  avg_destination->sum_ = merge_add_operator_->applyToTypedValues(
+      avg_destination->sum_, avg_source.sum_);
+}
+
+void AggregationHandleAvg::mergeStatesFast(const std::uint8_t *source,
+                                           std::uint8_t *destination) const {
+  const TypedValue *src_sum_ptr =
+      reinterpret_cast<const TypedValue *>(source + blank_state_.sum_offset_);
+  const std::int64_t *src_count_ptr = reinterpret_cast<const std::int64_t *>(
+      source + blank_state_.count_offset_);
+  TypedValue *dst_sum_ptr =
+      reinterpret_cast<TypedValue *>(destination + blank_state_.sum_offset_);
+  std::int64_t *dst_count_ptr = reinterpret_cast<std::int64_t *>(
+      destination + blank_state_.count_offset_);
+  (*dst_count_ptr) += (*src_count_ptr);
+  *dst_sum_ptr =
+      merge_add_operator_->applyToTypedValues(*dst_sum_ptr, *src_sum_ptr);
 }
 
 TypedValue AggregationHandleAvg::finalize(const AggregationState &state) const {
-  const AggregationStateAvg &agg_state = static_cast<const AggregationStateAvg&>(state);
+  const AggregationStateAvg &agg_state =
+      static_cast<const AggregationStateAvg &>(state);
   if (agg_state.count_ == 0) {
     // AVG() over no values is NULL.
     return result_type_->makeNullValue();
   } else {
     // Divide sum by count to get final average.
-    return divide_operator_->applyToTypedValues(agg_state.sum_,
-                                                TypedValue(static_cast<double>(agg_state.count_)));
+    return divide_operator_->applyToTypedValues(
+        agg_state.sum_, TypedValue(static_cast<double>(agg_state.count_)));
   }
 }
 
 ColumnVector* AggregationHandleAvg::finalizeHashTable(
     const AggregationStateHashTableBase &hash_table,
-    std::vector<std::vector<TypedValue>> *group_by_keys) const {
-  return finalizeHashTableHelper<AggregationHandleAvg,
-                                 AggregationStateHashTable<AggregationStateAvg>>(
-      *result_type_,
-      hash_table,
-      group_by_keys);
+    std::vector<std::vector<TypedValue>> *group_by_keys,
+    int index) const {
+  return finalizeHashTableHelperFast<AggregationHandleAvg,
+                                     AggregationStateFastHashTable>(
+      *result_type_, hash_table, group_by_keys, index);
 }
 
-AggregationState* AggregationHandleAvg::aggregateOnDistinctifyHashTableForSingle(
+AggregationState*
+AggregationHandleAvg::aggregateOnDistinctifyHashTableForSingle(
     const AggregationStateHashTableBase &distinctify_hash_table) const {
-  return aggregateOnDistinctifyHashTableForSingleUnaryHelper<
+  return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
       AggregationHandleAvg,
-      AggregationStateAvg>(
-          distinctify_hash_table);
+      AggregationStateAvg>(distinctify_hash_table);
 }
 
 void AggregationHandleAvg::aggregateOnDistinctifyHashTableForGroupBy(
     const AggregationStateHashTableBase &distinctify_hash_table,
-    AggregationStateHashTableBase *aggregation_hash_table) const {
-  aggregateOnDistinctifyHashTableForGroupByUnaryHelper<
+    AggregationStateHashTableBase *aggregation_hash_table,
+    std::size_t index) const {
+  aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
       AggregationHandleAvg,
-      AggregationStateAvg,
-      AggregationStateHashTable<AggregationStateAvg>>(
-          distinctify_hash_table,
-          blank_state_,
-          aggregation_hash_table);
-}
-
-void AggregationHandleAvg::mergeGroupByHashTables(
-    const AggregationStateHashTableBase &source_hash_table,
-    AggregationStateHashTableBase *destination_hash_table) const {
-  mergeGroupByHashTablesHelper<AggregationHandleAvg,
-                               AggregationStateAvg,
-                               AggregationStateHashTable<AggregationStateAvg>>(
-      source_hash_table, destination_hash_table);
+      AggregationStateFastHashTable>(
+      distinctify_hash_table, aggregation_hash_table, index);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/AggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.hpp b/expressions/aggregation/AggregationHandleAvg.hpp
index 31997b1..3c6e0c2 100644
--- a/expressions/aggregation/AggregationHandleAvg.hpp
+++ b/expressions/aggregation/AggregationHandleAvg.hpp
@@ -28,6 +28,7 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/aggregation/AggregationConcreteHandle.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
+#include "storage/FastHashTable.hpp"
 #include "storage/HashTableBase.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
@@ -57,26 +58,45 @@ class AggregationStateAvg : public AggregationState {
    */
   AggregationStateAvg(const AggregationStateAvg &orig)
       : sum_(orig.sum_),
-        count_(orig.count_) {
-  }
+        count_(orig.count_),
+        sum_offset_(orig.sum_offset_),
+        count_offset_(orig.count_offset_),
+        mutex_offset_(orig.mutex_offset_) {}
 
   /**
    * @brief Destructor.
    */
   ~AggregationStateAvg() override {}
 
+  std::size_t getPayloadSize() const {
+    std::size_t p1 = reinterpret_cast<std::size_t>(&sum_);
+    std::size_t p2 = reinterpret_cast<std::size_t>(&mutex_);
+    return (p2 - p1);
+  }
+
+  const std::uint8_t *getPayloadAddress() const {
+    return reinterpret_cast<const uint8_t *>(&sum_);
+  }
+
  private:
   friend class AggregationHandleAvg;
 
   AggregationStateAvg()
-      : sum_(0), count_(0) {
-  }
+      : sum_(0),
+        count_(0),
+        sum_offset_(0),
+        count_offset_(reinterpret_cast<std::uint8_t *>(&count_) -
+                      reinterpret_cast<std::uint8_t *>(&sum_)),
+        mutex_offset_(reinterpret_cast<std::uint8_t *>(&mutex_) -
+                      reinterpret_cast<std::uint8_t *>(&sum_)) {}
 
   // TODO(shoban): We might want to specialize sum_ and count_ to use atomics
   // for int types similar to in AggregationStateCount.
   TypedValue sum_;
   std::int64_t count_;
   SpinMutex mutex_;
+
+  int sum_offset_, count_offset_, mutex_offset_;
 };
 
 /**
@@ -84,8 +104,7 @@ class AggregationStateAvg : public AggregationState {
  **/
 class AggregationHandleAvg : public AggregationConcreteHandle {
  public:
-  ~AggregationHandleAvg() override {
-  }
+  ~AggregationHandleAvg() override {}
 
   AggregationState* createInitialState() const override {
     return new AggregationStateAvg(blank_state_);
@@ -93,14 +112,15 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
 
   AggregationStateHashTableBase* createGroupByHashTable(
       const HashTableImplType hash_table_impl,
-      const std::vector<const Type*> &group_by_types,
+      const std::vector<const Type *> &group_by_types,
       const std::size_t estimated_num_groups,
       StorageManager *storage_manager) const override;
 
   /**
    * @brief Iterate method with average aggregation state.
    **/
-  inline void iterateUnaryInl(AggregationStateAvg *state, const TypedValue &value) const {
+  inline void iterateUnaryInl(AggregationStateAvg *state,
+                              const TypedValue &value) const {
     DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
     if (value.isNull()) return;
 
@@ -109,8 +129,41 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
     ++state->count_;
   }
 
+  inline void iterateUnaryInlFast(const TypedValue &value,
+                                  std::uint8_t *byte_ptr) const {
+    DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
+    if (value.isNull()) return;
+    TypedValue *sum_ptr =
+        reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset_);
+    std::int64_t *count_ptr =
+        reinterpret_cast<std::int64_t *>(byte_ptr + blank_state_.count_offset_);
+    *sum_ptr = fast_add_operator_->applyToTypedValues(*sum_ptr, value);
+    ++(*count_ptr);
+  }
+
+  inline void updateStateUnary(const TypedValue &argument,
+                               std::uint8_t *byte_ptr) const override {
+    if (!block_update_) {
+      iterateUnaryInlFast(argument, byte_ptr);
+    }
+  }
+
+  void blockUpdate() override { block_update_ = true; }
+
+  void allowUpdate() override { block_update_ = false; }
+
+  void initPayload(std::uint8_t *byte_ptr) const override {
+    TypedValue *sum_ptr =
+        reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset_);
+    std::int64_t *count_ptr =
+        reinterpret_cast<std::int64_t *>(byte_ptr + blank_state_.count_offset_);
+    *sum_ptr = blank_state_.sum_;
+    *count_ptr = blank_state_.count_;
+  }
+
   AggregationState* accumulateColumnVectors(
-      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const override;
+      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
+      const override;
 
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
   AggregationState* accumulateValueAccessor(
@@ -127,40 +180,61 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
   void mergeStates(const AggregationState &source,
                    AggregationState *destination) const override;
 
+  void mergeStatesFast(const std::uint8_t *source,
+                       std::uint8_t *destination) const override;
+
   TypedValue finalize(const AggregationState &state) const override;
 
-  inline TypedValue finalizeHashTableEntry(const AggregationState &state) const {
-    const AggregationStateAvg &agg_state = static_cast<const AggregationStateAvg&>(state);
+  inline TypedValue finalizeHashTableEntry(
+      const AggregationState &state) const {
+    const AggregationStateAvg &agg_state =
+        static_cast<const AggregationStateAvg &>(state);
     // TODO(chasseur): Could improve performance further if we made a special
     // version of finalizeHashTable() that collects all the sums into one
     // ColumnVector and all the counts into another and then applies
     // '*divide_operator_' to them in bulk.
-    return divide_operator_->applyToTypedValues(agg_state.sum_,
-                                                TypedValue(static_cast<double>(agg_state.count_)));
+    return divide_operator_->applyToTypedValues(
+        agg_state.sum_, TypedValue(static_cast<double>(agg_state.count_)));
+  }
+
+  inline TypedValue finalizeHashTableEntryFast(
+      const std::uint8_t *byte_ptr) const {
+    std::uint8_t *value_ptr = const_cast<std::uint8_t *>(byte_ptr);
+    TypedValue *sum_ptr =
+        reinterpret_cast<TypedValue *>(value_ptr + blank_state_.sum_offset_);
+    std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(
+        value_ptr + blank_state_.count_offset_);
+    return divide_operator_->applyToTypedValues(
+        *sum_ptr, TypedValue(static_cast<double>(*count_ptr)));
   }
 
   ColumnVector* finalizeHashTable(
       const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys) const override;
+      std::vector<std::vector<TypedValue>> *group_by_keys,
+      int index) const override;
 
   /**
-   * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
+   * @brief Implementation of
+   *        AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
    *        for AVG aggregation.
    */
   AggregationState* aggregateOnDistinctifyHashTableForSingle(
-      const AggregationStateHashTableBase &distinctify_hash_table) const override;
+      const AggregationStateHashTableBase &distinctify_hash_table)
+      const override;
 
   /**
-   * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
+   * @brief Implementation of
+   *        AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
    *        for AVG aggregation.
    */
   void aggregateOnDistinctifyHashTableForGroupBy(
       const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table) const override;
+      AggregationStateHashTableBase *aggregation_hash_table,
+      std::size_t index) const override;
 
-  void mergeGroupByHashTables(
-      const AggregationStateHashTableBase &source_hash_table,
-      AggregationStateHashTableBase *destination_hash_table) const override;
+  std::size_t getPayloadSize() const override {
+    return blank_state_.getPayloadSize();
+  }
 
  private:
   friend class AggregateFunctionAvg;
@@ -179,6 +253,8 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
   std::unique_ptr<UncheckedBinaryOperator> merge_add_operator_;
   std::unique_ptr<UncheckedBinaryOperator> divide_operator_;
 
+  bool block_update_;
+
   DISALLOW_COPY_AND_ASSIGN(AggregationHandleAvg);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/AggregationHandleCount.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.cpp b/expressions/aggregation/AggregationHandleCount.cpp
index dfcf131..034c942 100644
--- a/expressions/aggregation/AggregationHandleCount.cpp
+++ b/expressions/aggregation/AggregationHandleCount.cpp
@@ -49,48 +49,47 @@ class ValueAccessor;
 
 template <bool count_star, bool nullable_type>
 AggregationStateHashTableBase*
-    AggregationHandleCount<count_star, nullable_type>::createGroupByHashTable(
-        const HashTableImplType hash_table_impl,
-        const std::vector<const Type*> &group_by_types,
-        const std::size_t estimated_num_groups,
-        StorageManager *storage_manager) const {
-  return AggregationStateHashTableFactory<AggregationStateCount>::CreateResizable(
-      hash_table_impl,
-      group_by_types,
-      estimated_num_groups,
-      storage_manager);
+AggregationHandleCount<count_star, nullable_type>::createGroupByHashTable(
+    const HashTableImplType hash_table_impl,
+    const std::vector<const Type *> &group_by_types,
+    const std::size_t estimated_num_groups,
+    StorageManager *storage_manager) const {
+  return AggregationStateHashTableFactory<
+      AggregationStateCount>::CreateResizable(hash_table_impl,
+                                              group_by_types,
+                                              estimated_num_groups,
+                                              storage_manager);
 }
 
 template <bool count_star, bool nullable_type>
 AggregationState*
-    AggregationHandleCount<count_star, nullable_type>::accumulateColumnVectors(
-        const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
+AggregationHandleCount<count_star, nullable_type>::accumulateColumnVectors(
+    const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
   DCHECK(!count_star)
       << "Called non-nullary accumulation method on an AggregationHandleCount "
       << "set up for nullary COUNT(*)";
 
   DCHECK_EQ(1u, column_vectors.size())
-      << "Got wrong number of ColumnVectors for COUNT: " << column_vectors.size();
+      << "Got wrong number of ColumnVectors for COUNT: "
+      << column_vectors.size();
 
   std::size_t count = 0;
   InvokeOnColumnVector(
       *column_vectors.front(),
       [&](const auto &column_vector) -> void {  // NOLINT(build/c++11)
-    if (nullable_type) {
-      // TODO(shoban): Iterating over the ColumnVector is a rather slow way to
-      // do this. We should look at extending the ColumnVector interface to do
-      // a quick count of the non-null values (i.e. the length minus the
-      // population count of the null bitmap). We should do something similar
-      // for ValueAccessor too.
-      for (std::size_t pos = 0;
-           pos < column_vector.size();
-           ++pos) {
-        count += !column_vector.getTypedValue(pos).isNull();
-      }
-    } else {
-      count = column_vector.size();
-    }
-  });
+        if (nullable_type) {
+          // TODO(shoban): Iterating over the ColumnVector is a rather slow way
+          // to do this. We should look at extending the ColumnVector interface
+          // to do a quick count of the non-null values (i.e. the length minus
+          // the population count of the null bitmap). We should do something
+          // similar for ValueAccessor too.
+          for (std::size_t pos = 0; pos < column_vector.size(); ++pos) {
+            count += !column_vector.getTypedValue(pos).isNull();
+          }
+        } else {
+          count = column_vector.size();
+        }
+      });
 
   return new AggregationStateCount(count);
 }
@@ -98,9 +97,9 @@ AggregationState*
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 template <bool count_star, bool nullable_type>
 AggregationState*
-    AggregationHandleCount<count_star, nullable_type>::accumulateValueAccessor(
-        ValueAccessor *accessor,
-        const std::vector<attribute_id> &accessor_ids) const {
+AggregationHandleCount<count_star, nullable_type>::accumulateValueAccessor(
+    ValueAccessor *accessor,
+    const std::vector<attribute_id> &accessor_ids) const {
   DCHECK(!count_star)
       << "Called non-nullary accumulation method on an AggregationHandleCount "
       << "set up for nullary COUNT(*)";
@@ -113,108 +112,91 @@ AggregationState*
   InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
       accessor,
       [&accessor_id, &count](auto *accessor) -> void {  // NOLINT(build/c++11)
-    if (nullable_type) {
-      while (accessor->next()) {
-        count += !accessor->getTypedValue(accessor_id).isNull();
-      }
-    } else {
-      count = accessor->getNumTuples();
-    }
-  });
+        if (nullable_type) {
+          while (accessor->next()) {
+            count += !accessor->getTypedValue(accessor_id).isNull();
+          }
+        } else {
+          count = accessor->getNumTuples();
+        }
+      });
 
   return new AggregationStateCount(count);
 }
 #endif
 
 template <bool count_star, bool nullable_type>
-    void AggregationHandleCount<count_star, nullable_type>::aggregateValueAccessorIntoHashTable(
+void AggregationHandleCount<count_star, nullable_type>::
+    aggregateValueAccessorIntoHashTable(
         ValueAccessor *accessor,
         const std::vector<attribute_id> &argument_ids,
         const std::vector<attribute_id> &group_by_key_ids,
         AggregationStateHashTableBase *hash_table) const {
   if (count_star) {
     DCHECK_EQ(0u, argument_ids.size())
-        << "Got wrong number of arguments for COUNT(*): " << argument_ids.size();
-    aggregateValueAccessorIntoHashTableNullaryHelper<
-        AggregationHandleCount<count_star, nullable_type>,
-        AggregationStateCount,
-        AggregationStateHashTable<AggregationStateCount>>(
-            accessor,
-            group_by_key_ids,
-            AggregationStateCount(),
-            hash_table);
+        << "Got wrong number of arguments for COUNT(*): "
+        << argument_ids.size();
   } else {
     DCHECK_EQ(1u, argument_ids.size())
         << "Got wrong number of arguments for COUNT: " << argument_ids.size();
-    aggregateValueAccessorIntoHashTableUnaryHelper<
-        AggregationHandleCount<count_star, nullable_type>,
-        AggregationStateCount,
-        AggregationStateHashTable<AggregationStateCount>>(
-            accessor,
-            argument_ids.front(),
-            group_by_key_ids,
-            AggregationStateCount(),
-            hash_table);
   }
 }
 
 template <bool count_star, bool nullable_type>
-    void AggregationHandleCount<count_star, nullable_type>::mergeStates(
-        const AggregationState &source,
-        AggregationState *destination) const {
-  const AggregationStateCount &count_source = static_cast<const AggregationStateCount&>(source);
-  AggregationStateCount *count_destination = static_cast<AggregationStateCount*>(destination);
-
-  count_destination->count_.fetch_add(count_source.count_.load(std::memory_order_relaxed),
-                                      std::memory_order_relaxed);
+void AggregationHandleCount<count_star, nullable_type>::mergeStates(
+    const AggregationState &source, AggregationState *destination) const {
+  const AggregationStateCount &count_source =
+      static_cast<const AggregationStateCount &>(source);
+  AggregationStateCount *count_destination =
+      static_cast<AggregationStateCount *>(destination);
+
+  count_destination->count_.fetch_add(
+      count_source.count_.load(std::memory_order_relaxed),
+      std::memory_order_relaxed);
 }
 
 template <bool count_star, bool nullable_type>
-    ColumnVector* AggregationHandleCount<count_star, nullable_type>::finalizeHashTable(
-        const AggregationStateHashTableBase &hash_table,
-        std::vector<std::vector<TypedValue>> *group_by_keys) const {
-  return finalizeHashTableHelper<AggregationHandleCount<count_star, nullable_type>,
-                                 AggregationStateHashTable<AggregationStateCount>>(
-      TypeFactory::GetType(kLong),
-      hash_table,
-      group_by_keys);
+void AggregationHandleCount<count_star, nullable_type>::mergeStatesFast(
+    const std::uint8_t *source, std::uint8_t *destination) const {
+  const std::int64_t *src_count_ptr =
+      reinterpret_cast<const std::int64_t *>(source);
+  std::int64_t *dst_count_ptr = reinterpret_cast<std::int64_t *>(destination);
+  (*dst_count_ptr) += (*src_count_ptr);
 }
 
 template <bool count_star, bool nullable_type>
-AggregationState* AggregationHandleCount<count_star, nullable_type>
-    ::aggregateOnDistinctifyHashTableForSingle(
-        const AggregationStateHashTableBase &distinctify_hash_table) const {
-  DCHECK_EQ(count_star, false);
-  return aggregateOnDistinctifyHashTableForSingleUnaryHelper<
+ColumnVector*
+AggregationHandleCount<count_star, nullable_type>::finalizeHashTable(
+    const AggregationStateHashTableBase &hash_table,
+    std::vector<std::vector<TypedValue>> *group_by_keys,
+    int index) const {
+  return finalizeHashTableHelperFast<
       AggregationHandleCount<count_star, nullable_type>,
-      AggregationStateCount>(
-          distinctify_hash_table);
+      AggregationStateFastHashTable>(
+      TypeFactory::GetType(kLong), hash_table, group_by_keys, index);
 }
 
 template <bool count_star, bool nullable_type>
-void AggregationHandleCount<count_star, nullable_type>
-    ::aggregateOnDistinctifyHashTableForGroupBy(
-        const AggregationStateHashTableBase &distinctify_hash_table,
-        AggregationStateHashTableBase *aggregation_hash_table) const {
+AggregationState* AggregationHandleCount<count_star, nullable_type>::
+    aggregateOnDistinctifyHashTableForSingle(
+        const AggregationStateHashTableBase &distinctify_hash_table) const {
   DCHECK_EQ(count_star, false);
-  aggregateOnDistinctifyHashTableForGroupByUnaryHelper<
+  return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
       AggregationHandleCount<count_star, nullable_type>,
-      AggregationStateCount,
-      AggregationStateHashTable<AggregationStateCount>>(
-          distinctify_hash_table,
-          AggregationStateCount(),
-          aggregation_hash_table);
+      AggregationStateCount>(distinctify_hash_table);
 }
 
 template <bool count_star, bool nullable_type>
-void AggregationHandleCount<count_star, nullable_type>::mergeGroupByHashTables(
-    const AggregationStateHashTableBase &source_hash_table,
-    AggregationStateHashTableBase *destination_hash_table) const {
-  mergeGroupByHashTablesHelper<
-      AggregationHandleCount,
-      AggregationStateCount,
-      AggregationStateHashTable<AggregationStateCount>>(source_hash_table,
-                                                        destination_hash_table);
+void AggregationHandleCount<count_star, nullable_type>::
+    aggregateOnDistinctifyHashTableForGroupBy(
+        const AggregationStateHashTableBase &distinctify_hash_table,
+        AggregationStateHashTableBase *aggregation_hash_table,
+        std::size_t index) const {
+  DCHECK_EQ(count_star, false);
+  aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
+      AggregationHandleCount<count_star, nullable_type>,
+      AggregationStateFastHashTable>(
+      distinctify_hash_table, aggregation_hash_table, index);
 }
 
 // Explicitly instantiate and compile in the different versions of

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/AggregationHandleCount.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.hpp b/expressions/aggregation/AggregationHandleCount.hpp
index 1cd5bda..6aab0cd 100644
--- a/expressions/aggregation/AggregationHandleCount.hpp
+++ b/expressions/aggregation/AggregationHandleCount.hpp
@@ -29,6 +29,7 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/aggregation/AggregationConcreteHandle.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
+#include "storage/FastHashTable.hpp"
 #include "storage/HashTableBase.hpp"
 #include "types/TypedValue.hpp"
 #include "utility/Macros.hpp"
@@ -40,7 +41,8 @@ class StorageManager;
 class Type;
 class ValueAccessor;
 
-template <bool, bool> class AggregationHandleCount;
+template <bool, bool>
+class AggregationHandleCount;
 
 /** \addtogroup Expressions
  *  @{
@@ -62,19 +64,22 @@ class AggregationStateCount : public AggregationState {
    */
   ~AggregationStateCount() override {}
 
+  std::size_t getPayloadSize() const { return sizeof(count_); }
+
+  const std::uint8_t* getPayloadAddress() const {
+    return reinterpret_cast<const uint8_t *>(&count_);
+  }
+
  private:
   friend class AggregationHandleCount<false, false>;
   friend class AggregationHandleCount<false, true>;
   friend class AggregationHandleCount<true, false>;
   friend class AggregationHandleCount<true, true>;
 
-  AggregationStateCount()
-      : count_(0) {
-  }
+  AggregationStateCount() : count_(0) {}
 
   explicit AggregationStateCount(const std::int64_t initial_count)
-      : count_(initial_count) {
-  }
+      : count_(initial_count) {}
 
   std::atomic<std::int64_t> count_;
 };
@@ -91,8 +96,7 @@ class AggregationStateCount : public AggregationState {
 template <bool count_star, bool nullable_type>
 class AggregationHandleCount : public AggregationConcreteHandle {
  public:
-  ~AggregationHandleCount() override {
-  }
+  ~AggregationHandleCount() override {}
 
   AggregationState* createInitialState() const override {
     return new AggregationStateCount();
@@ -100,7 +104,7 @@ class AggregationHandleCount : public AggregationConcreteHandle {
 
   AggregationStateHashTableBase* createGroupByHashTable(
       const HashTableImplType hash_table_impl,
-      const std::vector<const Type*> &group_by_types,
+      const std::vector<const Type *> &group_by_types,
       const std::size_t estimated_num_groups,
       StorageManager *storage_manager) const override;
 
@@ -108,21 +112,59 @@ class AggregationHandleCount : public AggregationConcreteHandle {
     state->count_.fetch_add(1, std::memory_order_relaxed);
   }
 
+  inline void iterateNullaryInlFast(std::uint8_t *byte_ptr) const {
+    std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr);
+    (*count_ptr)++;
+  }
+
   /**
    * @brief Iterate with count aggregation state.
    */
-  inline void iterateUnaryInl(AggregationStateCount *state, const TypedValue &value) const {
+  inline void iterateUnaryInl(AggregationStateCount *state,
+                              const TypedValue &value) const {
     if ((!nullable_type) || (!value.isNull())) {
       state->count_.fetch_add(1, std::memory_order_relaxed);
     }
   }
 
-  AggregationState* accumulateNullary(const std::size_t num_tuples) const override {
+  inline void iterateUnaryInlFast(const TypedValue &value,
+                                  std::uint8_t *byte_ptr) const {
+    if ((!nullable_type) || (!value.isNull())) {
+      std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr);
+      (*count_ptr)++;
+    }
+  }
+
+  inline void updateStateUnary(const TypedValue &argument,
+                               std::uint8_t *byte_ptr) const override {
+    if (!block_update_) {
+      iterateUnaryInlFast(argument, byte_ptr);
+    }
+  }
+
+  inline void updateStateNullary(std::uint8_t *byte_ptr) const override {
+    if (!block_update_) {
+      iterateNullaryInlFast(byte_ptr);
+    }
+  }
+
+  void blockUpdate() override { block_update_ = true; }
+
+  void allowUpdate() override { block_update_ = false; }
+
+  void initPayload(std::uint8_t *byte_ptr) const override {
+    std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr);
+    *count_ptr = 0;
+  }
+
+  AggregationState* accumulateNullary(
+      const std::size_t num_tuples) const override {
     return new AggregationStateCount(num_tuples);
   }
 
   AggregationState* accumulateColumnVectors(
-      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const override;
+      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
+      const override;
 
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
   AggregationState* accumulateValueAccessor(
@@ -139,36 +181,54 @@ class AggregationHandleCount : public AggregationConcreteHandle {
   void mergeStates(const AggregationState &source,
                    AggregationState *destination) const override;
 
+  void mergeStatesFast(const std::uint8_t *source,
+                       std::uint8_t *destination) const override;
+
   TypedValue finalize(const AggregationState &state) const override {
-    return TypedValue(static_cast<const AggregationStateCount&>(state).count_.load(std::memory_order_relaxed));
+    return TypedValue(
+        static_cast<const AggregationStateCount &>(state).count_.load(
+            std::memory_order_relaxed));
+  }
+
+  inline TypedValue finalizeHashTableEntry(
+      const AggregationState &state) const {
+    return TypedValue(
+        static_cast<const AggregationStateCount &>(state).count_.load(
+            std::memory_order_relaxed));
   }
 
-  inline TypedValue finalizeHashTableEntry(const AggregationState &state) const {
-    return TypedValue(static_cast<const AggregationStateCount&>(state).count_.load(std::memory_order_relaxed));
+  inline TypedValue finalizeHashTableEntryFast(
+      const std::uint8_t *byte_ptr) const {
+    const std::int64_t *count_ptr =
+        reinterpret_cast<const std::int64_t *>(byte_ptr);
+    return TypedValue(*count_ptr);
   }
 
   ColumnVector* finalizeHashTable(
       const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys) const override;
+      std::vector<std::vector<TypedValue>> *group_by_keys,
+      int index) const override;
 
   /**
-   * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
+   * @brief Implementation of
+   *        AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
    *        for COUNT aggregation.
    */
   AggregationState* aggregateOnDistinctifyHashTableForSingle(
-      const AggregationStateHashTableBase &distinctify_hash_table) const override;
+      const AggregationStateHashTableBase &distinctify_hash_table)
+      const override;
 
   /**
-   * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
+   * @brief Implementation of
+   *        AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
    *        for COUNT aggregation.
    */
   void aggregateOnDistinctifyHashTableForGroupBy(
       const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table) const override;
+      AggregationStateHashTableBase *aggregation_hash_table,
+      std::size_t index) const override;
 
-  void mergeGroupByHashTables(
-      const AggregationStateHashTableBase &source_hash_table,
-      AggregationStateHashTableBase *destination_hash_table) const override;
+  std::size_t getPayloadSize() const override { return sizeof(std::int64_t); }
 
  private:
   friend class AggregateFunctionCount;
@@ -176,8 +236,9 @@ class AggregationHandleCount : public AggregationConcreteHandle {
   /**
    * @brief Constructor.
    **/
-  AggregationHandleCount() {
-  }
+  AggregationHandleCount() : block_update_(false) {}
+
+  bool block_update_;
 
   DISALLOW_COPY_AND_ASSIGN(AggregationHandleCount);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/AggregationHandleDistinct.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleDistinct.cpp b/expressions/aggregation/AggregationHandleDistinct.cpp
index 68fcd4c..0dc8b56 100644
--- a/expressions/aggregation/AggregationHandleDistinct.cpp
+++ b/expressions/aggregation/AggregationHandleDistinct.cpp
@@ -65,14 +65,15 @@ void AggregationHandleDistinct::aggregateValueAccessorIntoHashTable(
 
 ColumnVector* AggregationHandleDistinct::finalizeHashTable(
     const AggregationStateHashTableBase &hash_table,
-    std::vector<std::vector<TypedValue>> *group_by_keys) const {
+    std::vector<std::vector<TypedValue>> *group_by_keys,
+    int index) const {
   DCHECK(group_by_keys->empty());
 
   const auto keys_retriever = [&group_by_keys](std::vector<TypedValue> &group_by_key,
                                                const bool &dumb_placeholder) -> void {
     group_by_keys->emplace_back(std::move(group_by_key));
   };
-  static_cast<const AggregationStateHashTable<bool>&>(hash_table).forEachCompositeKey(&keys_retriever);
+  static_cast<const AggregationStateFastHashTable&>(hash_table).forEachCompositeKeyFast(&keys_retriever);
 
   return nullptr;
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/AggregationHandleDistinct.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleDistinct.hpp b/expressions/aggregation/AggregationHandleDistinct.hpp
index 8524fcc..838bfdd 100644
--- a/expressions/aggregation/AggregationHandleDistinct.hpp
+++ b/expressions/aggregation/AggregationHandleDistinct.hpp
@@ -49,27 +49,32 @@ class AggregationHandleDistinct : public AggregationConcreteHandle {
   /**
    * @brief Constructor.
    **/
-  AggregationHandleDistinct() {
-  }
+  AggregationHandleDistinct() {}
 
   AggregationState* createInitialState() const override {
-    LOG(FATAL) << "AggregationHandleDistinct does not support createInitialState().";
+    LOG(FATAL)
+        << "AggregationHandleDistinct does not support createInitialState().";
   }
 
-  AggregationState* accumulateNullary(const std::size_t num_tuples) const override {
-    LOG(FATAL) << "AggregationHandleDistinct does not support accumulateNullary().";
+  AggregationState* accumulateNullary(
+      const std::size_t num_tuples) const override {
+    LOG(FATAL)
+        << "AggregationHandleDistinct does not support accumulateNullary().";
   }
 
   AggregationState* accumulateColumnVectors(
-      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const override {
-    LOG(FATAL) << "AggregationHandleDistinct does not support accumulateColumnVectors().";
+      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
+      const override {
+    LOG(FATAL) << "AggregationHandleDistinct does not support "
+                  "accumulateColumnVectors().";
   }
 
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
   AggregationState* accumulateValueAccessor(
       ValueAccessor *accessor,
       const std::vector<attribute_id> &accessor_ids) const override {
-    LOG(FATAL) << "AggregationHandleDistinct does not support accumulateValueAccessor().";
+    LOG(FATAL) << "AggregationHandleDistinct does not support "
+                  "accumulateValueAccessor().";
   }
 #endif
 
@@ -83,21 +88,23 @@ class AggregationHandleDistinct : public AggregationConcreteHandle {
   }
 
   AggregationState* aggregateOnDistinctifyHashTableForSingle(
-      const AggregationStateHashTableBase &distinctify_hash_table) const override {
+      const AggregationStateHashTableBase &distinctify_hash_table)
+      const override {
     LOG(FATAL) << "AggregationHandleDistinct does not support "
                << "aggregateOnDistinctifyHashTableForSingle().";
   }
 
   void aggregateOnDistinctifyHashTableForGroupBy(
       const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *groupby_hash_table) const override {
+      AggregationStateHashTableBase *groupby_hash_table,
+      std::size_t index) const override {
     LOG(FATAL) << "AggregationHandleDistinct does not support "
                << "aggregateOnDistinctifyHashTableForGroupBy().";
   }
 
   AggregationStateHashTableBase* createGroupByHashTable(
       const HashTableImplType hash_table_impl,
-      const std::vector<const Type*> &group_by_types,
+      const std::vector<const Type *> &group_by_types,
       const std::size_t estimated_num_groups,
       StorageManager *storage_manager) const override;
 
@@ -109,14 +116,8 @@ class AggregationHandleDistinct : public AggregationConcreteHandle {
 
   ColumnVector* finalizeHashTable(
       const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys) const override;
-
-  void mergeGroupByHashTables(
-      const AggregationStateHashTableBase &source_hash_table,
-      AggregationStateHashTableBase *destination_hash_table) const override {
-    LOG(FATAL)
-        << "AggregationHandleDistinct does not support mergeGroupByHashTables";
-  }
+      std::vector<std::vector<TypedValue>> *group_by_keys,
+      int index) const override;
 
  private:
   DISALLOW_COPY_AND_ASSIGN(AggregationHandleDistinct);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/AggregationHandleMax.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.cpp b/expressions/aggregation/AggregationHandleMax.cpp
index 435f5f2..c2d571b 100644
--- a/expressions/aggregation/AggregationHandleMax.cpp
+++ b/expressions/aggregation/AggregationHandleMax.cpp
@@ -39,22 +39,19 @@ namespace quickstep {
 class StorageManager;
 
 AggregationHandleMax::AggregationHandleMax(const Type &type)
-    : type_(type) {
-  fast_comparator_.reset(ComparisonFactory::GetComparison(ComparisonID::kGreater)
-                         .makeUncheckedComparatorForTypes(type,
-                                                          type.getNonNullableVersion()));
+    : type_(type), block_update_(false) {
+  fast_comparator_.reset(
+      ComparisonFactory::GetComparison(ComparisonID::kGreater)
+          .makeUncheckedComparatorForTypes(type, type.getNonNullableVersion()));
 }
 
 AggregationStateHashTableBase* AggregationHandleMax::createGroupByHashTable(
     const HashTableImplType hash_table_impl,
-    const std::vector<const Type*> &group_by_types,
+    const std::vector<const Type *> &group_by_types,
     const std::size_t estimated_num_groups,
     StorageManager *storage_manager) const {
   return AggregationStateHashTableFactory<AggregationStateMax>::CreateResizable(
-      hash_table_impl,
-      group_by_types,
-      estimated_num_groups,
-      storage_manager);
+      hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
 }
 
 AggregationState* AggregationHandleMax::accumulateColumnVectors(
@@ -62,9 +59,8 @@ AggregationState* AggregationHandleMax::accumulateColumnVectors(
   DCHECK_EQ(1u, column_vectors.size())
       << "Got wrong number of ColumnVectors for MAX: " << column_vectors.size();
 
-  return new AggregationStateMax(
-      fast_comparator_->accumulateColumnVector(type_.getNullableVersion().makeNullValue(),
-                                               *column_vectors.front()));
+  return new AggregationStateMax(fast_comparator_->accumulateColumnVector(
+      type_.getNullableVersion().makeNullValue(), *column_vectors.front()));
 }
 
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
@@ -74,10 +70,10 @@ AggregationState* AggregationHandleMax::accumulateValueAccessor(
   DCHECK_EQ(1u, accessor_ids.size())
       << "Got wrong number of attributes for MAX: " << accessor_ids.size();
 
-  return new AggregationStateMax(
-      fast_comparator_->accumulateValueAccessor(type_.getNullableVersion().makeNullValue(),
-                                                accessor,
-                                                accessor_ids.front()));
+  return new AggregationStateMax(fast_comparator_->accumulateValueAccessor(
+      type_.getNullableVersion().makeNullValue(),
+      accessor,
+      accessor_ids.front()));
 }
 #endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 
@@ -88,66 +84,54 @@ void AggregationHandleMax::aggregateValueAccessorIntoHashTable(
     AggregationStateHashTableBase *hash_table) const {
   DCHECK_EQ(1u, argument_ids.size())
       << "Got wrong number of arguments for MAX: " << argument_ids.size();
-
-  aggregateValueAccessorIntoHashTableUnaryHelper<
-      AggregationHandleMax,
-      AggregationStateMax,
-      AggregationStateHashTable<AggregationStateMax>>(
-          accessor,
-          argument_ids.front(),
-          group_by_key_ids,
-          AggregationStateMax(type_),
-          hash_table);
 }
 
-void AggregationHandleMax::mergeStates(
-    const AggregationState &source,
-    AggregationState *destination) const {
-  const AggregationStateMax &max_source = static_cast<const AggregationStateMax&>(source);
-  AggregationStateMax *max_destination = static_cast<AggregationStateMax*>(destination);
+void AggregationHandleMax::mergeStates(const AggregationState &source,
+                                       AggregationState *destination) const {
+  const AggregationStateMax &max_source =
+      static_cast<const AggregationStateMax &>(source);
+  AggregationStateMax *max_destination =
+      static_cast<AggregationStateMax *>(destination);
 
   if (!max_source.max_.isNull()) {
     compareAndUpdate(max_destination, max_source.max_);
   }
 }
 
+void AggregationHandleMax::mergeStatesFast(const std::uint8_t *source,
+                                           std::uint8_t *destination) const {
+  const TypedValue *src_max_ptr = reinterpret_cast<const TypedValue *>(source);
+  TypedValue *dst_max_ptr = reinterpret_cast<TypedValue *>(destination);
+  if (!(src_max_ptr->isNull())) {
+    compareAndUpdateFast(dst_max_ptr, *src_max_ptr);
+  }
+}
+
 ColumnVector* AggregationHandleMax::finalizeHashTable(
     const AggregationStateHashTableBase &hash_table,
-    std::vector<std::vector<TypedValue>> *group_by_keys) const {
-  return finalizeHashTableHelper<AggregationHandleMax,
-                                 AggregationStateHashTable<AggregationStateMax>>(
-      type_.getNullableVersion(),
-      hash_table,
-      group_by_keys);
+    std::vector<std::vector<TypedValue>> *group_by_keys,
+    int index) const {
+  return finalizeHashTableHelperFast<AggregationHandleMax,
+                                     AggregationStateFastHashTable>(
+      type_.getNullableVersion(), hash_table, group_by_keys, index);
 }
 
-AggregationState* AggregationHandleMax::aggregateOnDistinctifyHashTableForSingle(
+AggregationState*
+AggregationHandleMax::aggregateOnDistinctifyHashTableForSingle(
     const AggregationStateHashTableBase &distinctify_hash_table) const {
-  return aggregateOnDistinctifyHashTableForSingleUnaryHelper<
+  return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
       AggregationHandleMax,
-      AggregationStateMax>(
-          distinctify_hash_table);
+      AggregationStateMax>(distinctify_hash_table);
 }
 
 void AggregationHandleMax::aggregateOnDistinctifyHashTableForGroupBy(
     const AggregationStateHashTableBase &distinctify_hash_table,
-    AggregationStateHashTableBase *aggregation_hash_table) const {
-  aggregateOnDistinctifyHashTableForGroupByUnaryHelper<
+    AggregationStateHashTableBase *aggregation_hash_table,
+    std::size_t index) const {
+  aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
       AggregationHandleMax,
-      AggregationStateMax,
-      AggregationStateHashTable<AggregationStateMax>>(
-          distinctify_hash_table,
-          AggregationStateMax(type_),
-          aggregation_hash_table);
-}
-
-void AggregationHandleMax::mergeGroupByHashTables(
-    const AggregationStateHashTableBase &source_hash_table,
-    AggregationStateHashTableBase *destination_hash_table) const {
-  mergeGroupByHashTablesHelper<AggregationHandleMax,
-                               AggregationStateMax,
-                               AggregationStateHashTable<AggregationStateMax>>(
-      source_hash_table, destination_hash_table);
+      AggregationStateFastHashTable>(
+      distinctify_hash_table, aggregation_hash_table, index);
 }
 
 }  // namespace quickstep