You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ra...@apache.org on 2016/08/05 22:52:53 UTC

[29/30] incubator-quickstep git commit: Initial commit for QUICKSTEP-28 and QUICKSTEP-29. Code refactoring and cleanup, some more optimizations are pending.

Initial commit for QUICKSTEP-28 and QUICKSTEP-29. Code refactoring and cleanup, some more optimizations are pending.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/39485d62
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/39485d62
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/39485d62

Branch: refs/heads/quickstep-28-29
Commit: 39485d620402273890e92072e05f856fd9da7f9d
Parents: ccea2ff
Author: rathijit <ra...@node-2.hashtable.quickstep-pg0.wisc.cloudlab.us>
Authored: Mon Jul 4 02:44:48 2016 -0500
Committer: rathijit <ra...@node-2.aggregation.quickstep-pg0.wisc.cloudlab.us>
Committed: Fri Aug 5 14:23:35 2016 -0500

----------------------------------------------------------------------
 .../aggregation/AggregationConcreteHandle.cpp   |   29 +-
 .../aggregation/AggregationConcreteHandle.hpp   |  223 ++
 expressions/aggregation/AggregationHandle.hpp   |    8 +-
 .../aggregation/AggregationHandleAvg.cpp        |   40 +-
 .../aggregation/AggregationHandleAvg.hpp        |   62 +-
 .../aggregation/AggregationHandleCount.cpp      |   38 +-
 .../aggregation/AggregationHandleCount.hpp      |   50 +-
 .../aggregation/AggregationHandleDistinct.cpp   |    2 +-
 .../aggregation/AggregationHandleDistinct.hpp   |    2 +-
 .../aggregation/AggregationHandleMax.cpp        |   29 +-
 .../aggregation/AggregationHandleMax.hpp        |   39 +-
 .../aggregation/AggregationHandleMin.cpp        |   30 +-
 .../aggregation/AggregationHandleMin.hpp        |   44 +-
 .../aggregation/AggregationHandleSum.cpp        |   31 +-
 .../aggregation/AggregationHandleSum.hpp        |   52 +-
 expressions/aggregation/CMakeLists.txt          |    7 +
 storage/AggregationOperationState.cpp           |   95 +-
 storage/AggregationOperationState.hpp           |    7 +-
 storage/CMakeLists.txt                          |   58 +
 storage/FastHashTable.hpp                       | 2640 ++++++++++++++++++
 storage/FastHashTableFactory.hpp                |  300 ++
 storage/FastSeparateChainingHashTable.hpp       | 1761 ++++++++++++
 storage/HashTableBase.hpp                       |    2 +-
 storage/HashTablePool.hpp                       |   42 +
 storage/StorageBlock.cpp                        |   88 +-
 storage/StorageBlock.hpp                        |    8 +
 threading/SpinMutex.hpp                         |    2 +
 27 files changed, 5587 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/expressions/aggregation/AggregationConcreteHandle.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.cpp b/expressions/aggregation/AggregationConcreteHandle.cpp
index 05ca58d..d808302 100644
--- a/expressions/aggregation/AggregationConcreteHandle.cpp
+++ b/expressions/aggregation/AggregationConcreteHandle.cpp
@@ -22,6 +22,7 @@
 
 #include "catalog/CatalogTypedefs.hpp"
 #include "storage/HashTable.hpp"
