You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/01/31 23:20:07 UTC

[13/13] incubator-quickstep git commit: Initial commit.

Initial commit.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/b46bc73c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/b46bc73c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/b46bc73c

Branch: refs/heads/collision-free-agg
Commit: b46bc73c81adb4c36a076e79c5782d38c15a9a7a
Parents: 5ffdaaf
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Mon Jan 30 14:46:39 2017 -0600
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Tue Jan 31 17:00:16 2017 -0600

----------------------------------------------------------------------
 .../aggregation/AggregateFunctionCount.cpp      |    6 +-
 .../aggregation/AggregationConcreteHandle.cpp   |   44 -
 .../aggregation/AggregationConcreteHandle.hpp   |  131 +-
 expressions/aggregation/AggregationHandle.hpp   |  202 +-
 .../aggregation/AggregationHandleAvg.cpp        |   84 +-
 .../aggregation/AggregationHandleAvg.hpp        |  116 +-
 .../aggregation/AggregationHandleCount.cpp      |  127 +-
 .../aggregation/AggregationHandleCount.hpp      |  136 +-
 .../aggregation/AggregationHandleDistinct.cpp   |   34 +-
 .../aggregation/AggregationHandleDistinct.hpp   |   56 +-
 .../aggregation/AggregationHandleMax.cpp        |   84 +-
 .../aggregation/AggregationHandleMax.hpp        |  101 +-
 .../aggregation/AggregationHandleMin.cpp        |   84 +-
 .../aggregation/AggregationHandleMin.hpp        |  111 +-
 .../aggregation/AggregationHandleSum.cpp        |   81 +-
 .../aggregation/AggregationHandleSum.hpp        |  118 +-
 expressions/aggregation/AggregationID.hpp       |    4 +-
 expressions/aggregation/CMakeLists.txt          |   34 +-
 query_execution/QueryContext.hpp                |   14 -
 query_optimizer/CMakeLists.txt                  |    3 +
 query_optimizer/ExecutionGenerator.cpp          |  137 +-
 query_optimizer/ExecutionGenerator.hpp          |    8 +-
 query_optimizer/cost_model/CMakeLists.txt       |    3 +
 .../cost_model/StarSchemaSimpleCostModel.cpp    |  126 +-
 .../cost_model/StarSchemaSimpleCostModel.hpp    |   79 +
 query_optimizer/expressions/ExpressionUtil.hpp  |    8 +-
 relational_operators/CMakeLists.txt             |   15 +
 .../DestroyAggregationStateOperator.cpp         |    7 -
 .../FinalizeAggregationOperator.cpp             |   16 +-
 .../FinalizeAggregationOperator.hpp             |   14 +-
 .../InitializeAggregationStateOperator.cpp      |   68 +
 .../InitializeAggregationStateOperator.hpp      |  103 +
 storage/AggregationOperationState.cpp           |  661 ++---
 storage/AggregationOperationState.hpp           |  142 +-
 storage/CMakeLists.txt                          |  114 +-
 .../CollisionFreeAggregationStateHashTable.cpp  |  254 ++
 .../CollisionFreeAggregationStateHashTable.hpp  |  568 +++++
 storage/FastHashTable.hpp                       | 2403 ------------------
 storage/FastHashTableFactory.hpp                |  224 --
 storage/FastSeparateChainingHashTable.hpp       | 1551 -----------
 storage/HashTable.proto                         |    7 +-
 storage/HashTableBase.hpp                       |   44 +-
 storage/HashTableFactory.hpp                    |   44 +-
 storage/HashTablePool.hpp                       |   74 +-
 .../PackedPayloadAggregationStateHashTable.cpp  |  434 ++++
 .../PackedPayloadAggregationStateHashTable.hpp  |  721 ++++++
 storage/PartitionedHashTablePool.hpp            |   50 +-
 storage/StorageBlock.cpp                        |  272 --
 storage/StorageBlock.hpp                        |  165 --
 utility/CMakeLists.txt                          |    5 +
 utility/ConcurrentBitVector.hpp                 |  209 ++
 51 files changed, 3648 insertions(+), 6448 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b46bc73c/expressions/aggregation/AggregateFunctionCount.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregateFunctionCount.cpp b/expressions/aggregation/AggregateFunctionCount.cpp
