You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/02/07 22:35:31 UTC

[12/14] incubator-quickstep git commit: - Adds CollisionFreeVectorTable to support specialized fast path aggregation for range-bounded single integer group-by key. - Supports copy elision for aggregation.

- Adds CollisionFreeVectorTable to support specialized fast path aggregation for range-bounded single integer group-by key.
- Supports copy elision for aggregation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/2d89e4fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/2d89e4fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/2d89e4fb

Branch: refs/heads/aggregate-on-left-outer-join
Commit: 2d89e4fbf3b51d7768d928d56b25ec0ab52faabb
Parents: ec76096
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Mon Jan 30 14:46:39 2017 -0600
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Tue Feb 7 15:29:19 2017 -0600

----------------------------------------------------------------------
 .../aggregation/AggregateFunctionCount.cpp      |    6 +-
 .../aggregation/AggregationConcreteHandle.cpp   |   32 +-
 .../aggregation/AggregationConcreteHandle.hpp   |  160 +-
 expressions/aggregation/AggregationHandle.hpp   |  193 +-
 .../aggregation/AggregationHandleAvg.cpp        |   95 +-
 .../aggregation/AggregationHandleAvg.hpp        |  115 +-
 .../aggregation/AggregationHandleCount.cpp      |  147 +-
 .../aggregation/AggregationHandleCount.hpp      |  154 +-
 .../aggregation/AggregationHandleDistinct.cpp   |   81 -
 .../aggregation/AggregationHandleDistinct.hpp   |  130 -
 .../aggregation/AggregationHandleMax.cpp        |   91 +-
 .../aggregation/AggregationHandleMax.hpp        |  111 +-
 .../aggregation/AggregationHandleMin.cpp        |   91 +-
 .../aggregation/AggregationHandleMin.hpp        |  120 +-
 .../aggregation/AggregationHandleSum.cpp        |   93 +-
 .../aggregation/AggregationHandleSum.hpp        |  114 +-
 expressions/aggregation/CMakeLists.txt          |   58 +-
 .../tests/AggregationHandleAvg_unittest.cpp     |  110 +-
 .../tests/AggregationHandleCount_unittest.cpp   |  145 +-
 .../tests/AggregationHandleMax_unittest.cpp     |  158 +-
 .../tests/AggregationHandleMin_unittest.cpp     |  158 +-
 .../tests/AggregationHandleSum_unittest.cpp     |  109 +-
 query_execution/QueryContext.hpp                |   14 -
 query_optimizer/CMakeLists.txt                  |    4 +
 query_optimizer/ExecutionGenerator.cpp          |  147 +-
 query_optimizer/ExecutionGenerator.hpp          |   20 +-
 relational_operators/CMakeLists.txt             |   15 +
 .../DestroyAggregationStateOperator.cpp         |    7 -
 .../FinalizeAggregationOperator.cpp             |   16 +-
 .../FinalizeAggregationOperator.hpp             |   14 +-
 .../InitializeAggregationOperator.cpp           |   72 +
 .../InitializeAggregationOperator.hpp           |  122 +
 relational_operators/WorkOrderFactory.cpp       |    3 +
 storage/AggregationOperationState.cpp           |  834 +++---
 storage/AggregationOperationState.hpp           |  178 +-
 storage/CMakeLists.txt                          |  131 +-
 storage/CollisionFreeVectorTable.cpp            |  285 +++
 storage/CollisionFreeVectorTable.hpp            |  730 ++++++
 storage/FastHashTable.hpp                       | 2403 ------------------
 storage/FastHashTableFactory.hpp                |  224 --
 storage/FastSeparateChainingHashTable.hpp       | 1551 -----------
 storage/HashTable.proto                         |    7 +-
 storage/HashTableBase.hpp                       |   42 +-
 storage/HashTableFactory.hpp                    |   63 +-
 storage/HashTablePool.hpp                       |   79 +-
 storage/PackedPayloadHashTable.cpp              |  463 ++++
 storage/PackedPayloadHashTable.hpp              |  995 ++++++++
 storage/PartitionedHashTablePool.hpp            |   56 +-
 storage/StorageBlock.cpp                        |  274 +-
 storage/StorageBlock.hpp                        |  167 --
 storage/ValueAccessorMultiplexer.hpp            |  145 ++
 .../BarrieredReadWriteConcurrentBitVector.hpp   |  282 ++
 utility/CMakeLists.txt                          |    7 +
 53 files changed, 4722 insertions(+), 7099 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/expressions/aggregation/AggregateFunctionCount.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregateFunctionCount.cpp b/expressions/aggregation/AggregateFunctionCount.cpp