+#include "storage/FastHashTable.hpp"
 #include "storage/HashTableFactory.hpp"
 
 namespace quickstep {
@@ -49,22 +50,24 @@ void AggregationConcreteHandle::insertValueAccessorIntoDistinctifyHashTable(
     AggregationStateHashTableBase *distinctify_hash_table) const {
   // If the key-value pair is already there, we don't need to update the value,
   // which should always be "true". I.e. the value is just a placeholder.
-  const auto noop_upserter = [](const auto &accessor, const bool *value) -> void {};
+//  const auto noop_upserter = [](const auto &accessor, const bool *value) -> void {};
 
-  AggregationStateHashTable<bool> *hash_table =
-      static_cast<AggregationStateHashTable<bool>*>(distinctify_hash_table);
+  AggregationStateFastHashTable *hash_table =
+      static_cast<AggregationStateFastHashTable *>(distinctify_hash_table);
   if (key_ids.size() == 1) {
-    hash_table->upsertValueAccessor(accessor,
-                                    key_ids[0],
-                                    true /* check_for_null_keys */,
-                                    true /* initial_value */,
-                                    &noop_upserter);
+// TODO(rathijit): port this single-key upsert to the fast hash table; until then this branch inserts nothing.
+//    hash_table->upsertValueAccessor(accessor,
+//                                    key_ids[0],
+//                                    true /* check_for_null_keys */,
+//                                    true /* initial_value */,
+//                                    &noop_upserter);
   } else {
-    hash_table->upsertValueAccessorCompositeKey(accessor,
-                                                key_ids,
-                                                true /* check_for_null_keys */,
-                                                true /* initial_value */,
-                                                &noop_upserter);
+    std::vector<std::vector<attribute_id>> empty_args;
+    empty_args.resize(1);
+    hash_table->upsertValueAccessorCompositeKeyFast(empty_args,
+                                                    accessor,
+                                                    key_ids,
+                                                    true /* check_for_null_keys */);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/expressions/aggregation/AggregationConcreteHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.hpp b/expressions/aggregation/AggregationConcreteHandle.hpp
index 0267e17..c5d7b3c 100644
--- a/expressions/aggregation/AggregationConcreteHandle.hpp
+++ b/expressions/aggregation/AggregationConcreteHandle.hpp
@@ -31,6 +31,7 @@
 #include "types/TypedValue.hpp"
 #include "types/containers/ColumnVector.hpp"
 #include "utility/Macros.hpp"
+#include "threading/SpinMutex.hpp"
 
 #include "glog/logging.h"
 
@@ -79,6 +80,37 @@ class HashTableStateUpserter {
   DISALLOW_COPY_AND_ASSIGN(HashTableStateUpserter);
 };
 
+template <typename HandleT>
+class HashTableStateUpserterFast {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param handle The aggregation handle being used.
+   * @param source_state The aggregation state in the source aggregation hash
+   *        table. The corresponding state (for the same key) in the destination
+   *        hash table will be upserted.
+   **/
+  HashTableStateUpserterFast(const HandleT &handle, const uint8_t *source_state)
+      : handle_(handle), source_state_(source_state) {}
+
+  /**
+   * @brief The operator for the functor required for the upsert.
+   *
+   * @param destination_state The aggregation state in the aggregation hash
+   *        table that is being upserted.
+   **/
+  void operator()(uint8_t *destination_state) {
+    handle_.mergeStatesFast(source_state_, destination_state);
+  }
+
+ private:
+  const HandleT &handle_;
+  const uint8_t *source_state_;
+
+  DISALLOW_COPY_AND_ASSIGN(HashTableStateUpserterFast);
+};
+
 /**
  * @brief A class to support the functor for merging group by hash tables.
  **/
@@ -129,6 +161,53 @@ class HashTableMerger {
   DISALLOW_COPY_AND_ASSIGN(HashTableMerger);
 };
 
+template <typename HandleT, typename HashTableT>
+class HashTableMergerFast {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param handle The aggregation handle being used.
+   * @param destination_hash_table The destination hash table to which other
+   *        hash tables will be merged.
+   **/
+  HashTableMergerFast(const HandleT &handle,
+                  AggregationStateHashTableBase *destination_hash_table)
+      : handle_(handle),
+        destination_hash_table_(
+            static_cast<HashTableT *>(destination_hash_table)) {}
+
+  /**
+   * @brief The operator for the functor.
+   *
+   * @param group_by_key The group by key being merged.
+   * @param source_state The aggregation state for the given key in the source
+   *        aggregation hash table.
+   **/
+  inline void operator()(const std::vector<TypedValue> &group_by_key,
+                         const uint8_t *source_state) {
+    const uint8_t *original_state =
+        destination_hash_table_->getSingleCompositeKey(group_by_key);
+    if (original_state != nullptr) {
+      HashTableStateUpserterFast<HandleT> upserter(
+          handle_, source_state);
+      // The CHECK is required as upsertCompositeKeyFast can return false if
+      // the hash table runs out of space during the upsert process. The ideal
+      // solution would be to retry if the upsert fails.
+      CHECK(destination_hash_table_->upsertCompositeKeyFast(
+          group_by_key, original_state, &upserter));
+    } else {
+      destination_hash_table_->putCompositeKeyFast(group_by_key, source_state);
+    }
+  }
+
+ private:
+  const HandleT &handle_;
+  HashTableT *destination_hash_table_;
+
+  DISALLOW_COPY_AND_ASSIGN(HashTableMergerFast);
+};
+
 /**
  * @brief The helper intermediate subclass of AggregationHandle that provides
  *        virtual method implementations as well as helper methods that are
@@ -208,11 +287,26 @@ class AggregationConcreteHandle : public AggregationHandle {
 
   template <typename HandleT,
             typename HashTableT>
+  void aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast(
+      const AggregationStateHashTableBase &distinctify_hash_table,
+      AggregationStateHashTableBase *hash_table) const;
+
+
+  template <typename HandleT,
+            typename HashTableT>
   ColumnVector* finalizeHashTableHelper(
       const Type &result_type,
       const AggregationStateHashTableBase &hash_table,
       std::vector<std::vector<TypedValue>> *group_by_keys) const;
 
+  template <typename HandleT,
+            typename HashTableT>
+  ColumnVector* finalizeHashTableHelperFast(
+      const Type &result_type,
+      const AggregationStateHashTableBase &hash_table,
+      std::vector<std::vector<TypedValue>> *group_by_keys,
+      int index) const;
+
   template <typename HandleT, typename HashTableT>
   inline TypedValue finalizeGroupInHashTable(
       const AggregationStateHashTableBase &hash_table,
@@ -224,11 +318,29 @@ class AggregationConcreteHandle : public AggregationHandle {
     return static_cast<const HandleT*>(this)->finalizeHashTableEntry(*group_state);
   }
 
+  template <typename HandleT, typename HashTableT>
+  inline TypedValue finalizeGroupInHashTableFast(
+      const AggregationStateHashTableBase &hash_table,
+      const std::vector<TypedValue> &group_key,
+      int index) const {
+    const std::uint8_t *group_state
+        = static_cast<const HashTableT&>(hash_table).getSingleCompositeKey(group_key, index);
+    DCHECK(group_state != nullptr)
+        << "Could not find entry for specified group_key in HashTable";
+    return static_cast<const HandleT*>(this)->finalizeHashTableEntryFast(group_state);
+  }
+
   template <typename HandleT, typename StateT, typename HashTableT>
   void mergeGroupByHashTablesHelper(
       const AggregationStateHashTableBase &source_hash_table,
       AggregationStateHashTableBase *destination_hash_table) const;
 
+  template <typename HandleT, typename HashTableT>
+  void mergeGroupByHashTablesHelperFast(
+      const AggregationStateHashTableBase &source_hash_table,
+      AggregationStateHashTableBase *destination_hash_table) const;
+
+
  private:
   DISALLOW_COPY_AND_ASSIGN(AggregationConcreteHandle);
 };
@@ -302,6 +414,12 @@ class HashTableAggregateFinalizer {
     output_column_vector_->appendTypedValue(handle_.finalizeHashTableEntry(group_state));
   }
 
+  inline void operator()(const std::vector<TypedValue> &group_by_key,
+                         const unsigned char *byte_ptr) {
+    group_by_keys_->emplace_back(group_by_key);
+    output_column_vector_->appendTypedValue(handle_.finalizeHashTableEntryFast(byte_ptr));
+  }
+
  private:
   const HandleT &handle_;
   std::vector<std::vector<TypedValue>> *group_by_keys_;
@@ -414,6 +532,42 @@ void AggregationConcreteHandle::aggregateOnDistinctifyHashTableForGroupByUnaryHe
 
 template <typename HandleT,
           typename HashTableT>
+void AggregationConcreteHandle::aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast(
+    const AggregationStateHashTableBase &distinctify_hash_table,
+    AggregationStateHashTableBase *aggregation_hash_table) const {
+  const HandleT& handle = static_cast<const HandleT&>(*this);
+  HashTableT *target_hash_table = static_cast<HashTableT*>(aggregation_hash_table);
+
+  // A lambda function which will be called on each key-value pair from the
+  // distinctify hash table.
+  const auto aggregate_functor = [&handle, &target_hash_table](
+      std::vector<TypedValue> &key,
+      const bool &dumb_placeholder) {
+    // Each (composite) key vector in the distinctify hash table has size N:
+    // the first N-1 entries are GROUP BY columns and the last entry is the
+    // argument to be aggregated on.
+    const TypedValue argument(std::move(key.back()));
+    key.pop_back();
+
+    // Upserter lambda that aggregates the argument into the entry for its
+    // GROUP BY group inside aggregation_hash_table.
+    const auto upserter = [&handle, &argument](std::uint8_t *state) {
+      handle.iterateUnaryInlFast(argument, state+sizeof(SpinMutex));
+    };
+
+    target_hash_table->upsertCompositeKeyFast(key, nullptr, &upserter);
+  };
+
+  const HashTableT &source_hash_table =
+      static_cast<const HashTableT&>(distinctify_hash_table);
+  // Invoke the lambda function "aggregate_functor" on each composite key vector
+  // from the distinctify hash table.
+  source_hash_table.forEachCompositeKeyFast(&aggregate_functor);
+}
+
+
+template <typename HandleT,
+          typename HashTableT>
 ColumnVector* AggregationConcreteHandle::finalizeHashTableHelper(
     const Type &result_type,
     const AggregationStateHashTableBase &hash_table,
@@ -463,6 +617,59 @@ ColumnVector* AggregationConcreteHandle::finalizeHashTableHelper(
 }
 
 template <typename HandleT,
+          typename HashTableT>
+ColumnVector* AggregationConcreteHandle::finalizeHashTableHelperFast(
+    const Type &result_type,
+    const AggregationStateHashTableBase &hash_table,
+    std::vector<std::vector<TypedValue>> *group_by_keys,
+    int index) const {
+  const HandleT &handle = static_cast<const HandleT&>(*this);
+  const HashTableT &hash_table_concrete = static_cast<const HashTableT&>(hash_table);
+
+  if (group_by_keys->empty()) {
+    if (NativeColumnVector::UsableForType(result_type)) {
+      NativeColumnVector *result = new NativeColumnVector(result_type,
+                                                          hash_table_concrete.numEntries());
+      HashTableAggregateFinalizer<HandleT, NativeColumnVector> finalizer(
+          handle,
+          group_by_keys,
+          result);
+      hash_table_concrete.forEachCompositeKeyFast(&finalizer, index);
+      return result;
+    } else {
+      IndirectColumnVector *result = new IndirectColumnVector(result_type,
+                                                              hash_table_concrete.numEntries());
+      HashTableAggregateFinalizer<HandleT, IndirectColumnVector> finalizer(
+          handle,
+          group_by_keys,
+          result);
+      hash_table_concrete.forEachCompositeKeyFast(&finalizer, index);
+      return result;
+    }
+  } else {
+    if (NativeColumnVector::UsableForType(result_type)) {
+      NativeColumnVector *result = new NativeColumnVector(result_type,
+                                                          group_by_keys->size());
+      for (const std::vector<TypedValue> &group_by_key : *group_by_keys) {
+        result->appendTypedValue(finalizeGroupInHashTableFast<HandleT, HashTableT>(hash_table,
+                                                                                   group_by_key,
+                                                                                   index));
+      }
+      return result;
+    } else {
+      IndirectColumnVector *result = new IndirectColumnVector(result_type,
+                                                              hash_table_concrete.numEntries());
+      for (const std::vector<TypedValue> &group_by_key : *group_by_keys) {
+        result->appendTypedValue(finalizeGroupInHashTableFast<HandleT, HashTableT>(hash_table,
+                                                                                   group_by_key,
+                                                                                   index));
+      }
+      return result;
+    }
+  }
+}
+
+template <typename HandleT,
           typename StateT,
           typename HashTableT>
 void AggregationConcreteHandle::mergeGroupByHashTablesHelper(
@@ -478,6 +685,22 @@ void AggregationConcreteHandle::mergeGroupByHashTablesHelper(
   source_hash_table_concrete.forEachCompositeKey(&merger);
 }
 
+template <typename HandleT,
+          typename HashTableT>
+void AggregationConcreteHandle::mergeGroupByHashTablesHelperFast(
+    const AggregationStateHashTableBase &source_hash_table,
+    AggregationStateHashTableBase *destination_hash_table) const {
+  const HandleT &handle = static_cast<const HandleT &>(*this);
+  const HashTableT &source_hash_table_concrete =
+      static_cast<const HashTableT &>(source_hash_table);
+
+  HashTableMergerFast<HandleT, HashTableT> merger(handle,
+                                              destination_hash_table);
+
+  source_hash_table_concrete.forEachCompositeKeyFast(&merger);
+}
+
+
 }  // namespace quickstep
 
 #endif  // QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_CONCRETE_HANDLE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/expressions/aggregation/AggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandle.hpp b/expressions/aggregation/AggregationHandle.hpp
index cdebb03..d9c3897 100644
--- a/expressions/aggregation/AggregationHandle.hpp
+++ b/expressions/aggregation/AggregationHandle.hpp
@@ -265,7 +265,7 @@ class AggregationHandle {
    **/
   virtual ColumnVector* finalizeHashTable(
       const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys) const = 0;
+      std::vector<std::vector<TypedValue>> *group_by_keys, int index) const = 0;
 
   /**
    * @brief Create a new HashTable for the distinctify step for DISTINCT aggregation.
@@ -362,6 +362,12 @@ class AggregationHandle {
       const AggregationStateHashTableBase &source_hash_table,
       AggregationStateHashTableBase *destination_hash_table) const = 0;
 
+  virtual size_t getPayloadSize() const {return 8;}
+  virtual void setPayloadOffset(std::size_t) {}
+  virtual void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t *byte_ptr) {}
+  virtual void mergeStatesFast(const uint8_t *src, uint8_t *dst) const {}
+  virtual void initPayload(uint8_t *byte_ptr) {}
+
  protected:
   AggregationHandle() {
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/expressions/aggregation/AggregationHandleAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.cpp b/expressions/aggregation/AggregationHandleAvg.cpp
index 42a2fb9..d0bd3b8 100644
--- a/expressions/aggregation/AggregationHandleAvg.cpp
+++ b/expressions/aggregation/AggregationHandleAvg.cpp
@@ -137,8 +137,7 @@ void AggregationHandleAvg::aggregateValueAccessorIntoHashTable(
     AggregationStateHashTableBase *hash_table) const {
   DCHECK_EQ(1u, argument_ids.size())
       << "Got wrong number of arguments for AVG: " << argument_ids.size();
-
-  aggregateValueAccessorIntoHashTableUnaryHelper<
+/*  aggregateValueAccessorIntoHashTableUnaryHelper<
       AggregationHandleAvg,
       AggregationStateAvg,
       AggregationStateHashTable<AggregationStateAvg>>(
@@ -146,7 +145,14 @@ void AggregationHandleAvg::aggregateValueAccessorIntoHashTable(
           argument_ids.front(),
           group_by_key_ids,
           blank_state_,
-          hash_table);
+          hash_table); */
+
+/*     static_cast<AggregationStateFastHashTable *>(hash_table)->upsertValueAccessorCompositeKeyFast(
+      argument_ids.front(),
+      accessor,
+      group_by_key_ids,
+      true,
+      const_cast<AggregationHandleAvg *>(this));*/
 }
 
 void AggregationHandleAvg::mergeStates(
@@ -161,6 +167,19 @@ void AggregationHandleAvg::mergeStates(
                                                                   avg_source.sum_);
 }
 
+void AggregationHandleAvg::mergeStatesFast(
+    const uint8_t *source,
+    uint8_t *destination) const {
+    const TypedValue *src_sum_ptr = reinterpret_cast<const TypedValue *>(source + blank_state_.sum_offset);
+    const std::int64_t *src_count_ptr = reinterpret_cast<const std::int64_t *>(source + blank_state_.count_offset);
+    TypedValue *dst_sum_ptr = reinterpret_cast<TypedValue *>(destination+blank_state_.sum_offset);
+    std::int64_t *dst_count_ptr = reinterpret_cast<std::int64_t *>(destination + blank_state_.count_offset);
+    (*dst_count_ptr) += (*src_count_ptr);
+    *dst_sum_ptr = merge_add_operator_->applyToTypedValues(*dst_sum_ptr, *src_sum_ptr);
+}
+
+
+
 TypedValue AggregationHandleAvg::finalize(const AggregationState &state) const {
   const AggregationStateAvg &agg_state = static_cast<const AggregationStateAvg&>(state);
   if (agg_state.count_ == 0) {
@@ -175,12 +194,14 @@ TypedValue AggregationHandleAvg::finalize(const AggregationState &state) const {
 
 ColumnVector* AggregationHandleAvg::finalizeHashTable(
     const AggregationStateHashTableBase &hash_table,
-    std::vector<std::vector<TypedValue>> *group_by_keys) const {
-  return finalizeHashTableHelper<AggregationHandleAvg,
-                                 AggregationStateHashTable<AggregationStateAvg>>(
+    std::vector<std::vector<TypedValue>> *group_by_keys,
+    int index) const {
+  return finalizeHashTableHelperFast<AggregationHandleAvg,
+                                 AggregationStateFastHashTable>(
       *result_type_,
       hash_table,
-      group_by_keys);
+      group_by_keys,
+      index);
 }
 
 AggregationState* AggregationHandleAvg::aggregateOnDistinctifyHashTableForSingle(
@@ -206,9 +227,8 @@ void AggregationHandleAvg::aggregateOnDistinctifyHashTableForGroupBy(
 void AggregationHandleAvg::mergeGroupByHashTables(
     const AggregationStateHashTableBase &source_hash_table,
     AggregationStateHashTableBase *destination_hash_table) const {
-  mergeGroupByHashTablesHelper<AggregationHandleAvg,
-                               AggregationStateAvg,
-                               AggregationStateHashTable<AggregationStateAvg>>(
+  mergeGroupByHashTablesHelperFast<AggregationHandleAvg,
+                               AggregationStateFastHashTable>(
       source_hash_table, destination_hash_table);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/expressions/aggregation/AggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.hpp b/expressions/aggregation/AggregationHandleAvg.hpp
index 4ad4b21..fe178f6 100644
--- a/expressions/aggregation/AggregationHandleAvg.hpp
+++ b/expressions/aggregation/AggregationHandleAvg.hpp
@@ -29,6 +29,7 @@
 #include "expressions/aggregation/AggregationConcreteHandle.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
 #include "storage/HashTableBase.hpp"
+#include "storage/FastHashTable.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
@@ -57,7 +58,10 @@ class AggregationStateAvg : public AggregationState {
    */
   AggregationStateAvg(const AggregationStateAvg &orig)
       : sum_(orig.sum_),
-        count_(orig.count_) {
+        count_(orig.count_),
+        sum_offset(orig.sum_offset),
+        count_offset(orig.count_offset),
+        mutex_offset(orig.mutex_offset) {
   }
 
   /**
@@ -65,11 +69,19 @@ class AggregationStateAvg : public AggregationState {
    */
   ~AggregationStateAvg() override {}
 
+  size_t getPayloadSize() const {
+     size_t p1 = reinterpret_cast<size_t>(&sum_);
+     size_t p2 = reinterpret_cast<size_t>(&mutex_);
+     return (p2-p1);
+  }
+
  private:
   friend class AggregationHandleAvg;
 
   AggregationStateAvg()
-      : sum_(0), count_(0) {
+      : sum_(0), count_(0), sum_offset(0),
+        count_offset(reinterpret_cast<uint8_t *>(&count_)-reinterpret_cast<uint8_t *>(&sum_)),
+        mutex_offset(reinterpret_cast<uint8_t *>(&mutex_)-reinterpret_cast<uint8_t *>(&sum_)) {
   }
 
   // TODO(shoban): We might want to specialize sum_ and count_ to use atomics
@@ -77,6 +89,8 @@ class AggregationStateAvg : public AggregationState {
   TypedValue sum_;
   std::int64_t count_;
   SpinMutex mutex_;
+
+  int sum_offset, count_offset, mutex_offset;
 };
 
 /**
@@ -109,6 +123,26 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
     ++state->count_;
   }
 
+  inline void iterateUnaryInlFast(const TypedValue &value, uint8_t *byte_ptr) {
+    DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
+    if (value.isNull()) return;
+    TypedValue *sum_ptr = reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset);
+    std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr + blank_state_.count_offset);
+    *sum_ptr = fast_add_operator_->applyToTypedValues(*sum_ptr, value);
+    ++(*count_ptr);
+  }
+
+  inline void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t *byte_ptr) override {
+     iterateUnaryInlFast(arguments.front(), byte_ptr);
+  }
+
+  void initPayload(uint8_t *byte_ptr) override {
+    TypedValue *sum_ptr = reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset);
+    std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr + blank_state_.count_offset);
+    *sum_ptr = blank_state_.sum_;
+    *count_ptr = blank_state_.count_;
+  }
+
   AggregationState* accumulateColumnVectors(
       const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const override;
 
@@ -127,6 +161,9 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
   void mergeStates(const AggregationState &source,
                    AggregationState *destination) const override;
 
+  void mergeStatesFast(const uint8_t *source,
+                   uint8_t *destination) const override;
+
   TypedValue finalize(const AggregationState &state) const override;
 
   inline TypedValue finalizeHashTableEntry(const AggregationState &state) const {
@@ -139,9 +176,24 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
                                                 TypedValue(static_cast<double>(agg_state.count_)));
   }
 
+  inline TypedValue finalizeHashTableEntryFast(const uint8_t *byte_ptr) const {
+//    const AggregationStateAvg &agg_state = static_cast<const AggregationStateAvg&>(state);
+    // TODO(chasseur): Could improve performance further if we made a special
+    // version of finalizeHashTable() that collects all the sums into one
+    // ColumnVector and all the counts into another and then applies
+    // '*divide_operator_' to them in bulk.
+
+    uint8_t *value_ptr = const_cast<uint8_t*>(byte_ptr);
+    TypedValue *sum_ptr = reinterpret_cast<TypedValue *>(value_ptr + blank_state_.sum_offset);
+    std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(value_ptr + blank_state_.count_offset);
+    return divide_operator_->applyToTypedValues(*sum_ptr,
+                                                TypedValue(static_cast<double>(*count_ptr)));
+  }
+
   ColumnVector* finalizeHashTable(
       const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys) const override;
+      std::vector<std::vector<TypedValue>> *group_by_keys,
+      int index) const override;
 
   /**
    * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
@@ -162,6 +214,10 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
       const AggregationStateHashTableBase &source_hash_table,
       AggregationStateHashTableBase *destination_hash_table) const override;
 
+  size_t getPayloadSize() const override {
+      return blank_state_.getPayloadSize();
+  }
+
  private:
   friend class AggregateFunctionAvg;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/expressions/aggregation/AggregationHandleCount.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.cpp b/expressions/aggregation/AggregationHandleCount.cpp
index 964b7c2..dc11dac 100644
--- a/expressions/aggregation/AggregationHandleCount.cpp
+++ b/expressions/aggregation/AggregationHandleCount.cpp
@@ -135,18 +135,18 @@ template <bool count_star, bool nullable_type>
   if (count_star) {
     DCHECK_EQ(0u, argument_ids.size())
         << "Got wrong number of arguments for COUNT(*): " << argument_ids.size();
-    aggregateValueAccessorIntoHashTableNullaryHelper<
+/*    aggregateValueAccessorIntoHashTableNullaryHelper<
         AggregationHandleCount<count_star, nullable_type>,
         AggregationStateCount,
         AggregationStateHashTable<AggregationStateCount>>(
             accessor,
             group_by_key_ids,
             AggregationStateCount(),
-            hash_table);
+            hash_table);*/
   } else {
     DCHECK_EQ(1u, argument_ids.size())
         << "Got wrong number of arguments for COUNT: " << argument_ids.size();
-    aggregateValueAccessorIntoHashTableUnaryHelper<
+/*    aggregateValueAccessorIntoHashTableUnaryHelper<
         AggregationHandleCount<count_star, nullable_type>,
         AggregationStateCount,
         AggregationStateHashTable<AggregationStateCount>>(
@@ -154,7 +154,7 @@ template <bool count_star, bool nullable_type>
             argument_ids.front(),
             group_by_key_ids,
             AggregationStateCount(),
-            hash_table);
+            hash_table); */
   }
 }
 
@@ -170,14 +170,25 @@ template <bool count_star, bool nullable_type>
 }
 
 template <bool count_star, bool nullable_type>
+void AggregationHandleCount<count_star, nullable_type>::mergeStatesFast(
+    const uint8_t *source,
+    uint8_t *destination) const {
+    const std::int64_t *src_count_ptr = reinterpret_cast<const std::int64_t *>(source);
+    std::int64_t *dst_count_ptr = reinterpret_cast<std::int64_t *>(destination);
+    (*dst_count_ptr) += (*src_count_ptr);
+}
+
+template <bool count_star, bool nullable_type>
     ColumnVector* AggregationHandleCount<count_star, nullable_type>::finalizeHashTable(
         const AggregationStateHashTableBase &hash_table,
-        std::vector<std::vector<TypedValue>> *group_by_keys) const {
-  return finalizeHashTableHelper<AggregationHandleCount<count_star, nullable_type>,
-                                 AggregationStateHashTable<AggregationStateCount>>(
+        std::vector<std::vector<TypedValue>> *group_by_keys,
+        int index) const {
+  return finalizeHashTableHelperFast<AggregationHandleCount<count_star, nullable_type>,
+                                 AggregationStateFastHashTable>(
       TypeFactory::GetType(kLong),
       hash_table,
-      group_by_keys);
+      group_by_keys,
+      index);
 }
 
 template <bool count_star, bool nullable_type>
@@ -197,12 +208,10 @@ void AggregationHandleCount<count_star, nullable_type>
         const AggregationStateHashTableBase &distinctify_hash_table,
         AggregationStateHashTableBase *aggregation_hash_table) const {
   DCHECK_EQ(count_star, false);
-  aggregateOnDistinctifyHashTableForGroupByUnaryHelper<
+  aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
       AggregationHandleCount<count_star, nullable_type>,
-      AggregationStateCount,
-      AggregationStateHashTable<AggregationStateCount>>(
+      AggregationStateFastHashTable>(
           distinctify_hash_table,
-          AggregationStateCount(),
           aggregation_hash_table);
 }
 
@@ -210,10 +219,9 @@ template <bool count_star, bool nullable_type>
 void AggregationHandleCount<count_star, nullable_type>::mergeGroupByHashTables(
     const AggregationStateHashTableBase &source_hash_table,
     AggregationStateHashTableBase *destination_hash_table) const {
-  mergeGroupByHashTablesHelper<
+  mergeGroupByHashTablesHelperFast<
       AggregationHandleCount,
-      AggregationStateCount,
-      AggregationStateHashTable<AggregationStateCount>>(source_hash_table,
+      AggregationStateFastHashTable>(source_hash_table,
                                                         destination_hash_table);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/expressions/aggregation/AggregationHandleCount.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.hpp b/expressions/aggregation/AggregationHandleCount.hpp
index 50138b9..4046106 100644
--- a/expressions/aggregation/AggregationHandleCount.hpp
+++ b/expressions/aggregation/AggregationHandleCount.hpp
@@ -30,6 +30,7 @@
 #include "expressions/aggregation/AggregationConcreteHandle.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
 #include "storage/HashTableBase.hpp"
+#include "storage/FastHashTable.hpp"
 #include "types/TypedValue.hpp"
 #include "utility/Macros.hpp"
 
@@ -62,6 +63,10 @@ class AggregationStateCount : public AggregationState {
    */
   ~AggregationStateCount() override {}
 
+  size_t getPayloadSize() const {
+     return sizeof(count_);
+  }
+
  private:
   friend class AggregationHandleCount<false, false>;
   friend class AggregationHandleCount<false, true>;
@@ -108,6 +113,11 @@ class AggregationHandleCount : public AggregationConcreteHandle {
     state->count_.fetch_add(1, std::memory_order_relaxed);
   }
 
+  inline void iterateNullaryInlFast(uint8_t *byte_ptr) {
+      std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr);
+      (*count_ptr)++;
+  }
+
   /**
    * @brief Iterate with count aggregation state.
    */
@@ -117,6 +127,25 @@ class AggregationHandleCount : public AggregationConcreteHandle {
     }
   }
 
+  inline void iterateUnaryInlFast(const TypedValue &value, uint8_t *byte_ptr) const {
+    if ((!nullable_type) || (!value.isNull())) {
+      std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr);
+      (*count_ptr)++;
+    }
+  }
+
+  inline void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t *byte_ptr) override {
+     if (arguments.size())
+         iterateUnaryInlFast(arguments.front(), byte_ptr);
+     else
+         iterateNullaryInlFast(byte_ptr);
+  }
+
+  void initPayload(uint8_t *byte_ptr) override {
+    std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr);
+    *count_ptr = 0;
+  }
+
   AggregationState* accumulateNullary(const std::size_t num_tuples) const override {
     return new AggregationStateCount(num_tuples);
   }
@@ -139,6 +168,9 @@ class AggregationHandleCount : public AggregationConcreteHandle {
   void mergeStates(const AggregationState &source,
                    AggregationState *destination) const override;
 
+  void mergeStatesFast(const uint8_t *source,
+                   uint8_t *destination) const override;
+
   TypedValue finalize(const AggregationState &state) const override {
     return TypedValue(static_cast<const AggregationStateCount&>(state).count_.load(std::memory_order_relaxed));
   }
@@ -147,9 +179,21 @@ class AggregationHandleCount : public AggregationConcreteHandle {
     return TypedValue(static_cast<const AggregationStateCount&>(state).count_.load(std::memory_order_relaxed));
   }
 
+  inline TypedValue finalizeHashTableEntryFast(const uint8_t *byte_ptr) const {
+    // Produces the final COUNT for one hash table entry from its raw payload.
+    // 'byte_ptr' is read as a single std::int64_t, which is the same
+    // representation that initPayload() and the iterate*InlFast() methods of
+    // this handle write through their own 'byte_ptr' arguments. The counter
+    // value is returned wrapped in a TypedValue.
+
+    const std::int64_t *count_ptr = reinterpret_cast<const std::int64_t *>(byte_ptr);
+    return TypedValue(*count_ptr);
+  }
+
   ColumnVector* finalizeHashTable(
       const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys) const override;
+      std::vector<std::vector<TypedValue>> *group_by_keys,
+      int index) const override;
 
   /**
    * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
@@ -170,6 +214,10 @@ class AggregationHandleCount : public AggregationConcreteHandle {
       const AggregationStateHashTableBase &source_hash_table,
       AggregationStateHashTableBase *destination_hash_table) const override;
 
+  size_t getPayloadSize() const override {
+      return sizeof(std::int64_t);
+  }
+
  private:
   friend class AggregateFunctionCount;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/expressions/aggregation/AggregationHandleDistinct.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleDistinct.cpp b/expressions/aggregation/AggregationHandleDistinct.cpp
index fe8ffcf..0088239 100644
--- a/expressions/aggregation/AggregationHandleDistinct.cpp
+++ b/expressions/aggregation/AggregationHandleDistinct.cpp
@@ -63,7 +63,7 @@ void AggregationHandleDistinct::aggregateValueAccessorIntoHashTable(
 
 ColumnVector* AggregationHandleDistinct::finalizeHashTable(
     const AggregationStateHashTableBase &hash_table,
-    std::vector<std::vector<TypedValue>> *group_by_keys) const {
+    std::vector<std::vector<TypedValue>> *group_by_keys, int index) const {
   DCHECK(group_by_keys->empty());
 
   const auto keys_retriever = [&group_by_keys](std::vector<TypedValue> &group_by_key,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/expressions/aggregation/AggregationHandleDistinct.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleDistinct.hpp b/expressions/aggregation/AggregationHandleDistinct.hpp
index 6342c2b..1e36845 100644
--- a/expressions/aggregation/AggregationHandleDistinct.hpp
+++ b/expressions/aggregation/AggregationHandleDistinct.hpp
@@ -107,7 +107,7 @@ class AggregationHandleDistinct : public AggregationConcreteHandle {
 
   ColumnVector* finalizeHashTable(
       const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys) const override;
+      std::vector<std::vector<TypedValue>> *group_by_keys, int index) const override;
 
   void mergeGroupByHashTables(
       const AggregationStateHashTableBase &source_hash_table,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/expressions/aggregation/AggregationHandleMax.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.cpp b/expressions/aggregation/AggregationHandleMax.cpp
index a7a4a52..f0eebde 100644
--- a/expressions/aggregation/AggregationHandleMax.cpp
+++ b/expressions/aggregation/AggregationHandleMax.cpp
@@ -87,7 +87,7 @@ void AggregationHandleMax::aggregateValueAccessorIntoHashTable(
   DCHECK_EQ(1u, argument_ids.size())
       << "Got wrong number of arguments for MAX: " << argument_ids.size();
 
-  aggregateValueAccessorIntoHashTableUnaryHelper<
+/*  aggregateValueAccessorIntoHashTableUnaryHelper<
       AggregationHandleMax,
       AggregationStateMax,
       AggregationStateHashTable<AggregationStateMax>>(
@@ -95,7 +95,7 @@ void AggregationHandleMax::aggregateValueAccessorIntoHashTable(
           argument_ids.front(),
           group_by_key_ids,
           AggregationStateMax(type_),
-          hash_table);
+          hash_table);*/
 }
 
 void AggregationHandleMax::mergeStates(
@@ -109,14 +109,26 @@ void AggregationHandleMax::mergeStates(
   }
 }
 
+void AggregationHandleMax::mergeStatesFast(
+    const std::uint8_t *source,
+    std::uint8_t *destination) const {
+  const TypedValue &partial_max = *reinterpret_cast<const TypedValue *>(source);
+  if (partial_max.isNull()) {
+    return;  // A null source carries no maximum to merge.
+  }
+  compareAndUpdateFast(reinterpret_cast<TypedValue *>(destination), partial_max);
+}
+
 ColumnVector* AggregationHandleMax::finalizeHashTable(
     const AggregationStateHashTableBase &hash_table,
-    std::vector<std::vector<TypedValue>> *group_by_keys) const {
-  return finalizeHashTableHelper<AggregationHandleMax,
-                                 AggregationStateHashTable<AggregationStateMax>>(
+    std::vector<std::vector<TypedValue>> *group_by_keys,
+    int index) const {
+  return finalizeHashTableHelperFast<AggregationHandleMax,
+                                 AggregationStateFastHashTable>(
       type_.getNullableVersion(),
       hash_table,
-      group_by_keys);
+      group_by_keys,
+      index);
 }
 
 AggregationState* AggregationHandleMax::aggregateOnDistinctifyHashTableForSingle(
@@ -142,9 +154,8 @@ void AggregationHandleMax::aggregateOnDistinctifyHashTableForGroupBy(
 void AggregationHandleMax::mergeGroupByHashTables(
     const AggregationStateHashTableBase &source_hash_table,
     AggregationStateHashTableBase *destination_hash_table) const {
-  mergeGroupByHashTablesHelper<AggregationHandleMax,
-                               AggregationStateMax,
-                               AggregationStateHashTable<AggregationStateMax>>(
+  mergeGroupByHashTablesHelperFast<AggregationHandleMax,
+                               AggregationStateFastHashTable>(
       source_hash_table, destination_hash_table);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/expressions/aggregation/AggregationHandleMax.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.hpp b/expressions/aggregation/AggregationHandleMax.hpp
index 5af5a12..ce7c702 100644
--- a/expressions/aggregation/AggregationHandleMax.hpp
+++ b/expressions/aggregation/AggregationHandleMax.hpp
@@ -29,6 +29,7 @@
 #include "expressions/aggregation/AggregationConcreteHandle.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
 #include "storage/HashTableBase.hpp"
+#include "storage/FastHashTable.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
@@ -105,6 +106,22 @@ class AggregationHandleMax : public AggregationConcreteHandle {
     compareAndUpdate(static_cast<AggregationStateMax*>(state), value);
   }
 
+  inline void iterateUnaryInlFast(const TypedValue &value, std::uint8_t *byte_ptr) const {
+    DCHECK(value.isPlausibleInstanceOf(type_.getSignature()));
+    TypedValue *max_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
+    compareAndUpdateFast(max_ptr, value);
+  }
+
+  inline void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t *byte_ptr) override {
+    iterateUnaryInlFast(arguments.front(), byte_ptr);
+  }
+
+  void initPayload(uint8_t *byte_ptr) override {
+    // Seed with NULL. NOTE(review): assumes the bytes already hold a valid TypedValue - confirm.
+    const TypedValue null_max = type_.getNullableVersion().makeNullValue();
+    *reinterpret_cast<TypedValue *>(byte_ptr) = null_max;
+  }
+
   AggregationState* accumulateColumnVectors(
       const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const override;
 
@@ -123,6 +140,9 @@ class AggregationHandleMax : public AggregationConcreteHandle {
   void mergeStates(const AggregationState &source,
                    AggregationState *destination) const override;
 
+  void mergeStatesFast(const std::uint8_t *source,
+                   std::uint8_t *destination) const override;
+
   TypedValue finalize(const AggregationState &state) const override {
     return TypedValue(static_cast<const AggregationStateMax&>(state).max_);
   }
@@ -131,9 +151,15 @@ class AggregationHandleMax : public AggregationConcreteHandle {
     return TypedValue(static_cast<const AggregationStateMax&>(state).max_);
   }
 
+  inline TypedValue finalizeHashTableEntryFast(const std::uint8_t *byte_ptr) const {
+    const TypedValue *max_ptr = reinterpret_cast<const TypedValue *>(byte_ptr);
+    return TypedValue(*max_ptr);
+  }
+
   ColumnVector* finalizeHashTable(
       const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys) const override;
+      std::vector<std::vector<TypedValue>> *group_by_keys,
+      int index) const override;
 
   /**
    * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
@@ -155,6 +181,10 @@ class AggregationHandleMax : public AggregationConcreteHandle {
       const AggregationStateHashTableBase &source_hash_table,
       AggregationStateHashTableBase *destination_hash_table) const override;
 
+  size_t getPayloadSize() const override {
+      return sizeof(TypedValue);
+  }
+
  private:
   friend class AggregateFunctionMax;
 
@@ -181,6 +211,13 @@ class AggregationHandleMax : public AggregationConcreteHandle {
     }
   }
 
+  inline void compareAndUpdateFast(TypedValue *max_ptr, const TypedValue &value) const {
+    if (value.isNull()) return;
+    if (max_ptr->isNull() || fast_comparator_->compareTypedValues(value, *max_ptr)) {
+      *max_ptr = value;
+    }
+  }
+
   const Type &type_;
   std::unique_ptr<UncheckedComparator> fast_comparator_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/expressions/aggregation/AggregationHandleMin.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.cpp b/expressions/aggregation/AggregationHandleMin.cpp
index ca9b163..cbedd9b 100644
--- a/expressions/aggregation/AggregationHandleMin.cpp
+++ b/expressions/aggregation/AggregationHandleMin.cpp
@@ -89,7 +89,7 @@ void AggregationHandleMin::aggregateValueAccessorIntoHashTable(
   DCHECK_EQ(1u, argument_ids.size())
       << "Got wrong number of arguments for MIN: " << argument_ids.size();
 
-  aggregateValueAccessorIntoHashTableUnaryHelper<
+/*  aggregateValueAccessorIntoHashTableUnaryHelper<
       AggregationHandleMin,
       AggregationStateMin,
       AggregationStateHashTable<AggregationStateMin>>(
@@ -97,7 +97,7 @@ void AggregationHandleMin::aggregateValueAccessorIntoHashTable(
           argument_ids.front(),
           group_by_key_ids,
           AggregationStateMin(type_),
-          hash_table);
+          hash_table);*/
 }
 
 void AggregationHandleMin::mergeStates(
@@ -111,14 +111,27 @@ void AggregationHandleMin::mergeStates(
   }
 }
 
+void AggregationHandleMin::mergeStatesFast(
+    const std::uint8_t *source,
+    std::uint8_t *destination) const {
+  // Source and destination payloads are each read as a TypedValue.
+  const TypedValue &partial_min = *reinterpret_cast<const TypedValue *>(source);
+  if (partial_min.isNull()) {
+    return;  // A null source carries no minimum to merge.
+  }
+  compareAndUpdateFast(reinterpret_cast<TypedValue *>(destination), partial_min);
+}
+
 ColumnVector* AggregationHandleMin::finalizeHashTable(
     const AggregationStateHashTableBase &hash_table,
-    std::vector<std::vector<TypedValue>> *group_by_keys) const {
-  return finalizeHashTableHelper<AggregationHandleMin,
-                                 AggregationStateHashTable<AggregationStateMin>>(
+    std::vector<std::vector<TypedValue>> *group_by_keys,
+    int index) const {
+  return finalizeHashTableHelperFast<AggregationHandleMin,
+                                 AggregationStateFastHashTable>(
       type_.getNonNullableVersion(),
       hash_table,
-      group_by_keys);
+      group_by_keys,
+      index);
 }
 
 AggregationState* AggregationHandleMin::aggregateOnDistinctifyHashTableForSingle(
@@ -144,9 +157,8 @@ void AggregationHandleMin::aggregateOnDistinctifyHashTableForGroupBy(
 void AggregationHandleMin::mergeGroupByHashTables(
     const AggregationStateHashTableBase &source_hash_table,
     AggregationStateHashTableBase *destination_hash_table) const {
-  mergeGroupByHashTablesHelper<AggregationHandleMin,
-                               AggregationStateMin,
-                               AggregationStateHashTable<AggregationStateMin>>(
+  mergeGroupByHashTablesHelperFast<AggregationHandleMin,
+                               AggregationStateFastHashTable>(
       source_hash_table, destination_hash_table);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/expressions/aggregation/AggregationHandleMin.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.hpp b/expressions/aggregation/AggregationHandleMin.hpp
index f68bb9d..7c7869e 100644
--- a/expressions/aggregation/AggregationHandleMin.hpp
+++ b/expressions/aggregation/AggregationHandleMin.hpp
@@ -29,6 +29,7 @@
 #include "expressions/aggregation/AggregationConcreteHandle.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
 #include "storage/HashTableBase.hpp"
+#include "storage/FastHashTable.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
@@ -64,6 +65,11 @@ class AggregationStateMin : public AggregationState {
    */
   ~AggregationStateMin() override {}
 
+  size_t getPayloadSize() const {
+     return sizeof(TypedValue);
+  }
+
+
  private:
   friend class AggregationHandleMin;
 
@@ -104,6 +110,22 @@ class AggregationHandleMin : public AggregationConcreteHandle {
     compareAndUpdate(state, value);
   }
 
+  inline void iterateUnaryInlFast(const TypedValue &value, uint8_t *byte_ptr) const {
+    DCHECK(value.isPlausibleInstanceOf(type_.getSignature()));
+    // The payload is read as the group's running minimum; NULL inputs are skipped by the helper.
+    compareAndUpdateFast(reinterpret_cast<TypedValue *>(byte_ptr), value);
+  }
+
+  inline void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t *byte_ptr) override {
+    iterateUnaryInlFast(arguments.front(), byte_ptr);
+  }
+
+  void initPayload(uint8_t *byte_ptr) override {
+    // Seed with NULL. NOTE(review): assumes the bytes already hold a valid TypedValue - confirm.
+    const TypedValue null_min = type_.getNullableVersion().makeNullValue();
+    *reinterpret_cast<TypedValue *>(byte_ptr) = null_min;
+  }
+
   AggregationState* accumulateColumnVectors(
       const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const override;
 
@@ -122,6 +144,9 @@ class AggregationHandleMin : public AggregationConcreteHandle {
   void mergeStates(const AggregationState &source,
                    AggregationState *destination) const override;
 
+  void mergeStatesFast(const uint8_t *source,
+                   uint8_t *destination) const override;
+
   TypedValue finalize(const AggregationState &state) const override {
     return static_cast<const AggregationStateMin&>(state).min_;
   }
@@ -130,9 +155,15 @@ class AggregationHandleMin : public AggregationConcreteHandle {
     return static_cast<const AggregationStateMin&>(state).min_;
   }
 
+  inline TypedValue finalizeHashTableEntryFast(const std::uint8_t *byte_ptr) const {
+    const TypedValue *min_ptr = reinterpret_cast<const TypedValue *>(byte_ptr);
+    return TypedValue(*min_ptr);
+  }
+
   ColumnVector* finalizeHashTable(
       const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys) const override;
+      std::vector<std::vector<TypedValue>> *group_by_keys,
+      int index) const override;
 
   /**
    * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
@@ -153,6 +184,10 @@ class AggregationHandleMin : public AggregationConcreteHandle {
       const AggregationStateHashTableBase &source_hash_table,
       AggregationStateHashTableBase *destination_hash_table) const override;
 
+  size_t getPayloadSize() const override {
+      return sizeof(TypedValue);
+  }
+
  private:
   friend class AggregateFunctionMin;
 
@@ -178,6 +213,13 @@ class AggregationHandleMin : public AggregationConcreteHandle {
     }
   }
 
+  inline void compareAndUpdateFast(TypedValue *min_ptr, const TypedValue &value) const {
+    if (value.isNull()) return;
+    if (min_ptr->isNull() || fast_comparator_->compareTypedValues(value, *min_ptr)) {
+      *min_ptr = value;
+    }
+  }
+
   const Type &type_;
   std::unique_ptr<UncheckedComparator> fast_comparator_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/expressions/aggregation/AggregationHandleSum.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.cpp b/expressions/aggregation/AggregationHandleSum.cpp
index 691ff39..ad486eb 100644
--- a/expressions/aggregation/AggregationHandleSum.cpp
+++ b/expressions/aggregation/AggregationHandleSum.cpp
@@ -93,7 +93,6 @@ AggregationState* AggregationHandleSum::accumulateColumnVectors(
     const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
   DCHECK_EQ(1u, column_vectors.size())
       << "Got wrong number of ColumnVectors for SUM: " << column_vectors.size();
-
   std::size_t num_tuples = 0;
   TypedValue cv_sum = fast_operator_->accumulateColumnVector(
       blank_state_.sum_,
@@ -127,7 +126,7 @@ void AggregationHandleSum::aggregateValueAccessorIntoHashTable(
   DCHECK_EQ(1u, argument_ids.size())
       << "Got wrong number of arguments for SUM: " << argument_ids.size();
 
-  aggregateValueAccessorIntoHashTableUnaryHelper<
+/*  aggregateValueAccessorIntoHashTableUnaryHelper<
       AggregationHandleSum,
       AggregationStateSum,
       AggregationStateHashTable<AggregationStateSum>>(
@@ -135,7 +134,7 @@ void AggregationHandleSum::aggregateValueAccessorIntoHashTable(
           argument_ids.front(),
           group_by_key_ids,
           blank_state_,
-          hash_table);
+          hash_table);*/
 }
 
 void AggregationHandleSum::mergeStates(
@@ -150,6 +149,17 @@ void AggregationHandleSum::mergeStates(
   sum_destination->null_ = sum_destination->null_ && sum_source.null_;
 }
 
+void AggregationHandleSum::mergeStatesFast(
+    const uint8_t *source,
+    uint8_t *destination) const {
+  // Each payload keeps the running sum and its null flag at the offsets recorded in 'blank_state_'.
+  const TypedValue &partial_sum = *reinterpret_cast<const TypedValue *>(source + blank_state_.sum_offset);
+  TypedValue *total_sum = reinterpret_cast<TypedValue *>(destination + blank_state_.sum_offset);
+  bool *total_is_null = reinterpret_cast<bool *>(destination + blank_state_.null_offset);
+  *total_sum = merge_operator_->applyToTypedValues(*total_sum, partial_sum);
+  *total_is_null = *total_is_null && *reinterpret_cast<const bool *>(source + blank_state_.null_offset);
+}
+
 TypedValue AggregationHandleSum::finalize(const AggregationState &state) const {
   const AggregationStateSum &agg_state = static_cast<const AggregationStateSum&>(state);
   if (agg_state.null_) {
@@ -162,12 +172,14 @@ TypedValue AggregationHandleSum::finalize(const AggregationState &state) const {
 
 ColumnVector* AggregationHandleSum::finalizeHashTable(
     const AggregationStateHashTableBase &hash_table,
-    std::vector<std::vector<TypedValue>> *group_by_keys) const {
-  return finalizeHashTableHelper<AggregationHandleSum,
-                                 AggregationStateHashTable<AggregationStateSum>>(
+    std::vector<std::vector<TypedValue>> *group_by_keys,
+    int index) const {
+  return finalizeHashTableHelperFast<AggregationHandleSum,
+                                 AggregationStateFastHashTable>(
       *result_type_,
       hash_table,
-      group_by_keys);
+      group_by_keys,
+      index);
 }
 
 AggregationState* AggregationHandleSum::aggregateOnDistinctifyHashTableForSingle(
@@ -193,9 +205,8 @@ void AggregationHandleSum::aggregateOnDistinctifyHashTableForGroupBy(
 void AggregationHandleSum::mergeGroupByHashTables(
     const AggregationStateHashTableBase &source_hash_table,
     AggregationStateHashTableBase *destination_hash_table) const {
-  mergeGroupByHashTablesHelper<AggregationHandleSum,
-                               AggregationStateSum,
-                               AggregationStateHashTable<AggregationStateSum>>(
+  mergeGroupByHashTablesHelperFast<AggregationHandleSum,
+                               AggregationStateFastHashTable>(
       source_hash_table, destination_hash_table);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/expressions/aggregation/AggregationHandleSum.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.hpp b/expressions/aggregation/AggregationHandleSum.hpp
index fdc0884..a8d2b5a 100644
--- a/expressions/aggregation/AggregationHandleSum.hpp
+++ b/expressions/aggregation/AggregationHandleSum.hpp
@@ -29,6 +29,7 @@
 #include "expressions/aggregation/AggregationConcreteHandle.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
 #include "storage/HashTableBase.hpp"
+#include "storage/FastHashTable.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
@@ -57,27 +58,39 @@ class AggregationStateSum : public AggregationState {
    */
   AggregationStateSum(const AggregationStateSum &orig)
       : sum_(orig.sum_),
-        null_(orig.null_) {
+        null_(orig.null_),
+        sum_offset(orig.sum_offset),
+        null_offset(orig.null_offset) {
   }
 
  private:
   friend class AggregationHandleSum;
 
   AggregationStateSum()
-      : sum_(0), null_(true) {
+      : sum_(0), null_(true), sum_offset(0),
+        null_offset(reinterpret_cast<uint8_t *>(&null_)-reinterpret_cast<uint8_t *>(&sum_)) {
   }
 
   AggregationStateSum(TypedValue &&sum, const bool is_null)
       : sum_(std::move(sum)), null_(is_null) {
   }
 
+  size_t getPayloadSize() const {
+    // Bytes spanned from the start of 'sum_' up to the start of 'mutex_'; the lock itself is excluded.
+    const uint8_t *payload_begin = reinterpret_cast<const uint8_t *>(&sum_);
+    return static_cast<size_t>(reinterpret_cast<const uint8_t *>(&mutex_) - payload_begin);
+  }
+
   // TODO(shoban): We might want to specialize sum_ to use atomics for int types
   // similar to in AggregationStateCount.
   TypedValue sum_;
   bool null_;
   SpinMutex mutex_;
+
+  int sum_offset, null_offset;
 };
 
+
 /**
  * @brief An aggregationhandle for sum.
  **/
@@ -105,6 +118,26 @@ class AggregationHandleSum : public AggregationConcreteHandle {
     state->null_ = false;
   }
 
+  inline void iterateUnaryInlFast(const TypedValue &value, uint8_t *byte_ptr) {
+    DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
+    if (value.isNull()) return;
+    TypedValue *sum_ptr = reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset);
+    bool *null_ptr = reinterpret_cast<bool *>(byte_ptr + blank_state_.null_offset);
+    *sum_ptr = fast_operator_->applyToTypedValues(*sum_ptr, value);
+    *null_ptr = false;
+  }
+
+  inline void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t *byte_ptr) override {
+     iterateUnaryInlFast(arguments.front(), byte_ptr);
+  }
+
+  void initPayload(uint8_t *byte_ptr) override {
+    TypedValue *sum_ptr = reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset);
+    bool *null_ptr = reinterpret_cast<bool *>(byte_ptr + blank_state_.null_offset);
+    *sum_ptr = blank_state_.sum_;
+    *null_ptr = true;
+  }
+
   AggregationState* accumulateColumnVectors(
       const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const override;
 
@@ -123,15 +156,24 @@ class AggregationHandleSum : public AggregationConcreteHandle {
   void mergeStates(const AggregationState &source,
                    AggregationState *destination) const override;
 
+  void mergeStatesFast(const uint8_t *source,
+                   uint8_t *destination) const override;
+
   TypedValue finalize(const AggregationState &state) const override;
 
   inline TypedValue finalizeHashTableEntry(const AggregationState &state) const {
     return static_cast<const AggregationStateSum&>(state).sum_;
   }
 
+  inline TypedValue finalizeHashTableEntryFast(const uint8_t *byte_ptr) const {
+    // Only the sum is read; the null flag stored in the payload is not consulted here.
+    const TypedValue *sum_ptr = reinterpret_cast<const TypedValue *>(byte_ptr + blank_state_.sum_offset);
+    return *sum_ptr;
+  }
+
   ColumnVector* finalizeHashTable(
       const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys) const override;
+      std::vector<std::vector<TypedValue>> *group_by_keys, int index) const override;
 
   /**
    * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
@@ -152,6 +194,10 @@ class AggregationHandleSum : public AggregationConcreteHandle {
       const AggregationStateHashTableBase &source_hash_table,
       AggregationStateHashTableBase *destination_hash_table) const override;
 
+  size_t getPayloadSize() const override {
+      return blank_state_.getPayloadSize();
+  }
+
  private:
   friend class AggregateFunctionSum;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/expressions/aggregation/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/aggregation/CMakeLists.txt b/expressions/aggregation/CMakeLists.txt
index 5744c52..6f259fa 100644
--- a/expressions/aggregation/CMakeLists.txt
+++ b/expressions/aggregation/CMakeLists.txt
@@ -144,9 +144,11 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationConcreteHandl
                       glog
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_storage_FastHashTable
                       quickstep_storage_HashTable
                       quickstep_storage_HashTableBase
                       quickstep_storage_HashTableFactory
+                      quickstep_threading_SpinMutex
                       quickstep_types_TypedValue
                       quickstep_types_containers_ColumnVector
                       quickstep_utility_Macros)
@@ -161,6 +163,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleAvg
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_aggregation_AggregationConcreteHandle
                       quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_storage_FastHashTable
                       quickstep_storage_HashTable
                       quickstep_storage_HashTableBase
                       quickstep_storage_HashTableFactory
@@ -178,6 +181,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleCount
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_aggregation_AggregationConcreteHandle
                       quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_storage_FastHashTable
                       quickstep_storage_HashTable
                       quickstep_storage_HashTableBase
                       quickstep_storage_HashTableFactory
@@ -202,6 +206,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMax
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_aggregation_AggregationConcreteHandle
                       quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_storage_FastHashTable
                       quickstep_storage_HashTable
                       quickstep_storage_HashTableBase
                       quickstep_storage_HashTableFactory
@@ -218,6 +223,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMin
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_aggregation_AggregationConcreteHandle
                       quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_storage_FastHashTable
                       quickstep_storage_HashTable
                       quickstep_storage_HashTableBase
                       quickstep_storage_HashTableFactory
@@ -234,6 +240,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleSum
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_aggregation_AggregationConcreteHandle
                       quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_storage_FastHashTable
                       quickstep_storage_HashTable
                       quickstep_storage_HashTableBase
                       quickstep_storage_HashTableFactory

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 4878cf1..185c6b1 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -83,6 +83,9 @@ AggregationOperationState::AggregationOperationState(
     group_by_types.emplace_back(&group_by_element->getType());
   }
 
+  std::vector<AggregationHandle *> group_by_handles;
+  group_by_handles.clear();
+
   if (aggregate_functions.size() == 0) {
     // If there is no aggregation function, then it is a distinctify operation
     // on the group-by expressions.
@@ -92,11 +95,17 @@ AggregationOperationState::AggregationOperationState(
     arguments_.push_back({});
     is_distinct_.emplace_back(false);
 
-    group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>(
+ /*   group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>(
         new HashTablePool(estimated_num_entries,
                           hash_table_impl_type,
                           group_by_types,
                           handles_.back().get(),
+                          storage_manager)));*/
+    group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>(
+        new HashTablePool(estimated_num_entries,
+                          hash_table_impl_type,
+                          group_by_types,
+                          handles_.back(),
                           storage_manager)));
   } else {
     // Set up each individual aggregate in this operation.
@@ -107,6 +116,7 @@ AggregationOperationState::AggregationOperationState(
     std::vector<bool>::const_iterator is_distinct_it = is_distinct_.begin();
     std::vector<HashTableImplType>::const_iterator distinctify_hash_table_impl_types_it
         = distinctify_hash_table_impl_types.begin();
+    std::vector<std::size_t> payload_sizes;
     for (; agg_func_it != aggregate_functions.end(); ++agg_func_it, ++args_it, ++is_distinct_it) {
       // Get the Types of this aggregate's arguments so that we can create an
       // AggregationHandle.
@@ -126,12 +136,15 @@ AggregationOperationState::AggregationOperationState(
 
       if (!group_by_list_.empty()) {
         // Aggregation with GROUP BY: create a HashTable pool for per-group states.
-        group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>(
+ /*       group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>(
             new HashTablePool(estimated_num_entries,
                               hash_table_impl_type,
                               group_by_types,
                               handles_.back().get(),
-                              storage_manager)));
+                              storage_manager)));*/
+         group_by_handles.emplace_back(handles_.back());
+         payload_sizes.emplace_back(handles_.back()->getPayloadSize());
+
       } else {
         // Aggregation without GROUP BY: create a single global state.
         single_states_.emplace_back(handles_.back()->createInitialState());
@@ -166,17 +179,40 @@ AggregationOperationState::AggregationOperationState(
         // the number of entries in the distinctify hash table. We may estimate
         // for each distinct aggregation an estimated_num_distinct_keys value during
         // query optimization, if it worths.
-        distinctify_hashtables_.emplace_back(
+ /*       distinctify_hashtables_.emplace_back(
             handles_.back()->createDistinctifyHashTable(
                 *distinctify_hash_table_impl_types_it,
                 key_types,
                 estimated_num_entries,
+                storage_manager));*/
+
+std::vector<AggregationHandle *> local;
+local.emplace_back(handles_.back());
+        distinctify_hashtables_.emplace_back(
+AggregationStateFastHashTableFactory::CreateResizable(
+                *distinctify_hash_table_impl_types_it,
+                key_types,
+                estimated_num_entries,
+                {0},
+                local,
                 storage_manager));
+
         ++distinctify_hash_table_impl_types_it;
       } else {
         distinctify_hashtables_.emplace_back(nullptr);
       }
     }
+
+      if (!group_by_handles.empty()) {
+        // Aggregation with GROUP BY: create a HashTable pool for per-group states.
+        group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>(
+            new HashTablePool(estimated_num_entries,
+                              hash_table_impl_type,
+                              group_by_types,
+                              payload_sizes,
+                              group_by_handles,
+                              storage_manager)));
+      }
   }
 }
 
@@ -410,17 +446,24 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
       // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
       // directly into the (threadsafe) shared global HashTable for this
       // aggregate.
-      DCHECK(group_by_hashtable_pools_[agg_idx] != nullptr);
-      AggregationStateHashTableBase *agg_hash_table = group_by_hashtable_pools_[agg_idx]->getHashTable();
+      DCHECK(group_by_hashtable_pools_[0] != nullptr);
+      AggregationStateHashTableBase *agg_hash_table = group_by_hashtable_pools_[0]->getHashTableFast();
       DCHECK(agg_hash_table != nullptr);
-      block->aggregateGroupBy(*handles_[agg_idx],
+ /*     block->aggregateGroupBy(*handles_[agg_idx],
                               arguments_[agg_idx],
                               group_by_list_,
                               predicate_.get(),
                               agg_hash_table,
                               &reuse_matches,
+                              &reuse_group_by_vectors);*/
+      block->aggregateGroupByFast(arguments_,
+                              group_by_list_,
+                              predicate_.get(),
+                              agg_hash_table,
+                              &reuse_matches,
                               &reuse_group_by_vectors);
-      group_by_hashtable_pools_[agg_idx]->returnHashTable(agg_hash_table);
+      group_by_hashtable_pools_[0]->returnHashTable(agg_hash_table);
+      break;
     }
   }
 }
@@ -444,6 +487,12 @@ void AggregationOperationState::finalizeSingleState(InsertDestination *output_de
   output_destination->insertTuple(Tuple(std::move(attribute_values)));
 }
 
+void AggregationOperationState::mergeGroupByHashTables(AggregationStateHashTableBase *src,
+                                                       AggregationStateHashTableBase *dst) {
+    HashTableMergerNewFast merger(dst);
+    (static_cast<FastHashTable<true, false, true, false> *>(src))->forEachCompositeKeyFast(&merger);
+}
+
 void AggregationOperationState::finalizeHashTable(InsertDestination *output_destination) {
   // Each element of 'group_by_keys' is a vector of values for a particular
   // group (which is also the prefix of the finalized Tuple for that group).
@@ -455,18 +504,21 @@ void AggregationOperationState::finalizeHashTable(InsertDestination *output_dest
 
   // TODO(harshad) - Find heuristics for faster merge, even in a single thread.
   // e.g. Keep merging entries from smaller hash tables to larger.
+//  auto *hash_tables = group_by_hashtable_pools_[0]->getAllHashTables();
+
+  auto *hash_tables = group_by_hashtable_pools_[0]->getAllHashTables();
   for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
-    auto *hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables();
     if (hash_tables->size() > 1) {
       for (int hash_table_index = 0;
            hash_table_index < static_cast<int>(hash_tables->size() - 1);
            ++hash_table_index) {
         // Merge each hash table to the last hash table.
-        handles_[agg_idx]->mergeGroupByHashTables(
-            (*(*hash_tables)[hash_table_index]),
+        mergeGroupByHashTables(
+            (*hash_tables)[hash_table_index].get(),
             hash_tables->back().get());
       }
     }
+    break;
   }
 
   // Collect per-aggregate finalized values.
@@ -475,16 +527,16 @@ void AggregationOperationState::finalizeHashTable(InsertDestination *output_dest
        agg_idx < handles_.size();
        ++agg_idx) {
     if (is_distinct_[agg_idx]) {
-      DCHECK(group_by_hashtable_pools_[agg_idx] != nullptr);
-      auto *hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables();
+      DCHECK(group_by_hashtable_pools_[0] != nullptr);
+      auto *hash_tables = group_by_hashtable_pools_[0]->getAllHashTables();
       DCHECK(hash_tables != nullptr);
       if (hash_tables->empty()) {
         // We may have a case where hash_tables is empty, e.g. no input blocks.
         // However for aggregateOnDistinctifyHashTableForGroupBy to work
         // correctly, we should create an empty group by hash table.
-        AggregationStateHashTableBase *new_hash_table = group_by_hashtable_pools_[agg_idx]->getHashTable();
-        group_by_hashtable_pools_[agg_idx]->returnHashTable(new_hash_table);
-        hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables();
+        AggregationStateHashTableBase *new_hash_table = group_by_hashtable_pools_[0]->getHashTableFast();
+        group_by_hashtable_pools_[0]->returnHashTable(new_hash_table);
+        hash_tables = group_by_hashtable_pools_[0]->getAllHashTables();
       }
       DCHECK(hash_tables->back() != nullptr);
       AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
@@ -494,21 +546,22 @@ void AggregationOperationState::finalizeHashTable(InsertDestination *output_dest
           agg_hash_table);
     }
 
-    auto *hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables();
+    auto *hash_tables = group_by_hashtable_pools_[0]->getAllHashTables();
     DCHECK(hash_tables != nullptr);
     if (hash_tables->empty()) {
       // We may have a case where hash_tables is empty, e.g. no input blocks.
       // However for aggregateOnDistinctifyHashTableForGroupBy to work
       // correctly, we should create an empty group by hash table.
-      AggregationStateHashTableBase *new_hash_table = group_by_hashtable_pools_[agg_idx]->getHashTable();
-      group_by_hashtable_pools_[agg_idx]->returnHashTable(new_hash_table);
-      hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables();
+      AggregationStateHashTableBase *new_hash_table = group_by_hashtable_pools_[0]->getHashTable();
+      group_by_hashtable_pools_[0]->returnHashTable(new_hash_table);
+      hash_tables = group_by_hashtable_pools_[0]->getAllHashTables();
     }
     AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
     DCHECK(agg_hash_table != nullptr);
     ColumnVector* agg_result_col =
         handles_[agg_idx]->finalizeHashTable(*agg_hash_table,
-                                             &group_by_keys);
+                                             &group_by_keys,
+                                              agg_idx);
     if (agg_result_col != nullptr) {
       final_values.emplace_back(agg_result_col);
     }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 0199749..8934cda 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -165,6 +165,8 @@ class AggregationOperationState {
    **/
   void finalizeAggregate(InsertDestination *output_destination);
 
+  int dflag;
+
  private:
   // Merge locally (per storage block) aggregated states with global aggregation
   // states.
@@ -185,7 +187,8 @@ class AggregationOperationState {
 
   // Each individual aggregate in this operation has an AggregationHandle and
   // some number of Scalar arguments.
-  std::vector<std::unique_ptr<AggregationHandle>> handles_;
+//  std::vector<std::unique_ptr<AggregationHandle>> handles_;
+  std::vector<AggregationHandle *> handles_;
   std::vector<std::vector<std::unique_ptr<const Scalar>>> arguments_;
 
   // For each aggregate, whether DISTINCT should be applied to the aggregate's
@@ -215,6 +218,8 @@ class AggregationOperationState {
 
   StorageManager *storage_manager_;
 
+  void mergeGroupByHashTables(AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst);
+
   DISALLOW_COPY_AND_ASSIGN(AggregationOperationState);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/39485d62/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 582effd..e67b21f 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -196,6 +196,9 @@ if (ENABLE_DISTRIBUTED)
 endif()
 
 add_library(quickstep_storage_EvictionPolicy EvictionPolicy.cpp EvictionPolicy.hpp)
+add_library(quickstep_storage_FastHashTable ../empty_src.cpp FastHashTable.hpp)
+add_library(quickstep_storage_FastHashTableFactory ../empty_src.cpp FastHashTableFactory.hpp)
+add_library(quickstep_storage_FastSeparateChainingHashTable ../empty_src.cpp FastSeparateChainingHashTable.hpp)
 add_library(quickstep_storage_FileManager ../empty_src.cpp FileManager.hpp)
 if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
   add_library(quickstep_storage_FileManagerHdfs FileManagerHdfs.cpp FileManagerHdfs.hpp)
@@ -624,6 +627,55 @@ target_link_libraries(quickstep_storage_EvictionPolicy
                       quickstep_threading_SpinMutex
                       quickstep_threading_SpinSharedMutex
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_storage_FastHashTable
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_expressions_aggregation_AggregationHandleAvg
+                      quickstep_storage_HashTable
+                      quickstep_storage_HashTableBase
+                      quickstep_storage_StorageBlob
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_storage_StorageConstants
+                      quickstep_storage_StorageManager
+                      quickstep_storage_TupleReference
+                      quickstep_storage_ValueAccessor
+                      quickstep_storage_ValueAccessorUtil
+                      quickstep_threading_SpinMutex
+                      quickstep_threading_SpinSharedMutex
+                      quickstep_types_Type
+                      quickstep_types_TypedValue
+                      quickstep_utility_BloomFilter
+                      quickstep_utility_HashPair
+                      quickstep_utility_Macros)
+target_link_libraries(quickstep_storage_FastHashTableFactory
+                      glog
+                      quickstep_storage_FastHashTable
+                      quickstep_storage_FastSeparateChainingHashTable
+                      quickstep_storage_HashTable
+                      quickstep_storage_HashTable_proto
+                      quickstep_storage_HashTableBase
+                      quickstep_storage_HashTableFactory
+                      quickstep_storage_LinearOpenAddressingHashTable
+                      quickstep_storage_SeparateChainingHashTable
+                      quickstep_storage_SimpleScalarSeparateChainingHashTable
+                      quickstep_storage_TupleReference
+                      quickstep_types_TypeFactory
+                      quickstep_utility_BloomFilter
+                      quickstep_utility_Macros)
+target_link_libraries(quickstep_storage_FastSeparateChainingHashTable
+                      quickstep_storage_FastHashTable
+                      quickstep_storage_HashTable
+                      quickstep_storage_HashTableBase
+                      quickstep_storage_HashTableKeyManager
+                      quickstep_storage_StorageBlob
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_storage_StorageConstants
+                      quickstep_storage_StorageManager
+                      quickstep_threading_SpinSharedMutex
+                      quickstep_types_Type
+                      quickstep_types_TypedValue
+                      quickstep_utility_Alignment
+                      quickstep_utility_Macros
+                      quickstep_utility_PrimeNumber)
 target_link_libraries(quickstep_storage_FileManager
                       quickstep_storage_StorageBlockInfo
                       quickstep_utility_Macros
@@ -709,6 +761,8 @@ target_link_libraries(quickstep_storage_HashTableKeyManager
 target_link_libraries(quickstep_storage_HashTablePool
                       glog
                       quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_storage_FastHashTable
+                      quickstep_storage_FastHashTableFactory
                       quickstep_storage_HashTableBase
                       quickstep_threading_SpinMutex
                       quickstep_utility_Macros
@@ -913,6 +967,7 @@ target_link_libraries(quickstep_storage_StorageBlock
                       quickstep_storage_CompressedColumnStoreTupleStorageSubBlock
                       quickstep_storage_CompressedPackedRowStoreTupleStorageSubBlock
                       quickstep_storage_CountedReference
+                      quickstep_storage_FastHashTable
                       quickstep_storage_HashTableBase
                       quickstep_storage_IndexSubBlock
                       quickstep_storage_InsertDestinationInterface
@@ -1096,6 +1151,9 @@ target_link_libraries(quickstep_storage
                       quickstep_storage_EvictionPolicy
                       quickstep_storage_FileManager
                       quickstep_storage_FileManagerLocal
+                      quickstep_storage_FastHashTable
+                      quickstep_storage_FastHashTableFactory
+                      quickstep_storage_FastSeparateChainingHashTable
                       quickstep_storage_HashTable
                       quickstep_storage_HashTable_proto
                       quickstep_storage_HashTableBase