index 466ff2f..9795b4a 100644
--- a/expressions/aggregation/AggregateFunctionCount.cpp
+++ b/expressions/aggregation/AggregateFunctionCount.cpp
@@ -53,16 +53,16 @@ AggregationHandle* AggregateFunctionCount::createHandle(
 
   if (argument_types.empty()) {
     // COUNT(*)
-    return new AggregationHandleCount<true, false>();
+    return new AggregationHandleCount<true, false>(nullptr);
   } else if (argument_types.front()->isNullable()) {
     // COUNT(some_nullable_argument)
-    return new AggregationHandleCount<false, true>();
+    return new AggregationHandleCount<false, true>(argument_types.front());
   } else {
     // COUNT(non_nullable_argument)
     //
     // TODO(chasseur): Modify query optimizer to optimize-away COUNT with a
     // non-nullable argument and convert it to COUNT(*).
-    return new AggregationHandleCount<false, false>();
+    return new AggregationHandleCount<false, false>(argument_types.front());
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b46bc73c/expressions/aggregation/AggregationConcreteHandle.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.cpp b/expressions/aggregation/AggregationConcreteHandle.cpp
index e3fb520..3151a91 100644
--- a/expressions/aggregation/AggregationConcreteHandle.cpp
+++ b/expressions/aggregation/AggregationConcreteHandle.cpp
@@ -19,50 +19,6 @@
 
 #include "expressions/aggregation/AggregationConcreteHandle.hpp"
 
-#include <cstddef>
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "storage/FastHashTable.hpp"
-#include "storage/HashTable.hpp"
-#include "storage/HashTableFactory.hpp"
-
 namespace quickstep {
 
-class StorageManager;
-class Type;
-class ValueAccessor;
-
-AggregationStateHashTableBase* AggregationConcreteHandle::createDistinctifyHashTable(
-    const HashTableImplType hash_table_impl,
-    const std::vector<const Type*> &key_types,
-    const std::size_t estimated_num_distinct_keys,
-    StorageManager *storage_manager) const {
-  // Create a hash table with key types as key_types and value type as bool.
-  return AggregationStateHashTableFactory<bool>::CreateResizable(
-      hash_table_impl,
-      key_types,
-      estimated_num_distinct_keys,
-      storage_manager);
-}
-
-void AggregationConcreteHandle::insertValueAccessorIntoDistinctifyHashTable(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &key_ids,
-    AggregationStateHashTableBase *distinctify_hash_table) const {
-  // If the key-value pair is already there, we don't need to update the value,
-  // which should always be "true". I.e. the value is just a placeholder.
-
-  AggregationStateFastHashTable *hash_table =
-      static_cast<AggregationStateFastHashTable *>(distinctify_hash_table);
-  if (key_ids.size() == 1) {
-    hash_table->upsertValueAccessorFast(
-        key_ids, accessor, key_ids[0], true /* check_for_null_keys */);
-  } else {
-    std::vector<attribute_id> empty_args {kInvalidAttributeID};
-    hash_table->upsertValueAccessorCompositeKeyFast(
-        empty_args, accessor, key_ids, true /* check_for_null_keys */);
-  }
-}
-
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b46bc73c/expressions/aggregation/AggregationConcreteHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.hpp b/expressions/aggregation/AggregationConcreteHandle.hpp
index 398a032..93e9bd0 100644
--- a/expressions/aggregation/AggregationConcreteHandle.hpp
+++ b/expressions/aggregation/AggregationConcreteHandle.hpp
@@ -26,8 +26,7 @@
 
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
-#include "storage/FastHashTable.hpp"
-#include "storage/HashTable.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
 #include "storage/HashTableBase.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/TypedValue.hpp"
@@ -51,7 +50,7 @@ class ValueAccessor;
  *        merging two group by hash tables.
  **/
 template <typename HandleT>
-class HashTableStateUpserterFast {
+class HashTableStateUpserter {
  public:
   /**
    * @brief Constructor.
@@ -61,7 +60,7 @@ class HashTableStateUpserterFast {
    *        table. The corresponding state (for the same key) in the destination
    *        hash table will be upserted.
    **/
-  HashTableStateUpserterFast(const HandleT &handle,
+  HashTableStateUpserter(const HandleT &handle,
                              const std::uint8_t *source_state)
       : handle_(handle), source_state_(source_state) {}
 
@@ -72,14 +71,14 @@ class HashTableStateUpserterFast {
    *        table that is being upserted.
    **/
   void operator()(std::uint8_t *destination_state) {
-    handle_.mergeStatesFast(source_state_, destination_state);
+    handle_.mergeStates(source_state_, destination_state);
   }
 
  private:
   const HandleT &handle_;
   const std::uint8_t *source_state_;
 
-  DISALLOW_COPY_AND_ASSIGN(HashTableStateUpserterFast);
+  DISALLOW_COPY_AND_ASSIGN(HashTableStateUpserter);
 };
 
 /**
@@ -102,50 +101,19 @@ class AggregationConcreteHandle : public AggregationHandle {
                << "takes at least one argument.";
   }
 
-  /**
-   * @brief Implementaion for AggregationHandle::createDistinctifyHashTable()
-   *        that creates a new HashTable for the distinctify step for
-   *        DISTINCT aggregation.
-   */
-  AggregationStateHashTableBase* createDistinctifyHashTable(
-      const HashTableImplType hash_table_impl,
-      const std::vector<const Type *> &key_types,
-      const std::size_t estimated_num_distinct_keys,
-      StorageManager *storage_manager) const override;
-
-  /**
-   * @brief Implementaion for
-   * AggregationHandle::insertValueAccessorIntoDistinctifyHashTable()
-   * that inserts the GROUP BY expressions and aggregation arguments together
-   * as keys into the distinctify hash table.
-   */
-  void insertValueAccessorIntoDistinctifyHashTable(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &key_ids,
-      AggregationStateHashTableBase *distinctify_hash_table) const override;
-
  protected:
-  AggregationConcreteHandle() {}
-
-  template <typename HandleT, typename StateT>
-  StateT* aggregateOnDistinctifyHashTableForSingleUnaryHelperFast(
-      const AggregationStateHashTableBase &distinctify_hash_table) const;
+  AggregationConcreteHandle(const AggregationID agg_id)
+      : AggregationHandle(agg_id) {}
 
   template <typename HandleT, typename HashTableT>
-  void aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast(
-      const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *hash_table,
-      std::size_t index) const;
-
-  template <typename HandleT, typename HashTableT>
-  ColumnVector* finalizeHashTableHelperFast(
+  ColumnVector* finalizeHashTableHelper(
       const Type &result_type,
       const AggregationStateHashTableBase &hash_table,
       std::vector<std::vector<TypedValue>> *group_by_keys,
       int index) const;
 
   template <typename HandleT, typename HashTableT>
-  inline TypedValue finalizeGroupInHashTableFast(
+  inline TypedValue finalizeGroupInHashTable(
       const AggregationStateHashTableBase &hash_table,
       const std::vector<TypedValue> &group_key,
       int index) const {
@@ -153,15 +121,10 @@ class AggregationConcreteHandle : public AggregationHandle {
         static_cast<const HashTableT &>(hash_table).getSingleCompositeKey(group_key, index);
     DCHECK(group_state != nullptr)
         << "Could not find entry for specified group_key in HashTable";
-    return static_cast<const HandleT *>(this)->finalizeHashTableEntryFast(
+    return static_cast<const HandleT *>(this)->finalizeHashTableEntry(
         group_state);
   }
 
-  template <typename HandleT, typename HashTableT>
-  void mergeGroupByHashTablesHelperFast(
-      const AggregationStateHashTableBase &source_hash_table,
-      AggregationStateHashTableBase *destination_hash_table) const;
-
  private:
   DISALLOW_COPY_AND_ASSIGN(AggregationConcreteHandle);
 };
@@ -195,7 +158,7 @@ class HashTableAggregateFinalizer {
                          const unsigned char *byte_ptr) {
     group_by_keys_->emplace_back(group_by_key);
     output_column_vector_->appendTypedValue(
-        handle_.finalizeHashTableEntryFast(byte_ptr));
+        handle_.finalizeHashTableEntry(byte_ptr));
   }
 
  private:
@@ -209,70 +172,8 @@ class HashTableAggregateFinalizer {
 // ----------------------------------------------------------------------------
 // Implementations of templated methods follow:
 
-template <typename HandleT, typename StateT>
-StateT* AggregationConcreteHandle::
-    aggregateOnDistinctifyHashTableForSingleUnaryHelperFast(
-        const AggregationStateHashTableBase &distinctify_hash_table) const {
-  const HandleT &handle = static_cast<const HandleT &>(*this);
-  StateT *state = static_cast<StateT *>(createInitialState());
-
-  // A lambda function which will be called on each key from the distinctify
-  // hash table.
-  const auto aggregate_functor = [&handle, &state](
-      const TypedValue &key, const std::uint8_t &dumb_placeholder) {
-    // For each (unary) key in the distinctify hash table, aggregate the key
-    // into "state".
-    handle.iterateUnaryInl(state, key);
-  };
-
-  const AggregationStateFastHashTable &hash_table =
-      static_cast<const AggregationStateFastHashTable &>(
-          distinctify_hash_table);
-  // Invoke the lambda function "aggregate_functor" on each key from the
-  // distinctify hash table.
-  hash_table.forEach(&aggregate_functor);
-
-  return state;
-}
-
-template <typename HandleT, typename HashTableT>
-void AggregationConcreteHandle::
-    aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast(
-        const AggregationStateHashTableBase &distinctify_hash_table,
-        AggregationStateHashTableBase *aggregation_hash_table,
-        std::size_t index) const {
-  const HandleT &handle = static_cast<const HandleT &>(*this);
-  HashTableT *target_hash_table =
-      static_cast<HashTableT *>(aggregation_hash_table);
-
-  // A lambda function which will be called on each key-value pair from the
-  // distinctify hash table.
-  const auto aggregate_functor = [&handle, &target_hash_table, &index](
-      std::vector<TypedValue> &key, const bool &dumb_placeholder) {
-    // For each (composite) key vector in the distinctify hash table with size N.
-    // The first N-1 entries are GROUP BY columns and the last entry is the
-    // argument to be aggregated on.
-    const TypedValue argument(std::move(key.back()));
-    key.pop_back();
-
-    // An upserter as lambda function for aggregating the argument into its
-    // GROUP BY group's entry inside aggregation_hash_table.
-    const auto upserter = [&handle, &argument](std::uint8_t *state) {
-      handle.iterateUnaryInlFast(argument, state);
-    };
-
-    target_hash_table->upsertCompositeKeyFast(key, nullptr, &upserter, index);
-  };
-
-  const HashTableT &source_hash_table =
-      static_cast<const HashTableT &>(distinctify_hash_table);
-  // Invoke the lambda function "aggregate_functor" on each composite key vector
-  // from the distinctify hash table.
-  source_hash_table.forEachCompositeKeyFast(&aggregate_functor);
-}
-
 template <typename HandleT, typename HashTableT>
-ColumnVector* AggregationConcreteHandle::finalizeHashTableHelperFast(
+ColumnVector* AggregationConcreteHandle::finalizeHashTableHelper(
     const Type &result_type,
     const AggregationStateHashTableBase &hash_table,
     std::vector<std::vector<TypedValue>> *group_by_keys,
@@ -287,14 +188,14 @@ ColumnVector* AggregationConcreteHandle::finalizeHashTableHelperFast(
           new NativeColumnVector(result_type, hash_table_concrete.numEntries());
       HashTableAggregateFinalizer<HandleT, NativeColumnVector> finalizer(
           handle, group_by_keys, result);
-      hash_table_concrete.forEachCompositeKeyFast(&finalizer, index);
+      hash_table_concrete.forEach(&finalizer, index);
       return result;
     } else {
       IndirectColumnVector *result = new IndirectColumnVector(
           result_type, hash_table_concrete.numEntries());
       HashTableAggregateFinalizer<HandleT, IndirectColumnVector> finalizer(
           handle, group_by_keys, result);
-      hash_table_concrete.forEachCompositeKeyFast(&finalizer, index);
+      hash_table_concrete.forEach(&finalizer, index);
       return result;
     }
   } else {
@@ -303,7 +204,7 @@ ColumnVector* AggregationConcreteHandle::finalizeHashTableHelperFast(
           new NativeColumnVector(result_type, group_by_keys->size());
       for (const std::vector<TypedValue> &group_by_key : *group_by_keys) {
         result->appendTypedValue(
-            finalizeGroupInHashTableFast<HandleT, HashTableT>(
+            finalizeGroupInHashTable<HandleT, HashTableT>(
                 hash_table, group_by_key, index));
       }
       return result;
@@ -312,7 +213,7 @@ ColumnVector* AggregationConcreteHandle::finalizeHashTableHelperFast(
           result_type, hash_table_concrete.numEntries());
       for (const std::vector<TypedValue> &group_by_key : *group_by_keys) {
         result->appendTypedValue(
-            finalizeGroupInHashTableFast<HandleT, HashTableT>(
+            finalizeGroupInHashTable<HandleT, HashTableT>(
                 hash_table, group_by_key, index));
       }
       return result;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b46bc73c/expressions/aggregation/AggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandle.hpp b/expressions/aggregation/AggregationHandle.hpp
index 4b51179..8e2aea6 100644
--- a/expressions/aggregation/AggregationHandle.hpp
+++ b/expressions/aggregation/AggregationHandle.hpp
@@ -25,6 +25,7 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
 #include "storage/HashTableBase.hpp"
 #include "types/TypedValue.hpp"
 #include "utility/Macros.hpp"
@@ -32,6 +33,7 @@
 namespace quickstep {
 
 class ColumnVector;
+class ColumnVectorsValueAccessor;
 class StorageManager;
 class Type;
 class ValueAccessor;
@@ -108,6 +110,14 @@ class AggregationHandle {
    **/
   virtual ~AggregationHandle() {}
 
+  AggregationID getAggregationID() const {
+    return agg_id_;
+  }
+
+  virtual std::vector<const Type *> getArgumentTypes() const = 0;
+
+  virtual const Type* getResultType() const = 0;
+
   /**
    * @brief Create an initial "blank" state for this aggregation.
    *
@@ -116,29 +126,6 @@ class AggregationHandle {
   virtual AggregationState* createInitialState() const = 0;
 
   /**
-   * @brief Create a new HashTable for aggregation with GROUP BY.
-   *
-   * @param hash_table_impl The choice of which concrete HashTable
-   *        implementation to use.
-   * @param group_by_types The types of the GROUP BY columns/expressions. These
-   *        correspond to the (composite) key type for the HashTable.
-   * @param estimated_num_groups The estimated number of distinct groups for
-   *        the GROUP BY aggregation. This is used to size the initial
-   *        HashTable. This is an estimate only, and the HashTable will be
-   *        resized if it becomes over-full.
-   * @param storage_manager The StorageManager to use to create the HashTable.
-   *        A StorageBlob will be allocated to serve as the HashTable's
-   *        in-memory storage.
-   * @return A new HashTable instance with the appropriate state type for this
-   *         aggregate.
-   **/
-  virtual AggregationStateHashTableBase* createGroupByHashTable(
-      const HashTableImplType hash_table_impl,
-      const std::vector<const Type *> &group_by_types,
-      const std::size_t estimated_num_groups,
-      StorageManager *storage_manager) const = 0;
-
-  /**
    * @brief Accumulate over tuples for a nullary aggregate function (one that
    *        has zero arguments, i.e. COUNT(*)).
    *
@@ -153,63 +140,16 @@ class AggregationHandle {
       const std::size_t num_tuples) const = 0;
 
   /**
-   * @brief Accumulate (iterate over) all values in one or more ColumnVectors
-   *        and return a new AggregationState which can be merged with other
-   *        states or finalized.
+   * @brief TODO
    *
-   * @param column_vectors One or more ColumnVectors that the aggregate will be
-   *        applied to. These correspond to the aggregate function's arguments,
-   *        in order.
    * @return A new AggregationState which contains the accumulated results from
    *         applying the aggregate to column_vectors. Caller is responsible
    *         for deleting the returned AggregationState.
    **/
-  virtual AggregationState* accumulateColumnVectors(
-      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const = 0;
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  /**
-   * @brief Accumulate (iterate over) all values in columns accessible through
-   *        a ValueAccessor and return a new AggregationState which can be
-   *        merged with other states or finalized.
-   *
-   * @param accessor A ValueAccessor that the columns to be aggregated can be
-   *        accessed through.
-   * @param accessor_ids The attribute_ids that correspond to the columns in
-   *        accessor to aggeregate. These correspond to the aggregate
-   *        function's arguments, in order.
-   * @return A new AggregationState which contains the accumulated results from
-   *         applying the aggregate to the specified columns in accessor.
-   *         Caller is responsible for deleting the returned AggregationState.
-   **/
-  virtual AggregationState* accumulateValueAccessor(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &accessor_ids) const = 0;
-#endif
-
-  /**
-   * @brief Perform an aggregation with GROUP BY over all the tuples accessible
-   *        through a ValueAccessor, upserting states in a HashTable.
-   *
-   * @note Implementations of this method are threadsafe with respect to
-   *       hash_table, and can be called concurrently from multiple threads
-   *       with the same HashTable object.
-   *
-   * @param accessor The ValueAccessor that will be iterated over to read
-   *        tuples.
-   * @param argument_ids The attribute_ids of the arguments to this aggregate
-   *        in accessor, in order.
-   * @param group_by_key_ids The attribute_ids of the group-by
-   *        columns/expressions in accessor.
-   * @param hash_table The HashTable to upsert AggregationStates in. This
-   *        should have been created by calling createGroupByHashTable() on
-   *        this same AggregationHandle.
-   **/
-  virtual void aggregateValueAccessorIntoHashTable(
+  virtual AggregationState* accumulate(
       ValueAccessor *accessor,
-      const std::vector<attribute_id> &argument_ids,
-      const std::vector<attribute_id> &group_by_key_ids,
-      AggregationStateHashTableBase *hash_table) const = 0;
+      ColumnVectorsValueAccessor *aux_accessor,
+      const std::vector<attribute_id> &argument_ids) const = 0;
 
   /**
    * @brief Merge two AggregationStates, updating one in-place. This computes a
@@ -269,99 +209,12 @@ class AggregationHandle {
       int index) const = 0;
 
   /**
-   * @brief Create a new HashTable for the distinctify step for DISTINCT
-   * aggregation.
-   *
-   * Distinctify is the first step for DISTINCT aggregation. This step inserts
-   * the GROUP BY expression values and aggregation arguments together as keys
-   * into the distinctify hash table, so that arguments are distinctified within
-   * each GROUP BY group. Later, a second-round aggregation on the distinctify
-   * hash table will be performed to actually compute the aggregated result for
-   * each GROUP BY group.
-   *
-   * In the case of single aggregation where there is no GROUP BY expressions,
-   * we simply treat it as a special GROUP BY case that the GROUP BY expression
-   * vector is empty.
-   *
-   * @param hash_table_impl The choice of which concrete HashTable
-   *        implementation to use.
-   * @param key_types The types of the GROUP BY expressions together with the
-   *        types of the aggregation arguments.
-   * @param estimated_num_distinct_keys The estimated number of distinct keys
-   *        (i.e. GROUP BY expressions together with aggregation arguments) for
-   *        the distinctify step. This is used to size the initial HashTable.
-   *        This is an estimate only, and the HashTable will be resized if it
-   *        becomes over-full.
-   * @param storage_manager The StorageManager to use to create the HashTable.
-   *        A StorageBlob will be allocated to serve as the HashTable's
-   *        in-memory storage.
-   *
-   * @return A new HashTable instance with the appropriate state type for this
-   *         aggregate.
-   */
-  virtual AggregationStateHashTableBase* createDistinctifyHashTable(
-      const HashTableImplType hash_table_impl,
-      const std::vector<const Type *> &key_types,
-      const std::size_t estimated_num_distinct_keys,
-      StorageManager *storage_manager) const = 0;
-
-  /**
-   * @brief Inserts the GROUP BY expressions and aggregation arguments together
-   * as keys into the distinctify hash table.
-   *
-   * @param accessor The ValueAccessor that will be iterated over to read
-   *        tuples.
-   * @param key_ids The attribute_ids of the GROUP BY expressions in accessor
-   *        together with the attribute_ids of the arguments to this aggregate
-   *        in accessor, in order.
-   * @param distinctify_hash_table The HashTable to store the GROUP BY
-   *        expressions and the aggregation arguments together as hash table
-   *        keys and a bool constant \c true as hash table value (So the hash
-   *        table actually serves as a hash set). This should have been created
-   *        by calling createDistinctifyHashTable();
-   */
-  virtual void insertValueAccessorIntoDistinctifyHashTable(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &key_ids,
-      AggregationStateHashTableBase *distinctify_hash_table) const = 0;
-
-  /**
-   * @brief Perform single (i.e. without GROUP BY) aggregation on the keys from
-   * the distinctify hash table to actually compute the aggregated results.
-   *
-   * @param distinctify_hash_table Hash table which stores the distinctified
-   *        aggregation arguments as hash table keys. This should have been
-   *        created by calling createDistinctifyHashTable();
-   * @return A new AggregationState which contains the aggregated results from
-   *         applying the aggregate to the distinctify hash table.
-   *         Caller is responsible for deleting the returned AggregationState.
-   */
-  virtual AggregationState* aggregateOnDistinctifyHashTableForSingle(
-      const AggregationStateHashTableBase &distinctify_hash_table) const = 0;
-
-  /**
-   * @brief Perform GROUP BY aggregation on the keys from the distinctify hash
-   * table and upserts states into the aggregation hash table.
-   *
-   * @param distinctify_hash_table Hash table which stores the GROUP BY
-   *        expression values and aggregation arguments together as hash table
-   *        keys.
-   * @param aggregation_hash_table The HashTable to upsert AggregationStates in.
-   *        This should have been created by calling createGroupByHashTable() on
-   *        this same AggregationHandle.
-   * @param index The index of the distinctify hash table for which we perform
-   *        the DISTINCT aggregation.
-   */
-  virtual void aggregateOnDistinctifyHashTableForGroupBy(
-      const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table,
-      std::size_t index) const = 0;
-
-  /**
    * @brief Get the number of bytes needed to store the aggregation handle's
    *        state.
    **/
-  virtual std::size_t getPayloadSize() const { return 1; }
+  virtual std::size_t getPayloadSize() const {
+    return 1u;
+  }
 
   /**
    * @brief Update the aggregation state for nullary aggregation function e.g.
@@ -394,8 +247,8 @@ class AggregationHandle {
    * @param src A pointer to the source aggregation state.
    * @param dst A pointer to the destination aggregation state.
    **/
-  virtual void mergeStatesFast(const std::uint8_t *src,
-                               std::uint8_t *dst) const {}
+  virtual void mergeStates(const std::uint8_t *src,
+                           std::uint8_t *dst) const {}
 
   /**
    * @brief Initialize the payload (in the aggregation hash table) for the given
@@ -413,20 +266,11 @@ class AggregationHandle {
    **/
   virtual void destroyPayload(std::uint8_t *byte_ptr) const {}
 
-  /**
-   * @brief Inform the aggregation handle to block (prohibit) updates on the
-   *        aggregation state.
-   **/
-  virtual void blockUpdate() {}
-
-  /**
-   * @brief Inform the aggregation handle to allow updates on the
-   *        aggregation state.
-   **/
-  virtual void allowUpdate() {}
-
  protected:
-  AggregationHandle() {}
+  AggregationHandle(const AggregationID agg_id)
+      : agg_id_(agg_id) {}
+
+  const AggregationID agg_id_;
 
  private:
   DISALLOW_COPY_AND_ASSIGN(AggregationHandle);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b46bc73c/expressions/aggregation/AggregationHandleAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.cpp b/expressions/aggregation/AggregationHandleAvg.cpp
index 2481092..d81c179 100644
--- a/expressions/aggregation/AggregationHandleAvg.cpp
+++ b/expressions/aggregation/AggregationHandleAvg.cpp
@@ -24,8 +24,8 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
-#include "storage/HashTable.hpp"
-#include "storage/HashTableFactory.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
+#include "storage/PackedPayloadAggregationStateHashTable.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
 #include "types/TypeFactory.hpp"
@@ -42,7 +42,8 @@ namespace quickstep {
 class StorageManager;
 
 AggregationHandleAvg::AggregationHandleAvg(const Type &type)
-    : argument_type_(type), block_update_(false) {
+    : AggregationConcreteHandle(AggregationID::kAvg),
+      argument_type_(type) {
   // We sum Int as Long and Float as Double so that we have more headroom when
   // adding many values.
   TypeID type_precision_id;
@@ -87,52 +88,28 @@ AggregationHandleAvg::AggregationHandleAvg(const Type &type)
             ->getNullableVersion());
 }
 
-AggregationStateHashTableBase* AggregationHandleAvg::createGroupByHashTable(
-    const HashTableImplType hash_table_impl,
-    const std::vector<const Type *> &group_by_types,
-    const std::size_t estimated_num_groups,
-    StorageManager *storage_manager) const {
-  return AggregationStateHashTableFactory<AggregationStateAvg>::CreateResizable(
-      hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
-}
-
-AggregationState* AggregationHandleAvg::accumulateColumnVectors(
-    const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
-  DCHECK_EQ(1u, column_vectors.size())
-      << "Got wrong number of ColumnVectors for AVG: " << column_vectors.size();
+AggregationState* AggregationHandleAvg::accumulate(
+    ValueAccessor *accessor,
+    ColumnVectorsValueAccessor *aux_accessor,
+    const std::vector<attribute_id> &argument_ids) const {
+  DCHECK_EQ(1u, argument_ids.size())
+      << "Got wrong number of attributes for AVG: " << argument_ids.size();
 
-  AggregationStateAvg *state = new AggregationStateAvg(blank_state_);
-  std::size_t count = 0;
-  state->sum_ = fast_add_operator_->accumulateColumnVector(
-      state->sum_, *column_vectors.front(), &count);
-  state->count_ = count;
-  return state;
-}
+  const attribute_id argument_id = argument_ids.front();
+  DCHECK_NE(argument_id, kInvalidAttributeID);
 
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-AggregationState* AggregationHandleAvg::accumulateValueAccessor(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &accessor_ids) const {
-  DCHECK_EQ(1u, accessor_ids.size())
-      << "Got wrong number of attributes for AVG: " << accessor_ids.size();
+  ValueAccessor *target_accessor =
+      argument_id >= 0 ? accessor : aux_accessor;
+  const attribute_id target_argument_id =
+      argument_id >= 0 ? argument_id : -(argument_id+2);
 
   AggregationStateAvg *state = new AggregationStateAvg(blank_state_);
   std::size_t count = 0;
   state->sum_ = fast_add_operator_->accumulateValueAccessor(
-      state->sum_, accessor, accessor_ids.front(), &count);
+      state->sum_, target_accessor, target_argument_id, &count);
   state->count_ = count;
   return state;
 }
-#endif
-
-void AggregationHandleAvg::aggregateValueAccessorIntoHashTable(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &argument_ids,
-    const std::vector<attribute_id> &group_by_key_ids,
-    AggregationStateHashTableBase *hash_table) const {
-  DCHECK_EQ(1u, argument_ids.size())
-      << "Got wrong number of arguments for AVG: " << argument_ids.size();
-}
 
 void AggregationHandleAvg::mergeStates(const AggregationState &source,
                                        AggregationState *destination) const {
@@ -147,8 +124,8 @@ void AggregationHandleAvg::mergeStates(const AggregationState &source,
       avg_destination->sum_, avg_source.sum_);
 }
 
-void AggregationHandleAvg::mergeStatesFast(const std::uint8_t *source,
-                                           std::uint8_t *destination) const {
+void AggregationHandleAvg::mergeStates(const std::uint8_t *source,
+                                       std::uint8_t *destination) const {
   const TypedValue *src_sum_ptr =
       reinterpret_cast<const TypedValue *>(source + blank_state_.sum_offset_);
   const std::int64_t *src_count_ptr = reinterpret_cast<const std::int64_t *>(
@@ -179,27 +156,10 @@ ColumnVector* AggregationHandleAvg::finalizeHashTable(
     const AggregationStateHashTableBase &hash_table,
     std::vector<std::vector<TypedValue>> *group_by_keys,
     int index) const {
-  return finalizeHashTableHelperFast<AggregationHandleAvg,
-                                     AggregationStateFastHashTable>(
-      *result_type_, hash_table, group_by_keys, index);
-}
-
-AggregationState*
-AggregationHandleAvg::aggregateOnDistinctifyHashTableForSingle(
-    const AggregationStateHashTableBase &distinctify_hash_table) const {
-  return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
-      AggregationHandleAvg,
-      AggregationStateAvg>(distinctify_hash_table);
-}
-
-void AggregationHandleAvg::aggregateOnDistinctifyHashTableForGroupBy(
-    const AggregationStateHashTableBase &distinctify_hash_table,
-    AggregationStateHashTableBase *aggregation_hash_table,
-    std::size_t index) const {
-  aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
+  return finalizeHashTableHelper<
       AggregationHandleAvg,
-      AggregationStateFastHashTable>(
-      distinctify_hash_table, aggregation_hash_table, index);
+      PackedPayloadSeparateChainingAggregationStateHashTable>(
+          *result_type_, hash_table, group_by_keys, index);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b46bc73c/expressions/aggregation/AggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.hpp b/expressions/aggregation/AggregationHandleAvg.hpp
index 47132c6..aa5f427 100644
--- a/expressions/aggregation/AggregationHandleAvg.hpp
+++ b/expressions/aggregation/AggregationHandleAvg.hpp
@@ -28,7 +28,6 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/aggregation/AggregationConcreteHandle.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
-#include "storage/FastHashTable.hpp"
 #include "storage/HashTableBase.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
@@ -106,16 +105,18 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
  public:
   ~AggregationHandleAvg() override {}
 
+  std::vector<const Type *> getArgumentTypes() const override {
+    return {&argument_type_};
+  }
+
+  const Type* getResultType() const override {
+    return result_type_;
+  }
+
   AggregationState* createInitialState() const override {
     return new AggregationStateAvg(blank_state_);
   }
 
-  AggregationStateHashTableBase* createGroupByHashTable(
-      const HashTableImplType hash_table_impl,
-      const std::vector<const Type *> &group_by_types,
-      const std::size_t estimated_num_groups,
-      StorageManager *storage_manager) const override;
-
   /**
    * @brief Iterate method with average aggregation state.
    **/
@@ -129,28 +130,19 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
     ++state->count_;
   }
 
-  inline void iterateUnaryInlFast(const TypedValue &value,
-                                  std::uint8_t *byte_ptr) const {
-    DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
-    if (value.isNull()) return;
-    TypedValue *sum_ptr =
-        reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset_);
-    std::int64_t *count_ptr =
-        reinterpret_cast<std::int64_t *>(byte_ptr + blank_state_.count_offset_);
-    *sum_ptr = fast_add_operator_->applyToTypedValues(*sum_ptr, value);
-    ++(*count_ptr);
-  }
+  AggregationState* accumulate(
+      ValueAccessor *accessor,
+      ColumnVectorsValueAccessor *aux_accessor,
+      const std::vector<attribute_id> &argument_ids) const override;
 
-  inline void updateStateUnary(const TypedValue &argument,
-                               std::uint8_t *byte_ptr) const override {
-    if (!block_update_) {
-      iterateUnaryInlFast(argument, byte_ptr);
-    }
-  }
+  void mergeStates(const AggregationState &source,
+                   AggregationState *destination) const override;
 
-  void blockUpdate() override { block_update_ = true; }
+  TypedValue finalize(const AggregationState &state) const override;
 
-  void allowUpdate() override { block_update_ = false; }
+  std::size_t getPayloadSize() const override {
+    return blank_state_.getPayloadSize();
+  }
 
   void initPayload(std::uint8_t *byte_ptr) const override {
     TypedValue *sum_ptr =
@@ -169,43 +161,22 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
     }
   }
 
-  AggregationState* accumulateColumnVectors(
-      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
-      const override;
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  AggregationState* accumulateValueAccessor(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &accessor_id) const override;
-#endif
-
-  void aggregateValueAccessorIntoHashTable(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &argument_ids,
-      const std::vector<attribute_id> &group_by_key_ids,
-      AggregationStateHashTableBase *hash_table) const override;
-
-  void mergeStates(const AggregationState &source,
-                   AggregationState *destination) const override;
-
-  void mergeStatesFast(const std::uint8_t *source,
-                       std::uint8_t *destination) const override;
+  inline void updateStateUnary(const TypedValue &argument,
+                               std::uint8_t *byte_ptr) const override {
+    DCHECK(argument.isPlausibleInstanceOf(argument_type_.getSignature()));
+    if (argument.isNull()) return;
+    TypedValue *sum_ptr =
+        reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset_);
+    std::int64_t *count_ptr =
+        reinterpret_cast<std::int64_t *>(byte_ptr + blank_state_.count_offset_);
+    *sum_ptr = fast_add_operator_->applyToTypedValues(*sum_ptr, argument);
+    ++(*count_ptr);
+  }
 
-  TypedValue finalize(const AggregationState &state) const override;
+  void mergeStates(const std::uint8_t *source,
+                   std::uint8_t *destination) const override;
 
   inline TypedValue finalizeHashTableEntry(
-      const AggregationState &state) const {
-    const AggregationStateAvg &agg_state =
-        static_cast<const AggregationStateAvg &>(state);
-    // TODO(chasseur): Could improve performance further if we made a special
-    // version of finalizeHashTable() that collects all the sums into one
-    // ColumnVector and all the counts into another and then applies
-    // '*divide_operator_' to them in bulk.
-    return divide_operator_->applyToTypedValues(
-        agg_state.sum_, TypedValue(static_cast<double>(agg_state.count_)));
-  }
-
-  inline TypedValue finalizeHashTableEntryFast(
       const std::uint8_t *byte_ptr) const {
     std::uint8_t *value_ptr = const_cast<std::uint8_t *>(byte_ptr);
     TypedValue *sum_ptr =
@@ -221,29 +192,6 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
       std::vector<std::vector<TypedValue>> *group_by_keys,
       int index) const override;
 
-  /**
-   * @brief Implementation of
-   *        AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
-   *        for AVG aggregation.
-   */
-  AggregationState* aggregateOnDistinctifyHashTableForSingle(
-      const AggregationStateHashTableBase &distinctify_hash_table)
-      const override;
-
-  /**
-   * @brief Implementation of
-   *        AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
-   *        for AVG aggregation.
-   */
-  void aggregateOnDistinctifyHashTableForGroupBy(
-      const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table,
-      std::size_t index) const override;
-
-  std::size_t getPayloadSize() const override {
-    return blank_state_.getPayloadSize();
-  }
-
  private:
   friend class AggregateFunctionAvg;
 
@@ -261,8 +209,6 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
   std::unique_ptr<UncheckedBinaryOperator> merge_add_operator_;
   std::unique_ptr<UncheckedBinaryOperator> divide_operator_;
 
-  bool block_update_;
-
   DISALLOW_COPY_AND_ASSIGN(AggregationHandleAvg);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b46bc73c/expressions/aggregation/AggregationHandleCount.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.cpp b/expressions/aggregation/AggregationHandleCount.cpp
index 034c942..a5c9fd8 100644
--- a/expressions/aggregation/AggregationHandleCount.cpp
+++ b/expressions/aggregation/AggregationHandleCount.cpp
@@ -25,14 +25,9 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
-#include "storage/HashTable.hpp"
-#include "storage/HashTableFactory.hpp"
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
+#include "storage/PackedPayloadAggregationStateHashTable.hpp"
 #include "storage/ValueAccessor.hpp"
 #include "storage/ValueAccessorUtil.hpp"
-#endif
-
 #include "types/TypeFactory.hpp"
 #include "types/TypeID.hpp"
 #include "types/TypedValue.hpp"
@@ -48,73 +43,32 @@ class Type;
 class ValueAccessor;
 
 template <bool count_star, bool nullable_type>
-AggregationStateHashTableBase*
-AggregationHandleCount<count_star, nullable_type>::createGroupByHashTable(
-    const HashTableImplType hash_table_impl,
-    const std::vector<const Type *> &group_by_types,
-    const std::size_t estimated_num_groups,
-    StorageManager *storage_manager) const {
-  return AggregationStateHashTableFactory<
-      AggregationStateCount>::CreateResizable(hash_table_impl,
-                                              group_by_types,
-                                              estimated_num_groups,
-                                              storage_manager);
-}
-
-template <bool count_star, bool nullable_type>
-AggregationState*
-AggregationHandleCount<count_star, nullable_type>::accumulateColumnVectors(
-    const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
+AggregationState* AggregationHandleCount<count_star, nullable_type>::accumulate(
+    ValueAccessor *accessor,
+    ColumnVectorsValueAccessor *aux_accessor,
+    const std::vector<attribute_id> &argument_ids) const {
   DCHECK(!count_star)
       << "Called non-nullary accumulation method on an AggregationHandleCount "
       << "set up for nullary COUNT(*)";
 
-  DCHECK_EQ(1u, column_vectors.size())
-      << "Got wrong number of ColumnVectors for COUNT: "
-      << column_vectors.size();
+  DCHECK_EQ(1u, argument_ids.size())
+      << "Got wrong number of attributes for COUNT: " << argument_ids.size();
 
-  std::size_t count = 0;
-  InvokeOnColumnVector(
-      *column_vectors.front(),
-      [&](const auto &column_vector) -> void {  // NOLINT(build/c++11)
-        if (nullable_type) {
-          // TODO(shoban): Iterating over the ColumnVector is a rather slow way
-          // to do this. We should look at extending the ColumnVector interface
-          // to do a quick count of the non-null values (i.e. the length minus
-          // the population count of the null bitmap). We should do something
-          // similar for ValueAccessor too.
-          for (std::size_t pos = 0; pos < column_vector.size(); ++pos) {
-            count += !column_vector.getTypedValue(pos).isNull();
-          }
-        } else {
-          count = column_vector.size();
-        }
-      });
+  const attribute_id argument_id = argument_ids.front();
+  DCHECK_NE(argument_id, kInvalidAttributeID);
 
-  return new AggregationStateCount(count);
-}
+  ValueAccessor *target_accessor =
+      argument_id >= 0 ? accessor : aux_accessor;
+  const attribute_id target_argument_id =
+      argument_id >= 0 ? argument_id : -(argument_id+2);
 
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-template <bool count_star, bool nullable_type>
-AggregationState*
-AggregationHandleCount<count_star, nullable_type>::accumulateValueAccessor(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &accessor_ids) const {
-  DCHECK(!count_star)
-      << "Called non-nullary accumulation method on an AggregationHandleCount "
-      << "set up for nullary COUNT(*)";
-
-  DCHECK_EQ(1u, accessor_ids.size())
-      << "Got wrong number of attributes for COUNT: " << accessor_ids.size();
-
-  const attribute_id accessor_id = accessor_ids.front();
   std::size_t count = 0;
   InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
-      accessor,
-      [&accessor_id, &count](auto *accessor) -> void {  // NOLINT(build/c++11)
+      target_accessor,
+      [&target_argument_id, &count](auto *accessor) -> void {  // NOLINT(build/c++11)
         if (nullable_type) {
           while (accessor->next()) {
-            count += !accessor->getTypedValue(accessor_id).isNull();
+            count += !accessor->getTypedValue(target_argument_id).isNull();
           }
         } else {
           count = accessor->getNumTuples();
@@ -123,24 +77,6 @@ AggregationHandleCount<count_star, nullable_type>::accumulateValueAccessor(
 
   return new AggregationStateCount(count);
 }
-#endif
-
-template <bool count_star, bool nullable_type>
-void AggregationHandleCount<count_star, nullable_type>::
-    aggregateValueAccessorIntoHashTable(
-        ValueAccessor *accessor,
-        const std::vector<attribute_id> &argument_ids,
-        const std::vector<attribute_id> &group_by_key_ids,
-        AggregationStateHashTableBase *hash_table) const {
-  if (count_star) {
-    DCHECK_EQ(0u, argument_ids.size())
-        << "Got wrong number of arguments for COUNT(*): "
-        << argument_ids.size();
-  } else {
-    DCHECK_EQ(1u, argument_ids.size())
-        << "Got wrong number of arguments for COUNT: " << argument_ids.size();
-  }
-}
 
 template <bool count_star, bool nullable_type>
 void AggregationHandleCount<count_star, nullable_type>::mergeStates(
@@ -156,7 +92,7 @@ void AggregationHandleCount<count_star, nullable_type>::mergeStates(
 }
 
 template <bool count_star, bool nullable_type>
-void AggregationHandleCount<count_star, nullable_type>::mergeStatesFast(
+void AggregationHandleCount<count_star, nullable_type>::mergeStates(
     const std::uint8_t *source, std::uint8_t *destination) const {
   const std::int64_t *src_count_ptr =
       reinterpret_cast<const std::int64_t *>(source);
@@ -170,33 +106,10 @@ AggregationHandleCount<count_star, nullable_type>::finalizeHashTable(
     const AggregationStateHashTableBase &hash_table,
     std::vector<std::vector<TypedValue>> *group_by_keys,
     int index) const {
-  return finalizeHashTableHelperFast<
-      AggregationHandleCount<count_star, nullable_type>,
-      AggregationStateFastHashTable>(
-      TypeFactory::GetType(kLong), hash_table, group_by_keys, index);
-}
-
-template <bool count_star, bool nullable_type>
-AggregationState* AggregationHandleCount<count_star, nullable_type>::
-    aggregateOnDistinctifyHashTableForSingle(
-        const AggregationStateHashTableBase &distinctify_hash_table) const {
-  DCHECK_EQ(count_star, false);
-  return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
-      AggregationHandleCount<count_star, nullable_type>,
-      AggregationStateCount>(distinctify_hash_table);
-}
-
-template <bool count_star, bool nullable_type>
-void AggregationHandleCount<count_star, nullable_type>::
-    aggregateOnDistinctifyHashTableForGroupBy(
-        const AggregationStateHashTableBase &distinctify_hash_table,
-        AggregationStateHashTableBase *aggregation_hash_table,
-        std::size_t index) const {
-  DCHECK_EQ(count_star, false);
-  aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
+  return finalizeHashTableHelper<
       AggregationHandleCount<count_star, nullable_type>,
-      AggregationStateFastHashTable>(
-      distinctify_hash_table, aggregation_hash_table, index);
+      PackedPayloadSeparateChainingAggregationStateHashTable>(
+          TypeFactory::GetType(kLong), hash_table, group_by_keys, index);
 }
 
 // Explicitly instantiate and compile in the different versions of

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b46bc73c/expressions/aggregation/AggregationHandleCount.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.hpp b/expressions/aggregation/AggregationHandleCount.hpp
index 6aab0cd..bf9450f 100644
--- a/expressions/aggregation/AggregationHandleCount.hpp
+++ b/expressions/aggregation/AggregationHandleCount.hpp
@@ -29,8 +29,9 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/aggregation/AggregationConcreteHandle.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
-#include "storage/FastHashTable.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
 #include "storage/HashTableBase.hpp"
+#include "types/LongType.hpp"
 #include "types/TypedValue.hpp"
 #include "utility/Macros.hpp"
 
@@ -98,25 +99,26 @@ class AggregationHandleCount : public AggregationConcreteHandle {
  public:
   ~AggregationHandleCount() override {}
 
+  std::vector<const Type *> getArgumentTypes() const override {
+    if (argument_type_ == nullptr) {
+      return {};
+    } else {
+      return {argument_type_};
+    }
+  }
+
+  const Type* getResultType() const override {
+    return &LongType::InstanceNonNullable();
+  }
+
   AggregationState* createInitialState() const override {
     return new AggregationStateCount();
   }
 
-  AggregationStateHashTableBase* createGroupByHashTable(
-      const HashTableImplType hash_table_impl,
-      const std::vector<const Type *> &group_by_types,
-      const std::size_t estimated_num_groups,
-      StorageManager *storage_manager) const override;
-
   inline void iterateNullaryInl(AggregationStateCount *state) const {
     state->count_.fetch_add(1, std::memory_order_relaxed);
   }
 
-  inline void iterateNullaryInlFast(std::uint8_t *byte_ptr) const {
-    std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr);
-    (*count_ptr)++;
-  }
-
   /**
    * @brief Iterate with count aggregation state.
    */
@@ -127,81 +129,50 @@ class AggregationHandleCount : public AggregationConcreteHandle {
     }
   }
 
-  inline void iterateUnaryInlFast(const TypedValue &value,
-                                  std::uint8_t *byte_ptr) const {
-    if ((!nullable_type) || (!value.isNull())) {
-      std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr);
-      (*count_ptr)++;
-    }
-  }
-
-  inline void updateStateUnary(const TypedValue &argument,
-                               std::uint8_t *byte_ptr) const override {
-    if (!block_update_) {
-      iterateUnaryInlFast(argument, byte_ptr);
-    }
-  }
-
-  inline void updateStateNullary(std::uint8_t *byte_ptr) const override {
-    if (!block_update_) {
-      iterateNullaryInlFast(byte_ptr);
-    }
-  }
-
-  void blockUpdate() override { block_update_ = true; }
-
-  void allowUpdate() override { block_update_ = false; }
-
-  void initPayload(std::uint8_t *byte_ptr) const override {
-    std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr);
-    *count_ptr = 0;
-  }
-
   AggregationState* accumulateNullary(
       const std::size_t num_tuples) const override {
     return new AggregationStateCount(num_tuples);
   }
 
-  AggregationState* accumulateColumnVectors(
-      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
-      const override;
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  AggregationState* accumulateValueAccessor(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &accessor_id) const override;
-#endif
-
-  void aggregateValueAccessorIntoHashTable(
+  AggregationState* accumulate(
       ValueAccessor *accessor,
-      const std::vector<attribute_id> &argument_ids,
-      const std::vector<attribute_id> &group_by_key_ids,
-      AggregationStateHashTableBase *hash_table) const override;
+      ColumnVectorsValueAccessor *aux_accessor,
+      const std::vector<attribute_id> &argument_ids) const override;
 
   void mergeStates(const AggregationState &source,
                    AggregationState *destination) const override;
 
-  void mergeStatesFast(const std::uint8_t *source,
-                       std::uint8_t *destination) const override;
-
   TypedValue finalize(const AggregationState &state) const override {
     return TypedValue(
         static_cast<const AggregationStateCount &>(state).count_.load(
             std::memory_order_relaxed));
   }
 
-  inline TypedValue finalizeHashTableEntry(
-      const AggregationState &state) const {
-    return TypedValue(
-        static_cast<const AggregationStateCount &>(state).count_.load(
-            std::memory_order_relaxed));
+  std::size_t getPayloadSize() const override {
+    return sizeof(std::int64_t);
+  }
+
+  void initPayload(std::uint8_t *byte_ptr) const override {
+    std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr);
+    *count_ptr = 0;
+  }
+
+  inline void updateStateNullary(std::uint8_t *byte_ptr) const override {
+    ++(*reinterpret_cast<std::int64_t *>(byte_ptr));
   }
 
-  inline TypedValue finalizeHashTableEntryFast(
-      const std::uint8_t *byte_ptr) const {
-    const std::int64_t *count_ptr =
-        reinterpret_cast<const std::int64_t *>(byte_ptr);
-    return TypedValue(*count_ptr);
+  inline void updateStateUnary(const TypedValue &argument,
+                               std::uint8_t *byte_ptr) const override {
+    if ((!nullable_type) || (!argument.isNull())) {
+      ++(*reinterpret_cast<std::int64_t *>(byte_ptr));
+    }
+  }
+
+  void mergeStates(const std::uint8_t *source,
+                   std::uint8_t *destination) const override;
+
+  inline TypedValue finalizeHashTableEntry(const std::uint8_t *byte_ptr) const {
+    return TypedValue(*reinterpret_cast<const std::int64_t *>(byte_ptr));
   }
 
   ColumnVector* finalizeHashTable(
@@ -209,36 +180,17 @@ class AggregationHandleCount : public AggregationConcreteHandle {
       std::vector<std::vector<TypedValue>> *group_by_keys,
       int index) const override;
 
-  /**
-   * @brief Implementation of
-   *        AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
-   *        for SUM aggregation.
-   */
-  AggregationState* aggregateOnDistinctifyHashTableForSingle(
-      const AggregationStateHashTableBase &distinctify_hash_table)
-      const override;
-
-  /**
-   * @brief Implementation of
-   *        AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
-   *        for SUM aggregation.
-   */
-  void aggregateOnDistinctifyHashTableForGroupBy(
-      const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table,
-      std::size_t index) const override;
-
-  std::size_t getPayloadSize() const override { return sizeof(std::int64_t); }
-
  private:
   friend class AggregateFunctionCount;
 
   /**
    * @brief Constructor.
    **/
-  AggregationHandleCount() : block_update_(false) {}
+  AggregationHandleCount(const Type *argument_type)
+      : AggregationConcreteHandle(AggregationID::kCount),
+        argument_type_(argument_type) {}
 
-  bool block_update_;
+  const Type *argument_type_;
 
   DISALLOW_COPY_AND_ASSIGN(AggregationHandleCount);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b46bc73c/expressions/aggregation/AggregationHandleDistinct.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleDistinct.cpp b/expressions/aggregation/AggregationHandleDistinct.cpp
index 0dc8b56..c6c47c7 100644
--- a/expressions/aggregation/AggregationHandleDistinct.cpp
+++ b/expressions/aggregation/AggregationHandleDistinct.cpp
@@ -22,10 +22,9 @@
 #include <cstddef>
 #include <memory>
 #include <vector>
-#include <utility>
 
 #include "catalog/CatalogTypedefs.hpp"
-#include "storage/HashTable.hpp"
+#include "storage/PackedPayloadAggregationStateHashTable.hpp"
 
 #include "types/TypedValue.hpp"
 
@@ -34,34 +33,6 @@
 namespace quickstep {
 
 class ColumnVector;
-class StorageManager;
-class Type;
-class ValueAccessor;
-
-AggregationStateHashTableBase* AggregationHandleDistinct::createGroupByHashTable(
-    const HashTableImplType hash_table_impl,
-    const std::vector<const Type*> &group_by_types,
-    const std::size_t estimated_num_groups,
-    StorageManager *storage_manager) const {
-  return createDistinctifyHashTable(
-      hash_table_impl,
-      group_by_types,
-      estimated_num_groups,
-      storage_manager);
-}
-
-void AggregationHandleDistinct::aggregateValueAccessorIntoHashTable(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &argument_ids,
-    const std::vector<attribute_id> &group_by_key_ids,
-    AggregationStateHashTableBase *hash_table) const {
-  DCHECK_EQ(argument_ids.size(), 0u);
-
-  insertValueAccessorIntoDistinctifyHashTable(
-      accessor,
-      group_by_key_ids,
-      hash_table);
-}
 
 ColumnVector* AggregationHandleDistinct::finalizeHashTable(
     const AggregationStateHashTableBase &hash_table,
@@ -73,7 +44,8 @@ ColumnVector* AggregationHandleDistinct::finalizeHashTable(
                                                const bool &dumb_placeholder) -> void {
     group_by_keys->emplace_back(std::move(group_by_key));
   };
-  static_cast<const AggregationStateFastHashTable&>(hash_table).forEachCompositeKeyFast(&keys_retriever);
+  static_cast<const PackedPayloadSeparateChainingAggregationStateHashTable &>(
+      hash_table).forEach(&keys_retriever);
 
   return nullptr;
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b46bc73c/expressions/aggregation/AggregationHandleDistinct.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleDistinct.hpp b/expressions/aggregation/AggregationHandleDistinct.hpp
index 838bfdd..0d8905b 100644
--- a/expressions/aggregation/AggregationHandleDistinct.hpp
+++ b/expressions/aggregation/AggregationHandleDistinct.hpp
@@ -26,6 +26,7 @@
 
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/aggregation/AggregationConcreteHandle.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
 #include "storage/HashTableBase.hpp"
 #include "types/TypedValue.hpp"
 #include "utility/Macros.hpp"
@@ -49,7 +50,17 @@ class AggregationHandleDistinct : public AggregationConcreteHandle {
   /**
    * @brief Constructor.
    **/
-  AggregationHandleDistinct() {}
+  AggregationHandleDistinct()
+      : AggregationConcreteHandle(AggregationID::kDistinct) {}
+
+  std::vector<const Type *> getArgumentTypes() const override {
+    return {};
+  }
+
+  const Type* getResultType() const override {
+    LOG(FATAL)
+        << "AggregationHandleDistinct does not support getResultType().";
+  }
 
   AggregationState* createInitialState() const override {
     LOG(FATAL)
@@ -62,21 +73,13 @@ class AggregationHandleDistinct : public AggregationConcreteHandle {
         << "AggregationHandleDistinct does not support accumulateNullary().";
   }
 
-  AggregationState* accumulateColumnVectors(
-      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
-      const override {
-    LOG(FATAL) << "AggregationHandleDistinct does not support "
-                  "accumulateColumnVectors().";
-  }
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  AggregationState* accumulateValueAccessor(
+  AggregationState* accumulate(
       ValueAccessor *accessor,
-      const std::vector<attribute_id> &accessor_ids) const override {
+      ColumnVectorsValueAccessor *aux_accessor,
+      const std::vector<attribute_id> &argument_ids) const override {
     LOG(FATAL) << "AggregationHandleDistinct does not support "
-                  "accumulateValueAccessor().";
+                  "accumulate().";
   }
-#endif
 
   void mergeStates(const AggregationState &source,
                    AggregationState *destination) const override {
@@ -87,33 +90,6 @@ class AggregationHandleDistinct : public AggregationConcreteHandle {
     LOG(FATAL) << "AggregationHandleDistinct does not support finalize().";
   }
 
-  AggregationState* aggregateOnDistinctifyHashTableForSingle(
-      const AggregationStateHashTableBase &distinctify_hash_table)
-      const override {
-    LOG(FATAL) << "AggregationHandleDistinct does not support "
-               << "aggregateOnDistinctifyHashTableForSingle().";
-  }
-
-  void aggregateOnDistinctifyHashTableForGroupBy(
-      const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *groupby_hash_table,
-      std::size_t index) const override {
-    LOG(FATAL) << "AggregationHandleDistinct does not support "
-               << "aggregateOnDistinctifyHashTableForGroupBy().";
-  }
-
-  AggregationStateHashTableBase* createGroupByHashTable(
-      const HashTableImplType hash_table_impl,
-      const std::vector<const Type *> &group_by_types,
-      const std::size_t estimated_num_groups,
-      StorageManager *storage_manager) const override;
-
-  void aggregateValueAccessorIntoHashTable(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &argument_ids,
-      const std::vector<attribute_id> &group_by_key_ids,
-      AggregationStateHashTableBase *hash_table) const override;
-
   ColumnVector* finalizeHashTable(
       const AggregationStateHashTableBase &hash_table,
       std::vector<std::vector<TypedValue>> *group_by_keys,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b46bc73c/expressions/aggregation/AggregationHandleMax.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.cpp b/expressions/aggregation/AggregationHandleMax.cpp
index c2d571b..327b2b2 100644
--- a/expressions/aggregation/AggregationHandleMax.cpp
+++ b/expressions/aggregation/AggregationHandleMax.cpp
@@ -23,8 +23,8 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
-#include "storage/HashTable.hpp"
-#include "storage/HashTableFactory.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
+#include "storage/PackedPayloadAggregationStateHashTable.hpp"
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
 #include "types/containers/ColumnVector.hpp"
@@ -39,51 +39,32 @@ namespace quickstep {
 class StorageManager;
 
 AggregationHandleMax::AggregationHandleMax(const Type &type)
-    : type_(type), block_update_(false) {
+    : AggregationConcreteHandle(AggregationID::kMax),
+      type_(type) {
   fast_comparator_.reset(
       ComparisonFactory::GetComparison(ComparisonID::kGreater)
           .makeUncheckedComparatorForTypes(type, type.getNonNullableVersion()));
 }
 
-AggregationStateHashTableBase* AggregationHandleMax::createGroupByHashTable(
-    const HashTableImplType hash_table_impl,
-    const std::vector<const Type *> &group_by_types,
-    const std::size_t estimated_num_groups,
-    StorageManager *storage_manager) const {
-  return AggregationStateHashTableFactory<AggregationStateMax>::CreateResizable(
-      hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
-}
-
-AggregationState* AggregationHandleMax::accumulateColumnVectors(
-    const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
-  DCHECK_EQ(1u, column_vectors.size())
-      << "Got wrong number of ColumnVectors for MAX: " << column_vectors.size();
+AggregationState* AggregationHandleMax::accumulate(
+    ValueAccessor *accessor,
+    ColumnVectorsValueAccessor *aux_accessor,
+    const std::vector<attribute_id> &argument_ids) const {
+  DCHECK_EQ(1u, argument_ids.size())
+      << "Got wrong number of attributes for MAX: " << argument_ids.size();
 
-  return new AggregationStateMax(fast_comparator_->accumulateColumnVector(
-      type_.getNullableVersion().makeNullValue(), *column_vectors.front()));
-}
+  const attribute_id argument_id = argument_ids.front();
+  DCHECK_NE(argument_id, kInvalidAttributeID);
 
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-AggregationState* AggregationHandleMax::accumulateValueAccessor(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &accessor_ids) const {
-  DCHECK_EQ(1u, accessor_ids.size())
-      << "Got wrong number of attributes for MAX: " << accessor_ids.size();
+  ValueAccessor *target_accessor =
+      argument_id >= 0 ? accessor : aux_accessor;
+  const attribute_id target_argument_id =
+      argument_id >= 0 ? argument_id : -(argument_id+2);
 
   return new AggregationStateMax(fast_comparator_->accumulateValueAccessor(
       type_.getNullableVersion().makeNullValue(),
-      accessor,
-      accessor_ids.front()));
-}
-#endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-
-void AggregationHandleMax::aggregateValueAccessorIntoHashTable(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &argument_ids,
-    const std::vector<attribute_id> &group_by_key_ids,
-    AggregationStateHashTableBase *hash_table) const {
-  DCHECK_EQ(1u, argument_ids.size())
-      << "Got wrong number of arguments for MAX: " << argument_ids.size();
+      target_accessor,
+      target_argument_id));
 }
 
 void AggregationHandleMax::mergeStates(const AggregationState &source,
@@ -98,12 +79,12 @@ void AggregationHandleMax::mergeStates(const AggregationState &source,
   }
 }
 
-void AggregationHandleMax::mergeStatesFast(const std::uint8_t *source,
-                                           std::uint8_t *destination) const {
+void AggregationHandleMax::mergeStates(const std::uint8_t *source,
+                                       std::uint8_t *destination) const {
   const TypedValue *src_max_ptr = reinterpret_cast<const TypedValue *>(source);
   TypedValue *dst_max_ptr = reinterpret_cast<TypedValue *>(destination);
   if (!(src_max_ptr->isNull())) {
-    compareAndUpdateFast(dst_max_ptr, *src_max_ptr);
+    compareAndUpdate(dst_max_ptr, *src_max_ptr);
   }
 }
 
@@ -111,27 +92,10 @@ ColumnVector* AggregationHandleMax::finalizeHashTable(
     const AggregationStateHashTableBase &hash_table,
     std::vector<std::vector<TypedValue>> *group_by_keys,
     int index) const {
-  return finalizeHashTableHelperFast<AggregationHandleMax,
-                                     AggregationStateFastHashTable>(
-      type_.getNullableVersion(), hash_table, group_by_keys, index);
-}
-
-AggregationState*
-AggregationHandleMax::aggregateOnDistinctifyHashTableForSingle(
-    const AggregationStateHashTableBase &distinctify_hash_table) const {
-  return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
-      AggregationHandleMax,
-      AggregationStateMax>(distinctify_hash_table);
-}
-
-void AggregationHandleMax::aggregateOnDistinctifyHashTableForGroupBy(
-    const AggregationStateHashTableBase &distinctify_hash_table,
-    AggregationStateHashTableBase *aggregation_hash_table,
-    std::size_t index) const {
-  aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
+  return finalizeHashTableHelper<
       AggregationHandleMax,
-      AggregationStateFastHashTable>(
-      distinctify_hash_table, aggregation_hash_table, index);
+      PackedPayloadSeparateChainingAggregationStateHashTable>(
+          type_.getNullableVersion(), hash_table, group_by_keys, index);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b46bc73c/expressions/aggregation/AggregationHandleMax.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.hpp b/expressions/aggregation/AggregationHandleMax.hpp
index d851a0c..635c7d8 100644
--- a/expressions/aggregation/AggregationHandleMax.hpp
+++ b/expressions/aggregation/AggregationHandleMax.hpp
@@ -28,7 +28,6 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/aggregation/AggregationConcreteHandle.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
-#include "storage/FastHashTable.hpp"
 #include "storage/HashTableBase.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
@@ -86,16 +85,18 @@ class AggregationHandleMax : public AggregationConcreteHandle {
  public:
   ~AggregationHandleMax() override {}
 
+  std::vector<const Type *> getArgumentTypes() const override {
+    return {&type_};
+  }
+
+  const Type* getResultType() const override {
+    return &type_;
+  }
+
   AggregationState* createInitialState() const override {
     return new AggregationStateMax(type_);
   }
 
-  AggregationStateHashTableBase* createGroupByHashTable(
-      const HashTableImplType hash_table_impl,
-      const std::vector<const Type *> &group_by_types,
-      const std::size_t estimated_num_groups,
-      StorageManager *storage_manager) const override;
-
   /**
    * @brief Iterate with max aggregation state.
    */
@@ -105,23 +106,17 @@ class AggregationHandleMax : public AggregationConcreteHandle {
     compareAndUpdate(static_cast<AggregationStateMax *>(state), value);
   }
 
-  inline void iterateUnaryInlFast(const TypedValue &value,
-                                  std::uint8_t *byte_ptr) const {
-    DCHECK(value.isPlausibleInstanceOf(type_.getSignature()));
-    TypedValue *max_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
-    compareAndUpdateFast(max_ptr, value);
-  }
-
-  inline void updateStateUnary(const TypedValue &argument,
-                               std::uint8_t *byte_ptr) const override {
-    if (!block_update_) {
-      iterateUnaryInlFast(argument, byte_ptr);
-    }
-  }
+  AggregationState* accumulate(
+      ValueAccessor *accessor,
+      ColumnVectorsValueAccessor *aux_accessor,
+      const std::vector<attribute_id> &argument_ids) const override;
 
-  void blockUpdate() override { block_update_ = true; }
+  void mergeStates(const AggregationState &source,
+                   AggregationState *destination) const override;
 
-  void allowUpdate() override { block_update_ = false; }
+  std::size_t getPayloadSize() const override {
+    return sizeof(TypedValue);
+  }
 
   void initPayload(std::uint8_t *byte_ptr) const override {
     TypedValue *max_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
@@ -136,38 +131,21 @@ class AggregationHandleMax : public AggregationConcreteHandle {
     }
   }
 
-  AggregationState* accumulateColumnVectors(
-      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
-      const override;
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  AggregationState* accumulateValueAccessor(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &accessor_ids) const override;
-#endif
-
-  void aggregateValueAccessorIntoHashTable(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &argument_ids,
-      const std::vector<attribute_id> &group_by_key_ids,
-      AggregationStateHashTableBase *hash_table) const override;
-
-  void mergeStates(const AggregationState &source,
-                   AggregationState *destination) const override;
-
-  void mergeStatesFast(const std::uint8_t *source,
-                       std::uint8_t *destination) const override;
+  inline void updateStateUnary(const TypedValue &argument,
+                               std::uint8_t *byte_ptr) const override {
+    DCHECK(argument.isPlausibleInstanceOf(type_.getSignature()));
+    TypedValue *max_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
+    compareAndUpdate(max_ptr, argument);
+  }
 
   TypedValue finalize(const AggregationState &state) const override {
     return TypedValue(static_cast<const AggregationStateMax &>(state).max_);
   }
 
-  inline TypedValue finalizeHashTableEntry(
-      const AggregationState &state) const {
-    return TypedValue(static_cast<const AggregationStateMax &>(state).max_);
-  }
+  void mergeStates(const std::uint8_t *source,
+                   std::uint8_t *destination) const override;
 
-  inline TypedValue finalizeHashTableEntryFast(
+  inline TypedValue finalizeHashTableEntry(
       const std::uint8_t *byte_ptr) const {
     const TypedValue *max_ptr = reinterpret_cast<const TypedValue *>(byte_ptr);
     return TypedValue(*max_ptr);
@@ -178,27 +156,6 @@ class AggregationHandleMax : public AggregationConcreteHandle {
       std::vector<std::vector<TypedValue>> *group_by_keys,
       int index) const override;
 
-  /**
-   * @brief Implementation of
-   *        AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
-   *        for MAX aggregation.
-   */
-  AggregationState* aggregateOnDistinctifyHashTableForSingle(
-      const AggregationStateHashTableBase &distinctify_hash_table)
-      const override;
-
-  /**
-   * @brief Implementation of
-   *        AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
-   *        for MAX aggregation.
-   */
-  void aggregateOnDistinctifyHashTableForGroupBy(
-      const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table,
-      std::size_t index) const override;
-
-  std::size_t getPayloadSize() const override { return sizeof(TypedValue); }
-
  private:
   friend class AggregateFunctionMax;
 
@@ -227,8 +184,8 @@ class AggregationHandleMax : public AggregationConcreteHandle {
     }
   }
 
-  inline void compareAndUpdateFast(TypedValue *max_ptr,
-                                   const TypedValue &value) const {
+  inline void compareAndUpdate(TypedValue *max_ptr,
+                               const TypedValue &value) const {
     if (value.isNull()) return;
     if (max_ptr->isNull() ||
         fast_comparator_->compareTypedValues(value, *max_ptr)) {
@@ -239,8 +196,6 @@ class AggregationHandleMax : public AggregationConcreteHandle {
   const Type &type_;
   std::unique_ptr<UncheckedComparator> fast_comparator_;
 
-  bool block_update_;
-
   DISALLOW_COPY_AND_ASSIGN(AggregationHandleMax);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b46bc73c/expressions/aggregation/AggregationHandleMin.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.cpp b/expressions/aggregation/AggregationHandleMin.cpp
index a07f299..fe4a61b 100644
--- a/expressions/aggregation/AggregationHandleMin.cpp
+++ b/expressions/aggregation/AggregationHandleMin.cpp
@@ -23,8 +23,8 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
-#include "storage/HashTable.hpp"
-#include "storage/HashTableFactory.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
+#include "storage/PackedPayloadAggregationStateHashTable.hpp"
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
 #include "types/containers/ColumnVector.hpp"
@@ -39,51 +39,32 @@ namespace quickstep {
 class StorageManager;
 
 AggregationHandleMin::AggregationHandleMin(const Type &type)
-    : type_(type), block_update_(false) {
+    : AggregationConcreteHandle(AggregationID::kMin),
+      type_(type) {
   fast_comparator_.reset(
       ComparisonFactory::GetComparison(ComparisonID::kLess)
           .makeUncheckedComparatorForTypes(type, type.getNonNullableVersion()));
 }
 
-AggregationStateHashTableBase* AggregationHandleMin::createGroupByHashTable(
-    const HashTableImplType hash_table_impl,
-    const std::vector<const Type *> &group_by_types,
-    const std::size_t estimated_num_groups,
-    StorageManager *storage_manager) const {
-  return AggregationStateHashTableFactory<AggregationStateMin>::CreateResizable(
-      hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
-}
-
-AggregationState* AggregationHandleMin::accumulateColumnVectors(
-    const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
-  DCHECK_EQ(1u, column_vectors.size())
-      << "Got wrong number of ColumnVectors for MIN: " << column_vectors.size();
+AggregationState* AggregationHandleMin::accumulate(
+    ValueAccessor *accessor,
+    ColumnVectorsValueAccessor *aux_accessor,
+    const std::vector<attribute_id> &argument_ids) const {
+  DCHECK_EQ(1u, argument_ids.size())
+      << "Got wrong number of attributes for MIN: " << argument_ids.size();
 
-  return new AggregationStateMin(fast_comparator_->accumulateColumnVector(
-      type_.getNullableVersion().makeNullValue(), *column_vectors.front()));
-}
+  const attribute_id argument_id = argument_ids.front();
+  DCHECK_NE(argument_id, kInvalidAttributeID);
 
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-AggregationState* AggregationHandleMin::accumulateValueAccessor(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &accessor_ids) const {
-  DCHECK_EQ(1u, accessor_ids.size())
-      << "Got wrong number of attributes for MIN: " << accessor_ids.size();
+  ValueAccessor *target_accessor =
+      argument_id >= 0 ? accessor : aux_accessor;
+  const attribute_id target_argument_id =
+      argument_id >= 0 ? argument_id : -(argument_id+2);
 
   return new AggregationStateMin(fast_comparator_->accumulateValueAccessor(
       type_.getNullableVersion().makeNullValue(),
-      accessor,
-      accessor_ids.front()));
-}
-#endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-
-void AggregationHandleMin::aggregateValueAccessorIntoHashTable(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &argument_ids,
-    const std::vector<attribute_id> &group_by_key_ids,
-    AggregationStateHashTableBase *hash_table) const {
-  DCHECK_EQ(1u, argument_ids.size())
-      << "Got wrong number of arguments for MIN: " << argument_ids.size();
+      target_accessor,
+      target_argument_id));
 }
 
 void AggregationHandleMin::mergeStates(const AggregationState &source,
@@ -98,13 +79,13 @@ void AggregationHandleMin::mergeStates(const AggregationState &source,
   }
 }
 
-void AggregationHandleMin::mergeStatesFast(const std::uint8_t *source,
-                                           std::uint8_t *destination) const {
+void AggregationHandleMin::mergeStates(const std::uint8_t *source,
+                                       std::uint8_t *destination) const {
   const TypedValue *src_min_ptr = reinterpret_cast<const TypedValue *>(source);
   TypedValue *dst_min_ptr = reinterpret_cast<TypedValue *>(destination);
 
   if (!(src_min_ptr->isNull())) {
-    compareAndUpdateFast(dst_min_ptr, *src_min_ptr);
+    compareAndUpdate(dst_min_ptr, *src_min_ptr);
   }
 }
 
@@ -112,27 +93,10 @@ ColumnVector* AggregationHandleMin::finalizeHashTable(
     const AggregationStateHashTableBase &hash_table,
     std::vector<std::vector<TypedValue>> *group_by_keys,
     int index) const {
-  return finalizeHashTableHelperFast<AggregationHandleMin,
-                                     AggregationStateFastHashTable>(
-      type_.getNonNullableVersion(), hash_table, group_by_keys, index);
-}
-
-AggregationState*
-AggregationHandleMin::aggregateOnDistinctifyHashTableForSingle(
-    const AggregationStateHashTableBase &distinctify_hash_table) const {
-  return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
-      AggregationHandleMin,
-      AggregationStateMin>(distinctify_hash_table);
-}
-
-void AggregationHandleMin::aggregateOnDistinctifyHashTableForGroupBy(
-    const AggregationStateHashTableBase &distinctify_hash_table,
-    AggregationStateHashTableBase *aggregation_hash_table,
-    std::size_t index) const {
-  aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
+  return finalizeHashTableHelper<
       AggregationHandleMin,
-      AggregationStateFastHashTable>(
-      distinctify_hash_table, aggregation_hash_table, index);
+      PackedPayloadSeparateChainingAggregationStateHashTable>(
+          type_.getNonNullableVersion(), hash_table, group_by_keys, index);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b46bc73c/expressions/aggregation/AggregationHandleMin.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.hpp b/expressions/aggregation/AggregationHandleMin.hpp
index e3472ec..3571f02 100644
--- a/expressions/aggregation/AggregationHandleMin.hpp
+++ b/expressions/aggregation/AggregationHandleMin.hpp
@@ -28,7 +28,6 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/aggregation/AggregationConcreteHandle.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
-#include "storage/FastHashTable.hpp"
 #include "storage/HashTableBase.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
@@ -88,42 +87,39 @@ class AggregationHandleMin : public AggregationConcreteHandle {
  public:
   ~AggregationHandleMin() override {}
 
+  std::vector<const Type *> getArgumentTypes() const override {
+    return {&type_};
+  }
+
+  const Type* getResultType() const override {
+    return &type_;
+  }
+
   AggregationState* createInitialState() const override {
     return new AggregationStateMin(type_);
   }
 
-  AggregationStateHashTableBase* createGroupByHashTable(
-      const HashTableImplType hash_table_impl,
-      const std::vector<const Type *> &group_by_types,
-      const std::size_t estimated_num_groups,
-      StorageManager *storage_manager) const override;
-
-  /**
-   * @brief Iterate with min aggregation state.
-   */
   inline void iterateUnaryInl(AggregationStateMin *state,
                               const TypedValue &value) const {
     DCHECK(value.isPlausibleInstanceOf(type_.getSignature()));
     compareAndUpdate(state, value);
   }
 
-  inline void iterateUnaryInlFast(const TypedValue &value,
-                                  std::uint8_t *byte_ptr) const {
-    DCHECK(value.isPlausibleInstanceOf(type_.getSignature()));
-    TypedValue *min_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
-    compareAndUpdateFast(min_ptr, value);
-  }
+  AggregationState* accumulate(
+      ValueAccessor *accessor,
+      ColumnVectorsValueAccessor *aux_accessor,
+      const std::vector<attribute_id> &argument_ids) const override;
 
-  inline void updateStateUnary(const TypedValue &argument,
-                               std::uint8_t *byte_ptr) const override {
-    if (!block_update_) {
-      iterateUnaryInlFast(argument, byte_ptr);
-    }
-  }
+  void mergeStates(const AggregationState &source,
+                   AggregationState *destination) const override;
 
-  void blockUpdate() override { block_update_ = true; }
+  TypedValue finalize(const AggregationState &state) const override {
+    return static_cast<const AggregationStateMin &>(state).min_;
+  }
 
-  void allowUpdate() override { block_update_ = false; }
+  std::size_t getPayloadSize() const override {
+    return sizeof(TypedValue);
+  }
 
   void initPayload(std::uint8_t *byte_ptr) const override {
     TypedValue *min_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
@@ -138,41 +134,19 @@ class AggregationHandleMin : public AggregationConcreteHandle {
     }
   }
 
-  AggregationState* accumulateColumnVectors(
-      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
-      const override;
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  AggregationState* accumulateValueAccessor(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &accessor_ids) const override;
-#endif
-
-  void aggregateValueAccessorIntoHashTable(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &argument_ids,
-      const std::vector<attribute_id> &group_by_key_ids,
-      AggregationStateHashTableBase *hash_table) const override;
-
-  void mergeStates(const AggregationState &source,
-                   AggregationState *destination) const override;
-
-  void mergeStatesFast(const std::uint8_t *source,
-                       std::uint8_t *destination) const override;
-
-  TypedValue finalize(const AggregationState &state) const override {
-    return static_cast<const AggregationStateMin &>(state).min_;
+  inline void updateStateUnary(const TypedValue &argument,
+                               std::uint8_t *byte_ptr) const override {
+    DCHECK(argument.isPlausibleInstanceOf(type_.getSignature()));
+    TypedValue *min_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
+    compareAndUpdate(min_ptr, argument);
   }
 
-  inline TypedValue finalizeHashTableEntry(
-      const AggregationState &state) const {
-    return static_cast<const AggregationStateMin &>(state).min_;
-  }
+  void mergeStates(const std::uint8_t *source,
+                   std::uint8_t *destination) const override;
 
-  inline TypedValue finalizeHashTableEntryFast(
+  inline TypedValue finalizeHashTableEntry(
       const std::uint8_t *byte_ptr) const {
-    const TypedValue *min_ptr = reinterpret_cast<const TypedValue *>(byte_ptr);
-    return TypedValue(*min_ptr);
+    return *reinterpret_cast<const TypedValue *>(byte_ptr);
   }
 
   ColumnVector* finalizeHashTable(
@@ -180,27 +154,6 @@ class AggregationHandleMin : public AggregationConcreteHandle {
       std::vector<std::vector<TypedValue>> *group_by_keys,
       int index) const override;
 
-  /**
-   * @brief Implementation of
-   * AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
-   *        for MIN aggregation.
-   */
-  AggregationState* aggregateOnDistinctifyHashTableForSingle(
-      const AggregationStateHashTableBase &distinctify_hash_table)
-      const override;
-
-  /**
-   * @brief Implementation of
-   *        AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
-   *        for MIN aggregation.
-   */
-  void aggregateOnDistinctifyHashTableForGroupBy(
-      const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table,
-      std::size_t index) const override;
-
-  std::size_t getPayloadSize() const override { return sizeof(TypedValue); }
-
  private:
   friend class AggregateFunctionMin;
 
@@ -228,8 +181,8 @@ class AggregationHandleMin : public AggregationConcreteHandle {
     }
   }
 
-  inline void compareAndUpdateFast(TypedValue *min_ptr,
-                                   const TypedValue &value) const {
+  inline void compareAndUpdate(TypedValue *min_ptr,
+                               const TypedValue &value) const {
     if (value.isNull()) return;
     if (min_ptr->isNull() ||
         fast_comparator_->compareTypedValues(value, *min_ptr)) {
@@ -240,8 +193,6 @@ class AggregationHandleMin : public AggregationConcreteHandle {
   const Type &type_;
   std::unique_ptr<UncheckedComparator> fast_comparator_;
 
-  bool block_update_;
-
   DISALLOW_COPY_AND_ASSIGN(AggregationHandleMin);
 };