index 466ff2f..9795b4a 100644
--- a/expressions/aggregation/AggregateFunctionCount.cpp
+++ b/expressions/aggregation/AggregateFunctionCount.cpp
@@ -53,16 +53,16 @@ AggregationHandle* AggregateFunctionCount::createHandle(
 
   if (argument_types.empty()) {
     // COUNT(*)
-    return new AggregationHandleCount<true, false>();
+    return new AggregationHandleCount<true, false>(nullptr);
   } else if (argument_types.front()->isNullable()) {
     // COUNT(some_nullable_argument)
-    return new AggregationHandleCount<false, true>();
+    return new AggregationHandleCount<false, true>(argument_types.front());
   } else {
     // COUNT(non_nullable_argument)
     //
     // TODO(chasseur): Modify query optimizer to optimize-away COUNT with a
     // non-nullable argument and convert it to COUNT(*).
-    return new AggregationHandleCount<false, false>();
+    return new AggregationHandleCount<false, false>(argument_types.front());
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/expressions/aggregation/AggregationConcreteHandle.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.cpp b/expressions/aggregation/AggregationConcreteHandle.cpp
index e3fb520..fa21056 100644
--- a/expressions/aggregation/AggregationConcreteHandle.cpp
+++ b/expressions/aggregation/AggregationConcreteHandle.cpp
@@ -22,16 +22,14 @@
 #include <cstddef>
 #include <vector>
 
-#include "catalog/CatalogTypedefs.hpp"
-#include "storage/FastHashTable.hpp"
-#include "storage/HashTable.hpp"
 #include "storage/HashTableFactory.hpp"
+#include "storage/PackedPayloadHashTable.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
 
 namespace quickstep {
 
 class StorageManager;
 class Type;
-class ValueAccessor;
 
 AggregationStateHashTableBase* AggregationConcreteHandle::createDistinctifyHashTable(
     const HashTableImplType hash_table_impl,
@@ -39,30 +37,26 @@ AggregationStateHashTableBase* AggregationConcreteHandle::createDistinctifyHashT
     const std::size_t estimated_num_distinct_keys,
     StorageManager *storage_manager) const {
   // Create a hash table with key types as key_types and value type as bool.
-  return AggregationStateHashTableFactory<bool>::CreateResizable(
+  return AggregationStateHashTableFactory::CreateResizable(
       hash_table_impl,
       key_types,
       estimated_num_distinct_keys,
+      {},
       storage_manager);
 }
 
 void AggregationConcreteHandle::insertValueAccessorIntoDistinctifyHashTable(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &key_ids,
+    const std::vector<MultiSourceAttributeId> &argument_ids,
+    const std::vector<MultiSourceAttributeId> &key_ids,
+    const ValueAccessorMultiplexer &accessor_mux,
     AggregationStateHashTableBase *distinctify_hash_table) const {
-  // If the key-value pair is already there, we don't need to update the value,
-  // which should always be "true". I.e. the value is just a placeholder.
-
-  AggregationStateFastHashTable *hash_table =
-      static_cast<AggregationStateFastHashTable *>(distinctify_hash_table);
-  if (key_ids.size() == 1) {
-    hash_table->upsertValueAccessorFast(
-        key_ids, accessor, key_ids[0], true /* check_for_null_keys */);
-  } else {
-    std::vector<attribute_id> empty_args {kInvalidAttributeID};
-    hash_table->upsertValueAccessorCompositeKeyFast(
-        empty_args, accessor, key_ids, true /* check_for_null_keys */);
+  std::vector<MultiSourceAttributeId> concatenated_ids(key_ids);
+  for (const MultiSourceAttributeId &arg_id : argument_ids) {
+    concatenated_ids.emplace_back(arg_id);
   }
+
+  static_cast<PackedPayloadHashTable *>(distinctify_hash_table)
+      ->upsertValueAccessorCompositeKey({}, concatenated_ids, accessor_mux);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/expressions/aggregation/AggregationConcreteHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.hpp b/expressions/aggregation/AggregationConcreteHandle.hpp
index 398a032..2287ca1 100644
--- a/expressions/aggregation/AggregationConcreteHandle.hpp
+++ b/expressions/aggregation/AggregationConcreteHandle.hpp
@@ -21,14 +21,15 @@
 #define QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_CONCRETE_HANDLE_HPP_
 
 #include <cstddef>
+#include <cstdint>
 #include <utility>
 #include <vector>
 
-#include "catalog/CatalogTypedefs.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
-#include "storage/FastHashTable.hpp"
-#include "storage/HashTable.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
 #include "storage/HashTableBase.hpp"
+#include "storage/PackedPayloadHashTable.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/TypedValue.hpp"
 #include "types/containers/ColumnVector.hpp"
@@ -40,7 +41,6 @@ namespace quickstep {
 
 class StorageManager;
 class Type;
-class ValueAccessor;
 
 /** \addtogroup Expressions
  *  @{
@@ -51,7 +51,7 @@ class ValueAccessor;
  *        merging two group by hash tables.
  **/
 template <typename HandleT>
-class HashTableStateUpserterFast {
+class HashTableStateUpserter {
  public:
   /**
    * @brief Constructor.
@@ -61,8 +61,8 @@ class HashTableStateUpserterFast {
    *        table. The corresponding state (for the same key) in the destination
    *        hash table will be upserted.
    **/
-  HashTableStateUpserterFast(const HandleT &handle,
-                             const std::uint8_t *source_state)
+  HashTableStateUpserter(const HandleT &handle,
+                         const std::uint8_t *source_state)
       : handle_(handle), source_state_(source_state) {}
 
   /**
@@ -72,14 +72,14 @@ class HashTableStateUpserterFast {
    *        table that is being upserted.
    **/
   void operator()(std::uint8_t *destination_state) {
-    handle_.mergeStatesFast(source_state_, destination_state);
+    handle_.mergeStates(source_state_, destination_state);
   }
 
  private:
   const HandleT &handle_;
   const std::uint8_t *source_state_;
 
-  DISALLOW_COPY_AND_ASSIGN(HashTableStateUpserterFast);
+  DISALLOW_COPY_AND_ASSIGN(HashTableStateUpserter);
 };
 
 /**
@@ -93,74 +93,62 @@ class HashTableStateUpserterFast {
  **/
 class AggregationConcreteHandle : public AggregationHandle {
  public:
-  /**
-   * @brief Default implementaion for AggregationHandle::accumulateNullary().
-   */
-  AggregationState* accumulateNullary(
-      const std::size_t num_tuples) const override {
-    LOG(FATAL) << "Called accumulateNullary on an AggregationHandle that "
-               << "takes at least one argument.";
-  }
-
-  /**
-   * @brief Implementaion for AggregationHandle::createDistinctifyHashTable()
-   *        that creates a new HashTable for the distinctify step for
-   *        DISTINCT aggregation.
-   */
   AggregationStateHashTableBase* createDistinctifyHashTable(
       const HashTableImplType hash_table_impl,
       const std::vector<const Type *> &key_types,
       const std::size_t estimated_num_distinct_keys,
       StorageManager *storage_manager) const override;
 
-  /**
-   * @brief Implementaion for
-   * AggregationHandle::insertValueAccessorIntoDistinctifyHashTable()
-   * that inserts the GROUP BY expressions and aggregation arguments together
-   * as keys into the distinctify hash table.
-   */
   void insertValueAccessorIntoDistinctifyHashTable(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &key_ids,
+      const std::vector<MultiSourceAttributeId> &argument_ids,
+      const std::vector<MultiSourceAttributeId> &key_ids,
+      const ValueAccessorMultiplexer &accessor_mux,
       AggregationStateHashTableBase *distinctify_hash_table) const override;
 
+  void blockUpdate() override {
+    block_update_ = true;
+  }
+
+  void allowUpdate() override {
+    block_update_ = false;
+  }
+
  protected:
-  AggregationConcreteHandle() {}
+  explicit AggregationConcreteHandle(const AggregationID agg_id)
+      : AggregationHandle(agg_id),
+        block_update_(false) {}
 
   template <typename HandleT, typename StateT>
-  StateT* aggregateOnDistinctifyHashTableForSingleUnaryHelperFast(
+  StateT* aggregateOnDistinctifyHashTableForSingleUnaryHelper(
       const AggregationStateHashTableBase &distinctify_hash_table) const;
 
-  template <typename HandleT, typename HashTableT>
-  void aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast(
+  template <typename HandleT>
+  void aggregateOnDistinctifyHashTableForGroupByUnaryHelper(
       const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *hash_table,
-      std::size_t index) const;
+      const std::size_t index,
+      AggregationStateHashTableBase *hash_table) const;
 
-  template <typename HandleT, typename HashTableT>
-  ColumnVector* finalizeHashTableHelperFast(
+  template <typename HandleT>
+  ColumnVector* finalizeHashTableHelper(
       const Type &result_type,
       const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys,
-      int index) const;
+      const std::size_t index,
+      std::vector<std::vector<TypedValue>> *group_by_keys) const;
 
-  template <typename HandleT, typename HashTableT>
-  inline TypedValue finalizeGroupInHashTableFast(
+  template <typename HandleT>
+  inline TypedValue finalizeGroupInHashTable(
       const AggregationStateHashTableBase &hash_table,
-      const std::vector<TypedValue> &group_key,
-      int index) const {
+      const std::size_t index,
+      const std::vector<TypedValue> &group_key) const {
     const std::uint8_t *group_state =
-        static_cast<const HashTableT &>(hash_table).getSingleCompositeKey(group_key, index);
+        static_cast<const PackedPayloadHashTable &>(hash_table)
+            .getSingleCompositeKey(group_key, index);
     DCHECK(group_state != nullptr)
         << "Could not find entry for specified group_key in HashTable";
-    return static_cast<const HandleT *>(this)->finalizeHashTableEntryFast(
-        group_state);
+    return static_cast<const HandleT *>(this)->finalizeHashTableEntry(group_state);
   }
 
-  template <typename HandleT, typename HashTableT>
-  void mergeGroupByHashTablesHelperFast(
-      const AggregationStateHashTableBase &source_hash_table,
-      AggregationStateHashTableBase *destination_hash_table) const;
+  bool block_update_;
 
  private:
   DISALLOW_COPY_AND_ASSIGN(AggregationConcreteHandle);
@@ -185,17 +173,10 @@ class HashTableAggregateFinalizer {
         output_column_vector_(output_column_vector) {}
 
   inline void operator()(const std::vector<TypedValue> &group_by_key,
-                         const AggregationState &group_state) {
-    group_by_keys_->emplace_back(group_by_key);
-    output_column_vector_->appendTypedValue(
-        handle_.finalizeHashTableEntry(group_state));
-  }
-
-  inline void operator()(const std::vector<TypedValue> &group_by_key,
-                         const unsigned char *byte_ptr) {
+                         const std::uint8_t *byte_ptr) {
     group_by_keys_->emplace_back(group_by_key);
     output_column_vector_->appendTypedValue(
-        handle_.finalizeHashTableEntryFast(byte_ptr));
+        handle_.finalizeHashTableEntry(byte_ptr));
   }
 
  private:
@@ -211,7 +192,7 @@ class HashTableAggregateFinalizer {
 
 template <typename HandleT, typename StateT>
 StateT* AggregationConcreteHandle::
-    aggregateOnDistinctifyHashTableForSingleUnaryHelperFast(
+    aggregateOnDistinctifyHashTableForSingleUnaryHelper(
         const AggregationStateHashTableBase &distinctify_hash_table) const {
   const HandleT &handle = static_cast<const HandleT &>(*this);
   StateT *state = static_cast<StateT *>(createInitialState());
@@ -219,15 +200,14 @@ StateT* AggregationConcreteHandle::
   // A lambda function which will be called on each key from the distinctify
   // hash table.
   const auto aggregate_functor = [&handle, &state](
-      const TypedValue &key, const std::uint8_t &dumb_placeholder) {
+      const TypedValue &key, const std::uint8_t *dumb_placeholder) {
     // For each (unary) key in the distinctify hash table, aggregate the key
     // into "state".
     handle.iterateUnaryInl(state, key);
   };
 
-  const AggregationStateFastHashTable &hash_table =
-      static_cast<const AggregationStateFastHashTable &>(
-          distinctify_hash_table);
+  const auto &hash_table =
+      static_cast<const PackedPayloadHashTable &>(distinctify_hash_table);
   // Invoke the lambda function "aggregate_functor" on each key from the
   // distinctify hash table.
   hash_table.forEach(&aggregate_functor);
@@ -235,20 +215,20 @@ StateT* AggregationConcreteHandle::
   return state;
 }
 
-template <typename HandleT, typename HashTableT>
+template <typename HandleT>
 void AggregationConcreteHandle::
-    aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast(
+    aggregateOnDistinctifyHashTableForGroupByUnaryHelper(
         const AggregationStateHashTableBase &distinctify_hash_table,
-        AggregationStateHashTableBase *aggregation_hash_table,
-        std::size_t index) const {
+        const std::size_t index,
+        AggregationStateHashTableBase *aggregation_hash_table) const {
   const HandleT &handle = static_cast<const HandleT &>(*this);
-  HashTableT *target_hash_table =
-      static_cast<HashTableT *>(aggregation_hash_table);
+  PackedPayloadHashTable *target_hash_table =
+      static_cast<PackedPayloadHashTable *>(aggregation_hash_table);
 
   // A lambda function which will be called on each key-value pair from the
   // distinctify hash table.
   const auto aggregate_functor = [&handle, &target_hash_table, &index](
-      std::vector<TypedValue> &key, const bool &dumb_placeholder) {
+      std::vector<TypedValue> &key, const std::uint8_t *dumb_placeholder) {
     // For each (composite) key vector in the distinctify hash table with size N.
     // The first N-1 entries are GROUP BY columns and the last entry is the
     // argument to be aggregated on.
@@ -258,28 +238,28 @@ void AggregationConcreteHandle::
     // An upserter as lambda function for aggregating the argument into its
     // GROUP BY group's entry inside aggregation_hash_table.
     const auto upserter = [&handle, &argument](std::uint8_t *state) {
-      handle.iterateUnaryInlFast(argument, state);
+      handle.iterateUnaryInl(argument, state);
     };
 
-    target_hash_table->upsertCompositeKeyFast(key, nullptr, &upserter, index);
+    target_hash_table->upsertCompositeKey(key, &upserter, index);
   };
 
-  const HashTableT &source_hash_table =
-      static_cast<const HashTableT &>(distinctify_hash_table);
+  const PackedPayloadHashTable &source_hash_table =
+      static_cast<const PackedPayloadHashTable &>(distinctify_hash_table);
   // Invoke the lambda function "aggregate_functor" on each composite key vector
   // from the distinctify hash table.
-  source_hash_table.forEachCompositeKeyFast(&aggregate_functor);
+  source_hash_table.forEachCompositeKey(&aggregate_functor);
 }
 
-template <typename HandleT, typename HashTableT>
-ColumnVector* AggregationConcreteHandle::finalizeHashTableHelperFast(
+template <typename HandleT>
+ColumnVector* AggregationConcreteHandle::finalizeHashTableHelper(
     const Type &result_type,
     const AggregationStateHashTableBase &hash_table,
-    std::vector<std::vector<TypedValue>> *group_by_keys,
-    int index) const {
+    const std::size_t index,
+    std::vector<std::vector<TypedValue>> *group_by_keys) const {
   const HandleT &handle = static_cast<const HandleT &>(*this);
-  const HashTableT &hash_table_concrete =
-      static_cast<const HashTableT &>(hash_table);
+  const PackedPayloadHashTable &hash_table_concrete =
+      static_cast<const PackedPayloadHashTable &>(hash_table);
 
   if (group_by_keys->empty()) {
     if (NativeColumnVector::UsableForType(result_type)) {
@@ -287,14 +267,14 @@ ColumnVector* AggregationConcreteHandle::finalizeHashTableHelperFast(
           new NativeColumnVector(result_type, hash_table_concrete.numEntries());
       HashTableAggregateFinalizer<HandleT, NativeColumnVector> finalizer(
           handle, group_by_keys, result);
-      hash_table_concrete.forEachCompositeKeyFast(&finalizer, index);
+      hash_table_concrete.forEachCompositeKey(&finalizer, index);
       return result;
     } else {
       IndirectColumnVector *result = new IndirectColumnVector(
           result_type, hash_table_concrete.numEntries());
       HashTableAggregateFinalizer<HandleT, IndirectColumnVector> finalizer(
           handle, group_by_keys, result);
-      hash_table_concrete.forEachCompositeKeyFast(&finalizer, index);
+      hash_table_concrete.forEachCompositeKey(&finalizer, index);
       return result;
     }
   } else {
@@ -303,8 +283,8 @@ ColumnVector* AggregationConcreteHandle::finalizeHashTableHelperFast(
           new NativeColumnVector(result_type, group_by_keys->size());
       for (const std::vector<TypedValue> &group_by_key : *group_by_keys) {
         result->appendTypedValue(
-            finalizeGroupInHashTableFast<HandleT, HashTableT>(
-                hash_table, group_by_key, index));
+            finalizeGroupInHashTable<HandleT>(
+                hash_table, index, group_by_key));
       }
       return result;
     } else {
@@ -312,8 +292,8 @@ ColumnVector* AggregationConcreteHandle::finalizeHashTableHelperFast(
           result_type, hash_table_concrete.numEntries());
       for (const std::vector<TypedValue> &group_by_key : *group_by_keys) {
         result->appendTypedValue(
-            finalizeGroupInHashTableFast<HandleT, HashTableT>(
-                hash_table, group_by_key, index));
+            finalizeGroupInHashTable<HandleT>(
+                hash_table, index, group_by_key));
       }
       return result;
     }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/expressions/aggregation/AggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandle.hpp b/expressions/aggregation/AggregationHandle.hpp
index 4b51179..9c7b166 100644
--- a/expressions/aggregation/AggregationHandle.hpp
+++ b/expressions/aggregation/AggregationHandle.hpp
@@ -21,20 +21,21 @@
 #define QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_HANDLE_HPP_
 
 #include <cstddef>
-#include <memory>
 #include <vector>
 
-#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
 #include "storage/HashTableBase.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
 #include "types/TypedValue.hpp"
 #include "utility/Macros.hpp"
 
+#include "glog/logging.h"
+
 namespace quickstep {
 
 class ColumnVector;
 class StorageManager;
 class Type;
-class ValueAccessor;
 
 /** \addtogroup Expressions
  *  @{
@@ -109,34 +110,34 @@ class AggregationHandle {
   virtual ~AggregationHandle() {}
 
   /**
-   * @brief Create an initial "blank" state for this aggregation.
+   * @brief Get the ID of this aggregation.
    *
-   * @return An initial "blank" state for this particular aggregation.
+   * @return The AggregationID of this AggregationHandle.
    **/
-  virtual AggregationState* createInitialState() const = 0;
+  inline AggregationID getAggregationID() const {
+    return agg_id_;
+  }
 
   /**
-   * @brief Create a new HashTable for aggregation with GROUP BY.
+   * @brief Get the list of Types (in order) for arguments to this aggregation.
    *
-   * @param hash_table_impl The choice of which concrete HashTable
-   *        implementation to use.
-   * @param group_by_types The types of the GROUP BY columns/expressions. These
-   *        correspond to the (composite) key type for the HashTable.
-   * @param estimated_num_groups The estimated number of distinct groups for
-   *        the GROUP BY aggregation. This is used to size the initial
-   *        HashTable. This is an estimate only, and the HashTable will be
-   *        resized if it becomes over-full.
-   * @param storage_manager The StorageManager to use to create the HashTable.
-   *        A StorageBlob will be allocated to serve as the HashTable's
-   *        in-memory storage.
-   * @return A new HashTable instance with the appropriate state type for this
-   *         aggregate.
+   * @return The list of Types for arguments to this aggregation.
+   */
+  virtual std::vector<const Type *> getArgumentTypes() const = 0;
+
+  /**
+   * @brief Get the result Type of this aggregation.
+   *
+   * @return The result Type of this aggregation.
+   */
+  virtual const Type* getResultType() const = 0;
+
+  /**
+   * @brief Create an initial "blank" state for this aggregation.
+   *
+   * @return An initial "blank" state for this particular aggregation.
    **/
-  virtual AggregationStateHashTableBase* createGroupByHashTable(
-      const HashTableImplType hash_table_impl,
-      const std::vector<const Type *> &group_by_types,
-      const std::size_t estimated_num_groups,
-      StorageManager *storage_manager) const = 0;
+  virtual AggregationState* createInitialState() const = 0;
 
   /**
    * @brief Accumulate over tuples for a nullary aggregate function (one that
@@ -146,70 +147,31 @@ class AggregationHandle {
    *        data is accessed, the only thing that a nullary aggeregate can know
    *        about input is its cardinality.
    * @return A new AggregationState which contains the accumulated results from
-   *         applying the (nullary) aggregate to the specified number of
-   *         tuples.
+   *         applying the (nullary) aggregate to the specified number of tuples.
    **/
   virtual AggregationState* accumulateNullary(
-      const std::size_t num_tuples) const = 0;
-
-  /**
-   * @brief Accumulate (iterate over) all values in one or more ColumnVectors
-   *        and return a new AggregationState which can be merged with other
-   *        states or finalized.
-   *
-   * @param column_vectors One or more ColumnVectors that the aggregate will be
-   *        applied to. These correspond to the aggregate function's arguments,
-   *        in order.
-   * @return A new AggregationState which contains the accumulated results from
-   *         applying the aggregate to column_vectors. Caller is responsible
-   *         for deleting the returned AggregationState.
-   **/
-  virtual AggregationState* accumulateColumnVectors(
-      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const = 0;
+      const std::size_t num_tuples) const {
+    LOG(FATAL) << "Called accumulateNullary on an AggregationHandle that "
+               << "takes at least one argument.";
+  }
 
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
   /**
    * @brief Accumulate (iterate over) all values in columns accessible through
-   *        a ValueAccessor and return a new AggregationState which can be
-   *        merged with other states or finalized.
+   *        the ValueAccessors from a ValueAccessorMultiplexer and return a new
+   *        AggregationState which can be merged with other states or finalized.
    *
-   * @param accessor A ValueAccessor that the columns to be aggregated can be
-   *        accessed through.
-   * @param accessor_ids The attribute_ids that correspond to the columns in
-   *        accessor to aggeregate. These correspond to the aggregate
-   *        function's arguments, in order.
+   * @param argument_ids The multi-source attribute ids that correspond to the
+   *        columns in \p accessor_mux to aggregate. These correspond to the
+   *        aggregate function's arguments, in order.
+   * @param accessor_mux A ValueAccessorMultiplexer object that contains the
+   *        ValueAccessors.
    * @return A new AggregationState which contains the accumulated results from
-   *         applying the aggregate to the specified columns in accessor.
+   *         applying the aggregate to the specified columns.
    *         Caller is responsible for deleting the returned AggregationState.
    **/
   virtual AggregationState* accumulateValueAccessor(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &accessor_ids) const = 0;
-#endif
-
-  /**
-   * @brief Perform an aggregation with GROUP BY over all the tuples accessible
-   *        through a ValueAccessor, upserting states in a HashTable.
-   *
-   * @note Implementations of this method are threadsafe with respect to
-   *       hash_table, and can be called concurrently from multiple threads
-   *       with the same HashTable object.
-   *
-   * @param accessor The ValueAccessor that will be iterated over to read
-   *        tuples.
-   * @param argument_ids The attribute_ids of the arguments to this aggregate
-   *        in accessor, in order.
-   * @param group_by_key_ids The attribute_ids of the group-by
-   *        columns/expressions in accessor.
-   * @param hash_table The HashTable to upsert AggregationStates in. This
-   *        should have been created by calling createGroupByHashTable() on
-   *        this same AggregationHandle.
-   **/
-  virtual void aggregateValueAccessorIntoHashTable(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &argument_ids,
-      const std::vector<attribute_id> &group_by_key_ids,
-      AggregationStateHashTableBase *hash_table) const = 0;
+      const std::vector<MultiSourceAttributeId> &argument_ids,
+      const ValueAccessorMultiplexer &accessor_mux) const = 0;
 
   /**
    * @brief Merge two AggregationStates, updating one in-place. This computes a
@@ -253,24 +215,24 @@ class AggregationHandle {
    * @param hash_table The HashTable to finalize states from. This should have
    *        have been created by calling createGroupByHashTable() on this same
    *        AggregationHandle.
+   * @param index The index of the AggregationHandle to be finalized.
    * @param group_by_keys A pointer to a vector of vectors of GROUP BY keys. If
    *        this is initially empty, it will be filled in with the GROUP BY
    *        keys visited by this method in the same order as the finalized
    *        values returned in the ColumnVector. If this is already filled in,
    *        then this method will visit the GROUP BY keys in the exact order
    *        specified.
-   * @param index The index of the AggregationHandle to be finalized.
    *
    * @return A ColumnVector containing each group's finalized aggregate value.
    **/
   virtual ColumnVector* finalizeHashTable(
       const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys,
-      int index) const = 0;
+      const std::size_t index,
+      std::vector<std::vector<TypedValue>> *group_by_keys) const = 0;
 
   /**
    * @brief Create a new HashTable for the distinctify step for DISTINCT
-   * aggregation.
+   *        aggregation.
    *
    * Distinctify is the first step for DISTINCT aggregation. This step inserts
    * the GROUP BY expression values and aggregation arguments together as keys
@@ -283,8 +245,8 @@ class AggregationHandle {
    * we simply treat it as a special GROUP BY case that the GROUP BY expression
    * vector is empty.
    *
-   * @param hash_table_impl The choice of which concrete HashTable
-   *        implementation to use.
+   * @param hash_table_impl The choice of which concrete HashTable implementation
+   *        to use.
    * @param key_types The types of the GROUP BY expressions together with the
    *        types of the aggregation arguments.
    * @param estimated_num_distinct_keys The estimated number of distinct keys
@@ -307,13 +269,14 @@ class AggregationHandle {
 
   /**
    * @brief Inserts the GROUP BY expressions and aggregation arguments together
-   * as keys into the distinctify hash table.
+   *        as keys into the distinctify hash table.
    *
-   * @param accessor The ValueAccessor that will be iterated over to read
-   *        tuples.
-   * @param key_ids The attribute_ids of the GROUP BY expressions in accessor
-   *        together with the attribute_ids of the arguments to this aggregate
-   *        in accessor, in order.
+   * @param argument_ids The argument ids that correspond to the columns in
+   *        \p accessor_mux.
+   * @param key_ids The group-by key ids that correspond to the columns in
+   *        \p accessor_mux.
+   * @param accessor_mux A ValueAccessorMultiplexer object that contains the
+   *        ValueAccessors to be iterated over to read tuples.
    * @param distinctify_hash_table The HashTable to store the GROUP BY
    *        expressions and the aggregation arguments together as hash table
    *        keys and a bool constant \c true as hash table value (So the hash
@@ -321,13 +284,14 @@ class AggregationHandle {
    *        by calling createDistinctifyHashTable();
    */
   virtual void insertValueAccessorIntoDistinctifyHashTable(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &key_ids,
+      const std::vector<MultiSourceAttributeId> &argument_ids,
+      const std::vector<MultiSourceAttributeId> &key_ids,
+      const ValueAccessorMultiplexer &accessor_mux,
       AggregationStateHashTableBase *distinctify_hash_table) const = 0;
 
   /**
    * @brief Perform single (i.e. without GROUP BY) aggregation on the keys from
-   * the distinctify hash table to actually compute the aggregated results.
+   *        the distinctify hash table to actually compute the aggregated results.
    *
    * @param distinctify_hash_table Hash table which stores the distinctified
    *        aggregation arguments as hash table keys. This should have been
@@ -346,25 +310,26 @@ class AggregationHandle {
    * @param distinctify_hash_table Hash table which stores the GROUP BY
    *        expression values and aggregation arguments together as hash table
    *        keys.
+   * @param index The index of the AggregationHandle to perform aggregation.
    * @param aggregation_hash_table The HashTable to upsert AggregationStates in.
    *        This should have been created by calling createGroupByHashTable() on
    *        this same AggregationHandle.
-   * @param index The index of the distinctify hash table for which we perform
-   *        the DISTINCT aggregation.
    */
   virtual void aggregateOnDistinctifyHashTableForGroupBy(
       const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table,
-      std::size_t index) const = 0;
+      const std::size_t index,
+      AggregationStateHashTableBase *aggregation_hash_table) const = 0;
 
   /**
    * @brief Get the number of bytes needed to store the aggregation handle's
    *        state.
    **/
-  virtual std::size_t getPayloadSize() const { return 1; }
+  virtual std::size_t getPayloadSize() const {
+    return 1u;
+  }
 
   /**
-   * @brief Update the aggregation state for nullary aggregation function e.g.
+   * @brief Update the aggregation state for nullary aggregation function, e.g.
    *        COUNT(*).
    *
    * @note This function should be overloaded by those aggregation function
@@ -372,7 +337,10 @@ class AggregationHandle {
    *
    * @param byte_ptr The pointer where the aggregation state is stored.
    **/
-  virtual void updateStateNullary(std::uint8_t *byte_ptr) const {}
+  virtual void updateStateNullary(std::uint8_t *byte_ptr) const {
+    LOG(FATAL) << "Called updateStateNullary on an AggregationHandle that "
+               << "takes at least one argument.";
+  }
 
   /**
    * @brief Update the aggregation state for unary aggregation function e.g.
@@ -383,7 +351,7 @@ class AggregationHandle {
    * @param byte_ptr The pointer where the aggregation state is stored.
    **/
   virtual void updateStateUnary(const TypedValue &argument,
-                                std::uint8_t *byte_ptr) const {}
+                                std::uint8_t *byte_ptr) const = 0;
 
   /**
    * @brief Merge two aggregation states for this aggregation handle.
@@ -394,8 +362,8 @@ class AggregationHandle {
    * @param src A pointer to the source aggregation state.
    * @param dst A pointer to the destination aggregation state.
    **/
-  virtual void mergeStatesFast(const std::uint8_t *src,
-                               std::uint8_t *dst) const {}
+  virtual void mergeStates(const std::uint8_t *src,
+                           std::uint8_t *dst) const = 0;
 
   /**
    * @brief Initialize the payload (in the aggregation hash table) for the given
@@ -403,7 +371,7 @@ class AggregationHandle {
    *
    * @param byte_ptr The pointer to the aggregation state in the hash table.
    **/
-  virtual void initPayload(std::uint8_t *byte_ptr) const {}
+  virtual void initPayload(std::uint8_t *byte_ptr) const = 0;
 
   /**
    * @brief Destroy the payload (in the aggregation hash table) for the given
@@ -411,22 +379,25 @@ class AggregationHandle {
    *
    * @param byte_ptr The pointer to the aggregation state in the hash table.
    **/
-  virtual void destroyPayload(std::uint8_t *byte_ptr) const {}
+  virtual void destroyPayload(std::uint8_t *byte_ptr) const = 0;
 
   /**
    * @brief Inform the aggregation handle to block (prohibit) updates on the
    *        aggregation state.
    **/
-  virtual void blockUpdate() {}
+  virtual void blockUpdate() = 0;
 
   /**
-   * @brief Inform the aggregation handle to allow updates on the
-   *        aggregation state.
+   * @brief Inform the aggregation handle to allow updates on the aggregation
+   *        state.
    **/
-  virtual void allowUpdate() {}
+  virtual void allowUpdate() = 0;
 
  protected:
-  AggregationHandle() {}
+  explicit AggregationHandle(const AggregationID agg_id)
+      : agg_id_(agg_id) {}
+
+  const AggregationID agg_id_;
 
  private:
   DISALLOW_COPY_AND_ASSIGN(AggregationHandle);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/expressions/aggregation/AggregationHandleAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.cpp b/expressions/aggregation/AggregationHandleAvg.cpp
index 2481092..46bec1e 100644
--- a/expressions/aggregation/AggregationHandleAvg.cpp
+++ b/expressions/aggregation/AggregationHandleAvg.cpp
@@ -20,12 +20,13 @@
 #include "expressions/aggregation/AggregationHandleAvg.hpp"
 
 #include <cstddef>
+#include <cstdint>
 #include <memory>
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
-#include "storage/HashTable.hpp"
-#include "storage/HashTableFactory.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
 #include "types/TypeFactory.hpp"
@@ -39,10 +40,11 @@
 
 namespace quickstep {
 
-class StorageManager;
+class ColumnVector;
 
 AggregationHandleAvg::AggregationHandleAvg(const Type &type)
-    : argument_type_(type), block_update_(false) {
+    : AggregationConcreteHandle(AggregationID::kAvg),
+      argument_type_(type) {
   // We sum Int as Long and Float as Double so that we have more headroom when
   // adding many values.
   TypeID type_precision_id;
@@ -87,52 +89,29 @@ AggregationHandleAvg::AggregationHandleAvg(const Type &type)
             ->getNullableVersion());
 }
 
-AggregationStateHashTableBase* AggregationHandleAvg::createGroupByHashTable(
-    const HashTableImplType hash_table_impl,
-    const std::vector<const Type *> &group_by_types,
-    const std::size_t estimated_num_groups,
-    StorageManager *storage_manager) const {
-  return AggregationStateHashTableFactory<AggregationStateAvg>::CreateResizable(
-      hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
-}
+AggregationState* AggregationHandleAvg::accumulateValueAccessor(
+    const std::vector<MultiSourceAttributeId> &argument_ids,
+    const ValueAccessorMultiplexer &accessor_mux) const {
+  DCHECK_EQ(1u, argument_ids.size())
+      << "Got wrong number of attributes for AVG: " << argument_ids.size();
 
-AggregationState* AggregationHandleAvg::accumulateColumnVectors(
-    const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
-  DCHECK_EQ(1u, column_vectors.size())
-      << "Got wrong number of ColumnVectors for AVG: " << column_vectors.size();
+  const ValueAccessorSource argument_source = argument_ids.front().source;
+  const attribute_id argument_id = argument_ids.front().attr_id;
 
-  AggregationStateAvg *state = new AggregationStateAvg(blank_state_);
-  std::size_t count = 0;
-  state->sum_ = fast_add_operator_->accumulateColumnVector(
-      state->sum_, *column_vectors.front(), &count);
-  state->count_ = count;
-  return state;
-}
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-AggregationState* AggregationHandleAvg::accumulateValueAccessor(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &accessor_ids) const {
-  DCHECK_EQ(1u, accessor_ids.size())
-      << "Got wrong number of attributes for AVG: " << accessor_ids.size();
+  DCHECK(argument_source != ValueAccessorSource::kInvalid);
+  DCHECK_NE(argument_id, kInvalidAttributeID);
 
   AggregationStateAvg *state = new AggregationStateAvg(blank_state_);
   std::size_t count = 0;
-  state->sum_ = fast_add_operator_->accumulateValueAccessor(
-      state->sum_, accessor, accessor_ids.front(), &count);
+  state->sum_ =
+      fast_add_operator_->accumulateValueAccessor(
+          state->sum_,
+          accessor_mux.getValueAccessorBySource(argument_source),
+          argument_id,
+          &count);
   state->count_ = count;
   return state;
 }
-#endif
-
-void AggregationHandleAvg::aggregateValueAccessorIntoHashTable(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &argument_ids,
-    const std::vector<attribute_id> &group_by_key_ids,
-    AggregationStateHashTableBase *hash_table) const {
-  DCHECK_EQ(1u, argument_ids.size())
-      << "Got wrong number of arguments for AVG: " << argument_ids.size();
-}
 
 void AggregationHandleAvg::mergeStates(const AggregationState &source,
                                        AggregationState *destination) const {
@@ -147,8 +126,8 @@ void AggregationHandleAvg::mergeStates(const AggregationState &source,
       avg_destination->sum_, avg_source.sum_);
 }
 
-void AggregationHandleAvg::mergeStatesFast(const std::uint8_t *source,
-                                           std::uint8_t *destination) const {
+void AggregationHandleAvg::mergeStates(const std::uint8_t *source,
+                                       std::uint8_t *destination) const {
   const TypedValue *src_sum_ptr =
       reinterpret_cast<const TypedValue *>(source + blank_state_.sum_offset_);
   const std::int64_t *src_count_ptr = reinterpret_cast<const std::int64_t *>(
@@ -177,29 +156,25 @@ TypedValue AggregationHandleAvg::finalize(const AggregationState &state) const {
 
 ColumnVector* AggregationHandleAvg::finalizeHashTable(
     const AggregationStateHashTableBase &hash_table,
-    std::vector<std::vector<TypedValue>> *group_by_keys,
-    int index) const {
-  return finalizeHashTableHelperFast<AggregationHandleAvg,
-                                     AggregationStateFastHashTable>(
-      *result_type_, hash_table, group_by_keys, index);
+    const std::size_t index,
+    std::vector<std::vector<TypedValue>> *group_by_keys) const {
+  return finalizeHashTableHelper<AggregationHandleAvg>(
+      *result_type_, hash_table, index, group_by_keys);
 }
 
-AggregationState*
-AggregationHandleAvg::aggregateOnDistinctifyHashTableForSingle(
+AggregationState* AggregationHandleAvg::aggregateOnDistinctifyHashTableForSingle(
     const AggregationStateHashTableBase &distinctify_hash_table) const {
-  return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
-      AggregationHandleAvg,
-      AggregationStateAvg>(distinctify_hash_table);
+  return aggregateOnDistinctifyHashTableForSingleUnaryHelper<
+      AggregationHandleAvg, AggregationStateAvg>(
+          distinctify_hash_table);
 }
 
 void AggregationHandleAvg::aggregateOnDistinctifyHashTableForGroupBy(
     const AggregationStateHashTableBase &distinctify_hash_table,
-    AggregationStateHashTableBase *aggregation_hash_table,
-    std::size_t index) const {
-  aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
-      AggregationHandleAvg,
-      AggregationStateFastHashTable>(
-      distinctify_hash_table, aggregation_hash_table, index);
+    const std::size_t index,
+    AggregationStateHashTableBase *aggregation_hash_table) const {
+  aggregateOnDistinctifyHashTableForGroupByUnaryHelper<AggregationHandleAvg>(
+      distinctify_hash_table, index, aggregation_hash_table);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/expressions/aggregation/AggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.hpp b/expressions/aggregation/AggregationHandleAvg.hpp
index 47132c6..970561c 100644
--- a/expressions/aggregation/AggregationHandleAvg.hpp
+++ b/expressions/aggregation/AggregationHandleAvg.hpp
@@ -25,11 +25,9 @@
 #include <memory>
 #include <vector>
 
-#include "catalog/CatalogTypedefs.hpp"
 #include "expressions/aggregation/AggregationConcreteHandle.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
-#include "storage/FastHashTable.hpp"
-#include "storage/HashTableBase.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
@@ -40,9 +38,8 @@
 
 namespace quickstep {
 
+class AggregationStateHashTableBase;
 class ColumnVector;
-class StorageManager;
-class ValueAccessor;
 
 /** \addtogroup Expressions
  *  @{
@@ -106,19 +103,18 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
  public:
   ~AggregationHandleAvg() override {}
 
+  std::vector<const Type *> getArgumentTypes() const override {
+    return {&argument_type_};
+  }
+
+  const Type* getResultType() const override {
+    return result_type_;
+  }
+
   AggregationState* createInitialState() const override {
     return new AggregationStateAvg(blank_state_);
   }
 
-  AggregationStateHashTableBase* createGroupByHashTable(
-      const HashTableImplType hash_table_impl,
-      const std::vector<const Type *> &group_by_types,
-      const std::size_t estimated_num_groups,
-      StorageManager *storage_manager) const override;
-
-  /**
-   * @brief Iterate method with average aggregation state.
-   **/
   inline void iterateUnaryInl(AggregationStateAvg *state,
                               const TypedValue &value) const {
     DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
@@ -129,8 +125,8 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
     ++state->count_;
   }
 
-  inline void iterateUnaryInlFast(const TypedValue &value,
-                                  std::uint8_t *byte_ptr) const {
+  inline void iterateUnaryInl(const TypedValue &value,
+                              std::uint8_t *byte_ptr) const {
     DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
     if (value.isNull()) return;
     TypedValue *sum_ptr =
@@ -141,16 +137,18 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
     ++(*count_ptr);
   }
 
-  inline void updateStateUnary(const TypedValue &argument,
-                               std::uint8_t *byte_ptr) const override {
-    if (!block_update_) {
-      iterateUnaryInlFast(argument, byte_ptr);
-    }
-  }
+  AggregationState* accumulateValueAccessor(
+      const std::vector<MultiSourceAttributeId> &argument_ids,
+      const ValueAccessorMultiplexer &accessor_mux) const override;
 
-  void blockUpdate() override { block_update_ = true; }
+  void mergeStates(const AggregationState &source,
+                   AggregationState *destination) const override;
 
-  void allowUpdate() override { block_update_ = false; }
+  TypedValue finalize(const AggregationState &state) const override;
+
+  std::size_t getPayloadSize() const override {
+    return blank_state_.getPayloadSize();
+  }
 
   void initPayload(std::uint8_t *byte_ptr) const override {
     TypedValue *sum_ptr =
@@ -169,43 +167,17 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
     }
   }
 
-  AggregationState* accumulateColumnVectors(
-      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
-      const override;
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  AggregationState* accumulateValueAccessor(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &accessor_id) const override;
-#endif
-
-  void aggregateValueAccessorIntoHashTable(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &argument_ids,
-      const std::vector<attribute_id> &group_by_key_ids,
-      AggregationStateHashTableBase *hash_table) const override;
-
-  void mergeStates(const AggregationState &source,
-                   AggregationState *destination) const override;
-
-  void mergeStatesFast(const std::uint8_t *source,
-                       std::uint8_t *destination) const override;
+  inline void updateStateUnary(const TypedValue &argument,
+                               std::uint8_t *byte_ptr) const override {
+    if (!block_update_) {
+      iterateUnaryInl(argument, byte_ptr);
+    }
+  }
 
-  TypedValue finalize(const AggregationState &state) const override;
+  void mergeStates(const std::uint8_t *source,
+                   std::uint8_t *destination) const override;
 
   inline TypedValue finalizeHashTableEntry(
-      const AggregationState &state) const {
-    const AggregationStateAvg &agg_state =
-        static_cast<const AggregationStateAvg &>(state);
-    // TODO(chasseur): Could improve performance further if we made a special
-    // version of finalizeHashTable() that collects all the sums into one
-    // ColumnVector and all the counts into another and then applies
-    // '*divide_operator_' to them in bulk.
-    return divide_operator_->applyToTypedValues(
-        agg_state.sum_, TypedValue(static_cast<double>(agg_state.count_)));
-  }
-
-  inline TypedValue finalizeHashTableEntryFast(
       const std::uint8_t *byte_ptr) const {
     std::uint8_t *value_ptr = const_cast<std::uint8_t *>(byte_ptr);
     TypedValue *sum_ptr =
@@ -218,31 +190,16 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
 
   ColumnVector* finalizeHashTable(
       const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys,
-      int index) const override;
+      const std::size_t index,
+      std::vector<std::vector<TypedValue>> *group_by_keys) const override;
 
-  /**
-   * @brief Implementation of
-   *        AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
-   *        for AVG aggregation.
-   */
   AggregationState* aggregateOnDistinctifyHashTableForSingle(
-      const AggregationStateHashTableBase &distinctify_hash_table)
-      const override;
+      const AggregationStateHashTableBase &distinctify_hash_table) const override;
 
-  /**
-   * @brief Implementation of
-   *        AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
-   *        for AVG aggregation.
-   */
   void aggregateOnDistinctifyHashTableForGroupBy(
       const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table,
-      std::size_t index) const override;
-
-  std::size_t getPayloadSize() const override {
-    return blank_state_.getPayloadSize();
-  }
+      const std::size_t index,
+      AggregationStateHashTableBase *aggregation_hash_table) const override;
 
  private:
   friend class AggregateFunctionAvg;
@@ -261,8 +218,6 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
   std::unique_ptr<UncheckedBinaryOperator> merge_add_operator_;
   std::unique_ptr<UncheckedBinaryOperator> divide_operator_;
 
-  bool block_update_;
-
   DISALLOW_COPY_AND_ASSIGN(AggregationHandleAvg);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/expressions/aggregation/AggregationHandleCount.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.cpp b/expressions/aggregation/AggregationHandleCount.cpp
index 034c942..cf92ec7 100644
--- a/expressions/aggregation/AggregationHandleCount.cpp
+++ b/expressions/aggregation/AggregationHandleCount.cpp
@@ -21,100 +21,48 @@
 
 #include <atomic>
 #include <cstddef>
-#include <memory>
+#include <cstdint>
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
-#include "storage/HashTable.hpp"
-#include "storage/HashTableFactory.hpp"
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 #include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
 #include "storage/ValueAccessorUtil.hpp"
-#endif
-
 #include "types/TypeFactory.hpp"
 #include "types/TypeID.hpp"
 #include "types/TypedValue.hpp"
-#include "types/containers/ColumnVector.hpp"
-#include "types/containers/ColumnVectorUtil.hpp"
 
 #include "glog/logging.h"
 
 namespace quickstep {
 
-class StorageManager;
-class Type;
-class ValueAccessor;
+class ColumnVector;
 
 template <bool count_star, bool nullable_type>
-AggregationStateHashTableBase*
-AggregationHandleCount<count_star, nullable_type>::createGroupByHashTable(
-    const HashTableImplType hash_table_impl,
-    const std::vector<const Type *> &group_by_types,
-    const std::size_t estimated_num_groups,
-    StorageManager *storage_manager) const {
-  return AggregationStateHashTableFactory<
-      AggregationStateCount>::CreateResizable(hash_table_impl,
-                                              group_by_types,
-                                              estimated_num_groups,
-                                              storage_manager);
-}
-
-template <bool count_star, bool nullable_type>
-AggregationState*
-AggregationHandleCount<count_star, nullable_type>::accumulateColumnVectors(
-    const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
+AggregationState* AggregationHandleCount<count_star, nullable_type>
+    ::accumulateValueAccessor(
+        const std::vector<MultiSourceAttributeId> &argument_ids,
+        const ValueAccessorMultiplexer &accessor_mux) const {
   DCHECK(!count_star)
       << "Called non-nullary accumulation method on an AggregationHandleCount "
       << "set up for nullary COUNT(*)";
 
-  DCHECK_EQ(1u, column_vectors.size())
-      << "Got wrong number of ColumnVectors for COUNT: "
-      << column_vectors.size();
+  DCHECK_EQ(1u, argument_ids.size())
+      << "Got wrong number of attributes for COUNT: " << argument_ids.size();
 
-  std::size_t count = 0;
-  InvokeOnColumnVector(
-      *column_vectors.front(),
-      [&](const auto &column_vector) -> void {  // NOLINT(build/c++11)
-        if (nullable_type) {
-          // TODO(shoban): Iterating over the ColumnVector is a rather slow way
-          // to do this. We should look at extending the ColumnVector interface
-          // to do a quick count of the non-null values (i.e. the length minus
-          // the population count of the null bitmap). We should do something
-          // similar for ValueAccessor too.
-          for (std::size_t pos = 0; pos < column_vector.size(); ++pos) {
-            count += !column_vector.getTypedValue(pos).isNull();
-          }
-        } else {
-          count = column_vector.size();
-        }
-      });
+  const ValueAccessorSource argument_source = argument_ids.front().source;
+  const attribute_id argument_id = argument_ids.front().attr_id;
 
-  return new AggregationStateCount(count);
-}
+  DCHECK(argument_source != ValueAccessorSource::kInvalid);
+  DCHECK_NE(argument_id, kInvalidAttributeID);
 
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-template <bool count_star, bool nullable_type>
-AggregationState*
-AggregationHandleCount<count_star, nullable_type>::accumulateValueAccessor(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &accessor_ids) const {
-  DCHECK(!count_star)
-      << "Called non-nullary accumulation method on an AggregationHandleCount "
-      << "set up for nullary COUNT(*)";
-
-  DCHECK_EQ(1u, accessor_ids.size())
-      << "Got wrong number of attributes for COUNT: " << accessor_ids.size();
-
-  const attribute_id accessor_id = accessor_ids.front();
   std::size_t count = 0;
   InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
-      accessor,
-      [&accessor_id, &count](auto *accessor) -> void {  // NOLINT(build/c++11)
+      accessor_mux.getValueAccessorBySource(argument_source),
+      [&argument_id, &count](auto *accessor) -> void {  // NOLINT(build/c++11)
         if (nullable_type) {
           while (accessor->next()) {
-            count += !accessor->getTypedValue(accessor_id).isNull();
+            count += !accessor->getTypedValue(argument_id).isNull();
           }
         } else {
           count = accessor->getNumTuples();
@@ -123,24 +71,6 @@ AggregationHandleCount<count_star, nullable_type>::accumulateValueAccessor(
 
   return new AggregationStateCount(count);
 }
-#endif
-
-template <bool count_star, bool nullable_type>
-void AggregationHandleCount<count_star, nullable_type>::
-    aggregateValueAccessorIntoHashTable(
-        ValueAccessor *accessor,
-        const std::vector<attribute_id> &argument_ids,
-        const std::vector<attribute_id> &group_by_key_ids,
-        AggregationStateHashTableBase *hash_table) const {
-  if (count_star) {
-    DCHECK_EQ(0u, argument_ids.size())
-        << "Got wrong number of arguments for COUNT(*): "
-        << argument_ids.size();
-  } else {
-    DCHECK_EQ(1u, argument_ids.size())
-        << "Got wrong number of arguments for COUNT: " << argument_ids.size();
-  }
-}
 
 template <bool count_star, bool nullable_type>
 void AggregationHandleCount<count_star, nullable_type>::mergeStates(
@@ -156,7 +86,7 @@ void AggregationHandleCount<count_star, nullable_type>::mergeStates(
 }
 
 template <bool count_star, bool nullable_type>
-void AggregationHandleCount<count_star, nullable_type>::mergeStatesFast(
+void AggregationHandleCount<count_star, nullable_type>::mergeStates(
     const std::uint8_t *source, std::uint8_t *destination) const {
   const std::int64_t *src_count_ptr =
       reinterpret_cast<const std::int64_t *>(source);
@@ -165,38 +95,35 @@ void AggregationHandleCount<count_star, nullable_type>::mergeStatesFast(
 }
 
 template <bool count_star, bool nullable_type>
-ColumnVector*
-AggregationHandleCount<count_star, nullable_type>::finalizeHashTable(
-    const AggregationStateHashTableBase &hash_table,
-    std::vector<std::vector<TypedValue>> *group_by_keys,
-    int index) const {
-  return finalizeHashTableHelperFast<
-      AggregationHandleCount<count_star, nullable_type>,
-      AggregationStateFastHashTable>(
-      TypeFactory::GetType(kLong), hash_table, group_by_keys, index);
+ColumnVector* AggregationHandleCount<count_star, nullable_type>
+    ::finalizeHashTable(
+        const AggregationStateHashTableBase &hash_table,
+        const std::size_t index,
+        std::vector<std::vector<TypedValue>> *group_by_keys) const {
+  return finalizeHashTableHelper<
+      AggregationHandleCount<count_star, nullable_type>>(
+          TypeFactory::GetType(kLong), hash_table, index, group_by_keys);
 }
 
 template <bool count_star, bool nullable_type>
-AggregationState* AggregationHandleCount<count_star, nullable_type>::
-    aggregateOnDistinctifyHashTableForSingle(
+AggregationState* AggregationHandleCount<count_star, nullable_type>
+    ::aggregateOnDistinctifyHashTableForSingle(
         const AggregationStateHashTableBase &distinctify_hash_table) const {
-  DCHECK_EQ(count_star, false);
-  return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
+  return aggregateOnDistinctifyHashTableForSingleUnaryHelper<
       AggregationHandleCount<count_star, nullable_type>,
-      AggregationStateCount>(distinctify_hash_table);
+      AggregationStateCount>(
+          distinctify_hash_table);
 }
 
 template <bool count_star, bool nullable_type>
-void AggregationHandleCount<count_star, nullable_type>::
-    aggregateOnDistinctifyHashTableForGroupBy(
+void AggregationHandleCount<count_star, nullable_type>
+    ::aggregateOnDistinctifyHashTableForGroupBy(
         const AggregationStateHashTableBase &distinctify_hash_table,
-        AggregationStateHashTableBase *aggregation_hash_table,
-        std::size_t index) const {
-  DCHECK_EQ(count_star, false);
-  aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
-      AggregationHandleCount<count_star, nullable_type>,
-      AggregationStateFastHashTable>(
-      distinctify_hash_table, aggregation_hash_table, index);
+        const std::size_t index,
+        AggregationStateHashTableBase *aggregation_hash_table) const {
+  aggregateOnDistinctifyHashTableForGroupByUnaryHelper<
+      AggregationHandleCount<count_star, nullable_type>>(
+          distinctify_hash_table, index, aggregation_hash_table);
 }
 
 // Explicitly instantiate and compile in the different versions of

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/expressions/aggregation/AggregationHandleCount.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.hpp b/expressions/aggregation/AggregationHandleCount.hpp
index 6aab0cd..72ea923 100644
--- a/expressions/aggregation/AggregationHandleCount.hpp
+++ b/expressions/aggregation/AggregationHandleCount.hpp
@@ -23,23 +23,21 @@
 #include <atomic>
 #include <cstddef>
 #include <cstdint>
-#include <memory>
 #include <vector>
 
-#include "catalog/CatalogTypedefs.hpp"
 #include "expressions/aggregation/AggregationConcreteHandle.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
-#include "storage/FastHashTable.hpp"
-#include "storage/HashTableBase.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
+#include "types/LongType.hpp"
 #include "types/TypedValue.hpp"
 #include "utility/Macros.hpp"
 
 namespace quickstep {
 
+class AggregationStateHashTableBase;
 class ColumnVector;
-class StorageManager;
 class Type;
-class ValueAccessor;
 
 template <bool, bool>
 class AggregationHandleCount;
@@ -98,28 +96,31 @@ class AggregationHandleCount : public AggregationConcreteHandle {
  public:
   ~AggregationHandleCount() override {}
 
+  std::vector<const Type *> getArgumentTypes() const override {
+    if (argument_type_ == nullptr) {
+      return {};
+    } else {
+      return {argument_type_};
+    }
+  }
+
+  const Type* getResultType() const override {
+    return &LongType::InstanceNonNullable();
+  }
+
   AggregationState* createInitialState() const override {
     return new AggregationStateCount();
   }
 
-  AggregationStateHashTableBase* createGroupByHashTable(
-      const HashTableImplType hash_table_impl,
-      const std::vector<const Type *> &group_by_types,
-      const std::size_t estimated_num_groups,
-      StorageManager *storage_manager) const override;
-
   inline void iterateNullaryInl(AggregationStateCount *state) const {
     state->count_.fetch_add(1, std::memory_order_relaxed);
   }
 
-  inline void iterateNullaryInlFast(std::uint8_t *byte_ptr) const {
+  inline void iterateNullaryInl(std::uint8_t *byte_ptr) const {
     std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr);
-    (*count_ptr)++;
+    ++(*count_ptr);
   }
 
-  /**
-   * @brief Iterate with count aggregation state.
-   */
   inline void iterateUnaryInl(AggregationStateCount *state,
                               const TypedValue &value) const {
     if ((!nullable_type) || (!value.isNull())) {
@@ -127,118 +128,89 @@ class AggregationHandleCount : public AggregationConcreteHandle {
     }
   }
 
-  inline void iterateUnaryInlFast(const TypedValue &value,
-                                  std::uint8_t *byte_ptr) const {
+  inline void iterateUnaryInl(const TypedValue &value,
+                              std::uint8_t *byte_ptr) const {
     if ((!nullable_type) || (!value.isNull())) {
-      std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr);
-      (*count_ptr)++;
-    }
-  }
-
-  inline void updateStateUnary(const TypedValue &argument,
-                               std::uint8_t *byte_ptr) const override {
-    if (!block_update_) {
-      iterateUnaryInlFast(argument, byte_ptr);
+      ++(*reinterpret_cast<std::int64_t *>(byte_ptr));
     }
   }
 
-  inline void updateStateNullary(std::uint8_t *byte_ptr) const override {
-    if (!block_update_) {
-      iterateNullaryInlFast(byte_ptr);
-    }
-  }
-
-  void blockUpdate() override { block_update_ = true; }
-
-  void allowUpdate() override { block_update_ = false; }
-
-  void initPayload(std::uint8_t *byte_ptr) const override {
-    std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr);
-    *count_ptr = 0;
-  }
-
   AggregationState* accumulateNullary(
       const std::size_t num_tuples) const override {
     return new AggregationStateCount(num_tuples);
   }
 
-  AggregationState* accumulateColumnVectors(
-      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
-      const override;
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
   AggregationState* accumulateValueAccessor(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &accessor_id) const override;
-#endif
-
-  void aggregateValueAccessorIntoHashTable(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &argument_ids,
-      const std::vector<attribute_id> &group_by_key_ids,
-      AggregationStateHashTableBase *hash_table) const override;
+      const std::vector<MultiSourceAttributeId> &argument_ids,
+      const ValueAccessorMultiplexer &accessor_mux) const override;
 
   void mergeStates(const AggregationState &source,
                    AggregationState *destination) const override;
 
-  void mergeStatesFast(const std::uint8_t *source,
-                       std::uint8_t *destination) const override;
-
   TypedValue finalize(const AggregationState &state) const override {
     return TypedValue(
         static_cast<const AggregationStateCount &>(state).count_.load(
             std::memory_order_relaxed));
   }
 
-  inline TypedValue finalizeHashTableEntry(
-      const AggregationState &state) const {
-    return TypedValue(
-        static_cast<const AggregationStateCount &>(state).count_.load(
-            std::memory_order_relaxed));
+  std::size_t getPayloadSize() const override {
+    return sizeof(std::int64_t);
+  }
+
+  void initPayload(std::uint8_t *byte_ptr) const override {
+    std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr);
+    *count_ptr = 0;
   }
 
-  inline TypedValue finalizeHashTableEntryFast(
-      const std::uint8_t *byte_ptr) const {
-    const std::int64_t *count_ptr =
-        reinterpret_cast<const std::int64_t *>(byte_ptr);
-    return TypedValue(*count_ptr);
+  void destroyPayload(std::uint8_t *byte_ptr) const override {}
+
+  inline void updateStateNullary(std::uint8_t *byte_ptr) const override {
+    if (!block_update_) {
+      iterateNullaryInl(byte_ptr);
+    }
+  }
+
+  inline void updateStateUnary(const TypedValue &argument,
+                               std::uint8_t *byte_ptr) const override {
+    if (!block_update_) {
+      iterateUnaryInl(argument, byte_ptr);
+    }
+  }
+
+  void mergeStates(const std::uint8_t *source,
+                   std::uint8_t *destination) const override;
+
+  inline TypedValue finalizeHashTableEntry(const std::uint8_t *byte_ptr) const {
+    return TypedValue(*reinterpret_cast<const std::int64_t *>(byte_ptr));
   }
 
   ColumnVector* finalizeHashTable(
       const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys,
-      int index) const override;
+      const std::size_t index,
+      std::vector<std::vector<TypedValue>> *group_by_keys) const override;
 
-  /**
-   * @brief Implementation of
-   *        AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
-   *        for SUM aggregation.
-   */
   AggregationState* aggregateOnDistinctifyHashTableForSingle(
-      const AggregationStateHashTableBase &distinctify_hash_table)
-      const override;
+      const AggregationStateHashTableBase &distinctify_hash_table) const override;
 
-  /**
-   * @brief Implementation of
-   *        AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
-   *        for SUM aggregation.
-   */
   void aggregateOnDistinctifyHashTableForGroupBy(
       const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table,
-      std::size_t index) const override;
-
-  std::size_t getPayloadSize() const override { return sizeof(std::int64_t); }
+      const std::size_t index,
+      AggregationStateHashTableBase *aggregation_hash_table) const override;
 
  private:
   friend class AggregateFunctionCount;
 
   /**
-   * @brief Constructor.
+   * @brief Initialize handle for count.
+   *
+   * @param argument_type Type of the value to be counted. The parameter should
+   *        be nullptr for nullary aggregation (i.e. COUNT(*)).
    **/
-  AggregationHandleCount() : block_update_(false) {}
+  explicit AggregationHandleCount(const Type *argument_type)
+      : AggregationConcreteHandle(AggregationID::kCount),
+        argument_type_(argument_type) {}
 
-  bool block_update_;
+  const Type *argument_type_;
 
   DISALLOW_COPY_AND_ASSIGN(AggregationHandleCount);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/expressions/aggregation/AggregationHandleDistinct.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleDistinct.cpp b/expressions/aggregation/AggregationHandleDistinct.cpp
deleted file mode 100644
index 0dc8b56..0000000
--- a/expressions/aggregation/AggregationHandleDistinct.cpp
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include "expressions/aggregation/AggregationHandleDistinct.hpp"
-
-#include <cstddef>
-#include <memory>
-#include <vector>
-#include <utility>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "storage/HashTable.hpp"
-
-#include "types/TypedValue.hpp"
-
-#include "glog/logging.h"
-
-namespace quickstep {
-
-class ColumnVector;
-class StorageManager;
-class Type;
-class ValueAccessor;
-
-AggregationStateHashTableBase* AggregationHandleDistinct::createGroupByHashTable(
-    const HashTableImplType hash_table_impl,
-    const std::vector<const Type*> &group_by_types,
-    const std::size_t estimated_num_groups,
-    StorageManager *storage_manager) const {
-  return createDistinctifyHashTable(
-      hash_table_impl,
-      group_by_types,
-      estimated_num_groups,
-      storage_manager);
-}
-
-void AggregationHandleDistinct::aggregateValueAccessorIntoHashTable(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &argument_ids,
-    const std::vector<attribute_id> &group_by_key_ids,
-    AggregationStateHashTableBase *hash_table) const {
-  DCHECK_EQ(argument_ids.size(), 0u);
-
-  insertValueAccessorIntoDistinctifyHashTable(
-      accessor,
-      group_by_key_ids,
-      hash_table);
-}
-
-ColumnVector* AggregationHandleDistinct::finalizeHashTable(
-    const AggregationStateHashTableBase &hash_table,
-    std::vector<std::vector<TypedValue>> *group_by_keys,
-    int index) const {
-  DCHECK(group_by_keys->empty());
-
-  const auto keys_retriever = [&group_by_keys](std::vector<TypedValue> &group_by_key,
-                                               const bool &dumb_placeholder) -> void {
-    group_by_keys->emplace_back(std::move(group_by_key));
-  };
-  static_cast<const AggregationStateFastHashTable&>(hash_table).forEachCompositeKeyFast(&keys_retriever);
-
-  return nullptr;
-}
-
-}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/expressions/aggregation/AggregationHandleDistinct.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleDistinct.hpp b/expressions/aggregation/AggregationHandleDistinct.hpp
deleted file mode 100644
index 838bfdd..0000000
--- a/expressions/aggregation/AggregationHandleDistinct.hpp
+++ /dev/null
@@ -1,130 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_HANDLE_DISTINCT_HPP_
-#define QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_HANDLE_DISTINCT_HPP_
-
-#include <cstddef>
-#include <memory>
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregationConcreteHandle.hpp"
-#include "storage/HashTableBase.hpp"
-#include "types/TypedValue.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-namespace quickstep {
-
-class AggregationState;
-class ColumnVector;
-class StorageManager;
-class Type;
-class ValueAccessor;
-
-/** \addtogroup Expressions
- *  @{
- */
-
-class AggregationHandleDistinct : public AggregationConcreteHandle {
- public:
-  /**
-   * @brief Constructor.
-   **/
-  AggregationHandleDistinct() {}
-
-  AggregationState* createInitialState() const override {
-    LOG(FATAL)
-        << "AggregationHandleDistinct does not support createInitialState().";
-  }
-
-  AggregationState* accumulateNullary(
-      const std::size_t num_tuples) const override {
-    LOG(FATAL)
-        << "AggregationHandleDistinct does not support accumulateNullary().";
-  }
-
-  AggregationState* accumulateColumnVectors(
-      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
-      const override {
-    LOG(FATAL) << "AggregationHandleDistinct does not support "
-                  "accumulateColumnVectors().";
-  }
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  AggregationState* accumulateValueAccessor(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &accessor_ids) const override {
-    LOG(FATAL) << "AggregationHandleDistinct does not support "
-                  "accumulateValueAccessor().";
-  }
-#endif
-
-  void mergeStates(const AggregationState &source,
-                   AggregationState *destination) const override {
-    LOG(FATAL) << "AggregationHandleDistinct does not support mergeStates().";
-  }
-
-  TypedValue finalize(const AggregationState &state) const override {
-    LOG(FATAL) << "AggregationHandleDistinct does not support finalize().";
-  }
-
-  AggregationState* aggregateOnDistinctifyHashTableForSingle(
-      const AggregationStateHashTableBase &distinctify_hash_table)
-      const override {
-    LOG(FATAL) << "AggregationHandleDistinct does not support "
-               << "aggregateOnDistinctifyHashTableForSingle().";
-  }
-
-  void aggregateOnDistinctifyHashTableForGroupBy(
-      const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *groupby_hash_table,
-      std::size_t index) const override {
-    LOG(FATAL) << "AggregationHandleDistinct does not support "
-               << "aggregateOnDistinctifyHashTableForGroupBy().";
-  }
-
-  AggregationStateHashTableBase* createGroupByHashTable(
-      const HashTableImplType hash_table_impl,
-      const std::vector<const Type *> &group_by_types,
-      const std::size_t estimated_num_groups,
-      StorageManager *storage_manager) const override;
-
-  void aggregateValueAccessorIntoHashTable(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &argument_ids,
-      const std::vector<attribute_id> &group_by_key_ids,
-      AggregationStateHashTableBase *hash_table) const override;
-
-  ColumnVector* finalizeHashTable(
-      const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys,
-      int index) const override;
-
- private:
-  DISALLOW_COPY_AND_ASSIGN(AggregationHandleDistinct);
-};
-
-/** @} */
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_HANDLE_DISTINCT_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/expressions/aggregation/AggregationHandleMax.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.cpp b/expressions/aggregation/AggregationHandleMax.cpp
index c2d571b..fe1773f 100644
--- a/expressions/aggregation/AggregationHandleMax.cpp
+++ b/expressions/aggregation/AggregationHandleMax.cpp
@@ -19,15 +19,16 @@
 
 #include "expressions/aggregation/AggregationHandleMax.hpp"
 
+#include <cstddef>
+#include <cstdint>
 #include <memory>
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
-#include "storage/HashTable.hpp"
-#include "storage/HashTableFactory.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
-#include "types/containers/ColumnVector.hpp"
 #include "types/operations/comparisons/Comparison.hpp"
 #include "types/operations/comparisons/ComparisonFactory.hpp"
 #include "types/operations/comparisons/ComparisonID.hpp"
@@ -36,54 +37,32 @@
 
 namespace quickstep {
 
-class StorageManager;
+class ColumnVector;
 
 AggregationHandleMax::AggregationHandleMax(const Type &type)
-    : type_(type), block_update_(false) {
+    : AggregationConcreteHandle(AggregationID::kMax),
+      type_(type) {
   fast_comparator_.reset(
       ComparisonFactory::GetComparison(ComparisonID::kGreater)
           .makeUncheckedComparatorForTypes(type, type.getNonNullableVersion()));
 }
 
-AggregationStateHashTableBase* AggregationHandleMax::createGroupByHashTable(
-    const HashTableImplType hash_table_impl,
-    const std::vector<const Type *> &group_by_types,
-    const std::size_t estimated_num_groups,
-    StorageManager *storage_manager) const {
-  return AggregationStateHashTableFactory<AggregationStateMax>::CreateResizable(
-      hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
-}
+AggregationState* AggregationHandleMax::accumulateValueAccessor(
+    const std::vector<MultiSourceAttributeId> &argument_ids,
+    const ValueAccessorMultiplexer &accessor_mux) const {
+  DCHECK_EQ(1u, argument_ids.size())
+      << "Got wrong number of attributes for MAX: " << argument_ids.size();
 
-AggregationState* AggregationHandleMax::accumulateColumnVectors(
-    const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
-  DCHECK_EQ(1u, column_vectors.size())
-      << "Got wrong number of ColumnVectors for MAX: " << column_vectors.size();
+  const ValueAccessorSource argument_source = argument_ids.front().source;
+  const attribute_id argument_id = argument_ids.front().attr_id;
 
-  return new AggregationStateMax(fast_comparator_->accumulateColumnVector(
-      type_.getNullableVersion().makeNullValue(), *column_vectors.front()));
-}
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-AggregationState* AggregationHandleMax::accumulateValueAccessor(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &accessor_ids) const {
-  DCHECK_EQ(1u, accessor_ids.size())
-      << "Got wrong number of attributes for MAX: " << accessor_ids.size();
+  DCHECK(argument_source != ValueAccessorSource::kInvalid);
+  DCHECK_NE(argument_id, kInvalidAttributeID);
 
   return new AggregationStateMax(fast_comparator_->accumulateValueAccessor(
       type_.getNullableVersion().makeNullValue(),
-      accessor,
-      accessor_ids.front()));
-}
-#endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-
-void AggregationHandleMax::aggregateValueAccessorIntoHashTable(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &argument_ids,
-    const std::vector<attribute_id> &group_by_key_ids,
-    AggregationStateHashTableBase *hash_table) const {
-  DCHECK_EQ(1u, argument_ids.size())
-      << "Got wrong number of arguments for MAX: " << argument_ids.size();
+      accessor_mux.getValueAccessorBySource(argument_source),
+      argument_id));
 }
 
 void AggregationHandleMax::mergeStates(const AggregationState &source,
@@ -98,40 +77,36 @@ void AggregationHandleMax::mergeStates(const AggregationState &source,
   }
 }
 
-void AggregationHandleMax::mergeStatesFast(const std::uint8_t *source,
-                                           std::uint8_t *destination) const {
+void AggregationHandleMax::mergeStates(const std::uint8_t *source,
+                                       std::uint8_t *destination) const {
   const TypedValue *src_max_ptr = reinterpret_cast<const TypedValue *>(source);
   TypedValue *dst_max_ptr = reinterpret_cast<TypedValue *>(destination);
   if (!(src_max_ptr->isNull())) {
-    compareAndUpdateFast(dst_max_ptr, *src_max_ptr);
+    compareAndUpdate(dst_max_ptr, *src_max_ptr);
   }
 }
 
 ColumnVector* AggregationHandleMax::finalizeHashTable(
     const AggregationStateHashTableBase &hash_table,
-    std::vector<std::vector<TypedValue>> *group_by_keys,
-    int index) const {
-  return finalizeHashTableHelperFast<AggregationHandleMax,
-                                     AggregationStateFastHashTable>(
-      type_.getNullableVersion(), hash_table, group_by_keys, index);
+    const std::size_t index,
+    std::vector<std::vector<TypedValue>> *group_by_keys) const {
+  return finalizeHashTableHelper<AggregationHandleMax>(
+      type_, hash_table, index, group_by_keys);
 }
 
-AggregationState*
-AggregationHandleMax::aggregateOnDistinctifyHashTableForSingle(
+AggregationState* AggregationHandleMax::aggregateOnDistinctifyHashTableForSingle(
     const AggregationStateHashTableBase &distinctify_hash_table) const {
-  return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
-      AggregationHandleMax,
-      AggregationStateMax>(distinctify_hash_table);
+  return aggregateOnDistinctifyHashTableForSingleUnaryHelper<
+      AggregationHandleMax, AggregationStateMax>(
+          distinctify_hash_table);
 }
 
 void AggregationHandleMax::aggregateOnDistinctifyHashTableForGroupBy(
     const AggregationStateHashTableBase &distinctify_hash_table,
-    AggregationStateHashTableBase *aggregation_hash_table,
-    std::size_t index) const {
-  aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
-      AggregationHandleMax,
-      AggregationStateFastHashTable>(
-      distinctify_hash_table, aggregation_hash_table, index);
+    const std::size_t index,
+    AggregationStateHashTableBase *aggregation_hash_table) const {
+  aggregateOnDistinctifyHashTableForGroupByUnaryHelper<AggregationHandleMax>(
+      distinctify_hash_table, index, aggregation_hash_table);
 }
 
 }  // namespace quickstep